# Source code for py21cmfast.wrapper.photoncons

"""
Module for the photon conservation models.

The excursion set reionisation model applied in ionize_box does not conserve photons.
As a result there is an offset between the expected global ionized fraction and the value
calculated from the ionized bubble maps. These models apply approximate corrections in order
to bring the bubble maps more in line with the expected global values.

The application of the model is controlled by the flag option PHOTON_CONS_TYPE, which
can take 4 values:

0. No correction is applied
1. We use the ionizing emissivity grids from a different redshift when calculating
   ionized bubble maps, this mapping from one redshift to another is obtained by
   performing a calibration simulation and measuring its redshift difference with the
   expected global evolution.
2. The power-law slope of the ionizing escape fraction is adjusted, using a fit
   ALPHA_ESC -> X + Y*Q(z), where Q is the expected global ionized fraction. This
   relation is fit by performing a calibration simulation as in (1), and comparing it to
   a range of expected global histories with different power-law slopes.
3. The normalisation of the ionizing escape fraction is adjusted, using a fit
   F_ESC10 -> X + Y*Q(z), where Q is the expected global ionized fraction. This relation
   is fit by performing a calibration simulation as in (1), and taking its ratio with
   the expected global evolution.

Notes
-----
The function map for the photon conservation model looks like::

    wrapper.run_lightcone/coeval()
        setup_photon_cons()
            calibrate_photon_cons()
                _init_photon_conservation_correction() --> computes and stores global evolution
                --> performs calibration simulation
                _calibrate_photon_conservation_correction() --> stores calibration evolution
            IF PHOTON_CONS_TYPE=='z-photoncons':
                lib.ComputeIonizedBox()
                    lib.adjust_redshifts_for_photoncons()
                        lib.determine_deltaz_for_photoncons() (first time only) --> calculates the deltaz array with some smoothing
                        --> does more smoothing and returns the adjusted redshift
            ELIF PHOTON_CONS_TYPE=='alpha-photoncons':
                photoncons_alpha() --> computes and stores ALPHA_ESC shift vs global neutral fraction
                lib.ComputeIonizedBox()
                    get_fesc_fit() --> applies the fit to the current redshift
            ELIF PHOTON_CONS_TYPE=='f-photoncons':
                photoncons_fesc() --> computes and stores F_ESC10 shift vs global neutral fraction
                lib.ComputeIonizedBox()
                    get_fesc_fit() --> applies the fit to the current redshift

"""

import logging

import attrs
import numpy as np
from scipy.optimize import curve_fit

from ..c_21cmfast import ffi, lib
from ._utils import _process_exitcode
from .cfuncs import broadcast_params
from .inputs import InputParameters
from .outputs import InitialConditions

logger = logging.getLogger(__name__)


# NOTE: if/when we move the photoncons model data to python we should use this
#   structure to hold z, xHI and parameter delta arrays
@attrs.define(kw_only=True)
class _PhotonConservationState:
    """Singleton class which contains the state of the photon-conservation model."""

    # Inputs used for the most recent calibration run (None until setup has run).
    calibration_inputs: InputParameters | None = None

    @property
    def c_memory_allocated(self) -> bool:
        """Whether the memory for the parameter shifts has been allocated in the backend."""
        return lib.photon_cons_allocated

    @c_memory_allocated.setter
    def c_memory_allocated(self, val):
        lib.photon_cons_allocated = ffi.cast("bool", val)


# Module-level singleton tracking the state of the photon-conservation model.
_photoncons_state = _PhotonConservationState()


@broadcast_params
def _init_photon_conservation_correction(*, inputs, **kwargs):
    """Compute and store the expected global reionisation evolution in the backend.

    This calculates the global expected evolution of reionisation and saves it to
    the C global arrays z_Q and Q_value (as well as other non-global confusingly
    named arrays), and constructs a GSL interpolator z_at_Q_spline.
    """
    return lib.InitialisePhotonCons()


def _calibrate_photon_conservation_correction(
    *, redshifts_estimate, nf_estimate, NSpline
):
    """Pass the calibration simulation results to the C backend.

    A clipped version is stored in the C global arrays nf_vals and z_vals, and the
    GSL interpolator z_NFHistory_spline is constructed.
    """
    redshifts_estimate = np.array(redshifts_estimate, dtype="float64")
    nf_estimate = np.array(nf_estimate, dtype="float64")

    z = ffi.cast("double *", ffi.from_buffer(redshifts_estimate))
    xHI = ffi.cast("double *", ffi.from_buffer(nf_estimate))

    logger.debug(f"PhotonCons nf estimates: {nf_estimate}")
    return lib.PhotonCons_Calibration(z, xHI, NSpline)


def _calc_zstart_photon_cons():
    """Get the starting redshift of the z-based photon conservation model.

    Set by the neutral fraction astro_params.PHOTONCONS_ZSTART.
    """
    # avoiding circular imports by importing here
    from ._utils import _call_c_simple

    return _call_c_simple(lib.ComputeZstart_PhotonCons)


def _get_photon_nonconservation_data() -> dict:
    """
    Access C global data representing the photon-nonconservation corrections.

    .. note:: If photon conservation is switched off via
       PHOTON_CONS_TYPE='no-photoncons' or the initialisation for photon
       conservation has not been performed yet, this will return an empty dict.

    Returns
    -------
    dict :
      z_analytic: array of redshifts defining the analytic ionized fraction
      Q_analytic: array of analytic ionized fractions corresponding to `z_analytic`
      z_calibration: array of redshifts defining the ionized fraction from 21cmFAST without recombinations
      nf_calibration: array of calibration ionized fractions corresponding to `z_calibration`
      delta_z_photon_cons: the change in redshift required to calibrate 21cmFAST, as a function of z_calibration
      nf_photoncons: the neutral fraction as a function of redshift
    """
    # Check if photon conservation has been initialised at all
    if not _photoncons_state.c_memory_allocated:
        return {}

    # Over-allocated buffers; the C call reports the used length of each pair.
    arbitrary_large_size = 2000
    data = np.zeros((6, arbitrary_large_size))
    IntVal1 = np.array(np.zeros(1), dtype="int32")
    IntVal2 = np.array(np.zeros(1), dtype="int32")
    IntVal3 = np.array(np.zeros(1), dtype="int32")

    c_z_at_Q = ffi.cast("double *", ffi.from_buffer(data[0]))
    c_Qval = ffi.cast("double *", ffi.from_buffer(data[1]))
    c_z_cal = ffi.cast("double *", ffi.from_buffer(data[2]))
    c_nf_cal = ffi.cast("double *", ffi.from_buffer(data[3]))
    c_PC_nf = ffi.cast("double *", ffi.from_buffer(data[4]))
    c_PC_deltaz = ffi.cast("double *", ffi.from_buffer(data[5]))
    c_int_NQ = ffi.cast("int *", ffi.from_buffer(IntVal1))
    c_int_NC = ffi.cast("int *", ffi.from_buffer(IntVal2))
    c_int_NP = ffi.cast("int *", ffi.from_buffer(IntVal3))

    # Run the C code
    errcode = lib.ObtainPhotonConsData(
        c_z_at_Q,
        c_Qval,
        c_int_NQ,
        c_z_cal,
        c_nf_cal,
        c_int_NC,
        c_PC_nf,
        c_PC_deltaz,
        c_int_NP,
    )

    _process_exitcode(errcode, lib.ObtainPhotonConsData, ())

    # Each reported count applies to a pair of arrays (analytic, calibration,
    # photoncons), hence the repetition.
    ArrayIndices = [
        IntVal1[0],
        IntVal1[0],
        IntVal2[0],
        IntVal2[0],
        IntVal3[0],
        IntVal3[0],
    ]

    data_list = [
        "z_analytic",
        "Q_analytic",
        "z_calibration",
        "nf_calibration",
        "nf_photoncons",
        "delta_z_photon_cons",
    ]

    return {
        name: d[:index]
        for name, d, index in zip(data_list, data, ArrayIndices, strict=True)
    }
def setup_photon_cons(
    initial_conditions: InitialConditions,
    inputs: InputParameters | None = None,
    **kwargs,
):
    r"""
    Set up the photon non-conservation correction.

    First performs a simplified calibration simulation and saves its neutral
    fraction history, to be matched to the analytic expression from solving the
    filling factor ODE. This matching can happen via a redshift adjustment, or an
    adjustment to the escape fraction power-law parameters.

    Parameters
    ----------
    initial_conditions : :class:`~InitialConditions`,
        The InitialConditions instance to use for the photonconservation calculation
    inputs : :class:`~InputParameters`, optional
        An InputParameters instance. If not given will taken from initial_conditions.

    Other Parameters
    ----------------
    Any other parameters able to be passed to :func:`compute_initial_conditions`.
    """
    if inputs is None:
        inputs = initial_conditions.inputs

    # Nothing to set up when the correction is disabled.
    if inputs.astro_options.PHOTON_CONS_TYPE == "no-photoncons":
        return {}

    # avoiding circular imports by importing here
    from ..drivers._param_config import check_consistency_of_outputs_with_inputs

    check_consistency_of_outputs_with_inputs(inputs, [initial_conditions])

    # calculate global and calibration simulation xH histories and save them in C
    calibrate_photon_cons(
        inputs=inputs,
        initial_conditions=initial_conditions,
        **kwargs,
    )
    _photoncons_state.calibration_inputs = inputs

    # Start from an empty result so an unexpected PHOTON_CONS_TYPE cannot raise
    # an UnboundLocalError on the return below; the branches are mutually
    # exclusive so an elif chain is used.
    photoncons_data = {}
    # The PHOTON_CONS_TYPE == 1 case is handled in C (for now....), but we get the
    # data anyway
    if inputs.astro_options.PHOTON_CONS_TYPE == "z-photoncons":
        photoncons_data = _get_photon_nonconservation_data()
    elif inputs.astro_options.PHOTON_CONS_TYPE == "alpha-photoncons":
        photoncons_data = photoncons_alpha(inputs, **kwargs)
    elif inputs.astro_options.PHOTON_CONS_TYPE == "f-photoncons":
        photoncons_data = photoncons_fesc(inputs)

    return photoncons_data
def calibrate_photon_cons(
    inputs: InputParameters,
    initial_conditions: InitialConditions,
    **kwargs,
):
    r"""
    Perform a photon conservation calibration simulation.

    Parameters
    ----------
    initial_conditions : :class:`~InitialConditions`,
        The InitialConditions instance to use for the photonconservation calculation
    inputs : :class:`~InputParameters`,
        An InputParameters instance.

    Other Parameters
    ----------------
    See docs of :func:`compute_initial_conditions` for more information.
    """
    # avoiding circular imports by importing here
    from ..drivers.single_field import compute_ionization_field, perturb_field

    # Create a new astro_params and astro_options just for the photon_cons correction
    # NOTE: Since the calibration cannot do INHOMO_RECO, we set the R_BUBBLE_MAX
    #   to the default w/o recombinations ONLY when the original box has
    #   INHOMO_RECO enabled.
    # TODO: figure out if it's possible to find a "closest" Rmax, since the
    #   correction fails when the histories are too different.

    # Using the halo sampling twice would be very slow, so switch to L-INTEGRAL
    # for the calibration
    source_model_calibration = {
        "E-INTEGRAL": "E-INTEGRAL",
        "L-INTEGRAL": "L-INTEGRAL",
        "DEXM-ESF": "L-INTEGRAL",
        "CHMF-SAMPLER": "L-INTEGRAL",
    }

    inputs_calibration = inputs.evolve_input_structs(
        USE_TS_FLUCT=False,
        INHOMO_RECO=False,
        USE_MINI_HALOS=False,
        SOURCE_MODEL=source_model_calibration[inputs.matter_options.SOURCE_MODEL],
        PHOTON_CONS_TYPE="no-photoncons",
        R_BUBBLE_MAX=(
            15
            if inputs.astro_options.INHOMO_RECO
            else inputs.astro_params.R_BUBBLE_MAX
        ),
    )

    ib = None
    prev_perturb = None

    # Arrays for redshift and neutral fraction for the calibration curve
    neutral_fraction_photon_cons = []

    # Initialise the analytic expression for the reionisation history
    logger.info("About to start photon conservation correction")
    _init_photon_conservation_correction(inputs=inputs, **kwargs)

    # Determine the starting redshift to start scrolling through to create the
    # calibration reionisation history
    logger.info("Calculating photon conservation zstart")
    z = _calc_zstart_photon_cons()
    fast_node_redshifts = [z]

    # NOTE: Not checking any redshift consistency for the calibration run
    #   Since the z-step is Q-dependent, we can't predict the redshifts
    inputs_calibration = inputs_calibration.clone(node_redshifts=None)
    while z > inputs.astro_params.PHOTONCONS_CALIBRATION_END:
        # Determine the ionisation box with recombinations, spin temperature etc.
        # turned off.
        this_perturb = perturb_field(
            redshift=z,
            inputs=inputs_calibration,
            initial_conditions=initial_conditions,
            **kwargs,
        )

        ib2 = compute_ionization_field(
            inputs=inputs_calibration,
            previous_ionized_box=ib,
            initial_conditions=initial_conditions,
            perturbed_field=this_perturb,
            previous_perturbed_field=prev_perturb,
            **kwargs,
        )

        mean_nf = np.mean(ib2.get("neutral_fraction"))

        # Save mean/global quantities
        neutral_fraction_photon_cons.append(mean_nf)

        # Can speed up sampling in regions where the evolution is slower
        if 0.3 < mean_nf <= 0.9:
            z -= 0.15
        elif 0.01 < mean_nf <= 0.3:
            z -= 0.05
        else:
            z -= 0.5

        ib = ib2
        if inputs.astro_options.USE_MINI_HALOS:
            prev_perturb = this_perturb
        fast_node_redshifts.append(z)

    # NOTE(review): fast_node_redshifts has one more entry than
    #   neutral_fraction_photon_cons (the final, un-simulated z is appended on
    #   loop exit), yet NSpline below is len(fast_node_redshifts) — confirm that
    #   lib.PhotonCons_Calibration expects/handles this length difference.
    fast_node_redshifts = np.array(fast_node_redshifts[::-1])
    neutral_fraction_photon_cons = np.array(neutral_fraction_photon_cons[::-1])

    # Construct the spline for the calibration curve
    logger.info("Calibrating photon conservation correction")
    _calibrate_photon_conservation_correction(
        redshifts_estimate=fast_node_redshifts,
        nf_estimate=neutral_fraction_photon_cons,
        NSpline=len(fast_node_redshifts),
    )
# (Jdavies): I needed a function to access the delta z from the wrapper
#   get_photoncons_data does not have the edge cases that
#   adjust_redshifts_for_photoncons does
@broadcast_params
def get_photoncons_dz(inputs, redshift, **kwargs):
    """Access the delta z arrays from the photon conservation model in C.

    Parameters
    ----------
    inputs
        InputParameters instance, consumed by the ``broadcast_params`` decorator.
    redshift : float
        The redshift at which to evaluate the photon-conservation adjustment.

    Returns
    -------
    tuple
        (adjusted redshift, stored input redshift, delta z), as filled in by the
        C backend.
    """
    # Single-element float32 buffers that the C routine fills in place.
    deltaz = np.zeros(1).astype("f4")
    redshift_pc_in = np.array([redshift]).astype("f4")
    stored_redshift_pc_in = np.array([redshift]).astype("f4")

    lib.adjust_redshifts_for_photoncons(
        ffi.cast("float *", redshift_pc_in.ctypes.data),
        ffi.cast("float *", stored_redshift_pc_in.ctypes.data),
        ffi.cast("float *", deltaz.ctypes.data),
    )

    return redshift_pc_in[0], stored_redshift_pc_in[0], deltaz[0]
# NOTE: alpha_func here MUST MATCH the C version
#   TODO: remove one of them
def alpha_func(Q, a_const, a_slope):
    """Linear function to fit in the simpler photon conservation models.

    Parameters
    ----------
    Q
        The (expected) global ionized fraction.
    a_const
        Intercept of the linear relation.
    a_slope
        Slope of the linear relation.

    Returns
    -------
    The fitted parameter value ``a_const + a_slope * Q``.
    """
    return a_const + a_slope * Q
# (jdavies): this will be a very hacky way to make a (d_alphastar vs z) array
# for a photoncons done by ALPHA_STAR instead of redshift
# This will work by taking the calibration simulation, plotting a RANGE of analytic
# Q vs z curves for different ALPHA_STAR, and then finding the alpha star which has
# the inverse ratio with the reference analytic as the calibration
# TODO: don't rely on the photoncons functions since they do a bunch of other stuff in C
def photoncons_alpha(inputs, **kwargs):
    """Run the Simpler photons conservation model using ALPHA_ESC.

    This adjusts the slope of the escape fraction instead of redshifts to match a
    global evolution.
    """
    # HACK: I need to allocate the deltaz arrays so I can return the other ones
    #   properly, this isn't a great solution
    # TODO: Move the deltaz interp tables to python
    if not _photoncons_state.c_memory_allocated:
        lib.determine_deltaz_for_photoncons()
        _photoncons_state.c_memory_allocated = True

    # Q(analytic) limits to fit the curve
    max_q_fit = 0.99
    min_q_fit = 0.2

    ap_c = inputs.astro_params.cdict
    ref_pc_data = _get_photon_nonconservation_data()
    z = ref_pc_data["z_calibration"]

    # roughly -0.1 steps for an extended range of alpha
    alpha_arr = np.linspace(-2.0, 1.0, num=31) + ap_c["ALPHA_ESC"]
    test_pc_data = np.zeros((alpha_arr.size, ref_pc_data["z_calibration"].size))

    # fit to the same z-array
    ref_interp = np.interp(
        ref_pc_data["z_calibration"],
        ref_pc_data["z_analytic"],
        ref_pc_data["Q_analytic"],
    )

    for i, a in enumerate(alpha_arr):
        # alter astro params with new alpha
        inputs_photoncons = inputs.evolve_input_structs(ALPHA_ESC=a)

        # find the analytic curve with that alpha
        # TODO: Theres a small memory leak here since global arrays are allocated
        #   (for some reason). TODO: use ffi to free them?
        #   This will be fixed by moving the photoncons to python
        _init_photon_conservation_correction(inputs=inputs_photoncons, **kwargs)

        # save it
        pcd_buf = _get_photon_nonconservation_data()

        # interpolate to the calibration redshifts
        test_pc_data[i, ...] = np.interp(
            ref_pc_data["z_calibration"], pcd_buf["z_analytic"], pcd_buf["Q_analytic"]
        )

    # filling factors sometimes go above 1, this causes problems in late-time ratios
    # I want this in the test alphas to get the right photon ratio, but not in the
    # reference analytic
    ref_interp[ref_interp > 1] = 1.0

    # ratio of each alpha with calibration
    ratio_test = (test_pc_data) / ref_interp[None, ...]
    # ratio of given alpha with calibration
    ratio_ref = (1 - ref_pc_data["nf_calibration"]) / ref_interp

    # find N(alpha)/ref == ref/cal
    ratio_diff = ratio_test - 1 / ratio_ref[None, :]
    # find N(alpha) - ref == ref - cal
    diff_test = (
        (test_pc_data)
        + (1 - ref_pc_data["nf_calibration"])[None, ...]
        - 2 * ref_interp[None, ...]
    )
    # find NF(alpha) == cal, then apply - alpha
    reverse_test = test_pc_data - (1 - ref_pc_data["nf_calibration"])[None, ...]

    alpha_estimate_ratio = np.zeros_like(z)
    alpha_estimate_diff = np.zeros_like(z)
    alpha_estimate_reverse = np.zeros_like(z)

    # find the roots of each function (sign changes along the alpha axis)
    roots_ratio_idx = np.where(np.diff(np.sign(ratio_diff), axis=0))
    roots_diff_idx = np.where(np.diff(np.sign(diff_test), axis=0))
    roots_reverse_idx = np.where(np.diff(np.sign(reverse_test), axis=0))

    # find alpha estimates
    for arr_in, roots_arr, arr_out in zip(
        [ratio_diff, diff_test, reverse_test],
        [roots_ratio_idx, roots_diff_idx, roots_reverse_idx],
        [alpha_estimate_ratio, alpha_estimate_diff, alpha_estimate_reverse],
        strict=True,
    ):
        last_alpha = ap_c["ALPHA_ESC"]
        # walk from low z to high z so each choice can track the previous one
        for i in range(z.size)[::-1]:
            # get the roots at this redshift
            roots_z = np.where(roots_arr[1] == i)

            # if there are no roots, assign nan
            if roots_z[0].size == 0:
                arr_out[i] = np.nan
                continue

            alpha_idx = roots_arr[0][roots_z]

            # linearly interpolate between the bracketing alphas to the zero crossing
            y0 = arr_in[alpha_idx, i]
            y1 = arr_in[alpha_idx + 1, i]
            x0 = alpha_arr[alpha_idx]
            x1 = alpha_arr[alpha_idx + 1]
            guesses = -y0 * (x1 - x0) / (y1 - y0) + x0

            # choose the root which gives the smoothest alpha vs z curve
            arr_out[i] = guesses[np.argmin(np.fabs(guesses - last_alpha))]
            last_alpha = arr_out[i]

    # initialise the output structure before the fits
    results = {
        "z_calibration": ref_pc_data["z_calibration"],
        "z_analytic": ref_pc_data["z_analytic"],
        "Q_analytic": ref_pc_data["Q_analytic"],
        "nf_photoncons": 1 - ref_interp,
        "Q_alpha": test_pc_data,
        "nf_calibration": ref_pc_data["nf_calibration"],
        "alpha_ratio": alpha_estimate_ratio,
        "alpha_diff": alpha_estimate_diff,
        "alpha_reverse": alpha_estimate_reverse,
        "alpha_arr": alpha_arr,
        "fit_yint": ap_c["ALPHA_ESC"],
        "fit_slope": 0,  # start with no correction
        "found_alpha": False,
    }

    # adjust the reverse one (we found the alpha which is close to the calibration
    # sim, undo it)
    alpha_estimate_reverse = 2 * ap_c["ALPHA_ESC"] - alpha_estimate_reverse

    # fit to the curve
    # make sure there's an estimate and Q isn't too high/low
    fit_alpha = alpha_estimate_ratio
    sel = np.isfinite(fit_alpha) & (ref_interp < max_q_fit) & (ref_interp > min_q_fit)

    # NOTE(review): `last_alpha` below is whatever remained after the final
    #   root-finding pass (the "reverse" estimate at the highest redshift
    #   processed) — confirm this is the intended flat-correction value.
    # if there are no alpha roots found, it's likely this is a strange reionisation
    # history but we can't apply the alpha correction so throw an error
    # if we can't fit due to not enough ionisation, catch that here
    if ref_interp.max() < min_q_fit:
        results["fit_yint"] = last_alpha
        logger.warning(
            f"These parameters result in little ionisation, running with flat alpha correction {last_alpha}"
        )
    elif np.count_nonzero(sel) == 1:
        results["fit_yint"] = last_alpha
        logger.warning(
            f"ONLY ONE REDSHIFT HAD AN ALPHA FIT WITHIN THE RANGE, running with flat alpha correction {last_alpha}"
        )
    # here there are no fits, meaning we likely have a strange reionisation history
    # where the photon conservation is worse than alpha can correct
    elif np.count_nonzero(sel) == 0:
        logger.warning(
            "No alpha within the range can fit the global history for any z, running without alpha correction"
        )
        logger.warning(
            "THIS REIONISAITON HISTORY IS LIKELY HIGHLY NON-CONSERVING OF PHOTONS"
        )
    else:
        popt, _pcov = curve_fit(alpha_func, ref_interp[sel], fit_alpha[sel])

        # pass to C
        logger.info(f"ALPHA_ESC Original = {ap_c['ALPHA_ESC']:.3f}")
        logger.info(f"Running with ALPHA_ESC = {popt[0]:.2f} + {popt[1]:.2f} * Q")
        results["fit_yint"] = popt[0]
        results["fit_slope"] = popt[1]
        results["found_alpha"] = True

    lib.set_alphacons_params(results["fit_yint"], results["fit_slope"])

    return results
def photoncons_fesc(inputs):
    """Run the Even Simpler photon conservation model using F_ESC10.

    Adjusts the normalisation of the escape fraction to match a global evolution.
    """
    # HACK: I need to allocate the deltaz arrays so I can return the other ones
    #   properly, this isn't a great solution
    if not _photoncons_state.c_memory_allocated:
        lib.determine_deltaz_for_photoncons()
        _photoncons_state.c_memory_allocated = True

    # Q(analytic) limits to fit the curve
    max_q_fit = 0.99
    min_q_fit = 0.2

    ref_pc_data = _get_photon_nonconservation_data()

    # fit to the same z-array
    ref_interp = np.interp(
        ref_pc_data["z_calibration"],
        ref_pc_data["z_analytic"],
        ref_pc_data["Q_analytic"],
    )
    ap_c = inputs.astro_params.cdict

    # filling factors sometimes go above 1, this causes problems in late-time ratios
    # I want this in the test alphas to get the right photon ratio, but not in the
    # reference analytic
    ref_interp[ref_interp > 1] = 1.0

    # ratio of the analytic filling factor with the calibration one; scaling the
    # input F_ESC10 by this ratio gives the escape fraction target at each z
    ratio_ref = ref_interp / (1 - ref_pc_data["nf_calibration"])
    fit_fesc = ratio_ref * ap_c["F_ESC10"]
    sel = np.isfinite(fit_fesc) & (ref_interp < max_q_fit) & (ref_interp > min_q_fit)

    # NOTE(review): if `sel` is empty (e.g. very little ionisation), curve_fit
    #   will raise here — unlike photoncons_alpha there is no fallback; confirm
    #   whether that is intended.
    popt, _pcov = curve_fit(alpha_func, ref_interp[sel], fit_fesc[sel])

    # pass to C
    logger.info(f"F_ESC10 Original = {ap_c['F_ESC10']:.3f}")
    logger.info(f"Running with F_ESC10 = {popt[0]:.2f} + {popt[1]:.2f} * Q")

    # initialise the output structure before the fits
    results = {
        "z_calibration": ref_pc_data["z_calibration"],
        "z_analytic": ref_pc_data["z_analytic"],
        "Q_analytic": ref_pc_data["Q_analytic"],
        "nf_calibration": ref_pc_data["nf_calibration"],
        "nf_photoncons": 1 - ref_interp,
        "Q_ratio": ratio_ref,
        "fesc_target": fit_fesc,
        "fit_yint": popt[0],
        "fit_slope": popt[1],
    }

    lib.set_alphacons_params(results["fit_yint"], results["fit_slope"])

    return results