Source code for ewoksfluo.gui.data_viewer

import os
import sys
import logging
import tempfile
import traceback
from contextlib import contextmanager

from types import MappingProxyType
from typing import Optional, Union, Generator, Iterator, Tuple

from functools import cached_property

import h5py
import numpy

from AnyQt import QtCore
from AnyQt import QtWidgets
from AnyQt.QtCore import Qt

from silx.io import commonh5
from silx.io import h5py_utils
from silx.app.view.Viewer import Viewer as SilxViewer
from ..io.hdf5 import ReadHdf5File

_logger = logging.getLogger(__name__)


[docs] class DataViewer(SilxViewer): """Browse data from files supported by silx. To create the widget .. code: python viewer = DataViewer(parent) viewer.setVisible(True) parent.layout().addWidget(viewer) To close and refresh files .. code: python viewer.updateFile("/path/to/file1.h5") viewer.updateFile("/path/to/file2.h5") viewer.closeFile("/path/to/file1.h5") To close all files .. code: python viewer.closeAll() """ def __init__(self, parent=None): super().__init__(parent=parent) if parent is not None: self.setWindowFlags(Qt.Widget) # We cannot handle file opening in the viewer # because files are also opened when synchronizing model = self._findHdf5TreeModel() model.insertFile = _insertFile.__get__(model, type(model))
[docs] def close(self): if self.parent(): self.parent().close() else: self.close()
[docs] def closeFile(self, filename: str) -> None: """When the file is opened, close it.""" _, h5file = self._getFileObject(filename) if h5file is None: return model = self._findHdf5TreeModel() model.removeH5pyObject(h5file)
[docs] def updateFile(self, filename, **file_open_options): """When the file exists, append when not already appended and refresh view.""" # Remove when non existing if not os.path.exists(filename): self.closeFile(filename) return # Append when missing index, h5file = self._getFileObject(filename) if h5file is None: self.appendFile(filename, **file_open_options) index, h5file = self._getFileObject(filename) # Select the file (TODO: errors) selection_model = self._treeview.selectionModel() selection_model.clearSelection() self._treeview.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows) selection_model.select( index, QtCore.QItemSelectionModel.Select | QtCore.QItemSelectionModel.Rows ) self._treeview.setCurrentIndex(index) # Refresh the current selection self._Viewer__refreshAction.trigger()
@property def _treeview(self): return self._Viewer__treeview def _findHdf5TreeModel(self): return self._treeview.findHdf5TreeModel() def _iterH5FileObjects( self, ) -> Iterator[Tuple[QtCore.QModelIndex, "ViewerFile"]]: model = self._findHdf5TreeModel() root_index = QtCore.QModelIndex() root = model.nodeFromIndex(root_index) for row in range(root.childCount()): hdf5item = root.child(row) index = model.index(row, 0, root_index) yield index, hdf5item.obj def _getFileObject( self, filename ) -> Tuple[Optional[QtCore.QModelIndex], Optional["ViewerFile"]]: filename = os.path.normpath(os.path.abspath(filename)) for index, h5file in self._iterH5FileObjects(): filename2 = os.path.normpath(os.path.abspath(h5file.filename)) if filename2 == filename: return index, h5file return None, None
def _insertFile(self, filename, row=-1): """Open the file with the viewer proxy that does not keep the file opened.""" try: h5file = ViewerFile(filename) if self._Hdf5TreeModel__ownFiles: self._Hdf5TreeModel__openedFiles.append(h5file) self.sigH5pyObjectLoaded.emit(h5file, filename) self.insertH5pyObject(h5file, row=row, filename=filename) except IOError: _logger.debug("File '%s' can't be read.", filename, exc_info=True) raise
[docs] class ViewerDataset(commonh5.Dataset): """Proxy to a HDF5 dataset that does not keep the file opened.""" def __init__(self, name: str, parent: Union["ViewerFile", "ViewerGroup"]): super().__init__(name, None, parent=parent, attrs=None) @contextmanager def _h5dataset(self) -> Generator[h5py.Dataset, None, None]: with self.file._h5open() as h5file: yield h5file[self.name] def _get_h5attribute(self, attr: str): with self._h5dataset() as h5dataset: return getattr(h5dataset, attr) dtype = cached_property(lambda self: self._get_h5attribute("dtype")) shape = cached_property(lambda self: self._get_h5attribute("shape")) size = cached_property(lambda self: self._get_h5attribute("size")) ndim = cached_property(lambda self: self._get_h5attribute("ndim")) compression = cached_property(lambda self: self._get_h5attribute("compression")) compression_opts = cached_property( lambda self: self._get_h5attribute("compression_opts") ) chunks = cached_property(lambda self: self._get_h5attribute("chunks")) is_virtual = cached_property(lambda self: self._get_h5attribute("is_virtual")) virtual_sources = cached_property( lambda self: self._get_h5attribute("virtual_sources") ) external = cached_property(lambda self: self._get_h5attribute("external")) def __len__(self): with self._h5dataset() as h5dataset: return len(h5dataset) def __getitem__(self, item): with self._h5dataset() as h5dataset: return h5dataset[item] def __iter__(self): return self[()].__iter__() def __bool__(self): with self._h5dataset() as h5dataset: return bool(h5dataset) def __getattr__(self, item): """Proxy to underlying numpy array methods. Called for example when doing `numpy.array(dataset)`. """ data = self[()] if hasattr(data, item): return getattr(data, item) raise AttributeError("Dataset has no attribute %s" % item) @cached_property def attrs(self): with self._h5dataset() as h5dataset: return MappingProxyType(dict(h5dataset.attrs)) @property def value(self): raise NotImplementedError() # Should not be used: h5py v2 property def _get_data(self): # All method calling this method in the base class should be overridden stack_trace = traceback.format_stack() _logger.warning( f"ViewerDataset._get_data should not be called\nStack trace:\n{''.join(stack_trace)}" ) return self[()]
[docs] class ViewerGroup(commonh5.Group): """Proxy to a HDF5 group that does not keep the file opened.""" def __init__(self, name, parent): with parent.file._h5open() as h5file: full_name = f"{parent.name}/{name}" h5group = h5file[full_name] attrs = dict(h5group.attrs) super().__init__(name, parent=parent, attrs=attrs) _add_nodes(self, h5group)
[docs] class ViewerFile(commonh5.File): """Proxy to a HDF5 file that does not keep the file opened.""" def __init__(self, name: str, **file_open_options): self._file_open_options = file_open_options with ReadHdf5File(name, **self._file_open_options) as h5file: h5group = h5file["/"] # for order preservation attrs = dict(h5group.attrs) super().__init__(name=name, mode="r", attrs=attrs) _add_nodes(self, h5group) @contextmanager def _h5open(self) -> Generator[h5py.File, None, None]: with ReadHdf5File(self.filename, **self._file_open_options) as h5file: yield h5file
def _add_nodes( commongroup: Union[ViewerFile, ViewerGroup], h5item: Union[h5py.Group, h5py.File] ): for base_name in h5item: if h5item.get(base_name, default=None, getclass=True) is h5py.Group: commongroup.add_node(ViewerGroup(base_name, commongroup)) else: commongroup.add_node(ViewerDataset(base_name, commongroup))
[docs] def generate_example_data(name): filename = os.path.join(tempfile.gettempdir(), name) with h5py_utils.File(filename, mode="w") as nxroot: nxroot.attrs["NX_class"] = "NXroot" nxroot.attrs["creator"] = "test" nxroot.attrs["default"] = "2.1" # 1D data nxentry = nxroot.create_group("1.1") nxentry.attrs["NX_class"] = "NXroot" nxentry.attrs["default"] = "plot" nxentry["title"] = "ascan samx 0 3 3 0.1" measurement = nxentry.create_group("measurement") measurement.attrs["NX_class"] = "NXcollection" measurement["samx"] = [0, 1, 2, 3] measurement["diode1"] = [0, -1, -2, -3] measurement["diode2"] = [3, 1, 2, 0] nxdata = nxentry.create_group("plot") nxdata.attrs["NX_class"] = "NXdata" nxdata.attrs["signal"] = "diode1" nxdata.attrs["auxiliary_signals"] = ["diode2"] nxdata.attrs["axes"] = ["samx"] nxdata["samx"] = h5py.SoftLink(measurement["samx"].name) nxdata["diode1"] = h5py.SoftLink(measurement["diode1"].name) nxdata["diode2"] = h5py.SoftLink(measurement["diode2"].name) # 2D data nxentry = nxroot.create_group("2.1") nxentry.attrs["NX_class"] = "NXroot" nxentry.attrs["default"] = "plot" nxentry["title"] = "amesh samx 0 3 3 samy 0 4 4 0.1" measurement = nxentry.create_group("measurement") measurement.attrs["NX_class"] = "NXcollection" measurement["samx"] = [0, 0.1, 0.2, 0.3] measurement["samy"] = [0, 0.1, 0.2, 0.3, 0.4] measurement["diode1"] = numpy.zeros((4, 5)) measurement["diode2"] = numpy.ones((4, 5)) nxdata = nxentry.create_group("plot") nxdata.attrs["NX_class"] = "NXdata" nxdata.attrs["signal"] = "diode1" nxdata.attrs["auxiliary_signals"] = ["diode2"] nxdata.attrs["axes"] = ["samx", "samy"] nxdata["samx"] = h5py.SoftLink(measurement["samx"].name) nxdata["samx_indices"] = 0 nxdata["samy"] = h5py.SoftLink(measurement["samy"].name) nxdata["samy_indices"] = 1 nxdata["diode1"] = h5py.SoftLink(measurement["diode1"].name) nxdata["diode2"] = h5py.SoftLink(measurement["diode2"].name) return filename
[docs] def main(argv=None) -> Optional[int]: import argparse if argv is None: argv = sys.argv parser = argparse.ArgumentParser( description="Load HDF5 files and display their structure." ) parser.add_argument("files", nargs="*", help="List of HDF5 files to load.") parser.add_argument( "--example", action="store_true", help="Generate and open example data." ) args = parser.parse_args(args=argv[1:]) files = args.files if args.example: files.insert(0, generate_example_data("data_viewer_example.h5")) app = QtWidgets.QApplication(argv) dataviewer = DataViewer() dataviewer.resize(1300, 500) dataviewer._Viewer__splitter.setSizes([500, 800]) for filename in files: dataviewer.appendFile(filename) dataviewer.show() return app.exec_()
if __name__ == "__main__": sys.exit(main())