# Source code for openmc.tallies

```
from collections.abc import Iterable, MutableSequence
import copy
from functools import partial, reduce
from itertools import product
from numbers import Integral, Real
import operator
from pathlib import Path
from xml.etree import ElementTree as ET
import h5py
import numpy as np
import pandas as pd
from scipy.misc import derivative
import scipy.sparse as sps
import openmc
import openmc.checkvalue as cv
from ._xml import clean_indentation, reorder_attributes, get_text
from .mixin import IDManagerMixin
from .mesh import MeshBase
# The tally arithmetic product types. The tensor product performs the full
# cross product of the data in two tallies with respect to a specified axis
# (filters, nuclides, or scores). The entrywise product performs the arithmetic
# operation entrywise across the entries in two tallies with respect to a
# specified axis.
_PRODUCT_TYPES = ['tensor', 'entrywise']
# The following indicate acceptable types when setting Tally.scores,
# Tally.nuclides, and Tally.filters
_SCORE_CLASSES = (str, openmc.CrossScore, openmc.AggregateScore)
_NUCLIDE_CLASSES = (str, openmc.CrossNuclide, openmc.AggregateNuclide)
_FILTER_CLASSES = (openmc.Filter, openmc.CrossFilter, openmc.AggregateFilter)
# Valid types of estimators
ESTIMATOR_TYPES = ['tracklength', 'collision', 'analog']
[docs]class Tally(IDManagerMixin):
"""A tally defined by a set of scores that are accumulated for a list of
nuclides given a set of filters.
Parameters
----------
tally_id : int, optional
Unique identifier for the tally. If none is specified, an identifier
will automatically be assigned
name : str, optional
Name of the tally. If not specified, the name is the empty string.
Attributes
----------
id : int
Unique identifier for the tally
name : str
Name of the tally
filters : list of openmc.Filter
List of specified filters for the tally
nuclides : list of openmc.Nuclide
List of nuclides to score results for
scores : list of str
List of defined scores, e.g. 'flux', 'fission', etc.
estimator : {'analog', 'tracklength', 'collision'}
Type of estimator for the tally
triggers : list of openmc.Trigger
List of tally triggers
num_scores : int
Total number of scores
num_filter_bins : int
Total number of filter bins accounting for all filters
num_bins : int
Total number of bins for the tally
shape : 3-tuple of int
The shape of the tally data array ordered as the number of filter bins,
nuclide bins and score bins
filter_strides : list of int
Stride in memory for each filter
num_realizations : int
Total number of realizations
with_summary : bool
Whether or not a Summary has been linked
sum : numpy.ndarray
An array containing the sum of each independent realization for each bin
sum_sq : numpy.ndarray
An array containing the sum of each independent realization squared for
each bin
mean : numpy.ndarray
An array containing the sample mean for each bin
std_dev : numpy.ndarray
An array containing the sample standard deviation for each bin
derived : bool
Whether or not the tally is derived from one or more other tallies
sparse : bool
Whether or not the tally uses SciPy's LIL sparse matrix format for
compressed data storage
derivative : openmc.TallyDerivative
A material perturbation derivative to apply to all scores in the tally.
"""
next_id = 1
used_ids = set()
def __init__(self, tally_id=None, name=''):
# Initialize Tally class attributes
self.id = tally_id
self.name = name
self._filters = cv.CheckedList(_FILTER_CLASSES, 'tally filters')
self._nuclides = cv.CheckedList(_NUCLIDE_CLASSES, 'tally nuclides')
self._scores = cv.CheckedList(_SCORE_CLASSES, 'tally scores')
self._estimator = None
self._triggers = cv.CheckedList(openmc.Trigger, 'tally triggers')
self._derivative = None
self._num_realizations = 0
self._with_summary = False
self._sum = None
self._sum_sq = None
self._mean = None
self._std_dev = None
self._with_batch_statistics = False
self._derived = False
self._sparse = False
self._sp_filename = None
self._results_read = False
def __repr__(self):
parts = ['Tally']
parts.append('{: <15}=\t{}'.format('ID', self.id))
parts.append('{: <15}=\t{}'.format('Name', self.name))
if self.derivative is not None:
parts.append('{: <15}=\t{}'.format('Derivative ID', self.derivative.id))
filters = ', '.join(type(f).__name__ for f in self.filters)
parts.append('{: <15}=\t{}'.format('Filters', filters))
nuclides = ' '.join(str(nuclide) for nuclide in self.nuclides)
parts.append('{: <15}=\t{}'.format('Nuclides', nuclides))
parts.append('{: <15}=\t{}'.format('Scores', self.scores))
parts.append('{: <15}=\t{}'.format('Estimator', self.estimator))
return '\n\t'.join(parts)
@property
def name(self):
return self._name
@property
def filters(self):
return self._filters
@property
def nuclides(self):
return self._nuclides
@property
def num_nuclides(self):
return len(self._nuclides)
@property
def scores(self):
return self._scores
@property
def num_scores(self):
return len(self._scores)
@property
def num_filters(self):
return len(self.filters)
@property
def num_filter_bins(self):
return reduce(operator.mul, (f.num_bins for f in self.filters), 1)
@property
def num_bins(self):
return self.num_filter_bins * self.num_nuclides * self.num_scores
@property
def shape(self):
return (self.num_filter_bins, self.num_nuclides, self.num_scores)
@property
def estimator(self):
return self._estimator
@property
def triggers(self):
return self._triggers
@property
def num_realizations(self):
return self._num_realizations
@property
def with_summary(self):
return self._with_summary
def _read_results(self):
if self._results_read:
return
# Open the HDF5 statepoint file
with h5py.File(self._sp_filename, 'r') as f:
# Extract Tally data from the file
data = f[f'tallies/tally {self.id}/results']
sum_ = data[:, :, 0]
sum_sq = data[:, :, 1]
# Reshape the results arrays
sum_ = np.reshape(sum_, self.shape)
sum_sq = np.reshape(sum_sq, self.shape)
# Set the data for this Tally
self._sum = sum_
self._sum_sq = sum_sq
# Convert NumPy arrays to SciPy sparse LIL matrices
if self.sparse:
self._sum = sps.lil_matrix(self._sum.flatten(), self._sum.shape)
self._sum_sq = sps.lil_matrix(self._sum_sq.flatten(), self._sum_sq.shape)
# Indicate that Tally results have been read
self._results_read = True
@property
def sum(self):
if not self._sp_filename or self.derived:
return None
# Make sure results have been read
self._read_results()
if self.sparse:
return np.reshape(self._sum.toarray(), self.shape)
else:
return self._sum
@property
def sum_sq(self):
if not self._sp_filename or self.derived:
return None
# Make sure results have been read
self._read_results()
if self.sparse:
return np.reshape(self._sum_sq.toarray(), self.shape)
else:
return self._sum_sq
@property
def mean(self):
if self._mean is None:
if not self._sp_filename:
return None
self._mean = self.sum / self.num_realizations
# Convert NumPy array to SciPy sparse LIL matrix
if self.sparse:
self._mean = sps.lil_matrix(self._mean.flatten(),
self._mean.shape)
if self.sparse:
return np.reshape(self._mean.toarray(), self.shape)
else:
return self._mean
@property
def std_dev(self):
if self._std_dev is None:
if not self._sp_filename:
return None
n = self.num_realizations
nonzero = np.abs(self.mean) > 0
self._std_dev = np.zeros_like(self.mean)
self._std_dev[nonzero] = np.sqrt((self.sum_sq[nonzero]/n -
self.mean[nonzero]**2)/(n - 1))
# Convert NumPy array to SciPy sparse LIL matrix
if self.sparse:
self._std_dev = sps.lil_matrix(self._std_dev.flatten(),
self._std_dev.shape)
self.with_batch_statistics = True
if self.sparse:
return np.reshape(self._std_dev.toarray(), self.shape)
else:
return self._std_dev
@property
def with_batch_statistics(self):
return self._with_batch_statistics
@property
def derived(self):
return self._derived
@property
def derivative(self):
return self._derivative
@property
def sparse(self):
return self._sparse
@estimator.setter
def estimator(self, estimator):
cv.check_value('estimator', estimator, ESTIMATOR_TYPES)
self._estimator = estimator
@triggers.setter
def triggers(self, triggers):
cv.check_type('tally triggers', triggers, MutableSequence)
self._triggers = cv.CheckedList(openmc.Trigger, 'tally triggers',
triggers)
@name.setter
def name(self, name):
cv.check_type('tally name', name, str, none_ok=True)
self._name = name
@derivative.setter
def derivative(self, deriv):
cv.check_type('tally derivative', deriv, openmc.TallyDerivative,
none_ok=True)
self._derivative = deriv
@filters.setter
def filters(self, filters):
cv.check_type('tally filters', filters, MutableSequence)
# If the filter is already in the Tally, raise an error
visited_filters = set()
for f in filters:
if f in visited_filters:
msg = (f'Unable to add a duplicate filter "{f}" to Tally '
f'ID="{self.id}" since duplicate filters are not '
'supported in the OpenMC Python API')
raise ValueError(msg)
visited_filters.add(f)
self._filters = cv.CheckedList(_FILTER_CLASSES, 'tally filters', filters)
@nuclides.setter
def nuclides(self, nuclides):
cv.check_type('tally nuclides', nuclides, MutableSequence)
# If the nuclide is already in the Tally, raise an error
visited_nuclides = set()
for nuc in nuclides:
if nuc in visited_nuclides:
msg = (f'Unable to add a duplicate nuclide "{nuc}" to Tally ID='
f'"{self.id}" since duplicate nuclides are not supported '
'in the OpenMC Python API')
raise ValueError(msg)
visited_nuclides.add(nuc)
self._nuclides = cv.CheckedList(_NUCLIDE_CLASSES, 'tally nuclides',
nuclides)
@scores.setter
def scores(self, scores):
cv.check_type('tally scores', scores, MutableSequence)
visited_scores = set()
for i, score in enumerate(scores):
# If the score is already in the Tally, raise an error
if score in visited_scores:
msg = (f'Unable to add a duplicate score "{score}" to Tally '
f'ID="{self.id}" since duplicate scores are not '
'supported in the OpenMC Python API')
raise ValueError(msg)
visited_scores.add(score)
# If score is a string, strip whitespace
if isinstance(score, str):
# Check to see if scores are deprecated before storing
for deprecated in ['scatter-', 'nu-scatter-', 'scatter-p',
'nu-scatter-p', 'scatter-y', 'nu-scatter-y',
'flux-y', 'total-y']:
if score.strip().startswith(deprecated):
msg = score.strip() + ' is no longer supported.'
raise ValueError(msg)
scores[i] = score.strip()
self._scores = cv.CheckedList(_SCORE_CLASSES, 'tally scores', scores)
@num_realizations.setter
def num_realizations(self, num_realizations):
cv.check_type('number of realizations', num_realizations, Integral)
cv.check_greater_than('number of realizations', num_realizations, 0, True)
self._num_realizations = num_realizations
@with_summary.setter
def with_summary(self, with_summary):
cv.check_type('with_summary', with_summary, bool)
self._with_summary = with_summary
@with_batch_statistics.setter
def with_batch_statistics(self, with_batch_statistics):
cv.check_type('with_batch_statistics', with_batch_statistics, bool)
self._with_batch_statistics = with_batch_statistics
@sum.setter
def sum(self, sum):
cv.check_type('sum', sum, Iterable)
self._sum = sum
@sum_sq.setter
def sum_sq(self, sum_sq):
cv.check_type('sum_sq', sum_sq, Iterable)
self._sum_sq = sum_sq
@sparse.setter
def sparse(self, sparse):
"""Convert tally data from NumPy arrays to SciPy list of lists (LIL)
sparse matrices, and vice versa.
This property may be used to reduce the amount of data in memory during
tally data processing. The tally data will be stored as SciPy LIL
matrices internally within the Tally object. All tally data access
properties and methods will return data as a dense NumPy array.
"""
cv.check_type('sparse', sparse, bool)
# Convert NumPy arrays to SciPy sparse LIL matrices
if sparse and not self.sparse:
if self._sum is not None:
self._sum = sps.lil_matrix(self._sum.flatten(), self._sum.shape)
if self._sum_sq is not None:
self._sum_sq = sps.lil_matrix(self._sum_sq.flatten(),
self._sum_sq.shape)
if self._mean is not None:
self._mean = sps.lil_matrix(self._mean.flatten(),
self._mean.shape)
if self._std_dev is not None:
self._std_dev = sps.lil_matrix(self._std_dev.flatten(),
self._std_dev.shape)
self._sparse = True
# Convert SciPy sparse LIL matrices to NumPy arrays
elif not sparse and self.sparse:
if self._sum is not None:
self._sum = np.reshape(self._sum.toarray(), self.shape)
if self._sum_sq is not None:
self._sum_sq = np.reshape(self._sum_sq.toarray(), self.shape)
if self._mean is not None:
self._mean = np.reshape(self._mean.toarray(), self.shape)
if self._std_dev is not None:
self._std_dev = np.reshape(self._std_dev.toarray(), self.shape)
self._sparse = False
[docs] def remove_score(self, score):
"""Remove a score from the tally
Parameters
----------
score : str
Score to remove
"""
if score not in self.scores:
msg = f'Unable to remove score "{score}" from Tally ' \
f'ID="{self.id}" since the Tally does not contain this score'
raise ValueError(msg)
self._scores.remove(score)
[docs] def remove_filter(self, old_filter):
"""Remove a filter from the tally
Parameters
----------
old_filter : openmc.Filter
Filter to remove
"""
if old_filter not in self.filters:
msg = f'Unable to remove filter "{old_filter}" from Tally ' \
f'ID="{self.id}" since the Tally does not contain this filter'
raise ValueError(msg)
self._filters.remove(old_filter)
[docs] def remove_nuclide(self, nuclide):
"""Remove a nuclide from the tally
Parameters
----------
nuclide : openmc.Nuclide
Nuclide to remove
"""
if nuclide not in self.nuclides:
msg = f'Unable to remove nuclide "{nuclide}" from Tally ' \
f'ID="{self.id}" since the Tally does not contain this nuclide'
raise ValueError(msg)
self._nuclides.remove(nuclide)
def _can_merge_filters(self, other):
"""Determine if another tally's filters can be merged with this one's
The types of filters between the two tallies must match identically.
The bins in all of the filters must match identically, or be mergeable
in only one filter. This is a helper method for the can_merge(...)
and merge(...) methods.
Parameters
----------
other : openmc.Tally
Tally to check for mergeable filters
"""
# Two tallies must have the same number of filters
if len(self.filters) != len(other.filters):
return False
# Return False if only one tally has a delayed group filter
tally1_dg = self.contains_filter(openmc.DelayedGroupFilter)
tally2_dg = other.contains_filter(openmc.DelayedGroupFilter)
if tally1_dg != tally2_dg:
return False
# Look to see if all filters are the same, or one or more can be merged
for filter1 in self.filters:
mergeable = False
for filter2 in other.filters:
if filter1 == filter2 or filter1.can_merge(filter2):
mergeable = True
break
# If no mergeable filter was found, the tallies are not mergeable
if not mergeable:
return False
# Tally filters are mergeable if all conditional checks passed
return True
def _can_merge_nuclides(self, other):
"""Determine if another tally's nuclides can be merged with this one's
The nuclides between the two tallies must be mutually exclusive or
identically matching. This is a helper method for the can_merge(...)
and merge(...) methods.
Parameters
----------
other : openmc.Tally
Tally to check for mergeable nuclides
"""
no_nuclides_match = True
all_nuclides_match = True
# Search for each of this tally's nuclides in the other tally
for nuclide in self.nuclides:
if nuclide not in other.nuclides:
all_nuclides_match = False
else:
no_nuclides_match = False
# Search for each of the other tally's nuclides in this tally
for nuclide in other.nuclides:
if nuclide not in self.nuclides:
all_nuclides_match = False
else:
no_nuclides_match = False
# Either all nuclides should match, or none should
return no_nuclides_match or all_nuclides_match
def _can_merge_scores(self, other):
"""Determine if another tally's scores can be merged with this one's
The scores between the two tallies must be mutually exclusive or
identically matching. This is a helper method for the can_merge(...)
and merge(...) methods.
Parameters
----------
other : openmc.Tally
Tally to check for mergeable scores
"""
no_scores_match = True
all_scores_match = True
# Search for each of this tally's scores in the other tally
for score in self.scores:
if score in other.scores:
no_scores_match = False
# Search for each of the other tally's scores in this tally
for score in other.scores:
if score not in self.scores:
all_scores_match = False
else:
no_scores_match = False
if score == 'current' and score not in self.scores:
return False
# Nuclides cannot be specified on 'flux' scores
if 'flux' in self.scores or 'flux' in other.scores:
if self.nuclides != other.nuclides:
return False
# Either all scores should match, or none should
return no_scores_match or all_scores_match
[docs] def can_merge(self, other):
"""Determine if another tally can be merged with this one
If results have been loaded from a statepoint, then tallies are only
mergeable along one and only one of filter bins, nuclides or scores.
Parameters
----------
other : openmc.Tally
Tally to check for merging
"""
if not isinstance(other, Tally):
return False
# Must have same estimator
if self.estimator != other.estimator:
return False
equal_filters = sorted(self.filters) == sorted(other.filters)
equal_nuclides = sorted(self.nuclides) == sorted(other.nuclides)
equal_scores = sorted(self.scores) == sorted(other.scores)
equality = [equal_filters, equal_nuclides, equal_scores]
# If all filters, nuclides and scores match then tallies are mergeable
if all(equality):
return True
# Variables to indicate filter bins, nuclides, and scores that can be merged
can_merge_filters = self._can_merge_filters(other)
can_merge_nuclides = self._can_merge_nuclides(other)
can_merge_scores = self._can_merge_scores(other)
mergeability = [can_merge_filters, can_merge_nuclides, can_merge_scores]
if not all(mergeability):
return False
# If the tally results have been read from the statepoint, at least two
# of filters, nuclides and scores must match
else:
return not self._results_read or sum(equality) >= 2
[docs] def merge(self, other):
"""Merge another tally with this one
If results have been loaded from a statepoint, then tallies are only
mergeable along one and only one of filter bins, nuclides or scores.
Parameters
----------
other : openmc.Tally
Tally to merge with this one
Returns
-------
merged_tally : openmc.Tally
Merged tallies
"""
if not self.can_merge(other):
msg = f'Unable to merge tally ID="{other.id}" with "{self.id}"'
raise ValueError(msg)
# Create deep copy of tally to return as merged tally
merged_tally = copy.deepcopy(self)
# Differentiate Tally with a new auto-generated Tally ID
merged_tally.id = None
# Create deep copy of other tally to use for array concatenation
other_copy = copy.deepcopy(other)
# Identify if filters, nuclides and scores are mergeable and/or equal
merge_filters = self._can_merge_filters(other)
merge_nuclides = self._can_merge_nuclides(other)
merge_scores = self._can_merge_scores(other)
equal_filters = sorted(self.filters) == sorted(other.filters)
equal_nuclides = sorted(self.nuclides) == sorted(other.nuclides)
equal_scores = sorted(self.scores) == sorted(other.scores)
# If two tallies can be merged along a filter's bins
if merge_filters and not equal_filters:
# Search for mergeable filters
for i, filter1 in enumerate(self.filters):
for filter2 in other.filters:
if filter1 != filter2 and filter1.can_merge(filter2):
other_copy._swap_filters(other_copy.filters[i], filter2)
merged_tally.filters[i] = filter1.merge(filter2)
join_right = filter1 < filter2
merge_axis = i
break
# If two tallies can be merged along nuclide bins
if merge_nuclides and not equal_nuclides:
merge_axis = self.num_filters
join_right = True
# Add unique nuclides from other tally to merged tally
for nuclide in other.nuclides:
if nuclide not in merged_tally.nuclides:
merged_tally.nuclides.append(nuclide)
# If two tallies can be merged along score bins
if merge_scores and not equal_scores:
merge_axis = self.num_filters + 1
join_right = True
# Add unique scores from other tally to merged tally
for score in other.scores:
if score not in merged_tally.scores:
merged_tally.scores.append(score)
# Add triggers from other tally to merged tally
for trigger in other.triggers:
merged_tally.triggers.append(trigger)
# If results have not been read, then return tally for input generation
if self._results_read is None:
return merged_tally
# Otherwise, this is a derived tally which needs merged results arrays
else:
self._derived = True
# Concatenate sum arrays if present in both tallies
if self.sum is not None and other_copy.sum is not None:
self_sum = self.get_reshaped_data(value='sum')
other_sum = other_copy.get_reshaped_data(value='sum')
if join_right:
merged_sum = np.concatenate((self_sum, other_sum),
axis=merge_axis)
else:
merged_sum = np.concatenate((other_sum, self_sum),
axis=merge_axis)
merged_tally._sum = np.reshape(merged_sum, merged_tally.shape)
# Concatenate sum_sq arrays if present in both tallies
if self.sum_sq is not None and other.sum_sq is not None:
self_sum_sq = self.get_reshaped_data(value='sum_sq')
other_sum_sq = other_copy.get_reshaped_data(value='sum_sq')
if join_right:
merged_sum_sq = np.concatenate((self_sum_sq, other_sum_sq),
axis=merge_axis)
else:
merged_sum_sq = np.concatenate((other_sum_sq, self_sum_sq),
axis=merge_axis)
merged_tally._sum_sq = np.reshape(merged_sum_sq, merged_tally.shape)
# Concatenate mean arrays if present in both tallies
if self.mean is not None and other.mean is not None:
self_mean = self.get_reshaped_data(value='mean')
other_mean = other_copy.get_reshaped_data(value='mean')
if join_right:
merged_mean = np.concatenate((self_mean, other_mean),
axis=merge_axis)
else:
merged_mean = np.concatenate((other_mean, self_mean),
axis=merge_axis)
merged_tally._mean = np.reshape(merged_mean, merged_tally.shape)
# Concatenate std. dev. arrays if present in both tallies
if self.std_dev is not None and other.std_dev is not None:
self_std_dev = self.get_reshaped_data(value='std_dev')
other_std_dev = other_copy.get_reshaped_data(value='std_dev')
if join_right:
merged_std_dev = np.concatenate((self_std_dev, other_std_dev),
axis=merge_axis)
else:
merged_std_dev = np.concatenate((other_std_dev, self_std_dev),
axis=merge_axis)
merged_tally._std_dev = np.reshape(merged_std_dev, merged_tally.shape)
# Sparsify merged tally if both tallies are sparse
merged_tally.sparse = self.sparse and other.sparse
return merged_tally
[docs] def to_xml_element(self):
"""Return XML representation of the tally
Returns
-------
element : xml.etree.ElementTree.Element
XML element containing tally data
"""
element = ET.Element("tally")
# Tally ID
element.set("id", str(self.id))
# Optional Tally name
if self.name != '':
element.set("name", self.name)
# Optional Tally filters
if len(self.filters) > 0:
subelement = ET.SubElement(element, "filters")
subelement.text = ' '.join(str(f.id) for f in self.filters)
# Optional Nuclides
if self.nuclides:
subelement = ET.SubElement(element, "nuclides")
subelement.text = ' '.join(str(n) for n in self.nuclides)
# Scores
if len(self.scores) == 0:
msg = f'Unable to get XML for Tally ID="{self.id}" since it does ' \
'not contain any scores'
raise ValueError(msg)
subelement = ET.SubElement(element, "scores")
subelement.text = ' '.join(str(x) for x in self.scores)
# Tally estimator type
if self.estimator is not None:
subelement = ET.SubElement(element, "estimator")
subelement.text = self.estimator
# Optional Triggers
for trigger in self.triggers:
element.append(trigger.to_xml_element())
# Optional derivatives
if self.derivative is not None:
subelement = ET.SubElement(element, "derivative")
subelement.text = str(self.derivative.id)
return element
[docs] @classmethod
def from_xml_element(cls, elem, **kwargs):
"""Generate tally object from an XML element
Parameters
----------
elem : xml.etree.ElementTree.Element
XML element
Returns
-------
openmc.Tally
Tally object
"""
tally_id = int(elem.get('id'))
name = elem.get('name', '')
tally = cls(tally_id=tally_id, name=name)
# Read filters
filters_elem = elem.find('filters')
if filters_elem is not None:
filter_ids = [int(x) for x in filters_elem.text.split()]
tally.filters = [kwargs['filters'][uid] for uid in filter_ids]
# Read nuclides
nuclides_elem = elem.find('nuclides')
if nuclides_elem is not None:
tally.nuclides = nuclides_elem.text.split()
# Read scores
scores_elem = elem.find('scores')
if scores_elem is not None:
tally.scores = scores_elem.text.split()
# Set estimator
estimator_elem = elem.find('estimator')
if estimator_elem is not None:
tally.estimator = estimator_elem.text
# Read triggers
tally.triggers = [
openmc.Trigger.from_xml_element(trigger_elem)
for trigger_elem in elem.findall('trigger')
]
# Read tally derivative
deriv_elem = elem.find('derivative')
if deriv_elem is not None:
deriv_id = int(deriv_elem.text)
tally.derivative = kwargs['derivatives'][deriv_id]
return tally
[docs] def contains_filter(self, filter_type):
"""Looks for a filter in the tally that matches a specified type
Parameters
----------
filter_type : openmc.FilterMeta
Type of the filter, e.g. MeshFilter
Returns
-------
filter_found : bool
True if the tally contains a filter of the requested type;
otherwise false
"""
for test_filter in self.filters:
if type(test_filter) is filter_type:
return True
return False
[docs] def find_filter(self, filter_type):
"""Return a filter in the tally that matches a specified type
Parameters
----------
filter_type : openmc.FilterMeta
Type of the filter, e.g. MeshFilter
Returns
-------
filter_found : openmc.Filter
Filter from this tally with matching type, or None if no matching
Filter is found
Raises
------
ValueError
If no matching Filter is found
"""
# Look through all of this Tally's Filters for the type requested
for test_filter in self.filters:
if type(test_filter) is filter_type:
return test_filter
# Also check to see if the desired filter is wrapped up in an
# aggregate
elif isinstance(test_filter, openmc.AggregateFilter):
if isinstance(test_filter.aggregate_filter, filter_type):
return test_filter
# If we did not find the Filter, throw an Exception
msg = f'Unable to find filter type "{filter_type}" in Tally ' \
f'ID="{self.id}"'
raise ValueError(msg)
[docs] def get_nuclide_index(self, nuclide):
"""Returns the index in the Tally's results array for a Nuclide bin
Parameters
----------
nuclide : str
The name of the Nuclide (e.g., 'H1', 'U238')
Returns
-------
nuclide_index : int
The index in the Tally data array for this nuclide.
Raises
------
KeyError
When the argument passed to the 'nuclide' parameter cannot be found
in the Tally.
"""
# Look for the user-requested nuclide in all of the Tally's Nuclides
for i, test_nuclide in enumerate(self.nuclides):
# If the Summary was linked, then values are Nuclide objects
if isinstance(test_nuclide, openmc.Nuclide):
if test_nuclide.name == nuclide:
return i
# If the Summary has not been linked, then values are ZAIDs
else:
if test_nuclide == nuclide:
return i
msg = (f'Unable to get the nuclide index for Tally since "{nuclide}" '
'is not one of the nuclides')
raise KeyError(msg)
[docs] def get_score_index(self, score):
"""Returns the index in the Tally's results array for a score bin
Parameters
----------
score : str
The score string (e.g., 'absorption', 'nu-fission')
Returns
-------
score_index : int
The index in the Tally data array for this score.
Raises
------
ValueError
When the argument passed to the 'score' parameter cannot be found in
the Tally.
"""
try:
score_index = self.scores.index(score)
except ValueError:
msg = f'Unable to get the score index for Tally since "{score}" ' \
'is not one of the scores'
raise ValueError(msg)
return score_index
[docs] def get_filter_indices(self, filters=[], filter_bins=[]):
"""Get indices into the filter axis of this tally's data arrays.
This is a helper method for the Tally.get_values(...) method to
extract tally data. This method returns the indices into the filter
axis of the tally's data array (axis=0) for particular combinations
of filters and their corresponding bins.
Parameters
----------
filters : Iterable of openmc.FilterMeta
An iterable of filter types
(e.g., [MeshFilter, EnergyFilter]; default is [])
filter_bins : Iterable of tuple
A list of tuples of filter bins corresponding to the filter_types
parameter (e.g., [(1,), ((0., 0.625e-6),)]; default is []). Each
tuple contains bins for the corresponding filter type in the filters
parameter. Each bin is an integer ID for Material-, Surface-,
Cell-, Cellborn-, and Universe- Filters. Each bin is an integer
for the cell instance ID for DistribcellFilters. Each bin is a
2-tuple of floats for Energy- and Energyout- Filters corresponding
to the energy boundaries of the bin of interest. The bin is an
(x,y,z) 3-tuple for MeshFilters corresponding to the mesh cell
of interest. The order of the bins in the list must correspond to
the filter_types parameter.
Returns
-------
numpy.ndarray
A NumPy array of the filter indices
"""
cv.check_type('filters', filters, Iterable, openmc.FilterMeta)
cv.check_type('filter_bins', filter_bins, Iterable, tuple)
# If user did not specify any specific Filters, use them all
if not filters:
return np.arange(self.num_filter_bins)
# Initialize empty list of indices for each bin in each Filter
filter_indices = []
# Loop over all of the Tally's Filters
for i, self_filter in enumerate(self.filters):
# If a user-requested Filter, get the user-requested bins
for j, test_filter in enumerate(filters):
if type(self_filter) is test_filter:
bins = filter_bins[j]
break
else:
# If not a user-requested Filter, get all bins
if isinstance(self_filter, openmc.DistribcellFilter):
# Create list of cell instance IDs for distribcell Filters
bins = list(range(self_filter.num_bins))
elif isinstance(self_filter, openmc.EnergyFunctionFilter):
# EnergyFunctionFilters don't have bins so just add a None
bins = [None]
else:
# Create list of IDs for bins for all other filter types
bins = self_filter.bins
# Add indices for each bin in this Filter to the list
indices = np.array([self_filter.get_bin_index(b) for b in bins])
filter_indices.append(indices)
# Account for stride in each of the previous filters
for indices in filter_indices[:i]:
indices *= self_filter.num_bins
# Apply outer product sum between all filter bin indices
return list(map(sum, product(*filter_indices)))
[docs] def get_nuclide_indices(self, nuclides):
"""Get indices into the nuclide axis of this tally's data arrays.
This is a helper method for the Tally.get_values(...) method to
extract tally data. This method returns the indices into the nuclide
axis of the tally's data array (axis=1) for one or more nuclides.
Parameters
----------
nuclides : list of str
A list of nuclide name strings
(e.g., ['U235', 'U238']; default is [])
Returns
-------
numpy.ndarray
A NumPy array of the nuclide indices
"""
cv.check_iterable_type('nuclides', nuclides, str)
# If user did not specify any specific Nuclides, use them all
if not nuclides:
return np.arange(self.num_nuclides)
# Determine the score indices from any of the requested scores
nuclide_indices = np.zeros_like(nuclides, dtype=int)
for i, nuclide in enumerate(nuclides):
nuclide_indices[i] = self.get_nuclide_index(nuclide)
return nuclide_indices
[docs] def get_score_indices(self, scores):
"""Get indices into the score axis of this tally's data arrays.
This is a helper method for the Tally.get_values(...) method to
extract tally data. This method returns the indices into the score
axis of the tally's data array (axis=2) for one or more scores.
Parameters
----------
scores : list of str or openmc.CrossScore
A list of one or more score strings
(e.g., ['absorption', 'nu-fission']; default is [])
Returns
-------
numpy.ndarray
A NumPy array of the score indices
"""
for score in scores:
if not isinstance(score, (str, openmc.CrossScore)):
msg = f'Unable to get score indices for score "{score}" in ' \
f'ID="{self.id}" since it is not a string or CrossScore ' \
'Tally'
raise ValueError(msg)
# Determine the score indices from any of the requested scores
if scores:
score_indices = np.zeros(len(scores), dtype=int)
for i, score in enumerate(scores):
score_indices[i] = self.get_score_index(score)
# If user did not specify any specific scores, use them all
else:
score_indices = np.arange(self.num_scores)
return score_indices
[docs] def get_values(self, scores=[], filters=[], filter_bins=[],
nuclides=[], value='mean'):
"""Returns one or more tallied values given a list of scores, filters,
filter bins and nuclides.
This method constructs a 3D NumPy array for the requested Tally data
indexed by filter bin, nuclide bin, and score index. The method will
order the data in the array as specified in the parameter lists.
Parameters
----------
scores : list of str
A list of one or more score strings
(e.g., ['absorption', 'nu-fission']; default is [])
filters : Iterable of openmc.FilterMeta
An iterable of filter types
(e.g., [MeshFilter, EnergyFilter]; default is [])
filter_bins : list of Iterables
A list of tuples of filter bins corresponding to the filter_types
parameter (e.g., [(1,), ((0., 0.625e-6),)]; default is []). Each
tuple contains bins for the corresponding filter type in the filters
parameter. Each bins is the integer ID for 'material', 'surface',
'cell', 'cellborn', and 'universe' Filters. Each bin is an integer
for the cell instance ID for 'distribcell' Filters. Each bin is a
2-tuple of floats for 'energy' and 'energyout' filters corresponding
to the energy boundaries of the bin of interest. The bin is an
(x,y,z) 3-tuple for 'mesh' filters corresponding to the mesh cell
of interest. The order of the bins in the list must correspond to
the filter_types parameter.
nuclides : list of str
A list of nuclide name strings
(e.g., ['U235', 'U238']; default is [])
value : str
A string for the type of value to return - 'mean' (default),
'std_dev', 'rel_err', 'sum', or 'sum_sq' are accepted
Returns
-------
float or numpy.ndarray
A scalar or NumPy array of the Tally data indexed in the order
each filter, nuclide and score is listed in the parameters.
Raises
------
ValueError
When this method is called before the Tally is populated with data,
or the input parameters do not correspond to the Tally's attributes,
e.g., if the score(s) do not match those in the Tally.
"""
# Ensure that the tally has data
if (value == 'mean' and self.mean is None) or \
(value == 'std_dev' and self.std_dev is None) or \
(value == 'rel_err' and self.mean is None) or \
(value == 'sum' and self.sum is None) or \
(value == 'sum_sq' and self.sum_sq is None):
msg = f'The Tally ID="{self.id}" has no data to return'
raise ValueError(msg)
# Get filter, nuclide and score indices
filter_indices = self.get_filter_indices(filters, filter_bins)
nuclide_indices = self.get_nuclide_indices(nuclides)
score_indices = self.get_score_indices(scores)
# Construct outer product of all three index types with each other
indices = np.ix_(filter_indices, nuclide_indices, score_indices)
# Return the desired result from Tally
if value == 'mean':
data = self.mean[indices]
elif value == 'std_dev':
data = self.std_dev[indices]
elif value == 'rel_err':
data = self.std_dev[indices] / self.mean[indices]
elif value == 'sum':
data = self.sum[indices]
elif value == 'sum_sq':
data = self.sum_sq[indices]
else:
msg = f'Unable to return results from Tally ID="{value}" since ' \
f'the requested value "{self.id}" is not \'mean\', ' \
'\'std_dev\', \'rel_err\', \'sum\', or \'sum_sq\''
raise LookupError(msg)
return data
[docs] def get_pandas_dataframe(self, filters=True, nuclides=True, scores=True,
derivative=True, paths=True, float_format='{:.2e}'):
"""Build a Pandas DataFrame for the Tally data.
This method constructs a Pandas DataFrame object for the Tally data
with columns annotated by filter, nuclide and score bin information.
This capability has been tested for Pandas >=0.13.1. However, it is
recommended to use v0.16 or newer versions of Pandas since this method
uses the Multi-index Pandas feature.
Parameters
----------
filters : bool
Include columns with filter bin information (default is True).
nuclides : bool
Include columns with nuclide bin information (default is True).
scores : bool
Include columns with score bin information (default is True).
derivative : bool
Include columns with differential tally info (default is True).
paths : bool, optional
Construct columns for distribcell tally filters (default is True).
The geometric information in the Summary object is embedded into a
Multi-index column with a geometric "path" to each distribcell
instance.
float_format : str
All floats in the DataFrame will be formatted using the given
format string before printing.
Returns
-------
pandas.DataFrame
A Pandas DataFrame with each column annotated by filter, nuclide and
score bin information (if these parameters are True), and the mean
and standard deviation of the Tally's data.
Raises
------
KeyError
When this method is called before the Tally is populated with data
"""
# Ensure that the tally has data
if self.mean is None or self.std_dev is None:
msg = f'The Tally ID="{self.id}" has no data to return'
raise KeyError(msg)
# Initialize a pandas dataframe for the tally data
df = pd.DataFrame()
# Find the total length of the tally data array
data_size = self.mean.size
# Build DataFrame columns for filters if user requested them
if filters:
# Append each Filter's DataFrame to the overall DataFrame
for f, stride in zip(self.filters, self.filter_strides):
filter_df = f.get_pandas_dataframe(
data_size, stride, paths=paths)
df = pd.concat([df, filter_df], axis=1)
# Include DataFrame column for nuclides if user requested it
if nuclides:
nuclides = []
column_name = 'nuclide'
for nuclide in self.nuclides:
if isinstance(nuclide, openmc.Nuclide):
nuclides.append(nuclide.name)
elif isinstance(nuclide, openmc.AggregateNuclide):
nuclides.append(nuclide.name)
column_name = f'{nuclide.aggregate_op}(nuclide)'
else:
nuclides.append(nuclide)
# Tile the nuclide bins into a DataFrame column
nuclides = np.repeat(nuclides, len(self.scores))
tile_factor = data_size / len(nuclides)
df[column_name] = np.tile(nuclides, int(tile_factor))
# Include column for scores if user requested it
if scores:
scores = []
column_name = 'score'
for score in self.scores:
if isinstance(score, (str, openmc.CrossScore)):
scores.append(str(score))
elif isinstance(score, openmc.AggregateScore):
scores.append(score.name)
column_name = f'{score.aggregate_op}(score)'
tile_factor = data_size / len(self.scores)
df[column_name] = np.tile(scores, int(tile_factor))
# Include columns for derivatives if user requested it
if derivative and (self.derivative is not None):
df['d_variable'] = self.derivative.variable
if self.derivative.material is not None:
df['d_material'] = self.derivative.material
if self.derivative.nuclide is not None:
df['d_nuclide'] = self.derivative.nuclide
# Append columns with mean, std. dev. for each tally bin
df['mean'] = self.mean.ravel()
df['std. dev.'] = self.std_dev.ravel()
df = df.dropna(axis=1)
# Expand the columns into Pandas MultiIndices for readability
if pd.__version__ >= '0.16':
columns = copy.deepcopy(df.columns.values)
# Convert all elements in columns list to tuples
for i, column in enumerate(columns):
if not isinstance(column, tuple):
columns[i] = (column,)
# Make each tuple the same length
max_len_column = len(max(columns, key=len))
for i, column in enumerate(columns):
delta_len = max_len_column - len(column)
if delta_len > 0:
new_column = list(column)
new_column.extend(['']*delta_len)
columns[i] = tuple(new_column)
# Create and set a MultiIndex for the DataFrame's columns, but only
# if any column actually is multi-level (e.g., a mesh filter)
if any(len(c) > 1 for c in columns):
df.columns = pd.MultiIndex.from_tuples(columns)
# Modify the df.to_string method so that it prints formatted strings.
# Credit to http://stackoverflow.com/users/3657742/chrisb for this trick
df.to_string = partial(df.to_string, float_format=float_format.format)
return df
[docs] def get_reshaped_data(self, value='mean'):
"""Returns an array of tally data with one dimension per filter.
The tally data in OpenMC is stored as a 3D array with the dimensions
corresponding to filters, nuclides and scores. As a result, tally data
can be opaque for a user to directly index (i.e., without use of
:meth:`openmc.Tally.get_values`) since one must know how to properly use
the number of bins and strides for each filter to index into the first
(filter) dimension.
This builds and returns a reshaped version of the tally data array with
unique dimensions corresponding to each tally filter. For example,
suppose this tally has arrays of data with shape (8,5,5) corresponding
to two filters (2 and 4 bins, respectively), five nuclides and five
scores. This method will return a version of the data array with the
with a new shape of (2,4,5,5) such that the first two dimensions
correspond directly to the two filters with two and four bins.
Parameters
----------
value : str
A string for the type of value to return - 'mean' (default),
'std_dev', 'rel_err', 'sum', or 'sum_sq' are accepted
Returns
-------
numpy.ndarray
The tally data array indexed by filters, nuclides and scores.
"""
# Get the 3D array of data in filters, nuclides and scores
data = self.get_values(value=value)
# Build a new array shape with one dimension per filter
new_shape = tuple(f.num_bins for f in self.filters)
new_shape += (self.num_nuclides, self.num_scores)
# Reshape the data with one dimension for each filter
data = np.reshape(data, new_shape)
return data
[docs] def hybrid_product(self, other, binary_op, filter_product=None,
nuclide_product=None, score_product=None):
"""Combines filters, scores and nuclides with another tally.
This is a helper method for the tally arithmetic operator overloaded
methods. It is called a "hybrid product" because it performs a
combination of tensor (or Kronecker) and entrywise (or Hadamard)
products. The filters from both tallies are combined using an entrywise
(or Hadamard) product on matching filters. By default, if all nuclides
are identical in the two tallies, the entrywise product is performed
across nuclides; else the tensor product is performed. By default, if
all scores are identical in the two tallies, the entrywise product is
performed across scores; else the tensor product is performed. Users
can also call the method explicitly and specify the desired product.
Parameters
----------
other : openmc.Tally
The tally on the right hand side of the hybrid product
binary_op : {'+', '-', '*', '/', '^'}
The binary operation in the hybrid product
filter_product : {'tensor', 'entrywise' or None}
The type of product (tensor or entrywise) to be performed between
filter data. The default is the entrywise product. Currently only
the entrywise product is supported since a tally cannot contain
two of the same filter.
nuclide_product : {'tensor', 'entrywise' or None}
The type of product (tensor or entrywise) to be performed between
nuclide data. The default is the entrywise product if all nuclides
between the two tallies are the same; otherwise the default is
the tensor product.
score_product : {'tensor', 'entrywise' or None}
The type of product (tensor or entrywise) to be performed between
score data. The default is the entrywise product if all scores
between the two tallies are the same; otherwise the default is
the tensor product.
Returns
-------
openmc.Tally
A new Tally that is the hybrid product with this one.
Raises
------
ValueError
When this method is called before the other tally is populated
with data.
"""
# Set default value for filter product if it was not set
if filter_product is None:
filter_product = 'entrywise'
elif filter_product == 'tensor':
msg = 'Unable to perform Tally arithmetic with a tensor product' \
'for the filter data as this is not currently supported.'
raise ValueError(msg)
# Set default value for nuclide product if it was not set
if nuclide_product is None:
if self.nuclides == other.nuclides:
nuclide_product = 'entrywise'
else:
nuclide_product = 'tensor'
# Set default value for score product if it was not set
if score_product is None:
if self.scores == other.scores:
score_product = 'entrywise'
else:
score_product = 'tensor'
# Check product types
cv.check_value('filter product', filter_product, _PRODUCT_TYPES)
cv.check_value('nuclide product', nuclide_product, _PRODUCT_TYPES)
cv.check_value('score product', score_product, _PRODUCT_TYPES)
# Check that results have been read
if not other.derived and other.sum is None:
msg = f'Unable to use tally arithmetic with Tally ' \
f'ID="{other.id}" since it does not contain any results.'
raise ValueError(msg)
new_tally = Tally()
new_tally._derived = True
new_tally.with_batch_statistics = True
new_tally._num_realizations = self.num_realizations
new_tally._estimator = self.estimator
new_tally._with_summary = self.with_summary
new_tally._sp_filename = self._sp_filename
# Construct a combined derived name from the two tally operands
if self.name != '' and other.name != '':
new_name = f'({self.name} {binary_op} {other.name})'
new_tally.name = new_name
# Query the mean and std dev so the tally data is read in from file
# if it has not already been read in.
self.mean, self.std_dev, other.mean, other.std_dev
# Create copies of self and other tallies to rearrange for tally
# arithmetic
self_copy = copy.deepcopy(self)
other_copy = copy.deepcopy(other)
self_copy.sparse = False
other_copy.sparse = False
# Align the tally data based on desired hybrid product
data = self_copy._align_tally_data(other_copy, filter_product,
nuclide_product, score_product)
# Perform tally arithmetic operation
if binary_op == '+':
new_tally._mean = data['self']['mean'] + data['other']['mean']
new_tally._std_dev = np.sqrt(data['self']['std. dev.']**2 +
data['other']['std. dev.']**2)
elif binary_op == '-':
new_tally._mean = data['self']['mean'] - data['other']['mean']
new_tally._std_dev = np.sqrt(data['self']['std. dev.']**2 +
data['other']['std. dev.']**2)
elif binary_op == '*':
with np.errstate(divide='ignore', invalid='ignore'):
self_rel_err = data['self']['std. dev.'] / data['self']['mean']
other_rel_err = data['other']['std. dev.'] / data['other']['mean']
new_tally._mean = data['self']['mean'] * data['other']['mean']
new_tally._std_dev = np.abs(new_tally.mean) * \
np.sqrt(self_rel_err**2 + other_rel_err**2)
elif binary_op == '/':
with np.errstate(divide='ignore', invalid='ignore'):
self_rel_err = data['self']['std. dev.'] / data['self']['mean']
other_rel_err = data['other']['std. dev.'] / data['other']['mean']
new_tally._mean = data['self']['mean'] / data['other']['mean']
new_tally._std_dev = np.abs(new_tally.mean) * \
np.sqrt(self_rel_err**2 + other_rel_err**2)
elif binary_op == '^':
with np.errstate(divide='ignore', invalid='ignore'):
mean_ratio = data['other']['mean'] / data['self']['mean']
first_term = mean_ratio * data['self']['std. dev.']
second_term = \
np.log(data['self']['mean']) * data['other']['std. dev.']
new_tally._mean = data['self']['mean'] ** data['other']['mean']
new_tally._std_dev = np.abs(new_tally.mean) * \
np.sqrt(first_term**2 + second_term**2)
# Convert any infs and nans to zero
new_tally._mean[np.isinf(new_tally._mean)] = 0
new_tally._mean = np.nan_to_num(new_tally._mean)
new_tally._std_dev[np.isinf(new_tally._std_dev)] = 0
new_tally._std_dev = np.nan_to_num(new_tally._std_dev)
# Set tally attributes
if self_copy.estimator == other_copy.estimator:
new_tally.estimator = self_copy.estimator
if self_copy.with_summary and other_copy.with_summary:
new_tally.with_summary = self_copy.with_summary
if self_copy.num_realizations == other_copy.num_realizations:
new_tally.num_realizations = self_copy.num_realizations
# Add filters to the new tally
if filter_product == 'entrywise':
for self_filter in self_copy.filters:
new_tally.filters.append(self_filter)
else:
all_filters = [self_copy.filters, other_copy.filters]
for self_filter, other_filter in product(*all_filters):
new_filter = openmc.CrossFilter(self_filter, other_filter,
binary_op)
new_tally.filters.append(new_filter)
# Add nuclides to the new tally
if nuclide_product == 'entrywise':
for self_nuclide in self_copy.nuclides:
new_tally.nuclides.append(self_nuclide)
else:
all_nuclides = [self_copy.nuclides, other_copy.nuclides]
for self_nuclide, other_nuclide in product(*all_nuclides):
new_nuclide = openmc.CrossNuclide(self_nuclide, other_nuclide,
binary_op)
new_tally.nuclides.append(new_nuclide)
# Define helper function that handles score units appropriately
# depending on the binary operator
def cross_score(score1, score2, binary_op):
if binary_op == '+' or binary_op == '-':
if score1 == score2:
return score1
else:
return openmc.CrossScore(score1, score2, binary_op)
else:
return openmc.CrossScore(score1, score2, binary_op)
# Add scores to the new tally
if score_product == 'entrywise':
for self_score in self_copy.scores:
new_score = cross_score(self_score, self_score, binary_op)
new_tally.scores.append(new_score)
else:
all_scores = [self_copy.scores, other_copy.scores]
for self_score, other_score in product(*all_scores):
new_score = cross_score(self_score, other_score, binary_op)
new_tally.scores.append(new_score)
return new_tally
@property
def filter_strides(self):
all_strides = []
stride = self.num_nuclides * self.num_scores
for self_filter in reversed(self.filters):
all_strides.append(stride)
stride *= self_filter.num_bins
return all_strides[::-1]
def _align_tally_data(self, other, filter_product, nuclide_product,
score_product):
"""Aligns data from two tallies for tally arithmetic.
This is a helper method to construct a dict of dicts of the "aligned"
data arrays from each tally for tally arithmetic. The method analyzes
the filters, scores and nuclides in both tallies and determines how to
appropriately align the data for vectorized arithmetic. For example,
if the two tallies have different filters, this method will use NumPy
'tile' and 'repeat' operations to the new data arrays such that all
possible combinations of the data in each tally's bins will be made
when the arithmetic operation is applied to the arrays.
Parameters
----------
other : openmc.Tally
The tally to outer product with this tally
filter_product : {'entrywise'}
The type of product to be performed between filter data. Currently,
only the entrywise product is supported for the filter product.
nuclide_product : {'tensor', 'entrywise'}
The type of product (tensor or entrywise) to be performed between
nuclide data.
score_product : {'tensor', 'entrywise'}
The type of product (tensor or entrywise) to be performed between
score data.
Returns
-------
dict
A dictionary of dictionaries to "aligned" 'mean' and 'std. dev'
NumPy arrays for each tally's data.
"""
# Get the set of filters that each tally is missing
other_missing_filters = set(self.filters) - set(other.filters)
self_missing_filters = set(other.filters) - set(self.filters)
# Add filters present in self but not in other to other
for other_filter in other_missing_filters:
filter_copy = copy.deepcopy(other_filter)
other._mean = np.repeat(other.mean, filter_copy.num_bins, axis=0)
other._std_dev = np.repeat(other.std_dev, filter_copy.num_bins, axis=0)
other.filters.append(filter_copy)
# Add filters present in other but not in self to self
for self_filter in self_missing_filters:
filter_copy = copy.deepcopy(self_filter)
self._mean = np.repeat(self.mean, filter_copy.num_bins, axis=0)
self._std_dev = np.repeat(self.std_dev, filter_copy.num_bins, axis=0)
self.filters.append(filter_copy)
# Align other filters with self filters
for i, self_filter in enumerate(self.filters):
other_index = other.filters.index(self_filter)
# If necessary, swap other filter
if other_index != i:
other._swap_filters(self_filter, other.filters[i])
# Repeat and tile the data by nuclide in preparation for performing
# the tensor product across nuclides.
if nuclide_product == 'tensor':
self._mean = np.repeat(self.mean, other.num_nuclides, axis=1)
self._std_dev = np.repeat(self.std_dev, other.num_nuclides, axis=1)
other._mean = np.tile(other.mean, (1, self.num_nuclides, 1))
other._std_dev = np.tile(other.std_dev, (1, self.num_nuclides, 1))
# Add nuclides to each tally such that each tally contains the complete
# set of nuclides necessary to perform an entrywise product. New
# nuclides added to a tally will have all their scores set to zero.
else:
# Get the set of nuclides that each tally is missing
other_missing_nuclides = set(self.nuclides) - set(other.nuclides)
self_missing_nuclides = set(other.nuclides) - set(self.nuclides)
# Add nuclides present in self but not in other to other
for nuclide in other_missing_nuclides:
other._mean = np.insert(other.mean, other.num_nuclides, 0, axis=1)
other._std_dev = np.insert(other.std_dev, other.num_nuclides, 0,
axis=1)
other.nuclides.append(nuclide)
# Add nuclides present in other but not in self to self
for nuclide in self_missing_nuclides:
self._mean = np.insert(self.mean, self.num_nuclides, 0, axis=1)
self._std_dev = np.insert(self.std_dev, self.num_nuclides, 0,
axis=1)
self.nuclides.append(nuclide)
# Align other nuclides with self nuclides
for i, nuclide in enumerate(self.nuclides):
other_index = other.get_nuclide_index(nuclide)
# If necessary, swap other nuclide
if other_index != i:
other._swap_nuclides(nuclide, other.nuclides[i])
# Repeat and tile the data by score in preparation for performing
# the tensor product across scores.
if score_product == 'tensor':
self._mean = np.repeat(self.mean, other.num_scores, axis=2)
self._std_dev = np.repeat(self.std_dev, other.num_scores, axis=2)
other._mean = np.tile(other.mean, (1, 1, self.num_scores))
other._std_dev = np.tile(other.std_dev, (1, 1, self.num_scores))
# Add scores to each tally such that each tally contains the complete set
# of scores necessary to perform an entrywise product. New scores added
# to a tally will be set to zero.
else:
# Get the set of scores that each tally is missing
other_missing_scores = set(self.scores) - set(other.scores)
self_missing_scores = set(other.scores) - set(self.scores)
# Add scores present in self but not in other to other
for score in other_missing_scores:
other._mean = np.insert(other.mean, other.num_scores, 0, axis=2)
other._std_dev = np.insert(other.std_dev, other.num_scores, 0, axis=2)
other.scores.append(score)
# Add scores present in other but not in self to self
for score in self_missing_scores:
self._mean = np.insert(self.mean, self.num_scores, 0, axis=2)
self._std_dev = np.insert(self.std_dev, self.num_scores, 0, axis=2)
self.scores.append(score)
# Align other scores with self scores
for i, score in enumerate(self.scores):
other_index = other.scores.index(score)
# If necessary, swap other score
if other_index != i:
other._swap_scores(score, other.scores[i])
data = {}
data['self'] = {}
data['other'] = {}
data['self']['mean'] = self.mean
data['other']['mean'] = other.mean
data['self']['std. dev.'] = self.std_dev
data['other']['std. dev.'] = other.std_dev
return data
def _swap_filters(self, filter1, filter2):
"""Reverse the ordering of two filters in this tally
This is a helper method for tally arithmetic which helps align the data
in two tallies with shared filters. This method reverses the order of
the two filters in place.
Parameters
----------
filter1 : Filter
The filter to swap with filter2
filter2 : Filter
The filter to swap with filter1
Raises
------
ValueError
If this is a derived tally or this method is called before the tally
is populated with data.
"""
cv.check_type('filter1', filter1, _FILTER_CLASSES)
cv.check_type('filter2', filter2, _FILTER_CLASSES)
# Check that the filters exist in the tally and are not the same
if filter1 == filter2:
return
elif filter1 not in self.filters:
msg = f'Unable to swap "{filter1.type}" filter1 in Tally ' \
f'ID="{self.id}" since it does not contain such a filter'
raise ValueError(msg)
elif filter2 not in self.filters:
msg = f'Unable to swap "{filter2.type}" filter2 in Tally ' \
f'ID="{self.id}" since it does not contain such a filter'
raise ValueError(msg)
# Construct lists of tuples for the bins in each of the two filters
filters = [type(filter1), type(filter2)]
if isinstance(filter1, openmc.DistribcellFilter):
filter1_bins = [b for b in range(filter1.num_bins)]
elif isinstance(filter1, openmc.EnergyFunctionFilter):
filter1_bins = [None]
else:
filter1_bins = filter1.bins
if isinstance(filter2, openmc.DistribcellFilter):
filter2_bins = [b for b in range(filter2.num_bins)]
elif isinstance(filter2, openmc.EnergyFunctionFilter):
filter2_bins = [None]
else:
filter2_bins = filter2.bins
# Create variables to store views of data in the misaligned structure
mean = {}
std_dev = {}
# Store the data from the misaligned structure
for i, (bin1, bin2) in enumerate(product(filter1_bins, filter2_bins)):
filter_bins = [(bin1,), (bin2,)]
if self.mean is not None:
mean[i] = self.get_values(
filters=filters, filter_bins=filter_bins, value='mean')
if self.std_dev is not None:
std_dev[i] = self.get_values(
filters=filters, filter_bins=filter_bins, value='std_dev')
# Swap the filters in the copied version of this Tally
filter1_index = self.filters.index(filter1)
filter2_index = self.filters.index(filter2)
self.filters[filter1_index] = filter2
self.filters[filter2_index] = filter1
# Realign the data
for i, (bin1, bin2) in enumerate(product(filter1_bins, filter2_bins)):
filter_bins = [(bin1,), (bin2,)]
indices = self.get_filter_indices(filters, filter_bins)
if self.mean is not None:
self.mean[indices, :, :] = mean[i]
if self.std_dev is not None:
self.std_dev[indices, :, :] = std_dev[i]
def _swap_nuclides(self, nuclide1, nuclide2):
"""Reverse the ordering of two nuclides in this tally
This is a helper method for tally arithmetic which helps align the data
in two tallies with shared nuclides. This method reverses the order of
the two nuclides in place.
Parameters
----------
nuclide1 : Nuclide
The nuclide to swap with nuclide2
nuclide2 : Nuclide
The nuclide to swap with nuclide1
Raises
------
ValueError
If this is a derived tally or this method is called before the tally
is populated with data.
"""
# Check that results have been read
if not self.derived and self.sum is None:
msg = f'Unable to use tally arithmetic with Tally ID="{self.id}" ' \
'since it does not contain any results.'
raise ValueError(msg)
cv.check_type('nuclide1', nuclide1, _NUCLIDE_CLASSES)
cv.check_type('nuclide2', nuclide2, _NUCLIDE_CLASSES)
# Check that the nuclides exist in the tally and are not the same
if nuclide1 == nuclide2:
msg = 'Unable to swap a nuclide with itself'
raise ValueError(msg)
elif nuclide1 not in self.nuclides:
msg = f'Unable to swap nuclide1 "{nuclide1.name}" in Tally ' \
f'ID="{self.id}" since it does not contain such a nuclide'
raise ValueError(msg)
elif nuclide2 not in self.nuclides:
msg = f'Unable to swap "{nuclide2.name}" nuclide2 in Tally ' \
f'ID="{self.id}" since it does not contain such a nuclide'
raise ValueError(msg)
# Swap the nuclides in the Tally
nuclide1_index = self.get_nuclide_index(nuclide1)
nuclide2_index = self.get_nuclide_index(nuclide2)
self.nuclides[nuclide1_index] = nuclide2
self.nuclides[nuclide2_index] = nuclide1
# Adjust the mean data array to relect the new nuclide order
if self.mean is not None:
nuclide1_mean = self.mean[:, nuclide1_index, :].copy()
nuclide2_mean = self.mean[:, nuclide2_index, :].copy()
self.mean[:, nuclide2_index, :] = nuclide1_mean
self.mean[:, nuclide1_index, :] = nuclide2_mean
# Adjust the std_dev data array to relect the new nuclide order
if self.std_dev is not None:
nuclide1_std_dev = self.std_dev[:, nuclide1_index, :].copy()
nuclide2_std_dev = self.std_dev[:, nuclide2_index, :].copy()
self.std_dev[:, nuclide2_index, :] = nuclide1_std_dev
self.std_dev[:, nuclide1_index, :] = nuclide2_std_dev
def _swap_scores(self, score1, score2):
"""Reverse the ordering of two scores in this tally
This is a helper method for tally arithmetic which helps align the data
in two tallies with shared scores. This method reverses the order
of the two scores in place.
Parameters
----------
score1 : str or CrossScore
The score to swap with score2
score2 : str or CrossScore
The score to swap with score1
Raises
------
ValueError
If this is a derived tally or this method is called before the tally
is populated with data.
"""
# Check that results have been read
if not self.derived and self.sum is None:
msg = 'Unable to use tally arithmetic with Tally ID="{}" ' \
'since it does not contain any results.'.format(self.id)
raise ValueError(msg)
# Check that the scores are valid
if not isinstance(score1, (str, openmc.CrossScore)):
msg = 'Unable to swap score1 "{}" in Tally ID="{}" since it is ' \
'not a string or CrossScore'.format(score1, self.id)
raise ValueError(msg)
elif not isinstance(score2, (str, openmc.CrossScore)):
msg = 'Unable to swap score2 "{}" in Tally ID="{}" since it is ' \
'not a string or CrossScore'.format(score2, self.id)
raise ValueError(msg)
# Check that the scores exist in the tally and are not the same
if score1 == score2:
msg = 'Unable to swap a score with itself'
raise ValueError(msg)
elif score1 not in self.scores:
msg = 'Unable to swap score1 "{}" in Tally ID="{}" since it ' \
'does not contain such a score'.format(score1, self.id)
raise ValueError(msg)
elif score2 not in self.scores:
msg = 'Unable to swap score2 "{}" in Tally ID="{}" since it ' \
'does not contain such a score'.format(score2, self.id)
raise ValueError(msg)
# Swap the scores in the Tally
score1_index = self.get_score_index(score1)
score2_index = self.get_score_index(score2)
self.scores[score1_index] = score2
self.scores[score2_index] = score1
# Adjust the mean data array to relect the new nuclide order
if self.mean is not None:
score1_mean = self.mean[:, :, score1_index].copy()
score2_mean = self.mean[:, :, score2_index].copy()
self.mean[:, :, score2_index] = score1_mean
self.mean[:, :, score1_index] = score2_mean
# Adjust the std_dev data array to relect the new nuclide order
if self.std_dev is not None:
score1_std_dev = self.std_dev[:, :, score1_index].copy()
score2_std_dev = self.std_dev[:, :, score2_index].copy()
self.std_dev[:, :, score2_index] = score1_std_dev
self.std_dev[:, :, score1_index] = score2_std_dev
def __add__(self, other):
"""Adds this tally to another tally or scalar value.
This method builds a new tally with data that is the sum of this
tally's data and that from the other tally or scalar value. If the
filters, scores and nuclides in the two tallies are not the same, then
they are combined in all possible ways in the new derived tally.
Uncertainty propagation is used to compute the standard deviation
for the new tally's data. It is important to note that this makes
the assumption that the tally data is independently distributed.
In most use cases, this is *not* true and may lead to under-prediction
of the uncertainty. The uncertainty propagation model is from the
following source:
https://en.wikipedia.org/wiki/Propagation_of_uncertainty
Parameters
----------
other : openmc.Tally or float
The tally or scalar value to add to this tally
Returns
-------
openmc.Tally
A new derived tally which is the sum of this tally and the other
tally or scalar value in the addition.
Raises
------
ValueError
When this method is called before the Tally is populated with data.
"""
# Check that results have been read
if not self.derived and self.sum is None:
msg = 'Unable to use tally arithmetic with Tally ID="{}" ' \
'since it does not contain any results.'.format(self.id)
raise ValueError(msg)
if isinstance(other, Tally):
new_tally = self.hybrid_product(other, binary_op='+')
# If both tally operands were sparse, sparsify the new tally
if self.sparse and other.sparse:
new_tally.sparse = True
elif isinstance(other, Real):
new_tally = Tally(name='derived')
new_tally._derived = True
new_tally.with_batch_statistics = True
new_tally.name = self.name
new_tally._mean = self.mean + other
new_tally._std_dev = self.std_dev
new_tally.estimator = self.estimator
new_tally.with_summary = self.with_summary
new_tally.num_realizations = self.num_realizations
new_tally.filters = copy.deepcopy(self.filters)
new_tally.nuclides = copy.deepcopy(self.nuclides)
new_tally.scores = copy.deepcopy(self.scores)
# If this tally operand is sparse, sparsify the new tally
new_tally.sparse = self.sparse
else:
msg = 'Unable to add "{}" to Tally ID="{}"'.format(other, self.id)
raise ValueError(msg)
return new_tally
def __sub__(self, other):
"""Subtracts another tally or scalar value from this tally.
This method builds a new tally with data that is the difference of
this tally's data and that from the other tally or scalar value. If the
filters, scores and nuclides in the two tallies are not the same, then
they are combined in all possible ways in the new derived tally.
Uncertainty propagation is used to compute the standard deviation
for the new tally's data. It is important to note that this makes
the assumption that the tally data is independently distributed.
In most use cases, this is *not* true and may lead to under-prediction
of the uncertainty. The uncertainty propagation model is from the
following source:
https://en.wikipedia.org/wiki/Propagation_of_uncertainty
Parameters
----------
other : openmc.Tally or float
The tally or scalar value to subtract from this tally
Returns
-------
openmc.Tally
A new derived tally which is the difference of this tally and the
other tally or scalar value in the subtraction.
Raises
------
ValueError
When this method is called before the Tally is populated with data.
"""
# Check that results have been read
if not self.derived and self.sum is None:
msg = 'Unable to use tally arithmetic with Tally ID="{}" ' \
'since it does not contain any results.'.format(self.id)
raise ValueError(msg)
if isinstance(other, Tally):
new_tally = self.hybrid_product(other, binary_op='-')
# If both tally operands were sparse, sparsify the new tally
if self.sparse and other.sparse:
new_tally.sparse = True
elif isinstance(other, Real):
new_tally = Tally(name='derived')
new_tally._derived = True
new_tally.name = self.name
new_tally._mean = self.mean - other
new_tally._std_dev = self.std_dev
new_tally.estimator = self.estimator
new_tally.with_summary = self.with_summary
new_tally.num_realizations = self.num_realizations
new_tally.filters = copy.deepcopy(self.filters)
new_tally.nuclides = copy.deepcopy(self.nuclides)
new_tally.scores = copy.deepcopy(self.scores)
# If this tally operand is sparse, sparsify the new tally
new_tally.sparse = self.sparse
else:
msg = 'Unable to subtract "{}" from Tally ID="{}"'.format(other, self.id)
raise ValueError(msg)
return new_tally
def __mul__(self, other):
"""Multiplies this tally with another tally or scalar value.
This method builds a new tally with data that is the product of
this tally's data and that from the other tally or scalar value. If the
filters, scores and nuclides in the two tallies are not the same, then
they are combined in all possible ways in the new derived tally.
Uncertainty propagation is used to compute the standard deviation
for the new tally's data. It is important to note that this makes
the assumption that the tally data is independently distributed.
In most use cases, this is *not* true and may lead to under-prediction
of the uncertainty. The uncertainty propagation model is from the
following source:
https://en.wikipedia.org/wiki/Propagation_of_uncertainty
Parameters
----------
other : openmc.Tally or float
The tally or scalar value to multiply with this tally
Returns
-------
openmc.Tally
A new derived tally which is the product of this tally and the
other tally or scalar value in the multiplication.
Raises
------
ValueError
When this method is called before the Tally is populated with data.
"""
# Check that results have been read
if not self.derived and self.sum is None:
msg = 'Unable to use tally arithmetic with Tally ID="{}" ' \
'since it does not contain any results.'.format(self.id)
raise ValueError(msg)
if isinstance(other, Tally):
new_tally = self.hybrid_product(other, binary_op='*')
# If original tally operands were sparse, sparsify the new tally
if self.sparse and other.sparse:
new_tally.sparse = True
elif isinstance(other, Real):
new_tally = Tally(name='derived')
new_tally._derived = True
new_tally.name = self.name
new_tally._mean = self.mean * other
new_tally._std_dev = self.std_dev * np.abs(other)
new_tally.estimator = self.estimator
new_tally.with_summary = self.with_summary
new_tally.num_realizations = self.num_realizations
new_tally.filters = copy.deepcopy(self.filters)
new_tally.nuclides = copy.deepcopy(self.nuclides)
new_tally.scores = copy.deepcopy(self.scores)
# If this tally operand is sparse, sparsify the new tally
new_tally.sparse = self.sparse
else:
msg = 'Unable to multiply Tally ID="{}" by "{}"'.format(self.id, other)
raise ValueError(msg)
return new_tally
def __truediv__(self, other):
"""Divides this tally by another tally or scalar value.
This method builds a new tally with data that is the dividend of
this tally's data and that from the other tally or scalar value. If the
filters, scores and nuclides in the two tallies are not the same, then
they are combined in all possible ways in the new derived tally.
Uncertainty propagation is used to compute the standard deviation
for the new tally's data. It is important to note that this makes
the assumption that the tally data is independently distributed.
In most use cases, this is *not* true and may lead to under-prediction
of the uncertainty. The uncertainty propagation model is from the
following source:
https://en.wikipedia.org/wiki/Propagation_of_uncertainty
Parameters
----------
other : openmc.Tally or float
The tally or scalar value to divide this tally by
Returns
-------
openmc.Tally
A new derived tally which is the dividend of this tally and the
other tally or scalar value in the division.
Raises
------
ValueError
When this method is called before the Tally is populated with data.
"""
# Check that results have been read
if not self.derived and self.sum is None:
msg = 'Unable to use tally arithmetic with Tally ID="{}" ' \
'since it does not contain any results.'.format(self.id)
raise ValueError(msg)
if isinstance(other, Tally):
new_tally = self.hybrid_product(other, binary_op='/')
# If original tally operands were sparse, sparsify the new tally
if self.sparse and other.sparse:
new_tally.sparse = True
elif isinstance(other, Real):
new_tally = Tally(name='derived')
new_tally._derived = True
new_tally.name = self.name
new_tally._mean = self.mean / other
new_tally._std_dev = self.std_dev * np.abs(1. / other)
new_tally.estimator = self.estimator
new_tally.with_summary = self.with_summary
new_tally.num_realizations = self.num_realizations
new_tally.filters = copy.deepcopy(self.filters)
new_tally.nuclides = copy.deepcopy(self.nuclides)
new_tally.scores = copy.deepcopy(self.scores)
# If this tally operand is sparse, sparsify the new tally
new_tally.sparse = self.sparse
else:
msg = 'Unable to divide Tally ID="{}" by "{}"'.format(self.id, other)
raise ValueError(msg)
return new_tally
def __div__(self, other):
return self.__truediv__(other)
def __pow__(self, power):
"""Raises this tally to another tally or scalar value power.
This method builds a new tally with data that is the power of
this tally's data to that from the other tally or scalar value. If the
filters, scores and nuclides in the two tallies are not the same, then
they are combined in all possible ways in the new derived tally.
Uncertainty propagation is used to compute the standard deviation
for the new tally's data. It is important to note that this makes
the assumption that the tally data is independently distributed.
In most use cases, this is *not* true and may lead to under-prediction
of the uncertainty. The uncertainty propagation model is from the
following source:
https://en.wikipedia.org/wiki/Propagation_of_uncertainty
Parameters
----------
power : openmc.Tally or float
The tally or scalar value exponent
Returns
-------
openmc.Tally
A new derived tally which is this tally raised to the power of the
other tally or scalar value in the exponentiation.
Raises
------
ValueError
When this method is called before the Tally is populated with data.
"""
# Check that results have been read
if not self.derived and self.sum is None:
msg = 'Unable to use tally arithmetic with Tally ID="{}" ' \
'since it does not contain any results.'.format(self.id)
raise ValueError(msg)
if isinstance(power, Tally):
new_tally = self.hybrid_product(power, binary_op='^')
# If original tally operand was sparse, sparsify the new tally
if self.sparse:
new_tally.sparse = True
elif isinstance(power, Real):
new_tally = Tally(name='derived')
new_tally._derived = True
new_tally.name = self.name
new_tally._mean = self._mean ** power
self_rel_err = self.std_dev / self.mean
new_tally._std_dev = np.abs(new_tally._mean * power * self_rel_err)
new_tally.estimator = self.estimator
new_tally.with_summary = self.with_summary
new_tally.num_realizations = self.num_realizations
new_tally.filters = copy.deepcopy(self.filters)
new_tally.nuclides = copy.deepcopy(self.nuclides)
new_tally.scores = copy.deepcopy(self.scores)
# If original tally was sparse, sparsify the exponentiated tally
new_tally.sparse = self.sparse
else:
msg = 'Unable to raise Tally ID="{}" to power "{}"'.format(self.id, power)
raise ValueError(msg)
return new_tally
def __radd__(self, other):
"""Right addition with a scalar value.
This reverses the operands and calls the __add__ method.
Parameters
----------
other : float
The scalar value to add to this tally
Returns
-------
openmc.Tally
A new derived tally of this tally added with the scalar value.
"""
return self + other
def __rsub__(self, other):
"""Right subtraction from a scalar value.
This reverses the operands and calls the __sub__ method.
Parameters
----------
other : float
The scalar value to subtract this tally from
Returns
-------
openmc.Tally
A new derived tally of this tally subtracted from the scalar value.
"""
return -1. * self + other
def __rmul__(self, other):
"""Right multiplication with a scalar value.
This reverses the operands and calls the __mul__ method.
Parameters
----------
other : float
The scalar value to multiply with this tally
Returns
-------
openmc.Tally
A new derived tally of this tally multiplied by the scalar value.
"""
return self * other
def __rdiv__(self, other):
"""Right division with a scalar value.
This reverses the operands and calls the __div__ method.
Parameters
----------
other : float
The scalar value to divide by this tally
Returns
-------
openmc.Tally
A new derived tally of the scalar value divided by this tally.
"""
return other * self**-1
def __abs__(self):
"""The absolute value of this tally.
Returns
-------
openmc.Tally
A new derived tally which is the absolute value of this tally.
"""
new_tally = copy.deepcopy(self)
new_tally._mean = np.abs(new_tally.mean)
return new_tally
def __neg__(self):
"""The negated value of this tally.
Returns
-------
openmc.Tally
A new derived tally which is the negated value of this tally.
"""
new_tally = self * -1
return new_tally
[docs] def get_slice(self, scores=[], filters=[], filter_bins=[], nuclides=[],
squeeze=False):
"""Build a sliced tally for the specified filters, scores and nuclides.
This method constructs a new tally to encapsulate a subset of the data
represented by this tally. The subset of data to include in the tally
slice is determined by the scores, filters and nuclides specified in
the input parameters.
Parameters
----------
scores : list of str
A list of one or more score strings (e.g., ['absorption',
'nu-fission']
filters : Iterable of openmc.FilterMeta
An iterable of filter types (e.g., [MeshFilter, EnergyFilter])
filter_bins : list of Iterables
A list of iterables of filter bins corresponding to the specified
filter types (e.g., [(1,), ((0., 0.625e-6),)]). Each iterable
contains bins to slice for the corresponding filter type in the
filters parameter. Each bin is the integer ID for 'material',
'surface', 'cell', 'cellborn', and 'universe' Filters. Each bin is
an integer for the cell instance ID for 'distribcell' Filters. Each
bin is a 2-tuple of floats for 'energy' and 'energyout' filters
corresponding to the energy boundaries of the bin of interest. The
bin is an (x,y,z) 3-tuple for 'mesh' filters corresponding to the
mesh cell of interest. The order of the bins in the list must
correspond to the `filters` argument.
nuclides : list of str
A list of nuclide name strings (e.g., ['U235', 'U238'])
squeeze : bool
Whether to remove filters with only a single bin in the sliced tally
Returns
-------
openmc.Tally
A new tally which encapsulates the subset of data requested in the
order each filter, nuclide and score is listed in the parameters.
Raises
------
ValueError
When this method is called before the Tally is populated with data.
"""
# Ensure that the tally has data
if not self.derived and self.sum is None:
msg = 'Unable to use tally arithmetic with Tally ID="{}" ' \
'since it does not contain any results.'.format(self.id)
raise ValueError(msg)
# Create deep copy of tally to return as sliced tally
new_tally = copy.deepcopy(self)
new_tally._derived = True
# Differentiate Tally with a new auto-generated Tally ID
new_tally.id = None
new_tally.sparse = False
if not self.derived and self.sum is not None:
new_sum = self.get_values(scores, filters, filter_bins,
nuclides, 'sum')
new_tally.sum = new_sum
if not self.derived and self.sum_sq is not None:
new_sum_sq = self.get_values(scores, filters, filter_bins,
nuclides, 'sum_sq')
new_tally.sum_sq = new_sum_sq
if self.mean is not None:
new_mean = self.get_values(scores, filters, filter_bins,
nuclides, 'mean')
new_tally._mean = new_mean
if self.std_dev is not None:
new_std_dev = self.get_values(scores, filters, filter_bins,
nuclides, 'std_dev')
new_tally._std_dev = new_std_dev
# SCORES
if scores:
score_indices = []
# Determine the score indices from any of the requested scores
for score in self.scores:
if score not in scores:
score_index = self.get_score_index(score)
score_indices.append(score_index)
# Loop over indices in reverse to remove excluded scores
for score_index in reversed(score_indices):
new_tally.remove_score(self.scores[score_index])
# NUCLIDES
if nuclides:
nuclide_indices = []
# Determine the nuclide indices from any of the requested nuclides
for nuclide in self.nuclides:
if nuclide.name not in nuclides:
nuclide_index = self.get_nuclide_index(nuclide.name)
nuclide_indices.append(nuclide_index)
# Loop over indices in reverse to remove excluded Nuclides
for nuclide_index in reversed(nuclide_indices):
new_tally.remove_nuclide(self.nuclides[nuclide_index])
# FILTERS
if filters:
# Determine the filter indices from any of the requested filters
for i, filter_type in enumerate(filters):
f = new_tally.find_filter(filter_type)
# Remove filters with only a single bin if requested
if squeeze:
if len(filter_bins[i]) == 1:
new_tally.filters.remove(f)
continue
else:
raise RuntimeError('Cannot remove sliced filter with '
'more than one bin.')
# Remove and/or reorder filter bins to user specifications
bin_indices = [f.get_bin_index(b)
for b in filter_bins[i]]
bin_indices = np.unique(bin_indices)
# Set bins for sliced filter
new_filter = copy.copy(f)
new_filter.bins = [f.bins[i] for i in bin_indices]
# Set number of bins manually for mesh/distribcell filters
if filter_type is openmc.DistribcellFilter:
new_filter._num_bins = f._num_bins
# Replace existing filter with new one
for j, test_filter in enumerate(new_tally.filters):
if isinstance(test_filter, filter_type):
new_tally.filters[j] = new_filter
# If original tally was sparse, sparsify the sliced tally
new_tally.sparse = self.sparse
return new_tally
[docs] def summation(self, scores=[], filter_type=None,
filter_bins=[], nuclides=[], remove_filter=False):
"""Vectorized sum of tally data across scores, filter bins and/or
nuclides using tally aggregation.
This method constructs a new tally to encapsulate the sum of the data
represented by the summation of the data in this tally. The tally data
sum is determined by the scores, filter bins and nuclides specified
in the input parameters.
Parameters
----------
scores : list of str
A list of one or more score strings to sum across
(e.g., ['absorption', 'nu-fission']; default is [])
filter_type : openmc.FilterMeta
Type of the filter, e.g. MeshFilter
filter_bins : Iterable of int or tuple
A list of the filter bins corresponding to the filter_type parameter
Each bin in the list is the integer ID for 'material', 'surface',
'cell', 'cellborn', and 'universe' Filters. Each bin is an integer
for the cell instance ID for 'distribcell' Filters. Each bin is a
2-tuple of floats for 'energy' and 'energyout' filters corresponding
to the energy boundaries of the bin of interest. Each bin is an
(x,y,z) 3-tuple for 'mesh' filters corresponding to the mesh cell of
interest.
nuclides : list of str
A list of nuclide name strings to sum across
(e.g., ['U235', 'U238']; default is [])
remove_filter : bool
If a filter is being summed over, this bool indicates whether to
remove that filter in the returned tally. Default is False.
Returns
-------
openmc.Tally
A new tally which encapsulates the sum of data requested.
"""
# Create new derived Tally for summation
tally_sum = Tally()
tally_sum._derived = True
tally_sum._estimator = self.estimator
tally_sum._num_realizations = self.num_realizations
tally_sum._with_batch_statistics = self.with_batch_statistics
tally_sum._with_summary = self.with_summary
tally_sum._sp_filename = self._sp_filename
tally_sum._results_read = self._results_read
# Get tally data arrays reshaped with one dimension per filter
mean = self.get_reshaped_data(value='mean')
std_dev = self.get_reshaped_data(value='std_dev')
# Sum across any filter bins specified by the user
if isinstance(filter_type, openmc.FilterMeta):
find_filter = self.find_filter(filter_type)
# If user did not specify filter bins, sum across all bins
if len(filter_bins) == 0:
bin_indices = np.arange(find_filter.num_bins)
if isinstance(find_filter, openmc.DistribcellFilter):
filter_bins = np.arange(find_filter.num_bins)
elif isinstance(find_filter, openmc.EnergyFunctionFilter):
filter_bins = [None]
else:
filter_bins = find_filter.bins
# Only sum across bins specified by the user
else:
bin_indices = \
[find_filter.get_bin_index(bin) for bin in filter_bins]
# Sum across the bins in the user-specified filter
for i, self_filter in enumerate(self.filters):
if type(self_filter) == filter_type:
shape = mean.shape
mean = np.take(mean, indices=bin_indices, axis=i)
std_dev = np.take(std_dev, indices=bin_indices, axis=i)
# NumPy take introduces a new dimension in output array
# for some special cases that must be removed
if len(mean.shape) > len(shape):
mean = np.squeeze(mean, axis=i)
std_dev = np.squeeze(std_dev, axis=i)
mean = np.sum(mean, axis=i, keepdims=True)
std_dev = np.sum(std_dev**2, axis=i, keepdims=True)
std_dev = np.sqrt(std_dev)
# Add AggregateFilter to the tally sum
if not remove_filter:
filter_sum = openmc.AggregateFilter(self_filter,
[tuple(filter_bins)], 'sum')
tally_sum.filters.append(filter_sum)
# Add a copy of each filter not summed across to the tally sum
else:
tally_sum.filters.append(copy.deepcopy(self_filter))
# Add a copy of this tally's filters to the tally sum
else:
tally_sum._filters = copy.deepcopy(self.filters)
# Sum across any nuclides specified by the user
if len(nuclides) != 0:
nuclide_bins = [self.get_nuclide_index(nuclide) for nuclide in nuclides]
axis_index = self.num_filters
mean = np.take(mean, indices=nuclide_bins, axis=axis_index)
std_dev = np.take(std_dev, indices=nuclide_bins, axis=axis_index)
mean = np.sum(mean, axis=axis_index, keepdims=True)
std_dev = np.sum(std_dev**2, axis=axis_index, keepdims=True)
std_dev = np.sqrt(std_dev)
# Add AggregateNuclide to the tally sum
nuclide_sum = openmc.AggregateNuclide(nuclides, 'sum')
tally_sum.nuclides.append(nuclide_sum)
# Add a copy of this tally's nuclides to the tally sum
else:
tally_sum._nuclides = copy.deepcopy(self.nuclides)
# Sum across any scores specified by the user
if len(scores) != 0:
score_bins = [self.get_score_index(score) for score in scores]
axis_index = self.num_filters + 1
mean = np.take(mean, indices=score_bins, axis=axis_index)
std_dev = np.take(std_dev, indices=score_bins, axis=axis_index)
mean = np.sum(mean, axis=axis_index, keepdims=True)
std_dev = np.sum(std_dev**2, axis=axis_index, keepdims=True)
std_dev = np.sqrt(std_dev)
# Add AggregateScore to the tally sum
score_sum = openmc.AggregateScore(scores, 'sum')
tally_sum.scores.append(score_sum)
# Add a copy of this tally's scores to the tally sum
else:
tally_sum._scores = copy.deepcopy(self.scores)
# Reshape condensed data arrays with one dimension for all filters
mean = np.reshape(mean, tally_sum.shape)
std_dev = np.reshape(std_dev, tally_sum.shape)
# Assign tally sum's data with the new arrays
tally_sum._mean = mean
tally_sum._std_dev = std_dev
# If original tally was sparse, sparsify the tally summation
tally_sum.sparse = self.sparse
return tally_sum
[docs] def average(self, scores=[], filter_type=None,
filter_bins=[], nuclides=[], remove_filter=False):
"""Vectorized average of tally data across scores, filter bins and/or
nuclides using tally aggregation.
This method constructs a new tally to encapsulate the average of the
data represented by the average of the data in this tally. The tally
data average is determined by the scores, filter bins and nuclides
specified in the input parameters.
Parameters
----------
scores : list of str
A list of one or more score strings to average across
(e.g., ['absorption', 'nu-fission']; default is [])
filter_type : openmc.FilterMeta
Type of the filter, e.g. MeshFilter
filter_bins : Iterable of int or tuple
A list of the filter bins corresponding to the filter_type parameter
Each bin in the list is the integer ID for 'material', 'surface',
'cell', 'cellborn', and 'universe' Filters. Each bin is an integer
for the cell instance ID for 'distribcell' Filters. Each bin is a
2-tuple of floats for 'energy' and 'energyout' filters corresponding
to the energy boundaries of the bin of interest. Each bin is an
(x,y,z) 3-tuple for 'mesh' filters corresponding to the mesh cell of
interest.
nuclides : list of str
A list of nuclide name strings to average across
(e.g., ['U235', 'U238']; default is [])
remove_filter : bool
If a filter is being averaged over, this bool indicates whether to
remove that filter in the returned tally. Default is False.
Returns
-------
openmc.Tally
A new tally which encapsulates the average of data requested.
"""
# Create new derived Tally for average
tally_avg = Tally()
tally_avg._derived = True
tally_avg._estimator = self.estimator
tally_avg._num_realizations = self.num_realizations
tally_avg._with_batch_statistics = self.with_batch_statistics
tally_avg._with_summary = self.with_summary
tally_avg._sp_filename = self._sp_filename
tally_avg._results_read = self._results_read
# Get tally data arrays reshaped with one dimension per filter
mean = self.get_reshaped_data(value='mean')
std_dev = self.get_reshaped_data(value='std_dev')
# Average across any filter bins specified by the user
if isinstance(filter_type, openmc.FilterMeta):
find_filter = self.find_filter(filter_type)
# If user did not specify filter bins, average across all bins
if len(filter_bins) == 0:
bin_indices = np.arange(find_filter.num_bins)
if isinstance(find_filter, openmc.DistribcellFilter):
filter_bins = np.arange(find_filter.num_bins)
elif isinstance(find_filter, openmc.EnergyFunctionFilter):
filter_bins = [None]
else:
filter_bins = find_filter.bins
# Only average across bins specified by the user
else:
bin_indices = \
[find_filter.get_bin_index(bin) for bin in filter_bins]
# Average across the bins in the user-specified filter
for i, self_filter in enumerate(self.filters):
if isinstance(self_filter, filter_type):
shape = mean.shape
mean = np.take(mean, indices=bin_indices, axis=i)
std_dev = np.take(std_dev, indices=bin_indices, axis=i)
# NumPy take introduces a new dimension in output array
# for some special cases that must be removed
if len(mean.shape) > len(shape):
mean = np.squeeze(mean, axis=i)
std_dev = np.squeeze(std_dev, axis=i)
mean = np.nanmean(mean, axis=i, keepdims=True)
std_dev = np.nanmean(std_dev**2, axis=i, keepdims=True)
std_dev /= len(bin_indices)
std_dev = np.sqrt(std_dev)
# Add AggregateFilter to the tally avg
if not remove_filter:
filter_sum = openmc.AggregateFilter(self_filter,
[tuple(filter_bins)], 'avg')
tally_avg.filters.append(filter_sum)
# Add a copy of each filter not averaged across to the tally avg
else:
tally_avg.filters.append(copy.deepcopy(self_filter))
# Add a copy of this tally's filters to the tally avg
else:
tally_avg._filters = copy.deepcopy(self.filters)
# Sum across any nuclides specified by the user
if len(nuclides) != 0:
nuclide_bins = [self.get_nuclide_index(nuclide) for nuclide in nuclides]
axis_index = self.num_filters
mean = np.take(mean, indices=nuclide_bins, axis=axis_index)
std_dev = np.take(std_dev, indices=nuclide_bins, axis=axis_index)
mean = np.nanmean(mean, axis=axis_index, keepdims=True)
std_dev = np.nanmean(std_dev**2, axis=axis_index, keepdims=True)
std_dev /= len(nuclide_bins)
std_dev = np.sqrt(std_dev)
# Add AggregateNuclide to the tally avg
nuclide_avg = openmc.AggregateNuclide(nuclides, 'avg')
tally_avg.nuclides.append(nuclide_avg)
# Add a copy of this tally's nuclides to the tally avg
else:
tally_avg._nuclides = copy.deepcopy(self.nuclides)
# Sum across any scores specified by the user
if len(scores) != 0:
score_bins = [self.get_score_index(score) for score in scores]
axis_index = self.num_filters + 1
mean = np.take(mean, indices=score_bins, axis=axis_index)
std_dev = np.take(std_dev, indices=score_bins, axis=axis_index)
mean = np.nanmean(mean, axis=axis_index, keepdims=True)
std_dev = np.nanmean(std_dev**2, axis=axis_index, keepdims=True)
std_dev /= len(score_bins)
std_dev = np.sqrt(std_dev)
# Add AggregateScore to the tally avg
score_sum = openmc.AggregateScore(scores, 'avg')
tally_avg.scores.append(score_sum)
# Add a copy of this tally's scores to the tally avg
else:
tally_avg._scores = copy.deepcopy(self.scores)
# Reshape condensed data arrays with one dimension for all filters
mean = np.reshape(mean, tally_avg.shape)
std_dev = np.reshape(std_dev, tally_avg.shape)
# Assign tally avg's data with the new arrays
tally_avg._mean = mean
tally_avg._std_dev = std_dev
# If original tally was sparse, sparsify the tally average
tally_avg.sparse = self.sparse
return tally_avg
[docs] def diagonalize_filter(self, new_filter, filter_position=-1):
"""Diagonalize the tally data array along a new axis of filter bins.
This is a helper method for the tally arithmetic methods. This method
adds the new filter to a derived tally constructed copied from this one.
The data in the derived tally arrays is "diagonalized" along the bins in
the new filter. This functionality is used by the openmc.mgxs module; to
transport-correct scattering matrices by subtracting a 'scatter-P1'
reaction rate tally with an energy filter from a 'scatter' reaction
rate tally with both energy and energyout filters.
Parameters
----------
new_filter : Filter
The filter along which to diagonalize the data in the new
filter_position : int
Where to place the new filter in the Tally.filters list. Defaults
to last position.
Returns
-------
openmc.Tally
A new derived Tally with data diagaonalized along the new filter.
"""
cv.check_type('new_filter', new_filter, _FILTER_CLASSES)
cv.check_type('filter_position', filter_position, Integral)
if new_filter in self.filters:
msg = 'Unable to diagonalize Tally ID="{}" which already ' \
'contains a "{}" filter'.format(self.id, type(new_filter))
raise ValueError(msg)
# Add the new filter to a copy of this Tally
new_tally = copy.deepcopy(self)
new_tally.filters.insert(filter_position, new_filter)
# Determine "base" indices along the new "diagonal", and the factor
# by which the "base" indices should be repeated to account for all
# other filter bins in the diagonalized tally
indices = np.arange(0, new_filter.num_bins**2, new_filter.num_bins+1)
diag_factor = self.num_filter_bins // new_filter.num_bins
diag_indices = np.zeros(self.num_filter_bins, dtype=int)
# Determine the filter indices along the new "diagonal"
for i in range(diag_factor):
start = i * new_filter.num_bins
end = (i+1) * new_filter.num_bins
diag_indices[start:end] = indices + (i * new_filter.num_bins**2)
# Inject this Tally's data along the diagonal of the diagonalized Tally
if not self.derived and self.sum is not None:
new_tally._sum = np.zeros(new_tally.shape, dtype=np.float64)
new_tally._sum[diag_indices, :, :] = self.sum
if not self.derived and self.sum_sq is not None:
new_tally._sum_sq = np.zeros(new_tally.shape, dtype=np.float64)
new_tally._sum_sq[diag_indices, :, :] = self.sum_sq
if self.mean is not None:
new_tally._mean = np.zeros(new_tally.shape, dtype=np.float64)
new_tally._mean[diag_indices, :, :] = self.mean
if self.std_dev is not None:
new_tally._std_dev = np.zeros(new_tally.shape, dtype=np.float64)
new_tally._std_dev[diag_indices, :, :] = self.std_dev
# If original tally was sparse, sparsify the diagonalized tally
new_tally.sparse = self.sparse
return new_tally
[docs]class Tallies(cv.CheckedList):
"""Collection of Tallies used for an OpenMC simulation.
This class corresponds directly to the tallies.xml input file. It can be
thought of as a normal Python list where each member is a :class:`Tally`. It
behaves like a list as the following example demonstrates:
>>> t1 = openmc.Tally()
>>> t2 = openmc.Tally()
>>> t3 = openmc.Tally()
>>> tallies = openmc.Tallies([t1])
>>> tallies.append(t2)
>>> tallies += [t3]
Parameters
----------
tallies : Iterable of openmc.Tally
Tallies to add to the collection
"""
def __init__(self, tallies=None):
super().__init__(Tally, 'tallies collection')
if tallies is not None:
self += tallies
[docs] def append(self, tally, merge=False):
"""Append tally to collection
Parameters
----------
tally : openmc.Tally
Tally to append
merge : bool
Indicate whether the tally should be merged with an existing tally,
if possible. Defaults to False.
"""
if not isinstance(tally, Tally):
msg = 'Unable to add a non-Tally "{}" to the ' \
'Tallies instance'.format(tally)
raise TypeError(msg)
if merge:
merged = False
# Look for a tally to merge with this one
for i, tally2 in enumerate(self):
# If a mergeable tally is found
if tally2.can_merge(tally):
# Replace tally2 with the merged tally
merged_tally = tally2.merge(tally)
self[i] = merged_tally
merged = True
break
# If no mergeable tally was found, simply add this tally
if not merged:
super().append(tally)
else:
super().append(tally)
[docs] def insert(self, index, item):
"""Insert tally before index
Parameters
----------
index : int
Index in list
item : openmc.Tally
Tally to insert
"""
super().insert(index, item)
[docs] def merge_tallies(self):
"""Merge any mergeable tallies together. Note that n-way merges are
possible.
"""
for i, tally1 in enumerate(self):
for j, tally2 in enumerate(self):
# Do not merge the same tally with itself
if i == j:
continue
# If the two tallies are mergeable
if tally1.can_merge(tally2):
# Replace tally 1 with the merged tally
merged_tally = tally1.merge(tally2)
self[i] = merged_tally
# Remove tally 2 since it is no longer needed
self.pop(j)
# Continue iterating from the first loop
break
def _create_tally_subelements(self, root_element):
for tally in self:
root_element.append(tally.to_xml_element())
def _create_mesh_subelements(self, root_element):
already_written = set()
for tally in self:
for f in tally.filters:
if isinstance(f, openmc.MeshFilter):
if f.mesh.id not in already_written:
if len(f.mesh.name) > 0:
root_element.append(ET.Comment(f.mesh.name))
root_element.append(f.mesh.to_xml_element())
already_written.add(f.mesh.id)
def _create_filter_subelements(self, root_element):
already_written = dict()
for tally in self:
for f in tally.filters:
if f not in already_written:
root_element.append(f.to_xml_element())
already_written[f] = f.id
elif f.id != already_written[f]:
# Set the IDs of identical filters with different
# user-defined IDs to the same value
f.id = already_written[f]
def _create_derivative_subelements(self, root_element):
# Get a list of all derivatives referenced in a tally.
derivs = []
for tally in self:
deriv = tally.derivative
if deriv is not None and deriv not in derivs:
derivs.append(deriv)
# Add the derivatives to the XML tree.
for d in derivs:
root_element.append(d.to_xml_element())
[docs] def export_to_xml(self, path='tallies.xml'):
"""Create a tallies.xml file that can be used for a simulation.
Parameters
----------
path : str
Path to file to write. Defaults to 'tallies.xml'.
"""
root_element = ET.Element("tallies")
self._create_mesh_subelements(root_element)
self._create_filter_subelements(root_element)
self._create_tally_subelements(root_element)
self._create_derivative_subelements(root_element)
# Clean the indentation in the file to be user-readable
clean_indentation(root_element)
# Check if path is a directory
p = Path(path)
if p.is_dir():
p /= 'tallies.xml'
# Write the XML Tree to the tallies.xml file
reorder_attributes(root_element) # TODO: Remove when support is Python 3.8+
tree = ET.ElementTree(root_element)
tree.write(str(p), xml_declaration=True, encoding='utf-8')
[docs] @classmethod
def from_xml(cls, path='tallies.xml'):
"""Generate tallies from XML file
Parameters
----------
path : str, optional
Path to tallies XML file
Returns
-------
openmc.Tallies
Tallies object
"""
tree = ET.parse(path)
root = tree.getroot()
# Read mesh elements
meshes = {}
for elem in root.findall('mesh'):
mesh = MeshBase.from_xml_element(elem)
meshes[mesh.id] = mesh
# Read filter elements
filters = {}
for elem in root.findall('filter'):
filter = openmc.Filter.from_xml_element(elem, meshes=meshes)
filters[filter.id] = filter
# Read derivative elements
derivatives = {}
for elem in root.findall('derivative'):
deriv = openmc.TallyDerivative.from_xml_element(elem)
derivatives[deriv.id] = deriv
# Read tally elements
tallies = []
for elem in root.findall('tally'):
tally = openmc.Tally.from_xml_element(
elem, filters=filters, derivatives=derivatives
)
tallies.append(tally)
return cls(tallies)
```