Source code for ewoksfluo.xrffit.legacy.handlers.queue

from queue import Empty
from queue import Queue
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Set
from typing import Tuple

import numpy

from ..queue_messages import DataMsg
from ..queue_messages import DatasetMsg
from ..queue_messages import GroupMsg
from ..queue_messages import QueueMsg
from ..queue_messages import StopMsg
from ..queue_messages import is_DataMsg
from ..queue_messages import is_DatasetMsg
from ..queue_messages import is_GroupMsg
from ..queue_messages import is_StopMsg
from .abstract import AbstractDataHandler
from .abstract import AbstractOutputHandler
from .nexus import NexusDataHandlerType
from .nexus import NexusOutputHandler


class _QueueDataHandler(AbstractDataHandler):
    def __init__(
        self,
        group: str,
        name: str,
        queue: Queue,
        sendid: int,
        destinationid: int,
        scan_index: Optional[int],
    ) -> None:
        self._queue = queue
        self._sendid = sendid
        self._destinationid = destinationid
        self._group = group
        self._name = name
        self._scan_index = scan_index

    def add_points(self, value: numpy.ndarray) -> None:
        msg = DataMsg(
            self._sendid,
            self._destinationid,
            self._group,
            self._name,
            value,
            self._scan_index,
        )
        self._queue.put(msg)


[docs] class QueueOutputHandler(AbstractOutputHandler): def __init__( self, queue: Queue, sendid: int, destinationid: int, nscans: Optional[int], scan_index: Optional[int], ): self._queue = queue self._sendid = sendid self._destinationid = destinationid self._nscans = nscans self._scan_index = scan_index def __enter__(self) -> "QueueOutputHandler": return self def __exit__(self, *args): stop_queue(self._queue, self._sendid)
[docs] def create_group(self, name: str, data: dict) -> None: msg = GroupMsg(self._sendid, self._destinationid, name, data) self._queue.put(msg)
[docs] def create_nxdata_handler( self, group_name: str, name: str, npoints: int, attrs: Optional[dict] = None, ) -> _QueueDataHandler: if attrs is None: attrs = {} msg = DatasetMsg( self._sendid, self._destinationid, group_name, name, npoints, attrs, self._nscans, ) self._queue.put(msg) return _QueueDataHandler( group_name, name, self._queue, self._sendid, self._destinationid, self._scan_index, )
[docs] def consume_handler_queue( output_handlers: Dict[int, NexusOutputHandler], queue: Queue, all_sendids: Set[int], raise_on_error: Callable[[], None], ): datasets: Dict[Tuple[int, str, str], NexusDataHandlerType] = dict() stopped_sendids: Set[int] = set() while True: raise_on_error() try: msg: QueueMsg = queue.get(timeout=0.5) except Empty: continue if is_StopMsg(msg): stopped_sendids.add(msg.sendid) if all_sendids == stopped_sendids: return continue if is_GroupMsg(msg): destid = msg.destinationid output_handler = output_handlers[destid] output_handler.create_group(msg.group, msg.data) continue if is_DatasetMsg(msg): group, name, destid = msg.group, msg.name, msg.destinationid output_handler = output_handlers[destid] dataset_key = (destid, group, name) if dataset_key not in datasets: datasets[dataset_key] = output_handler.create_nxdata_handler( group, name, msg.npoints, msg.attrs, msg.nscans ) continue if is_DataMsg(msg): group, name, destid = msg.group, msg.name, msg.destinationid output_handler = output_handlers[destid] dataset_key = (destid, group, name) datasets[dataset_key].add_points(msg.value, msg.scan_index) continue raise ValueError(f"Unknown command {msg.cmd}")
[docs] def stop_queue(queue: Queue, sendid: int): msg = StopMsg(sendid) queue.put(msg)