Source code for ewoksfluo.tasks.raw_counters.extract_counters_stack
from typing import Optional, List
import numpy
import h5py
from ewokscore import Task
from .. import nexus_utils
from .. import hdf5_utils
from ...io.hdf5 import split_h5uri
from ...io.hdf5 import ReadHdf5File
[docs]
class ExtractRawCountersStack(
    Task,
    input_names=["bliss_scan_uris", "output_root_uri"],
    optional_input_names=["xrf_results_uri"],
    output_names=["xrf_results_uri", "bliss_scan_uris", "output_root_uri"],
):
    """Extract raw stack counters and save them like XRF results.

    Inputs:
        bliss_scan_uris: URIs of the scans that make up the stack. The first
            one also provides the default output location.
        output_root_uri: where the process group is saved.
        xrf_results_uri (optional): URI of previous XRF results whose children
            are linked next to the raw counters.

    Outputs:
        xrf_results_uri: ``<filename>::<path>`` of the "results" group.
        bliss_scan_uris, output_root_uri: the inputs, passed through unchanged.
    """

    def run(self):
        """Create (or reuse) the "results" group and publish the task outputs."""
        start_time = nexus_utils.now()
        bliss_scan_uris: List[str] = self.inputs.bliss_scan_uris
        output_root_uri: str = self.inputs.output_root_uri
        previous_xrf_results_uri: Optional[str] = self.get_input_value(
            "xrf_results_uri", None
        )

        first_file, first_scan_path = split_h5uri(bliss_scan_uris[0])

        # With previous XRF results the output is a merge of both,
        # otherwise it only holds the raw counters.
        nprocess_name = "merge" if previous_xrf_results_uri else "raw"

        with nexus_utils.save_in_ewoks_process(
            output_root_uri,
            start_time,
            process_config={},
            default_levels=(
                first_scan_path,  # TODO: what should the default be?
                nprocess_name,
            ),
        ) as (process_group, already_existed):
            if already_existed:
                # Nothing to link: reuse the results saved by an earlier run.
                merged_xrf_results = process_group["results"]
            else:
                merged_xrf_results = process_group.create_group("results")
                merged_xrf_results.attrs["NX_class"] = "NXcollection"

                # The first scan serves as the template for the counters.
                with ReadHdf5File(first_file) as first_h5file:
                    _link_raw_counters(
                        h5file0[scan_h5path0]
                        if False
                        else first_h5file[first_scan_path],
                        bliss_scan_uris,
                        merged_xrf_results,
                        "rawcounters",
                    )

                if previous_xrf_results_uri:
                    previous_file, previous_path = split_h5uri(
                        previous_xrf_results_uri
                    )
                    with ReadHdf5File(previous_file) as previous_h5file:
                        _link_xrf_results(
                            previous_h5file[previous_path], merged_xrf_results
                        )

            # Built while the output file is still open.
            self.outputs.xrf_results_uri = (
                f"{merged_xrf_results.file.filename}::{merged_xrf_results.name}"
            )

        self.outputs.bliss_scan_uris = bliss_scan_uris
        self.outputs.output_root_uri = output_root_uri
def _link_raw_counters(
    raw_scan0: h5py.Group,
    bliss_scan_uris: List[str],
    process_group: h5py.Group,
    name: str,
) -> None:
    """Stack the 1D counters of all scans as virtual datasets.

    :param raw_scan0: group of the first scan; the 1D items of its
        "measurement" group define which counters are stacked, as well as
        their size and dtype.
    :param bliss_scan_uris: URIs of all scans; scan ``i`` becomes row ``i``
        of every virtual dataset.
    :param process_group: parent in which the destination group is created.
    :param name: name of the destination group, created with
        ``nexus_utils.create_nxdata``.
    """
    destination = nexus_utils.create_nxdata(process_group, name)
    measurement = raw_scan0["measurement"]
    nscans = len(bliss_scan_uris)

    # Counter name -> (layout of shape (nscans, npoints), shape of one source).
    # Only 1D items of the first scan are considered.
    stacked = {
        counter: (
            h5py.VirtualLayout(shape=(nscans, dset.size), dtype=dset.dtype),
            (dset.size,),
        )
        for counter, dset in measurement.items()
        if dset.ndim == 1
    }

    # Map row `row` of every layout to the same counter in scan `row`.
    for row, scan_uri in enumerate(bliss_scan_uris):
        filename, scan_h5path = split_h5uri(scan_uri)
        for counter, (layout, source_shape) in stacked.items():
            layout[row] = h5py.VirtualSource(
                filename, f"{scan_h5path}/measurement/{counter}", shape=source_shape
            )

    for counter, (layout, _) in stacked.items():
        destination.create_virtual_dataset(counter, layout, fillvalue=numpy.nan)

    nexus_utils.set_nxdata_signals(destination, signals=list(stacked))
def _link_xrf_results(xrf_results: h5py.Group, process_group: h5py.Group) -> None:
    """Link every child of ``xrf_results`` into ``process_group``.

    Each link is created with ``hdf5_utils.create_hdf5_link`` under the same
    name the child has in ``xrf_results``.
    """
    for child_name, child in xrf_results.items():
        hdf5_utils.create_hdf5_link(process_group, child_name, child)