import joblib
import numpy as np
__all__ = ['SimpleParallel', 'SplitJoinParallel']
from ..Runners import BaryonifyShell, BaryonifyGrid, BaryonifySnapshot
class SimpleParallel(object):
    """
    A class to execute a list of Runner objects in parallel using a joblib instance.

    The `SimpleParallel` class allows for the parallel execution of tasks encapsulated by Runner objects.
    It utilizes joblib's parallel processing capabilities (with the "loky" backend) to distribute the
    workload across multiple CPU cores. This class is particularly useful for running computationally
    intensive tasks concurrently, thereby reducing the overall processing time.

    Parameters
    ----------
    Runner_list : list
        A list of Runner objects, each of which must have a `process()` method that defines the task
        to be executed.

    njobs : int, optional
        The number of jobs (processes) to run in parallel. If set to -1, the number of jobs is set to
        the number of CPUs available. Default is -1.

    Methods
    -------
    single_run(i, Runner)
        Executes the `process()` method of a single Runner and returns its index and output.

    process()
        Executes all Runners in the Runner_list in parallel, returning their outputs in the original order.

    Examples
    --------
    >>> runners = [Runner1(), Runner2(), Runner3()]
    >>> parallel_executor = SimpleParallel(runners, njobs=2)
    >>> results = parallel_executor.process()

    Notes
    -----
    - The `Runner` objects must have a `process()` method implemented. This method will be called
      during the parallel execution.
    - The number of parallel jobs is reduced to the number of Runners if there are fewer Runners than
      requested jobs. It is never reduced below 1, since joblib rejects `n_jobs = 0`.
    - An empty `Runner_list` is allowed; `process()` then returns an empty list without starting joblib.
    - Pickling errors can occur if the cosmology object (which is stored as an attribute in many classes)
      contains SwigPyObjects that are not pickleable. This happens with P(k) calculations. The
      baryonification pipeline natively destroys these problem objects. If you are facing issues,
      perform an explicit check of this.

    See Also
    --------
    joblib.Parallel : The underlying parallel execution library used for running the tasks.
    joblib.delayed : A function to wrap the tasks to be executed in parallel.
    """

    def __init__(self, Runner_list, njobs = -1):
        """
        Store the Runners and work out how many parallel jobs to use.

        Parameters
        ----------
        Runner_list : list
            Runner objects to execute; each must provide a `process()` method.

        njobs : int, optional
            Number of parallel jobs. -1 (the default) means "use the CPU count reported by loky".
        """

        self.Runner_list = Runner_list
        self.njobs       = njobs if njobs != -1 else joblib.externals.loky.cpu_count()

        #No point in having more workers than tasks. Keep at least one job though,
        #because n_jobs = 0 is not a valid value for joblib.Parallel.
        needed = max(1, len(Runner_list))
        if needed < self.njobs:
            self.njobs = needed
            print(f"You asked for more processors than needed. Setting n_jobs = {self.njobs}")


    def single_run(self, i, Runner):
        """
        Executes the `process()` method of a single Runner and returns its index and output.

        The index is returned alongside the output so that `process()` can place every result
        back at the position its Runner had in `Runner_list`.

        Parameters
        ----------
        i : int
            The index of the Runner in the Runner_list.

        Runner : object
            An instance of a Runner object, which must have a `process()` method.

        Returns
        -------
        tuple
            A tuple containing the index of the Runner and the output of its `process()` method.
        """

        return i, Runner.process()


    def process(self):
        """
        Executes all Runners in the Runner_list in parallel, returning their outputs in the original order.

        This method uses joblib's Parallel and delayed functions to run each Runner's `process()`
        method in parallel. The outputs are collected and placed according to the index that
        `single_run` returns with each of them.

        Returns
        -------
        list
            A list of outputs from the `process()` methods of each Runner, in the order they were
            provided in Runner_list. An empty Runner_list gives an empty list.
        """

        #Nothing to run. Return early instead of starting a worker pool for zero tasks.
        #(len() rather than truthiness so that array-like containers also work.)
        if len(self.Runner_list) == 0:
            return []

        with joblib.parallel_backend("loky"):
            jobs    = [joblib.delayed(self.single_run)(i, Runner) for i, Runner in enumerate(self.Runner_list)]
            outputs = joblib.Parallel(n_jobs = self.njobs, verbose=10)(jobs)

        #Place them so they are in the same order as they were input
        ordered_outputs = [None] * len(outputs)
        for index, output in outputs:
            ordered_outputs[index] = output

        return ordered_outputs
class SplitJoinParallel(object):
    """
    A class to split a single Runner task into multiple parallel tasks and join the results.

    The `SplitJoinParallel` class takes a single Runner object and splits its task into multiple smaller
    tasks to be run in parallel. It uses joblib for parallel execution, and the results are combined
    (joined) after all parallel tasks are completed. This approach is particularly useful for handling
    large datasets or computationally intensive tasks by distributing the workload across multiple CPU cores.

    This class is currently not designed to work with the Baryonification operation and will raise an
    error if it is passed any Baryonification runner.

    Parameters
    ----------
    Runner : object
        A Runner object that defines the task to be executed. This object must have the attributes
        necessary for splitting the task, such as `HaloLightConeCatalog`, `LightconeShell`, `cosmo`, etc.

    njobs : int, optional
        The number of jobs (processes) to run in parallel. If set to -1, the number of jobs is set to
        the number of CPUs available. Other negative values follow the joblib convention
        (n_cpus + 1 + njobs). Zero is rejected. Default is -1.

    seed : int, optional
        A seed value for initializing the random number generator to ensure reproducibility when
        shuffling the halo catalog (shuffling is done to load balance the jobs). Default is 42.

    Attributes
    ----------
    Runner : object
        The original Runner object that is being split for parallel processing.

    seed : int
        A seed value used when shuffling the halo catalog. Default is 42.

    njobs : int
        The number of jobs (processes) that will be run in parallel. This is adjusted based on the
        number of available CPUs and the length of the Runner_list.

    Runner_list : list
        A list of Runner objects, each representing a subset of the original task to be run in parallel.

    Methods
    -------
    split_run(Runner)
        Splits the original Runner task into multiple smaller Runner tasks for parallel processing.

    single_run(Runner)
        Executes the `process()` method of a single Runner and returns its output.

    process()
        Executes all Runners in the Runner_list in parallel, returning the combined output.

    Examples
    --------
    >>> runner = SomeRunnerClass()
    >>> split_join_executor = SplitJoinParallel(runner, njobs=4)
    >>> result = split_join_executor.process()

    Notes
    -----
    - The `Runner` object must have specific attributes like `HaloLightConeCatalog`, `LightconeShell`,
      `cosmo`, `model`, `mass_def`, `epsilon_max`, and `use_ellipticity`.
    - The number of parallel jobs will be adjusted to match the number of splits if there are fewer
      splits than requested jobs (e.g. when the catalog has fewer halos than jobs).
    - Pickling errors can occur if the cosmology object (which is stored as an attribute in many classes)
      contains SwigPyObjects that are not pickleable. This happens with P(k) calculations. The
      baryonification pipeline natively destroys these problem objects. If you are facing issues,
      perform an explicit check of this.

    See Also
    --------
    joblib.Parallel : The underlying parallel execution library used for running the tasks.
    joblib.delayed : A function to wrap the tasks to be executed in parallel.
    """

    def __init__(self, Runner, njobs = -1, seed = 42):
        """
        Initializes the SplitJoinParallel class with a Runner and the number of jobs.

        Parameters
        ----------
        Runner : object
            A Runner object that defines the task to be executed. This object must have the attributes
            necessary for splitting the task, such as `HaloLightConeCatalog`, `LightconeShell`, `cosmo`, etc.

        njobs : int, optional
            The number of jobs (processes) to run in parallel. If set to -1, the number of jobs is set
            to the number of CPUs available. Default is -1.

        seed : int, optional
            Seed used when shuffling the halo catalog before splitting. Default is 42.

        Raises
        ------
        AssertionError
            If `Runner` is a `BaryonifyGrid`, `BaryonifyShell` or `BaryonifySnapshot` instance.

        ValueError
            If `njobs` is 0.
        """

        #The SplitJoin runner only works when the final output can be linearly summed across different batches of halos.
        #A painting operation is an ideal use case, while Baryonification is not, so do an explicit check here.
        #Raised explicitly (rather than with an `assert` statement) so the check survives `python -O`.
        if isinstance(Runner, (BaryonifyGrid, BaryonifyShell, BaryonifySnapshot)):
            raise AssertionError(f"Runner of type {type(Runner)} is not supported for SplitJoinParallel.")

        self.Runner      = Runner
        self.seed        = seed
        self.njobs       = self._resolve_njobs(njobs)
        self.Runner_list = self.split_run(self.Runner)

        #There can be fewer splits than requested jobs (small catalogs), so don't ask for idle workers.
        self.njobs       = min(self.njobs, len(self.Runner_list))


    @staticmethod
    def _resolve_njobs(njobs):
        """
        Convert a joblib-style `njobs` request into a positive number of processes.

        Parameters
        ----------
        njobs : int
            Positive values are used as given. Negative values mean (n_cpus + 1 + njobs), with a
            floor of 1, so -1 is "all CPUs".

        Returns
        -------
        int
            A number of jobs that is always >= 1.

        Raises
        ------
        ValueError
            If `njobs` is 0.
        """

        if njobs == 0:
            raise ValueError("njobs == 0 has no meaning. Use a positive integer, or a negative value to count back from the number of CPUs.")

        if njobs > 0:
            return njobs

        return max(joblib.externals.loky.cpu_count() + 1 + njobs, 1)


    def split_run(self, Runner):
        """
        Splits the original Runner task into multiple smaller Runner tasks for parallel processing.

        This method divides the halo catalog from the Runner into multiple subsets. Each subset is used
        to create a new Runner, which will process only that subset. The splitting process includes
        shuffling the catalog to optimize load balancing across processes.

        The number of subsets is `self.njobs`, capped at the number of halos so that no subset is
        empty when the catalog is non-empty. At least one Runner is always created. Subset sizes
        differ by at most one halo.

        Parameters
        ----------
        Runner : object
            The original Runner object to be split.

        Returns
        -------
        Runner_list : list
            A list of new Runner objects, each configured to process a subset of the original Runner's task.
        """

        HaloCat  = Runner.HaloLightConeCatalog
        Shell    = Runner.LightconeShell
        cosmo    = Runner.cosmo
        model    = Runner.model
        mass_def = Runner.mass_def
        eps_max  = Runner.epsilon_max
        ellip    = Runner.use_ellipticity

        #Now split. Never make more splits than halos, and always make at least one.
        Ntotal  = len(HaloCat.cat)
        Nsplits = max(1, min(self.njobs, Ntotal))

        #Randomize catalog ordering. This helps optimize the parallelization. Else if
        #low redshift halos are all at the start, then a handful of processes will be
        #overburdened while the rest are just sitting idle.
        shuffle = np.random.default_rng(self.seed).choice(Ntotal, size = Ntotal, replace = False)
        HaloCat = HaloCat[shuffle]

        #All sub-runners are handed the same zeroed shell; only the halo subset differs between them.
        empty_shell = type(Shell)(map = np.zeros_like(Shell.map), cosmo = cosmo)

        #Integer edges that spread the halos as evenly as possible over the splits.
        #A fixed ceil(Ntotal/Nsplits) chunk size would instead leave the last splits empty
        #(e.g. 10 halos over 8 splits gives 5 chunks of 2 and 3 empty ones).
        edges = [(i * Ntotal) // Nsplits for i in range(Nsplits + 1)]

        Runner_list = []
        for start, end in zip(edges[:-1], edges[1:]):

            #Halocatalog object is sliceable like a regular numpy array so can easily
            #split the halos up but keep the same data structure
            New_HaloCatalog = HaloCat[start:end]

            #Create a new Runner for just a subset of catalog. Has same model, map size etc.
            #Force verbose to be off as we don't want outputs for each subrun of parallel process.
            New_Runner = type(Runner)(New_HaloCatalog, empty_shell, eps_max, model, ellip, mass_def, verbose = False)
            Runner_list.append(New_Runner)

        return Runner_list


    def single_run(self, Runner):
        """
        Executes the `process()` method of a single Runner and returns its output.

        This method is used to run a single Runner object.

        Parameters
        ----------
        Runner : object
            An instance of a Runner object, which must have a `process()` method.

        Returns
        -------
        output
            The output of the Runner's `process()` method.
        """

        return Runner.process()


    def process(self):
        """
        Executes all Runners in the Runner_list in parallel, returning the combined output.

        This method uses joblib's Parallel and delayed functions to run each Runner's `process()`
        method in parallel. The outputs are combined by summing them, which is appropriate if the
        contributions from each runner can be linearly summed. This is ideal for any profile painting
        tasks, such as `PaintProfilesShell` or `PaintProfilesGrid`.

        Returns
        -------
        map_out : ndarray
            A combined map output generated by summing the outputs from each Runner's `process()` method.
        """

        with joblib.parallel_backend("loky"):
            jobs    = [joblib.delayed(self.single_run)(Runner) for Runner in self.Runner_list]
            outputs = joblib.Parallel(n_jobs = self.njobs, verbose=10)(jobs)

        #Sum the contributions from individual runs.
        #Contributions from each halo can be linearly added so this is fine
        map_out = np.sum(outputs, axis = 0)

        return map_out