# Source code for openmc.deplete.results

"""The results module.

Contains results generation and saving capabilities.
"""

from collections import OrderedDict
import copy

import h5py
import numpy as np

from . import comm, MPI
from .reaction_rates import ReactionRates

# Version of the depletion-results HDF5 file format written by this module,
# stored in the file's "version" attribute as (major, minor).
VERSION_RESULTS = (1, 1)


# Public API of this module
__all__ = ["Results"]


class Results:
    """Output of a depletion run

    Attributes
    ----------
    k : list of (float, float)
        Eigenvalue and uncertainty for each substep.
    time : list of float
        Time at beginning, end of step, in seconds.
    source_rate : float
        Source rate during timestep in [W] or [neutron/sec]
    n_mat : int
        Number of mats.
    n_nuc : int
        Number of nuclides.
    rates : list of ReactionRates
        The reaction rates for each substep.
    volume : OrderedDict of str to float
        Dictionary mapping mat id to volume.
    mat_to_ind : OrderedDict of str to int
        A dictionary mapping mat ID as string to index.
    nuc_to_ind : OrderedDict of str to int
        A dictionary mapping nuclide name as string to index.
    mat_to_hdf5_ind : OrderedDict of str to int
        A dictionary mapping mat ID as string to global index.
    n_hdf5_mats : int
        Number of materials in entire geometry.
    n_stages : int
        Number of stages in simulation.
    data : numpy.ndarray
        Atom quantity, stored by stage, mat, then by nuclide.
    proc_time : int
        Average time spent depleting a material across all
        materials and processes
    """

    def __init__(self):
        # Everything is populated later by allocate(), save() or from_hdf5()
        self.k = None
        self.time = None
        self.source_rate = None
        self.rates = None
        self.volume = None
        self.proc_time = None
        self.mat_to_ind = None
        self.nuc_to_ind = None
        self.mat_to_hdf5_ind = None
        self.data = None

    def _resolve_indices(self, pos):
        """Translate a (stage, mat, nuc) key into numeric indices.

        ``mat`` and ``nuc`` may be given as strings, in which case they are
        looked up in :attr:`mat_to_ind` / :attr:`nuc_to_ind`; integers and
        slices pass through unchanged.
        """
        stage, mat, nuc = pos
        if isinstance(mat, str):
            mat = self.mat_to_ind[mat]
        if isinstance(nuc, str):
            nuc = self.nuc_to_ind[nuc]
        return stage, mat, nuc

    def __getitem__(self, pos):
        """Retrieves an item from results.

        Parameters
        ----------
        pos : tuple
            A three-length tuple containing a stage index, mat index and a
            nuc index.  All can be integers or slices.  The second two can
            be strings corresponding to their respective dictionary.

        Returns
        -------
        float
            The atoms for stage, mat, nuc
        """
        return self.data[self._resolve_indices(pos)]

    def __setitem__(self, pos, val):
        """Sets an item from results.

        Parameters
        ----------
        pos : tuple
            A three-length tuple containing a stage index, mat index and a
            nuc index.  All can be integers or slices.  The second two can
            be strings corresponding to their respective dictionary.
        val : float
            The value to set data to.
        """
        self.data[self._resolve_indices(pos)] = val

    @property
    def n_mat(self):
        # Number of locally tracked materials
        return len(self.mat_to_ind)

    @property
    def n_nuc(self):
        # Number of tracked nuclides
        return len(self.nuc_to_ind)

    @property
    def n_hdf5_mats(self):
        # Number of burnable materials in the entire geometry
        return len(self.mat_to_hdf5_ind)

    @property
    def n_stages(self):
        # First axis of the data array is the stage axis
        return self.data.shape[0]
[docs] def allocate(self, volume, nuc_list, burn_list, full_burn_list, stages): """Allocates memory of Results. Parameters ---------- volume : dict of str float Volumes corresponding to materials in full_burn_dict nuc_list : list of str A list of all nuclide names. Used for sorting the simulation. burn_list : list of int A list of all mat IDs to be burned. Used for sorting the simulation. full_burn_list : list of str List of all burnable material IDs stages : int Number of stages in simulation. """ self.volume = copy.deepcopy(volume) self.nuc_to_ind = {nuc: i for i, nuc in enumerate(nuc_list)} self.mat_to_ind = {mat: i for i, mat in enumerate(burn_list)} self.mat_to_hdf5_ind = {mat: i for i, mat in enumerate(full_burn_list)} # Create storage array self.data = np.zeros((stages, self.n_mat, self.n_nuc))
[docs] def distribute(self, local_materials, ranges): """Create a new object containing data for distributed materials Parameters ---------- local_materials : iterable of str Materials for this process ranges : iterable of int Slice-like object indicating indicies of ``local_materials`` in the material dimension of :attr:`data` and each element in :attr:`rates` Returns ------- Results New results object """ new = Results() new.volume = {lm: self.volume[lm] for lm in local_materials} new.mat_to_ind = {mat: idx for (idx, mat) in enumerate(local_materials)} # Direct transfer direct_attrs = ("time", "k", "source_rate", "nuc_to_ind", "mat_to_hdf5_ind", "proc_time") for attr in direct_attrs: setattr(new, attr, getattr(self, attr)) # Get applicable slice of data new.data = self.data[:, ranges] new.rates = [r[ranges] for r in self.rates] return new
[docs] def export_to_hdf5(self, filename, step): """Export results to an HDF5 file Parameters ---------- filename : str The filename to write to step : int What step is this? """ # Write new file if first time step, else add to existing file kwargs = {'mode': "w" if step == 0 else "a"} if h5py.get_config().mpi and comm.size > 1: # Write results in parallel kwargs['driver'] = 'mpio' kwargs['comm'] = comm with h5py.File(filename, **kwargs) as handle: self._to_hdf5(handle, step, parallel=True) else: # Gather results at root process all_results = comm.gather(self) # Only root process writes results if comm.rank == 0: with h5py.File(filename, **kwargs) as handle: for res in all_results: res._to_hdf5(handle, step, parallel=False)
    def _write_hdf5_metadata(self, handle):
        """Writes result metadata in HDF5 file

        Creates the index dictionaries (materials, nuclides, reactions) and
        the extendable result datasets that every subsequent step appends to.

        Parameters
        ----------
        handle : h5py.File or h5py.Group
            An hdf5 file or group type to store this in.
        """
        # Create and save the 5 dictionaries:
        # quantities
        #   self.mat_to_ind -> self.volume (TODO: support for changing volumes)
        #   self.nuc_to_ind
        # reactions
        #   self.rates[0].nuc_to_ind (can be different from above, above is superset)
        #   self.rates[0].react_to_ind
        # these are shared by every step of the simulation, and should be
        # deduplicated.

        # Store concentration mat and nuclide dictionaries (along with volumes)
        handle.attrs['version'] = np.array(VERSION_RESULTS)
        # NOTE(review): np.string_ was removed in NumPy 2.0 (np.bytes_ is the
        # replacement) -- confirm the NumPy versions this module must support.
        handle.attrs['filetype'] = np.string_('depletion results')

        # Material IDs sort numerically; nuclide/reaction names sort as strings
        mat_list = sorted(self.mat_to_hdf5_ind, key=int)
        nuc_list = sorted(self.nuc_to_ind)
        rxn_list = sorted(self.rates[0].index_rx)

        # Dimensions used for the datasets created below
        n_mats = self.n_hdf5_mats
        n_nuc_number = len(nuc_list)
        n_nuc_rxn = len(self.rates[0].index_nuc)
        n_rxn = len(rxn_list)
        n_stages = self.n_stages

        # One group per material holding its global index and volume
        mat_group = handle.create_group("materials")

        for mat in mat_list:
            mat_single_group = mat_group.create_group(mat)
            mat_single_group.attrs["index"] = self.mat_to_hdf5_ind[mat]
            mat_single_group.attrs["volume"] = self.volume[mat]

        # One group per nuclide; a "reaction rate index" is written only for
        # nuclides that have tallied reaction rates
        nuc_group = handle.create_group("nuclides")

        for nuc in nuc_list:
            nuc_single_group = nuc_group.create_group(nuc)
            nuc_single_group.attrs["atom number index"] = self.nuc_to_ind[nuc]
            if nuc in self.rates[0].index_nuc:
                nuc_single_group.attrs["reaction rate index"] = self.rates[0].index_nuc[nuc]

        # One group per reaction channel
        rxn_group = handle.create_group("reactions")

        for rxn in rxn_list:
            rxn_single_group = rxn_group.create_group(rxn)
            rxn_single_group.attrs["index"] = self.rates[0].index_rx[rxn]

        # Construct array storage
        # All datasets are unlimited along the first (step) axis so that
        # _to_hdf5 can resize them as steps accumulate; chunking is one
        # step/stage slab at a time.
        handle.create_dataset(
            "number",
            (1, n_stages, n_mats, n_nuc_number),
            maxshape=(None, n_stages, n_mats, n_nuc_number),
            chunks=(1, 1, n_mats, n_nuc_number),
            dtype='float64')

        handle.create_dataset(
            "reaction rates",
            (1, n_stages, n_mats, n_nuc_rxn, n_rxn),
            maxshape=(None, n_stages, n_mats, n_nuc_rxn, n_rxn),
            chunks=(1, 1, n_mats, n_nuc_rxn, n_rxn),
            dtype='float64')

        handle.create_dataset("eigenvalues", (1, n_stages, 2),
                              maxshape=(None, n_stages, 2), dtype='float64')

        handle.create_dataset("time", (1, 2), maxshape=(None, 2),
                              dtype='float64')

        handle.create_dataset("source_rate", (1, n_stages),
                              maxshape=(None, n_stages), dtype='float64')

        handle.create_dataset(
            "depletion time", (1,), maxshape=(None,), dtype="float64")

    def _to_hdf5(self, handle, index, parallel=False):
        """Converts results object into an hdf5 object.

        Parameters
        ----------
        handle : h5py.File or h5py.Group
            An HDF5 file or group type to store this in.
        index : int
            What step is this?
        parallel : bool
            Being called with parallel HDF5?
        """
        # First writer lays out the metadata and datasets.  With parallel
        # HDF5 those creations are collective, so all ranks synchronize
        # before and after.
        if "/number" not in handle:
            if parallel:
                comm.barrier()
            self._write_hdf5_metadata(handle)

        if parallel:
            comm.barrier()

        # Grab handles
        number_dset = handle["/number"]
        rxn_dset = handle["/reaction rates"]
        eigenvalues_dset = handle["/eigenvalues"]
        time_dset = handle["/time"]
        source_rate_dset = handle["/source_rate"]
        proc_time_dset = handle["/depletion time"]

        # Get number of results stored
        number_shape = list(number_dset.shape)
        number_results = number_shape[0]

        new_shape = index + 1

        if number_results < new_shape:
            # Extend first dimension by 1
            number_shape[0] = new_shape
            number_dset.resize(number_shape)

            rxn_shape = list(rxn_dset.shape)
            rxn_shape[0] = new_shape
            rxn_dset.resize(rxn_shape)

            eigenvalues_shape = list(eigenvalues_dset.shape)
            eigenvalues_shape[0] = new_shape
            eigenvalues_dset.resize(eigenvalues_shape)

            time_shape = list(time_dset.shape)
            time_shape[0] = new_shape
            time_dset.resize(time_shape)

            source_rate_shape = list(source_rate_dset.shape)
            source_rate_shape[0] = new_shape
            source_rate_dset.resize(source_rate_shape)

            proc_shape = list(proc_time_dset.shape)
            proc_shape[0] = new_shape
            proc_time_dset.resize(proc_shape)

        # If nothing to write, just return
        if len(self.mat_to_ind) == 0:
            return

        # Add data
        # Note, for the last step, self.n_stages = 1, even if n_stages != 1.
        n_stages = self.n_stages
        # NOTE(review): the slab write below assumes this rank's materials
        # occupy a contiguous range of global indices [low, high] -- confirm
        # against how materials are distributed across ranks.
        inds = [self.mat_to_hdf5_ind[mat] for mat in self.mat_to_ind]
        low = min(inds)
        high = max(inds)
        for i in range(n_stages):
            number_dset[index, i, low:high+1] = self.data[i]
            rxn_dset[index, i, low:high+1] = self.rates[i]
            if comm.rank == 0:
                eigenvalues_dset[index, i] = self.k[i]
        if comm.rank == 0:
            time_dset[index] = self.time
            source_rate_dset[index] = self.source_rate
            if self.proc_time is not None:
                # Average depletion time per material per process
                proc_time_dset[index] = (
                    self.proc_time / (comm.size * self.n_hdf5_mats)
                )
[docs] @classmethod def from_hdf5(cls, handle, step): """Loads results object from HDF5. Parameters ---------- handle : h5py.File or h5py.Group An HDF5 file or group type to load from. step : int Index for depletion step """ results = cls() # Grab handles number_dset = handle["/number"] eigenvalues_dset = handle["/eigenvalues"] time_dset = handle["/time"] if "source_rate" in handle: source_rate_dset = handle["/source_rate"] else: # Older versions used "power" instead of "source_rate" source_rate_dset = handle["/power"] results.data = number_dset[step, :, :, :] results.k = eigenvalues_dset[step, :] results.time = time_dset[step, :] results.source_rate = source_rate_dset[step, :] if "depletion time" in handle: proc_time_dset = handle["/depletion time"] if step < proc_time_dset.shape[0]: results.proc_time = proc_time_dset[step] if results.proc_time is None: results.proc_time = np.array([np.nan]) # Reconstruct dictionaries results.volume = OrderedDict() results.mat_to_ind = OrderedDict() results.nuc_to_ind = OrderedDict() rxn_nuc_to_ind = OrderedDict() rxn_to_ind = OrderedDict() for mat, mat_handle in handle["/materials"].items(): vol = mat_handle.attrs["volume"] ind = mat_handle.attrs["index"] results.volume[mat] = vol results.mat_to_ind[mat] = ind for nuc, nuc_handle in handle["/nuclides"].items(): ind_atom = nuc_handle.attrs["atom number index"] results.nuc_to_ind[nuc] = ind_atom if "reaction rate index" in nuc_handle.attrs: rxn_nuc_to_ind[nuc] = nuc_handle.attrs["reaction rate index"] for rxn, rxn_handle in handle["/reactions"].items(): rxn_to_ind[rxn] = rxn_handle.attrs["index"] results.rates = [] # Reconstruct reactions for i in range(results.n_stages): rate = ReactionRates(results.mat_to_ind, rxn_nuc_to_ind, rxn_to_ind, True) rate[:] = handle["/reaction rates"][step, i, :, :, :] results.rates.append(rate) return results
[docs] @staticmethod def save(op, x, op_results, t, source_rate, step_ind, proc_time=None): """Creates and writes depletion results to disk Parameters ---------- op : openmc.deplete.TransportOperator The operator used to generate these results. x : list of list of numpy.array The prior x vectors. Indexed [i][cell] using the above equation. op_results : list of openmc.deplete.OperatorResult Results of applying transport operator t : list of float Time indices. source_rate : float Source rate during time step in [W] or [neutron/sec] step_ind : int Step index. proc_time : float or None Total process time spent depleting materials. This may be process-dependent and will be reduced across MPI processes. """ # Get indexing terms vol_dict, nuc_list, burn_list, full_burn_list = op.get_results_info() stages = len(x) # Create results results = Results() results.allocate(vol_dict, nuc_list, burn_list, full_burn_list, stages) n_mat = len(burn_list) for i in range(stages): for mat_i in range(n_mat): results[i, mat_i, :] = x[i][mat_i] results.k = [(r.k.nominal_value, r.k.std_dev) for r in op_results] results.rates = [r.rates for r in op_results] results.time = t results.source_rate = source_rate results.proc_time = proc_time if results.proc_time is not None: results.proc_time = comm.reduce(proc_time, op=MPI.SUM) results.export_to_hdf5("depletion_results.h5", step_ind)
[docs] def transfer_volumes(self, geometry): """Transfers volumes from depletion results to geometry Parameters ---------- geometry : OpenMC geometry to be used in a depletion restart calculation """ for cell in geometry.get_all_material_cells().values(): for material in cell.get_all_materials().values(): if material.depletable: material.volume = self.volume[str(material.id)]