Source code for ewoksfluo.tasks.hdf5_utils

# TODO: copied from ewoksxrpd with a different (instrument group is not linked directly) -> extract to ewoksdata

import os
from typing import Tuple
from typing import Union

import h5py
from blissdata.h5api import dynamic_hdf5
from numpy.typing import DTypeLike
from silx.io.url import DataUrl

from ..io import hdf5






def _get_hdf5_filename(file_obj) -> str:
    try:
        return file_obj.filename
    except AttributeError:
        # to be fixed in blissdata
        return file_obj._retry_handler.file_obj.filename






def _normalize_hdf5_item_name(*parts) -> str:
    name = "/".join([s for part in parts for s in part.split("/") if s])
    return f"/{name}"






[docs] def get_dataset_shape_and_dtype(url: str) -> Tuple[Tuple[int, ...], DTypeLike]: filename, dset_name = hdf5.split_h5uri(url) with hdf5.FileReadAccess(filename) as root: if dset_name not in root: raise ValueError(f"{url!r} does not exist") dset = root[dset_name] if not hdf5.is_dataset(dset): raise ValueError(f"{url!r} does a dataset") return dset.shape, dset.dtype