Source code for ewoksfluo.tests.fit.test_core_fit

from contextlib import contextmanager
from typing import Any
from typing import Dict
from typing import Generator
from typing import Literal
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union

import numpy
import pytest

from ...xrffit.fit import fit_xrf_spectra
from ...xrffit.fit.types import NotAvailable
from ...xrffit.fit.types import XRFBatchFitResult
from ..utils import generate


[docs] @pytest.mark.parametrize("fast_fitting", [False, True], ids=["slow", "fast"]) @pytest.mark.parametrize("use_limit", [False, True], ids=["wo_limit", "w_limit"]) @pytest.mark.parametrize( "quantification", [None, "fundamental", "auto", "ref"], ids=["quant_no", "quant_fundamental", "quant_auto", "quant_ref"], ) @pytest.mark.parametrize( "positive_peak_areas", [False, True], ids=["no_refit", "refit"] ) @pytest.mark.parametrize( "error_propagation", [None, "average", "individual"], ids=["error_no", "error_avg", "error_ind"], ) @pytest.mark.parametrize( "strip_background", [None, 0, 1], ids=["bkg_no", "bkg_strip", "bkg_snip"] ) @pytest.mark.parametrize("live_times", [False, True], ids=["no_LT", "LT"]) @pytest.mark.parametrize( "multilayer", [False, True], ids=["single_layer", "multilayer"] ) @pytest.mark.parametrize("legacy", [False, True], ids=["pymca5", "pymca5_legacy"]) def test_fit_versus_native_fit( fast_fitting: bool, use_limit: bool, quantification: Optional[str], positive_peak_areas: bool, error_propagation: Optional[str], strip_background: Optional[Literal[0, 1]], live_times: bool, multilayer: bool, legacy: bool, ): """ :param fast_fitting: fast fitting (forced linear) or slow fitting :param use_limit: fit the entire XRF spectrum or a ROI :param quantification: calulate concentrations from fitted peak areas :param positive_peak_areas: constraint peak areas to be `>=0` :param error_propagation: no error propagation, Poisson noise from average spectrum, Poisson noise for each spectrum separately) :param strip_background: no background stripping, strip, snip :param live_times: include live time normalization :param multilayer: the sample matrix is a "multilayer" :param legacy: legacy high-level native PyMca5 API for fitting """ if not fast_fitting: if not positive_peak_areas: pytest.skip("Slow fitting always needs positive constraints") if live_times: pytest.skip("Slow fitting does not have live-time normalization integrated") if error_propagation == "average": pytest.skip("Slow fitting always needs individual weights") if strip_background == 0 and fast_fitting: expected_error = NotImplementedError expected_native_error = RuntimeError else: expected_error = None expected_native_error = None if multilayer: num_layers = 2 else: num_layers = 1 with _raises(expected_native_error) as has_native_result: native_results = _fit_xrf_spectra( fast_fitting=fast_fitting, use_limit=use_limit, quantification=quantification, error_propagation=error_propagation, positive_peak_areas=positive_peak_areas, strip_background=strip_background, live_times=live_times, native_fitting=True, native_legacy_fitting=legacy, num_layers=num_layers, ) with _raises(expected_error) as has_result: results = _fit_xrf_spectra( fast_fitting=fast_fitting, use_limit=use_limit, quantification=quantification, error_propagation=error_propagation, positive_peak_areas=positive_peak_areas, strip_background=strip_background, live_times=live_times, native_fitting=False, native_legacy_fitting=False, num_layers=num_layers, ) assert has_result == has_native_result if not has_result: return if use_limit: _assert_channel_size(results) _assert_channel_size(native_results) results, compare_options = _prepare_native_compare_fitresults( results, native_results, fast_fitting, native_results ) _compare_data(results, native_results, **compare_options)
def _assert_channel_size(results: XRFBatchFitResult) -> None: names = ["channels", "energies"] for name in names: if results.has_data(name): assert getattr(results, name).shape[-1] == 1021, name names = [ "raw_channels", "raw_energies", "derivatives", "spectra", "model", "residuals", ] for name in names: if results.has_data(name): assert getattr(results, name).shape[-1] == 1024, name def _prepare_native_compare_fitresults( results: XRFBatchFitResult, native_results: XRFBatchFitResult, fast_fitting: bool, legacy: bool, ) -> Tuple[XRFBatchFitResult, Dict[str, Union[float, bool, None]]]: if legacy: # Stores results in float32 compare_options = dict(float_atol=1e-5, float_rtol=1e-2, float_as_32bit=True) else: compare_options = dict(float_atol=None, float_rtol=None, float_as_32bit=False) if not fast_fitting: # Native slow fitting does not send non-linear fit parameters to the output buffer new_names = results.parameter_names native_names = native_results.parameter_names idx = [i for i, name in enumerate(new_names) if name in native_names] results = type(results)( **{ **results._asdict(), "parameter_names": native_names, "parameters": results.parameters[idx], "uncertainties": results.uncertainties[idx], } ) # Stores results in float32 compare_options["float_as_32bit"] = True compare_options["float_rtol"] = 1e-1 # TODO: 10% is too much return results, compare_options def _fit_xrf_spectra( fast_fitting: bool, use_limit: bool, quantification: Optional[str], error_propagation: str, positive_peak_areas: bool, strip_background: Optional[str], live_times: bool, native_fitting: bool, native_legacy_fitting: bool, num_layers: int, ) -> numpy.ndarray: if positive_peak_areas: points_with_negative_peaks = (1,) else: points_with_negative_peaks = tuple() if error_propagation is None: fitweight = False individual_weights = None else: fitweight = True if error_propagation == "individual": individual_weights = True elif error_propagation == "average": individual_weights = False else: raise ValueError(error_propagation) if strip_background is None: stripflag = False stripalgorithm = 0 # Does not matter else: stripflag = True stripalgorithm = strip_background _, xrf_spectra, _, configuration = generate.generate_data( npoints_per_scan=2, energy=7.5, points_with_negative_peaks=points_with_negative_peaks, quantification=quantification, num_layers=num_layers, ) configuration = configuration.model_copy(deep=True) xrf_spectra = xrf_spectra[0] configuration.fit.use_limit = int(use_limit) configuration.fit.fitweight = int(fitweight) configuration.fit.stripflag = int(stripflag) configuration.fit.stripalgorithm = int(stripalgorithm) if live_times: # time in pymca config is 1 sec live_times = numpy.full((len(xrf_spectra),), 0.5) configuration.concentrations.useautotime = 1 else: live_times = None configuration.concentrations.useautotime = 0 original_xrf_spectra = xrf_spectra.copy() results = fit_xrf_spectra( xrf_spectra, configuration, quantification=bool(quantification), individual_weights=individual_weights, live_times=live_times, positive_peak_areas=positive_peak_areas, diagnostics=True, fast_fitting=fast_fitting, native_fitting=native_fitting, native_legacy_fitting=native_legacy_fitting, ) numpy.testing.assert_array_equal(xrf_spectra, original_xrf_spectra) return results @contextmanager def _raises(exc_cls: Optional[Type[Exception]]) -> Generator[bool, None, None]: try: if exc_cls: with pytest.raises(exc_cls): yield False else: yield True except NotImplementedError as ex: pytest.skip(str(ex)) def _compare_data( actual: Any, desired: Any, _name: Optional[str] = "<root>", float_atol: Optional[float] = None, float_rtol: Optional[float] = None, float_as_32bit: bool = False, ) -> None: err_msg = f"{_name!r} is different" if isinstance(desired, NotAvailable): return assert type(actual) is type(desired), err_msg if isinstance(desired, dict): for k, v in desired.items(): _compare_data( actual[k], v, _name=f"{_name}.{k}", float_atol=float_atol, float_rtol=float_rtol, float_as_32bit=float_as_32bit, ) elif isinstance(desired, numpy.ndarray): assert actual.shape == desired.shape, err_msg atol = 1e-10 rtol = 1e-7 if numpy.issubdtype(desired.dtype, numpy.floating): if float_atol is not None: atol = float_atol if float_rtol is not None: rtol = float_rtol if float_as_32bit: with numpy.errstate(over="ignore"): # Stores results in float32 actual = actual.astype(numpy.float32) desired = desired.astype(numpy.float32) # abs(actual - desired) < atol + rtol * abs(desired) numpy.testing.assert_allclose( actual, desired, err_msg=err_msg, atol=atol, rtol=rtol ) elif isinstance(desired, (tuple, list)): assert len(actual) == len(desired), err_msg if hasattr(desired, "_fields"): names = desired._fields else: names = list(range(len(actual))) for name, v1, v2 in zip(names, actual, desired): _compare_data( v1, v2, _name=f"{_name}[{name}]", float_atol=float_atol, float_rtol=float_rtol, float_as_32bit=float_as_32bit, ) else: assert actual == desired, err_msg