Source code for ewoksfluo.io.spec

"""Read data from SPEC files"""

import os
import re
from glob import glob
from typing import Optional, Dict, Sequence, Iterator, List, Tuple

import numpy
from silx.io.specfile import SpecFile as _SpecFile

from . import types
from .zap_utils import rename_counter
from .zap_utils import iter_zap_data, get_zap_positioners


[docs] class SpecFile: def __init__(self, filename: str): self._filename = filename self._specfile = None def __enter__(self): self._specfile = _SpecFile(self._filename) return self def __exit__(self, *args): self.close()
[docs] def close(self): self._specfile.close() self._specfile = None
[docs] def get_scans(self) -> List[Tuple[int, int]]: return [tuple(map(int, k.split("."))) for k in self._specfile.keys()]
[docs] def get_number_of_subsubscans(self, scannb: int, subscan: int = 1) -> int: scan = self._specfile[f"{scannb}.{subscan}"] title = self.get_title(scannb, subscan) if "fXAS_scan" in title: return scan.data.shape[1] return 1
[docs] def iter_data( self, scannb: int, subscan: int = 1, subsubscan: int = 1, exclude_mca_detectors: Sequence[int] = tuple(), ) -> Iterator[types.ScanData]: scan = self._specfile[f"{scannb}.{subscan}"] header = scan.scan_header_dict motor_names = [rename_counter(s) for s in scan.motor_names] labels = [rename_counter(s) for s in scan.labels] spec_data = scan.data title = self.get_title(scannb, subscan) if "fXAS_scan" in title: zap_info = _zap_info_from_fluoxas_scan( header, spec_data.shape[1], subsubscan ) if zap_info: yield from iter_zap_data(zap_info, exclude_mca_detectors, motor_names) else: for name, data in zip(labels, spec_data): if name in motor_names or name in header["S"]: detector_type = "positioner" data_name = "value" else: detector_type = "" data_name = "data" yield types.ScanData( group=name, name=data_name, detector_type=detector_type, data=numpy.array(data), local_alias="", global_alias=name, ) yield from _iter_motors_from_title(title, labels) zap_info = _zap_info_from_scan(header) if zap_info: yield from iter_zap_data(zap_info, exclude_mca_detectors, motor_names)
[docs] def get_positioners( self, scannb: int, subscan: int = 1, subsubscan: int = 1 ) -> Dict[str, float]: scan = self._specfile[f"{scannb}.{subscan}"] motor_names = [rename_counter(s) for s in scan.motor_names] title = self.get_title(scannb, subscan) if "fXAS_scan" in title: zap_info = _zap_info_from_fluoxas_scan( scan.scan_header_dict, scan.data.shape[1], subsubscan ) else: zap_info = _zap_info_from_scan(scan.scan_header_dict) if zap_info: positioners = get_zap_positioners(zap_info) else: positioners = dict() positioners.update((zip(motor_names, scan.motor_positions))) return positioners
[docs] def get_title(self, scannb: int, subscan: int = 1) -> str: scan = self._specfile[f"{scannb}.{subscan}"] title = re.split(r"\s", scan.scan_header_dict["S"]) return " ".join([s for s in title[1:] if s]).strip()
def _zap_info_from_scan(header: Dict[str, str]) -> Optional[types.ZapInfo]: xia_keys = { "DIRECTORY": ("directory", str), "RADIX": ("radix", str), "ZAP SCAN NUMBER": ("scannb", int), "ZAP IMAGE NUMBER": ("zapnb", int), } info = _xia_info_from_header(header, xia_keys) if info is None: return return types.ZapInfo(**info) def _zap_info_from_fluoxas_scan( header: Dict[str, str], npoints: int, subsubscan: int ) -> Optional[types.ZapInfo]: xia_keys = { "DIRECTORY": ("directory", str), } info = _xia_info_from_header(header, xia_keys) if info is None: return directory, radix = os.path.split(info["directory"]) radix_prefix, _, nb_stop = radix.rpartition("_") nb_stop = int(nb_stop) nb_start = nb_stop - npoints zapnb = 0 nb = nb_start + subsubscan radix = f"{radix_prefix}_{nb:04d}" zap_directory = os.path.join(directory, radix) pattern = os.path.join(zap_directory, f"{radix}_xiast_*_{zapnb:04d}_0000.edf") files = glob(pattern) if len(files) != 1: return repattern = os.path.join( zap_directory, f"{radix}_xiast_([0-9]+)_{zapnb:04d}_0000.edf" ) m = re.match(repattern, files[0]) if not m: return scannb = int(m.groups()[0]) return types.ZapInfo( directory=zap_directory, radix=radix, scannb=scannb, zapnb=zapnb ) def _xia_info_from_header(header: dict, xia_keys: dict) -> Optional[dict]: info = dict() for s in header.get("C", "").split("\n"): tmp = s.split(":") if len(tmp) != 2: continue key, value = [s.strip() for s in tmp] key, deserialize = xia_keys.get(key, (None, None)) if key: info[key] = deserialize(value) if len(info) != len(xia_keys): return return info def _iter_motors_from_title( title: str, exclude: Sequence[str] = tuple() ) -> Iterator[types.ScanData]: title = [s for s in re.split(r"\s", title) if s] if title[0] == "zapline": try: cmd, motfast, startfast, endfast, npixelsfast, time = title except ValueError: return if motfast not in exclude: data = _contspace(startfast, endfast, npixelsfast) yield types.ScanData( group=motfast, name="value", detector_type="positioner", data=data, local_alias="", global_alias=motfast, ) elif title[0] == "zapimage": try: ( cmd, motfast, startfast, endfast, npixelsfast, time, motslow, startslow, endslow, nstepsslow, zero, ) = title except ValueError: return if motfast not in exclude: data = numpy.tile( _contspace(startfast, endfast, npixelsfast), int(nstepsslow) + 1, ) yield types.ScanData( group=motfast, name="value", detector_type="positioner", data=data, local_alias="", global_alias=motfast, ) if motslow not in exclude: data = numpy.repeat( _stepspace(startslow, endslow, nstepsslow), int(npixelsfast), ) yield types.ScanData( group=motslow, name="value", detector_type="positioner", data=data, local_alias="", global_alias=motslow, ) elif title[0] == "ascan": try: cmd, motfast, startfast, endfast, nstepsfast, time = title except ValueError: return if motfast not in exclude: data = _stepspace(startfast, endfast, nstepsfast) yield types.ScanData( group=motfast, name="value", detector_type="positioner", data=data, local_alias="", global_alias=motfast, ) def _contspace(start: str, stop: str, npoints: str) -> numpy.ndarray: start = float(start) stop = float(stop) npoints = int(npoints) nsteps = npoints - 1 half_step = (stop - start) / (2 * nsteps) return numpy.linspace(start + half_step, stop - half_step, npoints) def _stepspace(start: str, stop: str, nsteps: str) -> numpy.ndarray: start = float(start) stop = float(stop) npoints = int(nsteps) + 1 return numpy.linspace(start, stop, npoints)