from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterable, Sequence
from functools import cache
from math import sqrt, pi, exp, log
from numbers import Real
from pathlib import Path
from warnings import warn
import lxml.etree as ET
import numpy as np
from scipy.integrate import trapezoid
from scipy.special import exprel, hyp1f1, lambertw
import scipy
import openmc.checkvalue as cv
from openmc.data import atomic_mass, NEUTRON_MASS
import openmc.data
from .._xml import get_elem_list, get_text
from ..mixin import EqualityMixin
_INTERPOLATION_SCHEMES = {
'histogram',
'linear-linear',
'linear-log',
'log-linear',
'log-log'
}
def exprel2(x):
"""Evaluate 2*(exp(x)-1-x)/x^2 without loss of precision near 0"""
return hyp1f1(1, 3, x)
def log1prel(x):
"""Evaluate log(1+x)/x without loss of precision near 0"""
return np.where(np.abs(x) < 1e-16, 1.0, np.log1p(x) / x)
[docs]
class Univariate(EqualityMixin, ABC):
"""Probability distribution of a single random variable.
The Univariate class is an abstract class that can be derived to implement a
specific probability distribution.
Parameters
----------
bias : Iterable of float, optional
Distribution or discrete probabilities for biased sampling or discrete
probabilities for biased sampling.
"""
def __init__(self, bias: Univariate | Sequence[float] | None = None):
self.bias = bias
@property
def bias(self):
return self._bias
@bias.setter
def bias(self, bias):
check_bias_support(self, bias)
self._bias = bias
def _append_bias_to_xml(self, element: ET.Element) -> None:
"""Append bias distribution element to XML if present."""
if self.bias is not None:
if self.bias.bias is not None:
raise RuntimeError('Biasing distributions should not have their own bias.')
bias_elem = self.bias.to_xml_element("bias")
element.append(bias_elem)
@classmethod
def _read_bias_from_xml(cls, elem: ET.Element):
"""Read bias distribution from XML element if present."""
bias_elem = elem.find('bias')
if bias_elem is not None:
return Univariate.from_xml_element(bias_elem)
return None
def _append_array_bias_to_xml(self, element: ET.Element) -> None:
"""Append array-based bias probabilities to XML."""
if self.bias is not None:
bias_elem = ET.SubElement(element, "bias")
bias_elem.text = ' '.join(map(str, self.bias))
@classmethod
def _read_array_bias_from_xml(cls, elem: ET.Element):
"""Read array-based bias probabilities from XML."""
bias_elem = elem.find('bias')
if bias_elem is not None:
return get_elem_list(elem, "bias", float)
return None
@abstractmethod
def to_xml_element(self, element_name):
return ''
@abstractmethod
def __len__(self):
return 0
@classmethod
@abstractmethod
def from_xml_element(cls, elem):
distribution = get_text(elem, 'type')
if distribution == 'discrete':
return Discrete.from_xml_element(elem)
elif distribution == 'uniform':
return Uniform.from_xml_element(elem)
elif distribution == 'powerlaw':
return PowerLaw.from_xml_element(elem)
elif distribution == 'maxwell':
return Maxwell.from_xml_element(elem)
elif distribution == 'watt':
return Watt.from_xml_element(elem)
elif distribution == 'normal':
return Normal.from_xml_element(elem)
elif distribution == 'muir':
# Support older files where Muir had its own class
return muir(*get_elem_list(elem, "parameters", float))
elif distribution == 'tabular':
return Tabular.from_xml_element(elem)
elif distribution == 'legendre':
return Legendre.from_xml_element(elem)
elif distribution == 'mixture':
return Mixture.from_xml_element(elem)
elif distribution == 'decay_spectrum':
return DecaySpectrum.from_xml_element(elem)
@abstractmethod
def _sample_unbiased(self, n_samples: int = 1, seed: int | None = None):
"""Sample without bias handling.
Parameters
----------
n_samples : int
Number of sampled values to generate
seed : int or None
Initial random number seed.
Returns
-------
numpy.ndarray
The array of sampled values
"""
pass
[docs]
def sample(self, n_samples: int = 1, seed: int | None = None):
"""Sample the univariate distribution, handling biasing automatically.
Parameters
----------
n_samples : int
Number of sampled values to generate
seed : int or None
Initial random number seed.
Returns
-------
tuple of numpy.ndarray
A tuple of (samples, weights)
"""
if self.bias is None:
x = self._sample_unbiased(n_samples, seed)
return x, np.ones_like(x)
else:
if self.bias.bias is not None:
raise RuntimeError('Biasing distributions should not have their own bias.')
x, _ = self.bias.sample(n_samples=n_samples, seed=seed)
weight = self.evaluate(x) / self.bias.evaluate(x)
return x, weight
[docs]
def integral(self):
"""Return integral of distribution
.. versionadded:: 0.13.1
Returns
-------
float
Integral of distribution
"""
return 1.0
[docs]
@abstractmethod
def evaluate(self, x: float | Sequence[float]):
"""Evaluate the probability density at the provided value.
Parameters
----------
x : float or sequence of float
Location to evaluate p(x)
Returns
-------
float or numpy.ndarray
Value of p(x)
"""
pass
@property
@abstractmethod
def support(self):
"""Return the support of the probability distribution.
Returns
-------
set or tuple of float or dict
Returns the set of unique points assigned probability mass in a
discrete distribution, the sampling interval for a continuous
distribution, or a dictionary storing the discrete and continuous
parts of the support of a mixed random variable
"""
pass
def _intensity_clip(intensity: Sequence[float], tolerance: float = 1e-6) -> np.ndarray:
"""Clip low-importance points from an array of intensities.
Given an array of intensities, this function returns an array of indices for
points that contribute non-negligibly to the total sum of intensities.
Parameters
----------
intensity : sequence of float
Intensities in arbitrary units.
tolerance : float
Maximum fraction of intensities that will be discarded.
Returns
-------
Array of indices
"""
# Get indices of intensities from largest to smallest
index_sort = np.argsort(intensity)[::-1]
# Get intensities from largest to smallest
sorted_intensity = np.asarray(intensity)[index_sort]
# Determine cumulative sum of probabilities
cumsum = np.cumsum(sorted_intensity)
cumsum /= cumsum[-1]
# Find index that satisfies cutoff
index_cutoff = np.searchsorted(cumsum, 1.0 - tolerance)
# Now get indices up to cutoff
new_indices = index_sort[:index_cutoff + 1]
# Put back in the order of the original array and return
new_indices.sort()
return new_indices
[docs]
class Discrete(Univariate):
"""Distribution characterized by a probability mass function.
The Discrete distribution assigns probability values to discrete values of a
random variable, rather than expressing the distribution as a continuous
random variable.
Parameters
----------
x : Iterable of float
Values of the random variable
p : Iterable of float
Discrete probability for each value
bias : Iterable of float, optional
Alternative discrete probabilities for biased sampling. Defaults to
None for unbiased sampling.
Attributes
----------
x : numpy.ndarray
Values of the random variable
p : numpy.ndarray
Discrete probability for each value
support : set
Values of the random variable over which the distribution is
nonzero-valued
bias : numpy.ndarray or None
Discrete probabilities for biased sampling
"""
def __init__(self, x, p, bias=None):
self.x = x
self.p = p
super().__init__(bias)
def __len__(self):
return len(self.x)
@property
def x(self):
return self._x
@x.setter
def x(self, x):
if isinstance(x, Real):
x = [x]
cv.check_type('discrete values', x, Iterable, Real)
self._x = np.array(x, dtype=float)
@property
def p(self):
return self._p
@p.setter
def p(self, p):
if isinstance(p, Real):
p = [p]
cv.check_type('discrete probabilities', p, Iterable, Real)
for pk in p:
cv.check_greater_than('discrete probability', pk, 0.0, True)
self._p = np.array(p, dtype=float)
@property
def support(self):
return set(np.unique(self._x))
@Univariate.bias.setter
def bias(self, bias):
if bias is None:
self._bias = bias
else:
if isinstance(bias, Real):
bias = [bias]
cv.check_type('discrete bias probabilities', bias, Iterable, Real)
for bk in bias:
cv.check_greater_than('discrete probability', bk, 0.0, True)
if len(bias) != len(self.x):
raise RuntimeError("Discrete distribution has unequal number of "
"biased and unbiased probability entries.")
self._bias = np.array(bias, dtype=float)
def cdf(self):
return np.insert(np.cumsum(self.p), 0, 0.0)
[docs]
def sample(self, n_samples=1, seed=None):
if self.bias is None:
samples = self._sample_unbiased(n_samples, seed)
return samples, np.ones_like(samples)
else:
rng = np.random.RandomState(seed)
p = self.p / self.p.sum()
b = self.bias / self.bias.sum()
indices = rng.choice(self.x.size, n_samples, p=b)
biased_sample = self.x[indices]
wgt = p[indices] / b[indices]
return biased_sample, wgt
def _sample_unbiased(self, n_samples=1, seed=None):
rng = np.random.RandomState(seed)
p = self.p / self.p.sum()
return rng.choice(self.x, n_samples, p=p)
[docs]
def normalize(self):
"""Normalize the probabilities stored on the distribution"""
norm = sum(self.p)
self.p = [val / norm for val in self.p]
[docs]
def evaluate(self, x):
raise NotImplementedError
[docs]
def to_xml_element(self, element_name):
"""Return XML representation of the discrete distribution
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing discrete distribution data
"""
element = ET.Element(element_name)
element.set("type", "discrete")
params = ET.SubElement(element, "parameters")
params.text = ' '.join(map(str, self.x)) + ' ' + ' '.join(map(str, self.p))
self._append_array_bias_to_xml(element)
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate discrete distribution from an XML element
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.Discrete
Discrete distribution generated from XML element
"""
params = get_elem_list(elem, "parameters", float)
x = params[:len(params)//2]
p = params[len(params)//2:]
bias_dist = cls._read_array_bias_from_xml(elem)
return cls(x, p, bias=bias_dist)
[docs]
@classmethod
def merge(
cls,
dists: Sequence[Discrete],
probs: Sequence[float]
):
"""Merge multiple discrete distributions into a single distribution
.. versionadded:: 0.13.1
Parameters
----------
dists : iterable of openmc.stats.Discrete
Discrete distributions to combine
probs : iterable of float
Probability of each distribution
Returns
-------
openmc.stats.Discrete
Combined discrete distribution
"""
if len(dists) != len(probs):
raise ValueError("Number of distributions and probabilities must match.")
biasing = False
for d in dists:
if d.bias is not None:
# If we find that at least one distribution is biased, all
# distributions which are not biased will be assigned their
# default probability vector as a "bias" so that biased
# sampling can occur on the merged distribution.
biasing = True
break
# Combine distributions accounting for duplicate x values
x_merged = set()
p_merged = defaultdict(float)
new_bias = None
if biasing:
b_merged = defaultdict(float)
# Generate any missing bias distributions
dists = dists.copy()
for i, d in enumerate(dists):
if d.bias is None:
dists[i] = Discrete(d.x, d.p, bias=d.p)
for dist, p_dist in zip(dists, probs):
for x, p, b in zip(dist.x, dist.p, dist.bias):
x_merged.add(x)
p_merged[x] += p*p_dist
b_merged[x] += b*p_dist
# Create values and bias probabilities as arrays
x_arr = np.array(sorted(x_merged))
new_bias = np.array([b_merged[x] for x in x_arr])
else:
for dist, p_dist in zip(dists, probs):
for x, p in zip(dist.x, dist.p):
x_merged.add(x)
p_merged[x] += p*p_dist
# Create values as array
x_arr = np.array(sorted(x_merged))
# Create probabilities as array
p_arr = np.array([p_merged[x] for x in x_arr])
return cls(x_arr, p_arr, new_bias)
[docs]
def integral(self):
"""Return integral of distribution
.. versionadded:: 0.13.1
Returns
-------
float
Integral of discrete distribution
"""
return np.sum(self.p)
[docs]
def mean(self) -> float:
"""Return mean of the discrete distribution
The mean is the weighted average of the discrete values.
.. versionadded:: 0.15.3
Returns
-------
float
Mean of discrete distribution
"""
return np.sum(self.x * self.p) / np.sum(self.p)
[docs]
def clip(self, tolerance: float = 1e-6, inplace: bool = False) -> Discrete:
r"""Remove low-importance points from discrete distribution.
Given a probability mass function :math:`p(x)` with :math:`\{x_1, x_2,
x_3, \dots\}` the possible values of the random variable with
corresponding probabilities :math:`\{p_1, p_2, p_3, \dots\}`, this
function will remove any low-importance points such that :math:`\sum_i
x_i p_i` is preserved to within some threshold.
For biased distributions, clipping should be performed before the bias
probabilities are added.
.. versionadded:: 0.14.0
Parameters
----------
tolerance : float
Maximum fraction of :math:`\sum_i x_i p_i` that will be discarded.
inplace : bool
Whether to modify the current object in-place or return a new one.
Returns
-------
Discrete distribution with low-importance points removed
"""
if self.bias is not None:
raise RuntimeError("Biased discrete distributions should be clipped "
"before applying bias.")
cv.check_less_than("tolerance", tolerance, 1.0, equality=True)
cv.check_greater_than("tolerance", tolerance, 0.0, equality=True)
# Compute intensities
intensity = self.p * self.x
# Get indices for intensities above threshold
indices = _intensity_clip(intensity, tolerance=tolerance)
# Create new discrete distribution
if inplace:
self.x = self.x[indices]
self.p = self.p[indices]
return self
else:
new_x = self.x[indices]
new_p = self.p[indices]
return type(self)(new_x, new_p)
[docs]
def delta_function(value: float, intensity: float = 1.0) -> Discrete:
"""Return a discrete distribution with a single point.
.. versionadded:: 0.15.1
Parameters
----------
value : float
Value of the random variable.
intensity : float, optional
When used for an energy distribution, this can be used to assign an
intensity.
Returns
-------
Discrete distribution with a single point
"""
return Discrete([value], [intensity])
[docs]
class PowerLaw(Univariate):
"""Distribution with power law probability over a finite interval [a,b]
The power law distribution has density function :math:`p(x) dx = c x^n dx`.
.. versionadded:: 0.13.0
Parameters
----------
a : float, optional
Lower bound of the sampling interval. Defaults to zero.
b : float, optional
Upper bound of the sampling interval. Defaults to unity.
n : float, optional
Power law exponent. Defaults to zero, which is equivalent to a uniform
distribution.
bias : openmc.stats.Univariate, optional
Distribution for biased sampling.
Attributes
----------
a : float
Lower bound of the sampling interval
b : float
Upper bound of the sampling interval
n : float
Power law exponent
support : tuple of float
A 2-tuple (lower, upper) defining the interval over which the
distribution is nonzero-valued
bias : openmc.stats.Univariate or None
Distribution for biased sampling
"""
def __init__(self, a: float = 0.0, b: float = 1.0, n: float = 0.,
bias: Univariate | None = None):
if a >= b:
raise ValueError(
"Lower bound of sampling interval must be less than upper bound.")
self.a = a
self.b = b
self.n = n
super().__init__(bias)
def __len__(self):
return 3
@property
def a(self):
return self._a
@a.setter
def a(self, a):
cv.check_type('interval lower bound', a, Real)
if a < 0:
raise ValueError(
"PowerLaw sampling is restricted to positive-valued intervals.")
self._a = a
@property
def b(self):
return self._b
@b.setter
def b(self, b):
cv.check_type('interval upper bound', b, Real)
if b < 0:
raise ValueError(
"PowerLaw sampling is restricted to positive-valued intervals.")
self._b = b
@property
def n(self):
return self._n
@n.setter
def n(self, n):
cv.check_type('power law exponent', n, Real)
self._n = n
@property
def support(self):
return (self._a, self._b)
def _sample_unbiased(self, n_samples=1, seed=None):
rng = np.random.RandomState(seed)
xi = rng.random(n_samples)
pwr = self.n + 1
offset = self.a**pwr
span = self.b**pwr - offset
return np.power(offset + xi * span, 1/pwr)
[docs]
def evaluate(self, x):
c = (self.n + 1)/(self.b**(self.n + 1) - self.a**(self.n + 1))
return np.where((self.a <= x) & (x <= self.b), c * np.abs(x)**self.n, 0.0)
[docs]
def to_xml_element(self, element_name: str):
"""Return XML representation of the power law distribution
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing distribution data
"""
element = ET.Element(element_name)
element.set("type", "powerlaw")
element.set("parameters", f'{self.a} {self.b} {self.n}')
self._append_bias_to_xml(element)
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate power law distribution from an XML element
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.PowerLaw
Distribution generated from XML element
"""
params = get_elem_list(elem, "parameters", float)
bias_dist = cls._read_bias_from_xml(elem)
return cls(*map(float, params), bias=bias_dist)
[docs]
class Maxwell(Univariate):
r"""Maxwellian distribution in energy.
The Maxwellian distribution in energy is characterized by a single parameter
:math:`\theta` and has a density function :math:`p(E) dE = c \sqrt{E}
e^{-E/\theta} dE`.
Parameters
----------
theta : float
Effective temperature for distribution in eV
bias : openmc.stats.Univariate, optional
Distribution for biased sampling.
Attributes
----------
theta : float
Effective temperature for distribution in eV
support : tuple of float
A 2-tuple (lower, upper) defining the interval over which the
distribution is nonzero-valued
bias : openmc.stats.Univariate or None
Distribution for biased sampling
"""
def __init__(self, theta, bias: Univariate | None = None):
self.theta = theta
super().__init__(bias)
def __len__(self):
return 1
@property
def theta(self):
return self._theta
@theta.setter
def theta(self, theta):
cv.check_type('Maxwell temperature', theta, Real)
cv.check_greater_than('Maxwell temperature', theta, 0.0)
self._theta = theta
@property
def support(self):
return (0.0, np.inf)
def _sample_unbiased(self, n_samples=1, seed=None):
rng = np.random.RandomState(seed)
return self.sample_maxwell(self.theta, n_samples, rng=rng)
@staticmethod
def sample_maxwell(t, n_samples: int, rng=None):
if rng is None:
rng = np.random.default_rng()
return rng.gamma(1.5, t, n_samples)
[docs]
def evaluate(self, E):
return scipy.stats.gamma.pdf(E, 1.5, scale=self.theta)
[docs]
def to_xml_element(self, element_name: str):
"""Return XML representation of the Maxwellian distribution
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing Maxwellian distribution data
"""
element = ET.Element(element_name)
element.set("type", "maxwell")
element.set("parameters", str(self.theta))
self._append_bias_to_xml(element)
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate Maxwellian distribution from an XML element
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.Maxwell
Maxwellian distribution generated from XML element
"""
theta = float(get_text(elem, 'parameters'))
bias_dist = cls._read_bias_from_xml(elem)
return cls(theta, bias=bias_dist)
[docs]
class Watt(Univariate):
r"""Watt fission energy spectrum.
The Watt fission energy spectrum is characterized by two parameters
:math:`a` and :math:`b` and has density function :math:`p(E) dE = c e^{-E/a}
\sinh \sqrt{b \, E} dE`.
Parameters
----------
a : float
First parameter of distribution in units of eV
b : float
Second parameter of distribution in units of 1/eV
bias : openmc.stats.Univariate, optional
Distribution for biased sampling.
Attributes
----------
a : float
First parameter of distribution in units of eV
b : float
Second parameter of distribution in units of 1/eV
support : tuple of float
A 2-tuple (lower, upper) defining the interval over which the
distribution is nonzero-valued
bias : openmc.stats.Univariate or None
Distribution for biased sampling
"""
def __init__(self, a=0.988e6, b=2.249e-6, bias: Univariate | None = None):
self.a = a
self.b = b
super().__init__(bias)
def __len__(self):
return 2
@property
def a(self):
return self._a
@a.setter
def a(self, a):
cv.check_type('Watt a', a, Real)
cv.check_greater_than('Watt a', a, 0.0)
self._a = a
@property
def b(self):
return self._b
@b.setter
def b(self, b):
cv.check_type('Watt b', b, Real)
cv.check_greater_than('Watt b', b, 0.0)
self._b = b
@property
def support(self):
return (0.0, np.inf)
def _sample_unbiased(self, n_samples=1, seed=None):
rng = np.random.RandomState(seed)
w = Maxwell.sample_maxwell(self.a, n_samples, rng=rng)
u = rng.uniform(-1., 1., n_samples)
aab = self.a * self.a * self.b
return w + 0.25*aab + u*np.sqrt(aab*w)
[docs]
def evaluate(self, E):
c = 2.0/(sqrt(pi * self.b) * (self.a**1.5) * exp(self.a*self.b/4))
return c*np.exp(-E/self.a)*np.sinh(np.sqrt(self.b*E))
[docs]
def to_xml_element(self, element_name: str):
"""Return XML representation of the Watt distribution
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing Watt distribution data
"""
element = ET.Element(element_name)
element.set("type", "watt")
element.set("parameters", f'{self.a} {self.b}')
self._append_bias_to_xml(element)
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate Watt distribution from an XML element
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.Watt
Watt distribution generated from XML element
"""
params = get_elem_list(elem, "parameters", float)
bias_dist = cls._read_bias_from_xml(elem)
return cls(*map(float, params), bias=bias_dist)
[docs]
class Normal(Univariate):
r"""Normally distributed sampling with optional truncation.
The normal distribution is characterized by parameters :math:`\mu` and
:math:`\sigma` and has density function :math:`p(X) = 1/(\sqrt{2\pi}\sigma)
e^{-(X-\mu)^2/(2\sigma^2)}`. When truncated to the interval [lower, upper],
the distribution is renormalized so that the PDF integrates to 1 over the
truncation interval.
.. versionchanged:: 0.15.4
Added optional truncation bounds via `lower` and `upper` parameters.
Parameters
----------
mean_value : float
Mean value of the distribution
std_dev : float
Standard deviation of the Normal distribution
lower : float, optional
Lower truncation bound. Defaults to -infinity (no lower bound).
upper : float, optional
Upper truncation bound. Defaults to +infinity (no upper bound).
bias : openmc.stats.Univariate, optional
Distribution for biased sampling.
Attributes
----------
mean_value : float
Mean of the Normal distribution
std_dev : float
Standard deviation of the Normal distribution
lower : float
Lower truncation bound
upper : float
Upper truncation bound
support : tuple of float
A 2-tuple (lower, upper) defining the interval over which the
distribution is nonzero-valued
bias : openmc.stats.Univariate or None
Distribution for biased sampling
"""
def __init__(self, mean_value, std_dev, lower=-np.inf, upper=np.inf,
bias: Univariate | None = None):
self.mean_value = mean_value
self.std_dev = std_dev
self.lower = lower
self.upper = upper
self._compute_normalization()
super().__init__(bias)
def __len__(self):
if self._is_truncated:
return 4
return 2
@property
def mean_value(self):
return self._mean_value
@mean_value.setter
def mean_value(self, mean_value):
cv.check_type('Normal mean_value', mean_value, Real)
self._mean_value = mean_value
@property
def std_dev(self):
return self._std_dev
@std_dev.setter
def std_dev(self, std_dev):
cv.check_type('Normal std_dev', std_dev, Real)
cv.check_greater_than('Normal std_dev', std_dev, 0.0)
self._std_dev = std_dev
@property
def lower(self):
return self._lower
@lower.setter
def lower(self, lower):
cv.check_type('Normal lower bound', lower, Real)
self._lower = lower
@property
def upper(self):
return self._upper
@upper.setter
def upper(self, upper):
cv.check_type('Normal upper bound', upper, Real)
self._upper = upper
def _compute_normalization(self):
"""Compute normalization factor for truncated distribution."""
# Check if truncation bounds are finite
self._is_truncated = (self._lower > -np.inf or self._upper < np.inf)
if self._lower >= self._upper:
raise ValueError("Normal distribution lower bound must be less "
"than upper bound.")
if self._is_truncated:
alpha = (self._lower - self._mean_value) / self._std_dev
beta = (self._upper - self._mean_value) / self._std_dev
cdf_diff = scipy.stats.norm.cdf(beta) - scipy.stats.norm.cdf(alpha)
if cdf_diff <= 0:
raise ValueError("Truncation bounds exclude entire distribution")
self._norm_factor = 1.0 / cdf_diff
else:
self._norm_factor = 1.0
@property
def support(self):
return (self._lower, self._upper)
def _sample_unbiased(self, n_samples=1, seed=None):
rng = np.random.RandomState(seed)
if not self._is_truncated:
return rng.normal(self.mean_value, self.std_dev, n_samples)
else:
# Use scipy's truncated normal for efficient direct sampling
a = (self._lower - self._mean_value) / self._std_dev
b = (self._upper - self._mean_value) / self._std_dev
return scipy.stats.truncnorm.rvs(
a, b, loc=self._mean_value, scale=self._std_dev,
size=n_samples, random_state=rng
)
[docs]
def evaluate(self, x):
"""Evaluate PDF at x, returning normalized value for truncated dist."""
x = np.asarray(x)
f = scipy.stats.norm.pdf(x, self.mean_value, self.std_dev)
if self._is_truncated:
# PDF is zero outside bounds
in_bounds = (x >= self._lower) & (x <= self._upper)
f = np.where(in_bounds, f * self._norm_factor, 0.0)
return f
[docs]
def to_xml_element(self, element_name: str):
"""Return XML representation of the Normal distribution
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing Normal distribution data
"""
element = ET.Element(element_name)
element.set("type", "normal")
if self._is_truncated:
element.set("parameters",
f'{self.mean_value} {self.std_dev} {self.lower} {self.upper}')
else:
element.set("parameters", f'{self.mean_value} {self.std_dev}')
self._append_bias_to_xml(element)
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate Normal distribution from an XML element
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.Normal
Normal distribution generated from XML element
"""
params = get_elem_list(elem, "parameters", float)
bias_dist = cls._read_bias_from_xml(elem)
if len(params) == 4:
return cls(params[0], params[1], params[2], params[3], bias=bias_dist)
else:
return cls(params[0], params[1], bias=bias_dist)
[docs]
def muir(e0: float, m_rat: float, kt: float, bias: Univariate | None = None):
"""Generate a Muir energy spectrum
The Muir energy spectrum is a normal distribution, but for convenience
reasons allows the user to specify three parameters to define the
distribution: the mean energy of particles ``e0``, the mass of reactants
``m_rat``, and the ion temperature ``kt``.
.. versionadded:: 0.13.2
Parameters
----------
e0 : float
Mean of the Muir distribution in [eV]
m_rat : float
Ratio of the sum of the masses of the reaction inputs to 1 amu
kt : float
Ion temperature for the Muir distribution in [eV]
bias : openmc.stats.Univariate, optional
Distribution for biased sampling.
Returns
-------
openmc.stats.Normal
Corresponding normal distribution
"""
# https://permalink.lanl.gov/object/tr?what=info:lanl-repo/lareport/LA-05411-MS
std_dev = sqrt(2 * e0 * kt / m_rat)
return Normal(e0, std_dev, bias=bias)
# Retain deprecated name for the time being
def Muir(*args, **kwargs):
# warn of name change
warn(
"The Muir(...) class has been replaced by the muir(...) function and "
"will be removed in a future version of OpenMC. Use muir(...) instead.",
FutureWarning
)
return muir(*args, **kwargs)
[docs]
def fusion_neutron_spectrum(
ion_temp: float,
reactants: str = 'DD',
bias: Univariate | None = None
) -> Normal:
r"""Return a Gaussian energy distribution for fusion neutron emission.
Computes the mean energy and spectral width of the neutron energy spectrum
from thermonuclear fusion reactions in a plasma with Maxwellian ion velocity
distributions. The mean neutron energy is calculated as
.. math::
\langle E_n \rangle = E_0 + \Delta E_\text{th}
where :math:`E_0` is the neutron energy at zero ion temperature and
:math:`\Delta E_\text{th}` is the thermal peak shift due to the motion of
the reacting ions. The spectral width is characterized by the FWHM:
.. math::
W_{1/2} = \omega_0 (1 + \delta_\omega) \sqrt{T_i}
where :math:`\omega_0` is the width at the :math:`T_i \to 0` limit and
:math:`\delta_\omega` is a temperature-dependent correction term. Both
:math:`\Delta E_\text{th}` and :math:`\delta_\omega` are evaluated using
interpolation formulas from `Ballabio et al.
<https://doi.org/10.1088/0029-5515/38/11/310>`_: Table III for :math:`0 <
T_i \le 40` keV and Table IV for :math:`40 < T_i < 100` keV. The returned
distribution is a normal (Gaussian) approximation to the spectrum.
.. versionadded:: 0.15.4
Parameters
----------
ion_temp : float
Ion temperature of the plasma in [eV].
reactants : {'DD', 'DT'}
Fusion reactants. 'DD' corresponds to the D(d,n)\ :sup:`3`\ He reaction
and 'DT' to the T(d,n)\ :math:`\alpha` reaction.
bias : openmc.stats.Univariate, optional
Distribution for biased sampling.
Returns
-------
openmc.stats.Normal
Normal distribution with mean and standard deviation corresponding to
the first and second moments of the fusion neutron energy spectrum. Both
the mean and standard deviation are in [eV].
"""
if ion_temp < 0.0 or ion_temp > 100e3:
raise ValueError("Ion temperature must be between 0 and 100 keV.")
# Formulas from doi:10.1088/0029-5515/38/11/310
mn = NEUTRON_MASS
md = atomic_mass('H2')
ev_per_c2 = 931.49410372*1e6
if reactants == 'DD':
mhe3 = atomic_mass('He3')
Q = (md + md - mhe3 - mn)*ev_per_c2
E_n = mhe3/(mhe3 + mn)*Q
w0 = 82.542
# Low-T constants for peak shift (Table III)
a1 = 4.69515
a2 = -0.040729
a3 = 0.47
a4 = 0.81844
# Low-T constants for width correction (Table III)
b1 = 1.7013e-3
b2 = 0.16888
b3 = 0.49
b4 = 7.9460e-4
# High-T constants for peak shift (Table IV)
a5 = 18.225
a6 = 2.1525
# High-T constants for width correction (Table IV)
b5 = 8.4619e-3
b6 = 8.3241e-4
elif reactants == 'DT':
mt = atomic_mass('H3')
ma = atomic_mass('He4')
Q = (md + mt - ma - mn)*ev_per_c2
E_n = ma/(ma + mn)*Q
w0 = 177.259
# Low-T constants for peak shift (Table III)
a1 = 5.30509
a2 = 2.4736e-3
a3 = 1.84
a4 = 1.3818
# Low-T constants for width correction (Table III)
b1 = 5.1068e-4
b2 = 7.6223e-3
b3 = 1.78
b4 = 8.7691e-5
# High-T constants for peak shift (Table IV)
a5 = 37.771
a6 = 0.92181
# High-T constants for width correction (Table IV)
b5 = 2.0199e-3
b6 = 5.9501e-5
else:
raise ValueError("Invalid reactants specified. Must be 'DD' or 'DT'.")
# Ion temperature in keV
T = ion_temp * 1e-3
if T <= 40.0:
# Low-temperature interpolation (Table III, 0 < T_i <= 40 keV)
Delta_E = a1/(1 + a2*T**a3)*T**(2/3) + a4*T
delta_w = b1/(1 + b2*T**b3)*T**(2/3) + b4*T
else:
# High-temperature interpolation (Table IV, 40 < T_i < 100 keV)
Delta_E = a5 + a6*T
delta_w = b5 + b6*T
# Calculate FWHM
fwhm = (w0*(1 + delta_w) * sqrt(T))*1e3
sigma = fwhm / (2*sqrt(2*log(2)))
return Normal(E_n + Delta_E * 1e3, sigma, bias=bias)
[docs]
class Tabular(Univariate):
"""Piecewise continuous probability distribution.
This class is used to represent a probability distribution whose density
function is tabulated at specific values with a specified interpolation
scheme.
Parameters
----------
x : Iterable of float
Tabulated values of the random variable
p : Iterable of float
Tabulated probabilities. For histogram interpolation, if the length of
`p` is the same as `x`, the last value is ignored. Probabilities `p` are
given per unit of `x`.
interpolation : {'histogram', 'linear-linear', 'linear-log', 'log-linear', 'log-log'}, optional
Indicates how the density function is interpolated between tabulated
points. Defaults to 'linear-linear'.
ignore_negative : bool
Ignore negative probabilities
bias : openmc.stats.Univariate, optional
Distribution for biased sampling.
Attributes
----------
x : numpy.ndarray
Tabulated values of the random variable
p : numpy.ndarray
Tabulated probabilities
interpolation : {'histogram', 'linear-linear', 'linear-log', 'log-linear', 'log-log'}
Indicates how the density function is interpolated between tabulated
points. Defaults to 'linear-linear'.
support : tuple of float
A 2-tuple (lower, upper) defining the interval over which the
distribution is nonzero-valued
bias : openmc.stats.Univariate or None
Distribution for biased sampling
Notes
-----
The probabilities `p` are interpreted per unit of the corresponding
independent variable `x`. This follows the definition of a probability
density function (PDF) in probability theory, where the PDF represents the
relative likelihood of the random variable taking on a particular value per
unit of the variable. For example, if `x` represents energy in eV, then `p`
should represent probabilities per eV.
"""
def __init__(
self,
x: Sequence[float],
p: Sequence[float],
interpolation: str = 'linear-linear',
ignore_negative: bool = False,
bias: Univariate | None = None
):
self.interpolation = interpolation
cv.check_type('tabulated values', x, Iterable, Real)
cv.check_type('tabulated probabilities', p, Iterable, Real)
x = np.array(x, dtype=float)
p = np.array(p, dtype=float)
if p.size > x.size:
raise ValueError('Number of probabilities exceeds number of table values.')
if self.interpolation != 'histogram' and x.size != p.size:
raise ValueError(f'Tabulated values ({x.size}) and probabilities '
f'({p.size}) should have the same length')
if not ignore_negative:
for pk in p:
cv.check_greater_than('tabulated probability', pk, 0.0, True)
self._x = x
self._p = p
super().__init__(bias)
def __len__(self):
return self.p.size
@property
def x(self):
return self._x
@property
def p(self):
return self._p
@property
def interpolation(self):
return self._interpolation
@interpolation.setter
def interpolation(self, interpolation):
cv.check_value('interpolation', interpolation, _INTERPOLATION_SCHEMES)
self._interpolation = interpolation
@property
def support(self):
return (self._x[0], self._x[-1])
def cdf(self):
c = np.zeros_like(self.x)
x = self.x
p = self.p
if self.interpolation == 'histogram':
c[1:] = p[:x.size-1] * np.diff(x)
elif self.interpolation == 'linear-linear':
c[1:] = 0.5 * (p[:-1] + p[1:]) * np.diff(x)
elif self.interpolation == "linear-log":
m = np.diff(p) / np.diff(np.log(x))
c[1:] = p[:-1] * np.diff(x) + m * (
x[1:] * (np.diff(np.log(x)) - 1.0) + x[:-1]
)
elif self.interpolation == "log-linear":
m = np.diff(np.log(p)) / np.diff(x)
c[1:] = p[:-1] * np.diff(x) * exprel(m * np.diff(x))
elif self.interpolation == "log-log":
m = np.diff(np.log(x * p)) / np.diff(np.log(x))
c[1:] = (x * p)[:-1] * np.diff(np.log(x)) * exprel(m * np.diff(np.log(x)))
else:
raise NotImplementedError(
f"Cannot generate CDFs for tabular "
f"distributions using {self.interpolation} interpolation"
)
return np.cumsum(c)
[docs]
def mean(self):
"""Compute the mean of the tabular distribution"""
# use normalized probabilities when computing mean
p = self.p / self.cdf().max()
x = self.x
x_min = x[:-1]
x_max = x[1:]
p_min = p[: x.size - 1]
if self.interpolation == "linear-linear":
m = np.diff(p) / np.diff(x)
mean = ((1.0 / 3.0) * m * np.diff(x**3)
+ 0.5 * (p_min - m * x_min) * np.diff(x**2)).sum()
elif self.interpolation == "linear-log":
m = np.diff(p) / np.diff(np.log(x))
mean = (
(1.0 / 4.0) * m * x_min**2
* ((x_max / x_min)**2 * (2 * np.diff(np.log(x)) - 1) + 1)
+ 0.5 * p_min * np.diff(x**2)
).sum()
elif self.interpolation == "log-linear":
m = np.diff(np.log(p)) / np.diff(x)
mean = (p_min * (
np.diff(x) ** 2
* ((0.5 * exprel2(m * np.diff(x)) * (m * np.diff(x) - 1) + 1))
+ np.diff(x) * x_min * exprel(m * np.diff(x)))
).sum()
elif self.interpolation == "log-log":
m = np.diff(np.log(p)) / np.diff(np.log(x))
mean = (p_min * x_min**2 * np.diff(np.log(x))
* exprel((m + 2) * np.diff(np.log(x)))).sum()
elif self.interpolation == "histogram":
mean = (0.5 * (x_min + x_max) * np.diff(x) * p_min).sum()
else:
raise NotImplementedError(
f"Cannot compute mean for tabular "
f"distributions using {self.interpolation} interpolation"
)
return mean
[docs]
def normalize(self):
"""Normalize the probabilities stored on the distribution"""
self._p /= self.cdf().max()
def _sample_unbiased(self, n_samples: int = 1, seed: int | None = None):
rng = np.random.RandomState(seed)
xi = rng.random(n_samples)
# always use normalized probabilities when sampling
cdf = self.cdf()
p = self.p / cdf.max()
cdf /= cdf.max()
# get CDF bins that are above the
# sampled values
c_i = np.full(n_samples, cdf[0])
cdf_idx = np.zeros(n_samples, dtype=int)
for i, val in enumerate(cdf[:-1]):
mask = xi > val
c_i[mask] = val
cdf_idx[mask] = i
# get table values at each index where
# the random number is less than the next cdf
# entry
x_i = self.x[cdf_idx]
p_i = p[cdf_idx]
if self.interpolation == 'histogram':
# mask where probability is greater than zero
pos_mask = p_i > 0.0
# probabilities greater than zero are set proportional to the
# position of the random numebers in relation to the cdf value
p_i[pos_mask] = x_i[pos_mask] + (xi[pos_mask] - c_i[pos_mask]) \
/ p_i[pos_mask]
# probabilities smaller than zero are set to the random number value
p_i[~pos_mask] = x_i[~pos_mask]
samples_out = p_i
elif self.interpolation == 'linear-linear':
# get variable and probability values for the
# next entry
x_i1 = self.x[cdf_idx + 1]
p_i1 = p[cdf_idx + 1]
# compute slope between entries
m = (p_i1 - p_i) / (x_i1 - x_i)
# set values for zero slope
zero = m == 0.0
m[zero] = x_i[zero] + (xi[zero] - c_i[zero]) / p_i[zero]
# set values for non-zero slope
non_zero = ~zero
quad = np.power(p_i[non_zero], 2) + 2.0 * m[non_zero] * (xi[non_zero] - c_i[non_zero])
quad[quad < 0.0] = 0.0
m[non_zero] = x_i[non_zero] + (np.sqrt(quad) - p_i[non_zero]) / m[non_zero]
samples_out = m
elif self.interpolation == "linear-log":
# get variable and probability values for the
# next entry
x_i1 = self.x[cdf_idx + 1]
p_i1 = p[cdf_idx + 1]
# compute slope between entries
m = (p_i1 - p_i) / np.log(x_i1 / x_i)
# set values for zero slope
zero = m == 0.0
m[zero] = x_i[zero] + (xi[zero] - c_i[zero]) / p_i[zero]
positive = m > 0
negative = m < 0
a = p_i / m - 1
m[positive] = (
x_i
* ((xi - c_i) / (m * x_i) + a)
/ np.real(lambertw((((xi - c_i) / (m * x_i) + a)) * np.exp(a)))
)[positive]
m[negative] = (
x_i
* ((xi - c_i) / (m * x_i) + a)
/ np.real(lambertw((((xi - c_i) / (m * x_i) + a)) * np.exp(a), -1.0))
)[negative]
samples_out = m
elif self.interpolation == "log-linear":
# get variable and probability values for the
# next entry
x_i1 = self.x[cdf_idx + 1]
p_i1 = p[cdf_idx + 1]
# compute slope between entries
m = np.log(p_i1 / p_i) / (x_i1 - x_i)
f = (xi - c_i) / p_i
samples_out = x_i + f * log1prel(m * f)
elif self.interpolation == "log-log":
# get variable and probability values for the
# next entry
x_i1 = self.x[cdf_idx + 1]
p_i1 = p[cdf_idx + 1]
# compute slope between entries
m = np.log((x_i1 * p_i1) / (x_i * p_i)) / np.log(x_i1 / x_i)
f = (xi - c_i) / (x_i * p_i)
samples_out = x_i * np.exp(f * log1prel(m * f))
else:
raise NotImplementedError(
f"Cannot sample tabular distributions "
f"for {self.inteprolation} interpolation "
)
assert all(samples_out < self.x[-1])
return samples_out
[docs]
def sample(self, n_samples: int = 1, seed: int | None = None):
if self.bias is None:
samples = self._sample_unbiased(n_samples, seed)
return samples, np.ones_like(samples)
else:
if self.bias.bias is not None:
raise RuntimeError('Biasing distributions should not have their own bias.')
biased_sample, _ = self.bias.sample(n_samples=n_samples, seed=seed)
self.normalize() # must have normalized probabilities to apply correct weights
wgt = np.array([self.evaluate(s) / self.bias.evaluate(s) for s in biased_sample])
return biased_sample, wgt
[docs]
def evaluate(self, x):
if self.interpolation == 'linear-linear':
i = np.searchsorted(self.x, x, side='left') - 1
if i < 0 or i >= len(self.p) - 1:
return 0.0
x0, x1 = self.x[i], self.x[i + 1]
p0, p1 = self.p[i], self.p[i + 1]
t = (x - x0) / (x1 - x0)
return (1 - t) * p0 + t * p1
elif self.interpolation == 'histogram':
i = np.searchsorted(self.x, x, side='right') - 1
if i < 0 or i >= len(self.p):
return 0.0
return self.p[i]
else:
raise NotImplementedError('Can only evaluate tabular '
'distributions using histogram '
'or linear-linear interpolation.')
[docs]
def to_xml_element(self, element_name: str):
"""Return XML representation of the tabular distribution
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing tabular distribution data
"""
element = ET.Element(element_name)
element.set("type", "tabular")
element.set("interpolation", self.interpolation)
params = ET.SubElement(element, "parameters")
params.text = ' '.join(map(str, self.x)) + ' ' + ' '.join(map(str, self.p))
self._append_bias_to_xml(element)
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate tabular distribution from an XML element
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.Tabular
Tabular distribution generated from XML element
"""
interpolation = get_text(elem, 'interpolation')
params = get_elem_list(elem, "parameters", float)
m = (len(params) + 1)//2 # +1 for when len(params) is odd
x = params[:m]
p = params[m:]
bias_dist = cls._read_bias_from_xml(elem)
return cls(x, p, interpolation, bias=bias_dist)
[docs]
def integral(self):
"""Return integral of distribution
.. versionadded:: 0.13.1
Returns
-------
float
Integral of tabular distrbution
"""
if self.interpolation == 'histogram':
return np.sum(np.diff(self.x) * self.p[:self.x.size-1])
elif self.interpolation == 'linear-linear':
return trapezoid(self.p, self.x)
elif self.interpolation == "linear-log":
m = np.diff(self.p) / np.diff(np.log(self.x))
return np.sum(
self.p[:-1] * np.diff(self.x)
+ m * (self.x[1:] * (np.diff(np.log(self.x)) - 1.0) + self.x[:-1])
)
elif self.interpolation == "log-linear":
m = np.diff(np.log(self.p)) / np.diff(self.x)
return np.sum(self.p[:-1] * np.diff(self.x) * exprel(m * np.diff(self.x)))
elif self.interpolation == "log-log":
m = np.diff(np.log(self.p)) / np.diff(np.log(self.x))
return np.sum(self.p[:-1] * self.x[:-1] * np.diff(np.log(self.x))
* exprel((m + 1) * np.diff(np.log(self.x))))
else:
raise NotImplementedError(
f'integral() not supported for {self.interpolation} interpolation')
[docs]
class Legendre(Univariate):
r"""Probability density given by a Legendre polynomial expansion
:math:`\sum\limits_{\ell=0}^N \frac{2\ell + 1}{2} a_\ell P_\ell(\mu)`.
Parameters
----------
coefficients : Iterable of Real
Expansion coefficients :math:`a_\ell`. Note that the :math:`(2\ell +
1)/2` factor should not be included.
bias : openmc.stats.Univariate or None, optional
Distribution for biased sampling.
Attributes
----------
coefficients : Iterable of Real
Expansion coefficients :math:`a_\ell`. Note that the :math:`(2\ell +
1)/2` factor should not be included.
support : tuple of float
A 2-tuple (lower, upper) defining the interval over which the
distribution is nonzero-valued
bias : openmc.stats.Univariate or None
Distribution for biased sampling
"""
def __init__(self, coefficients: Sequence[float], bias: Univariate | None = None):
super().__init__(bias)
self.coefficients = coefficients
self._legendre_poly = None
def __call__(self, x):
# Create Legendre polynomial if we haven't yet
if self._legendre_poly is None:
l = np.arange(len(self._coefficients))
coeffs = (2.*l + 1.)/2. * self._coefficients
self._legendre_poly = np.polynomial.Legendre(coeffs)
return self._legendre_poly(x)
def __len__(self):
return len(self._coefficients)
@property
def coefficients(self):
return self._coefficients
@coefficients.setter
def coefficients(self, coefficients):
self._coefficients = np.asarray(coefficients)
@property
def support(self):
raise NotImplementedError
def _sample_unbiased(self, n_samples=1, seed=None):
raise NotImplementedError
[docs]
def sample(self, n_samples=1, seed=None):
raise NotImplementedError
[docs]
def evaluate(self, x):
raise NotImplementedError
def to_xml_element(self, element_name):
raise NotImplementedError
@classmethod
def from_xml_element(cls, elem):
raise NotImplementedError
[docs]
class Mixture(Univariate):
"""Probability distribution characterized by a mixture of random variables.
Parameters
----------
probability : Iterable of Real
Probability of selecting a particular distribution
distribution : Iterable of Univariate
List of distributions with corresponding probabilities
bias : Iterable of Real, optional
Probability of selecting a particular distribution under biased
sampling
Attributes
----------
probability : Iterable of Real
Probability of selecting a particular distribution
distribution : Iterable of Univariate
List of distributions with corresponding probabilities
support : dict
Dictionary containing discrete and continuous parts of the support
bias : numpy.ndarray or None
Probability of selecting each distribution under biased sampling
"""
def __init__(
self,
probability: Sequence[float],
distribution: Sequence[Univariate],
bias: Sequence[float] | None = None
):
super().__init__(bias)
self.probability = probability
self.distribution = distribution
def __len__(self):
return sum(len(d) for d in self.distribution)
@property
def probability(self):
return self._probability
@probability.setter
def probability(self, probability):
cv.check_type('mixture distribution probabilities', probability,
Iterable, Real)
for p in probability:
cv.check_greater_than('mixture distribution probabilities',
p, 0.0, True)
self._probability = np.array(probability, dtype=float)
@property
def distribution(self):
return self._distribution
@distribution.setter
def distribution(self, distribution):
cv.check_type('mixture distribution components', distribution,
Iterable, Univariate)
self._distribution = distribution
@Univariate.bias.setter
def bias(self, bias):
if bias is None:
self._bias = bias
else:
cv.check_type('biased mixture distribution probabilities', bias,
Iterable, Real)
for b in bias:
cv.check_greater_than('biased mixture distribution probabilities',
b, 0.0, True)
self._bias = np.array(bias, dtype=float)
@property
def support(self):
discrete_points = set()
intervals = []
for dist in self.distribution:
if isinstance(dist, Discrete):
discrete_points |= dist.support
else:
intervals.append(tuple(dist.support))
if intervals:
# simplify union by combining intervals when able
sorted_intervals = sorted(intervals, key=lambda x: x[0])
merged = [sorted_intervals[0]]
for current in sorted_intervals[1:]:
prev_start, prev_end = merged[-1]
curr_start, curr_end = current
if curr_start <= prev_end:
merged[-1] = (prev_start, max(prev_end, curr_end))
else:
merged.append(current)
intervals = merged
return {"discrete": discrete_points, "continuous": intervals}
def cdf(self):
return np.insert(np.cumsum(self.probability), 0, 0.0)
def _sample_unbiased(self, n_samples=1, seed=None):
# Mixture uses internal bias mechanism, not base class bias
rng = np.random.RandomState(seed)
# Get probability of each distribution accounting for its intensity
p = np.array([prob*dist.integral() for prob, dist in
zip(self.probability, self.distribution)])
p /= p.sum()
# Sample from the distributions
idx = rng.choice(range(len(self.distribution)), n_samples, p=p)
# Draw samples from the distributions sampled above
out = np.empty_like(idx, dtype=float)
out_wgt = np.empty_like(idx, dtype=float)
for i in np.unique(idx):
n_dist_samples = np.count_nonzero(idx == i)
samples, weights = self.distribution[i].sample(n_dist_samples)
out[idx == i] = samples
out_wgt[idx == i] = weights
return out, out_wgt
[docs]
def sample(self, n_samples=1, seed=None):
# Mixture uses internal bias mechanism, not base class bias
if self.bias is None:
return self._sample_unbiased(n_samples, seed)
rng = np.random.RandomState(seed)
# Get probability of each distribution accounting for its intensity
p = np.array([prob*dist.integral() for prob, dist in
zip(self.probability, self.distribution)])
p /= p.sum()
b = np.array([prob*dist.integral() for prob, dist in
zip(self.bias, self.distribution)])
b /= b.sum()
# Sample from the distributions using biased probabilities
idx = rng.choice(range(len(self.distribution)), n_samples, p=b)
idx_wgt = np.ones(n_samples)
for i in np.unique(idx):
idx_wgt[idx == i] = p[i]/b[i]
# Draw samples from the distributions sampled above
out = np.empty_like(idx, dtype=float)
out_wgt = np.empty_like(idx, dtype=float)
for i in np.unique(idx):
n_dist_samples = np.count_nonzero(idx == i)
samples, weights = self.distribution[i].sample(n_dist_samples)
out[idx == i] = samples
out_wgt[idx == i] = weights * idx_wgt[idx == i]
return out, out_wgt
[docs]
def evaluate(self, x):
raise NotImplementedError(
"evaluate() is undefined for Mixture distributions")
[docs]
def normalize(self):
"""Normalize the probabilities stored on the distribution"""
norm = sum(self.probability)
self.probability = [val / norm for val in self.probability]
[docs]
def to_xml_element(self, element_name: str):
"""Return XML representation of the mixture distribution
.. versionadded:: 0.13.0
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing mixture distribution data
"""
element = ET.Element(element_name)
element.set("type", "mixture")
for p, d in zip(self.probability, self.distribution):
data = ET.SubElement(element, "pair")
data.set("probability", str(p))
data.append(d.to_xml_element("dist"))
self._append_array_bias_to_xml(element)
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate mixture distribution from an XML element
.. versionadded:: 0.13.0
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.Mixture
Mixture distribution generated from XML element
"""
probability = []
distribution = []
for pair in elem.findall('pair'):
probability.append(float(get_text(pair, 'probability')))
distribution.append(Univariate.from_xml_element(pair.find("dist")))
bias_dist = cls._read_array_bias_from_xml(elem)
return cls(probability, distribution, bias=bias_dist)
[docs]
def integral(self):
"""Return integral of the distribution
.. versionadded:: 0.13.1
Returns
-------
float
Integral of the distribution
"""
return sum([
p*dist.integral()
for p, dist in zip(self.probability, self.distribution)
])
[docs]
def mean(self) -> float:
"""Return mean of the mixture distribution
The mean is the weighted average of the means of the component
distributions, weighted by probability * integral.
.. versionadded:: 0.15.3
Returns
-------
float
Mean of the mixture distribution
"""
# Weight each component by its probability and integral
weights = [p*dist.integral() for p, dist in
zip(self.probability, self.distribution)]
total_weight = sum(weights)
if total_weight == 0:
return 0.0
return sum([w*dist.mean() for w, dist in
zip(weights, self.distribution)]) / total_weight
[docs]
def clip(self, tolerance: float = 1e-6, inplace: bool = False) -> Mixture:
r"""Remove low-importance points / distributions
Like :meth:`Discrete.clip`, this method will remove low-importance
points from discrete distributions contained within the mixture but it
will also clip any distributions that have negligible contributions to
the overall intensity.
.. versionadded:: 0.14.0
Parameters
----------
tolerance : float
Maximum fraction of intensities that will be discarded.
inplace : bool
Whether to modify the current object in-place or return a new one.
Returns
-------
Distribution with low-importance points / distributions removed
"""
# Calculate mean * integral for original distribution to compare later.
original_mean_integral = self.mean() * self.integral()
# Determine indices for any distributions that contribute non-negligibly
# to overall mean * integral
mean_integrals = [prob*dist.mean()*dist.integral() for prob, dist in
zip(self.probability, self.distribution)]
indices = _intensity_clip(mean_integrals, tolerance=tolerance)
# Clip mixture of distributions
probability = self.probability[indices]
distribution = [self.distribution[i] for i in indices]
# Clip points from Discrete distributions
distribution = [
dist.clip(tolerance, inplace) if isinstance(dist, Discrete) else dist
for dist in distribution
]
if inplace:
# Set attributes of current object and return
self.probability = probability
self.distribution = distribution
new_dist = self
else:
# Create new distribution
new_dist = type(self)(probability, distribution)
# Show warning if mean * integral of new distribution is not within
# tolerance of original. For energy distributions, mean * integral
# represents total energy.
new_mean_integral = new_dist.mean() * new_dist.integral()
diff = (original_mean_integral - new_mean_integral)/original_mean_integral
if diff > tolerance:
warn("Clipping mixture distribution resulted in a mean*integral "
f"that is lower by a fraction of {diff} when tolerance={tolerance}.")
return new_dist
[docs]
class DecaySpectrum(Univariate):
"""Energy distribution from decay photon spectra of a mixture of nuclides.
This distribution stores nuclide names, their atom densities, and the volume
of the region. When written to XML and read by the C++ solver, the nuclide
names are resolved against the depletion chain to obtain the decay photon
energy spectra and decay constants. The resulting distribution is a mixture
of per-nuclide photon spectra weighted by absolute activity. The volume is
necessary so that the C++ solver can compute the total photon emission rate
in [photons/s], which is used as the source strength.
.. versionadded:: 0.15.4
Parameters
----------
nuclides : dict
Dictionary mapping nuclide name (str) to atom density (float) in units
of [atom/b-cm].
volume : float
Volume of the source region in [cm³]. Used together with atom densities
to compute the absolute photon emission rate.
Attributes
----------
nuclides : dict
Dictionary mapping nuclide name to atom density in [atom/b-cm].
volume : float
Volume of the source region in [cm³].
"""
def __init__(self, nuclides: dict[str, float], volume: float):
super().__init__(bias=None)
self._dist_cache = None
self._dist_cache_key = None
self.nuclides = nuclides
self.volume = volume
def __len__(self):
return len(self.nuclides)
@property
def nuclides(self):
return self._nuclides
@nuclides.setter
def nuclides(self, nuclides):
cv.check_type('nuclides', nuclides, dict)
for name, density in nuclides.items():
cv.check_type('nuclide name', name, str)
cv.check_type(f'atom density for {name}', density, Real)
cv.check_greater_than(f'atom density for {name}', density, 0.0, True)
self._nuclides = dict(nuclides)
self._dist_cache = None
self._dist_cache_key = None
@property
def volume(self):
return self._volume
@volume.setter
def volume(self, volume):
cv.check_type('volume', volume, Real)
cv.check_greater_than('volume', volume, 0.0)
self._volume = float(volume)
self._dist_cache = None
self._dist_cache_key = None
@staticmethod
def _chain_file_cache_key():
"""Return a hashable key for the active depletion chain."""
chain_file = openmc.config.get('chain_file')
if chain_file is None:
return None
path = Path(chain_file).resolve()
try:
stat = path.stat()
except OSError:
return (path, None, None)
return (path, stat.st_mtime, stat.st_size)
[docs]
def to_distribution(self):
"""Convert to a concrete distribution using decay chain data.
Builds a combined photon energy distribution by looking up each nuclide
in the depletion chain via :func:`openmc.data.decay_photon_energy` and
weighting by absolute atom count (``density * 1e24 * volume``). The
result is cached on the object; the cache is invalidated automatically
when :attr:`nuclides` or :attr:`volume` are reassigned.
Requires ``openmc.config['chain_file']`` to be set.
Returns
-------
openmc.stats.Univariate or None
Combined photon energy distribution, or ``None`` if no nuclide in
:attr:`nuclides` has a photon source in the chain.
"""
chain_key = self._chain_file_cache_key()
if self._dist_cache is not None and self._dist_cache_key == chain_key:
return self._dist_cache
dists = []
weights = []
for name, density in self.nuclides.items():
dist = openmc.data.decay_photon_energy(name)
if dist is not None:
dists.append(dist)
weights.append(density * 1e24 * self.volume)
if not dists:
return None
self._dist_cache = combine_distributions(dists, weights)
self._dist_cache_key = chain_key
return self._dist_cache
[docs]
def to_xml_element(self, element_name: str):
"""Return XML representation of the decay photon distribution
Parameters
----------
element_name : str
XML element name
Returns
-------
element : lxml.etree._Element
XML element containing decay photon distribution data
"""
element = ET.Element(element_name)
element.set("type", "decay_spectrum")
element.set("volume", str(self.volume))
nuclides = ET.SubElement(element, "nuclides")
nuclides.text = ' '.join(self.nuclides)
parameters = ET.SubElement(element, "parameters")
parameters.text = ' '.join(str(density) for density in self.nuclides.values())
return element
[docs]
@classmethod
def from_xml_element(cls, elem: ET.Element):
"""Generate decay photon distribution from an XML element
Parameters
----------
elem : lxml.etree._Element
XML element
Returns
-------
openmc.stats.DecaySpectrum
Decay photon distribution generated from XML element
"""
volume = float(elem.get('volume'))
names = get_elem_list(elem, 'nuclides', str)
densities = get_elem_list(elem, 'parameters', float)
nuclides = dict(zip(names, densities))
return cls(nuclides, volume)
def _sample_unbiased(self, n_samples=1, seed=None):
dist = self.to_distribution()
if dist is None:
raise RuntimeError(
"DecaySpectrum._sample_unbiased requires chain data but none "
"was found. Ensure openmc.config['chain_file'] is set and the "
"chain contains photon sources for the nuclides present."
)
return dist.sample(n_samples, seed)[0]
[docs]
def integral(self):
"""Return integral of the distribution
Returns the total photon emission rate in [photons/s] by delegating to
:meth:`to_distribution`. Returns ``0.0`` when no chain data is
available (e.g., ``openmc.config['chain_file']`` is not set).
Returns
-------
float
Total photon emission rate in [photons/s], or ``0.0`` if chain
data is unavailable.
"""
try:
dist = self.to_distribution()
except Exception:
return 0.0
if dist is None:
return 0.0
return dist.integral()
@staticmethod
@cache
def _photon_integral(nuclide: str, chain_key) -> float | None:
"""Return the per-atom photon emission integral for a nuclide"""
dist = openmc.data.decay_photon_energy(nuclide)
return dist.integral() if dist is not None else None
[docs]
def clip(self, tolerance: float = 1e-9, inplace: bool = False):
"""Remove nuclides with negligible contribution to photon emission.
Nuclides that are stable or have no photon source in the depletion
chain are removed unconditionally. The remaining nuclides are ranked
by their photon emission rate (proportional to
``atom_density * decay_constant * photon_yield``) and the least
important are discarded until the cumulative discarded fraction of the
total emission rate exceeds *tolerance*.
Requires ``openmc.config['chain_file']`` to be set.
Parameters
----------
tolerance : float
Maximum fraction of total photon emission rate that may be
discarded.
inplace : bool
Whether to modify the current object in-place or return a new one.
Returns
-------
openmc.stats.DecaySpectrum
Distribution with negligible nuclides removed.
"""
# Compute per-nuclide emission rate; drop non-emitters
emitting_names = []
emitting_densities = []
rates = []
chain_key = self._chain_file_cache_key()
for name, density in self.nuclides.items():
integral = DecaySpectrum._photon_integral(name, chain_key)
if integral is None:
continue
emitting_names.append(name)
emitting_densities.append(density)
rates.append(density * self.volume * integral)
if not emitting_names:
new_nuclides = {}
else:
indices = _intensity_clip(rates, tolerance=tolerance)
new_nuclides = {
emitting_names[i]: emitting_densities[i] for i in indices
}
if inplace:
self._nuclides = new_nuclides
self._dist_cache = None
self._dist_cache_key = None
return self
return type(self)(new_nuclides, self.volume)
@property
def support(self):
return (0.0, np.inf)
[docs]
def evaluate(self, x):
"""Evaluate the probability density at a given value.
Delegates to the combined distribution built from chain data. Raises
``NotImplementedError`` if the combined distribution is a
:class:`~openmc.stats.Mixture` (which does not support
``evaluate()``).
Parameters
----------
x : float
Value at which to evaluate the PDF.
Returns
-------
float
Probability density at *x*.
"""
dist = self.to_distribution()
if dist is None:
raise RuntimeError(
"DecaySpectrum.evaluate requires chain data. Ensure "
"openmc.config['chain_file'] is set."
)
return dist.evaluate(x)
[docs]
def mean(self):
"""Return the mean of the distribution.
Delegates to the combined distribution built from chain data.
Returns
-------
float
Mean photon energy in [eV].
"""
dist = self.to_distribution()
if dist is None:
raise RuntimeError(
"DecaySpectrum.mean requires chain data. Ensure "
"openmc.config['chain_file'] is set."
)
return dist.mean()
[docs]
def combine_distributions(
dists: Sequence[Discrete | Tabular | Mixture],
probs: Sequence[float]
):
"""Combine distributions with specified probabilities
This function can be used to combine multiple instances of
:class:`~openmc.stats.Discrete`, :class:`~openmc.stats.Tabular` and
:class:`~openmc.stats.Mixture` of them. Multiple discrete distributions are
merged into a single distribution and the remainder of the distributions are
put into a :class:`~openmc.stats.Mixture` distribution.
.. versionadded:: 0.13.1
Parameters
----------
dists : sequence of openmc.stats.Discrete, openmc.stats.Tabular, or openmc.stats.Mixture
Distributions to combine
probs : sequence of float
Probability (or intensity) of each distribution
"""
new_probs = []
new_dists = []
for i, dist in enumerate(dists):
cv.check_type(f'dists[{i}]', dist, (Discrete, Tabular, Mixture))
cv.check_type(f'probs[{i}]', probs[i], Real)
cv.check_greater_than(f'probs[{i}]', probs[i], 0.0)
if isinstance(dist, Mixture):
if dist.bias is not None:
warn("A Mixture distribution with a bias specified was passed "
"to combine_distributions. The bias will be discarded "
"during flattening.")
for j, d in enumerate(dist.distribution):
cv.check_type(f'dists[{i}].distribution[{j}]', d, (Discrete, Tabular))
new_probs.append(probs[i]*dist.probability[j])
new_dists.append(d)
else:
new_probs.append(probs[i])
new_dists.append(dist)
probs = new_probs
dists = new_dists
# Get list of discrete/continuous distribution indices
discrete_index = [i for i, d in enumerate(dists) if isinstance(d, Discrete)]
cont_index = [i for i, d in enumerate(dists) if isinstance(d, Tabular)]
cont_dists = [dists[i] for i in cont_index]
cont_probs = [probs[i] for i in cont_index]
if discrete_index:
# Create combined discrete distribution
dist_discrete = [dists[i] for i in discrete_index]
discrete_probs = [probs[i] for i in discrete_index]
combined_dist = Discrete.merge(dist_discrete, discrete_probs)
if cont_index:
return Mixture(cont_probs + [1.0], cont_dists + [combined_dist])
else:
return combined_dist
else:
if len(cont_dists) == 1:
dist = cont_dists[0]
return Tabular(dist.x, dist.p * cont_probs[0],
dist.interpolation, bias=dist.bias)
else:
return Mixture(cont_probs, cont_dists)
def check_bias_support(parent: Univariate, bias: Univariate | None):
"""Ensure that bias distributions share the support of the univariate
distribution they are biasing.
Parameters
----------
parent : openmc.stats.Univariate
Distributions to be biased
bias : openmc.stats.Univariate or None
Proposed bias distribution
"""
if bias is None:
return
def mismatch_error(err_type, msg):
raise err_type(f"Support of parent {type(parent).__name__} and bias "
f"{type(bias).__name__} distributions do not match. "
f"{msg}")
p_sup, b_sup = parent.support, bias.support
if isinstance(p_sup, set) or isinstance(b_sup, set):
raise RuntimeError("Discrete distributions cannot be used as biasing "
"distributions or be biased by another Univariate "
"distribution. Instead, assign a vector of "
"alternate probabilities to the bias attribute.")
elif isinstance(p_sup, dict) or isinstance (b_sup, dict):
raise RuntimeError("Mixture distributions cannot be used as biasing "
"distributions or be biased by another Univariate "
"distribution. Instead, instantiate the Mixture "
"object using biased member distributions, or "
"assign a vector of alternative probabilities to "
"the bias attribute.")
elif isinstance(p_sup, tuple):
if isinstance(b_sup, tuple):
if p_sup != b_sup:
mismatch_error(ValueError, "")
else:
mismatch_error(TypeError, "Incompatible support types.")
else:
raise TypeError("Unrecognized type for parent distribution support")