Source code for ewoksfluo.tasks.raw_counters.extract_counters_stack

from typing import List
from typing import Optional

import h5py
import numpy
from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field

from ...io import hdf5
from ...io import output_uri
from .. import hdf5_utils
from .. import nexus_utils


[docs] class Inputs(BaseInputModel): bliss_scan_uris: List[str] = Field( description="Bliss scan URI.", examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]], ) output_root_uri: str = Field( description="Target HDF5 file URI with optional data path.", examples=[ "/results/dataset.h5", "/results/dataset.h5::/1.1", "/results/dataset.h5::/1.1/merge", ], ) output_root_group: Optional[str] = Field( default=None, description="Optional group underneath ``output_root_uri``." ) xrf_results_uri: Optional[str] = Field( default=None, description="Previous XRF NXprocess URI to merge the raw Bliss counters with.", examples=["/results/dataset.h5::/1.1/sum/results"], )
[docs] class Outputs(BaseOutputModel): bliss_scan_uris: List[str] = Field( description="Bliss scan URI.", examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]], ) output_root_uri: str = Field( description="Original output root URI received as input.", examples=["/results/dataset.h5::/1.1"], ) output_root_group: Optional[str] = Field( default=None, description="Original output root group received as input." ) xrf_results_uri: str = Field( description="Output NXprocess results URI.", examples=["/results/dataset.h5::/1.1/merge/results"], )
[docs] class ExtractRawCountersStack(Task, input_model=Inputs, output_model=Outputs): """Extract raw stack counters and save them like XRF results."""
[docs] def run(self): start_time = nexus_utils.now() bliss_scan_uris = self.inputs.bliss_scan_uris previous_xrf_results_uri = self.inputs.xrf_results_uri input_file0, scan_h5path0 = hdf5.split_h5uri(bliss_scan_uris[0]) if previous_xrf_results_uri: process_name = "merge" else: process_name = "raw" output_root_uri = output_uri.compose_full_output_uri( self.inputs.output_root_uri, default_output_data_path=scan_h5path0, extra_data_paths=(self.inputs.output_root_group, process_name), ) with nexus_utils.save_in_ewoks_process( output_root_uri, start_time, process_config=dict() ) as (process_group, already_existed): if already_existed: merged_xrf_results = process_group["results"] else: merged_xrf_results = process_group.create_group("results") merged_xrf_results.attrs["NX_class"] = "NXcollection" with hdf5.FileReadAccess(input_file0) as h5file0: _link_raw_counters( h5file0[scan_h5path0], bliss_scan_uris, merged_xrf_results, "rawcounters", ) if previous_xrf_results_uri: input_file, parent_path = hdf5.split_h5uri(previous_xrf_results_uri) with hdf5.FileReadAccess(input_file) as h5file: _link_xrf_results(h5file[parent_path], merged_xrf_results) self.outputs.xrf_results_uri = ( f"{merged_xrf_results.file.filename}::{merged_xrf_results.name}" ) self.outputs.bliss_scan_uris = self.inputs.bliss_scan_uris self.outputs.output_root_uri = self.inputs.output_root_uri self.outputs.output_root_group = self.inputs.output_root_group
def _link_raw_counters( raw_scan0: hdf5.GroupType, bliss_scan_uris: List[str], process_group: hdf5.GroupType, name: str, ) -> None: destination = nexus_utils.create_nxdata(process_group, name) measurement = raw_scan0["measurement"] nscans = len(bliss_scan_uris) layouts = dict() attrs = dict() for name, dset in measurement.items(): if dset.ndim == 1: layout = h5py.VirtualLayout(shape=(nscans, dset.size), dtype=dset.dtype) vsource_shape = (dset.size,) layouts[name] = layout, vsource_shape attrs[name] = dict(dset.attrs) for i, bliss_scan_uri in enumerate(bliss_scan_uris): filename, scan_h5path = hdf5.split_h5uri(bliss_scan_uri) for name, (layout, vsource_shape) in layouts.items(): layout[i] = h5py.VirtualSource( filename, f"{scan_h5path}/measurement/{name}", shape=vsource_shape ) for name, (layout, _) in layouts.items(): dset = destination.create_virtual_dataset(name, layout, fillvalue=numpy.nan) dset.attrs.update(attrs[name]) nexus_utils.set_nxdata_signals(destination, signals=list(layouts)) def _link_xrf_results( xrf_results: hdf5.GroupType, process_group: hdf5.GroupType ) -> None: for name, group in xrf_results.items(): hdf5_utils.create_hdf5_link(process_group, name, group)