Source code for ewoksfluo.tasks.nexus_utils

import contextlib
import importlib.metadata
import json
import logging
from datetime import datetime
from typing import Any
from typing import Dict
from typing import Generator
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union

import h5py
import numpy
from ewoksdata.data import nexus
from silx.io import h5py_utils

from ..io import hdf5

version = importlib.metadata.version("ewoksfluo")

logger = logging.getLogger(__name__)


[docs] def create_nxdata( parent: Union[h5py.File, h5py.Group], name: str, signal: Optional[str] = None, ) -> h5py.Group: nxdata = parent.create_group(name) nxdata.attrs["NX_class"] = "NXdata" if signal: nxdata.attrs["signal"] = signal nexus.select_default_plot(nxdata) return nxdata
[docs] def set_nxdata_signals(nxdata: hdf5.GroupType, signals: Sequence[str]): nxdata.attrs["signal"] = signals[0] if len(signals) > 1: nxdata.attrs["auxiliary_signals"] = signals[1:]
[docs] def now() -> str: """NeXus-compliant format of the current time""" return datetime.now().astimezone().isoformat()
[docs] @contextlib.contextmanager def save_in_ewoks_process( output_root_uri: str, start_time: str, process_config: Dict[str, Any], **kw, ) -> Generator[Tuple[h5py.Group, bool], None, None]: with nexus.create_nexus_group(output_root_uri, **kw) as ( process_group, already_existed, ): if already_existed: logger.warning( "%s::%s already exists", process_group.file.filename, process_group.name ) yield process_group, already_existed else: entry_name = process_group.name.split("/")[1] entry_group = process_group.file[entry_name] if "start_time" not in entry_group: entry_group["start_time"] = start_time try: process_group.attrs["NX_class"] = "NXprocess" process_group["program"] = "ewoksfluo" process_group["version"] = version config_group = process_group.create_group("configuration") config_group.attrs["NX_class"] = "NXnote" config_group.create_dataset( "data", data=json.dumps(process_config, cls=NumpyEncoder) ) config_group.create_dataset("date", data=now()) config_group.create_dataset("type", data="application/json") yield process_group, already_existed finally: if "end_time" in entry_group: entry_group["end_time"][()] = now() else: entry_group["end_time"] = now()
[docs] @contextlib.contextmanager def save_in_ewoks_subprocess( output_root_uri: str, start_time: str, process_config: Dict[str, Any], collection_name: str = "results", **kw, ) -> Generator[Tuple[h5py.Group, bool], None, None]: with save_in_ewoks_process(output_root_uri, start_time, process_config, **kw) as ( process_group, already_existed, ): if already_existed: results = process_group[collection_name] else: results = process_group.create_group(collection_name) results.attrs["NX_class"] = "NXcollection" yield results, already_existed
[docs] class NumpyEncoder(json.JSONEncoder):
[docs] def default(self, obj): if isinstance(obj, (numpy.generic, numpy.ndarray)): return obj.tolist() return super().default(obj)
[docs] @h5py_utils.retry(retry_timeout=20) def wait_scan_finished(bliss_scan_uri: str) -> None: input_file, scan_h5path = hdf5.split_h5uri(bliss_scan_uri) with h5py_utils.File(input_file) as fh: parts = [p for p in scan_h5path.split("/") if p] if parts: scan = parts[0] _ = fh[scan]["end_time"][()]