# Source code for ewoksfluo.tasks.input.pick_scan

from typing import Optional

from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field

from ..nexus_utils import wait_scan_finished


[docs] class Inputs(BaseInputModel): filename: str = Field( description="Bliss dataset HDF5 file name.", examples=["/data/dataset.h5"] ) scan_number: int = Field(description="Scan number.", examples=[1, 2]) retry_timeout: Optional[float] = Field( 20, description="Timeout in seconds when waiting for the scan to be fully written. `None` means wait forever.", ) retry_period: float = Field( 0.5, description="Retry period in seconds when waiting for the scan to be fully written.", )
[docs] class Outputs(BaseOutputModel): bliss_scan_uri: str = Field( description="Bliss scan URI.", examples=["/data/dataset.h5::/1.1"] )
[docs] class PickScan(Task, input_model=Inputs, output_model=Outputs): """Select a single Bliss scan."""
[docs] def run(self): filename = self.inputs.filename scan_number = self.inputs.scan_number bliss_scan_uri = f"{filename}::/{scan_number}.1" wait_scan_finished( bliss_scan_uri, retry_timeout=self.inputs.retry_timeout, retry_period=self.inputs.retry_period, ) self.outputs.bliss_scan_uri = bliss_scan_uri