Source code for ewoksfluo.tests.utils

import pathlib
from functools import lru_cache
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple

import h5py
import numpy

from ..io import hdf5
from ..tasks.example_data.xrf_spectra import EmissionLineGroup
from ..tasks.example_data.xrf_spectra import ScatterLineGroup
from ..tasks.example_data.xrf_spectra import xrf_spectra
from ..xrffit.pymca_config import PyMcaXrfConfiguration


[docs] def h5content(group: hdf5.GroupType): info = dict() if group.attrs: info["@attrs"] = set(group.attrs) for name, item in group.items(): if hdf5.is_group(item): info[name] = h5content(item) else: if item.attrs: info[f"{name}@attrs"] = set(item.attrs) info[f"{name}@shape"] = item.shape return info
[docs] @lru_cache def generate_data( npoints_per_scan: int, energy: float, tmp_path: Optional[pathlib.Path] = None, samefile: bool = True, nscans: int = 1, ndetectors: int = 1, points_with_negative_peaks: Tuple[int, ...] = tuple(), quantification: Optional[str] = None, num_layers: int = 1, ) -> Tuple[ List[List[str]], numpy.ndarray, Dict[str, numpy.ndarray], PyMcaXrfConfiguration ]: xrf_spectra_uris = list() parameters = dict() spectra = list() fit_background = bool(points_with_negative_peaks) for detector in range(ndetectors): det_xrf_spectra_uris = list() xrf_spectra_uris.append(det_xrf_spectra_uris) det_spectra = list() spectra.append(det_spectra) for scan, filename in enumerate( _generate_scan_filenames(tmp_path, samefile, nscans), 1 ): linegroups = [ EmissionLineGroup( "Si", "K", _generate_counts(300, npoints_per_scan, scan) ), EmissionLineGroup( "Al", "K", _generate_counts(400, npoints_per_scan, scan) ), EmissionLineGroup( "Cl", "K", _generate_counts( 200, npoints_per_scan, scan, negative_indices=points_with_negative_peaks, ), ), EmissionLineGroup( "Pb", "M", _generate_counts(500, npoints_per_scan, scan) ), EmissionLineGroup( "P", "K", _generate_counts(200, npoints_per_scan, scan) ), EmissionLineGroup( "S", "K", _generate_counts(600, npoints_per_scan, scan) ), EmissionLineGroup( "Ca", "K", _generate_counts(500, npoints_per_scan, scan) ), EmissionLineGroup( "Ti", "K", _generate_counts(400, npoints_per_scan, scan) ), EmissionLineGroup( "Ce", "L", _generate_counts(500, npoints_per_scan, scan) ), EmissionLineGroup( "Fe", "K", _generate_counts(1000, npoints_per_scan, scan) ), ] scattergroups = [ ScatterLineGroup( "Peak000", _generate_counts(100, npoints_per_scan, scan) ), ScatterLineGroup( "Compton000", _generate_counts(100, npoints_per_scan, scan) ), ] if detector == 0: for group in linegroups: lst = parameters.setdefault(f"{group.element}_{group.name}", list()) lst.extend(group.counts) for group in scattergroups: lst = parameters.setdefault(f"{group.prefix}_{group.name}", list()) lst.extend(group.counts) _spectra, configuration = xrf_spectra( linegroups, scattergroups, energy=energy, fit_background=fit_background, quantification=quantification, num_layers=num_layers, ) det_spectra.extend(_spectra) if not filename: continue with h5py.File(filename, mode="a") as h5file: scan_name = f"/{scan}.1" spectra_name = f"{scan_name}/measurement/mca{detector:02d}" energy_name = f"{scan_name}/instrument/positioners_start/energy" h5file[spectra_name] = _spectra if energy_name not in h5file: h5file[energy_name] = energy h5file[energy_name].attrs["units"] = "keV" det_xrf_spectra_uris.append(f"{filename}::{spectra_name}") # For each detector: list of spectra and peak areas for all points in all scans spectra = numpy.asarray(spectra) parameters = {name: numpy.asarray(values) for name, values in parameters.items()} return xrf_spectra_uris, spectra, parameters, configuration
def _generate_scan_filenames( tmp_path: Optional[pathlib.Path], samefile: bool, nscans: int ) -> Generator[Optional[str], None, None]: if tmp_path is None: for scan in range(1, nscans + 1): yield None elif samefile: tmp_path.mkdir(parents=True, exist_ok=True) for scan in range(1, nscans + 1): yield str(tmp_path / "spectra.h5") else: tmp_path.mkdir(parents=True, exist_ok=True) for scan in range(1, nscans + 1): yield str(tmp_path / f"spectra{scan}.h5") def _generate_counts( start_counts: int, npoints_per_scan: int, scan: int, negative_indices: Tuple[int, ...] = tuple(), ) -> List[int]: step = 50 total_step = npoints_per_scan * step start = start_counts + (scan - 1) * total_step stop = start + total_step values = list(range(start, stop, step)) for i in negative_indices: values[i] = -values[i] return values