Source code for ewoksfluo.xrffit.fit.fast_fit

from typing import List
from typing import NamedTuple
from typing import Optional

import numpy
from PyMca5.PyMcaMath.fitting import Gefit
from PyMca5.PyMcaMath.fitting import SpecfitFuns
from PyMca5.PyMcaPhysics.xrf.ClassMcaTheory import ClassMcaTheory

from ..pymca_config import PyMcaXrfConfiguration
from ..pymca_config import pymca_configdict_from_model
from ..pymca_config import pymca_configdict_to_model
from ._utils.calibration import channel_info
from ._utils.calibration import energy_info
from ._utils.concentrations import calculate_massfractions
from ._utils.limits import expand_data
from ._utils.linear_fit import fit_profiles
from .types import XRFBatchFitResult


[docs] def fit_xrf_spectra( xrf_spectra: numpy.ndarray, configuration: PyMcaXrfConfiguration, quantification: Optional[bool] = None, individual_weights: Optional[bool] = None, live_times: Optional[numpy.ndarray] = None, positive_peak_areas: Optional[bool] = None, diagnostics: Optional[bool] = None, ) -> XRFBatchFitResult: """Fast fitting refers to linear fitting of several spectra by solving a single system of equations. This function does not use the high-level API from PyMca. It uses the low-level API to - calculate elemental spectra - calculate mass fractions from fit results - strip spectral background - and perform linear least-squares fit. :param xrf_spectra: shape `(num_spectra, num_channels)` :param configuration: PyMca configuration. :param quantification: Calculate mass fractions from peak area's. :param individual_weights: When fitting with weights, use the weight of each spectrum (slow) instead of the average weight. :param live_times: 1D array with one value for each XRF spectrum. Only used for mass fractions with fundamental parameters. :param positive_peak_areas: :param diagnostics: fit model and residuals. """ if quantification is None: quantification = True if individual_weights is None: individual_weights = True if positive_peak_areas is None: positive_peak_areas = True if diagnostics is None: diagnostics = False if xrf_spectra.ndim != 2: raise ValueError("XRF spectra must be a 2D array (num_spectra x num_channels)") if live_times is not None: if len(live_times) != len(xrf_spectra): raise ValueError( "Number live time values is not equal to the number of XRF spectra" ) if diagnostics: spectra = xrf_spectra else: spectra = None # Effective live time normalization useautotime = bool(configuration.concentrations.useautotime) usematrix = bool(configuration.concentrations.usematrix) autotime = quantification and useautotime and not usematrix if autotime: norm_factors = configuration.concentrations.time / live_times else: norm_factors = 1 configuration.concentrations.useautotime = 0 # Initialize McaTheory instance mca_theory = ClassMcaTheory() mca_theory.enableOptimizedLinearFit() configuration.fit.linearfitflag = 1 configuration = pymca_configdict_to_model( mca_theory.configure(pymca_configdict_from_model(configuration)) ) fixed_spectrum_stdev = _set_reference_spectrum( mca_theory, xrf_spectra, configuration, individual_weights ) # Apply channel limits channels, raw_channels = channel_info(mca_theory) num_fit_observations = len(channels) num_raw_observations = len(raw_channels) if num_fit_observations != num_raw_observations: xrf_spectra = numpy.take(xrf_spectra, channels, axis=1) if fixed_spectrum_stdev is not None: fixed_spectrum_stdev = numpy.take(fixed_spectrum_stdev, channels, axis=0) # Calculate the linear least-squares model matrix mca_theory.estimate() parameter_info_list = _free_linear_parameters(mca_theory) derivatives = _least_squares_model_matrix( mca_theory, parameter_info_list ) # shape (num_channels, num_lin_params) # Calculate the background of all XRF spectra anchor_indices = _background_anchor_indices(mca_theory, xrf_spectra, configuration) background = _calculate_background(xrf_spectra, configuration, anchor_indices) # Subtract background if background is not None: # Do not subtract in-place for the caller's sake and for type conversion xrf_spectra = xrf_spectra - background # Fit all XRF spectra peak_area_indices = [ p.fit_parameter_index for p in parameter_info_list if p.is_peak_area ] positive_constraint = peak_area_indices if positive_peak_areas else None parameters, uncertainties, num_free_parameters = fit_profiles( derivatives, xrf_spectra.T, positive_constraint=positive_constraint, weight=configuration.fit.fitweight, profiles_stdev=fixed_spectrum_stdev, ) # Model and residuals parameter_names = [p.name for p in parameter_info_list] if diagnostics: model = derivatives.dot(parameters).T # shape (num_spectra, num_channels) residuals = xrf_spectra - model if background is not None: model += background else: spectra = None model = None residuals = None # Quantification if quantification: ( mass_fractions, mass_fractions_per_layer, mass_fraction_names, quantification_parameters, ) = calculate_massfractions( mca_theory, parameters[peak_area_indices], [parameter_info_list[i].name for i in peak_area_indices], configuration, norm_factors, ) else: mass_fractions = None mass_fractions_per_layer = None mass_fraction_names = None quantification_parameters = None # To be added in the future: # - error propagation of quantification # - areal density ng/mm² https://sourceforge.net/p/pymca/mailman/message/37039583/ # - detection limits derivatives = derivatives.T # shape (num_lin_params, num_channels) if len(channels) != num_raw_observations: # Expand with NaN's so the model, residuals and base profiles match to # orginal number of observations. if diagnostics: model = expand_data(channels, model, num_raw_observations, axis=1) residuals = expand_data(channels, residuals, num_raw_observations, axis=1) derivatives = expand_data(channels, derivatives, num_raw_observations, axis=1) energies = energy_info(mca_theory, channels) raw_energies = energy_info(mca_theory, raw_channels) return XRFBatchFitResult( parameters=parameters, uncertainties=uncertainties, derivatives=derivatives, parameter_names=parameter_names, channels=channels, energies=energies, raw_channels=raw_channels, raw_energies=raw_energies, spectra=spectra, model=model, residuals=residuals, mass_fractions=mass_fractions, mass_fractions_per_layer=mass_fractions_per_layer, mass_fraction_names=mass_fraction_names, quantification_parameters=quantification_parameters, )
class _FreeLinearParameter(NamedTuple): name: str parameter_index: int # index in all parameters (linear/non-linear/fixed/free) fit_parameter_index: int # index in free linear parameters is_peak_area: bool # area of a peak group (e.g. Fe-K lines) def _free_linear_parameters(mca_theory: ClassMcaTheory) -> List[_FreeLinearParameter]: parameters = [] first_peak_area_index = mca_theory.NGLOBAL fit_parameter_index = 0 for parameter_index, (name, code) in enumerate( zip(mca_theory.PARAMETERS, mca_theory.codes[0]) ): if code == Gefit.CFIXED: continue # Not fixed means free and linear is_peak_area = parameter_index >= first_peak_area_index param = _FreeLinearParameter( name=name, parameter_index=parameter_index, fit_parameter_index=fit_parameter_index, is_peak_area=is_peak_area, ) parameters.append(param) fit_parameter_index += 1 return parameters def _least_squares_model_matrix( mca_theory: ClassMcaTheory, parameter_info_list: List[_FreeLinearParameter] ) -> numpy.ndarray: """Returns shape (num_channels, num_free_param).""" derivatives = numpy.empty( (len(mca_theory.xdata), len(parameter_info_list)), dtype=numpy.float64 ) for param in parameter_info_list: param_deriv = mca_theory.linearMcaTheoryDerivative( mca_theory.parameters, param.parameter_index, mca_theory.xdata ) derivatives[:, param.fit_parameter_index] = param_deriv return derivatives def _calculate_background( xrf_spectra: numpy.ndarray, configuration: PyMcaXrfConfiguration, anchor_indices: List[int], ) -> Optional[numpy.ndarray]: stripflag = configuration.fit.stripflag if not stripflag: return stripalgorithm = configuration.fit.stripalgorithm if stripalgorithm == 1: return _calculate_snip_background(xrf_spectra, configuration, anchor_indices) raise NotImplementedError( f"Background {stripalgorithm=} is not implemented for fast fitting" ) def _calculate_snip_background( xrf_spectra: numpy.ndarray, configuration: PyMcaXrfConfiguration, anchor_indices: List[int], ) -> numpy.ndarray: if xrf_spectra.size == 0: return numpy.zeros_like(xrf_spectra) smooth_width = configuration.fit.stripfilterwidth snip_width = configuration.fit.snipwidth background = None for i, xrf_spectrum in enumerate(xrf_spectra): backgroundi = SpecfitFuns.SavitskyGolay(xrf_spectrum, smooth_width) last_anchor_index = 0 for anchor_index in anchor_indices: if (anchor_index > last_anchor_index) and (anchor_index < backgroundi.size): backgroundi[last_anchor_index:anchor_index] = SpecfitFuns.snip1d( backgroundi[last_anchor_index:anchor_index], snip_width, 0 ) last_anchor_index = anchor_index if last_anchor_index < backgroundi.size: backgroundi[last_anchor_index:] = SpecfitFuns.snip1d( backgroundi[last_anchor_index:], snip_width, 0 ) if i == 0: background = numpy.empty(dtype=backgroundi.dtype, shape=xrf_spectra.shape) background[i] = backgroundi return background def _background_anchor_indices( mca_theory: ClassMcaTheory, xrf_spectra: numpy.ndarray, configuration: PyMcaXrfConfiguration, ) -> List[int]: """Get anchors for background subtraction""" if not configuration.fit.stripflag: return [] channels = mca_theory.xdata[:, 0] anchor_indices = [] if configuration.fit.stripanchorsflag: if configuration.fit.stripanchorslist is not None: for channel in configuration.fit.stripanchorslist: if channel <= channels[0]: continue index = numpy.nonzero(channels >= channel)[0] if len(index): index = min(index) if index > 0: anchor_indices.append(index) if len(anchor_indices) == 0: anchor_indices = [0, xrf_spectra.shape[1] - 1] return sorted(anchor_indices) def _set_reference_spectrum( mca_theory: ClassMcaTheory, xrf_spectra: numpy.ndarray, configuration: PyMcaXrfConfiguration, individual_weights: bool, ) -> Optional[numpy.ndarray]: """Returns the Least-Squares parameters for fit all XRF spectra.""" fitweight = configuration.fit.fitweight fixed_spectrum_stdev = None if fitweight == 0: # No weights ref_spectrum = xrf_spectra[0] mca_theory.setData(y=ref_spectrum) return fixed_spectrum_stdev if fitweight == 1: # Poisson weights if individual_weights: # Per-pixel Poisson weights ref_spectrum = xrf_spectra[0] mca_theory.setData(y=ref_spectrum) return fixed_spectrum_stdev # Average Poisson weights sum_spectrum = xrf_spectra.sum(axis=0) mca_theory.setData(y=sum_spectrum) fixed_spectrum_stdev = numpy.sqrt(sum_spectrum) / len(xrf_spectra) + 1 return fixed_spectrum_stdev raise NotImplementedError( f"Least-Squares parameter {fitweight=} is not implemented for fast fitting" )