Source code for openmc.filter

from __future__ import annotations
from abc import ABCMeta
from collections.abc import Iterable
import hashlib
from itertools import product
from numbers import Real, Integral
import warnings

import lxml.etree as ET
import numpy as np
import pandas as pd

import openmc
import openmc.checkvalue as cv
from .cell import Cell
from .material import Material
from .mixin import IDManagerMixin
from .surface import Surface
from .universe import UniverseBase
from ._xml import get_text


_FILTER_TYPES = (
    'universe', 'material', 'cell', 'cellborn', 'surface', 'mesh', 'energy',
    'energyout', 'mu', 'polar', 'azimuthal', 'distribcell', 'delayedgroup',
    'energyfunction', 'cellfrom', 'materialfrom', 'legendre', 'spatiallegendre',
    'sphericalharmonics', 'zernike', 'zernikeradial', 'particle', 'cellinstance',
    'collision', 'time'
)

_CURRENT_NAMES = (
    'x-min out', 'x-min in', 'x-max out', 'x-max in',
    'y-min out', 'y-min in', 'y-max out', 'y-max in',
    'z-min out', 'z-min in', 'z-max out', 'z-max in'
)

_PARTICLES = {'neutron', 'photon', 'electron', 'positron'}


class FilterMeta(ABCMeta):
    """Metaclass for filters that ensures class names are appropriate."""

    def __new__(cls, name, bases, namespace, **kwargs):
        # Check the class name.
        required_suffix = 'Filter'
        if not name.endswith(required_suffix):
            raise ValueError("All filter class names must end with 'Filter'")

        # Create a 'short_name' attribute that removes the 'Filter' suffix.
        namespace['short_name'] = name[:-len(required_suffix)]

        # Subclass methods can sort of inherit the docstring of parent class
        # methods.  If a function is defined without a docstring, most (all?)
        # Python interpreters will search through the parent classes to see if
        # there is a docstring for a function with the same name, and they will
        # use that docstring.  However, Sphinx does not have that functionality.
        # This chunk of code handles this docstring inheritance manually so that
        # the autodocumentation will pick it up.
        if name != required_suffix:
            # Look for newly-defined functions that were also in Filter.
            for func_name in namespace:
                if func_name in Filter.__dict__:
                    # Inherit the docstring from Filter if not defined.
                    if isinstance(namespace[func_name],
                                  (classmethod, staticmethod)):
                        new_doc = namespace[func_name].__func__.__doc__
                        old_doc = Filter.__dict__[func_name].__func__.__doc__
                        if new_doc is None and old_doc is not None:
                            namespace[func_name].__func__.__doc__ = old_doc
                    else:
                        new_doc = namespace[func_name].__doc__
                        old_doc = Filter.__dict__[func_name].__doc__
                        if new_doc is None and old_doc is not None:
                            namespace[func_name].__doc__ = old_doc

        # Make the class.
        return super().__new__(cls, name, bases, namespace, **kwargs)


def _repeat_and_tile(bins, repeat_factor, data_size):
    filter_bins = np.repeat(bins, repeat_factor)
    tile_factor = data_size // len(filter_bins)
    return np.tile(filter_bins, tile_factor)


[docs]class Filter(IDManagerMixin, metaclass=FilterMeta): """Tally modifier that describes phase-space and other characteristics. Parameters ---------- bins : Integral or Iterable of Integral or Iterable of Real The bins for the filter. This takes on different meaning for different filters. See the docstrings for subclasses of this filter or the online documentation for more details. filter_id : int Unique identifier for the filter Attributes ---------- bins : Integral or Iterable of Integral or Iterable of Real The bins for the filter id : int Unique identifier for the filter num_bins : Integral The number of filter bins shape : tuple The shape of the filter """ next_id = 1 used_ids = set() def __init__(self, bins, filter_id=None): self.bins = bins self.id = filter_id def __eq__(self, other): if type(self) is not type(other): return False elif len(self.bins) != len(other.bins): return False else: return np.allclose(self.bins, other.bins) def __gt__(self, other): if type(self) is not type(other): if self.short_name in _FILTER_TYPES and \ other.short_name in _FILTER_TYPES: delta = _FILTER_TYPES.index(self.short_name) - \ _FILTER_TYPES.index(other.short_name) return delta > 0 else: return False else: return max(self.bins) > max(other.bins) def __lt__(self, other): return not self > other def __hash__(self): string = type(self).__name__ + '\n' string += '{: <16}=\t{}\n'.format('\tBins', self.bins) return hash(string) def __repr__(self): string = type(self).__name__ + '\n' string += '{: <16}=\t{}\n'.format('\tBins', self.bins) string += '{: <16}=\t{}\n'.format('\tID', self.id) return string @classmethod def _recursive_subclasses(cls): """Return all subclasses and their subclasses, etc.""" all_subclasses = [] for subclass in cls.__subclasses__(): all_subclasses.append(subclass) all_subclasses.extend(subclass._recursive_subclasses()) return all_subclasses
[docs] @classmethod def from_hdf5(cls, group, **kwargs): """Construct a new Filter instance from HDF5 data. Parameters ---------- group : h5py.Group HDF5 group to read from Keyword arguments ----------------- meshes : dict Dictionary mapping integer IDs to openmc.MeshBase objects. Only used for openmc.MeshFilter objects. """ filter_id = int(group.name.split('/')[-1].lstrip('filter ')) # If the HDF5 'type' variable matches this class's short_name, then # there is no overridden from_hdf5 method. Pass the bins to __init__. if group['type'][()].decode() == cls.short_name.lower(): out = cls(group['bins'][()], filter_id=filter_id) out._num_bins = group['n_bins'][()] return out # Search through all subclasses and find the one matching the HDF5 # 'type'. Call that class's from_hdf5 method. for subclass in cls._recursive_subclasses(): if group['type'][()].decode() == subclass.short_name.lower(): return subclass.from_hdf5(group, **kwargs) raise ValueError("Unrecognized Filter class: '" + group['type'][()].decode() + "'")
@property def bins(self): return self._bins @bins.setter def bins(self, bins): self.check_bins(bins) self._bins = bins @property def num_bins(self): return len(self.bins) @property def shape(self): return (self.num_bins,)
[docs] def check_bins(self, bins): """Make sure given bins are valid for this filter. Raises ------ TypeError ValueError """ pass
[docs] def to_xml_element(self): """Return XML Element representing the Filter. Returns ------- element : lxml.etree._Element XML element containing filter data """ element = ET.Element('filter') element.set('id', str(self.id)) element.set('type', self.short_name.lower()) subelement = ET.SubElement(element, 'bins') subelement.text = ' '.join(str(b) for b in self.bins) return element
[docs] @classmethod def from_xml_element(cls, elem, **kwargs): """Generate a filter from an XML element Parameters ---------- elem : lxml.etree._Element XML element **kwargs Keyword arguments (e.g., mesh information) Returns ------- openmc.Filter Filter object """ filter_type = elem.get('type') if filter_type is None: filter_type = elem.find('type').text # If the filter type matches this class's short_name, then # there is no overridden from_xml_element method if filter_type == cls.short_name.lower(): # Get bins from element -- the default here works for any filters # that just store a list of bins that can be represented as integers filter_id = int(elem.get('id')) bins = [int(x) for x in get_text(elem, 'bins').split()] return cls(bins, filter_id=filter_id) # Search through all subclasses and find the one matching the HDF5 # 'type'. Call that class's from_hdf5 method for subclass in cls._recursive_subclasses(): if filter_type == subclass.short_name.lower(): return subclass.from_xml_element(elem, **kwargs)
[docs] def can_merge(self, other): """Determine if filter can be merged with another. Parameters ---------- other : openmc.Filter Filter to compare with Returns ------- bool Whether the filter can be merged """ return type(self) is type(other)
[docs] def merge(self, other): """Merge this filter with another. Parameters ---------- other : openmc.Filter Filter to merge with Returns ------- merged_filter : openmc.Filter Filter resulting from the merge """ if not self.can_merge(other): msg = f'Unable to merge "{type(self)}" with "{type(other)}"' raise ValueError(msg) # Merge unique filter bins merged_bins = np.concatenate((self.bins, other.bins)) merged_bins = np.unique(merged_bins, axis=0) # Create a new filter with these bins and a new auto-generated ID return type(self)(merged_bins)
[docs] def is_subset(self, other): """Determine if another filter is a subset of this filter. If all of the bins in the other filter are included as bins in this filter, then it is a subset of this filter. Parameters ---------- other : openmc.Filter The filter to query as a subset of this filter Returns ------- bool Whether or not the other filter is a subset of this filter """ if type(self) is not type(other): return False for b in other.bins: if b not in self.bins: return False return True
[docs] def get_bin_index(self, filter_bin): """Returns the index in the Filter for some bin. Parameters ---------- filter_bin : int or tuple The bin is the integer ID for 'material', 'surface', 'cell', 'cellborn', and 'universe' Filters. The bin is an integer for the cell instance ID for 'distribcell' Filters. The bin is a 2-tuple of floats for 'energy' and 'energyout' filters corresponding to the energy boundaries of the bin of interest. The bin is an (x,y,z) 3-tuple for 'mesh' filters corresponding to the mesh cell of interest. Returns ------- filter_index : int The index in the Tally data array for this filter bin. """ if filter_bin not in self.bins: msg = ('Unable to get the bin index for Filter since ' f'"{filter_bin}" is not one of the bins') raise ValueError(msg) if isinstance(self.bins, np.ndarray): return np.where(self.bins == filter_bin)[0][0] else: return self.bins.index(filter_bin)
[docs] def get_pandas_dataframe(self, data_size, stride, **kwargs): """Builds a Pandas DataFrame for the Filter's bins. This method constructs a Pandas DataFrame object for the filter with columns annotated by filter bin information. This is a helper method for :meth:`Tally.get_pandas_dataframe`. Parameters ---------- data_size : int The total number of bins in the tally corresponding to this filter stride : int Stride in memory for the filter Keyword arguments ----------------- paths : bool Only used for DistribcellFilter. If True (default), expand distribcell indices into multi-index columns describing the path to that distribcell through the CSG tree. NOTE: This option assumes that all distribcell paths are of the same length and do not have the same universes and cells but different lattice cell indices. Returns ------- pandas.DataFrame A Pandas DataFrame with columns of strings that characterize the filter's bins. The number of rows in the DataFrame is the same as the total number of bins in the corresponding tally, with the filter bin appropriately tiled to map to the corresponding tally bins. See also -------- Tally.get_pandas_dataframe(), CrossFilter.get_pandas_dataframe() """ # Initialize Pandas DataFrame df = pd.DataFrame() filter_bins = np.repeat(self.bins, stride) tile_factor = data_size // len(filter_bins) filter_bins = np.tile(filter_bins, tile_factor) df = pd.concat([df, pd.DataFrame( {self.short_name.lower(): filter_bins})]) return df
class WithIDFilter(Filter): """Abstract parent for filters of types with IDs (Cell, Material, etc.).""" def __init__(self, bins, filter_id=None): bins = np.atleast_1d(bins) # Make sure bins are either integers or appropriate objects cv.check_iterable_type('filter bins', bins, (Integral, self.expected_type)) # Extract ID values bins = np.array([b if isinstance(b, Integral) else b.id for b in bins]) super().__init__(bins, filter_id) def check_bins(self, bins): # Check the bin values. for edge in bins: cv.check_greater_than('filter bin', edge, 0, equality=True)
[docs]class UniverseFilter(WithIDFilter): """Bins tally event locations based on the Universe they occurred in. Parameters ---------- bins : openmc.UniverseBase, int, or iterable thereof The Universes to tally. Either :class:`openmc.UniverseBase` objects or their Integral ID numbers can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Iterable of Integral openmc.UniverseBase IDs. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ expected_type = UniverseBase
[docs]class MaterialFilter(WithIDFilter): """Bins tally event locations based on the Material they occurred in. Parameters ---------- bins : openmc.Material, Integral, or iterable thereof The material(s) to tally. Either :class:`openmc.Material` objects or their Integral ID numbers can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Iterable of Integral openmc.Material IDs. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ expected_type = Material
[docs]class MaterialFromFilter(WithIDFilter): """Bins tally event locations based on the Material they occurred in. Parameters ---------- bins : openmc.Material, Integral, or iterable thereof The material(s) to tally. Either :class:`openmc.Material` objects or their Integral ID numbers can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Iterable of Integral openmc.Material IDs. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ expected_type = Material
[docs]class CellFilter(WithIDFilter): """Bins tally event locations based on the Cell they occurred in. Parameters ---------- bins : openmc.Cell, int, or iterable thereof The cells to tally. Either :class:`openmc.Cell` objects or their ID numbers can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Iterable of Integral openmc.Cell IDs. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ expected_type = Cell
[docs]class CellFromFilter(WithIDFilter): """Bins tally on which cell the particle came from. Parameters ---------- bins : openmc.Cell, Integral, or iterable thereof The cell(s) to tally. Either :class:`openmc.Cell` objects or their integral ID numbers can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Integral or Iterable of Integral Cell IDs. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ expected_type = Cell
[docs]class CellBornFilter(WithIDFilter): """Bins tally events based on which cell the particle was born in. Parameters ---------- bins : openmc.Cell, Integral, or iterable thereof The birth cells to tally. Either :class:`openmc.Cell` objects or their integral ID numbers can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Iterable of Integral Cell IDs. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ expected_type = Cell
# Temporary alias for CellbornFilter def CellbornFilter(*args, **kwargs): warnings.warn('The name of "CellbornFilter" has changed to ' '"CellBornFilter". "CellbornFilter" will be ' 'removed in the future.', FutureWarning) return CellBornFilter(*args, **kwargs)
[docs]class CellInstanceFilter(Filter): """Bins tally events based on which cell instance a particle is in. This filter is similar to :class:`DistribcellFilter` but allows one to select particular instances to be tallied (instead of obtaining *all* instances by default) and allows instances from different cells to be specified in a single filter. .. versionadded:: 0.12 Parameters ---------- bins : iterable of 2-tuples or numpy.ndarray The cell instances to tally, given as 2-tuples. For the first value in the tuple, either openmc.Cell objects or their integral ID numbers can be used. The second value indicates the cell instance. filter_id : int Unique identifier for the filter Attributes ---------- bins : numpy.ndarray 2D numpy array of cell IDs and instances id : int Unique identifier for the filter num_bins : Integral The number of filter bins See Also -------- DistribcellFilter """ def __init__(self, bins, filter_id=None): self.bins = bins self.id = filter_id @Filter.bins.setter def bins(self, bins): pairs = np.empty((len(bins), 2), dtype=int) for i, (cell, instance) in enumerate(bins): cv.check_type('cell', cell, (openmc.Cell, Integral)) cv.check_type('instance', instance, Integral) pairs[i, 0] = cell if isinstance(cell, Integral) else cell.id pairs[i, 1] = instance self._bins = pairs
[docs] def get_pandas_dataframe(self, data_size, stride, **kwargs): """Builds a Pandas DataFrame for the Filter's bins. This method constructs a Pandas DataFrame object for the filter with columns annotated by filter bin information. This is a helper method for :meth:`Tally.get_pandas_dataframe`. Parameters ---------- data_size : int The total number of bins in the tally corresponding to this filter stride : int Stride in memory for the filter Returns ------- pandas.DataFrame A Pandas DataFrame with a multi-index column for the cell instance. The number of rows in the DataFrame is the same as the total number of bins in the corresponding tally, with the filter bin appropriately tiled to map to the corresponding tally bins. See also -------- Tally.get_pandas_dataframe(), CrossFilter.get_pandas_dataframe() """ # Repeat and tile bins as necessary to account for other filters. bins = np.repeat(self.bins, stride, axis=0) tile_factor = data_size // len(bins) bins = np.tile(bins, (tile_factor, 1)) columns = pd.MultiIndex.from_product([[self.short_name.lower()], ['cell', 'instance']]) return pd.DataFrame(bins, columns=columns)
[docs] def to_xml_element(self): """Return XML Element representing the Filter. Returns ------- element : lxml.etree._Element XML element containing filter data """ element = ET.Element('filter') element.set('id', str(self.id)) element.set('type', self.short_name.lower()) subelement = ET.SubElement(element, 'bins') subelement.text = ' '.join(str(i) for i in self.bins.ravel()) return element
[docs] @classmethod def from_xml_element(cls, elem, **kwargs): filter_id = int(elem.get('id')) bins = [int(x) for x in get_text(elem, 'bins').split()] cell_instances = list(zip(bins[::2], bins[1::2])) return cls(cell_instances, filter_id=filter_id)
[docs]class SurfaceFilter(WithIDFilter): """Filters particles by surface crossing Parameters ---------- bins : openmc.Surface, int, or iterable of Integral The surfaces to tally over. Either openmc.Surface objects or their ID numbers can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Iterable of Integral The surfaces to tally over. Either openmc.Surface objects or their ID numbers can be used. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ expected_type = Surface
[docs]class ParticleFilter(Filter): """Bins tally events based on the Particle type. Parameters ---------- bins : str, or iterable of str The particles to tally represented as strings ('neutron', 'photon', 'electron', 'positron'). filter_id : int Unique identifier for the filter Attributes ---------- bins : iterable of str The particles to tally id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ def __eq__(self, other): if type(self) is not type(other): return False elif len(self.bins) != len(other.bins): return False else: return np.all(self.bins == other.bins) __hash__ = Filter.__hash__ @Filter.bins.setter def bins(self, bins): bins = np.atleast_1d(bins) cv.check_iterable_type('filter bins', bins, str) for edge in bins: cv.check_value('filter bin', edge, _PARTICLES) self._bins = bins
[docs] @classmethod def from_hdf5(cls, group, **kwargs): if group['type'][()].decode() != cls.short_name.lower(): raise ValueError("Expected HDF5 data for filter type '" + cls.short_name.lower() + "' but got '" + group['type'][()].decode() + " instead") particles = [b.decode() for b in group['bins'][()]] filter_id = int(group.name.split('/')[-1].lstrip('filter ')) return cls(particles, filter_id=filter_id)
[docs] @classmethod def from_xml_element(cls, elem, **kwargs): filter_id = int(elem.get('id')) bins = get_text(elem, 'bins').split() return cls(bins, filter_id=filter_id)
[docs]class MeshFilter(Filter): """Bins tally event locations by mesh elements. Parameters ---------- mesh : openmc.MeshBase The mesh object that events will be tallied onto filter_id : int Unique identifier for the filter Attributes ---------- mesh : openmc.MeshBase The mesh object that events will be tallied onto id : int Unique identifier for the filter translation : Iterable of float This array specifies a vector that is used to translate (shift) the mesh for this filter bins : list of tuple A list of mesh indices for each filter bin, e.g. [(1, 1, 1), (2, 1, 1), ...] num_bins : Integral The number of filter bins """ def __init__(self, mesh, filter_id=None): self.mesh = mesh self.id = filter_id self._translation = None def __hash__(self): string = type(self).__name__ + '\n' string += '{: <16}=\t{}\n'.format('\tMesh ID', self.mesh.id) return hash(string) def __repr__(self): string = type(self).__name__ + '\n' string += '{: <16}=\t{}\n'.format('\tMesh ID', self.mesh.id) string += '{: <16}=\t{}\n'.format('\tID', self.id) string += '{: <16}=\t{}\n'.format('\tTranslation', self.translation) return string
[docs] @classmethod def from_hdf5(cls, group, **kwargs): if group['type'][()].decode() != cls.short_name.lower(): raise ValueError("Expected HDF5 data for filter type '" + cls.short_name.lower() + "' but got '" + group['type'][()].decode() + " instead") if 'meshes' not in kwargs: raise ValueError(cls.__name__ + " requires a 'meshes' keyword " "argument.") mesh_id = group['bins'][()] mesh_obj = kwargs['meshes'][mesh_id] filter_id = int(group.name.split('/')[-1].lstrip('filter ')) out = cls(mesh_obj, filter_id=filter_id) translation = group.get('translation') if translation: out.translation = translation[()] return out
@property def mesh(self): return self._mesh @mesh.setter def mesh(self, mesh): cv.check_type('filter mesh', mesh, openmc.MeshBase) self._mesh = mesh if isinstance(mesh, openmc.UnstructuredMesh): if mesh.has_statepoint_data: self.bins = list(range(len(mesh.volumes))) else: self.bins = [] else: self.bins = list(mesh.indices) @property def shape(self): if isinstance(self, MeshSurfaceFilter): return (self.num_bins,) return self.mesh.dimension @property def translation(self): return self._translation @translation.setter def translation(self, t): cv.check_type('mesh filter translation', t, Iterable, Real) cv.check_length('mesh filter translation', t, 3) self._translation = np.asarray(t)
[docs] def can_merge(self, other): # Mesh filters cannot have more than one bin return False
[docs] def get_pandas_dataframe(self, data_size, stride, **kwargs): """Builds a Pandas DataFrame for the Filter's bins. This method constructs a Pandas DataFrame object for the filter with columns annotated by filter bin information. This is a helper method for :meth:`Tally.get_pandas_dataframe`. Parameters ---------- data_size : int The total number of bins in the tally corresponding to this filter stride : int Stride in memory for the filter Returns ------- pandas.DataFrame A Pandas DataFrame with three columns describing the x,y,z mesh cell indices corresponding to each filter bin. The number of rows in the DataFrame is the same as the total number of bins in the corresponding tally, with the filter bin appropriately tiled to map to the corresponding tally bins. See also -------- Tally.get_pandas_dataframe(), CrossFilter.get_pandas_dataframe() """ # Initialize Pandas DataFrame df = pd.DataFrame() # Initialize dictionary to build Pandas Multi-index column filter_dict = {} # Append mesh ID as outermost index of multi-index mesh_key = f'mesh {self.mesh.id}' # Find mesh dimensions - use 3D indices for simplicity n_dim = len(self.mesh.dimension) if n_dim == 3: nx, ny, nz = self.mesh.dimension elif n_dim == 2: nx, ny = self.mesh.dimension nz = 1 else: nx = self.mesh.dimension ny = nz = 1 # Generate multi-index sub-column for x-axis filter_dict[mesh_key, 'x'] = _repeat_and_tile( np.arange(1, nx + 1), stride, data_size) # Generate multi-index sub-column for y-axis filter_dict[mesh_key, 'y'] = _repeat_and_tile( np.arange(1, ny + 1), nx * stride, data_size) # Generate multi-index sub-column for z-axis filter_dict[mesh_key, 'z'] = _repeat_and_tile( np.arange(1, nz + 1), nx * ny * stride, data_size) # Initialize a Pandas DataFrame from the mesh dictionary df = pd.concat([df, pd.DataFrame(filter_dict)]) return df
[docs] def to_xml_element(self): """Return XML Element representing the Filter. Returns ------- element : lxml.etree._Element XML element containing filter data """ element = super().to_xml_element() element[0].text = str(self.mesh.id) if self.translation is not None: element.set('translation', ' '.join(map(str, self.translation))) return element
[docs] @classmethod def from_xml_element(cls, elem: ET.Element, **kwargs) -> MeshFilter: mesh_id = int(get_text(elem, 'bins')) mesh_obj = kwargs['meshes'][mesh_id] filter_id = int(elem.get('id')) out = cls(mesh_obj, filter_id=filter_id) translation = elem.get('translation') if translation: out.translation = [float(x) for x in translation.split()] return out
[docs]class MeshBornFilter(MeshFilter): """Filter events by the mesh cell a particle originated from. Parameters ---------- mesh : openmc.MeshBase The mesh object that events will be tallied onto filter_id : int Unique identifier for the filter Attributes ---------- mesh : openmc.MeshBase The mesh object that events will be tallied onto id : int Unique identifier for the filter translation : Iterable of float This array specifies a vector that is used to translate (shift) the mesh for this filter bins : list of tuple A list of mesh indices for each filter bin, e.g. [(1, 1, 1), (2, 1, 1), ...] num_bins : Integral The number of filter bins """
[docs]class MeshSurfaceFilter(MeshFilter): """Filter events by surface crossings on a mesh. Parameters ---------- mesh : openmc.MeshBase The mesh object that events will be tallied onto filter_id : int Unique identifier for the filter Attributes ---------- mesh : openmc.MeshBase The mesh object that events will be tallied onto translation : Iterable of float This array specifies a vector that is used to translate (shift) the mesh for this filter id : int Unique identifier for the filter bins : list of tuple A list of mesh indices / surfaces for each filter bin, e.g. [(1, 1, 'x-min out'), (1, 1, 'x-min in'), ...] num_bins : Integral The number of filter bins """ @MeshFilter.mesh.setter def mesh(self, mesh): cv.check_type('filter mesh', mesh, openmc.MeshBase) self._mesh = mesh # Take the product of mesh indices and current names n_dim = mesh.n_dimension self.bins = [mesh_tuple + (surf,) for mesh_tuple, surf in product(mesh.indices, _CURRENT_NAMES[:4*n_dim])]
[docs] def get_pandas_dataframe(self, data_size, stride, **kwargs): """Builds a Pandas DataFrame for the Filter's bins. This method constructs a Pandas DataFrame object for the filter with columns annotated by filter bin information. This is a helper method for :meth:`Tally.get_pandas_dataframe`. Parameters ---------- data_size : int The total number of bins in the tally corresponding to this filter stride : int Stride in memory for the filter Returns ------- pandas.DataFrame A Pandas DataFrame with three columns describing the x,y,z mesh cell indices corresponding to each filter bin. The number of rows in the DataFrame is the same as the total number of bins in the corresponding tally, with the filter bin appropriately tiled to map to the corresponding tally bins. See also -------- Tally.get_pandas_dataframe(), CrossFilter.get_pandas_dataframe() """ # Initialize Pandas DataFrame df = pd.DataFrame() # Initialize dictionary to build Pandas Multi-index column filter_dict = {} # Append mesh ID as outermost index of multi-index mesh_key = f'mesh {self.mesh.id}' # Find mesh dimensions - use 3D indices for simplicity n_surfs = 4 * len(self.mesh.dimension) if len(self.mesh.dimension) == 3: nx, ny, nz = self.mesh.dimension elif len(self.mesh.dimension) == 2: nx, ny = self.mesh.dimension nz = 1 else: nx = self.mesh.dimension ny = nz = 1 # Generate multi-index sub-column for x-axis filter_dict[mesh_key, 'x'] = _repeat_and_tile( np.arange(1, nx + 1), n_surfs * stride, data_size) # Generate multi-index sub-column for y-axis if len(self.mesh.dimension) > 1: filter_dict[mesh_key, 'y'] = _repeat_and_tile( np.arange(1, ny + 1), n_surfs * nx * stride, data_size) # Generate multi-index sub-column for z-axis if len(self.mesh.dimension) > 2: filter_dict[mesh_key, 'z'] = _repeat_and_tile( np.arange(1, nz + 1), n_surfs * nx * ny * stride, data_size) # Generate multi-index sub-column for surface filter_dict[mesh_key, 'surf'] = _repeat_and_tile( _CURRENT_NAMES[:n_surfs], stride, data_size) # Initialize a Pandas DataFrame from the mesh dictionary return pd.concat([df, pd.DataFrame(filter_dict)])
[docs]class CollisionFilter(Filter): """Bins tally events based on the number of collisions. .. versionadded:: 0.12.2 Parameters ---------- bins : Iterable of int A list or iterable of the number of collisions, as integer values. The events whose post-scattering collision number equals one of the provided values will be counted. filter_id : int Unique identifier for the filter Attributes ---------- id : int Unique identifier for the filter bins : numpy.ndarray An array of integer values representing the number of collisions events by which to filter num_bins : int The number of filter bins """ def __init__(self, bins, filter_id=None): self.bins = np.asarray(bins) self.id = filter_id
[docs] def check_bins(self, bins): for x in bins: # Values should be integers cv.check_type('filter value', x, Integral) cv.check_greater_than('filter value', x, 0, equality=True)
class RealFilter(Filter): """Tally modifier that describes phase-space and other characteristics Parameters ---------- values : iterable of float A list of values for which each successive pair constitutes a range of values for a single bin filter_id : int Unique identifier for the filter Attributes ---------- values : numpy.ndarray An array of values for which each successive pair constitutes a range of values for a single bin id : int Unique identifier for the filter bins : numpy.ndarray An array of shape (N, 2) where each row is a pair of values indicating a filter bin range num_bins : int The number of filter bins """ def __init__(self, values, filter_id=None): self.values = np.asarray(values) self.bins = np.vstack((self.values[:-1], self.values[1:])).T self.id = filter_id def __gt__(self, other): if type(self) is type(other): # Compare largest/smallest bin edges in filters # This logic is used when merging tallies with real filters return self.values[0] >= other.values[-1] else: return super().__gt__(other) def __repr__(self): string = type(self).__name__ + '\n' string += '{: <16}=\t{}\n'.format('\tValues', self.values) string += '{: <16}=\t{}\n'.format('\tID', self.id) return string @Filter.bins.setter def bins(self, bins): Filter.bins.__set__(self, np.asarray(bins)) def check_bins(self, bins): for v0, v1 in bins: # Values should be real cv.check_type('filter value', v0, Real) cv.check_type('filter value', v1, Real) # Make sure that each tuple has values that are increasing if v1 < v0: raise ValueError(f'Values {v0} and {v1} appear to be out of ' 'order') for pair0, pair1 in zip(bins[:-1], bins[1:]): # Successive pairs should be ordered if pair1[1] < pair0[1]: raise ValueError(f'Values {pair1[1]} and {pair0[1]} appear to ' 'be out of order') def can_merge(self, other): if type(self) is not type(other): return False if self.bins[0, 0] == other.bins[-1][1]: # This low edge coincides with other's high edge return True elif self.bins[-1][1] == other.bins[0, 0]: # This high edge coincides with other's low edge return True else: return False def merge(self, other): if not self.can_merge(other): msg = f'Unable to merge "{type(self)}" with "{type(other)}" filters' raise ValueError(msg) # Merge unique filter bins merged_values = np.concatenate((self.values, other.values)) merged_values = np.unique(merged_values) # Create a new filter with these bins and a new auto-generated ID return type(self)(sorted(merged_values)) def is_subset(self, other): """Determine if another filter is a subset of this filter. If all of the bins in the other filter are included as bins in this filter, then it is a subset of this filter. Parameters ---------- other : openmc.Filter The filter to query as a subset of this filter Returns ------- bool Whether or not the other filter is a subset of this filter """ if type(self) is not type(other): return False elif self.num_bins != other.num_bins: return False else: return np.allclose(self.values, other.values) def get_bin_index(self, filter_bin): i = np.where(self.bins[:, 1] == filter_bin[1])[0] if len(i) == 0: msg = ('Unable to get the bin index for Filter since ' f'"{filter_bin}" is not one of the bins') raise ValueError(msg) else: return i[0] def get_pandas_dataframe(self, data_size, stride, **kwargs): """Builds a Pandas DataFrame for the Filter's bins. This method constructs a Pandas DataFrame object for the filter with columns annotated by filter bin information. This is a helper method for :meth:`Tally.get_pandas_dataframe`. Parameters ---------- data_size : int The total number of bins in the tally corresponding to this filter stride : int Stride in memory for the filter Returns ------- pandas.DataFrame A Pandas DataFrame with one column of the lower energy bound and one column of upper energy bound for each filter bin. The number of rows in the DataFrame is the same as the total number of bins in the corresponding tally, with the filter bin appropriately tiled to map to the corresponding tally bins. See also -------- Tally.get_pandas_dataframe(), CrossFilter.get_pandas_dataframe() """ # Initialize Pandas DataFrame df = pd.DataFrame() # Extract the lower and upper energy bounds, then repeat and tile # them as necessary to account for other filters. lo_bins = np.repeat(self.bins[:, 0], stride) hi_bins = np.repeat(self.bins[:, 1], stride) tile_factor = data_size // len(lo_bins) lo_bins = np.tile(lo_bins, tile_factor) hi_bins = np.tile(hi_bins, tile_factor) # Add the new energy columns to the DataFrame. if hasattr(self, 'units'): units = f' [{self.units}]' else: units = '' df.loc[:, self.short_name.lower() + ' low' + units] = lo_bins df.loc[:, self.short_name.lower() + ' high' + units] = hi_bins return df def to_xml_element(self): """Return XML Element representing the Filter. Returns ------- element : lxml.etree._Element XML element containing filter data """ element = super().to_xml_element() element[0].text = ' '.join(str(x) for x in self.values) return element @classmethod def from_xml_element(cls, elem, **kwargs): filter_id = int(elem.get('id')) bins = [float(x) for x in get_text(elem, 'bins').split()] return cls(bins, filter_id=filter_id)
[docs]class EnergyFilter(RealFilter): """Bins tally events based on incident particle energy. Parameters ---------- values : Iterable of Real A list of values for which each successive pair constitutes a range of energies in [eV] for a single bin filter_id : int Unique identifier for the filter Attributes ---------- values : numpy.ndarray An array of values for which each successive pair constitutes a range of energies in [eV] for a single bin id : int Unique identifier for the filter bins : numpy.ndarray An array of shape (N, 2) where each row is a pair of energies in [eV] for a single filter bin num_bins : int The number of filter bins """ units = 'eV' def __init__(self, values, filter_id=None): cv.check_length('values', values, 2) super().__init__(values, filter_id)
[docs] def get_bin_index(self, filter_bin): # Use lower energy bound to find index for RealFilters deltas = np.abs(self.bins[:, 1] - filter_bin[1]) / filter_bin[1] min_delta = np.min(deltas) if min_delta < 1E-3: return deltas.argmin() else: msg = ('Unable to get the bin index for Filter since ' f'"{filter_bin}" is not one of the bins') raise ValueError(msg)
[docs] def check_bins(self, bins): super().check_bins(bins) for v0, v1 in bins: cv.check_greater_than('filter value', v0, 0., equality=True) cv.check_greater_than('filter value', v1, 0., equality=True)
[docs] def get_tabular(self, values, **kwargs): """Create a tabulated distribution based on tally results with an energy filter This method provides an easy way to create a distribution in energy (e.g., a source spectrum) based on tally results that were obtained from using an :class:`~openmc.EnergyFilter`. .. versionadded:: 0.13.3 Parameters ---------- values : iterable of float Array of numeric values, typically from a tally result **kwargs Keyword arguments passed to :class:`openmc.stats.Tabular` Returns ------- openmc.stats.Tabular Tabular distribution with histogram interpolation """ probabilities = np.array(values, dtype=float) probabilities /= probabilities.sum() # Determine probability per eV, adding extra 0 at the end since it is a histogram probability_per_ev = probabilities / np.diff(self.values) probability_per_ev = np.append(probability_per_ev, 0.0) kwargs.setdefault('interpolation', 'histogram') return openmc.stats.Tabular(self.values, probability_per_ev, **kwargs)
@property def lethargy_bin_width(self): """Calculates the base 10 log width of energy bins which is useful when plotting the normalized flux. Returns ------- numpy.array Array of bin widths """ return np.log10(self.bins[:, 1]/self.bins[:, 0])
[docs] @classmethod def from_group_structure(cls, group_structure): """Construct an EnergyFilter instance from a standard group structure. .. versionadded:: 0.13.1 Parameters ---------- group_structure : str Name of the group structure. Must be a valid key of openmc.mgxs.GROUP_STRUCTURES dictionary. """ cv.check_value('group_structure', group_structure, openmc.mgxs.GROUP_STRUCTURES.keys()) return cls(openmc.mgxs.GROUP_STRUCTURES[group_structure.upper()])
[docs]class EnergyoutFilter(EnergyFilter): """Bins tally events based on outgoing particle energy. Parameters ---------- values : Iterable of Real A list of values for which each successive pair constitutes a range of energies in [eV] for a single bin filter_id : int Unique identifier for the filter Attributes ---------- values : numpy.ndarray An array of values for which each successive pair constitutes a range of energies in [eV] for a single bin id : int Unique identifier for the filter bins : numpy.ndarray An array of shape (N, 2) where each row is a pair of energies in [eV] for a single filter bin num_bins : int The number of filter bins """
[docs]class TimeFilter(RealFilter): """Bins tally events based on the particle's time. .. versionadded:: 0.13.0 Parameters ---------- values : iterable of float A list of values for which each successive pair constitutes a range of time in [s] for a single bin filter_id : int Unique identifier for the filter Attributes ---------- values : numpy.ndarray An array of values for which each successive pair constitutes a range of time in [s] for a single bin id : int Unique identifier for the filter bins : numpy.ndarray An array of shape (N, 2) where each row is a pair of time in [s] for a single filter bin num_bins : int The number of filter bins """ units = 's'
[docs] def get_bin_index(self, filter_bin): # Use lower energy bound to find index for RealFilters deltas = np.abs(self.bins[:, 1] - filter_bin[1]) / filter_bin[1] min_delta = np.min(deltas) if min_delta < 1e-3: return deltas.argmin() else: msg = ('Unable to get the bin index for Filter since ' f'"{filter_bin}" is not one of the bins') raise ValueError(msg)
[docs] def check_bins(self, bins): super().check_bins(bins) for v0, v1 in bins: cv.check_greater_than('filter value', v0, 0., equality=True) cv.check_greater_than('filter value', v1, 0., equality=True)
def _path_to_levels(path): """Convert distribcell path to list of levels Parameters ---------- path : str Distribcell path Returns ------- list List of levels in path """ # Split path into universes/cells/lattices path_items = path.split('->') # Pair together universe and cell information from the same level idx = [i for i, item in enumerate(path_items) if item.startswith('u')] for i in reversed(idx): univ_id = int(path_items.pop(i)[1:]) cell_id = int(path_items.pop(i)[1:]) path_items.insert(i, ('universe', univ_id, cell_id)) # Reformat lattice into tuple idx = [i for i, item in enumerate(path_items) if isinstance(item, str)] for i in idx: item = path_items.pop(i)[1:-1] lat_id, lat_xyz = item.split('(') lat_id = int(lat_id) lat_xyz = tuple(int(x) for x in lat_xyz.split(',')) path_items.insert(i, ('lattice', lat_id, lat_xyz)) return path_items
[docs]class DistribcellFilter(Filter): """Bins tally event locations on instances of repeated cells. This filter provides a separate score for each unique instance of a repeated cell in a geometry. Note that only one cell can be specified in this filter. The related :class:`CellInstanceFilter` allows one to obtain scores for particular cell instances as well as instances from different cells. Parameters ---------- cell : openmc.Cell or Integral The distributed cell to tally. Either an openmc.Cell or an Integral cell ID number can be used. filter_id : int Unique identifier for the filter Attributes ---------- bins : Iterable of Integral An iterable with one element---the ID of the distributed Cell. id : int Unique identifier for the filter num_bins : int The number of filter bins paths : list of str The paths traversed through the CSG tree to reach each distribcell instance (for 'distribcell' filters only) See Also -------- CellInstanceFilter """ def __init__(self, cell, filter_id=None): self._paths = None super().__init__(cell, filter_id)
[docs] @classmethod def from_hdf5(cls, group, **kwargs): if group['type'][()].decode() != cls.short_name.lower(): raise ValueError("Expected HDF5 data for filter type '" + cls.short_name.lower() + "' but got '" + group['type'][()].decode() + " instead") filter_id = int(group.name.split('/')[-1].lstrip('filter ')) out = cls(group['bins'][()], filter_id=filter_id) out._num_bins = group['n_bins'][()] return out
@property def num_bins(self): # Need to handle number of bins carefully -- for distribcell tallies, we # need to know how many instances of the cell there are return self._num_bins @property def paths(self): return self._paths @paths.setter def paths(self, paths): cv.check_iterable_type('paths', paths, str) self._paths = paths @Filter.bins.setter def bins(self, bins): # Format the bins as a 1D numpy array. bins = np.atleast_1d(bins) # Make sure there is only 1 bin. if not len(bins) == 1: msg = (f'Unable to add bins "{bins}" to a DistribcellFilter since ' 'only a single distribcell can be used per tally') raise ValueError(msg) # Check the type and extract the id, if necessary. cv.check_type('distribcell bin', bins[0], (Integral, openmc.Cell)) if isinstance(bins[0], openmc.Cell): bins = np.atleast_1d(bins[0].id) self._bins = bins
[docs] def can_merge(self, other): # Distribcell filters cannot have more than one bin return False
[docs] def get_bin_index(self, filter_bin): # Filter bins for distribcells are indices of each unique placement of # the Cell in the Geometry (consecutive integers starting at 0). return filter_bin
[docs] def get_pandas_dataframe(self, data_size, stride, **kwargs): """Builds a Pandas DataFrame for the Filter's bins. This method constructs a Pandas DataFrame object for the filter with columns annotated by filter bin information. This is a helper method for :meth:`Tally.get_pandas_dataframe`. Parameters ---------- data_size : int The total number of bins in the tally corresponding to this filter stride : int Stride in memory for the filter Keyword arguments ----------------- paths : bool If True (default), expand distribcell indices into multi-index columns describing the path to that distribcell through the CSG tree. NOTE: This option assumes that all distribcell paths are of the same length and do not have the same universes and cells but different lattice cell indices. Returns ------- pandas.DataFrame A Pandas DataFrame with columns describing distributed cells. The dataframe will have either: 1. a single column with the cell instance IDs (without summary info) 2. separate columns for the cell IDs, universe IDs, and lattice IDs and x,y,z cell indices corresponding to each (distribcell paths). The number of rows in the DataFrame is the same as the total number of bins in the corresponding tally, with the filter bin appropriately tiled to map to the corresponding tally bins. See also -------- Tally.get_pandas_dataframe(), CrossFilter.get_pandas_dataframe() """ # Initialize Pandas DataFrame df = pd.DataFrame() level_df = None paths = kwargs.setdefault('paths', True) # Create Pandas Multi-index columns for each level in CSG tree if paths: # Distribcell paths require linked metadata from the Summary if self.paths is None: msg = 'Unable to construct distribcell paths since ' \ 'the Summary is not linked to the StatePoint' raise ValueError(msg) # Make copy of array of distribcell paths to use in # Pandas Multi-index column construction num_offsets = len(self.paths) paths = [_path_to_levels(p) for p in self.paths] # Loop over CSG levels in the distribcell paths num_levels = len(paths[0]) for i_level in range(num_levels): # Use level key as first index in Pandas Multi-index column level_key = f'level {i_level + 1}' # Create a dictionary for this level for Pandas Multi-index level_dict = {} # Use the first distribcell path to determine if level # is a universe/cell or lattice level path = paths[0] if path[i_level][0] == 'lattice': # Initialize prefix Multi-index keys lat_id_key = (level_key, 'lat', 'id') lat_x_key = (level_key, 'lat', 'x') lat_y_key = (level_key, 'lat', 'y') lat_z_key = (level_key, 'lat', 'z') # Allocate NumPy arrays for each CSG level and # each Multi-index column in the DataFrame level_dict[lat_id_key] = np.empty(num_offsets) level_dict[lat_x_key] = np.empty(num_offsets) level_dict[lat_y_key] = np.empty(num_offsets) if len(path[i_level][2]) == 3: level_dict[lat_z_key] = np.empty(num_offsets) else: # Initialize prefix Multi-index keys univ_key = (level_key, 'univ', 'id') cell_key = (level_key, 'cell', 'id') # Allocate NumPy arrays for each CSG level and # each Multi-index column in the DataFrame level_dict[univ_key] = np.empty(num_offsets) level_dict[cell_key] = np.empty(num_offsets) # Populate Multi-index arrays with all distribcell paths for i, path in enumerate(paths): level = path[i_level] if level[0] == 'lattice': # Assign entry to Lattice Multi-index column level_dict[lat_id_key][i] = level[1] level_dict[lat_x_key][i] = level[2][0] level_dict[lat_y_key][i] = level[2][1] if len(level[2]) == 3: level_dict[lat_z_key][i] = level[2][2] else: # Assign entry to Universe, Cell Multi-index columns level_dict[univ_key][i] = level[1] level_dict[cell_key][i] = level[2] # Tile the Multi-index columns for level_key, level_bins in level_dict.items(): level_dict[level_key] = _repeat_and_tile( level_bins, stride, data_size) # Initialize a Pandas DataFrame from the level dictionary if level_df is None: level_df = pd.DataFrame(level_dict) else: level_df = pd.concat([level_df, pd.DataFrame(level_dict)], axis=1) # Create DataFrame column for distribcell instance IDs # NOTE: This is performed regardless of whether the user # requests Summary geometric information filter_bins = _repeat_and_tile( np.arange(self.num_bins), stride, data_size) df = pd.DataFrame({self.short_name.lower() : filter_bins}) # Concatenate with DataFrame of distribcell instance IDs if level_df is not None: level_df = level_df.dropna(axis=1, how='all') level_df = level_df.astype(int) df = pd.concat([level_df, df], axis=1) return df
[docs]class MuFilter(RealFilter): """Bins tally events based on particle scattering angle. Parameters ---------- values : int or Iterable of Real A grid of scattering angles which events will binned into. Values represent the cosine of the scattering angle. If an iterable is given, the values will be used explicitly as grid points. If a single int is given, the range [-1, 1] will be divided up equally into that number of bins. filter_id : int Unique identifier for the filter Attributes ---------- values : numpy.ndarray An array of values for which each successive pair constitutes a range of scattering angle cosines for a single bin id : int Unique identifier for the filter bins : numpy.ndarray An array of shape (N, 2) where each row is a pair of scattering angle cosines for a single filter bin num_bins : Integral The number of filter bins """ def __init__(self, values, filter_id=None): if isinstance(values, Integral): values = np.linspace(-1., 1., values + 1) super().__init__(values, filter_id)
[docs] def check_bins(self, bins): super().check_bins(bins) for x in np.ravel(bins): if not np.isclose(x, -1.): cv.check_greater_than('filter value', x, -1., equality=True) if not np.isclose(x, 1.): cv.check_less_than('filter value', x, 1., equality=True)
[docs]class PolarFilter(RealFilter): """Bins tally events based on the incident particle's direction. Parameters ---------- values : int or Iterable of Real A grid of polar angles which events will binned into. Values represent an angle in radians relative to the z-axis. If an iterable is given, the values will be used explicitly as grid points. If a single int is given, the range [0, pi] will be divided up equally into that number of bins. filter_id : int Unique identifier for the filter Attributes ---------- values : numpy.ndarray An array of values for which each successive pair constitutes a range of polar angles in [rad] for a single bin id : int Unique identifier for the filter bins : numpy.ndarray An array of shape (N, 2) where each row is a pair of polar angles for a single filter bin id : int Unique identifier for the filter num_bins : Integral The number of filter bins """ units = 'rad' def __init__(self, values, filter_id=None): if isinstance(values, Integral): values = np.linspace(0., np.pi, values + 1) super().__init__(values, filter_id)
[docs] def check_bins(self, bins): super().check_bins(bins) for x in np.ravel(bins): if not np.isclose(x, 0.): cv.check_greater_than('filter value', x, 0., equality=True) if not np.isclose(x, np.pi): cv.check_less_than('filter value', x, np.pi, equality=True)
[docs]class AzimuthalFilter(RealFilter): """Bins tally events based on the incident particle's direction. Parameters ---------- values : int or Iterable of Real A grid of azimuthal angles which events will binned into. Values represent an angle in radians relative to the x-axis and perpendicular to the z-axis. If an iterable is given, the values will be used explicitly as grid points. If a single int is given, the range [-pi, pi) will be divided up equally into that number of bins. filter_id : int Unique identifier for the filter Attributes ---------- values : numpy.ndarray An array of values for which each successive pair constitutes a range of azimuthal angles in [rad] for a single bin id : int Unique identifier for the filter bins : numpy.ndarray An array of shape (N, 2) where each row is a pair of azimuthal angles for a single filter bin num_bins : Integral The number of filter bins """ units = 'rad' def __init__(self, values, filter_id=None): if isinstance(values, Integral): values = np.linspace(-np.pi, np.pi, values + 1) super().__init__(values, filter_id)
[docs] def check_bins(self, bins): super().check_bins(bins) for x in np.ravel(bins): if not np.isclose(x, -np.pi): cv.check_greater_than('filter value', x, -np.pi, equality=True) if not np.isclose(x, np.pi): cv.check_less_than('filter value', x, np.pi, equality=True)
[docs]class DelayedGroupFilter(Filter): """Bins fission events based on the produced neutron precursor groups. Parameters ---------- bins : iterable of int The delayed neutron precursor groups. For example, ENDF/B-VII.1 uses 6 precursor groups so a tally with all groups will have bins = [1, 2, 3, 4, 5, 6]. filter_id : int Unique identifier for the filter Attributes ---------- bins : iterable of int The delayed neutron precursor groups. For example, ENDF/B-VII.1 uses 6 precursor groups so a tally with all groups will have bins = [1, 2, 3, 4, 5, 6]. id : int Unique identifier for the filter num_bins : Integral The number of filter bins """
[docs] def check_bins(self, bins): # Check the bin values. for g in bins: cv.check_greater_than('delayed group', g, 0)
[docs]class EnergyFunctionFilter(Filter): """Multiplies tally scores by an arbitrary function of incident energy. The arbitrary function is described by a piecewise linear-linear interpolation of energy and y values. Values outside of the given energy range will be evaluated as zero. Parameters ---------- energy : Iterable of Real A grid of energy values in [eV] y : iterable of Real A grid of interpolant values in [eV] interpolation : str Interpolation scheme: {'histogram', 'linear-linear', 'linear-log', 'log-linear', 'log-log', 'quadratic', 'cubic'} filter_id : int Unique identifier for the filter Attributes ---------- energy : Iterable of Real A grid of energy values in [eV] y : iterable of Real A grid of interpolant values in [eV] interpolation : str Interpolation scheme: {'histogram', 'linear-linear', 'linear-log', 'log-linear', 'log-log', 'quadratic', 'cubic'} id : int Unique identifier for the filter num_bins : Integral The number of filter bins (always 1 for this filter) """ # keys selected to match those in function.py where possible # skip 6 b/c ENDF-6 reserves this value for # "special one-dimensional interpolation law" INTERPOLATION_SCHEMES = {1: 'histogram', 2: 'linear-linear', 3: 'linear-log', 4: 'log-linear', 5: 'log-log', 7: 'quadratic', 8: 'cubic'} def __init__(self, energy, y, interpolation='linear-linear', filter_id=None): self.energy = energy self.y = y self.id = filter_id self.interpolation = interpolation def __eq__(self, other): if type(self) is not type(other): return False elif not self.interpolation == other.interpolation: return False elif not all(self.energy == other.energy): return False else: return all(self.y == other.y) def __gt__(self, other): if type(self) is not type(other): if self.short_name in _FILTER_TYPES and \ other.short_name in _FILTER_TYPES: delta = _FILTER_TYPES.index(self.short_name) - \ _FILTER_TYPES.index(other.short_name) return delta > 0 else: return False else: return False def __lt__(self, other): if type(self) is not type(other): if self.short_name in _FILTER_TYPES and \ other.short_name in _FILTER_TYPES: delta = _FILTER_TYPES.index(self.short_name) - \ _FILTER_TYPES.index(other.short_name) return delta < 0 else: return False else: return False def __hash__(self): string = type(self).__name__ + '\n' string += '{: <16}=\t{}\n'.format('\tEnergy', self.energy) string += '{: <16}=\t{}\n'.format('\tInterpolant', self.y) string += '{: <16}=\t{}\n'.format('\tInterpolation', self.interpolation) return hash(string) def __repr__(self): string = type(self).__name__ + '\n' string += '{: <16}=\t{}\n'.format('\tEnergy', self.energy) string += '{: <16}=\t{}\n'.format('\tInterpolant', self.y) string += '{: <16}=\t{}\n'.format('\tInterpolation', self.interpolation) string += '{: <16}=\t{}\n'.format('\tID', self.id) return string
[docs] @classmethod def from_hdf5(cls, group, **kwargs): if group['type'][()].decode() != cls.short_name.lower(): raise ValueError("Expected HDF5 data for filter type '" + cls.short_name.lower() + "' but got '" + group['type'][()].decode() + " instead") energy = group['energy'][()] y_grp = group['y'] y = y_grp[()] filter_id = int(group.name.split('/')[-1].lstrip('filter ')) out = cls(energy, y, filter_id=filter_id) if 'interpolation' in y_grp.attrs: out.interpolation = \ cls.INTERPOLATION_SCHEMES[y_grp.attrs['interpolation'][()]] return out
[docs] @classmethod def from_tabulated1d(cls, tab1d): """Construct a filter from a Tabulated1D object. Parameters ---------- tab1d : openmc.data.Tabulated1D A linear-linear Tabulated1D object with only a single interpolation region. Returns ------- EnergyFunctionFilter """ cv.check_type('EnergyFunctionFilter tab1d', tab1d, openmc.data.Tabulated1D) if tab1d.n_regions > 1: raise ValueError('Only Tabulated1Ds with a single interpolation ' 'region are supported') interpolation_val = tab1d.interpolation[0] if interpolation_val not in cls.INTERPOLATION_SCHEMES.keys(): raise ValueError('Only histogram, linear-linear, linear-log, log-linear, and ' 'log-log Tabulated1Ds are supported') return cls(tab1d.x, tab1d.y, cls.INTERPOLATION_SCHEMES[interpolation_val])
@property def energy(self): return self._energy @energy.setter def energy(self, energy): # Format the bins as a 1D numpy array. energy = np.atleast_1d(energy) # Make sure the values are Real and positive. cv.check_type('filter energy grid', energy, Iterable, Real) for E in energy: cv.check_greater_than('filter energy grid', E, 0, equality=True) self._energy = energy @property def y(self): return self._y @y.setter def y(self, y): # Format the bins as a 1D numpy array. y = np.atleast_1d(y) # Make sure the values are Real. cv.check_type('filter interpolant values', y, Iterable, Real) self._y = y @property def interpolation(self): return self._interpolation @interpolation.setter def interpolation(self, val): cv.check_type('interpolation', val, str) cv.check_value('interpolation', val, self.INTERPOLATION_SCHEMES.values()) if val == 'quadratic' and len(self.energy) < 3: raise ValueError('Quadratic interpolation requires 3 or more values.') if val == 'cubic' and len(self.energy) < 4: raise ValueError('Cubic interpolation requires 3 or more values.') self._interpolation = val @property def bins(self): raise AttributeError('EnergyFunctionFilters have no bins.') @bins.setter def bins(self, bins): raise RuntimeError('EnergyFunctionFilters have no bins.') @property def num_bins(self): return 1
[docs] def to_xml_element(self): """Return XML Element representing the Filter. Returns ------- element : lxml.etree._Element XML element containing filter data """ element = ET.Element('filter') element.set('id', str(self.id)) element.set('type', self.short_name.lower()) subelement = ET.SubElement(element, 'energy') subelement.text = ' '.join(str(e) for e in self.energy) subelement = ET.SubElement(element, 'y') subelement.text = ' '.join(str(y) for y in self.y) subelement = ET.SubElement(element, 'interpolation') subelement.text = self.interpolation return element
[docs] @classmethod def from_xml_element(cls, elem, **kwargs): filter_id = int(elem.get('id')) energy = [float(x) for x in get_text(elem, 'energy').split()] y = [float(x) for x in get_text(elem, 'y').split()] out = cls(energy, y, filter_id=filter_id) if elem.find('interpolation') is not None: out.interpolation = elem.find('interpolation').text return out
[docs] def can_merge(self, other): return False
[docs] def is_subset(self, other): return self == other
[docs] def get_bin_index(self, filter_bin): # This filter only has one bin. Always return 0. return 0
[docs] def get_pandas_dataframe(self, data_size, stride, **kwargs): """Builds a Pandas DataFrame for the Filter's bins. This method constructs a Pandas DataFrame object for the filter with columns annotated by filter bin information. This is a helper method for :meth:`Tally.get_pandas_dataframe`. Parameters ---------- data_size : int The total number of bins in the tally corresponding to this filter stride : int Stride in memory for the filter Returns ------- pandas.DataFrame A Pandas DataFrame with a column that is filled with a hash of this filter. EnergyFunctionFilters have only 1 bin so the purpose of this DataFrame column is to differentiate the filter from other EnergyFunctionFilters. The number of rows in the DataFrame is the same as the total number of bins in the corresponding tally. See also -------- Tally.get_pandas_dataframe(), CrossFilter.get_pandas_dataframe() """ df = pd.DataFrame() # There is no clean way of sticking all the energy, y data into a # DataFrame so instead we'll just make a column with the filter name # and fill it with a hash of the __repr__. We want a hash that is # reproducible after restarting the interpreter so we'll use hashlib.md5 # rather than the intrinsic hash(). hash_fun = hashlib.md5() hash_fun.update(repr(self).encode('utf-8')) out = hash_fun.hexdigest() # The full 16 bytes make for a really wide column. Just 7 bytes (14 # hex characters) of the digest are probably sufficient. out = out[:14] filter_bins = _repeat_and_tile(out, stride, data_size) df = pd.concat([df, pd.DataFrame( {self.short_name.lower(): filter_bins})]) return df