from typing import List
from typing import NamedTuple
from typing import Optional
import numpy
from PyMca5.PyMcaMath.fitting import Gefit
from PyMca5.PyMcaMath.fitting import SpecfitFuns
from PyMca5.PyMcaPhysics.xrf.ClassMcaTheory import ClassMcaTheory
from ..pymca_config import PyMcaXrfConfiguration
from ..pymca_config import pymca_configdict_from_model
from ..pymca_config import pymca_configdict_to_model
from ._utils.calibration import channel_info
from ._utils.calibration import energy_info
from ._utils.concentrations import calculate_massfractions
from ._utils.limits import expand_data
from ._utils.linear_fit import fit_profiles
from .types import XRFBatchFitResult
[docs]
def fit_xrf_spectra(
xrf_spectra: numpy.ndarray,
configuration: PyMcaXrfConfiguration,
quantification: Optional[bool] = None,
individual_weights: Optional[bool] = None,
live_times: Optional[numpy.ndarray] = None,
positive_peak_areas: Optional[bool] = None,
diagnostics: Optional[bool] = None,
) -> XRFBatchFitResult:
"""Fast fitting refers to linear fitting of several spectra by solving a single
system of equations. This function does not use the high-level API from PyMca.
It uses the low-level API to
- calculate elemental spectra
- calculate mass fractions from fit results
- strip spectral background
- and perform linear least-squares fit.
:param xrf_spectra: shape `(num_spectra, num_channels)`
:param configuration: PyMca configuration.
:param quantification: Calculate mass fractions from peak area's.
:param individual_weights: When fitting with weights, use the weight of each
spectrum (slow) instead of the average weight.
:param live_times: 1D array with one value for each XRF spectrum.
Only used for mass fractions with fundamental parameters.
:param positive_peak_areas:
:param diagnostics: fit model and residuals.
"""
if quantification is None:
quantification = True
if individual_weights is None:
individual_weights = True
if positive_peak_areas is None:
positive_peak_areas = True
if diagnostics is None:
diagnostics = False
if xrf_spectra.ndim != 2:
raise ValueError("XRF spectra must be a 2D array (num_spectra x num_channels)")
if live_times is not None:
if len(live_times) != len(xrf_spectra):
raise ValueError(
"Number live time values is not equal to the number of XRF spectra"
)
if diagnostics:
spectra = xrf_spectra
else:
spectra = None
# Effective live time normalization
useautotime = bool(configuration.concentrations.useautotime)
usematrix = bool(configuration.concentrations.usematrix)
autotime = quantification and useautotime and not usematrix
if autotime:
norm_factors = configuration.concentrations.time / live_times
else:
norm_factors = 1
configuration.concentrations.useautotime = 0
# Initialize McaTheory instance
mca_theory = ClassMcaTheory()
mca_theory.enableOptimizedLinearFit()
configuration.fit.linearfitflag = 1
configuration = pymca_configdict_to_model(
mca_theory.configure(pymca_configdict_from_model(configuration))
)
fixed_spectrum_stdev = _set_reference_spectrum(
mca_theory, xrf_spectra, configuration, individual_weights
)
# Apply channel limits
channels, raw_channels = channel_info(mca_theory)
num_fit_observations = len(channels)
num_raw_observations = len(raw_channels)
if num_fit_observations != num_raw_observations:
xrf_spectra = numpy.take(xrf_spectra, channels, axis=1)
if fixed_spectrum_stdev is not None:
fixed_spectrum_stdev = numpy.take(fixed_spectrum_stdev, channels, axis=0)
# Calculate the linear least-squares model matrix
mca_theory.estimate()
parameter_info_list = _free_linear_parameters(mca_theory)
derivatives = _least_squares_model_matrix(
mca_theory, parameter_info_list
) # shape (num_channels, num_lin_params)
# Calculate the background of all XRF spectra
anchor_indices = _background_anchor_indices(mca_theory, xrf_spectra, configuration)
background = _calculate_background(xrf_spectra, configuration, anchor_indices)
# Subtract background
if background is not None:
# Do not subtract in-place for the caller's sake and for type conversion
xrf_spectra = xrf_spectra - background
# Fit all XRF spectra
peak_area_indices = [
p.fit_parameter_index for p in parameter_info_list if p.is_peak_area
]
positive_constraint = peak_area_indices if positive_peak_areas else None
parameters, uncertainties, num_free_parameters = fit_profiles(
derivatives,
xrf_spectra.T,
positive_constraint=positive_constraint,
weight=configuration.fit.fitweight,
profiles_stdev=fixed_spectrum_stdev,
)
# Model and residuals
parameter_names = [p.name for p in parameter_info_list]
if diagnostics:
model = derivatives.dot(parameters).T # shape (num_spectra, num_channels)
residuals = xrf_spectra - model
if background is not None:
model += background
else:
spectra = None
model = None
residuals = None
# Quantification
if quantification:
(
mass_fractions,
mass_fractions_per_layer,
mass_fraction_names,
quantification_parameters,
) = calculate_massfractions(
mca_theory,
parameters[peak_area_indices],
[parameter_info_list[i].name for i in peak_area_indices],
configuration,
norm_factors,
)
else:
mass_fractions = None
mass_fractions_per_layer = None
mass_fraction_names = None
quantification_parameters = None
# To be added in the future:
# - error propagation of quantification
# - areal density ng/mm² https://sourceforge.net/p/pymca/mailman/message/37039583/
# - detection limits
derivatives = derivatives.T # shape (num_lin_params, num_channels)
if len(channels) != num_raw_observations:
# Expand with NaN's so the model, residuals and base profiles match to
# orginal number of observations.
if diagnostics:
model = expand_data(channels, model, num_raw_observations, axis=1)
residuals = expand_data(channels, residuals, num_raw_observations, axis=1)
derivatives = expand_data(channels, derivatives, num_raw_observations, axis=1)
energies = energy_info(mca_theory, channels)
raw_energies = energy_info(mca_theory, raw_channels)
return XRFBatchFitResult(
parameters=parameters,
uncertainties=uncertainties,
derivatives=derivatives,
parameter_names=parameter_names,
channels=channels,
energies=energies,
raw_channels=raw_channels,
raw_energies=raw_energies,
spectra=spectra,
model=model,
residuals=residuals,
mass_fractions=mass_fractions,
mass_fractions_per_layer=mass_fractions_per_layer,
mass_fraction_names=mass_fraction_names,
quantification_parameters=quantification_parameters,
)
class _FreeLinearParameter(NamedTuple):
name: str
parameter_index: int # index in all parameters (linear/non-linear/fixed/free)
fit_parameter_index: int # index in free linear parameters
is_peak_area: bool # area of a peak group (e.g. Fe-K lines)
def _free_linear_parameters(mca_theory: ClassMcaTheory) -> List[_FreeLinearParameter]:
parameters = []
first_peak_area_index = mca_theory.NGLOBAL
fit_parameter_index = 0
for parameter_index, (name, code) in enumerate(
zip(mca_theory.PARAMETERS, mca_theory.codes[0])
):
if code == Gefit.CFIXED:
continue
# Not fixed means free and linear
is_peak_area = parameter_index >= first_peak_area_index
param = _FreeLinearParameter(
name=name,
parameter_index=parameter_index,
fit_parameter_index=fit_parameter_index,
is_peak_area=is_peak_area,
)
parameters.append(param)
fit_parameter_index += 1
return parameters
def _least_squares_model_matrix(
mca_theory: ClassMcaTheory, parameter_info_list: List[_FreeLinearParameter]
) -> numpy.ndarray:
"""Returns shape (num_channels, num_free_param)."""
derivatives = numpy.empty(
(len(mca_theory.xdata), len(parameter_info_list)), dtype=numpy.float64
)
for param in parameter_info_list:
param_deriv = mca_theory.linearMcaTheoryDerivative(
mca_theory.parameters, param.parameter_index, mca_theory.xdata
)
derivatives[:, param.fit_parameter_index] = param_deriv
return derivatives
def _calculate_background(
xrf_spectra: numpy.ndarray,
configuration: PyMcaXrfConfiguration,
anchor_indices: List[int],
) -> Optional[numpy.ndarray]:
stripflag = configuration.fit.stripflag
if not stripflag:
return
stripalgorithm = configuration.fit.stripalgorithm
if stripalgorithm == 1:
return _calculate_snip_background(xrf_spectra, configuration, anchor_indices)
raise NotImplementedError(
f"Background {stripalgorithm=} is not implemented for fast fitting"
)
def _calculate_snip_background(
xrf_spectra: numpy.ndarray,
configuration: PyMcaXrfConfiguration,
anchor_indices: List[int],
) -> numpy.ndarray:
if xrf_spectra.size == 0:
return numpy.zeros_like(xrf_spectra)
smooth_width = configuration.fit.stripfilterwidth
snip_width = configuration.fit.snipwidth
background = None
for i, xrf_spectrum in enumerate(xrf_spectra):
backgroundi = SpecfitFuns.SavitskyGolay(xrf_spectrum, smooth_width)
last_anchor_index = 0
for anchor_index in anchor_indices:
if (anchor_index > last_anchor_index) and (anchor_index < backgroundi.size):
backgroundi[last_anchor_index:anchor_index] = SpecfitFuns.snip1d(
backgroundi[last_anchor_index:anchor_index], snip_width, 0
)
last_anchor_index = anchor_index
if last_anchor_index < backgroundi.size:
backgroundi[last_anchor_index:] = SpecfitFuns.snip1d(
backgroundi[last_anchor_index:], snip_width, 0
)
if i == 0:
background = numpy.empty(dtype=backgroundi.dtype, shape=xrf_spectra.shape)
background[i] = backgroundi
return background
def _background_anchor_indices(
mca_theory: ClassMcaTheory,
xrf_spectra: numpy.ndarray,
configuration: PyMcaXrfConfiguration,
) -> List[int]:
"""Get anchors for background subtraction"""
if not configuration.fit.stripflag:
return []
channels = mca_theory.xdata[:, 0]
anchor_indices = []
if configuration.fit.stripanchorsflag:
if configuration.fit.stripanchorslist is not None:
for channel in configuration.fit.stripanchorslist:
if channel <= channels[0]:
continue
index = numpy.nonzero(channels >= channel)[0]
if len(index):
index = min(index)
if index > 0:
anchor_indices.append(index)
if len(anchor_indices) == 0:
anchor_indices = [0, xrf_spectra.shape[1] - 1]
return sorted(anchor_indices)
def _set_reference_spectrum(
mca_theory: ClassMcaTheory,
xrf_spectra: numpy.ndarray,
configuration: PyMcaXrfConfiguration,
individual_weights: bool,
) -> Optional[numpy.ndarray]:
"""Returns the Least-Squares parameters for fit all XRF spectra."""
fitweight = configuration.fit.fitweight
fixed_spectrum_stdev = None
if fitweight == 0:
# No weights
ref_spectrum = xrf_spectra[0]
mca_theory.setData(y=ref_spectrum)
return fixed_spectrum_stdev
if fitweight == 1:
# Poisson weights
if individual_weights:
# Per-pixel Poisson weights
ref_spectrum = xrf_spectra[0]
mca_theory.setData(y=ref_spectrum)
return fixed_spectrum_stdev
# Average Poisson weights
sum_spectrum = xrf_spectra.sum(axis=0)
mca_theory.setData(y=sum_spectrum)
fixed_spectrum_stdev = numpy.sqrt(sum_spectrum) / len(xrf_spectra) + 1
return fixed_spectrum_stdev
raise NotImplementedError(
f"Least-Squares parameter {fitweight=} is not implemented for fast fitting"
)