Source code for ewoksfluo.tasks.positioner_utils

import logging
import numbers
from typing import Dict
from typing import Optional
from typing import Tuple

import numpy

from ..io import hdf5

# Recognised energy units (lower-case) mapped to the factor that
# get_primary_beam_energy_value multiplies the stored value by. With these
# factors an "ev" value is scaled by 0.001 and a "kev" value is unchanged.
_ENERGY_UNITS = {"kev": 1, "ev": 0.001}
# Default sub-URI template, relative to the scan HDF5 path; the "{}"
# placeholder is filled with the positioner name via str.format.
_ENERGY_TEMPLATE = "instrument/positioners_start/{}"

# Module-level logger used for the warnings emitted by this module.
_logger = logging.getLogger(__name__)


def get_primary_beam_energy_suburi(bliss_scan_uri: str) -> Optional[str]:
    """Locate the positioner holding the incident beam energy of a Bliss scan.

    The lookup is delegated to ``_get_unit_position_suburi`` with the
    module's table of energy units.

    :param bliss_scan_uri: HDF5 URI of the Bliss scan.
    :returns: sub-URI (relative to the scan) of the first positioner whose
        unit is one of the known energy units, or ``None`` when there is
        no such positioner.
    """
    energy_suburi = _get_unit_position_suburi(bliss_scan_uri, _ENERGY_UNITS)
    return energy_suburi


def get_primary_beam_energy_value(
    bliss_scan_uri: str,
    energy_name: Optional[str] = None,
    energy_uri_template: Optional[str] = None,
    search_on_units: bool = False,
) -> Optional[float]:
    """Get the incident beam energy from a Bliss scan.

    The energy is looked up in one of two ways:

    * when ``energy_name`` is given, the dataset at
      ``energy_uri_template.format(energy_name)`` (relative to the scan)
      is read;
    * otherwise, when ``search_on_units`` is true, the first positioner
      whose unit is a known energy unit is used.

    :param bliss_scan_uri: HDF5 URI of the Bliss scan.
    :param energy_name: name of the energy positioner. Takes precedence
        over ``search_on_units``.
    :param energy_uri_template: template with one ``{}`` placeholder for
        ``energy_name``. Falls back to the module default when empty.
    :param search_on_units: search the start positioners for an energy
        unit when no ``energy_name`` is given.
    :returns: the energy scaled with the factor of its unit in
        ``_ENERGY_UNITS`` (i.e. keV for "kev" and "ev"), the unscaled
        value when the unit is not recognised, or ``None`` when no energy
        could be determined. A warning is logged in the last two cases.
    """
    if energy_name:
        template = energy_uri_template or _ENERGY_TEMPLATE
        try:
            energy, units = _get_template_position_value(
                bliss_scan_uri, energy_name, template
            )
        except KeyError:
            _logger.warning(
                "'%s' does not exist. Do not use the incident beam energy from the data.",
                energy_name,
            )
            # Return right away: a name *was* provided, so the generic
            # "no name provided" warning would be misleading here.
            return None
    elif search_on_units:
        found = _get_unit_position_value(bliss_scan_uri, _ENERGY_UNITS)
        if found is None:
            # The helper returns None (not a tuple) when no positioner has
            # an energy unit; unpacking it directly would raise TypeError.
            _logger.warning(
                "No positioner with an incident beam energy unit was found."
            )
            return None
        energy, units = found
    else:
        _logger.warning("No incident beam energy position name is provided.")
        return None

    factor = _ENERGY_UNITS.get(units)
    if factor is None:
        # Unknown or missing unit: hand back the stored value unscaled.
        _logger.warning("Ignore unexpected incident beam energy unit %r.", units)
        return energy
    return energy * factor


def _get_unit_position_suburi(
    bliss_scan_uri: str, units: Dict[str, float]
) -> Optional[str]:
    """Find the sub-URI of the first start positioner with a matching unit.

    :param bliss_scan_uri: HDF5 URI of the Bliss scan.
    :param units: accepted units; only the (lower-case) keys are used.
    :returns: ``"instrument/positioners_start/<name>"`` for the first match
        or ``None`` when no positioner matches.
    """
    filename, h5path = hdf5.split_h5uri(bliss_scan_uri)
    with hdf5.FileReadAccess(filename) as h5root:
        group = h5root[f"{h5path}/instrument/positioners_start"]
        found = _get_positioner_name(group, units)
    if found is None:
        return None
    return f"instrument/positioners_start/{found}"


def _get_template_position_value(
    bliss_scan_uri: str, position_name: str, position_uri_template: str
) -> Tuple[float, str]:
    """Read a position and its unit from a templated location in a Bliss scan.

    :param bliss_scan_uri: HDF5 URI of the Bliss scan.
    :param position_name: value substituted in ``position_uri_template``.
    :param position_uri_template: ``str.format`` template giving the dataset
        path relative to the scan.
    :returns: the dataset value reduced to a scalar and the lower-cased
        ``units`` attribute (empty string when the attribute is absent).
    """
    filename, h5path = hdf5.split_h5uri(bliss_scan_uri)
    relative_path = position_uri_template.format(position_name)
    with hdf5.FileReadAccess(filename) as h5root:
        dataset = h5root[f"{h5path}/{relative_path}"]
        unit = dataset.attrs.get("units", "").lower()
        scalar = _asscalar(dataset[()])
        return scalar, unit


def _get_unit_position_value(
    bliss_scan_uri: str, units: Dict[str, float]
) -> Optional[Tuple[numbers.Number, str]]:
    """Read the first start positioner of a Bliss scan with a matching unit.

    :param bliss_scan_uri: HDF5 URI of the Bliss scan.
    :param units: accepted units; only the (lower-case) keys are used.
    :returns: ``(value, unit)`` of the first match or ``None`` when no
        positioner matches.
    """
    filename, h5path = hdf5.split_h5uri(bliss_scan_uri)
    with hdf5.FileReadAccess(filename) as h5root:
        group = h5root[f"{h5path}/instrument/positioners_start"]
        return _get_positioner_value(group, units)


def _get_positioner_value(
    positioners: hdf5.GroupType, units: Dict[str, float]
) -> Optional[Tuple[numbers.Number, str]]:
    """Return ``(value, unit)`` of the first dataset with a matching unit.

    Datasets are visited in the iteration order of ``positioners``. The
    ``units`` attribute is lower-cased before it is compared.
    """
    for key in positioners:
        dataset = positioners[key]
        unit = dataset.attrs.get("units", "").lower()
        if unit not in units:
            continue
        return _asscalar(dataset[()]), unit
    return None


def _asscalar(value: numpy.ndarray, rtol=1.0e-5, atol=1.0e-8) -> numbers.Number:
    """Reduce a scalar or an array of (nearly) identical values to one scalar.

    :param value: scalar (returned unchanged) or numpy array.
    :param rtol: relative tolerance passed to ``numpy.allclose``.
    :param atol: absolute tolerance passed to ``numpy.allclose``.
    :returns: the single element of a one-element array, otherwise the
        NaN-ignoring median when all elements are close to it.
    :raises TypeError: ``value`` is neither a scalar nor a numpy array.
    :raises ValueError: the array is empty or its values are not all close
        to the median.
    """
    if numpy.isscalar(value):
        return value
    if not isinstance(value, numpy.ndarray):
        raise TypeError(f"Unsupported type: {type(value)}")

    count = value.size
    if count == 0:
        raise ValueError("Cannot convert empty array to scalar")
    if count == 1:
        return value.item()

    # NOTE(review): the median ignores NaN but numpy.allclose does not, so an
    # array containing NaN presumably always ends in the ValueError below.
    # Confirm whether that is intended.
    median = numpy.nanmedian(value)
    if not numpy.allclose(value, median, atol=atol, rtol=rtol):
        raise ValueError(f"Array values differ beyond tolerance (shape {value.shape})")
    return median


def _get_positioner_name(
    positioners: hdf5.GroupType, units: Dict[str, float]
) -> Optional[str]:
    """Return the name of the first dataset with a matching unit.

    Datasets are visited in the iteration order of ``positioners``. The
    ``units`` attribute is lower-cased before it is compared. ``None`` is
    returned when nothing matches.
    """
    matches = (
        key
        for key in positioners
        if positioners[key].attrs.get("units", "").lower() in units
    )
    return next(matches, None)