Source code for ewoksfluo.tests.utils.generate

import pathlib
from functools import lru_cache
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple

import h5py
import numpy

from ...tasks.example_data.xrf_spectra import EmissionLineGroup
from ...tasks.example_data.xrf_spectra import ScatterLineGroup
from ...tasks.example_data.xrf_spectra import xrf_spectra
from ...xrffit.pymca_config import PyMcaXrfConfiguration


@lru_cache
def generate_data(
    npoints_per_scan: int,
    energy: float,
    tmp_path: Optional[pathlib.Path] = None,
    samefile: bool = True,
    nscans: int = 1,
    ndetectors: int = 1,
    points_with_negative_peaks: Tuple[int, ...] = tuple(),
    quantification: Optional[str] = None,
    num_layers: int = 1,
) -> Tuple[
    List[List[str]], numpy.ndarray, Dict[str, numpy.ndarray], PyMcaXrfConfiguration
]:
    """Generate synthetic XRF spectra and optionally save them in HDF5 files.

    For every detector and every scan the same fixed set of emission and
    scatter line groups is built with deterministic peak counts (see
    ``_generate_counts``) and passed to ``xrf_spectra``.

    The result is memoized with ``functools.lru_cache``: all arguments must be
    hashable and calls with equal arguments return the *same* objects, so
    callers should not modify the returned values in place.

    :param npoints_per_scan: number of points (spectra) in each scan.
    :param energy: forwarded to ``xrf_spectra``; when files are written it is
        also stored under ``/<scan>.1/instrument/positioners_start/energy``
        with a ``units`` attribute of ``"keV"``.
    :param tmp_path: directory in which the HDF5 files are created. When
        ``None`` nothing is written and the URI lists stay empty.
    :param samefile: store all scans in ``spectra.h5`` instead of one
        ``spectra<scan>.h5`` file per scan.
    :param nscans: number of scans, numbered from 1.
    :param ndetectors: number of detectors, numbered from 0.
    :param points_with_negative_peaks: indices, within each scan, of the points
        for which the "Cl" K counts are negated. A non-empty tuple also sets
        ``fit_background=True`` in the ``xrf_spectra`` call.
    :param quantification: forwarded to ``xrf_spectra``.
    :param num_layers: forwarded to ``xrf_spectra``.
    :returns: a tuple of

        * for each detector, the list of ``"<filename>::<dataset>"`` URIs of
          the spectra that were written,
        * the spectra of all points in all scans, one entry per detector,
        * the peak counts of all points in all scans, keyed by
          ``"<element>_<name>"`` (emission) or ``"<prefix>_<name>"`` (scatter)
          and recorded for detector 0 only,
        * the configuration returned by the last ``xrf_spectra`` call.

    NOTE: ``configuration`` is only bound inside the scan loop, so both
    ``ndetectors`` and ``nscans`` need to be at least 1.
    """
    # (element, line, start counts, indices of points to negate), in the order
    # in which the groups are handed to `xrf_spectra`.
    no_negatives: Tuple[int, ...] = tuple()
    emission_specs = (
        ("Si", "K", 300, no_negatives),
        ("Al", "K", 400, no_negatives),
        ("Cl", "K", 200, points_with_negative_peaks),
        ("Pb", "M", 500, no_negatives),
        ("P", "K", 200, no_negatives),
        ("S", "K", 600, no_negatives),
        ("Ca", "K", 500, no_negatives),
        ("Ti", "K", 400, no_negatives),
        ("Ce", "L", 500, no_negatives),
        ("Fe", "K", 1000, no_negatives),
    )
    # (scatter group name, start counts)
    scatter_specs = (("Peak000", 100), ("Compton000", 100))

    fit_background = bool(points_with_negative_peaks)

    uris_per_detector = []
    spectra_per_detector = []
    counts_per_peak = {}

    for detector in range(ndetectors):
        detector_uris = []
        detector_spectra = []
        uris_per_detector.append(detector_uris)
        spectra_per_detector.append(detector_spectra)

        scan_files = _generate_scan_filenames(tmp_path, samefile, nscans)
        for scan, filename in enumerate(scan_files, 1):
            linegroups = [
                EmissionLineGroup(
                    element,
                    line,
                    _generate_counts(
                        start_counts,
                        npoints_per_scan,
                        scan,
                        negative_indices=negatives,
                    ),
                )
                for element, line, start_counts, negatives in emission_specs
            ]
            scattergroups = [
                ScatterLineGroup(
                    group_name,
                    _generate_counts(start_counts, npoints_per_scan, scan),
                )
                for group_name, start_counts in scatter_specs
            ]

            # The peak counts do not depend on the detector: record them once.
            if detector == 0:
                for group in linegroups:
                    key = f"{group.element}_{group.name}"
                    counts_per_peak.setdefault(key, []).extend(group.counts)
                for group in scattergroups:
                    key = f"{group.prefix}_{group.name}"
                    counts_per_peak.setdefault(key, []).extend(group.counts)

            scan_spectra, configuration = xrf_spectra(
                linegroups,
                scattergroups,
                energy=energy,
                fit_background=fit_background,
                quantification=quantification,
                num_layers=num_layers,
            )
            detector_spectra.extend(scan_spectra)

            if filename:
                scan_name = f"/{scan}.1"
                spectra_name = f"{scan_name}/measurement/mca{detector:02d}"
                energy_name = f"{scan_name}/instrument/positioners_start/energy"
                with h5py.File(filename, mode="a") as h5file:
                    h5file[spectra_name] = scan_spectra
                    # The energy is shared by all detectors of a scan.
                    if energy_name not in h5file:
                        h5file[energy_name] = energy
                        h5file[energy_name].attrs["units"] = "keV"
                detector_uris.append(f"{filename}::{spectra_name}")

    # For each detector: list of spectra and peak areas for all points in all scans
    spectra = numpy.asarray(spectra_per_detector)
    parameters = {}
    for peak_name, peak_counts in counts_per_peak.items():
        parameters[peak_name] = numpy.asarray(peak_counts)
    return uris_per_detector, spectra, parameters, configuration


def _generate_scan_filenames( tmp_path: Optional[pathlib.Path], samefile: bool, nscans: int ) -> Generator[Optional[str], None, None]: if tmp_path is None: for scan in range(1, nscans + 1): yield None elif samefile: tmp_path.mkdir(parents=True, exist_ok=True) for scan in range(1, nscans + 1): yield str(tmp_path / "spectra.h5") else: tmp_path.mkdir(parents=True, exist_ok=True) for scan in range(1, nscans + 1): yield str(tmp_path / f"spectra{scan}.h5") def _generate_counts( start_counts: int, npoints_per_scan: int, scan: int, negative_indices: Tuple[int, ...] = tuple(), ) -> List[int]: step = 50 total_step = npoints_per_scan * step start = start_counts + (scan - 1) * total_step stop = start + total_step values = list(range(start, stop, step)) for i in negative_indices: values[i] = -values[i] return values