Source code for ewoksfluo.tasks.raw_counters.extract_counters
from typing import Optional
from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field
from ...io import hdf5
from ...io import output_uri
from .. import hdf5_utils
from .. import nexus_utils
[docs]
class Inputs(BaseInputModel):
    """Input parameters of the ``ExtractRawCounters`` task."""

    # URI of the Bliss scan whose counters are extracted.
    bliss_scan_uri: str = Field(
        examples=["/data/dataset.h5::/1.1"],
        description="Bliss scan URI.",
    )

    # Where the results are written: an HDF5 file, optionally with a data path.
    output_root_uri: str = Field(
        examples=[
            "/results/dataset.h5",
            "/results/dataset.h5::/1.1",
            "/results/dataset.h5::/1.1/merge",
        ],
        description="Target HDF5 file URI with optional data path.",
    )

    # Optional extra group level below ``output_root_uri``.
    output_root_group: Optional[str] = Field(
        description="Optional group underneath ``output_root_uri``.",
        default=None,
    )

    # Optional earlier XRF results to merge the raw counters with.
    xrf_results_uri: Optional[str] = Field(
        examples=["/results/dataset.h5::/1.1/sum/results"],
        description="Previous XRF NXprocess URI to merge the raw Bliss counters with.",
        default=None,
    )
[docs]
class Outputs(BaseOutputModel):
    """Output values of the ``ExtractRawCounters`` task."""

    # Passed through unchanged from the task inputs.
    bliss_scan_uri: str = Field(
        examples=["/data/dataset.h5::/1.1"],
        description="Bliss scan URI.",
    )

    # Passed through unchanged from the task inputs.
    output_root_uri: str = Field(
        examples=["/results/dataset.h5::/1.1"],
        description="Original output root URI received as input.",
    )

    # Passed through unchanged from the task inputs.
    output_root_group: Optional[str] = Field(
        description="Original output root group received as input.",
        default=None,
    )

    # URI (``<file>::<path>``) of the results group written by the task.
    xrf_results_uri: str = Field(
        examples=["/results/dataset.h5::/1.1/merge/results"],
        description="Output NXprocess results URI.",
    )
[docs]
class ExtractRawCounters(Task, input_model=Inputs, output_model=Outputs):
    """Extract raw single-scan counters and save them like XRF results."""

    def run(self):
        """Link the scan counters (and optional XRF results) into a results group.

        The process group is named ``"merge"`` when a previous XRF results URI
        is given and ``"raw"`` otherwise. The three root inputs are copied to
        the outputs unchanged, and ``xrf_results_uri`` is set to the
        ``<file>::<path>`` of the ``results`` group.
        """
        start_time = nexus_utils.now()
        xrf_uri_to_merge = self.inputs.xrf_results_uri
        scan_file, scan_h5path = hdf5.split_h5uri(self.inputs.bliss_scan_uri)

        process_name = "merge" if xrf_uri_to_merge else "raw"

        output_root_uri = output_uri.compose_full_output_uri(
            self.inputs.output_root_uri,
            default_output_data_path=scan_h5path,
            extra_data_paths=(self.inputs.output_root_group, process_name),
        )

        with nexus_utils.save_in_ewoks_process(
            output_root_uri, start_time, process_config={}
        ) as (process_group, already_existed):
            # NOTE(review): nesting reconstructed from a flattened listing;
            # confirm that linking is only done for a newly created process.
            if already_existed:
                results_group = process_group["results"]
            else:
                results_group = process_group.create_group("results")
                results_group.attrs["NX_class"] = "NXcollection"

                with hdf5.FileReadAccess(scan_file) as scan_h5:
                    _link_raw_counters(
                        scan_h5[scan_h5path], results_group, "rawcounters"
                    )

                if xrf_uri_to_merge:
                    xrf_file, xrf_h5path = hdf5.split_h5uri(xrf_uri_to_merge)
                    with hdf5.FileReadAccess(xrf_file) as xrf_h5:
                        _link_xrf_results(xrf_h5[xrf_h5path], results_group)

            # Build the URI while the output file is still open.
            results_filename = results_group.file.filename
            self.outputs.xrf_results_uri = f"{results_filename}::{results_group.name}"

        # Forward the inputs that are also declared as outputs.
        self.outputs.bliss_scan_uri = self.inputs.bliss_scan_uri
        self.outputs.output_root_uri = self.inputs.output_root_uri
        self.outputs.output_root_group = self.inputs.output_root_group
def _link_raw_counters(
    raw_scan: hdf5.GroupType, process_group: hdf5.GroupType, name: str
) -> None:
    """Link the 1D members of a scan's ``measurement`` group into an NXdata group.

    :param raw_scan: scan group; must contain a ``"measurement"`` member.
    :param process_group: group in which the NXdata group is created.
    :param name: name of the NXdata group created under ``process_group``.
    :raises KeyError: presumably, when ``raw_scan`` has no ``"measurement"``
        member (h5py-like mapping access) - TODO confirm.
    """
    destination = nexus_utils.create_nxdata(process_group, name)
    measurement = raw_scan["measurement"]

    signals = []
    # Loop names are distinct from the ``name`` parameter to avoid shadowing it.
    for counter_name, counter in measurement.items():
        # Members without ``ndim`` (e.g. sub-groups) are skipped instead of
        # raising AttributeError, same as datasets that are not 1D.
        if getattr(counter, "ndim", None) != 1:
            continue
        hdf5_utils.create_hdf5_link(destination, counter_name, counter)
        signals.append(counter_name)

    nexus_utils.set_nxdata_signals(destination, signals=signals)
def _link_xrf_results(
    xrf_results: hdf5.GroupType, process_group: hdf5.GroupType
) -> None:
    """Link every member of ``xrf_results`` into ``process_group`` under the same name.

    :param xrf_results: group whose direct members are linked.
    :param process_group: group that receives the links.
    """
    for member_name, member in xrf_results.items():
        hdf5_utils.create_hdf5_link(process_group, member_name, member)