from numbers import Real
from math import exp, erf, pi, sqrt
from copy import deepcopy
import warnings
import os
import h5py
import pickle
import numpy as np
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import openmc.checkvalue as cv
from ..exceptions import DataError
from ..mixin import EqualityMixin
from . import WMP_VERSION, WMP_VERSION_MAJOR
from .data import K_BOLTZMANN
from .neutron import IncidentNeutron
from .resonance import ResonanceRange
# Constants that determine which value to access
_MP_EA = 0 # Pole
# Residue indices
_MP_RS = 1 # Residue scattering
_MP_RA = 2 # Residue absorption
_MP_RF = 3 # Residue fission
# Polynomial fit indices
_FIT_S = 0 # Scattering
_FIT_A = 1 # Absorption
_FIT_F = 2 # Fission
# Upper temperature limit (K)
TEMPERATURE_LIMIT = 3000
# Logging control
DETAILED_LOGGING = 2
def _faddeeva(z):
r"""Evaluate the complex Faddeeva function.
Technically, the value we want is given by the equation:
.. math::
w(z) = \frac{i}{\pi} \int_{-\infty}^{\infty} \frac{1}{z - t}
\exp(-t^2) \text{d}t
as shown in Equation 63 from Hwang, R. N. "A rigorous pole
representation of multilevel cross sections and its practical
applications." Nuclear Science and Engineering 96.3 (1987): 192-209.
The :func:`scipy.special.wofz` function evaluates
:math:`w(z) = \exp(-z^2) \text{erfc}(-iz)`. These two forms of the Faddeeva
function are related by a transformation.
If we call the integral form :math:`w_\text{int}`, and the function form
:math:`w_\text{fun}`:
.. math::
w_\text{int}(z) =
\begin{cases}
w_\text{fun}(z) & \text{for } \text{Im}(z) > 0\\
-w_\text{fun}(z^*)^* & \text{for } \text{Im}(z) < 0
\end{cases}
Parameters
----------
z : complex
Argument to the Faddeeva function.
Returns
-------
complex
:math:`\frac{i}{\pi} \int_{-\infty}^{\infty} \frac{1}{z - t} \exp(-t^2)
\text{d}t`
"""
from scipy.special import wofz
if np.angle(z) > 0:
return wofz(z)
else:
return -np.conj(wofz(z.conjugate()))
def _broaden_wmp_polynomials(E, dopp, n):
r"""Evaluate Doppler-broadened windowed multipole curvefit.
The curvefit is a polynomial of the form :math:`\frac{a}{E}
+ \frac{b}{\sqrt{E}} + c + d \sqrt{E} + \ldots`
Parameters
----------
E : float
Energy to evaluate at.
dopp : float
sqrt(atomic weight ratio / kT) in units of eV.
n : int
Number of components to the polynomial.
Returns
-------
np.ndarray
The value of each Doppler-broadened curvefit polynomial term.
"""
sqrtE = sqrt(E)
beta = sqrtE * dopp
half_inv_dopp2 = 0.5 / dopp**2
quarter_inv_dopp4 = half_inv_dopp2**2
if beta > 6.0:
# Save time, ERF(6) is 1 to machine precision.
# beta/sqrtpi*exp(-beta**2) is also approximately 1 machine epsilon.
erf_beta = 1.0
exp_m_beta2 = 0.0
else:
erf_beta = erf(beta)
exp_m_beta2 = exp(-beta**2)
# Assume that, for sure, we'll use a second order (1/E, 1/V, const)
# fit, and no less.
factors = np.zeros(n)
factors[0] = erf_beta / E
factors[1] = 1.0 / sqrtE
factors[2] = (factors[0] * (half_inv_dopp2 + E)
+ exp_m_beta2 / (beta * sqrt(pi)))
# Perform recursive broadening of high order components. range(1, n-2)
# replaces a do i = 1, n-3. All indices are reduced by one due to the
# 1-based vs. 0-based indexing.
for i in range(1, n-2):
if i != 1:
factors[i+2] = (-factors[i-2] * (i - 1.0) * i * quarter_inv_dopp4
+ factors[i] * (E + (1.0 + 2.0 * i) * half_inv_dopp2))
else:
factors[i+2] = factors[i]*(E + (1.0 + 2.0 * i) * half_inv_dopp2)
return factors
def _vectfit_xs(energy, ce_xs, mts, rtol=1e-3, atol=1e-5, orders=None,
n_vf_iter=30, log=False, path_out=None):
"""Convert point-wise cross section to multipole data via vector fitting.
Parameters
----------
energy : np.ndarray
Energy array
ce_xs : np.ndarray
Point-wise cross sections to be fitted, with shape (number of reactions,
number of energy points)
mts : Iterable of int
Reaction list
rtol : float, optional
Relative error tolerance
atol : float, optional
Absolute error tolerance
orders : Iterable of int, optional
A list of orders (number of poles) to be searched
n_vf_iter : int, optional
Number of maximum VF iterations
log : bool or int, optional
Whether to print running logs (use int for verbosity control)
path_out : str, optional
Path to save the figures to show discrepancies between the original and
fitted cross sections for different reactions
Returns
-------
tuple
(poles, residues)
"""
# import vectfit package: https://github.com/liangjg/vectfit
import vectfit as vf
ne = energy.size
nmt = len(mts)
if ce_xs.shape != (nmt, ne):
raise ValueError('Inconsistent cross section data.')
# construct test data: interpolate xs with finer grids
n_finer = 10
ne_test = (ne - 1)*n_finer + 1
test_energy = np.interp(np.arange(ne_test),
np.arange(ne_test, step=n_finer), energy)
test_energy[[0, -1]] = energy[[0, -1]] # avoid numerical issue
test_xs_ref = np.zeros((nmt, ne_test))
for i in range(nmt):
test_xs_ref[i] = np.interp(test_energy, energy, ce_xs[i])
if log:
print(" energy: {:.3e} to {:.3e} eV ({} points)".format(
energy[0], energy[-1], ne))
print(" error tolerance: rtol={}, atol={}".format(rtol, atol))
# transform xs (sigma) and energy (E) to f (sigma*E) and s (sqrt(E)) to be
# compatible with the multipole representation
f = ce_xs * energy
s = np.sqrt(energy)
test_s = np.sqrt(test_energy)
# inverse weighting is used for minimizing the relative deviation instead of
# absolute deviation in vector fitting
with np.errstate(divide='ignore'):
weight = 1.0/f
# avoid too large weights which will harm the fitting accuracy
min_cross_section = 1e-7
for i in range(nmt):
if np.all(ce_xs[i] <= min_cross_section):
weight[i] = 1.0
elif np.any(ce_xs[i] <= min_cross_section):
weight[i, ce_xs[i] <= min_cross_section] = \
max(weight[i, ce_xs[i] > min_cross_section])
# detect peaks (resonances) and determine VF order search range
peaks, _ = find_peaks(ce_xs[0] + ce_xs[1])
n_peaks = peaks.size
if orders is not None:
# make sure orders are even integers
orders = list(set([int(i/2)*2 for i in orders if i >= 2]))
else:
lowest_order = max(2, 2*n_peaks)
highest_order = max(200, 4*n_peaks)
orders = list(range(lowest_order, highest_order + 1, 2))
if log:
print("Found {} peaks".format(n_peaks))
print("Fitting orders from {} to {}".format(orders[0], orders[-1]))
# perform VF with increasing orders
found_ideal = False
n_discarded = 0 # for accelation, number of discarded searches
best_quality = best_ratio = -np.inf
for i, order in enumerate(orders):
if log:
print("Order={}({}/{})".format(order, i, len(orders)))
# initial guessed poles
poles_r = np.linspace(s[0], s[-1], order//2)
poles = poles_r + poles_r*0.01j
poles = np.sort(np.append(poles, np.conj(poles)))
found_better = False
# fitting iteration
for i_vf in range(n_vf_iter):
if log >= DETAILED_LOGGING:
print("VF iteration {}/{}".format(i_vf + 1, n_vf_iter))
# call vf
poles, residues, cf, f_fit, rms = vf.vectfit(f, s, poles, weight)
# convert real pole to conjugate pairs
n_real_poles = 0
new_poles = []
for p in poles:
p_r, p_i = np.real(p), np.imag(p)
if (s[0] <= p_r <= s[-1]) and p_i == 0.:
new_poles += [p_r+p_r*0.01j, p_r-p_r*0.01j]
n_real_poles += 1
else:
new_poles += [p]
new_poles = np.array(new_poles)
# re-calculate residues if poles changed
if n_real_poles > 0:
if log >= DETAILED_LOGGING:
print(" # real poles: {}".format(n_real_poles))
new_poles, residues, cf, f_fit, rms = \
vf.vectfit(f, s, new_poles, weight, skip_pole=True)
# assess the result on test grid
test_xs = vf.evaluate(test_s, new_poles, residues) / test_energy
abserr = np.abs(test_xs - test_xs_ref)
with np.errstate(invalid='ignore', divide='ignore'):
relerr = abserr / test_xs_ref
if np.any(np.isnan(abserr)):
maxre, ratio, ratio2 = np.inf, -np.inf, -np.inf
elif np.all(abserr <= atol):
maxre, ratio, ratio2 = 0., 1., 1.
else:
maxre = np.max(relerr[abserr > atol])
ratio = np.sum((relerr < rtol) | (abserr < atol)) / relerr.size
ratio2 = np.sum((relerr < 10*rtol) | (abserr < atol)) / relerr.size
# define a metric for choosing the best fitting results
# basically, it is preferred to have more points within accuracy
# tolerance, smaller maximum deviation and fewer poles
#TODO: improve the metric with clearer basis
quality = ratio + ratio2 - min(0.1*maxre, 1) - 0.001*new_poles.size
if np.any(test_xs < -atol):
quality = -np.inf
if log >= DETAILED_LOGGING:
print(" # poles: {}".format(new_poles.size))
print(" Max relative error: {:.3f}%".format(maxre*100))
print(" Satisfaction: {:.1f}%, {:.1f}%".format(ratio*100, ratio2*100))
print(" Quality: {:.2f}".format(quality))
if quality > best_quality:
if log >= DETAILED_LOGGING:
print(" Best so far!")
found_better = True
best_quality, best_ratio = quality, ratio
best_poles, best_residues = new_poles, residues
best_test_xs, best_relerr = test_xs, relerr
if best_ratio >= 1.0:
if log:
print("Found ideal results. Stop!")
found_ideal = True
break
else:
if log >= DETAILED_LOGGING:
print(" Discarded!")
if found_ideal:
break
# acceleration
if found_better:
n_discarded = 0
else:
if order > max(2*n_peaks, 50) and best_ratio > 0.7:
n_discarded += 1
if n_discarded >= 10 or (n_discarded >= 5 and best_ratio > 0.9):
if log >= DETAILED_LOGGING:
print("Couldn't get better results. Stop!")
break
# merge conjugate poles
real_idx = []
conj_idx = []
found_conj = False
for i, p in enumerate(best_poles):
if found_conj:
found_conj = False
continue
if np.imag(p) == 0.:
real_idx.append(i)
else:
if i < best_poles.size and np.conj(p) == best_poles[i + 1]:
found_conj = True
conj_idx.append(i)
else:
raise RuntimeError("Complex poles are not conjugate!")
if log:
print("Found {} real poles and {} conjugate complex pairs.".format(
len(real_idx), len(conj_idx)))
mp_poles = best_poles[real_idx + conj_idx]
mp_residues = np.concatenate((best_residues[:, real_idx],
best_residues[:, conj_idx]*2), axis=1)/1j
if log:
print("Final number of poles: {}".format(mp_poles.size))
if path_out:
if not os.path.exists(path_out):
os.makedirs(path_out)
for i, mt in enumerate(mts):
if not test_xs_ref[i].any():
continue
fig, ax1 = plt.subplots()
lns1 = ax1.loglog(test_energy, test_xs_ref[i], 'g', label="ACE xs")
lns2 = ax1.loglog(test_energy, best_test_xs[i], 'b', label="VF xs")
ax2 = ax1.twinx()
lns3 = ax2.loglog(test_energy, best_relerr[i], 'r',
label="Relative error", alpha=0.5)
lns = lns1 + lns2 + lns3
labels = [l.get_label() for l in lns]
ax1.legend(lns, labels, loc='best')
ax1.set_xlabel('energy (eV)')
ax1.set_ylabel('cross section (b)', color='b')
ax1.tick_params('y', colors='b')
ax2.set_ylabel('relative error', color='r')
ax2.tick_params('y', colors='r')
plt.title("MT {} vector fitted with {} poles".format(mt, mp_poles.size))
fig.tight_layout()
fig_file = os.path.join(path_out, "{:.0f}-{:.0f}_MT{}.png".format(
energy[0], energy[-1], mt))
plt.savefig(fig_file)
plt.close()
if log:
print("Saved figure: {}".format(fig_file))
return (mp_poles, mp_residues)
def vectfit_nuclide(endf_file, njoy_error=5e-4, vf_pieces=None,
log=False, path_out=None, mp_filename=None, **kwargs):
r"""Generate multipole data for a nuclide from ENDF.
Parameters
----------
endf_file : str
Path to ENDF evaluation
njoy_error : float, optional
Fractional error tolerance for processing point-wise data with NJOY
vf_pieces : integer, optional
Number of equal-in-momentum spaced energy pieces for data fitting
log : bool or int, optional
Whether to print running logs (use int for verbosity control)
path_out : str, optional
Path to write out mutipole data file and vector fitting figures
mp_filename : str, optional
File name to write out multipole data
**kwargs
Keyword arguments passed to :func:`openmc.data.multipole._vectfit_xs`
Returns
-------
mp_data
Dictionary containing necessary multipole data of the nuclide
"""
# ======================================================================
# PREPARE POINT-WISE XS
# make 0K ACE data using njoy
if log:
print("Running NJOY to get 0K point-wise data (error={})...".format(njoy_error))
nuc_ce = IncidentNeutron.from_njoy(endf_file, temperatures=[0.0],
error=njoy_error, broadr=False, heatr=False, purr=False)
if log:
print("Parsing cross sections within resolved resonance range...")
# Determine upper energy: the lower of RRR upper bound and first threshold
endf_res = IncidentNeutron.from_endf(endf_file).resonances
if hasattr(endf_res, 'resolved') and \
hasattr(endf_res.resolved, 'energy_max') and \
type(endf_res.resolved) is not ResonanceRange:
E_max = endf_res.resolved.energy_max
elif hasattr(endf_res, 'unresolved') and \
hasattr(endf_res.unresolved, 'energy_min'):
E_max = endf_res.unresolved.energy_min
else:
E_max = nuc_ce.energy['0K'][-1]
E_max_idx = np.searchsorted(nuc_ce.energy['0K'], E_max, side='right') - 1
for mt in nuc_ce.reactions:
if hasattr(nuc_ce.reactions[mt].xs['0K'], '_threshold_idx'):
threshold_idx = nuc_ce.reactions[mt].xs['0K']._threshold_idx
if 0 < threshold_idx < E_max_idx:
E_max_idx = threshold_idx
# parse energy and cross sections
energy = nuc_ce.energy['0K'][:E_max_idx + 1]
E_min, E_max = energy[0], energy[-1]
n_points = energy.size
total_xs = nuc_ce[1].xs['0K'](energy)
elastic_xs = nuc_ce[2].xs['0K'](energy)
try:
absorption_xs = nuc_ce[27].xs['0K'](energy)
except KeyError:
absorption_xs = np.zeros_like(total_xs)
fissionable = False
try:
fission_xs = nuc_ce[18].xs['0K'](energy)
fissionable = True
except KeyError:
pass
# make vectors
if fissionable:
ce_xs = np.vstack((elastic_xs, absorption_xs, fission_xs))
mts = [2, 27, 18]
else:
ce_xs = np.vstack((elastic_xs, absorption_xs))
mts = [2, 27]
if log:
print(" MTs: {}".format(mts))
print(" Energy range: {:.3e} to {:.3e} eV ({} points)".format(
E_min, E_max, n_points))
# ======================================================================
# PERFORM VECTOR FITTING
if vf_pieces is None:
# divide into pieces for complex nuclides
peaks, _ = find_peaks(total_xs)
n_peaks = peaks.size
if n_peaks > 200 or n_points > 30000 or n_peaks * n_points > 100*10000:
vf_pieces = max(5, n_peaks // 50, n_points // 2000)
else:
vf_pieces = 1
piece_width = (sqrt(E_max) - sqrt(E_min)) / vf_pieces
alpha = nuc_ce.atomic_weight_ratio/(K_BOLTZMANN*TEMPERATURE_LIMIT)
poles, residues = [], []
# VF piece by piece
for i_piece in range(vf_pieces):
if log:
print("Vector fitting piece {}/{}...".format(i_piece + 1, vf_pieces))
# start E of this piece
e_bound = (sqrt(E_min) + piece_width*(i_piece-0.5))**2
if i_piece == 0 or sqrt(alpha*e_bound) < 4.0:
e_start = E_min
e_start_idx = 0
else:
e_start = max(E_min, (sqrt(alpha*e_bound) - 4.0)**2/alpha)
e_start_idx = np.searchsorted(energy, e_start, side='right') - 1
# end E of this piece
e_bound = (sqrt(E_min) + piece_width*(i_piece + 1))**2
e_end = min(E_max, (sqrt(alpha*e_bound) + 4.0)**2/alpha)
e_end_idx = np.searchsorted(energy, e_end, side='left') + 1
e_idx = range(e_start_idx, min(e_end_idx + 1, n_points))
p, r = _vectfit_xs(energy[e_idx], ce_xs[:, e_idx], mts, log=log,
path_out=path_out, **kwargs)
poles.append(p)
residues.append(r)
# collect multipole data into a dictionary
mp_data = {"name": nuc_ce.name,
"AWR": nuc_ce.atomic_weight_ratio,
"E_min": E_min,
"E_max": E_max,
"poles": poles,
"residues": residues}
# dump multipole data to file
if path_out:
if not os.path.exists(path_out):
os.makedirs(path_out)
if not mp_filename:
mp_filename = "{}_mp.pickle".format(nuc_ce.name)
mp_filename = os.path.join(path_out, mp_filename)
with open(mp_filename, 'wb') as f:
pickle.dump(mp_data, f)
if log:
print("Dumped multipole data to file: {}".format(mp_filename))
return mp_data
def _windowing(mp_data, n_cf, rtol=1e-3, atol=1e-5, n_win=None, spacing=None,
log=False):
"""Generate windowed multipole library from multipole data with specific
settings of window size, curve fit order, etc.
Parameters
----------
mp_data : dict
Multipole data
n_cf : int
Curve fitting order
rtol : float, optional
Maximum relative error tolerance
atol : float, optional
Minimum absolute error tolerance
n_win : int, optional
Number of equal-in-mementum spaced energy windows
spacing : float, optional
Inner window spacing (sqrt energy space)
log : bool or int, optional
Whether to print running logs (use int for verbosity control)
Returns
-------
openmc.data.WindowedMultipole
Resonant cross sections represented in the windowed multipole
format.
"""
# import vectfit package: https://github.com/liangjg/vectfit
import vectfit as vf
# unpack multipole data
name = mp_data["name"]
awr = mp_data["AWR"]
E_min = mp_data["E_min"]
E_max = mp_data["E_max"]
mp_poles = mp_data["poles"]
mp_residues = mp_data["residues"]
n_pieces = len(mp_poles)
piece_width = (sqrt(E_max) - sqrt(E_min)) / n_pieces
alpha = awr / (K_BOLTZMANN*TEMPERATURE_LIMIT)
# determine window size
if n_win is None:
if spacing is not None:
# ensure the windows are within the multipole energy range
n_win = int((sqrt(E_max) - sqrt(E_min)) / spacing)
E_max = (sqrt(E_min) + n_win*spacing)**2
else:
n_win = 1000
# inner window size
spacing = (sqrt(E_max) - sqrt(E_min)) / n_win
# make sure inner window size is smaller than energy piece size
if spacing > piece_width:
raise ValueError('Window spacing cannot be larger than piece spacing.')
if log:
print("Windowing:")
print(" config: # windows={}, spacing={}, CF order={}".format(
n_win, spacing, n_cf))
print(" error tolerance: rtol={}, atol={}".format(rtol, atol))
# sort poles (and residues) by the real component of the pole
for ip in range(n_pieces):
indices = mp_poles[ip].argsort()
mp_poles[ip] = mp_poles[ip][indices]
mp_residues[ip] = mp_residues[ip][:, indices]
# initialize an array to record whether each pole is used or not
poles_unused = [np.ones_like(p, dtype=int) for p in mp_poles]
# optimize the windows: the goal is to find the least set of significant
# consecutive poles and curve fit coefficients to reproduce cross section
win_data = []
for iw in range(n_win):
if log >= DETAILED_LOGGING:
print("Processing window {}/{}...".format(iw + 1, n_win))
# inner window boundaries
inbegin = sqrt(E_min) + spacing * iw
inend = inbegin + spacing
incenter = (inbegin + inend) / 2.0
# extend window energy range for Doppler broadening
if iw == 0 or sqrt(alpha)*inbegin < 4.0:
e_start = inbegin**2
else:
e_start = max(E_min, (sqrt(alpha)*inbegin - 4.0)**2/alpha)
e_end = min(E_max, (sqrt(alpha)*inend + 4.0)**2/alpha)
# locate piece and relevant poles
i_piece = min(n_pieces - 1, int((inbegin - sqrt(E_min))/piece_width + 0.5))
poles, residues = mp_poles[i_piece], mp_residues[i_piece]
n_poles = poles.size
# generate energy points for fitting: equally spaced in momentum
n_points = min(max(100, int((e_end - e_start)*4)), 10000)
energy_sqrt = np.linspace(np.sqrt(e_start), np.sqrt(e_end), n_points)
energy = energy_sqrt**2
# reference xs from multipole form, note the residue terms in the
# multipole and vector fitting representations differ by a 1j
xs_ref = vf.evaluate(energy_sqrt, poles, residues*1j) / energy
# curve fit matrix
matrix = np.vstack([energy**(0.5*i - 1) for i in range(n_cf + 1)]).T
# start from 0 poles, initialize pointers to the center nearest pole
center_pole_ind = np.argmin((np.fabs(poles.real - incenter)))
lp = rp = center_pole_ind
while True:
if log >= DETAILED_LOGGING:
print("Trying poles {} to {}".format(lp, rp))
# calculate the cross sections contributed by the windowed poles
if rp > lp:
xs_wp = vf.evaluate(energy_sqrt, poles[lp:rp],
residues[:, lp:rp]*1j) / energy
else:
xs_wp = np.zeros_like(xs_ref)
# do least square curve fit on the remains
coefs = np.linalg.lstsq(matrix, (xs_ref - xs_wp).T, rcond=None)[0]
xs_fit = (matrix @ coefs).T
# assess the result
abserr = np.abs(xs_fit + xs_wp - xs_ref)
with np.errstate(invalid='ignore', divide='ignore'):
relerr = abserr / xs_ref
if not np.any(np.isnan(abserr)):
re = relerr[abserr > atol]
if re.size == 0 or np.all(re <= rtol) or \
(re.max() <= 2*rtol and (re > rtol).sum() <= 0.01*relerr.size) or \
(iw == 0 and np.all(relerr.mean(axis=1) <= rtol)):
# meet tolerances
if log >= DETAILED_LOGGING:
print("Accuracy satisfied.")
break
# we expect pure curvefit will succeed for the first window
# TODO: find the energy boundary below which no poles are allowed
if iw == 0:
raise RuntimeError('Pure curvefit failed for the first window!')
# try to include one more pole (next center nearest)
if rp >= n_poles:
lp -= 1
elif lp <= 0 or poles[rp] - incenter <= incenter - poles[lp - 1]:
rp += 1
else:
lp -= 1
# save data for this window
win_data.append((i_piece, lp, rp, coefs))
# mark the windowed poles as used poles
poles_unused[i_piece][lp:rp] = 0
# flatten and shrink by removing unused poles
data = [] # used poles and residues
for ip in range(n_pieces):
used = (poles_unused[ip] == 0)
# stack poles and residues for library format
data.append(np.vstack([mp_poles[ip][used], mp_residues[ip][:, used]]).T)
# stack poles/residues in sequence vertically
data = np.vstack(data)
# new start/end pole indices
windows = []
curvefit = []
for iw in range(n_win):
ip, lp, rp, coefs = win_data[iw]
# adjust indices and change to 1-based for the library format
n_prev_poles = sum([poles_unused[i].size for i in range(ip)])
n_unused = sum([(poles_unused[i] == 1).sum() for i in range(ip)]) + \
(poles_unused[ip][:lp] == 1).sum()
lp += n_prev_poles - n_unused + 1
rp += n_prev_poles - n_unused
windows.append([lp, rp])
curvefit.append(coefs)
# construct the WindowedMultipole object
wmp = WindowedMultipole(name)
wmp.spacing = spacing
wmp.sqrtAWR = sqrt(awr)
wmp.E_min = E_min
wmp.E_max = E_max
wmp.data = data
wmp.windows = np.asarray(windows)
wmp.curvefit = np.asarray(curvefit)
# TODO: check if Doppler brodening of the polynomial curvefit is negligible
wmp.broaden_poly = np.ones((n_win,), dtype=bool)
return wmp
[docs]class WindowedMultipole(EqualityMixin):
"""Resonant cross sections represented in the windowed multipole format.
Parameters
----------
name : str
Name of the nuclide using the GND naming convention
Attributes
----------
name : str
Name of the nuclide using the GND naming convention
spacing : float
The width of each window in sqrt(E)-space. For example, the frst window
will end at (sqrt(E_min) + spacing)**2 and the second window at
(sqrt(E_min) + 2*spacing)**2.
sqrtAWR : float
Square root of the atomic weight ratio of the target nuclide.
E_min : float
Lowest energy in eV the library is valid for.
E_max : float
Highest energy in eV the library is valid for.
data : np.ndarray
A 2D array of complex poles and residues. data[i, 0] gives the energy
at which pole i is located. data[i, 1:] gives the residues associated
with the i-th pole. There are 3 residues, one each for the scattering,
absorption, and fission channels.
windows : np.ndarray
A 2D array of Integral values. windows[i, 0] - 1 is the index of the
first pole in window i. windows[i, 1] - 1 is the index of the last pole
in window i.
broaden_poly : np.ndarray
A 1D array of boolean values indicating whether or not the polynomial
curvefit in that window should be Doppler broadened.
curvefit : np.ndarray
A 3D array of Real curvefit polynomial coefficients. curvefit[i, 0, :]
gives coefficients for the scattering cross section in window i.
curvefit[i, 1, :] gives absorption coefficients and curvefit[i, 2, :]
gives fission coefficients. The polynomial terms are increasing powers
of sqrt(E) starting with 1/E e.g:
a/E + b/sqrt(E) + c + d sqrt(E) + ...
"""
def __init__(self, name):
self.name = name
self.spacing = None
self.sqrtAWR = None
self.E_min = None
self.E_max = None
self.data = None
self.windows = None
self.broaden_poly = None
self.curvefit = None
@property
def name(self):
return self._name
@property
def fit_order(self):
return self.curvefit.shape[1] - 1
@property
def fissionable(self):
return self.data.shape[1] == 4
@property
def n_poles(self):
return self.data.shape[0]
@property
def n_windows(self):
return self.windows.shape[0]
@property
def poles_per_window(self):
return (self.windows[:, 1] - self.windows[:, 0] + 1).mean()
@property
def spacing(self):
return self._spacing
@property
def sqrtAWR(self):
return self._sqrtAWR
@property
def E_min(self):
return self._E_min
@property
def E_max(self):
return self._E_max
@property
def data(self):
return self._data
@property
def windows(self):
return self._windows
@property
def broaden_poly(self):
return self._broaden_poly
@property
def curvefit(self):
return self._curvefit
@name.setter
def name(self, name):
cv.check_type('name', name, str)
self._name = name
@spacing.setter
def spacing(self, spacing):
if spacing is not None:
cv.check_type('spacing', spacing, Real)
cv.check_greater_than('spacing', spacing, 0.0, equality=False)
self._spacing = spacing
@sqrtAWR.setter
def sqrtAWR(self, sqrtAWR):
if sqrtAWR is not None:
cv.check_type('sqrtAWR', sqrtAWR, Real)
cv.check_greater_than('sqrtAWR', sqrtAWR, 0.0, equality=False)
self._sqrtAWR = sqrtAWR
@E_min.setter
def E_min(self, E_min):
if E_min is not None:
cv.check_type('E_min', E_min, Real)
cv.check_greater_than('E_min', E_min, 0.0, equality=True)
self._E_min = E_min
@E_max.setter
def E_max(self, E_max):
if E_max is not None:
cv.check_type('E_max', E_max, Real)
cv.check_greater_than('E_max', E_max, 0.0, equality=False)
self._E_max = E_max
@data.setter
def data(self, data):
if data is not None:
cv.check_type('data', data, np.ndarray)
if len(data.shape) != 2:
raise ValueError('Multipole data arrays must be 2D')
if data.shape[1] not in (3, 4):
raise ValueError(
'data.shape[1] must be 3 or 4. One value for the pole.'
' One each for the scattering and absorption residues. '
'Possibly one more for a fission residue.')
if not np.issubdtype(data.dtype, np.complexfloating):
raise TypeError('Multipole data arrays must be complex dtype')
self._data = data
@windows.setter
def windows(self, windows):
if windows is not None:
cv.check_type('windows', windows, np.ndarray)
if len(windows.shape) != 2:
raise ValueError('Multipole windows arrays must be 2D')
if not np.issubdtype(windows.dtype, np.integer):
raise TypeError('Multipole windows arrays must be integer'
' dtype')
self._windows = windows
@broaden_poly.setter
def broaden_poly(self, broaden_poly):
if broaden_poly is not None:
cv.check_type('broaden_poly', broaden_poly, np.ndarray)
if len(broaden_poly.shape) != 1:
raise ValueError('Multipole broaden_poly arrays must be 1D')
if not np.issubdtype(broaden_poly.dtype, np.bool_):
raise TypeError('Multipole broaden_poly arrays must be boolean'
' dtype')
self._broaden_poly = broaden_poly
@curvefit.setter
def curvefit(self, curvefit):
if curvefit is not None:
cv.check_type('curvefit', curvefit, np.ndarray)
if len(curvefit.shape) != 3:
raise ValueError('Multipole curvefit arrays must be 3D')
if curvefit.shape[2] not in (2, 3): # sig_s, sig_a (maybe sig_f)
raise ValueError('The third dimension of multipole curvefit'
' arrays must have a length of 2 or 3')
if not np.issubdtype(curvefit.dtype, np.floating):
raise TypeError('Multipole curvefit arrays must be float dtype')
self._curvefit = curvefit
[docs] @classmethod
def from_hdf5(cls, group_or_filename):
"""Construct a WindowedMultipole object from an HDF5 group or file.
Parameters
----------
group_or_filename : h5py.Group or str
HDF5 group containing multipole data. If given as a string, it is
assumed to be the filename for the HDF5 file, and the first group is
used to read from.
Returns
-------
openmc.data.WindowedMultipole
Resonant cross sections represented in the windowed multipole
format.
"""
if isinstance(group_or_filename, h5py.Group):
group = group_or_filename
need_to_close = False
else:
h5file = h5py.File(str(group_or_filename), 'r')
need_to_close = True
# Make sure version matches
if 'version' in h5file.attrs:
major, minor = h5file.attrs['version']
if major != WMP_VERSION_MAJOR:
raise DataError(
'WMP data format uses version {}. {} whereas your '
'installation of the OpenMC Python API expects version '
'{}.x.'.format(major, minor, WMP_VERSION_MAJOR))
else:
raise DataError(
'WMP data does not indicate a version. Your installation of '
'the OpenMC Python API expects version {}.x data.'
.format(WMP_VERSION_MAJOR))
group = list(h5file.values())[0]
name = group.name[1:]
out = cls(name)
# Read scalars.
out.spacing = group['spacing'][()]
out.sqrtAWR = group['sqrtAWR'][()]
out.E_min = group['E_min'][()]
out.E_max = group['E_max'][()]
# Read arrays.
err = "WMP '{}' array shape is not consistent with the '{}' array shape"
out.data = group['data'][()]
out.windows = group['windows'][()]
out.broaden_poly = group['broaden_poly'][...].astype(np.bool)
if out.broaden_poly.shape[0] != out.windows.shape[0]:
raise ValueError(err.format('broaden_poly', 'windows'))
out.curvefit = group['curvefit'][()]
if out.curvefit.shape[0] != out.windows.shape[0]:
raise ValueError(err.format('curvefit', 'windows'))
# _broaden_wmp_polynomials assumes the curve fit has at least 3 terms.
if out.fit_order < 2:
raise ValueError("Windowed multipole is only supported for "
"curvefits with 3 or more terms.")
# If HDF5 file was opened here, make sure it gets closed
if need_to_close:
h5file.close()
return out
[docs] @classmethod
def from_endf(cls, endf_file, log=False, vf_options=None, wmp_options=None):
"""Generate windowed multipole neutron data from an ENDF evaluation.
.. versionadded:: 0.12.1
Parameters
----------
endf_file : str
Path to ENDF evaluation
log : bool or int, optional
Whether to print running logs (use int for verbosity control)
vf_options : dict, optional
Dictionary of keyword arguments, e.g. {'njoy_error': 0.001},
passed to :func:`openmc.data.multipole.vectfit_nuclide`
wmp_options : dict, optional
Dictionary of keyword arguments, e.g. {'search': True, 'rtol': 0.01},
passed to :func:`openmc.data.WindowedMultipole.from_multipole`
Returns
-------
openmc.data.WindowedMultipole
Resonant cross sections represented in the windowed multipole
format.
"""
if vf_options is None:
vf_options = {}
if wmp_options is None:
wmp_options = {}
if log:
vf_options.update(log=log)
wmp_options.update(log=log)
# generate multipole data from EDNF
mp_data = vectfit_nuclide(endf_file, **vf_options)
# windowing
return cls.from_multipole(mp_data, **wmp_options)
[docs] @classmethod
def from_multipole(cls, mp_data, search=None, log=False, **kwargs):
"""Generate windowed multipole neutron data from multipole data.
Parameters
----------
mp_data : dictionary or str
Dictionary or Path to the multipole data stored in a pickle file
search : bool, optional
Whether to search for optimal window size and curvefit order.
Defaults to True if no windowing parameters are specified.
log : bool or int, optional
Whether to print running logs (use int for verbosity control)
**kwargs
Keyword arguments passed to :func:`openmc.data.multipole._windowing`
Returns
-------
openmc.data.WindowedMultipole
Resonant cross sections represented in the windowed multipole
format.
"""
if isinstance(mp_data, str):
# load multipole data from file
with open(mp_data, 'rb') as f:
mp_data = pickle.load(f)
if search is None:
if 'n_cf' in kwargs and ('n_win' in kwargs or 'spacing' in kwargs):
search = False
else:
search = True
# windowing with specific options
if not search:
# set default value for curvefit order if not specified
if 'n_cf' not in kwargs:
kwargs.update(n_cf=5)
return _windowing(mp_data, log=log, **kwargs)
# search optimal WMP from a range of window sizes and CF orders
if log:
print("Start searching ...")
n_poles = sum([p.size for p in mp_data["poles"]])
n_win_min = max(5, n_poles // 20)
n_win_max = 2000 if n_poles < 2000 else 8000
best_wmp = best_metric = None
for n_w in np.unique(np.linspace(n_win_min, n_win_max, 20, dtype=int)):
for n_cf in range(10, 1, -1):
if log:
print("Testing N_win={} N_cf={}".format(n_w, n_cf))
# update arguments dictionary
kwargs.update(n_win=n_w, n_cf=n_cf)
# windowing
try:
wmp = _windowing(mp_data, log=log, **kwargs)
except Exception as e:
if log:
print('Failed: ' + str(e))
break
# select wmp library with metric:
# - performance: average # used poles per window and CF order
# - memory: # windows
metric = -(wmp.poles_per_window * 10. + wmp.fit_order * 1. +
wmp.n_windows * 0.01)
if best_wmp is None or metric > best_metric:
if log:
print("Best library so far.")
best_wmp = deepcopy(wmp)
best_metric = metric
# return the best wmp library
if log:
print("Final library: {} poles, {} windows, {:.2g} poles per window, "
"{} CF order".format(best_wmp.n_poles, best_wmp.n_windows,
best_wmp.poles_per_window, best_wmp.fit_order))
return best_wmp
def _evaluate(self, E, T):
"""Compute scattering, absorption, and fission cross sections.
Parameters
----------
E : Real
Energy of the incident neutron in eV.
T : Real
Temperature of the target in K.
Returns
-------
3-tuple of Real
Total, absorption, and fission microscopic cross sections at the
given energy and temperature.
"""
if E < self.E_min: return (0, 0, 0)
if E > self.E_max: return (0, 0, 0)
# ======================================================================
# Bookkeeping
# Define some frequently used variables.
sqrtkT = sqrt(K_BOLTZMANN * T)
sqrtE = sqrt(E)
invE = 1.0 / E
# Locate us. The i_window calc omits a + 1 present in F90 because of
# the 1-based vs. 0-based indexing. Similarly startw needs to be
# decreased by 1. endw does not need to be decreased because
# range(startw, endw) does not include endw.
i_window = min(self.n_windows - 1,
int(np.floor((sqrtE - sqrt(self.E_min)) / self.spacing)))
startw = self.windows[i_window, 0] - 1
endw = self.windows[i_window, 1]
# Initialize the ouptut cross sections.
sig_s = 0.0
sig_a = 0.0
sig_f = 0.0
# ======================================================================
# Add the contribution from the curvefit polynomial.
if sqrtkT != 0 and self.broaden_poly[i_window]:
# Broaden the curvefit.
dopp = self.sqrtAWR / sqrtkT
broadened_polynomials = _broaden_wmp_polynomials(E, dopp,
self.fit_order + 1)
for i_poly in range(self.fit_order + 1):
sig_s += (self.curvefit[i_window, i_poly, _FIT_S]
* broadened_polynomials[i_poly])
sig_a += (self.curvefit[i_window, i_poly, _FIT_A]
* broadened_polynomials[i_poly])
if self.fissionable:
sig_f += (self.curvefit[i_window, i_poly, _FIT_F]
* broadened_polynomials[i_poly])
else:
temp = invE
for i_poly in range(self.fit_order + 1):
sig_s += self.curvefit[i_window, i_poly, _FIT_S] * temp
sig_a += self.curvefit[i_window, i_poly, _FIT_A] * temp
if self.fissionable:
sig_f += self.curvefit[i_window, i_poly, _FIT_F] * temp
temp *= sqrtE
# ======================================================================
# Add the contribution from the poles in this window.
if sqrtkT == 0.0:
# If at 0K, use asymptotic form.
for i_pole in range(startw, endw):
psi_chi = -1j / (self.data[i_pole, _MP_EA] - sqrtE)
c_temp = psi_chi / E
sig_s += (self.data[i_pole, _MP_RS] * c_temp).real
sig_a += (self.data[i_pole, _MP_RA] * c_temp).real
if self.fissionable:
sig_f += (self.data[i_pole, _MP_RF] * c_temp).real
else:
# At temperature, use Faddeeva function-based form.
dopp = self.sqrtAWR / sqrtkT
for i_pole in range(startw, endw):
Z = (sqrtE - self.data[i_pole, _MP_EA]) * dopp
w_val = _faddeeva(Z) * dopp * invE * sqrt(pi)
sig_s += (self.data[i_pole, _MP_RS] * w_val).real
sig_a += (self.data[i_pole, _MP_RA] * w_val).real
if self.fissionable:
sig_f += (self.data[i_pole, _MP_RF] * w_val).real
return sig_s, sig_a, sig_f
def __call__(self, E, T):
"""Compute scattering, absorption, and fission cross sections.
Parameters
----------
E : Real or Iterable of Real
Energy of the incident neutron in eV.
T : Real
Temperature of the target in K.
Returns
-------
3-tuple of Real or 3-tuple of numpy.ndarray
Total, absorption, and fission microscopic cross sections at the
given energy and temperature.
"""
fun = np.vectorize(lambda x: self._evaluate(x, T))
return fun(E)
[docs] def export_to_hdf5(self, path, mode='a', libver='earliest'):
"""Export windowed multipole data to an HDF5 file.
Parameters
----------
path : str
Path to write HDF5 file to
mode : {'r+', 'w', 'x', 'a'}
Mode that is used to open the HDF5 file. This is the second argument
to the :class:`h5py.File` constructor.
libver : {'earliest', 'latest'}
Compatibility mode for the HDF5 file. 'latest' will produce files
that are less backwards compatible but have performance benefits.
"""
# Open file and write version.
with h5py.File(str(path), mode, libver=libver) as f:
f.attrs['filetype'] = np.string_('data_wmp')
f.attrs['version'] = np.array(WMP_VERSION)
g = f.create_group(self.name)
# Write scalars.
g.create_dataset('spacing', data=np.array(self.spacing))
g.create_dataset('sqrtAWR', data=np.array(self.sqrtAWR))
g.create_dataset('E_min', data=np.array(self.E_min))
g.create_dataset('E_max', data=np.array(self.E_max))
# Write arrays.
g.create_dataset('data', data=self.data)
g.create_dataset('windows', data=self.windows)
g.create_dataset('broaden_poly',
data=self.broaden_poly.astype(np.int8))
g.create_dataset('curvefit', data=self.curvefit)