Source code for ewoksfluo.io.zap_utils

"""Read data from ZAP scans in SPEC"""

import os
import re
from glob import glob
from typing import Dict, Sequence, Iterator, Tuple

import numpy
import fabio

from .types import ScanData, ZapInfo


_REMOVE_COUNTER_PREFIXES = {"zap_p201_": "", "zap_": "", "arr_": "", "xmap_": ""}

_REMOVE_EDF_HEADER_KEYS = (
    "EDF_BinarySize",
    "EDF_HeaderSize",
    "Dim_1",
    "Dim_2",
    "Image",
    "Size",
    "xdata",
    "xnb",
    "acquisition_time",
)


_XMAP_COUNTER_PATTERN = re.compile("^xmap_(.+?)(Sum|[0-9]+)$")


[docs] def rename_counter(name: str) -> str: for substr, substrnew in _REMOVE_COUNTER_PREFIXES.items(): name = name.replace(substr, substrnew) return name
[docs] def iter_zap_data( zap_info: ZapInfo, exclude_mca_detectors: Sequence[int], motor_names: Sequence[str] ) -> Iterator[ScanData]: """Yields MCA spectra, MCA statistics and counters from EDF files""" yield from _iter_xia_ctr_data(zap_info, exclude_mca_detectors, motor_names) static_data = dict() yield from _iter_xia_stats_data(zap_info, exclude_mca_detectors, static_data) for detnr in range(static_data.get("ndet", 0)): if detnr in exclude_mca_detectors: continue yield from _iter_xia_mca_data(zap_info, detnr) yield from _iter_lima_data(zap_info)
[docs] def get_zap_positioners(zap_info: ZapInfo) -> Dict[str, float]: pattern = os.path.join( zap_info.directory, f"{zap_info.radix}_xiast_{zap_info.scannb:04d}_{zap_info.zapnb:04d}_[0-9][0-9][0-9][0-9].edf", ) files = sorted(glob(pattern)) if not files: return dict() with fabio.open(files[0]) as f: return _get_motors_from_xia_header(f.header)
def _iter_xia_stats_data( zap_info: ZapInfo, exclude_mca_detectors: Sequence[int], static_data: dict ) -> Iterator[ScanData]: """We have one xiast file for each line in a 2D scan. The 2D data in each xiast file has the shape `(ncols, 6*n)` where * `ncols` the number of points in each line of the 2D scan * `n`: the number of detectors * `6`: the number of statistics recorded per detector The columns of the 2D data in each xiast file are: ..code: |DETNR1|EVENTS1|ICR1|OCR1|TLT1|DT1|...|DETNRn|EVENTSn|ICRn|OCRn|TLTn|DTn """ pattern = os.path.join( zap_info.directory, f"{zap_info.radix}_xiast_{zap_info.scannb:04d}_{zap_info.zapnb:04d}_[0-9][0-9][0-9][0-9].edf", ) files = sorted(glob(pattern)) if not files: return 0 for filename in files: nstats = 6 with fabio.open(filename) as f: data = f.data ncols, ntotalstats = data.shape ndet = ntotalstats // nstats if not static_data: static_data["ndet"] = ndet ( detnr, events, trigger_count_rate, event_count_rate, trigger_live_time, fractional_dead_time, ) = data.reshape((ncols, ndet, nstats)).T trigger_live_time = trigger_live_time / 1000 fractional_dead_time = fractional_dead_time / 100 triggers = trigger_count_rate * trigger_live_time elapsed_time = events / event_count_rate live_time = events / trigger_count_rate for i in range(ndet): j = detnr[i][0] if j in exclude_mca_detectors: continue group = f"xia_det{j}" yield ScanData( group=group, name="events", detector_type="mca", data=events[i], local_alias="", global_alias=f"{group}_events", ) yield ScanData( group=group, name="trigger_count_rate", detector_type="mca", data=trigger_count_rate[i], local_alias="", global_alias=f"{group}_trigger_count_rate", ) yield ScanData( group=group, name="event_count_rate", detector_type="mca", data=event_count_rate[i], local_alias="", global_alias=f"{group}_event_count_rate", ) yield ScanData( group=group, name="trigger_live_time", detector_type="mca", data=trigger_live_time[i], local_alias="", global_alias=f"{group}_trigger_live_time", ) yield ScanData( group=group, name="fractional_dead_time", detector_type="mca", data=fractional_dead_time[i], local_alias="", global_alias=f"{group}_fractional_dead_time", ) yield ScanData( group=group, name="triggers", detector_type="mca", data=triggers[i], local_alias="", global_alias=f"{group}_triggers", ) yield ScanData( group=group, name="elapsed_time", detector_type="mca", data=elapsed_time[i], local_alias="", global_alias=f"{group}_elapsed_time", ) yield ScanData( group=group, name="live_time", detector_type="mca", data=live_time[i], local_alias="", global_alias=f"{group}_live_time", ) def _iter_xia_mca_data(zap_info: ZapInfo, detnr: int) -> Iterator[ScanData]: """We have one xiaXX file for each detector and each line in a 2D scan. The 2D data in each xiaXX file has the shape `(ncols, nchan)` where * `ncols` the number of points in each line of a 2D scan * `nchan`: the number of MCA channels """ pattern = os.path.join( zap_info.directory, f"{zap_info.radix}_xia{detnr:02d}_{zap_info.scannb:04d}_{zap_info.zapnb:04d}_[0-9][0-9][0-9][0-9].edf", ) files = sorted(glob(pattern)) for filename in files: with fabio.open(filename) as f: yield ScanData( group=f"xia_det{detnr}", name="spectrum", detector_type="mca", data=f.data, local_alias="data", global_alias=f"xia_det{detnr}", ) def _iter_lima_data(zap_info: ZapInfo) -> Iterator[ScanData]: """We have one lima file for each detector and for each point in a 2D scan.""" pattern = os.path.join( zap_info.directory, f"{zap_info.radix}_*_{zap_info.scannb:04d}_[0-9][0-9][0-9][0-9]_[0-9][0-9][0-9][0-9].edf", ) files = {_detector_key(filename): filename for filename in glob(pattern)} for (camera, *_), filename in sorted(files.items()): if camera.startswith("xia"): continue with fabio.open(filename) as f: data = f.data if data.ndim != 2: continue yield ScanData( group=camera, name="image", detector_type="lima", data=data[numpy.newaxis, ...], local_alias="data", global_alias=camera, ) def _detector_key(filename: str) -> Tuple[str, int, int]: parts = os.path.splitext(os.path.basename(filename))[0].split("_") camera, _, slownr, fastnr = parts[-4:] return camera, int(slownr), int(fastnr) def _iter_xia_ctr_data( zap_info: Dict[str, numpy.ndarray], exclude_mca_detectors: Sequence[int], motor_names: Sequence[str], ) -> Iterator[ScanData]: """We have one file for each counter with the shape `(nrows, ncols)` where * `nrows` the number of lines in a 2D scan * `ncols` the number of points in each line of a 2D scan """ pattern = os.path.join( zap_info.directory, f"{zap_info.radix}_*_{zap_info.scannb:04d}_{zap_info.zapnb:04d}.edf", ) files = glob(pattern) if not files: return before, _, after = pattern.partition("*") nbefore = len(before) nafter = len(after) raw_camera_names = set() for filename in files: raw_ctr_name = filename[nbefore:-nafter] if raw_ctr_name in raw_camera_names: continue is_xmap_counter = _XMAP_COUNTER_PATTERN.match(raw_ctr_name) if is_xmap_counter: roi, detnr = is_xmap_counter.groups() if detnr == "Sum": continue detnr = int(detnr) if detnr in exclude_mca_detectors: continue group = f"xia_det{detnr}" name = rename_counter(roi) detector_type = "mca" global_alias = f"{group}_{name}" else: group = rename_counter(raw_ctr_name) name = "data" global_alias = group if group in motor_names: detector_type = "positioner" else: detector_type = "" with fabio.open(filename) as f: if "ccd_expo_time" in f.header: raw_camera_names.add(raw_ctr_name) continue yield ScanData( group=group, name=name, detector_type=detector_type, data=f.data.flatten(), local_alias="", global_alias=global_alias, ) def _get_motors_from_xia_header(header: Dict[str, str]) -> Dict[str, float]: motors = dict() for name, value in header.items(): if name in _REMOVE_EDF_HEADER_KEYS: continue try: value = float(value) except ValueError: continue motors[name] = value return motors