# Source code for py21cmfast.io.caching

"""Module to deal with the cache.

The module has a manager that essentially establishes a database of cached files,
and provides methods to handle the caching of output data (i.e. determining the
filename for a given set of parameters).
"""

import logging
import re
from hashlib import md5
from pathlib import Path
from typing import ClassVar, Self

import attrs
import numpy as np

from .._cfg import config
from ..wrapper import outputs as op
from ..wrapper.inputs import InputParameters
from ..wrapper.outputs import (
    _ALL_OUTPUT_STRUCTS,
    OutputStruct,
    _HashType,
)
from .h5 import read_inputs, read_output_struct, write_output_to_hdf5

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@attrs.define(frozen=True)
class OutputCache:
    """An object that manages cache files from 21cmFAST simulations.

    This object has a single attribute -- the top-level directory of the cache. This
    directory can be anywhere on disk. A number of methods exist on the object to
    interact with the cache, including finding existing cache files for a particular
    OutputStruct, writing/reading an OutputStruct to/from the cache, and listing
    existing datasets.

    The cache is meant for single-field OutputStruct objects, not "collections" of
    outputs in an evolved universe (like Coeval or Lightcone objects).
    """

    # NOTE: the default is evaluated once, when the class body is executed.
    direc: Path = attrs.field(
        default=Path(config["direc"]).expanduser(), converter=Path
    )

    # Maps the class name of every non-meta output struct to its `_compat_hash`,
    # which is used below as the key into `_path_structures`.
    _output_to_cache_map: ClassVar = {
        kls.__name__: kls._compat_hash
        for kls in _ALL_OUTPUT_STRUCTS.values()
        if not kls._meta
    }

    # Relative path templates, keyed by hash type. The same template is used both
    # as a literal relative path (all fields concrete) and as a regular expression
    # (some fields replaced by wildcards).
    _path_structures: ClassVar = {
        _HashType.user_cosmo: "{matter_cosmo}/{seed}/InitialConditions.h5",
        _HashType.zgrid: "{matter_cosmo}/{seed}/{zgrid}/{redshift}/{cls}.h5",
        _HashType.full: "{matter_cosmo}/{seed}/{zgrid}/{redshift}/{astro_flag}/{cls}.h5",
    }

    @classmethod
    def _get_hashes(cls, inputs: InputParameters) -> dict[str, str | int]:
        """Return a dict of hashes for different components of the calculation.

        The returned keys are ``matter_cosmo``, ``seed``, ``zgrid`` and
        ``astro_flag``. All values are md5 hex digests of parameter ``repr``s,
        except ``seed`` which is ``inputs.random_seed`` unchanged.
        """
        # Python builtin hashes can be negative which looks weird in filenames
        return {
            "matter_cosmo": md5(
                (
                    repr(inputs.cosmo_params)
                    + repr(inputs.simulation_options)
                    + repr(inputs.matter_options)
                ).encode()
            ).hexdigest(),
            "seed": inputs.random_seed,
            "zgrid": md5(repr(inputs.node_redshifts).encode()).hexdigest(),
            "astro_flag": md5(
                (repr(inputs.astro_params) + repr(inputs.astro_options)).encode()
            ).hexdigest(),
        }

    @classmethod
    def _fill_path_template(
        cls,
        *,
        kind: str | None = None,
        inputs: InputParameters | None = None,
        all_seeds: bool = False,
        redshift: float | None = None,
    ) -> str:
        """Fill the path templates with given values.

        Does the conditional formatting required for each field, since we don't do
        number formatting for the wildcards.

        Parameters
        ----------
        kind
            Either the class name of an output struct (a key of
            ``_output_to_cache_map``), or a key of ``_path_structures``. Anything
            else selects the full (astro) template with a wildcard class name.
        inputs
            Input parameters from which the hashes are computed. If not given, all
            hash fields become wildcards.
        all_seeds
            If True, the seed field is a wildcard even when ``inputs`` is given.
        redshift
            Redshift, formatted to four decimal places. If not given, the redshift
            field is a wildcard.

        Returns
        -------
        str
            The filled template path with optional wildcards for searching.
        """
        if inputs is not None:
            hashes = cls._get_hashes(inputs)
            # The md5 digests are already strings; only the seed needs formatting.
            hashes["seed"] = r"\d+" if all_seeds else f"{hashes['seed']:d}"
        else:
            hashes = {
                "matter_cosmo": ".+?",
                "seed": r"\d+",
                "zgrid": ".+?",
                "astro_flag": ".+?",
            }

        # do the conditional formatting
        hashes["redshift"] = f"{redshift:.4f}" if redshift is not None else ".+?"
        hashes["cls"] = kind if kind in cls._output_to_cache_map else ".+?"

        # precedence: outputclass mapped (class name -> template)
        #   > template provided as _HashType (template directly)
        #   > full astro path
        path_template = cls._output_to_cache_map.get(kind, kind)
        template = cls._path_structures.get(
            path_template, cls._path_structures[_HashType.full]
        )
        return template.format(**hashes)

    def get_filename(self, obj: OutputStruct) -> str:
        """
        Generate a filename for a given OutputStruct object based on its properties.

        This method constructs a unique filename using the object's class name,
        redshift (if available), and hashes of its input parameters. The filename
        structure is determined by the _path_structures dictionary.

        Parameters
        ----------
        obj : OutputStruct
            The OutputStruct object for which to generate a filename.

        Returns
        -------
        str
            The generated filename (relative to the cache directory) for the given
            OutputStruct object.
        """
        return self._fill_path_template(
            kind=obj.__class__.__name__,
            redshift=getattr(obj, "redshift", None),
            inputs=obj.inputs,
            all_seeds=False,
        )

    def get_path(self, obj: OutputStruct) -> Path:
        """
        Get the full path for a given OutputStruct object.

        This method combines the cache directory with the filename generated for
        the given OutputStruct object to create a complete file path.

        Parameters
        ----------
        obj : OutputStruct
            The OutputStruct object for which to generate the full path.

        Returns
        -------
        Path
            The complete file path for the given OutputStruct object.
        """
        return self.direc / self.get_filename(obj)

    def find_existing(self, obj: OutputStruct) -> Path | None:
        """
        Try to find existing boxes which match the parameters of this instance.

        Parameters
        ----------
        obj : OutputStruct
            The OutputStruct instance to search for.

        Returns
        -------
        Path
            The path to an existing cached OutputStruct matching this instance, or
            None if no match is found.
        """
        # Only the explicit path for this object is checked.
        f = self.get_path(obj)
        return f if f.exists() else None

    def write(self, obj: OutputStruct) -> None:
        """
        Write an OutputStruct object to the cache.

        This method writes the given OutputStruct object to an HDF5 file in the
        cache, using the path determined by the object's properties.

        Parameters
        ----------
        obj : OutputStruct
            The OutputStruct object to be written to the cache.
        """
        pth = self.get_path(obj)
        write_output_to_hdf5(obj, path=pth)

    def list_datasets(
        self,
        *,
        kind: str | None = None,
        inputs: InputParameters | None = None,
        all_seeds: bool = True,
        redshift: float | None = None,
    ) -> list[Path]:
        """Return all datasets in the cache which match a given set of filters.

        Parameters
        ----------
        kind: str, optional
            Filter by this kind (a class name of an OutputStruct).
        inputs : InputParameters
            Filter by these input parameters
        all_seeds
            Set to False to only include the seed within `inputs`.
        redshift
            The redshift to search for.

        Returns
        -------
        files
            list of paths pointing to files matching the filters. Each path appears
            at most once.
        """
        kinds_list = list(self._output_to_cache_map) if kind is None else [kind]

        # Compile each search pattern once rather than once per file.
        patterns = [
            re.compile(
                self._fill_path_template(
                    kind=k,
                    inputs=inputs,
                    all_seeds=all_seeds,
                    redshift=redshift,
                )
            )
            for k in kinds_list
        ]

        matches = []
        for fl in self.direc.glob("**/*"):
            target = str(fl)
            # `any` stops at the first matching pattern, so a file that matches
            # several templates is listed only once.
            if any(pattern.search(target) for pattern in patterns):
                matches.append(fl)
        return matches

    def load(self, obj: OutputStruct) -> OutputStruct:
        """Load a cache-backed object from disk corresponding to a given object.

        Raises
        ------
        OSError
            If no cache file exists for ``obj``.
        """
        existing = self.find_existing(obj)
        if existing is None:
            raise OSError(f"No cache exists for {obj} yet!")
        return read_output_struct(existing, struct=obj.__class__.__name__)


def _pathfield():
    """Return an attrs field holding an optional Path (default None)."""
    return attrs.field(
        default=None,
        converter=attrs.converters.optional(Path),
    )


def _dict_of_paths_field():
    """Return an attrs field holding an optional ``{redshift: Path}`` dict."""

    def _convert(x: dict | None) -> dict[float, Path] | None:
        if x is None:
            return None
        if isinstance(x, dict):
            return {float(z): Path(d) for z, d in x.items()}
        # Previously any other type fell through and was silently stored as None.
        raise TypeError(
            f"Expected a dict mapping redshift to path, or None; got {type(x).__name__}"
        )

    return attrs.field(
        default=None,
        converter=_convert,
    )


@attrs.define
class RunCache:
    """An object that specifies all cache files that should/can exist for a full run.

    This object should be instantiated via the `.from_inputs()` class method.

    The instance simply holds references to all possible cache files for a particular
    total simulation (including all evolution over redshift). Not all of these files
    might exist: if a file doesn't exist it implies that the simulation has not run
    for that redshift/field yet. Attributes with values of None are not meant to
    exist as part of the simulation (e.g. they may be TsBox instances when
    USE_TS_FLUCT=False).
    """

    InitialConditions: Path | None = _pathfield()
    PerturbedField: dict[float, Path] | None = _dict_of_paths_field()
    TsBox: dict[float, Path] | None = _dict_of_paths_field()
    IonizedBox: dict[float, Path] | None = _dict_of_paths_field()
    BrightnessTemp: dict[float, Path] | None = _dict_of_paths_field()
    HaloBox: dict[float, Path] | None = _dict_of_paths_field()
    HaloCatalog: dict[float, Path] | None = _dict_of_paths_field()
    XraySourceBox: dict[float, Path] | None = _dict_of_paths_field()
    inputs: InputParameters | None = attrs.field(default=None)

    @classmethod
    def from_inputs(cls, inputs: InputParameters, cache: OutputCache) -> Self:
        """
        Create a RunCache instance from input parameters and an OutputCache.

        This method generates file paths for various output structures based on the
        provided input parameters and cache configuration.

        Parameters
        ----------
        inputs : InputParameters
            The input parameters for the simulation.
        cache : OutputCache
            The output cache object containing directory and path structure
            information.

        Returns
        -------
        RunCache
            A new RunCache instance with file paths for various output structures.
        """
        ics = cache.direc / cache._fill_path_template(
            kind="InitialConditions",
            inputs=inputs,
        )

        # Fields that always get one path per node redshift.
        others = {
            "PerturbedField": {},
            "IonizedBox": {},
            "BrightnessTemp": {},
        }

        # Optional fields, added according to the flags on `inputs`.
        if inputs.astro_options.USE_TS_FLUCT:
            others |= {"TsBox": {}}
        if inputs.matter_options.lagrangian_source_grid:
            others |= {"XraySourceBox": {}, "HaloBox": {}}
        if inputs.matter_options.has_discrete_halos:
            others |= {"HaloCatalog": {}}

        for z in inputs.node_redshifts:
            for name, val in others.items():
                val[z] = cache.direc / cache._fill_path_template(
                    kind=name,
                    redshift=z,
                    inputs=inputs,
                )

        return cls(
            InitialConditions=ics,
            **others,
            inputs=inputs,
        )

    @classmethod
    def from_example_file(cls, path: Path | str) -> Self:
        """Create a RunCache object from an example file.

        This method can be used to determine all the cache files that make up a full
        simulation, given a single example file.

        Note that this method is somewhat ambiguous when the input file is "high up"
        in the simulation hierarchy (e.g. InitialConditions or PerturbedField)
        because the input parameters to these objects may differ from those of the
        full simulation, in their astro_params and astro_options. For this reason,
        it is better to supply a cache object like IonizedBox or BrightnessTemp.

        Parameters
        ----------
        path : Path | str
            The path to a particular file in cache. The returned RunCache object
            will include this file.

        Returns
        -------
        RunCache
            The run cache built from the inputs stored in the file, rooted at the
            cache directory inferred from ``path``.

        Raises
        ------
        ValueError
            If ``path`` does not match the cache path structure for any known
            output kind.
        """
        inputs = read_inputs(Path(path))

        for kind in OutputCache._output_to_cache_map:
            template = OutputCache._fill_path_template(
                kind=kind,
                redshift=None,
                inputs=inputs,
                all_seeds=False,
            )
            match = re.search(template, str(path))
            if match is not None:
                # Everything before the matched relative path is the cache root.
                parent = Path(str(path)[: match.start()])
                break
        else:
            raise ValueError(
                f"The file {path} does not seem to be within a cache structure."
            )

        return cls.from_inputs(inputs, OutputCache(parent))

    def is_complete_at(
        self, z: float | None = None, index: int | None = None
    ) -> bool:
        """Determine whether all redshift-dependent cache files exist at a redshift.

        Parameters
        ----------
        z : float
            The redshift to check. Must be one of the redshifts for which paths
            are stored.
        index : int
            The node-redshift index to check, as an alternative to ``z``.

        Returns
        -------
        bool
            True if, for every redshift-dependent field, the file at the requested
            redshift exists.

        Raises
        ------
        ValueError
            If both or neither of ``z`` and ``index`` are given.
        """
        if index is not None and z is not None:
            raise ValueError("Cannot specify both z and index")
        if index is None and z is None:
            raise ValueError("Must specify one of z or index")

        if index is not None:
            z = self.inputs.node_redshifts[index]

        for kind in attrs.asdict(self, recurse=False).values():
            # Only the per-redshift (dict) fields are checked here.
            if not isinstance(kind, dict):
                continue

            if not kind[z].exists():
                return False
        return True

    def get_output_struct_at_z(
        self,
        kind: type[OutputStruct] | str,
        z: float | None = None,
        index: int | None = None,
        match_z_within: float = 0.01,
    ) -> OutputStruct:
        """Return an output struct of a given kind at or close to a given redshift.

        Parameters
        ----------
        kind : type[OutputStruct] | str
            The output struct class, or its class name.
        z : float
            The redshift at which to return an output struct.
        index : int
            The node-redshift index at which to return the output struct.
        match_z_within : float
            The maximum difference between the requested and closest available
            redshift.

        Returns
        -------
        OutputStruct
            The output struct corresponding to the kind and redshift.

        Raises
        ------
        ValueError
            If the kind is unknown or has no per-redshift cache files, if both or
            neither of ``z`` and ``index`` are given, or if no cached redshift lies
            within ``match_z_within`` of the requested one.
        """
        if not isinstance(kind, str):
            kind = kind.__name__

        if kind not in attrs.fields_dict(self.__class__):
            raise ValueError(f"Unknown output kind: {kind}")

        if index is not None:
            if z is not None:
                raise ValueError("Cannot specify both z and index")
            z = self.inputs.node_redshifts[index]
        elif z is None:
            raise ValueError("Must specify one of z or index")

        paths = getattr(self, kind)
        # Covers fields that are None for this run, fields that are not
        # redshift-dependent (e.g. InitialConditions) and empty dicts.
        if not isinstance(paths, dict) or not paths:
            raise ValueError(
                f"No redshift-dependent cache files are defined for kind '{kind}'"
            )

        available = list(paths)
        zs_of_kind = np.array(available)
        if z not in zs_of_kind:
            closest = np.argmin(np.abs(zs_of_kind - z))
            if abs(zs_of_kind[closest] - z) > match_z_within:
                raise ValueError(
                    f"No output struct found for kind '{kind}' at redshift {z} (closest available: {zs_of_kind[closest]} at z idx = {closest})"
                )
            # Use the stored key itself for the lookup.
            z = available[closest]

        fl = paths[z]
        return read_output_struct(fl)

    def get_ics(self) -> op.InitialConditions:
        """Return the initial conditions."""
        return read_output_struct(self.InitialConditions)

    def get_all_boxes_at_z(
        self,
        z: float | None = None,
        index: int | None = None,
        match_z_within: float = 0.01,
        return_ics: bool = False,
    ) -> dict[str, OutputStruct]:
        """Return all boxes at or close to a given redshift.

        Parameters
        ----------
        z : float
            The redshift at which to return the boxes.
        index : int
            The node-redshift index at which to return the boxes.
        match_z_within : float
            The maximum difference between the requested and closest available
            redshift.
        return_ics : bool
            Whether to also include the initial conditions, under the key
            ``"InitialConditions"``.

        Returns
        -------
        dict[str, OutputStruct]
            A dictionary mapping field names to the output structs read from cache.
        """
        # Only fields holding a dict of per-redshift paths are read here.
        kinds = [
            k
            for k, v in attrs.asdict(self, recurse=False).items()
            if isinstance(v, dict)
        ]
        out = {
            k: self.get_output_struct_at_z(k, z, index, match_z_within) for k in kinds
        }
        if return_ics:
            out["InitialConditions"] = self.get_ics()
        return out

    def get_coeval_at_z(
        self,
        z: float | None = None,
        index: int | None = None,
        match_z_within: float = 0.01,
    ):
        """Return a Coeval object at or close to a given redshift.

        Parameters
        ----------
        z : float
            The redshift at which to return the Coeval object.
        index : int
            The node-redshift index at which to return the Coeval object.
        match_z_within : float
            The maximum difference between the requested and closest available
            redshift.

        Returns
        -------
        Coeval
            The Coeval object at the given redshift.
        """
        # Imported locally -- presumably to avoid a circular import; confirm
        # before moving it to module level.
        from py21cmfast.drivers.coeval import Coeval

        boxes = self.get_all_boxes_at_z(z, index, match_z_within, return_ics=True)
        return Coeval(
            initial_conditions=boxes["InitialConditions"],
            perturbed_field=boxes["PerturbedField"],
            ionized_box=boxes["IonizedBox"],
            brightness_temperature=boxes["BrightnessTemp"],
            ts_box=boxes.get("TsBox"),
            halobox=boxes.get("HaloBox"),
        )

    def is_complete(self) -> bool:
        """Whether the cache for the full simulation is complete.

        Returns False if the initial conditions path is unset or missing on disk,
        or if any file referenced by a per-redshift field does not exist.
        """
        if self.InitialConditions is None or not self.InitialConditions.exists():
            return False

        for kind in attrs.asdict(self, recurse=False).values():
            if not isinstance(kind, dict):
                continue

            for fl in kind.values():
                if not fl.exists():
                    return False
        return True


@attrs.define
class CacheConfig:
    """A configuration object that specifies whether a certain field should be cached."""

    initial_conditions: bool = attrs.field(default=True, converter=bool)
    perturbed_field: bool = attrs.field(default=True, converter=bool)
    spin_temp: bool = attrs.field(default=True, converter=bool)
    ionized_box: bool = attrs.field(default=True, converter=bool)
    brightness_temp: bool = attrs.field(default=True, converter=bool)
    halobox: bool = attrs.field(default=True, converter=bool)
    halo_catalog: bool = attrs.field(default=True, converter=bool)
    xray_source_box: bool = attrs.field(default=True, converter=bool)

    @classmethod
    def on(cls) -> Self:
        """Generate a CacheConfig where all boxes are cached."""
        return cls()

    @classmethod
    def off(cls) -> Self:
        """Generate a CacheConfig where no boxes are cached."""
        return cls(
            initial_conditions=False,
            perturbed_field=False,
            spin_temp=False,
            ionized_box=False,
            brightness_temp=False,
            halobox=False,
            halo_catalog=False,
            xray_source_box=False,
        )

    @classmethod
    def noloop(cls) -> Self:
        """Generate a CacheConfig where only boxes not requiring evolution are cached."""
        return cls(
            initial_conditions=True,
            perturbed_field=True,
            spin_temp=False,
            ionized_box=False,
            brightness_temp=False,
            halobox=False,
            halo_catalog=True,
            xray_source_box=False,
        )

    @classmethod
    def last_step_only(cls) -> Self:
        """Generate a CacheConfig where only boxes needed from more than one step away are cached.

        This represents the minimum caching setup which will *never* store every
        redshift in memory. PerturbedField and PerturbedHaloCatalogs are all
        calculated at the start of the run, and HaloBox is required at multiple
        redshifts for the XraySourceBox. So this caching setup allows free purging
        of these objects without losing data.
        """
        return cls(
            initial_conditions=False,
            perturbed_field=True,
            spin_temp=False,
            ionized_box=False,
            brightness_temp=False,
            halobox=True,
            halo_catalog=True,
            xray_source_box=False,
        )