"""The stepresult module.
Contains capabilities for generating and saving results of a single depletion
timestep.
"""
import copy
import warnings
from pathlib import Path
import h5py
import numpy as np
import openmc
from openmc.checkvalue import PathLike
from openmc.mpi import MPI, comm
from .reaction_rates import ReactionRates
VERSION_RESULTS = (1, 3)
__all__ = ["StepResult"]
[docs]
class StepResult:
"""Result of a single depletion timestep
.. versionchanged:: 0.13.1
Name changed from ``Results`` to ``StepResult``
Attributes
----------
k : tuple of (float, float)
Eigenvalue and uncertainty at end of step.
time : list of float
Time at beginning, end of step, in seconds.
source_rate : float
Source rate during timestep in [W] or [neutron/sec]
n_mat : int
Number of mats.
n_nuc : int
Number of nuclides.
rates : ReactionRates
The reaction rates at end of step.
volume : dict of str to float
Dictionary mapping mat id to volume.
index_mat : dict of str to int
A dictionary mapping mat ID as string to index.
index_nuc : dict of str to int
A dictionary mapping nuclide name as string to index.
mat_to_hdf5_ind : dict of str to int
A dictionary mapping mat ID as string to global index.
n_hdf5_mats : int
Number of materials in entire geometry.
data : numpy.ndarray
Atom quantity, stored by mat, then by nuclide.
proc_time : int
Average time spent depleting a material across all
materials and processes
keff_search_root : float
The root returned by the keff search control.
"""
def __init__(self):
self.k = None
self.time = None
self.source_rate = None
self.rates = None
self.volume = None
self.proc_time = None
self.index_mat = None
self.index_nuc = None
self.mat_to_hdf5_ind = None
self.name_list = None
self.data = None
self.keff_search_root = None
def __repr__(self):
t = self.time[0]
dt = self.time[1] - self.time[0]
return f"<StepResult: t={t}, dt={dt}, source={self.source_rate}>"
def __getitem__(self, pos):
"""Retrieves an item from results.
Parameters
----------
pos : tuple
A two-length tuple containing a mat index and a nuc
index. Both can be integers or slices, or can be
strings corresponding to their respective dictionary.
Returns
-------
float
The atoms for mat, nuc
"""
mat, nuc = pos
if isinstance(mat, openmc.Material):
mat = str(mat.id)
if isinstance(mat, str):
mat = self.index_mat[mat]
if isinstance(nuc, str):
nuc = self.index_nuc[nuc]
return self.data[mat, nuc]
def __setitem__(self, pos, val):
"""Sets an item from results.
Parameters
----------
pos : tuple
A two-length tuple containing a mat index and a nuc
index. Both can be integers or slices, or can be
strings corresponding to their respective dictionary.
val : float
The value to set data to.
"""
mat, nuc = pos
if isinstance(mat, str):
mat = self.index_mat[mat]
if isinstance(nuc, str):
nuc = self.index_nuc[nuc]
self.data[mat, nuc] = val
@property
def n_mat(self):
return len(self.index_mat)
@property
def n_nuc(self):
return len(self.index_nuc)
@property
def n_hdf5_mats(self):
return len(self.mat_to_hdf5_ind)
[docs]
def allocate(self, volume, nuc_list, burn_list, full_burn_list, name_list=None):
"""Allocate memory for depletion step data
Parameters
----------
volume : dict of str float
Volumes corresponding to materials in full_burn_dict
nuc_list : list of str
A list of all nuclide names. Used for sorting the simulation.
burn_list : list of int
A list of all mat IDs to be burned. Used for sorting the simulation.
full_burn_list : list of str
List of all burnable material IDs
name_list : list of str, optional
Material names corresponding to materials in full_burn_list
"""
self.volume = copy.deepcopy(volume)
self.index_nuc = {nuc: i for i, nuc in enumerate(nuc_list)}
self.index_mat = {mat: i for i, mat in enumerate(burn_list)}
self.mat_to_hdf5_ind = {mat: i for i, mat in enumerate(full_burn_list)}
self.mat_to_name = dict(zip(full_burn_list, name_list)) if name_list is not None else {}
# Create storage array
self.data = np.zeros((self.n_mat, self.n_nuc))
[docs]
def distribute(self, local_materials, ranges):
"""Create a new object containing data for distributed materials
Parameters
----------
local_materials : iterable of str
Materials for this process
ranges : iterable of int
Slice-like object indicating indicies of ``local_materials``
in the material dimension of :attr:`data` and each element
in :attr:`rates`
Returns
-------
StepResult
New results object
"""
new = StepResult()
new.volume = {lm: self.volume[lm] for lm in local_materials}
new.index_mat = {mat: idx for (idx, mat) in enumerate(local_materials)}
# Direct transfer
direct_attrs = ("time", "k", "source_rate", "index_nuc",
"mat_to_hdf5_ind", "mat_to_name", "proc_time")
for attr in direct_attrs:
setattr(new, attr, getattr(self, attr))
# Get applicable slice of data
new.data = self.data[ranges]
new.rates = self.rates[ranges]
return new
[docs]
def get_material(self, mat_id: str | int) -> openmc.Material:
"""Return material object for given depleted composition
.. versionadded:: 0.13.2
Parameters
----------
mat_id : str or int
Material ID as a string or integer
Returns
-------
openmc.Material
Equivalent material
Raises
------
KeyError
If specified material ID is not found in the StepResult
"""
# Coerce to str since internal dictionaries use str keys
mat_id = str(mat_id)
with warnings.catch_warnings():
warnings.simplefilter('ignore', openmc.IDWarning)
material = openmc.Material(material_id=int(mat_id))
try:
vol = self.volume[mat_id]
except KeyError as e:
raise KeyError(
f'mat_id {mat_id} not found in StepResult. Available mat_id '
f'values are {list(self.volume.keys())}'
) from e
if mat_id in self.mat_to_name:
material.name = self.mat_to_name[mat_id]
for nuc, _ in sorted(self.index_nuc.items(), key=lambda x: x[1]):
atoms = self[mat_id, nuc]
if atoms <= 0.0:
continue
atom_per_bcm = atoms / vol * 1e-24
material.add_nuclide(nuc, atom_per_bcm)
material.volume = vol
return material
[docs]
def export_to_hdf5(self, filename, step, write_rates: bool = False):
"""Export results to an HDF5 file
Parameters
----------
filename : str
The filename to write to
step : int
What step is this?
write_rates : bool, optional
Whether to include reaction rate datasets in the results file.
"""
# Write new file if first time step, else add to existing file
kwargs = {'mode': "w" if step == 0 else "a"}
if h5py.get_config().mpi and comm.size > 1:
# Write results in parallel
kwargs['driver'] = 'mpio'
kwargs['comm'] = comm
with h5py.File(filename, **kwargs) as handle:
self._to_hdf5(handle, step, parallel=True,
write_rates=write_rates)
else:
# Gather results at root process
all_results = comm.gather(self)
# Only root process writes results
if comm.rank == 0:
with h5py.File(filename, **kwargs) as handle:
for res in all_results:
res._to_hdf5(handle, step, parallel=False,
write_rates=write_rates)
def _write_hdf5_metadata(self, handle, write_rates):
"""Writes result metadata in HDF5 file
Parameters
----------
handle : h5py.File or h5py.Group
An hdf5 file or group type to store this in.
write_rates : bool
Whether reaction rate datasets are being written.
"""
# Create and save the 5 dictionaries:
# quantities
# self.index_mat -> self.volume (TODO: support for changing volumes)
# self.index_nuc
# reactions
# self.rates.index_nuc (can be different from above, above is superset)
# self.rates.index_rx
# these are shared by every step of the simulation, and should be deduplicated.
# Store concentration mat and nuclide dictionaries (along with volumes)
handle.attrs['version'] = np.array(VERSION_RESULTS)
handle.attrs['filetype'] = np.bytes_('depletion results')
mat_list = sorted(self.mat_to_hdf5_ind, key=int)
nuc_list = sorted(self.index_nuc)
include_rates = (
write_rates
and self.rates is not None
and bool(self.rates.index_nuc)
and bool(self.rates.index_rx)
)
rxn_list = sorted(self.rates.index_rx) if include_rates else []
n_mats = self.n_hdf5_mats
n_nuc_number = len(nuc_list)
n_nuc_rxn = len(self.rates.index_nuc) if include_rates else 0
n_rxn = len(rxn_list)
mat_group = handle.create_group("materials")
for mat in mat_list:
mat_single_group = mat_group.create_group(mat)
mat_single_group.attrs["index"] = self.mat_to_hdf5_ind[mat]
mat_single_group.attrs["volume"] = self.volume[mat]
if mat in self.mat_to_name:
mat_single_group.attrs["name"] = self.mat_to_name[mat]
nuc_group = handle.create_group("nuclides")
for nuc in nuc_list:
nuc_single_group = nuc_group.create_group(nuc)
nuc_single_group.attrs["atom number index"] = self.index_nuc[nuc]
if include_rates and nuc in self.rates.index_nuc:
nuc_single_group.attrs["reaction rate index"] = (
self.rates.index_nuc[nuc])
if include_rates:
rxn_group = handle.create_group("reactions")
for rxn in rxn_list:
rxn_single_group = rxn_group.create_group(rxn)
rxn_single_group.attrs["index"] = (
self.rates.index_rx[rxn])
# Construct array storage
handle.create_dataset("number", (1, n_mats, n_nuc_number),
maxshape=(None, n_mats, n_nuc_number),
chunks=True,
dtype='float64')
if include_rates and n_nuc_rxn > 0 and n_rxn > 0:
handle.create_dataset(
"reaction rates", (1, n_mats, n_nuc_rxn, n_rxn),
maxshape=(None, n_mats, n_nuc_rxn, n_rxn),
chunks=True, dtype='float64')
handle.create_dataset("eigenvalues", (1, 2),
maxshape=(None, 2), dtype='float64')
handle.create_dataset("time", (1, 2), maxshape=(None, 2), dtype='float64')
handle.create_dataset("source_rate", (1,), maxshape=(None,),
dtype='float64')
handle.create_dataset(
"depletion time", (1,), maxshape=(None,),
dtype="float64")
handle.create_dataset(
"keff_search_root", (1,), maxshape=(None,),
dtype="float64")
def _to_hdf5(self, handle, index, parallel=False, write_rates: bool = False):
"""Converts results object into an hdf5 object.
Parameters
----------
handle : h5py.File or h5py.Group
An HDF5 file or group type to store this in.
index : int
What step is this?
parallel : bool
Being called with parallel HDF5?
write_rates : bool, optional
Whether reaction rate datasets are being written.
"""
if "/number" not in handle:
if parallel:
comm.barrier()
self._write_hdf5_metadata(handle, write_rates)
if parallel:
comm.barrier()
# Grab handles
number_dset = handle["/number"]
has_reactions = ("reaction rates" in handle)
if has_reactions:
rxn_dset = handle["/reaction rates"]
eigenvalues_dset = handle["/eigenvalues"]
time_dset = handle["/time"]
source_rate_dset = handle["/source_rate"]
proc_time_dset = handle["/depletion time"]
keff_search_root_dset = handle["/keff_search_root"]
# Get number of results stored
number_shape = list(number_dset.shape)
number_results = number_shape[0]
new_shape = index + 1
if number_results < new_shape:
# Extend first dimension by 1
number_shape[0] = new_shape
number_dset.resize(number_shape)
if has_reactions:
rxn_shape = list(rxn_dset.shape)
rxn_shape[0] = new_shape
rxn_dset.resize(rxn_shape)
eigenvalues_shape = list(eigenvalues_dset.shape)
eigenvalues_shape[0] = new_shape
eigenvalues_dset.resize(eigenvalues_shape)
time_shape = list(time_dset.shape)
time_shape[0] = new_shape
time_dset.resize(time_shape)
source_rate_shape = list(source_rate_dset.shape)
source_rate_shape[0] = new_shape
source_rate_dset.resize(source_rate_shape)
proc_shape = list(proc_time_dset.shape)
proc_shape[0] = new_shape
proc_time_dset.resize(proc_shape)
keff_search_root_shape = list(keff_search_root_dset.shape)
keff_search_root_shape[0] = new_shape
keff_search_root_dset.resize(keff_search_root_shape)
# If nothing to write, just return
if len(self.index_mat) == 0:
return
# Add data
inds = [self.mat_to_hdf5_ind[mat] for mat in self.index_mat]
low = min(inds)
high = max(inds)
number_dset[index, low:high+1] = self.data
if has_reactions:
rxn_dset[index, low:high+1] = self.rates
if comm.rank == 0:
eigenvalues_dset[index] = self.k
time_dset[index] = self.time
source_rate_dset[index] = self.source_rate
if self.proc_time is not None:
proc_time_dset[index] = (
self.proc_time / (comm.size * self.n_hdf5_mats)
)
keff_search_root_dset[index] = self.keff_search_root
[docs]
@classmethod
def from_hdf5(cls, handle, step):
"""Loads results object from HDF5.
Parameters
----------
handle : h5py.File or h5py.Group
An HDF5 file or group type to load from.
step : int
Index for depletion step
"""
results = cls()
# Grab handles
number_dset = handle["/number"]
eigenvalues_dset = handle["/eigenvalues"]
time_dset = handle["/time"]
if "source_rate" in handle:
source_rate_dset = handle["/source_rate"]
else:
# Older versions used "power" instead of "source_rate"
source_rate_dset = handle["/power"]
# Check if this is an old format file (with stages dimension) or new format
# Old format: number has shape (n_steps, n_stages, n_mats, n_nucs)
# New format: number has shape (n_steps, n_mats, n_nucs)
has_stages = len(number_dset.shape) == 4
if has_stages:
# Old format - extract data from first stage (index 0)
results.data = number_dset[step, 0, :, :]
results.k = eigenvalues_dset[step, 0, :]
# source_rate had shape (n_steps, n_stages) in old format
results.source_rate = source_rate_dset[step, 0]
else:
# New format - no stages dimension
results.data = number_dset[step, :, :]
results.k = eigenvalues_dset[step, :]
results.source_rate = source_rate_dset[step]
results.time = time_dset[step, :]
if "depletion time" in handle:
proc_time_dset = handle["/depletion time"]
if step < proc_time_dset.shape[0]:
results.proc_time = proc_time_dset[step]
if "keff_search_root" in handle:
keff_search_root_dset = handle["/keff_search_root"]
results.keff_search_root = keff_search_root_dset[step]
if results.proc_time is None:
results.proc_time = np.array([np.nan])
# Reconstruct dictionaries
results.volume = {}
results.index_mat = {}
results.index_nuc = {}
results.mat_to_name = {}
rxn_nuc_to_ind = {}
rxn_to_ind = {}
for mat, mat_handle in handle["/materials"].items():
vol = mat_handle.attrs["volume"]
ind = mat_handle.attrs["index"]
results.volume[mat] = vol
results.index_mat[mat] = ind
if "name" in mat_handle.attrs:
results.mat_to_name[mat] = mat_handle.attrs["name"]
for nuc, nuc_handle in handle["/nuclides"].items():
ind_atom = nuc_handle.attrs["atom number index"]
results.index_nuc[nuc] = ind_atom
if "reaction rate index" in nuc_handle.attrs:
rxn_nuc_to_ind[nuc] = nuc_handle.attrs["reaction rate index"]
if "reactions" in handle:
for rxn, rxn_handle in handle["/reactions"].items():
rxn_to_ind[rxn] = rxn_handle.attrs["index"]
# Reconstruct reaction rates
rate = ReactionRates(results.index_mat, rxn_nuc_to_ind, rxn_to_ind, True)
if "reaction rates" in handle:
if has_stages:
# Old format: (n_steps, n_stages, n_mats, n_nucs, n_rxns)
rate[:] = handle["/reaction rates"][step, 0, :, :, :]
else:
# New format: (n_steps, n_mats, n_nucs, n_rxns)
rate[:] = handle["/reaction rates"][step, :, :, :]
results.rates = rate
return results
[docs]
@staticmethod
def save(
op,
x,
op_results,
t,
source_rate,
step_ind,
proc_time=None,
write_rates: bool = False,
keff_search_root=None,
path: PathLike = "depletion_results.h5"
):
"""Creates and writes depletion results to disk
Parameters
----------
op : openmc.deplete.abc.TransportOperator
The operator used to generate these results.
x : numpy.array
End-of-step concentrations for each material
op_results : openmc.deplete.OperatorResult
Result of applying transport operator at end of step
t : list of float
Time indices.
source_rate : float
Source rate during time step in [W] or [neutron/sec]
step_ind : int
Step index.
proc_time : float or None
Total process time spent depleting materials. This may
be process-dependent and will be reduced across MPI
processes.
write_rates : bool, optional
Whether reaction rates should be written to the results file.
keff_search_root : float
The root returned by the keff search control.
path : PathLike
Path to file to write. Defaults to 'depletion_results.h5'.
.. versionadded:: 0.14.0
"""
# Get indexing terms
vol_dict, nuc_list, burn_list, full_burn_list, name_list = op.get_results_info()
# Create results
results = StepResult()
results.allocate(vol_dict, nuc_list, burn_list, full_burn_list, name_list)
n_mat = len(burn_list)
for mat_i in range(n_mat):
results[mat_i, :] = x[mat_i]
if isinstance(op_results.k, type(None)):
results.k = (None, None)
else:
results.k = (op_results.k.nominal_value, op_results.k.std_dev)
results.rates = op_results.rates
results.time = t
results.source_rate = source_rate
results.proc_time = proc_time
if results.proc_time is not None:
results.proc_time = comm.reduce(proc_time, op=MPI.SUM)
results.keff_search_root = keff_search_root
if not Path(path).is_file():
Path(path).parent.mkdir(parents=True, exist_ok=True)
results.export_to_hdf5(path, step_ind, write_rates)
[docs]
def transfer_volumes(self, model):
"""Transfers volumes from depletion results to geometry
Parameters
----------
model : OpenMC model to be used in a depletion restart
calculation
"""
if not model.materials:
materials = openmc.Materials(
model.geometry.get_all_materials().values()
)
else:
materials = model.materials
for material in materials:
if material.depletable:
material.volume = self.volume[str(material.id)]