Source code for ewoksfluo.tasks.raw_counters.extract_counters_stack
from typing import List
from typing import Optional
import h5py
import numpy
from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field
from ...io import hdf5
from ...io import output_uri
from .. import hdf5_utils
from .. import nexus_utils
[docs]
class Inputs(BaseInputModel):
    """Inputs of :class:`ExtractRawCountersStack`."""

    # One URI per scan to stack; the first scan also provides the default
    # output data path.
    bliss_scan_uris: List[str] = Field(
        description="Bliss scan URIs.",
        examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]],
    )
    output_root_uri: str = Field(
        description="Target HDF5 file URI with optional data path.",
        examples=[
            "/results/dataset.h5",
            "/results/dataset.h5::/1.1",
            "/results/dataset.h5::/1.1/merge",
        ],
    )
    output_root_group: Optional[str] = Field(
        default=None, description="Optional group underneath ``output_root_uri``."
    )
    # When provided, the task process is named "merge" instead of "raw" and the
    # children of this group are linked next to the raw counters.
    xrf_results_uri: Optional[str] = Field(
        default=None,
        description="Previous XRF NXprocess URI to merge the raw Bliss counters with.",
        examples=["/results/dataset.h5::/1.1/sum/results"],
    )
[docs]
class Outputs(BaseOutputModel):
    """Outputs of :class:`ExtractRawCountersStack`."""

    # Passed through unchanged from the inputs.
    bliss_scan_uris: List[str] = Field(
        description="Bliss scan URIs.",
        examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]],
    )
    # Passed through unchanged from the inputs.
    output_root_uri: str = Field(
        description="Original output root URI received as input.",
        examples=["/results/dataset.h5::/1.1"],
    )
    # Passed through unchanged from the inputs.
    output_root_group: Optional[str] = Field(
        default=None, description="Original output root group received as input."
    )
    # "<filename>::<h5path>" of the "results" group written by the task.
    xrf_results_uri: str = Field(
        description="Output NXprocess results URI.",
        examples=["/results/dataset.h5::/1.1/merge/results"],
    )
[docs]
class ExtractRawCountersStack(Task, input_model=Inputs, output_model=Outputs):
    """Extract raw stack counters and save them like XRF results."""

    def run(self):
        """Save the stacked raw counters in an ``NXcollection`` named ``results``.

        The process is named ``merge`` when ``xrf_results_uri`` is given (the
        children of that group are then linked next to the raw counters) and
        ``raw`` otherwise. When ``save_in_ewoks_process`` reports that the
        process already existed, its ``results`` group is reused and nothing
        is linked again.

        :raises ValueError: if ``bliss_scan_uris`` is empty.
        """
        start_time = nexus_utils.now()
        bliss_scan_uris = self.inputs.bliss_scan_uris
        if not bliss_scan_uris:
            # The first scan is needed for the default output data path.
            raise ValueError("bliss_scan_uris must contain at least one scan URI")
        previous_xrf_results_uri = self.inputs.xrf_results_uri

        _, scan_h5path0 = hdf5.split_h5uri(bliss_scan_uris[0])
        process_name = "merge" if previous_xrf_results_uri else "raw"

        output_root_uri = output_uri.compose_full_output_uri(
            self.inputs.output_root_uri,
            default_output_data_path=scan_h5path0,
            extra_data_paths=(self.inputs.output_root_group, process_name),
        )

        with nexus_utils.save_in_ewoks_process(
            output_root_uri, start_time, process_config=dict()
        ) as (process_group, already_existed):
            if already_existed:
                merged_xrf_results = process_group["results"]
            else:
                merged_xrf_results = process_group.create_group("results")
                merged_xrf_results.attrs["NX_class"] = "NXcollection"
                _link_raw_counters(bliss_scan_uris, merged_xrf_results, "rawcounters")
                if previous_xrf_results_uri:
                    input_file, parent_path = hdf5.split_h5uri(previous_xrf_results_uri)
                    with hdf5.FileReadAccess(input_file) as h5file:
                        _link_xrf_results(h5file[parent_path], merged_xrf_results)
            # Build the URI while the output file is still open.
            self.outputs.xrf_results_uri = (
                f"{merged_xrf_results.file.filename}::{merged_xrf_results.name}"
            )

        self.outputs.bliss_scan_uris = self.inputs.bliss_scan_uris
        self.outputs.output_root_uri = self.inputs.output_root_uri
        self.outputs.output_root_group = self.inputs.output_root_group
def _link_raw_counters(
    bliss_scan_uris: List[str],
    process_group: hdf5.GroupType,
    name: str,
) -> None:
    """Stack the 1D ``measurement`` datasets of several scans as virtual datasets.

    An NXdata group called ``name`` is created in ``process_group``. Every
    dataset name that is 1D in the ``measurement`` group of at least one scan
    becomes a virtual dataset of shape ``(nscans, max_size)``. Row ``i`` maps
    onto that dataset in ``bliss_scan_uris[i]``, and ``max_size`` is the
    largest size found across scans. Rows, or row tails, with no source read
    as the fill value: NaN for floating-point datasets, and the HDF5 default
    for other dtypes, where NaN is not representable.

    The dtype and attributes of each virtual dataset come from the first scan
    in which it was found. All virtual datasets are declared NXdata signals.
    """
    destination = nexus_utils.create_nxdata(process_group, name)
    nscans = len(bliss_scan_uris)

    scan_counters, max_sizes, attrs, dtypes = _collect_1d_counters(bliss_scan_uris)

    layouts = {
        dset_name: h5py.VirtualLayout(
            shape=(nscans, max_size),
            dtype=dtypes[dset_name],
        )
        for dset_name, max_size in max_sizes.items()
    }

    # Only datasets that were 1D in *this* scan are mapped. A name that is
    # N-D in one scan and 1D in another must not get a mismatched source shape.
    # VirtualSource only needs file name + path, so no file is reopened.
    for i, (filename, scan_h5path, sizes) in enumerate(scan_counters):
        for dset_name, size in sizes.items():
            if not size:
                continue  # empty dataset: nothing to map
            vsource = h5py.VirtualSource(
                filename,
                f"{scan_h5path}/measurement/{dset_name}",
                shape=(size,),
            )
            # Map only the valid region for this scan
            layouts[dset_name][i, :size] = vsource

    for dset_name, layout in layouts.items():
        dset = destination.create_virtual_dataset(
            dset_name, layout, fillvalue=_virtual_fillvalue(dtypes[dset_name])
        )
        dset.attrs.update(attrs[dset_name])

    nexus_utils.set_nxdata_signals(destination, signals=list(layouts))


def _collect_1d_counters(bliss_scan_uris: List[str]):
    """Inspect the ``measurement`` group of every scan in one read-only pass.

    :returns: ``(scan_counters, max_sizes, attrs, dtypes)``. ``scan_counters[i]``
        is ``(filename, scan_h5path, {dset_name: size})`` for the 1D datasets
        of scan ``i``; the dict is empty when the scan has no ``measurement``
        group. ``max_sizes`` maps each dataset name to its largest size across
        scans. ``attrs`` and ``dtypes`` map it to the attributes and dtype of
        its first occurrence.
    """
    scan_counters = []
    max_sizes = {}
    attrs = {}
    dtypes = {}
    for bliss_scan_uri in bliss_scan_uris:
        filename, scan_h5path = hdf5.split_h5uri(bliss_scan_uri)
        sizes = {}
        with hdf5.FileReadAccess(filename) as h5file:
            entry = h5file[scan_h5path]
            if "measurement" in entry:
                for dset_name, dset in entry["measurement"].items():
                    # Skip non-1D datasets and members without ``ndim`` (groups).
                    if getattr(dset, "ndim", None) != 1:
                        continue
                    sizes[dset_name] = dset.size
                    max_sizes[dset_name] = max(max_sizes.get(dset_name, 0), dset.size)
                    if dset_name not in attrs:
                        attrs[dset_name] = dict(dset.attrs)
                        dtypes[dset_name] = dset.dtype
        scan_counters.append((filename, scan_h5path, sizes))
    return scan_counters, max_sizes, attrs, dtypes


def _virtual_fillvalue(dtype) -> Optional[float]:
    """Return NaN for floating-point dtypes and ``None`` otherwise.

    ``None`` lets HDF5 use its default fill value. Converting NaN to an
    integer, boolean or string type is undefined or fails.
    """
    if numpy.issubdtype(dtype, numpy.floating):
        return numpy.nan
    return None
def _link_xrf_results(
    xrf_results: hdf5.GroupType, process_group: hdf5.GroupType
) -> None:
    """Expose each direct member of ``xrf_results`` inside ``process_group``.

    Every member is passed to ``hdf5_utils.create_hdf5_link`` under its own
    name.
    """
    # NOTE(review): a member whose name already exists in ``process_group``
    # (e.g. "rawcounters") would collide; the outcome depends on
    # create_hdf5_link — confirm.
    for link_name, target in xrf_results.items():
        hdf5_utils.create_hdf5_link(process_group, link_name, target)