# Source code for openmc.deplete.pool

"""Dedicated module containing depletion function

Provided to avoid some circular imports
"""
from itertools import repeat, starmap
from multiprocessing import Pool

from scipy.sparse import bmat
import numpy as np

from openmc.mpi import comm

# Configurable switch that enables / disables the use of
# multiprocessing routines during depletion
USE_MULTIPROCESSING = True

# Allow user to override the number of worker processes to use for depletion
# calculations
NUM_PROCESSES = None

def _distribute(items):
    """Distribute items across MPI communicator

    Parameters
    ----------
    items : list
        List of items to distribute

    Returns
    -------
    list
        Items assigned to the process that called

    """
    # Each rank gets ``base`` items; the first ``leftover`` ranks get one extra
    base, leftover = divmod(len(items), comm.size)
    start = 0
    for rank in range(comm.size):
        count = base + (1 if rank < leftover else 0)
        if rank == comm.rank:
            return items[start:start + count]
        start += count

def deplete(func, chain, n, rates, dt, matrix_func=None,
            transfer_rates=None, *matrix_args):
    """Deplete materials using given reaction rates for a specified time

    Parameters
    ----------
    func : callable
        Function to use to get new compositions. Expected to have the
        signature ``func(A, n0, t) -> n1``
    chain : openmc.deplete.Chain
        Depletion chain
    n : list of numpy.ndarray
        List of atom number arrays for each material. Each array in the list
        contains the number of [atom] of each nuclide.
    rates : openmc.deplete.ReactionRates
        Reaction rates (from transport operator)
    dt : float
        Time in [s] to deplete for
    matrix_func : callable, optional
        Function to form the depletion matrix after calling
        ``matrix_func(chain, rates, fission_yields)``, where
        ``fission_yields = {parent: {product: yield_frac}}``
        Expected to return the depletion matrix required by ``func``
    transfer_rates : openmc.deplete.TransferRates, optional
        Object to perform continuous reprocessing.

        .. versionadded:: 0.14.0
    matrix_args : Any, optional
        Additional arguments passed to matrix_func

    Returns
    -------
    n_result : list of numpy.ndarray
        Updated list of atom number arrays for each material. Each array
        in the list contains the number of [atom] of each nuclide.

    """
    fission_yields = chain.fission_yields
    if len(fission_yields) == 1:
        # Same fission yields for every material: reuse one distribution
        fission_yields = repeat(fission_yields[0])
    elif len(fission_yields) != len(n):
        raise ValueError(
            "Number of material fission yield distributions {} is not "
            "equal to the number of compositions {}".format(
                len(fission_yields), len(n)))

    if matrix_func is None:
        matrices = map(chain.form_matrix, rates, fission_yields)
    else:
        matrices = map(matrix_func, repeat(chain), rates, fission_yields,
                       *matrix_args)

    if transfer_rates is not None:
        # Calculate transfer rate terms as diagonal matrices
        transfers = map(chain.form_rr_term, repeat(transfer_rates),
                        transfer_rates.local_mats)

        # Subtract transfer rate terms from Bateman matrices
        matrices = [matrix - transfer for (matrix, transfer) in
                    zip(matrices, transfers)]

        if len(transfer_rates.index_transfer) > 0:
            # Material-to-material transfers couple the systems, so gather
            # everything on comm.rank 0 and solve as one block matrix
            matrices = comm.gather(matrices)
            n = comm.gather(n)

            if comm.rank == 0:
                # Expand lists
                matrices = [elm for matrix in matrices for elm in matrix]
                n = [n_elm for n_mat in n for n_elm in n_mat]

                # Calculate transfer rate terms as diagonal matrices
                transfer_pair = {
                    mat_pair: chain.form_rr_term(transfer_rates, mat_pair)
                    for mat_pair in transfer_rates.index_transfer
                }

                # Combine all matrices together in a single matrix of matrices
                # to be solved in one go
                n_rows = n_cols = len(transfer_rates.burnable_mats)
                rows = []
                for row in range(n_rows):
                    cols = []
                    for col in range(n_cols):
                        mat_pair = (transfer_rates.burnable_mats[row],
                                    transfer_rates.burnable_mats[col])
                        if row == col:
                            # Fill the diagonals with the Bateman matrices
                            cols.append(matrices[row])
                        elif mat_pair in transfer_rates.index_transfer:
                            # Fill the off-diagonals with the transfer
                            # pair matrices
                            cols.append(transfer_pair[mat_pair])
                        else:
                            cols.append(None)
                    rows.append(cols)
                matrix = bmat(rows)

                # Concatenate vectors of nuclides in one
                n_multi = np.concatenate(n)
                n_result = func(matrix, n_multi, dt)

                # Split back the nuclide vector result into the original form
                n_result = np.split(n_result,
                                    np.cumsum([len(i) for i in n])[:-1])
            else:
                n_result = None

            # Broadcast result to other ranks
            n_result = comm.bcast(n_result)

            # Distribute results across MPI
            n_result = _distribute(n_result)

            return n_result

    inputs = zip(matrices, n, repeat(dt))

    if USE_MULTIPROCESSING:
        with Pool(NUM_PROCESSES) as pool:
            n_result = list(pool.starmap(func, inputs))
    else:
        n_result = list(starmap(func, inputs))

    return n_result