import logging
import numbers
from typing import Dict
from typing import Optional
from typing import Tuple
import numpy
from ..io import hdf5
# Supported energy units (lower-case, as compared after ``.lower()`` on the
# HDF5 "units" attribute) mapped to the factor that converts a value to keV.
_ENERGY_UNITS = {"kev": 1, "ev": 0.001}
# Default scan-relative path template of the energy positioner; ``{}`` is
# replaced by the positioner name.
_ENERGY_TEMPLATE = "instrument/positioners_start/{}"
# Module-level logger used for all warnings emitted by this module.
_logger = logging.getLogger(__name__)
[docs]
def get_primary_beam_energy_suburi(bliss_scan_uri: str) -> Optional[str]:
    """Locate the energy positioner of a Bliss scan.

    :param bliss_scan_uri: URI of the Bliss scan to inspect.
    :returns: scan-relative sub-URI ``instrument/positioners_start/<name>`` of
        the first positioner whose unit is one of ``_ENERGY_UNITS``, or
        ``None`` when no positioner has such a unit.
    """
    energy_suburi = _get_unit_position_suburi(bliss_scan_uri, _ENERGY_UNITS)
    return energy_suburi
[docs]
def get_primary_beam_energy_value(
    bliss_scan_uri: str,
    energy_name: Optional[str] = None,
    energy_uri_template: Optional[str] = None,
    search_on_units: bool = False,
) -> Optional[float]:
    """Get the incident beam energy from a Bliss scan.

    :param bliss_scan_uri: URI of the Bliss scan to read from.
    :param energy_name: name of the energy positioner. When given, it is
        substituted in ``energy_uri_template`` to locate the dataset.
    :param energy_uri_template: scan-relative path template with one ``{}``
        placeholder for ``energy_name``. Defaults to ``_ENERGY_TEMPLATE``.
    :param search_on_units: when no ``energy_name`` is given, use the first
        positioner whose unit is one of ``_ENERGY_UNITS``.
    :returns: the energy scaled by the ``_ENERGY_UNITS`` factor of its unit
        (i.e. in keV), the raw value when the unit is not recognized, or
        ``None`` when no energy could be determined.
    """
    energy, units = None, None
    if energy_name:
        if not energy_uri_template:
            energy_uri_template = _ENERGY_TEMPLATE
        try:
            energy, units = _get_template_position_value(
                bliss_scan_uri, energy_name, energy_uri_template
            )
        except KeyError:
            _logger.warning(
                "'%s' does not exist. Do not use the incident beam energy from the data.",
                energy_name,
            )
    elif search_on_units:
        # The lookup yields None (not a tuple) when no positioner has an
        # energy unit, so it must not be unpacked directly.
        found = _get_unit_position_value(bliss_scan_uri, _ENERGY_UNITS)
        if found is None:
            _logger.warning(
                "No positioner with an incident beam energy unit was found."
            )
        else:
            energy, units = found
    else:
        # Only this branch really corresponds to "no name provided".
        _logger.warning("No incident beam energy positioner name is provided.")

    if energy is None:
        return None

    if units not in _ENERGY_UNITS:
        # Unknown or missing unit: return the value unscaled.
        _logger.warning("Ignore unexpected incident beam energy unit %r.", units)
        return energy

    return energy * _ENERGY_UNITS[units]
def _get_unit_position_suburi(
    bliss_scan_uri: str, units: Dict[str, float]
) -> Optional[str]:
    """Find the scan-relative sub-URI of the first positioner with a given unit.

    :param bliss_scan_uri: URI of the Bliss scan to inspect.
    :param units: accepted units (keys are compared against the lower-cased
        "units" attribute of each positioner).
    :returns: ``instrument/positioners_start/<name>`` for the first matching
        positioner, or ``None`` when there is no match.
    """
    filename, h5path = hdf5.split_h5uri(bliss_scan_uri)
    with hdf5.FileReadAccess(filename) as root:
        group = root[f"{h5path}/instrument/positioners_start"]
        matching_name = _get_positioner_name(group, units)
    if matching_name is None:
        return None
    return f"instrument/positioners_start/{matching_name}"
def _get_template_position_value(
    bliss_scan_uri: str, position_name: str, position_uri_template: str
) -> Tuple[float, str]:
    """Read a position value and its unit from a Bliss scan.

    :param bliss_scan_uri: URI of the Bliss scan to read from.
    :param position_name: name substituted in ``position_uri_template``.
    :param position_uri_template: scan-relative path template with one
        ``{}`` placeholder.
    :returns: the dataset value reduced to a scalar and the lower-cased
        "units" attribute (empty string when the attribute is absent).
    """
    filename, h5path = hdf5.split_h5uri(bliss_scan_uri)
    relative_path = position_uri_template.format(position_name)
    with hdf5.FileReadAccess(filename) as root:
        dataset = root[f"{h5path}/{relative_path}"]
        unit_name = dataset.attrs.get("units", "").lower()
        # Read the value while the file is still open.
        scalar_value = _asscalar(dataset[()])
    return scalar_value, unit_name
def _get_unit_position_value(
    bliss_scan_uri: str, units: Dict[str, float]
) -> Optional[Tuple[numbers.Number, str]]:
    """Read the first positioner of a Bliss scan that has one of the units.

    :param bliss_scan_uri: URI of the Bliss scan to read from.
    :param units: accepted units (keys are compared against the lower-cased
        "units" attribute of each positioner).
    :returns: ``(value, unit)`` of the first matching positioner, or ``None``
        when no positioner matches.
    """
    filename, h5path = hdf5.split_h5uri(bliss_scan_uri)
    with hdf5.FileReadAccess(filename) as root:
        group = root[f"{h5path}/instrument/positioners_start"]
        # Values are read while the file is still open.
        result = _get_positioner_value(group, units)
    return result
def _get_positioner_value(
    positioners: hdf5.GroupType, units: Dict[str, float]
) -> Optional[Tuple[numbers.Number, str]]:
    """Return value and unit of the first dataset with one of the units.

    :param positioners: group whose items are inspected in iteration order.
    :param units: accepted units (keys are compared against the lower-cased
        "units" attribute of each item).
    :returns: ``(value, unit)`` of the first match, or ``None`` when no item
        has an accepted unit.
    """
    for key in positioners:
        dataset = positioners[key]
        unit_name = dataset.attrs.get("units", "").lower()
        if unit_name not in units:
            continue
        return _asscalar(dataset[()]), unit_name
    return None
def _asscalar(value: numpy.ndarray, rtol=1.0e-5, atol=1.0e-8) -> numbers.Number:
    """Reduce a scalar or an array of (nearly) identical values to one scalar.

    :param value: a scalar (returned unchanged) or a numpy array.
    :param rtol: relative tolerance passed to ``numpy.allclose``.
    :param atol: absolute tolerance passed to ``numpy.allclose``.
    :returns: the scalar itself, the single element of a size-1 array, or the
        median of the non-NaN elements when they all agree within tolerance.
    :raises TypeError: when ``value`` is neither a scalar nor a numpy array.
    :raises ValueError: when the array is empty, contains only NaN, or its
        non-NaN elements differ beyond the tolerance.
    """
    if numpy.isscalar(value):
        return value
    if not isinstance(value, numpy.ndarray):
        raise TypeError(f"Unsupported type: {type(value)}")
    if value.size == 0:
        raise ValueError("Cannot convert empty array to scalar")
    if value.size == 1:
        return value.item()

    # The reference is a NaN-ignoring median, so the comparison must ignore
    # NaN as well; otherwise a single NaN makes every array fail the check.
    valid = value[~numpy.isnan(value)]
    if valid.size == 0:
        raise ValueError("Cannot convert an array with only NaN values to scalar")

    ref_value = numpy.nanmedian(valid)
    if numpy.allclose(valid, ref_value, atol=atol, rtol=rtol):
        return ref_value
    raise ValueError(f"Array values differ beyond tolerance (shape {value.shape})")
def _get_positioner_name(
    positioners: hdf5.GroupType, units: Dict[str, float]
) -> Optional[str]:
    """Return the name of the first item with one of the units.

    :param positioners: group whose items are inspected in iteration order.
    :param units: accepted units (keys are compared against the lower-cased
        "units" attribute of each item).
    :returns: name of the first matching item, or ``None`` when none matches.
    """
    matching_names = (
        key
        for key in positioners
        if positioners[key].attrs.get("units", "").lower() in units
    )
    # The generator is lazy: iteration stops at the first match.
    return next(matching_names, None)