Source code for BaryonForge.utils.misc

import numpy as np
import pyccl as ccl
from operator import add, mul, sub, truediv, pow, abs, neg, pos
import warnings
from scipy import interpolate

__all__ = ['generate_operator_method', 'destory_Pk', 'build_cosmodict', 'safe_Pchip_minimize', 'combine_fftpars']

[docs]def generate_operator_method(op, reflect = False): """ Defines a method for generating simple arithmetic operations for the Profile classes. The `generate_operator_method` function dynamically creates methods that can be used to perform arithmetic operations (such as addition, subtraction, multiplication, etc.) on instances of the `HaloProfile` classes or similar. This function alters the `_real()` routine in the `HaloProfile` classes so that the new result is equivalent to computing the `_real()` of the individual classes and then performing the specified arithmetic operation. Parameters ---------- op : function The arithmetic operation to be applied. It should be one of the Python arithmetic operators like `add`, `mul`, `sub`, `pow`, `truediv`, `abs`, `neg`, or `pos` from the `operator` module. reflect : bool, optional If `True`, the order of the operands is reversed (i.e., the operation is performed with the second operand as the first). Used to handle right-handed/left-handed operations. Default is `False`. Returns ------- operator_method : function A function that defines how the arithmetic operation should be performed on the `HaloProfile` classes. This function can handle operations with other `HaloProfile` instances, integers, or floats. Examples -------- To use `generate_operator_method` for addition, you might do: >>> from operator import add >>> add_method = generate_operator_method(add) >>> profile_sum = add_method(profile1, profile2) Notes ----- - The method returned can handle both unary and binary operations depending on the operator specified. """ if op in [add, mul, sub, pow, truediv]: def operator_method(self, other): assert isinstance(other, (int, float, ccl.halos.profiles.HaloProfile)), f"Object must be int/float/SchneiderProfile but is type '{type(other).__name__}'." mdl_pars = self.model_params hyp_pars = self.hyper_params fft_pars = self.precision_fftlog.to_dict() if isinstance(other, ccl.halos.profiles.HaloProfile): mdl_pars |= other.model_params hyp_pars |= other.hyper_params fft_pars = combine_fftpars(fft_pars, other.precision_fftlog.to_dict()) Combined = self.__class__(**mdl_pars, **hyp_pars) Combined.update_precision_fftlog(**fft_pars) def __tmp_real__(cosmo, r, M, a): A = self._real(cosmo, r, M, a) if isinstance(other, ccl.halos.profiles.HaloProfile): B = other._real(cosmo, r, M, a) else: B = other if not reflect: return op(A, B) else: return op(B, A) def __str_prf__(): op_name = op.__name__ cl_name = self.__str_prf__() #Check if it has cutom string output already #If not then use the class name. Or if int/float just use number if isinstance(other, ccl.halos.profiles.HaloProfile): if hasattr(other, '__str_prf__'): ot_name = other.__str_prf__() else: ot_name = other.__class__.__name__ else: ot_name = other if not reflect: return f"{op_name}[{cl_name}, {ot_name}]" else: return f"{op_name}[{ot_name}, {cl_name}]" Combined._real = __tmp_real__ Combined.__str_prf__ = __str_prf__ #Do the same operations on fourier side only if the profile exists. #This happens for a small handful of profiles if hasattr(self, '_fourier') & ((not isinstance(other, ccl.halos.profiles.HaloProfile)) | hasattr(other, '_fourier')): def __tmp_fourier__(cosmo, r, M, a): A = self._fourier(cosmo, r, M, a) if isinstance(other, ccl.halos.profiles.HaloProfile): B = other._fourier(cosmo, r, M, a) else: B = other if not reflect: return op(A, B) else: return op(B, A) Combined._fourier = __tmp_fourier__ return Combined #For some operators we don't need a second input, so rewrite function for that elif op in [abs, neg, pos]: def operator_method(self): Combined = self.__class__(**self.model_params, **self.hyper_params) Combined.update_precision_fftlog(**self.precision_fftlog.to_dict()) def __tmp_real__(cosmo, r, M, a): A = self._real(cosmo, r, M, a) return op(A) def __str_prf__(): op_name = op.__name__ cl_name = self.__str_prf__() return f"{op_name}[{cl_name}]" Combined._real = __tmp_real__ Combined.__str_prf__ = __str_prf__ return Combined return operator_method
[docs]def destory_Pk(cosmo): """ Removes some SwigPyObject from cosmo that are unable to be pickled and therefore caused the multiprocessing to break. Parameters ---------- cosmo : object A ccl Cosmology object Returns ------- cosmo : object The ccl Cosmology object but with some attributes (that are necessarily SwigPyObjects) destoryed in order to allow pickling. Notes ----- - For efficiency in pipeline, this function should be used on a Cosmology object only once the main profile/cosmology operations are finished. Otherwise this forces the recalculation of the power spectrum (which may not be an issue depending on your use-case, but something to be aware of). """ cosmo._pk_lin = {} cosmo._pk_nl = {} return cosmo
[docs]def build_cosmodict(cosmo): """ Generate a dictionary containing a subset of cosmological parameters from a pyccl Cosmology object. This function extracts key cosmological parameter values from a pyccl Cosmology object and stores them in a dictionary. If the `sigma8` parameter is not already computed (NaN), it triggers its computation. Parameters ---------- cosmo : pyccl.core.Cosmology A pyccl Cosmology object containing the cosmological model parameters. Returns ------- dict A dictionary with the following keys: - 'Omega_m' : float The total matter density parameter. - 'Omega_b' : float The baryonic matter density parameter. - 'sigma8' : float The normalization of the power spectrum. - 'h' : float The dimensionless Hubble constant. - 'n_s' : float The spectral index of the primordial power spectrum. - 'w0' : float The equation-of-state parameter for dark energy (constant term). - 'wa' : float The equation-of-state parameter for dark energy (time-dependent term). Notes ----- If `sigma8` is not already computed in the Cosmology object, this function invokes `cosmo.compute_sigma()` to compute and update its value before returning the dictionary. """ cdict = {'Omega_m' : cosmo.cosmo.params.Omega_m, 'Omega_b' : cosmo.cosmo.params.Omega_b, 'sigma8' : cosmo.cosmo.params.sigma8, 'h' : cosmo.cosmo.params.h, 'n_s' : cosmo.cosmo.params.n_s, 'w0' : cosmo.cosmo.params.w0, 'wa' : cosmo.cosmo.params.wa, } if np.isnan(cdict['sigma8']): cosmo.compute_sigma() cdict['sigma'] = cosmo.cosmo.params.sigma8 return cdict
[docs]def safe_Pchip_minimize(x, y): if (np.min(x) > 0) | (np.max(x) < 0): warnings.warn(f"Cannot minimize. Range {np.min(x)} < LHS - RHS < {np.max(x)} does not include zero!" "Setting R_ej = inf. This error can occur if f_ej = 0.", UserWarning) return np.inf cen = np.argmin(np.abs(x - 0)) #Find the point around which we should search for minima buf = 5 #Large enough (one-sided) buffer in case any weird interpolator effects from using too few points ind = slice(max(cen - buf, 0), min(cen + buf, len(x))) #Many reasons why Pchip could fail try: return interpolate.PchipInterpolator(x[ind], y[ind])(0) except ValueError: warnings.warn(f"Cannot minimize. Interpolator crashed. Reverting to using np.min()", UserWarning) return y[cen]
#FFT params and the rules for determining a #superset of parameter values between two profiles _fft_precision_logic = { 'padding_lo_fftlog' : min, 'padding_hi_fftlog' : max, 'n_per_decade' : max, 'extrapol' : None, 'padding_lo_extra' : min, 'padding_hi_extra' : max, 'large_padding_2D' : None, 'plaw_fourier' : None, 'plaw_projected' : None }
[docs]def combine_fftpars(setA, setB): """ Combine two dictionaries of FFT-related parameters using predefined merge rules. This function merges two parameter dictionaries (``setA`` and ``setB``) by iterating over a global list of merge rules (``_fft_precision_logic``). Each entry in ``_fft_precision_logic`` specifies: - a key ``k`` expected in both dictionaries, and - a merge rule ``rule`` which is either a callable or ``None``. If ``rule`` is a callable, it is applied to the corresponding entries ``setA[k]`` and ``setB[k]`` to compute the merged value. If ``rule`` is ``None`` and the values in ``setA`` and ``setB`` differ, the value from ``setA`` is taken as the default and a warning is issued. If the values match, that value is used. Parameters ---------- setA, setB : dict First and second dictionary of FFT parameter values. All keys appearing in ``_fft_precision_logic`` must be present. Returns ------- dict A new dictionary containing the merged FFT parameter values. Warns ----- UserWarning If a merge rule is ``None`` and the values for a parameter differ between ``setA`` and ``setB``. In this case, the value from ``setA`` is used. """ new_set = {} #Loop over all parameters defined by CCL's FFT precision for k, rule in _fft_precision_logic.items(): A, B = setA[k], setB[k] #If we said there is no merging rule, it means we have no #way of automatically setting a value to the combined profile #so just pick the value in the first dict and call it a day (with a warning). if (rule == None): new_set[k] = A if A != B: warnings.warn(f"Value of parameter {k} is inconsistent between two profiles you are combining ({A}, {B}). " "We will use {A} as the default value") #Otherwise, we have clear rules for setting params #such that you encompass the requirements of #both profiles. else: new_set[k] = rule(A, B) return new_set