# Source code for ewoksfluo.tasks.raw_counters.extract_counters_stack

from typing import List
from typing import Optional

import h5py
import numpy
from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field

from ...io import hdf5
from ...io import output_uri
from .. import hdf5_utils
from .. import nexus_utils


# Input model of the task that extracts raw counters from a stack of Bliss
# scans. Documented with comments rather than a docstring so the model's
# generated schema is left untouched.
class Inputs(BaseInputModel):
    # One URI per Bliss scan, each in ``<file>::<data path>`` form.
    bliss_scan_uris: List[str] = Field(
        description="Bliss scan URI.",
        examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]],
    )

    # Where the results are written; the data path part is optional.
    output_root_uri: str = Field(
        description="Target HDF5 file URI with optional data path.",
        examples=[
            "/results/dataset.h5",
            "/results/dataset.h5::/1.1",
            "/results/dataset.h5::/1.1/merge",
        ],
    )

    # Extra group level below ``output_root_uri``; omitted by default.
    output_root_group: Optional[str] = Field(
        default=None,
        description="Optional group underneath ``output_root_uri``.",
    )

    # Earlier XRF results to merge with the raw counters; omitted by default.
    xrf_results_uri: Optional[str] = Field(
        default=None,
        description="Previous XRF NXprocess URI to merge the raw Bliss counters with.",
        examples=["/results/dataset.h5::/1.1/sum/results"],
    )
# Output model of the task that extracts raw counters from a stack of Bliss
# scans. Documented with comments rather than a docstring so the model's
# generated schema is left untouched.
class Outputs(BaseOutputModel):
    # One URI per Bliss scan, each in ``<file>::<data path>`` form.
    bliss_scan_uris: List[str] = Field(
        description="Bliss scan URI.",
        examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]],
    )

    # The output root URI exactly as it was received.
    output_root_uri: str = Field(
        description="Original output root URI received as input.",
        examples=["/results/dataset.h5::/1.1"],
    )

    # The output root group exactly as it was received (may be ``None``).
    output_root_group: Optional[str] = Field(
        default=None,
        description="Original output root group received as input.",
    )

    # Location of the results group produced by the task.
    xrf_results_uri: str = Field(
        description="Output NXprocess results URI.",
        examples=["/results/dataset.h5::/1.1/merge/results"],
    )
class ExtractRawCountersStack(Task, input_model=Inputs, output_model=Outputs):
    """Extract raw stack counters and save them like XRF results."""

    def run(self):
        """Link the raw counters of all scans into a ``results`` group.

        The process group is named ``"merge"`` when a previous XRF results
        URI is given and ``"raw"`` otherwise. When the process group already
        existed, its ``results`` group is reused as-is and nothing is linked
        again. The scan URIs, output root URI and output root group are
        passed through to the outputs unchanged.
        """
        start_time = nexus_utils.now()

        inputs = self.inputs
        scan_uris = inputs.bliss_scan_uris
        previous_uri = inputs.xrf_results_uri

        # The data path of the first scan is the default output data path.
        _, first_scan_path = hdf5.split_h5uri(scan_uris[0])

        process_name = "merge" if previous_uri else "raw"

        full_output_uri = output_uri.compose_full_output_uri(
            inputs.output_root_uri,
            default_output_data_path=first_scan_path,
            extra_data_paths=(inputs.output_root_group, process_name),
        )

        with nexus_utils.save_in_ewoks_process(
            full_output_uri, start_time, process_config={}
        ) as (process_group, already_existed):
            if not already_existed:
                results = process_group.create_group("results")
                results.attrs["NX_class"] = "NXcollection"

                _link_raw_counters(scan_uris, results, "rawcounters")

                if previous_uri:
                    previous_file, previous_path = hdf5.split_h5uri(previous_uri)
                    with hdf5.FileReadAccess(previous_file) as previous_h5:
                        _link_xrf_results(previous_h5[previous_path], results)
            else:
                results = process_group["results"]

            # Build the URI while the output file is still open.
            results_filename = results.file.filename
            results_path = results.name
            self.outputs.xrf_results_uri = f"{results_filename}::{results_path}"

        self.outputs.bliss_scan_uris = self.inputs.bliss_scan_uris
        self.outputs.output_root_uri = self.inputs.output_root_uri
        self.outputs.output_root_group = self.inputs.output_root_group
def _link_raw_counters(
    bliss_scan_uris: List[str],
    process_group: hdf5.GroupType,
    name: str,
) -> None:
    """Stack the 1-D measurement datasets of several scans as virtual datasets.

    An NXdata group called ``name`` is created under ``process_group``. For
    every dataset name that is 1-D in the ``measurement`` group of at least
    one scan, a virtual dataset of shape ``(number of scans, largest size)``
    is created in it. Row ``i`` maps the dataset of scan ``i``; positions not
    covered by a source are left to the fill value (``numpy.nan``).

    Scans without a ``measurement`` group, and datasets that are missing or
    not 1-D in a particular scan, leave their row unmapped.

    :param bliss_scan_uris: scan URIs, each split with ``hdf5.split_h5uri``
        into a file name and a scan data path.
    :param process_group: parent group receiving the NXdata group.
    :param name: name of the NXdata group to create.
    """
    destination = nexus_utils.create_nxdata(process_group, name)
    nscans = len(bliss_scan_uris)

    # First pass:
    # - determine maximum dataset size across scans
    # - collect attrs and dtypes (taken from the first scan that has the
    #   dataset as 1-D)
    max_sizes = {}
    attrs = {}
    dtypes = {}
    for bliss_scan_uri in bliss_scan_uris:
        filename, scan_h5path = hdf5.split_h5uri(bliss_scan_uri)
        with hdf5.FileReadAccess(filename) as h5file:
            entry = h5file[scan_h5path]
            if "measurement" not in entry:
                continue
            measurement = entry["measurement"]
            for dset_name, dset in measurement.items():
                if dset.ndim != 1:
                    continue
                max_sizes[dset_name] = max(max_sizes.get(dset_name, 0), dset.size)
                if dset_name not in attrs:
                    # Copy while the file is open.
                    attrs[dset_name] = dict(dset.attrs)
                    dtypes[dset_name] = dset.dtype

    # Create layouts with final shapes
    # NOTE(review): the fill value is NaN even when the dtype is an integer
    # type -- confirm how unmapped regions read back in that case.
    layouts = {
        dset_name: h5py.VirtualLayout(
            shape=(nscans, max_sizes[dset_name]),
            dtype=dtypes[dset_name],
        )
        for dset_name in max_sizes
    }

    # Second pass: attach virtual sources
    for i, bliss_scan_uri in enumerate(bliss_scan_uris):
        filename, scan_h5path = hdf5.split_h5uri(bliss_scan_uri)
        with hdf5.FileReadAccess(filename) as h5file:
            entry = h5file[scan_h5path]
            if "measurement" not in entry:
                continue
            measurement = entry["measurement"]
            for dset_name, layout in layouts.items():
                if dset_name not in measurement:
                    continue
                dset = measurement[dset_name]
                if dset.ndim != 1:
                    # Same filter as the first pass: a dataset that is not
                    # 1-D in this scan was not counted in ``max_sizes`` and
                    # cannot be described as a flat source of ``dset.size``
                    # elements.
                    continue
                vsource = h5py.VirtualSource(
                    filename,
                    f"{scan_h5path}/measurement/{dset_name}",
                    shape=(dset.size,),
                )
                # Map only the valid region for this scan
                layout[i, : dset.size] = vsource

    # Create virtual datasets
    for dset_name, layout in layouts.items():
        dset = destination.create_virtual_dataset(
            dset_name, layout, fillvalue=numpy.nan
        )
        dset.attrs.update(attrs[dset_name])

    nexus_utils.set_nxdata_signals(destination, signals=list(layouts))


def _link_xrf_results(
    xrf_results: hdf5.GroupType, process_group: hdf5.GroupType
) -> None:
    """Link every item of ``xrf_results`` into ``process_group`` under the same name.

    :param xrf_results: group whose direct children are linked.
    :param process_group: group in which the links are created.
    """
    for name, group in xrf_results.items():
        hdf5_utils.create_hdf5_link(process_group, name, group)