Source code for ewoksfluo.io.hdf5

import os
from typing import Tuple, Union, Optional, Generator
from contextlib import contextmanager

import h5py
from silx.io.url import DataUrl
from silx.io import h5py_utils


def split_h5uri(url: str) -> Tuple[str, str]:
    """Split an HDF5 URL into its file path and its data path.

    :param url: URL string, parsed with ``silx.io.url.DataUrl``.
    :returns: ``(file_path, data_path)``. The data path is ``""`` whenever
        the parsed URL yields a falsy data path (e.g. ``None``).
    """
    parsed = DataUrl(url)
    file_path = parsed.file_path()
    data_path = parsed.data_path()
    # Normalize a missing data path to an empty string so callers can
    # always treat the second element as a ``str``.
    return file_path, data_path if data_path else ""
def join_h5url(root_url: str, sub_url: str) -> str:
    """Append a sub-path to the data path of an HDF5 URL.

    The root URL is split with :func:`split_h5uri`. Trailing ``/`` and then
    trailing ``::`` are removed from its data path, leading and trailing
    ``/`` are removed from ``sub_url``, and the pieces are reassembled as
    ``"<file_path>::<data_path>/<sub_url>"``.

    :param root_url: URL of the parent location.
    :param sub_url: path to append below the parent location.
    :returns: the joined URL string.
    """
    file_path, data_path = split_h5uri(root_url)

    # Drop every trailing slash first, then every trailing "::" pair.
    parent = data_path.rstrip("/")
    while parent[-2:] == "::":
        parent = parent[:-2]

    # The sub-path contributes no leading or trailing slashes of its own.
    child = sub_url.strip("/")

    return f"{file_path}::{parent}/{child}"
class ReadHdf5File(h5py_utils.File):
    """Use in cases where you want to read something from an HDF5 which
    might be already open for writing.

    The file is first opened with mode ``"r"``. If that attempt raises, the
    file is opened with mode ``"a"`` instead.
    """

    def __init__(self, *args, mode: str = "r", **kwargs):
        """Open the file read-only, falling back to append mode on failure.

        :param args: positional arguments forwarded to ``h5py_utils.File``.
        :param mode: must be ``"r"``; any other value is rejected.
        :param kwargs: keyword arguments forwarded to ``h5py_utils.File``.
        :raises AssertionError: if ``mode`` is not ``"r"``.
        """
        if mode != "r":
            # Explicit raise instead of an ``assert`` statement: asserts are
            # removed under ``python -O``, which would let a write mode reach
            # the parent constructor. The exception type and message are
            # kept identical to the former assertion.
            raise AssertionError("must be opened read-only")
        try:
            super().__init__(*args, mode=mode, **kwargs)
        except Exception:
            # Any failure of the read-only attempt triggers the fallback.
            # NOTE(review): presumably this covers a file that is already
            # open for writing and so cannot be opened read-only - confirm.
            # NOTE(review): with plain h5py, mode "a" creates the file when
            # it does not exist - confirm this is acceptable for a missing
            # file.
            super().__init__(*args, mode="a", **kwargs)