import pathlib
from functools import lru_cache
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple
import h5py
import numpy
from ..io import hdf5
from ..tasks.example_data.xrf_spectra import EmissionLineGroup
from ..tasks.example_data.xrf_spectra import ScatterLineGroup
from ..tasks.example_data.xrf_spectra import xrf_spectra
from ..xrffit.pymca_config import PyMcaXrfConfiguration
[docs]
def h5content(group: hdf5.GroupType):
info = dict()
if group.attrs:
info["@attrs"] = set(group.attrs)
for name, item in group.items():
if hdf5.is_group(item):
info[name] = h5content(item)
else:
if item.attrs:
info[f"{name}@attrs"] = set(item.attrs)
info[f"{name}@shape"] = item.shape
return info
[docs]
@lru_cache
def generate_data(
npoints_per_scan: int,
energy: float,
tmp_path: Optional[pathlib.Path] = None,
samefile: bool = True,
nscans: int = 1,
ndetectors: int = 1,
points_with_negative_peaks: Tuple[int, ...] = tuple(),
quantification: Optional[str] = None,
num_layers: int = 1,
) -> Tuple[
List[List[str]], numpy.ndarray, Dict[str, numpy.ndarray], PyMcaXrfConfiguration
]:
xrf_spectra_uris = list()
parameters = dict()
spectra = list()
fit_background = bool(points_with_negative_peaks)
for detector in range(ndetectors):
det_xrf_spectra_uris = list()
xrf_spectra_uris.append(det_xrf_spectra_uris)
det_spectra = list()
spectra.append(det_spectra)
for scan, filename in enumerate(
_generate_scan_filenames(tmp_path, samefile, nscans), 1
):
linegroups = [
EmissionLineGroup(
"Si", "K", _generate_counts(300, npoints_per_scan, scan)
),
EmissionLineGroup(
"Al", "K", _generate_counts(400, npoints_per_scan, scan)
),
EmissionLineGroup(
"Cl",
"K",
_generate_counts(
200,
npoints_per_scan,
scan,
negative_indices=points_with_negative_peaks,
),
),
EmissionLineGroup(
"Pb", "M", _generate_counts(500, npoints_per_scan, scan)
),
EmissionLineGroup(
"P", "K", _generate_counts(200, npoints_per_scan, scan)
),
EmissionLineGroup(
"S", "K", _generate_counts(600, npoints_per_scan, scan)
),
EmissionLineGroup(
"Ca", "K", _generate_counts(500, npoints_per_scan, scan)
),
EmissionLineGroup(
"Ti", "K", _generate_counts(400, npoints_per_scan, scan)
),
EmissionLineGroup(
"Ce", "L", _generate_counts(500, npoints_per_scan, scan)
),
EmissionLineGroup(
"Fe", "K", _generate_counts(1000, npoints_per_scan, scan)
),
]
scattergroups = [
ScatterLineGroup(
"Peak000", _generate_counts(100, npoints_per_scan, scan)
),
ScatterLineGroup(
"Compton000", _generate_counts(100, npoints_per_scan, scan)
),
]
if detector == 0:
for group in linegroups:
lst = parameters.setdefault(f"{group.element}_{group.name}", list())
lst.extend(group.counts)
for group in scattergroups:
lst = parameters.setdefault(f"{group.prefix}_{group.name}", list())
lst.extend(group.counts)
_spectra, configuration = xrf_spectra(
linegroups,
scattergroups,
energy=energy,
fit_background=fit_background,
quantification=quantification,
num_layers=num_layers,
)
det_spectra.extend(_spectra)
if not filename:
continue
with h5py.File(filename, mode="a") as h5file:
scan_name = f"/{scan}.1"
spectra_name = f"{scan_name}/measurement/mca{detector:02d}"
energy_name = f"{scan_name}/instrument/positioners_start/energy"
h5file[spectra_name] = _spectra
if energy_name not in h5file:
h5file[energy_name] = energy
h5file[energy_name].attrs["units"] = "keV"
det_xrf_spectra_uris.append(f"{filename}::{spectra_name}")
# For each detector: list of spectra and peak areas for all points in all scans
spectra = numpy.asarray(spectra)
parameters = {name: numpy.asarray(values) for name, values in parameters.items()}
return xrf_spectra_uris, spectra, parameters, configuration
def _generate_scan_filenames(
tmp_path: Optional[pathlib.Path], samefile: bool, nscans: int
) -> Generator[Optional[str], None, None]:
if tmp_path is None:
for scan in range(1, nscans + 1):
yield None
elif samefile:
tmp_path.mkdir(parents=True, exist_ok=True)
for scan in range(1, nscans + 1):
yield str(tmp_path / "spectra.h5")
else:
tmp_path.mkdir(parents=True, exist_ok=True)
for scan in range(1, nscans + 1):
yield str(tmp_path / f"spectra{scan}.h5")
def _generate_counts(
start_counts: int,
npoints_per_scan: int,
scan: int,
negative_indices: Tuple[int, ...] = tuple(),
) -> List[int]:
step = 50
total_step = npoints_per_scan * step
start = start_counts + (scan - 1) * total_step
stop = start + total_step
values = list(range(start, stop, step))
for i in negative_indices:
values[i] = -values[i]
return values