Source code for ewoksfluo.io.hdf5

import os
import pathlib
import warnings
from collections import abc
from contextlib import contextmanager
from typing import Dict
from typing import Generator
from typing import Optional
from typing import Tuple
from typing import Union

import h5py
from silx.io import h5py_utils
from silx.io.url import DataUrl


def split_h5uri(url: Union[str, pathlib.Path, DataUrl]) -> Tuple[str, str]:
    """Split an HDF5 URL into its file path and its data path inside the file.

    :param url: a ``DataUrl`` instance, or any value whose ``str()`` form is
        handed to ``DataUrl`` for parsing.
    :returns: ``(file_path, data_path)``. ``data_path`` is the empty string
        when the URL's data path is falsy (``None`` or empty).
    """
    data_url = url if isinstance(url, DataUrl) else DataUrl(str(url))
    file_path = data_url.file_path()
    data_path = data_url.data_path()
    return file_path, data_path or ""
def join_h5url(root_url: Union[str, pathlib.Path, DataUrl], sub_url: str) -> str:
    """Append a relative HDF5 path to the data path of a root URL.

    :param root_url: URL whose file path and data path form the base.
    :param sub_url: path to append; leading and trailing ``/`` are ignored.
    :returns: a string of the form ``"<file_path>::<data_path>/<sub_url>"``.
    """
    file_path, data_path = split_h5uri(root_url)

    # Drop trailing slashes first, then any dangling "::" separators.
    data_path = data_path.rstrip("/")
    while data_path.endswith("::"):
        data_path = data_path[:-2]

    sub_url = sub_url.strip("/")
    return f"{file_path}::{data_path}/{sub_url}"
def is_file(item) -> bool:
    """Tell whether *item* is an HDF5 file object, native or wrapped."""
    file_types = (h5py.File, FileModExtAccess)
    return isinstance(item, file_types)
def is_group(item) -> bool:
    """Tell whether *item* is an HDF5 group object, native or wrapped."""
    group_types = (h5py.Group, GroupModExtAccess)
    return isinstance(item, group_types)
def is_dataset(item) -> bool:
    """Tell whether *item* is a native ``h5py.Dataset``."""
    dataset_type = h5py.Dataset
    return isinstance(item, dataset_type)
# Type aliases accepting either the native h5py object or the corresponding
# wrapper class from this module (referenced by name as a forward reference).
FileType = Union[h5py.File, "FileModExtAccess"]
GroupType = Union[h5py.Group, "GroupModExtAccess"]
# Datasets are referred to by their native h5py type.
DatasetType = h5py.Dataset
class GroupModExtAccess(abc.MutableMapping):
    """
    Wrapper around h5py.Group that delegates path resolution to the owning FileModExtAccess.

    Item lookup, assignment and deletion all resolve the path through
    ``file_wrapper._resolve_from`` so that links are followed by the wrapper
    rather than by h5py. Names starting with ``/`` are absolute (relative to
    the file owning this group), for reading as well as for writing and
    deleting.
    """

    def __init__(self, native_group: "h5py.Group", file_wrapper: "FileModExtAccess"):
        """
        :param native_group: the wrapped h5py group.
        :param file_wrapper: the file wrapper used to resolve paths from this group.
        """
        self._native_group = native_group
        self._file_wrapper = file_wrapper

    def __getitem__(self, name: str) -> Union["h5py.Dataset", "GroupModExtAccess"]:
        return self._file_wrapper._resolve_from(self._native_group, name)

    def _resolve_parent(self, name: str) -> Tuple["h5py.Group", str]:
        """Return the native group that holds the last component of *name*,
        together with that last component.

        :raises KeyError: when the parent path does not resolve to a group.
        """
        parent_name, separator, leaf_name = name.rpartition("/")
        if separator and not parent_name:
            # "/leaf": the only separator is the leading one, so the parent is
            # the file root. An empty parent name would be resolved relative
            # to this group instead.
            parent_name = "/"
        parent = self._file_wrapper._resolve_from(self._native_group, parent_name)
        if not isinstance(parent, GroupModExtAccess):
            raise KeyError(
                f"Parent '{parent_name}' of '{name}' is not a group"
            )
        return parent._native_group, leaf_name

    def __setitem__(self, name: str, value) -> None:
        native_parent, leaf_name = self._resolve_parent(name)
        native_parent[leaf_name] = value

    def __delitem__(self, name: str) -> None:
        native_parent, leaf_name = self._resolve_parent(name)
        del native_parent[leaf_name]

    def __iter__(self):
        return iter(self._native_group)

    def __len__(self) -> int:
        return len(self._native_group)

    @property
    def name(self) -> str:
        """Name of the wrapped native group."""
        return self._native_group.name

    @property
    def file(self) -> "FileModExtAccess":
        """File wrapper this group resolves its paths with."""
        return self._file_wrapper

    def get(self, name: str, default=None, getclass=False, getlink=False):
        """Return the item called *name*, or *default* when it is absent.

        When *getclass* or *getlink* is set the call is forwarded unchanged to
        the native group. Otherwise the item is resolved through this wrapper.
        """
        if getclass or getlink:
            return self._native_group.get(
                name, default=default, getclass=getclass, getlink=getlink
            )
        if name in self._native_group:
            return self[name]
        return default

    @property
    def attrs(self):
        """Attributes of the wrapped native group."""
        return self._native_group.attrs

    def __repr__(self):
        return f'<{type(self).__name__} "{self.name}">'
class FileModExtAccess(GroupModExtAccess):
    """
    Wrapper around an ``h5py_utils.File`` that ensures any ``h5py.ExternalLink``
    is opened with alternative access parameters.

    External files are opened lazily while resolving a path, cached by their
    real path, and closed together with this file.
    """

    # Upper bound on the number of soft/external links followed while
    # resolving a single path. Guards against cyclic links, which would
    # otherwise make the resolution loop spin forever.
    _MAX_LINK_HOPS = 128

    def __init__(self, filename: Union[str, pathlib.Path], **open_options):
        """
        :param filename: path of the HDF5 file to open.
        :param open_options: forwarded to ``h5py_utils.File``; a copy is also
            kept as the initial options for opening external files.
        :raises TypeError: when *filename* is neither ``str`` nor ``pathlib.Path``.
        """
        if not isinstance(filename, (str, pathlib.Path)):
            raise TypeError("filename must be a string")
        self._external_open_options = dict(open_options)
        self._native_file = h5py_utils.File(filename, **open_options)
        self._external_files: Dict[str, FileModExtAccess] = {}
        super().__init__(self._native_file, self)

    def set_external_access(self, **opts):
        """Update the options used when opening external files.

        Only affects external files opened after this call; already cached
        external files are returned as they are.
        """
        self._external_open_options.update(opts)

    @property
    def filename(self) -> str:
        """File name reported by the wrapped native file."""
        return self._native_file.filename

    def close(self):
        """Close every cached external file, then the root file."""
        # Close external files then root file
        for root in list(self._external_files.values()):
            try:
                root.close()
            except Exception:
                # Best effort: a failure to close one external file must not
                # prevent closing the others and the root file.
                pass
        self._external_files.clear()
        if self._native_file is not None:
            try:
                self._native_file.close()
            finally:
                self._native_file = None

    def __enter__(self) -> "FileModExtAccess":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    def _open_external(
        self, filename: str, owner_file: h5py.File
    ) -> "FileModExtAccess":
        """Open (or return the cached wrapper of) an external file.

        The file is opened with the current external open options (the
        constructor options, updated by ``set_external_access``).

        :param filename: target of the external link; a relative name is
            resolved against the directory of *owner_file*.
        :param owner_file: native file containing the external link.
        :returns: the wrapper of the external file, cached by real path.
        """
        if not os.path.isabs(filename):
            base_dir = os.path.dirname(owner_file.filename)
            filename = os.path.join(base_dir, filename)
        key = os.path.realpath(filename)
        if key in self._external_files:
            return self._external_files[key]
        ext_h5 = FileModExtAccess(filename, **self._external_open_options)
        self._external_files[key] = ext_h5
        return ext_h5

    def _resolve_from(
        self, native_parent: Union[h5py_utils.File, h5py.Group], name: str
    ) -> Union[h5py.Dataset, "GroupModExtAccess"]:
        """Resolve *name* starting at *native_parent*, following links explicitly.

        :param native_parent: native group (or file) relative names start from.
        :param name: HDF5 path; a leading ``/`` makes it relative to the file
            owning *native_parent*.
        :returns: the native dataset, or a ``GroupModExtAccess`` bound to the
            file wrapper that owns the resolved group.
        :raises KeyError: when a component is missing, when an intermediate
            component is not a group, or when more than ``_MAX_LINK_HOPS``
            links are followed (e.g. cyclic links).
        """
        # If absolute, start from the file that owns the parent
        is_absolute = name.startswith("/")
        if is_absolute:
            native_current = native_parent.file
        else:
            native_current = native_parent
        current_file = self

        # Traverse tree structure explicitly with explicit link resolution
        parts = [p for p in name.split("/") if p]
        link_hops = 0
        i = 0
        while i < len(parts):
            part = parts[i]
            try:
                link = native_current.get(part, getlink=True)
            except Exception:
                link = None

            if isinstance(link, (h5py.ExternalLink, h5py.SoftLink)):
                link_hops += 1
                if link_hops > self._MAX_LINK_HOPS:
                    raise KeyError(
                        f"Too many links (cyclic link?) while resolving path '{name}'"
                    )

            if isinstance(link, h5py.ExternalLink):
                ext_file = self._open_external(link.filename, native_current.file)
                link_parts = [p for p in link.path.split("/") if p]
                # jump into external target path + remaining tail
                parts = link_parts + parts[i + 1 :]
                native_current = ext_file._native_file
                current_file = ext_file
                i = 0
                continue

            if isinstance(link, h5py.SoftLink):
                link_parts = [p for p in link.path.split("/") if p]
                # Replace the link by its target path + remaining tail.
                # A relative target stays relative to the current group.
                parts = link_parts + parts[i + 1 :]
                link_is_absolute = link.path.startswith("/")
                if link_is_absolute:
                    native_current = native_current.file
                i = 0
                continue

            # Hard link, dataset, group or virtual dataset
            obj = native_current[part]
            i += 1
            if i < len(parts):
                # Need to descend further: obj must be a group
                if isinstance(obj, h5py.Group):
                    native_current = obj
                    continue
                else:
                    raise KeyError(
                        f"Cannot descend into non-group '{part}' while resolving path '{name}'"
                    )
            else:
                # Last component: return dataset or wrapped group
                native_current = obj

        # Return dataset or wrapped group
        if isinstance(native_current, h5py.Group):
            return GroupModExtAccess(native_current, current_file)
        return native_current
class FileReadAccess(FileModExtAccess):
    """Use in cases where you want to read something from an HDF5
    which might be already open for writing.

    The file is first opened with mode ``"r"``. When that fails and the file
    exists, it is opened with mode ``"a"`` instead. External files are always
    opened with mode ``"r"``.
    """

    def __init__(self, filename: Union[str, pathlib.Path], mode: str = "r", **kwargs):
        """
        :param filename: path of the HDF5 file to read.
        :param mode: must be ``"r"``.
        :param kwargs: forwarded to ``FileModExtAccess``.
        :raises TypeError: when *filename* is neither ``str`` nor ``pathlib.Path``.
        """
        # Checked here as well so the TypeError is not swallowed by the
        # fallback below.
        if not isinstance(filename, (str, pathlib.Path)):
            raise TypeError("filename must be a string")
        assert mode == "r", "must be opened read-only"
        try:
            super().__init__(filename, mode=mode, **kwargs)
        except Exception:
            # Mode "a" creates the file when it does not exist: a read
            # accessor must not create files, so report the original failure.
            if not os.path.exists(filename):
                raise
            super().__init__(filename, mode="a", **kwargs)
        # The fallback stores mode="a" in the external options; external
        # files must still be opened read-only.
        self.set_external_access(mode="r")
class ReadHdf5File(FileReadAccess):
    """Deprecated name of ``FileReadAccess``; emits a ``DeprecationWarning``
    on instantiation."""

    def __init__(self, *args, **kwargs):
        message = "ReadHdf5File is deprecated, please use FileReadAccess instead."
        # stacklevel=2 attributes the warning to the code instantiating the class.
        warnings.warn(message, category=DeprecationWarning, stacklevel=2)
        super().__init__(*args, **kwargs)
def is_same_file(
    url1: Union[str, pathlib.Path, DataUrl], url2: Union[str, pathlib.Path, DataUrl]
) -> bool:
    """Tell whether two HDF5 URLs point to the same file.

    Only the file parts of the URLs are compared, after normalizing them to
    absolute paths; the data paths inside the file are ignored.
    """
    absolute_paths = []
    for url in (url1, url2):
        filename, _ = split_h5uri(url)
        absolute_paths.append(os.path.abspath(os.path.normpath(filename)))
    first, second = absolute_paths
    return first == second