import pathlib
from functools import lru_cache
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple
import h5py
import numpy
from ...tasks.example_data.xrf_spectra import EmissionLineGroup
from ...tasks.example_data.xrf_spectra import ScatterLineGroup
from ...tasks.example_data.xrf_spectra import xrf_spectra
from ...xrffit.pymca_config import PyMcaXrfConfiguration
[docs]
@lru_cache
def generate_data(
npoints_per_scan: int,
energy: float,
tmp_path: Optional[pathlib.Path] = None,
samefile: bool = True,
nscans: int = 1,
ndetectors: int = 1,
points_with_negative_peaks: Tuple[int, ...] = tuple(),
quantification: Optional[str] = None,
num_layers: int = 1,
) -> Tuple[
List[List[str]], numpy.ndarray, Dict[str, numpy.ndarray], PyMcaXrfConfiguration
]:
xrf_spectra_uris = list()
parameters = dict()
spectra = list()
fit_background = bool(points_with_negative_peaks)
for detector in range(ndetectors):
det_xrf_spectra_uris = list()
xrf_spectra_uris.append(det_xrf_spectra_uris)
det_spectra = list()
spectra.append(det_spectra)
for scan, filename in enumerate(
_generate_scan_filenames(tmp_path, samefile, nscans), 1
):
linegroups = [
EmissionLineGroup(
"Si", "K", _generate_counts(300, npoints_per_scan, scan)
),
EmissionLineGroup(
"Al", "K", _generate_counts(400, npoints_per_scan, scan)
),
EmissionLineGroup(
"Cl",
"K",
_generate_counts(
200,
npoints_per_scan,
scan,
negative_indices=points_with_negative_peaks,
),
),
EmissionLineGroup(
"Pb", "M", _generate_counts(500, npoints_per_scan, scan)
),
EmissionLineGroup(
"P", "K", _generate_counts(200, npoints_per_scan, scan)
),
EmissionLineGroup(
"S", "K", _generate_counts(600, npoints_per_scan, scan)
),
EmissionLineGroup(
"Ca", "K", _generate_counts(500, npoints_per_scan, scan)
),
EmissionLineGroup(
"Ti", "K", _generate_counts(400, npoints_per_scan, scan)
),
EmissionLineGroup(
"Ce", "L", _generate_counts(500, npoints_per_scan, scan)
),
EmissionLineGroup(
"Fe", "K", _generate_counts(1000, npoints_per_scan, scan)
),
]
scattergroups = [
ScatterLineGroup(
"Peak000", _generate_counts(100, npoints_per_scan, scan)
),
ScatterLineGroup(
"Compton000", _generate_counts(100, npoints_per_scan, scan)
),
]
if detector == 0:
for group in linegroups:
lst = parameters.setdefault(f"{group.element}_{group.name}", list())
lst.extend(group.counts)
for group in scattergroups:
lst = parameters.setdefault(f"{group.prefix}_{group.name}", list())
lst.extend(group.counts)
_spectra, configuration = xrf_spectra(
linegroups,
scattergroups,
energy=energy,
fit_background=fit_background,
quantification=quantification,
num_layers=num_layers,
)
det_spectra.extend(_spectra)
if not filename:
continue
with h5py.File(filename, mode="a") as h5file:
scan_name = f"/{scan}.1"
spectra_name = f"{scan_name}/measurement/mca{detector:02d}"
energy_name = f"{scan_name}/instrument/positioners_start/energy"
h5file[spectra_name] = _spectra
if energy_name not in h5file:
h5file[energy_name] = energy
h5file[energy_name].attrs["units"] = "keV"
det_xrf_spectra_uris.append(f"{filename}::{spectra_name}")
# For each detector: list of spectra and peak areas for all points in all scans
spectra = numpy.asarray(spectra)
parameters = {name: numpy.asarray(values) for name, values in parameters.items()}
return xrf_spectra_uris, spectra, parameters, configuration
def _generate_scan_filenames(
tmp_path: Optional[pathlib.Path], samefile: bool, nscans: int
) -> Generator[Optional[str], None, None]:
if tmp_path is None:
for scan in range(1, nscans + 1):
yield None
elif samefile:
tmp_path.mkdir(parents=True, exist_ok=True)
for scan in range(1, nscans + 1):
yield str(tmp_path / "spectra.h5")
else:
tmp_path.mkdir(parents=True, exist_ok=True)
for scan in range(1, nscans + 1):
yield str(tmp_path / f"spectra{scan}.h5")
def _generate_counts(
start_counts: int,
npoints_per_scan: int,
scan: int,
negative_indices: Tuple[int, ...] = tuple(),
) -> List[int]:
step = 50
total_step = npoints_per_scan * step
start = start_counts + (scan - 1) * total_step
stop = start + total_step
values = list(range(start, stop, step))
for i in negative_indices:
values[i] = -values[i]
return values