Source code for ewoksfluo.tasks.input.pick_scan

from typing import Optional

from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field

from .pick_utils import pick_scans


[docs] class Inputs(BaseInputModel): filename: str = Field( description="Bliss dataset HDF5 file name.", examples=["/data/dataset.h5"] ) scan_number: int = Field(description="Scan number.", examples=[1, 2]) retry_timeout: Optional[float] = Field( 20, description="Timeout in seconds when waiting for the scan to be fully written. " "`None` means wait forever. Negative means do not wait.", ) retry_period: float = Field( 0.5, description="Retry period in seconds when waiting for the scan to be fully written.", )
[docs] class Outputs(BaseOutputModel): bliss_scan_uri: str = Field( description="Bliss scan URI.", examples=["/data/dataset.h5::/1.1"] )
[docs] class PickScan(Task, input_model=Inputs, output_model=Outputs): """Select a single Bliss scan."""
[docs] def run(self): bliss_scan_uris = pick_scans( [self.inputs.filename], [(self.inputs.scan_number, self.inputs.scan_number)], [()], retry_timeout=self.inputs.retry_timeout, retry_period=self.inputs.retry_period, ) self.outputs.bliss_scan_uri = bliss_scan_uris[0]