Source code for ewoksfluo.tasks.input.pick_scans

from typing import List
from typing import Tuple

try:
    from typing import Self
except ImportError:
    from typing_extensions import Self

from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field
from pydantic import model_validator

from ..nexus_utils import wait_scan_finished


# Input model of the scan-picking task: three parallel lists in which entry
# ``i`` of each list describes the same dataset file.
class Inputs(BaseInputModel):
    # One Bliss dataset file per entry.
    filenames: List[str] = Field(
        description="Bliss dataset HDF5 file names.",
        examples=[["/data/dataset1.h5", "/data/dataset2.h5"]],
    )
    # One ``(first, last)`` pair of scan numbers per file.
    scan_ranges: List[Tuple[int, int]] = Field(
        description="Ranges of scan numbers.",
        examples=[[(1, 4), (100, 101)]],
    )
    # Optional; when omitted or empty it is expanded by the validator below.
    exclude_scans: List[List[int]] = Field(
        default_factory=list,
        description="Scan numbers to exclude for each range.",
        examples=[[[1, 3], []]],
    )

    @model_validator(mode="after")
    def check_and_expand_lengths(self) -> Self:
        """Check that the lists are parallel and expand ``exclude_scans``.

        ``filenames`` and ``scan_ranges`` must have the same length.
        ``exclude_scans`` must either have that length too or be empty, in
        which case it is replaced by one empty exclusion list per file.

        :returns: the validated (and possibly expanded) model instance.
        :raises ValueError: when the list lengths are inconsistent.
        """
        expected = len(self.filenames)

        if len(self.scan_ranges) != expected:
            raise ValueError("`filenames` and `scan_ranges` must have the same length")

        if not self.exclude_scans:
            # Nothing to exclude: give every file its own empty list.
            self.exclude_scans = [[] for _ in self.filenames]
        elif len(self.exclude_scans) != expected:
            raise ValueError(
                "`exclude_scans` must have the same length as `filenames` or be empty"
            )

        return self
# Output model of the scan-picking task.
class Outputs(BaseOutputModel):
    # One entry per selected scan; the examples show the
    # ``<filename>::/<scan_number>.1`` layout of each entry.
    bliss_scan_uris: List[str] = Field(
        description="Bliss scan URI.",
        examples=[
            [
                "/data/dataset1.h5::/2.1",
                "/data/dataset1.h5::/4.1",
                "/data/dataset2.h5::/100.1",
                "/data/dataset2.h5::/101.1",
            ]
        ],
    )
class PickScans(Task, input_model=Inputs, output_model=Outputs):
    """Select multiple Bliss scans from multiple files."""

    def run(self):
        """Collect the URIs of all selected scans into ``bliss_scan_uris``.

        Files are processed in input order. For each file, the scan numbers
        from the first to the last value of its range (both included) are
        visited in increasing order, skipping the numbers listed for exclusion.
        Every remaining scan gets the URI ``"<filename>::/<scan_number>.1"``,
        which is passed to ``wait_scan_finished`` before it is added to the
        output list.
        """
        filenames = self.inputs.filenames
        scan_ranges = self.inputs.scan_ranges
        # Defensive: the input model normally expands an empty `exclude_scans`
        # to one empty list per file. Mirror that here so an unexpanded empty
        # value can never make `zip` below silently yield nothing.
        exclude_scans = self.inputs.exclude_scans or [[] for _ in filenames]

        bliss_scan_uris = []
        for filename, (scan_min, scan_max), excluded_scans in zip(
            filenames, scan_ranges, exclude_scans
        ):
            # Build the lookup once per range: set membership instead of
            # scanning a list for every scan number.
            skipped = set(excluded_scans or ())
            for scan_number in range(scan_min, scan_max + 1):
                if scan_number in skipped:
                    continue
                bliss_scan_uri = f"{filename}::/{scan_number}.1"
                # NOTE(review): presumably blocks until the scan is complete;
                # confirm against `nexus_utils.wait_scan_finished`.
                wait_scan_finished(bliss_scan_uri)
                bliss_scan_uris.append(bliss_scan_uri)

        self.outputs.bliss_scan_uris = bliss_scan_uris