Source code for ewoksfluo.tasks.input.pick_scan
from typing import Optional
from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field
from .pick_utils import pick_scans
[docs]
class Outputs(BaseOutputModel):
    """Output model holding the URI of the selected Bliss scan."""

    # The example value has the form "<HDF5 file path>::<path inside the file>".
    bliss_scan_uri: str = Field(
        examples=["/data/dataset.h5::/1.1"],
        description="Bliss scan URI.",
    )
[docs]
class PickScan(Task, input_model=Inputs, output_model=Outputs):
    """Pick exactly one Bliss scan and expose its URI as task output."""

    def run(self):
        """Look up the requested scan and store its URI in the outputs.

        ``pick_scans`` is called with a single file name, a single scan
        range whose lower and upper bounds are both ``scan_number``, and a
        single empty tuple as third argument. The first URI it returns is
        written to ``bliss_scan_uri``.
        """
        inputs = self.inputs
        filename = inputs.filename
        scan_number = inputs.scan_number

        # Both ends of the range are the same scan number, so the range
        # designates one scan only.
        scan_range = (scan_number, scan_number)

        # NOTE(review): the third positional argument is one empty tuple per
        # file; its meaning is defined by pick_scans and is not visible here.
        found_uris = pick_scans(
            [filename],
            [scan_range],
            [()],
            retry_timeout=inputs.retry_timeout,
            retry_period=inputs.retry_period,
        )

        self.outputs.bliss_scan_uri = found_uris[0]