from contextlib import contextmanager
from typing import Any
from typing import Dict
from typing import Generator
from typing import Literal
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union
import numpy
import pytest
from ...xrffit.fit import fit_xrf_spectra
from ...xrffit.fit.types import NotAvailable
from ...xrffit.fit.types import XRFBatchFitResult
from ..utils import generate
[docs]
@pytest.mark.parametrize("fast_fitting", [False, True], ids=["slow", "fast"])
@pytest.mark.parametrize("use_limit", [False, True], ids=["wo_limit", "w_limit"])
@pytest.mark.parametrize(
"quantification",
[None, "fundamental", "auto", "ref"],
ids=["quant_no", "quant_fundamental", "quant_auto", "quant_ref"],
)
@pytest.mark.parametrize(
"positive_peak_areas", [False, True], ids=["no_refit", "refit"]
)
@pytest.mark.parametrize(
"error_propagation",
[None, "average", "individual"],
ids=["error_no", "error_avg", "error_ind"],
)
@pytest.mark.parametrize(
"strip_background", [None, 0, 1], ids=["bkg_no", "bkg_strip", "bkg_snip"]
)
@pytest.mark.parametrize("live_times", [False, True], ids=["no_LT", "LT"])
@pytest.mark.parametrize(
"multilayer", [False, True], ids=["single_layer", "multilayer"]
)
@pytest.mark.parametrize("legacy", [False, True], ids=["pymca5", "pymca5_legacy"])
def test_fit_versus_native_fit(
fast_fitting: bool,
use_limit: bool,
quantification: Optional[str],
positive_peak_areas: bool,
error_propagation: Optional[str],
strip_background: Optional[Literal[0, 1]],
live_times: bool,
multilayer: bool,
legacy: bool,
):
"""
:param fast_fitting: fast fitting (forced linear) or slow fitting
:param use_limit: fit the entire XRF spectrum or a ROI
:param quantification: calulate concentrations from fitted peak areas
:param positive_peak_areas: constraint peak areas to be `>=0`
:param error_propagation: no error propagation, Poisson noise from average spectrum, Poisson noise for each spectrum separately)
:param strip_background: no background stripping, strip, snip
:param live_times: include live time normalization
:param multilayer: the sample matrix is a "multilayer"
:param legacy: legacy high-level native PyMca5 API for fitting
"""
if not fast_fitting:
if not positive_peak_areas:
pytest.skip("Slow fitting always needs positive constraints")
if live_times:
pytest.skip("Slow fitting does not have live-time normalization integrated")
if error_propagation == "average":
pytest.skip("Slow fitting always needs individual weights")
if strip_background == 0 and fast_fitting:
expected_error = NotImplementedError
expected_native_error = RuntimeError
else:
expected_error = None
expected_native_error = None
if multilayer:
num_layers = 2
else:
num_layers = 1
with _raises(expected_native_error) as has_native_result:
native_results = _fit_xrf_spectra(
fast_fitting=fast_fitting,
use_limit=use_limit,
quantification=quantification,
error_propagation=error_propagation,
positive_peak_areas=positive_peak_areas,
strip_background=strip_background,
live_times=live_times,
native_fitting=True,
native_legacy_fitting=legacy,
num_layers=num_layers,
)
with _raises(expected_error) as has_result:
results = _fit_xrf_spectra(
fast_fitting=fast_fitting,
use_limit=use_limit,
quantification=quantification,
error_propagation=error_propagation,
positive_peak_areas=positive_peak_areas,
strip_background=strip_background,
live_times=live_times,
native_fitting=False,
native_legacy_fitting=False,
num_layers=num_layers,
)
assert has_result == has_native_result
if not has_result:
return
if use_limit:
_assert_channel_size(results)
_assert_channel_size(native_results)
results, compare_options = _prepare_native_compare_fitresults(
results, native_results, fast_fitting, native_results
)
_compare_data(results, native_results, **compare_options)
def _assert_channel_size(results: XRFBatchFitResult) -> None:
names = ["channels", "energies"]
for name in names:
if results.has_data(name):
assert getattr(results, name).shape[-1] == 1021, name
names = [
"raw_channels",
"raw_energies",
"derivatives",
"spectra",
"model",
"residuals",
]
for name in names:
if results.has_data(name):
assert getattr(results, name).shape[-1] == 1024, name
def _prepare_native_compare_fitresults(
results: XRFBatchFitResult,
native_results: XRFBatchFitResult,
fast_fitting: bool,
legacy: bool,
) -> Tuple[XRFBatchFitResult, Dict[str, Union[float, bool, None]]]:
if legacy:
# Stores results in float32
compare_options = dict(float_atol=1e-5, float_rtol=1e-2, float_as_32bit=True)
else:
compare_options = dict(float_atol=None, float_rtol=None, float_as_32bit=False)
if not fast_fitting:
# Native slow fitting does not send non-linear fit parameters to the output buffer
new_names = results.parameter_names
native_names = native_results.parameter_names
idx = [i for i, name in enumerate(new_names) if name in native_names]
results = type(results)(
**{
**results._asdict(),
"parameter_names": native_names,
"parameters": results.parameters[idx],
"uncertainties": results.uncertainties[idx],
}
)
# Stores results in float32
compare_options["float_as_32bit"] = True
compare_options["float_rtol"] = 1e-1 # TODO: 10% is too much
return results, compare_options
def _fit_xrf_spectra(
fast_fitting: bool,
use_limit: bool,
quantification: Optional[str],
error_propagation: str,
positive_peak_areas: bool,
strip_background: Optional[str],
live_times: bool,
native_fitting: bool,
native_legacy_fitting: bool,
num_layers: int,
) -> numpy.ndarray:
if positive_peak_areas:
points_with_negative_peaks = (1,)
else:
points_with_negative_peaks = tuple()
if error_propagation is None:
fitweight = False
individual_weights = None
else:
fitweight = True
if error_propagation == "individual":
individual_weights = True
elif error_propagation == "average":
individual_weights = False
else:
raise ValueError(error_propagation)
if strip_background is None:
stripflag = False
stripalgorithm = 0 # Does not matter
else:
stripflag = True
stripalgorithm = strip_background
_, xrf_spectra, _, configuration = generate.generate_data(
npoints_per_scan=2,
energy=7.5,
points_with_negative_peaks=points_with_negative_peaks,
quantification=quantification,
num_layers=num_layers,
)
configuration = configuration.model_copy(deep=True)
xrf_spectra = xrf_spectra[0]
configuration.fit.use_limit = int(use_limit)
configuration.fit.fitweight = int(fitweight)
configuration.fit.stripflag = int(stripflag)
configuration.fit.stripalgorithm = int(stripalgorithm)
if live_times:
# time in pymca config is 1 sec
live_times = numpy.full((len(xrf_spectra),), 0.5)
configuration.concentrations.useautotime = 1
else:
live_times = None
configuration.concentrations.useautotime = 0
original_xrf_spectra = xrf_spectra.copy()
results = fit_xrf_spectra(
xrf_spectra,
configuration,
quantification=bool(quantification),
individual_weights=individual_weights,
live_times=live_times,
positive_peak_areas=positive_peak_areas,
diagnostics=True,
fast_fitting=fast_fitting,
native_fitting=native_fitting,
native_legacy_fitting=native_legacy_fitting,
)
numpy.testing.assert_array_equal(xrf_spectra, original_xrf_spectra)
return results
@contextmanager
def _raises(exc_cls: Optional[Type[Exception]]) -> Generator[bool, None, None]:
try:
if exc_cls:
with pytest.raises(exc_cls):
yield False
else:
yield True
except NotImplementedError as ex:
pytest.skip(str(ex))
def _compare_data(
actual: Any,
desired: Any,
_name: Optional[str] = "<root>",
float_atol: Optional[float] = None,
float_rtol: Optional[float] = None,
float_as_32bit: bool = False,
) -> None:
err_msg = f"{_name!r} is different"
if isinstance(desired, NotAvailable):
return
assert type(actual) is type(desired), err_msg
if isinstance(desired, dict):
for k, v in desired.items():
_compare_data(
actual[k],
v,
_name=f"{_name}.{k}",
float_atol=float_atol,
float_rtol=float_rtol,
float_as_32bit=float_as_32bit,
)
elif isinstance(desired, numpy.ndarray):
assert actual.shape == desired.shape, err_msg
atol = 1e-10
rtol = 1e-7
if numpy.issubdtype(desired.dtype, numpy.floating):
if float_atol is not None:
atol = float_atol
if float_rtol is not None:
rtol = float_rtol
if float_as_32bit:
with numpy.errstate(over="ignore"): # Stores results in float32
actual = actual.astype(numpy.float32)
desired = desired.astype(numpy.float32)
# abs(actual - desired) < atol + rtol * abs(desired)
numpy.testing.assert_allclose(
actual, desired, err_msg=err_msg, atol=atol, rtol=rtol
)
elif isinstance(desired, (tuple, list)):
assert len(actual) == len(desired), err_msg
if hasattr(desired, "_fields"):
names = desired._fields
else:
names = list(range(len(actual)))
for name, v1, v2 in zip(names, actual, desired):
_compare_data(
v1,
v2,
_name=f"{_name}[{name}]",
float_atol=float_atol,
float_rtol=float_rtol,
float_as_32bit=float_as_32bit,
)
else:
assert actual == desired, err_msg