Source code for ewoksfluo.tasks.raw_counters.extract_counters

from typing import Optional

from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field

from ...io import hdf5
from ...io import output_uri
from .. import hdf5_utils
from .. import nexus_utils


class Inputs(BaseInputModel):
    # Input parameters of the raw counter extraction task.

    # Scan whose counters are extracted.
    bliss_scan_uri: str = Field(
        description="Bliss scan URI.",
        examples=["/data/dataset.h5::/1.1"],
    )

    # Destination of the results; the data path after "::" is optional.
    output_root_uri: str = Field(
        description="Target HDF5 file URI with optional data path.",
        examples=[
            "/results/dataset.h5",
            "/results/dataset.h5::/1.1",
            "/results/dataset.h5::/1.1/merge",
        ],
    )

    # Optional extra group level below the output root.
    output_root_group: Optional[str] = Field(
        default=None,
        description="Optional group underneath ``output_root_uri``.",
    )

    # When provided, the task merges these results with the raw counters.
    xrf_results_uri: Optional[str] = Field(
        default=None,
        description="Previous XRF NXprocess URI to merge the raw Bliss counters with.",
        examples=["/results/dataset.h5::/1.1/sum/results"],
    )
class Outputs(BaseOutputModel):
    # Output values of the raw counter extraction task.

    # Passed through unchanged from the inputs.
    bliss_scan_uri: str = Field(
        description="Bliss scan URI.",
        examples=["/data/dataset.h5::/1.1"],
    )

    # Passed through unchanged from the inputs.
    output_root_uri: str = Field(
        description="Original output root URI received as input.",
        examples=["/results/dataset.h5::/1.1"],
    )

    # Passed through unchanged from the inputs.
    output_root_group: Optional[str] = Field(
        default=None,
        description="Original output root group received as input.",
    )

    # Location of the "results" group written (or found) by the task.
    xrf_results_uri: str = Field(
        description="Output NXprocess results URI.",
        examples=["/results/dataset.h5::/1.1/merge/results"],
    )
class ExtractRawCounters(Task, input_model=Inputs, output_model=Outputs):
    """Extract raw single-scan counters and save them like XRF results."""

    def run(self):
        """Link the raw scan counters into a "results" group of an Ewoks process.

        The process is named ``"merge"`` when the ``xrf_results_uri`` input is
        set (the items of that group are then linked next to the counters) and
        ``"raw"`` otherwise. The ``bliss_scan_uri``, ``output_root_uri`` and
        ``output_root_group`` inputs are copied to the outputs unchanged, and
        ``xrf_results_uri`` is set to ``<file name>::<results group path>``.
        """
        started_at = nexus_utils.now()
        xrf_uri = self.inputs.xrf_results_uri
        scan_file, scan_h5path = hdf5.split_h5uri(self.inputs.bliss_scan_uri)

        process_name = "merge" if xrf_uri else "raw"
        process_uri = output_uri.compose_full_output_uri(
            self.inputs.output_root_uri,
            default_output_data_path=scan_h5path,
            extra_data_paths=(self.inputs.output_root_group, process_name),
        )

        with nexus_utils.save_in_ewoks_process(
            process_uri, started_at, process_config={}
        ) as (process_group, already_existed):
            # NOTE(review): links are assumed to be created only for a newly
            # created process group; an existing "results" group is reused
            # as-is. Confirm against the upstream nesting.
            if not already_existed:
                results = process_group.create_group("results")
                results.attrs["NX_class"] = "NXcollection"

                with hdf5.FileReadAccess(scan_file) as scan_h5:
                    _link_raw_counters(scan_h5[scan_h5path], results, "rawcounters")

                if xrf_uri:
                    xrf_file, xrf_h5path = hdf5.split_h5uri(xrf_uri)
                    with hdf5.FileReadAccess(xrf_file) as xrf_h5:
                        _link_xrf_results(xrf_h5[xrf_h5path], results)
            else:
                results = process_group["results"]

            # Read the file name and group path while the output file is open.
            filename = results.file.filename
            self.outputs.xrf_results_uri = f"{filename}::{results.name}"

        self.outputs.bliss_scan_uri = self.inputs.bliss_scan_uri
        self.outputs.output_root_uri = self.inputs.output_root_uri
        self.outputs.output_root_group = self.inputs.output_root_group
def _link_raw_counters(
    raw_scan: hdf5.GroupType, process_group: hdf5.GroupType, name: str
) -> None:
    """Link the 1D items of the scan's "measurement" group as NXdata signals.

    The destination is obtained from ``nexus_utils.create_nxdata`` with
    ``process_group`` and ``name``. Every item of ``raw_scan["measurement"]``
    whose ``ndim`` equals 1 is linked into it under its own name, and the
    linked names are then registered with ``nexus_utils.set_nxdata_signals``.
    """
    nxdata = nexus_utils.create_nxdata(process_group, name)

    linked_names = []
    for counter_name, counter in raw_scan["measurement"].items():
        if counter.ndim != 1:
            # Only 1D items are linked.
            continue
        hdf5_utils.create_hdf5_link(nxdata, counter_name, counter)
        linked_names.append(counter_name)

    nexus_utils.set_nxdata_signals(nxdata, signals=linked_names)


def _link_xrf_results(
    xrf_results: hdf5.GroupType, process_group: hdf5.GroupType
) -> None:
    """Link every item of ``xrf_results`` into ``process_group`` under the same name."""
    for item_name, item in xrf_results.items():
        hdf5_utils.create_hdf5_link(process_group, item_name, item)