from __future__ import annotations
from collections.abc import Sequence
from contextlib import nullcontext
import copy
from datetime import datetime
import json
from pathlib import Path
import numpy as np
import openmc
from . import IndependentOperator, PredictorIntegrator
from .chain import Chain
from .microxs import get_microxs_and_flux, write_microxs_hdf5, read_microxs_hdf5
from .results import Results
from ..checkvalue import PathLike
from ..mpi import comm
from openmc.lib import TemporarySession
def get_activation_materials(
model: openmc.Model,
mmv_list: list[openmc.MeshMaterialVolumes]
) -> openmc.Materials:
"""Get a list of activation materials for each mesh element/material.
When performing a mesh-based R2S calculation, a unique material is needed
for each activation region, which is a combination of a mesh element and a
material within that mesh element. This function generates a list of such
materials, each with a unique name and volume corresponding to the mesh
element and material.
Parameters
----------
model : openmc.Model
The full model containing the geometry and materials.
mmv_list : list of openmc.MeshMaterialVolumes
List of mesh material volumes objects, one per mesh, containing the
materials and their volumes for each mesh element.
Returns
-------
openmc.Materials
A list of materials, each corresponding to a unique mesh element and
material combination across all meshes.
"""
# Get all materials in the model
material_dict = model._get_all_materials()
# Create a new activation material for each element-material combination
# across all meshes
materials = openmc.Materials()
for mesh_idx, mmv in enumerate(mmv_list):
mat_ids = mmv._materials[mmv._materials > -1]
volumes = mmv._volumes[mmv._materials > -1]
elems, _ = np.where(mmv._materials > -1)
for elem, mat_id, vol in zip(elems, mat_ids, volumes):
mat = material_dict[mat_id]
new_mat = mat.clone()
new_mat.depletable = True
new_mat.name = f'Mesh {mesh_idx}, Element {elem}, Material {mat_id}'
new_mat.volume = vol
materials.append(new_mat)
return materials
[docs]
class R2SManager:
"""Manager for Rigorous 2-Step (R2S) method calculations.
This class is responsible for managing the materials and sources needed for
mesh-based or cell-based R2S calculations. It provides methods to get
activation materials and decay photon sources based on the mesh/cells and
materials in the OpenMC model. Multiple meshes can be specified as domains,
in which case each element--material combination of each mesh is treated as
an activation region (meshes are assumed to be non-overlapping).
This class supports the use of a different models for the neutron and photon
transport calculation. However, for cell-based calculations, it assumes that
the only changes in the model are material assignments. For mesh-based
calculations, it checks material assignments in the photon model and any
element--material combinations that don't appear in the photon model are
skipped.
Parameters
----------
neutron_model : openmc.Model
The OpenMC model to use for neutron transport.
domains : openmc.MeshBase or Sequence[openmc.MeshBase] or Sequence[openmc.Cell]
The mesh(es) or a sequence of cells that represent the spatial units
over which the R2S calculation will be performed. When a single
:class:`~openmc.MeshBase` or a sequence of meshes is given, each
element--material combination across all meshes is treated as an
activation region.
photon_model : openmc.Model, optional
The OpenMC model to use for photon transport calculations. If None, a
shallow copy of the neutron_model will be created and used.
Attributes
----------
domains : list of openmc.MeshBase or Sequence[openmc.Cell]
The meshes or a sequence of cells that represent the spatial units over
which the R2S calculation will be performed.
neutron_model : openmc.Model
The OpenMC model used for neutron transport.
photon_model : openmc.Model
The OpenMC model used for photon transport calculations.
method : {'mesh-based', 'cell-based'}
Indicates whether the R2S calculation uses mesh elements ('mesh-based')
as the spatial discretization or a list of cells ('cell-based').
results : dict
A dictionary that stores results from the R2S calculation.
"""
def __init__(
self,
neutron_model: openmc.Model,
domains: openmc.MeshBase | Sequence[openmc.MeshBase] | Sequence[openmc.Cell],
photon_model: openmc.Model | None = None,
):
self.neutron_model = neutron_model
if photon_model is None:
# Create a shallow copy of the neutron model for photon transport
self.photon_model = openmc.Model(
geometry=copy.copy(neutron_model.geometry),
materials=copy.copy(neutron_model.materials),
settings=copy.copy(neutron_model.settings),
tallies=copy.copy(neutron_model.tallies),
plots=copy.copy(neutron_model.plots),
)
else:
self.photon_model = photon_model
if isinstance(domains, openmc.MeshBase):
self.method = 'mesh-based'
self.domains = [domains]
elif isinstance(domains, Sequence) and len(domains) > 0 and \
isinstance(domains[0], openmc.MeshBase):
self.method = 'mesh-based'
self.domains = list(domains)
else:
self.method = 'cell-based'
self.domains = list(domains)
self.results = {}
[docs]
def run(
self,
timesteps: Sequence[float] | Sequence[tuple[float, str]],
source_rates: float | Sequence[float],
timestep_units: str = 's',
photon_time_indices: Sequence[int] | None = None,
output_dir: PathLike | None = None,
bounding_boxes: dict[int, openmc.BoundingBox] | None = None,
chain_file: PathLike | Chain | None = None,
micro_kwargs: dict | None = None,
mat_vol_kwargs: dict | None = None,
run_kwargs: dict | None = None,
operator_kwargs: dict | None = None,
):
"""Run the R2S calculation.
Parameters
----------
timesteps : Sequence[float] or Sequence[tuple[float, str]]
Sequence of timesteps. Note that values are not cumulative. The
units are specified by the `timestep_units` argument when
`timesteps` is an iterable of float. Alternatively, units can be
specified for each step by passing an iterable of (value, unit)
tuples.
source_rates : float or Sequence[float]
Source rate in [neutron/sec] for each interval in `timesteps`.
timestep_units : {'s', 'min', 'h', 'd', 'a'}, optional
Units for values specified in the `timesteps` argument when passing
float values. 's' means seconds, 'min' means minutes, 'h' means
hours, 'd' means days, and 'a' means years (Julian).
photon_time_indices : Sequence[int], optional
Sequence of time indices at which photon transport should be run;
represented as indices into the array of times formed by the
timesteps. For example, if two timesteps are specified, the array of
times would contain three entries, and [2] would indicate computing
photon results at the last time. A value of None indicates to run
photon transport for each time.
output_dir : PathLike, optional
Path to directory where R2S calculation outputs will be saved. If
not provided, a timestamped directory 'r2s_YYYY-MM-DDTHH-MM-SS' is
created. Subdirectories will be created for the neutron transport,
activation, and photon transport steps.
bounding_boxes : dict[int, openmc.BoundingBox], optional
Dictionary mapping cell IDs to bounding boxes used for spatial
source sampling in cell-based R2S calculations. Required if method
is 'cell-based'.
chain_file : PathLike or openmc.deplete.Chain, optional
Path to the depletion chain XML file or depletion chain object to
use during activation. If not provided, the default configured
chain file will be used.
micro_kwargs : dict, optional
Additional keyword arguments passed to
:func:`openmc.deplete.get_microxs_and_flux` during the neutron
transport step.
mat_vol_kwargs : dict, optional
Additional keyword arguments passed to
:meth:`openmc.MeshBase.material_volumes`.
run_kwargs : dict, optional
Additional keyword arguments passed to :meth:`openmc.Model.run`
during the neutron and photon transport step. By default, output is
disabled.
operator_kwargs : dict, optional
Additional keyword arguments passed to
:class:`openmc.deplete.IndependentOperator`.
Returns
-------
Path
Path to the output directory containing all calculation results
"""
if output_dir is None:
# Create timestamped output directory and broadcast to all ranks for
# consistency (different ranks may have slightly different times)
stamp = datetime.now().strftime('%Y-%m-%dT%H-%M-%S')
output_dir = Path(comm.bcast(f'r2s_{stamp}'))
else:
output_dir = Path(output_dir)
# Set run_kwargs for the neutron transport step
if micro_kwargs is None:
micro_kwargs = {}
if run_kwargs is None:
run_kwargs = {}
if operator_kwargs is None:
operator_kwargs = {}
run_kwargs.setdefault('output', False)
micro_kwargs.setdefault('run_kwargs', run_kwargs)
# DecaySpectrum distributions are resolved in the C++ solver using
# OPENMC_CHAIN_FILE. If a Chain object was passed, write an XML
# representation alongside the R2S outputs.
if isinstance(chain_file, Chain):
output_dir.mkdir(parents=True, exist_ok=True)
chain_path = output_dir / 'chain.xml'
if comm.rank == 0:
chain_file.export_to_xml(chain_path)
comm.barrier()
else:
chain_path = chain_file
chain_context = (
openmc.config.patch('chain_file', chain_path)
if chain_path is not None else nullcontext()
)
with chain_context:
self.step1_neutron_transport(
output_dir / 'neutron_transport', mat_vol_kwargs, micro_kwargs
)
self.step2_activation(
timesteps, source_rates, timestep_units,
output_dir / 'activation', operator_kwargs=operator_kwargs
)
self.step3_photon_transport(
photon_time_indices, bounding_boxes, output_dir / 'photon_transport',
mat_vol_kwargs=mat_vol_kwargs, run_kwargs=run_kwargs
)
return output_dir
[docs]
def step1_neutron_transport(
self,
output_dir: PathLike = "neutron_transport",
mat_vol_kwargs: dict | None = None,
micro_kwargs: dict | None = None
):
"""Run the neutron transport step.
This step computes the material volume fractions on each mesh, creates
mesh-material filters, and retrieves the fluxes and microscopic cross
sections for each mesh/material combination via a single transport
solve. This step will populate the 'fluxes' and 'micros' keys in the
results dictionary. For a mesh-based calculation, it will also populate
the 'mesh_material_volumes' key (a list of
:class:`~openmc.MeshMaterialVolumes`, one per mesh).
Parameters
----------
output_dir : PathLike, optional
The directory where the results will be saved.
mat_vol_kwargs : dict, optional
Additional keyword arguments based to
:meth:`openmc.MeshBase.material_volumes`.
micro_kwargs : dict, optional
Additional keyword arguments passed to
:func:`openmc.deplete.get_microxs_and_flux`.
"""
output_dir = Path(output_dir).resolve()
output_dir.mkdir(parents=True, exist_ok=True)
if self.method == 'mesh-based':
# Compute material volume fractions on each mesh
if mat_vol_kwargs is None:
mat_vol_kwargs = {}
mat_vol_kwargs.setdefault('bounding_boxes', True)
mmv_list = []
domain_filters = []
for i, mesh in enumerate(self.domains):
mmv = comm.bcast(
mesh.material_volumes(self.neutron_model, **mat_vol_kwargs))
mmv_list.append(mmv)
# Save results to file
if comm.rank == 0:
mmv.save(output_dir / f'mesh_material_volumes_{i}.npz')
# Create mesh-material filter for this mesh
domain_filters.append(
openmc.MeshMaterialFilter.from_volumes(mesh, mmv))
self.results['mesh_material_volumes'] = mmv_list
domains = domain_filters
else:
domains: Sequence[openmc.Cell] = self.domains
# Check to make sure that each cell is filled with a material and
# that the volume has been set
# TODO: If volumes are not set, run volume calculation for cells
for cell in domains:
if cell.fill is None:
raise ValueError(
f"Cell {cell.id} is not filled with a materials. "
"Please set the fill material for each cell before "
"running the R2S calculation."
)
if cell.volume is None:
raise ValueError(
f"Cell {cell.id} does not have a volume set. "
"Please set the volume for each cell before running "
"the R2S calculation."
)
# Set default keyword arguments for microxs and flux calculation
if micro_kwargs is None:
micro_kwargs = {}
micro_kwargs.setdefault('path_statepoint', output_dir / 'statepoint.h5')
micro_kwargs.setdefault('path_input', output_dir / 'model.xml')
# Run neutron transport and get fluxes and micros. Run via openmc.lib to
# maintain a consistent parallelism strategy with the activation step.
with TemporarySession():
self.results['fluxes'], self.results['micros'] = get_microxs_and_flux(
self.neutron_model, domains, **micro_kwargs)
# Save flux and micros to file
if comm.rank == 0:
np.save(output_dir / 'fluxes.npy', self.results['fluxes'])
write_microxs_hdf5(self.results['micros'], output_dir / 'micros.h5')
[docs]
def step2_activation(
self,
timesteps: Sequence[float] | Sequence[tuple[float, str]],
source_rates: float | Sequence[float],
timestep_units: str = 's',
output_dir: PathLike = 'activation',
operator_kwargs: dict | None = None,
):
"""Run the activation step.
This step creates a unique copy of each activation material based on the
mesh elements or cells, then solves the depletion equations for each
material using the fluxes and microscopic cross sections obtained in the
neutron transport step. This step will populate the 'depletion_results'
and 'activation_materials' keys in the results dictionary.
Parameters
----------
timesteps : Sequence[float] or Sequence[tuple[float, str]]
Sequence of timesteps. Note that values are not cumulative. The
units are specified by the `timestep_units` argument when
`timesteps` is an iterable of float. Alternatively, units can be
specified for each step by passing an iterable of (value, unit)
tuples.
source_rates : float | Sequence[float]
Source rate in [neutron/sec] for each interval in `timesteps`.
timestep_units : {'s', 'min', 'h', 'd', 'a'}, optional
Units for values specified in the `timesteps` argument when passing
float values. 's' means seconds, 'min' means minutes, 'h' means
hours, 'd' means days, and 'a' means years (Julian).
output_dir : PathLike, optional
Path to directory where activation calculation outputs will be
saved.
operator_kwargs : dict, optional
Additional keyword arguments passed to
:class:`openmc.deplete.IndependentOperator`.
"""
if self.method == 'mesh-based':
# Get unique material for each (mesh, material) combination
mmv_list = self.results['mesh_material_volumes']
self.results['activation_materials'] = get_activation_materials(
self.neutron_model, mmv_list)
else:
# Create unique material for each cell
activation_mats = openmc.Materials()
for cell in self.domains:
mat = cell.fill.clone()
mat.name = f'Cell {cell.id}'
mat.depletable = True
mat.volume = cell.volume
activation_mats.append(mat)
self.results['activation_materials'] = activation_mats
# Save activation materials to file
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
self.results['activation_materials'].export_to_xml(
output_dir / 'materials.xml')
# Create depletion operator for the activation materials
if operator_kwargs is None:
operator_kwargs = {}
operator_kwargs.setdefault('normalization_mode', 'source-rate')
op = IndependentOperator(
self.results['activation_materials'],
self.results['fluxes'],
self.results['micros'],
**operator_kwargs
)
# Create time integrator and solve depletion equations
integrator = PredictorIntegrator(
op, timesteps, source_rates=source_rates, timestep_units=timestep_units
)
output_path = output_dir / 'depletion_results.h5'
integrator.integrate(final_step=False, path=output_path)
comm.barrier()
# Get depletion results
self.results['depletion_results'] = Results(output_path)
[docs]
def step3_photon_transport(
self,
time_indices: Sequence[int] | None = None,
bounding_boxes: dict[int, openmc.BoundingBox] | None = None,
output_dir: PathLike = 'photon_transport',
mat_vol_kwargs: dict | None = None,
run_kwargs: dict | None = None,
):
"""Run the photon transport step.
This step performs photon transport calculations using decay photon
sources created from the activated materials. For each specified time,
it creates appropriate photon sources and runs a transport calculation.
In mesh-based mode, the sources are created using the mesh material
volumes, while in cell-based mode, they are created using bounding boxes
for each cell. This step will populate the 'photon_tallies' key in the
results dictionary.
Parameters
----------
time_indices : Sequence[int], optional
Sequence of time indices at which photon transport should be run;
represented as indices into the array of times formed by the
timesteps. For example, if two timesteps are specified, the array of
times would contain three entries, and [2] would indicate computing
photon results at the last time. A value of None indicates to run
photon transport for each time.
bounding_boxes : dict[int, openmc.BoundingBox], optional
Dictionary mapping cell IDs to bounding boxes used for spatial
source sampling in cell-based R2S calculations. Required if method
is 'cell-based'.
output_dir : PathLike, optional
Path to directory where photon transport outputs will be saved.
mat_vol_kwargs : dict, optional
Additional keyword arguments passed to
:meth:`openmc.MeshBase.material_volumes`.
run_kwargs : dict, optional
Additional keyword arguments passed to :meth:`openmc.Model.run`
during the photon transport step. By default, output is disabled.
"""
# TODO: Automatically determine bounding box for each cell
if bounding_boxes is None and self.method == 'cell-based':
raise ValueError("bounding_boxes must be provided for cell-based "
"R2S calculations.")
# Set default run arguments if not provided
if run_kwargs is None:
run_kwargs = {}
run_kwargs.setdefault('output', False)
# Write out JSON file with tally IDs that can be used for loading
# results
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Get default time indices if not provided
if time_indices is None:
n_steps = len(self.results['depletion_results'])
time_indices = list(range(n_steps))
# Check whether the photon model is different
neutron_univ = self.neutron_model.geometry.root_universe
photon_univ = self.photon_model.geometry.root_universe
different_photon_model = (neutron_univ != photon_univ)
# For mesh-based calculations, compute material volume fractions for the
# photon model if it is different from the neutron model to account for
# potential material changes
if self.method == 'mesh-based' and different_photon_model:
if mat_vol_kwargs is None:
mat_vol_kwargs = {}
photon_mmv_list = []
for i, mesh in enumerate(self.domains):
photon_mmv = comm.bcast(
mesh.material_volumes(self.photon_model, **mat_vol_kwargs))
photon_mmv_list.append(photon_mmv)
# Save photon MMV results to file
if comm.rank == 0:
photon_mmv.save(
output_dir / f'mesh_material_volumes_{i}.npz')
self.results['mesh_material_volumes_photon'] = photon_mmv_list
if comm.rank == 0:
tally_ids = [tally.id for tally in self.photon_model.tallies]
with open(output_dir / 'tally_ids.json', 'w') as f:
json.dump(tally_ids, f)
self.results['photon_tallies'] = {}
# Get dictionary of cells in the photon model
if different_photon_model:
photon_cells = self.photon_model.geometry.get_all_cells()
# Determine eligible work items upfront (independent of time index).
if self.method == 'mesh-based':
work_items = self._get_mesh_work_items()
else:
work_items = []
for cell, original_mat in zip(
self.domains, self.results['activation_materials']):
if different_photon_model:
if cell.id not in photon_cells or \
cell.fill.id != photon_cells[cell.id].fill.id:
continue
work_items.append((cell, original_mat, bounding_boxes[cell.id]))
# Ensure photon transport is enabled in settings
self.photon_model.settings.photon_transport = True
for time_index in time_indices:
# Convert time_index (which may be negative) to a normal index
if time_index < 0:
time_index += len(self.results['depletion_results'])
# Build decay photon sources and assign to the photon model
sources = self._create_photon_sources(time_index, work_items)
self.photon_model.settings.source = sources
# Run photon transport calculation
photon_dir = Path(output_dir) / f'time_{time_index}'
with TemporarySession(self.photon_model, cwd=photon_dir):
statepoint_path = self.photon_model.run(**run_kwargs)
# Store tally results
with openmc.StatePoint(statepoint_path) as sp:
self.results['photon_tallies'][time_index] = [
sp.tallies[tally.id] for tally in self.photon_model.tallies
]
def _get_mesh_work_items(self):
"""Enumerate mesh-based work items across all meshes.
Returns a list of (index_mat, mat_id, bbox) tuples for each eligible
mesh element--material combination, where index_mat is the index into
the activation materials list, mat_id is the material ID, and bbox is
the bounding box for that mesh element--material combination.
Returns
-------
list of tuple
Each tuple is (index_mat, mat_id, bbox).
"""
mmv_list = self.results['mesh_material_volumes']
photon_mmv_list = self.results.get('mesh_material_volumes_photon')
work_items = []
index_mat = 0
for mesh_idx, mat_vols in enumerate(mmv_list):
photon_mat_vols = photon_mmv_list[mesh_idx] \
if photon_mmv_list is not None else None
n_elements = mat_vols.num_elements
for index_elem in range(n_elements):
if photon_mat_vols is not None:
photon_materials = {
mat_id
for mat_id, _ in photon_mat_vols.by_element(index_elem)
if mat_id is not None
}
for mat_id, _, bbox in mat_vols.by_element(
index_elem, include_bboxes=True):
if mat_id is None:
continue
if photon_mat_vols is not None \
and mat_id not in photon_materials:
index_mat += 1
continue
work_items.append((index_mat, mat_id, bbox))
index_mat += 1
return work_items
def _create_photon_sources(self, time_index, work_items):
"""Create decay photon sources for a set of regions.
Builds :class:`openmc.IndependentSource` objects with
:class:`openmc.stats.DecaySpectrum` energy distributions that will be
serialized to XML and resolved against the depletion chain by the C++
solver.
Parameters
----------
time_index : int
Index into depletion results.
work_items : list of tuple
For mesh-based: list of (index_mat, mat_id, bbox).
For cell-based: list of (cell, original_mat, bbox).
Returns
-------
list of openmc.IndependentSource
Photon sources for each activated region.
"""
step_result = self.results['depletion_results'][time_index]
materials = self.results['activation_materials']
mesh_based = self.method == 'mesh-based'
if mesh_based:
mat_dict = self.neutron_model._get_all_materials()
sources = []
for item in work_items:
if mesh_based:
index_mat, domain_id, bbox = item
original_mat = materials[index_mat]
domain = mat_dict[domain_id]
else:
cell, original_mat, bbox = item
domain = cell
activated_mat = step_result.get_material(str(original_mat.id))
nuclides = activated_mat.get_nuclide_atom_densities()
if not nuclides:
continue
# Eliminate nuclides with zero density
nuclides = {nuclide: density for nuclide, density in nuclides.items()
if density > 0}
energy = openmc.stats.DecaySpectrum(nuclides, activated_mat.volume)
energy.clip(inplace=True)
if not energy.nuclides:
continue
sources.append(openmc.IndependentSource(
space=openmc.stats.Box(bbox.lower_left, bbox.upper_right),
energy=energy,
particle='photon',
constraints={'domains': [domain]},
))
return sources
[docs]
def load_results(self, path: PathLike):
"""Load results from a previous R2S calculation.
Parameters
----------
path : PathLike
Path to the directory containing the R2S calculation results.
"""
path = Path(path)
# Load neutron transport results
neutron_dir = path / 'neutron_transport'
if self.method == 'mesh-based':
mmv_files = sorted(neutron_dir.glob('mesh_material_volumes*.npz'),
key=lambda p: int(p.stem.split('_')[-1])
if p.stem[-1].isdigit() else 0)
if mmv_files:
self.results['mesh_material_volumes'] = [
openmc.MeshMaterialVolumes.from_npz(f) for f in mmv_files
]
fluxes_file = neutron_dir / 'fluxes.npy'
if fluxes_file.exists():
self.results['fluxes'] = list(np.load(fluxes_file, allow_pickle=True))
micros_dict = read_microxs_hdf5(neutron_dir / 'micros.h5')
self.results['micros'] = [
micros_dict[f'domain_{i}'] for i in range(len(micros_dict))
]
# Load activation results
activation_dir = path / 'activation'
activation_results = activation_dir / 'depletion_results.h5'
if activation_results.exists():
self.results['depletion_results'] = Results(activation_results)
activation_mats_file = activation_dir / 'materials.xml'
if activation_mats_file.exists():
self.results['activation_materials'] = \
openmc.Materials.from_xml(activation_mats_file)
# Load photon transport results
photon_dir = path / 'photon_transport'
# Load photon mesh material volumes if they exist (for mesh-based calculations)
if self.method == 'mesh-based':
photon_mmv_files = sorted(
photon_dir.glob('mesh_material_volumes*.npz'),
key=lambda p: int(p.stem.split('_')[-1])
if p.stem[-1].isdigit() else 0)
if photon_mmv_files:
self.results['mesh_material_volumes_photon'] = [
openmc.MeshMaterialVolumes.from_npz(f)
for f in photon_mmv_files
]
# Load tally IDs from JSON file
tally_ids_path = photon_dir / 'tally_ids.json'
if tally_ids_path.exists():
with tally_ids_path.open('r') as f:
tally_ids = json.load(f)
self.results['photon_tallies'] = {}
# For each photon transport calc, load the statepoint and get the
# tally results based on tally_ids
for time_dir in photon_dir.glob('time_*'):
time_index = int(time_dir.name.split('_')[1])
for sp_path in time_dir.glob('statepoint.*.h5'):
with openmc.StatePoint(sp_path) as sp:
self.results['photon_tallies'][time_index] = [
sp.tallies[tally_id] for tally_id in tally_ids
]