Source code for ewoksfluo.tasks.nexus_utils
import contextlib
import importlib.metadata
import json
import logging
from datetime import datetime
from typing import Any
from typing import Dict
from typing import Generator
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import h5py
import numpy
from ewoksdata.data import nexus
from ..io import hdf5
version = importlib.metadata.version("ewoksfluo")
logger = logging.getLogger(__name__)
[docs]
def create_nxdata(
parent: Union[h5py.File, h5py.Group],
name: str,
signal: Optional[str] = None,
) -> h5py.Group:
nxdata = parent.create_group(name)
nxdata.attrs["NX_class"] = "NXdata"
if signal:
nxdata.attrs["signal"] = signal
nexus.select_default_plot(nxdata)
return nxdata
[docs]
def set_nxdata_signals(nxdata: hdf5.GroupType, signals: Sequence[str]):
nxdata.attrs["signal"] = signals[0]
if len(signals) > 1:
nxdata.attrs["auxiliary_signals"] = signals[1:]
[docs]
def now() -> str:
"""NeXus-compliant format of the current time"""
return datetime.now().astimezone().isoformat()
[docs]
@contextlib.contextmanager
def save_in_ewoks_process(
output_root_uri: str,
start_time: str,
process_config: Dict[str, Any],
default_levels=("results", "process"),
**kw,
) -> Generator[Tuple[h5py.Group, bool], None, None]:
with nexus.create_nexus_group(
output_root_uri, default_levels=default_levels, **kw
) as (
process_group,
already_existed,
):
if already_existed:
logger.warning(
"%s::%s already exists", process_group.file.filename, process_group.name
)
yield process_group, already_existed
else:
entry_name = process_group.name.split("/")[1]
entry_group = process_group.file[entry_name]
if "start_time" not in entry_group:
entry_group["start_time"] = start_time
try:
process_group.attrs["NX_class"] = "NXprocess"
process_group["program"] = "ewoksfluo"
process_group["version"] = version
config_group = process_group.create_group("configuration")
config_group.attrs["NX_class"] = "NXnote"
config_group.create_dataset(
"data", data=json.dumps(process_config, cls=NumpyEncoder)
)
config_group.create_dataset("date", data=now())
config_group.create_dataset("type", data="application/json")
yield process_group, already_existed
finally:
if "end_time" in entry_group:
entry_group["end_time"][()] = now()
else:
entry_group["end_time"] = now()
[docs]
@contextlib.contextmanager
def save_in_ewoks_subprocess(
output_root_uri: str,
start_time: str,
process_config: Dict[str, Any],
collection_name: str = "results",
**kw,
) -> Generator[Tuple[h5py.Group, bool], None, None]:
with save_in_ewoks_process(output_root_uri, start_time, process_config, **kw) as (
process_group,
already_existed,
):
if already_existed:
results = process_group[collection_name]
else:
results = process_group.create_group(collection_name)
results.attrs["NX_class"] = "NXcollection"
yield results, already_existed
[docs]
class NumpyEncoder(json.JSONEncoder):
[docs]
def default(self, obj):
if isinstance(obj, (numpy.generic, numpy.ndarray)):
return obj.tolist()
return super().default(obj)