Source code for ewoksfluo.tasks.raw_counters.extract_counters_stack
from typing import List
from typing import Optional
import h5py
import numpy
from ewokscore import Task
from ewokscore.model import BaseOutputModel
from pydantic import Field
from ...io import hdf5
from .. import hdf5_utils
from .. import nexus_utils
from ..models import FluoInputs
[docs]
class Inputs(FluoInputs):
    # Input model of the raw-counter stacking task: the Bliss scans to stack,
    # where to save the result and, optionally, previous XRF results to merge
    # the raw counters with.

    # One URI per Bliss scan.
    bliss_scan_uris: List[str] = Field(
        examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]],
        description="Bliss scan URI.",
    )

    # The examples show three accepted forms: a file only, a file with an
    # entry, or a file with an entry and a process name.
    output_root_uri: str = Field(
        examples=[
            "/results/dataset.h5",
            "/results/dataset.h5::/1.1",
            "/results/dataset.h5::/1.1/merge",
        ],
        description="Output root URI.",
    )

    # Optional: when omitted (None) only the raw counters are saved.
    xrf_results_uri: Optional[str] = Field(
        examples=["/results/dataset.h5::/1.1/sum/results"],
        description="Previous XRF NXprocess URI to merge the raw Bliss counters with.",
        default=None,
    )
[docs]
class Outputs(BaseOutputModel):
    # Output model of the raw-counter stacking task.

    # The Bliss scan URIs that were given as input.
    bliss_scan_uris: List[str] = Field(
        examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]],
        description="Bliss scan URI.",
    )

    output_root_uri: str = Field(
        examples=["/results/dataset.h5::/1.1"],
        description="Output NXentry URI.",
    )

    # URI of the "results" group that holds the stacked counters.
    xrf_results_uri: str = Field(
        examples=["/results/dataset.h5::/1.1/merge/results"],
        description="Output NXprocess results URI.",
    )
[docs]
class ExtractRawCountersStack(Task, input_model=Inputs, output_model=Outputs):
    """Extract raw stack counters and save them like XRF results."""

    def run(self):
        """Stack the raw counters of the Bliss scans in an output process group.

        The counters are saved as virtual datasets in a "rawcounters" group
        under the "results" collection of the output process. When
        ``xrf_results_uri`` is provided, the items of those previous XRF
        results are linked next to the raw counters.

        :raises ValueError: when ``bliss_scan_uris`` is empty.
        """
        start_time = nexus_utils.now()
        bliss_scan_uris = self.inputs.bliss_scan_uris
        output_root_uri = self.inputs.output_root_uri
        previous_xrf_results_uri = self.inputs.xrf_results_uri

        # The first scan is the template for the stack (counter names, sizes),
        # so fail with an explicit message instead of a bare IndexError.
        if not bliss_scan_uris:
            raise ValueError(
                "'bliss_scan_uris' must contain at least one Bliss scan URI"
            )
        input_file0, scan_h5path0 = hdf5.split_h5uri(bliss_scan_uris[0])

        if previous_xrf_results_uri:
            default_nprocess_name = "merge"
        else:
            default_nprocess_name = "raw"

        with nexus_utils.save_in_ewoks_process(
            output_root_uri,
            start_time,
            process_config={},
            default_levels=(
                scan_h5path0,  # TODO: what should the default be?
                default_nprocess_name,
            ),
        ) as (process_group, already_existed):
            if already_existed:
                # Reuse the existing results as they are: nothing is linked
                # again (presumably filled by a previous run - TODO confirm).
                merged_xrf_results = process_group["results"]
            else:
                merged_xrf_results = process_group.create_group("results")
                merged_xrf_results.attrs["NX_class"] = "NXcollection"

                with hdf5.FileReadAccess(input_file0) as h5file0:
                    _link_raw_counters(
                        h5file0[scan_h5path0],
                        bliss_scan_uris,
                        merged_xrf_results,
                        "rawcounters",
                    )

                if previous_xrf_results_uri:
                    input_file, parent_path = hdf5.split_h5uri(
                        previous_xrf_results_uri
                    )
                    with hdf5.FileReadAccess(input_file) as h5file:
                        _link_xrf_results(h5file[parent_path], merged_xrf_results)

            # Build the URI while the output file is still open.
            self.outputs.xrf_results_uri = (
                f"{merged_xrf_results.file.filename}::{merged_xrf_results.name}"
            )

        self.outputs.bliss_scan_uris = bliss_scan_uris
        self.outputs.output_root_uri = output_root_uri
def _link_raw_counters(
    raw_scan0: hdf5.GroupType,
    bliss_scan_uris: List[str],
    process_group: hdf5.GroupType,
    name: str,
) -> None:
    """Stack the 1D counters of several Bliss scans as virtual datasets.

    An NXdata group called ``name`` is created in ``process_group``. For each
    1D dataset in the "measurement" group of ``raw_scan0`` it receives a
    virtual dataset of shape ``(number of scans, dataset size)``, where row
    ``i`` maps to the dataset with the same name in scan ``i``. The counter
    names, sizes and dtypes are taken from ``raw_scan0`` only; the other
    scans are not inspected here. The fill value of the virtual datasets is
    NaN and all of them are registered as NXdata signals.
    """
    nxdata = nexus_utils.create_nxdata(process_group, name)
    first_measurement = raw_scan0["measurement"]
    nrows = len(bliss_scan_uris)

    # counter name -> (virtual layout of the stack, shape of one source row)
    stacks = {
        counter: (
            h5py.VirtualLayout(shape=(nrows, dset.size), dtype=dset.dtype),
            (dset.size,),
        )
        for counter, dset in first_measurement.items()
        if dset.ndim == 1
    }

    # Map row `row` of every stack to the matching counter of scan `row`.
    for row, uri in enumerate(bliss_scan_uris):
        source_file, scan_path = hdf5.split_h5uri(uri)
        for counter, (layout, source_shape) in stacks.items():
            layout[row] = h5py.VirtualSource(
                source_file, f"{scan_path}/measurement/{counter}", shape=source_shape
            )

    for counter, (layout, _) in stacks.items():
        nxdata.create_virtual_dataset(counter, layout, fillvalue=numpy.nan)

    nexus_utils.set_nxdata_signals(nxdata, signals=list(stacks))
def _link_xrf_results(
    xrf_results: hdf5.GroupType, process_group: hdf5.GroupType
) -> None:
    """Link the items of previous XRF results into ``process_group``.

    ``hdf5_utils.create_hdf5_link`` is called once per item of
    ``xrf_results`` with the item's name and the item itself.
    """
    for child_name, child in xrf_results.items():
        hdf5_utils.create_hdf5_link(process_group, child_name, child)