Source code for ewoksfluo.xrffit.queue_messages
from typing import NamedTuple, Optional, Union
import numpy
from typing_extensions import TypeGuard
[docs]
class StopMsg(NamedTuple):
    """Queue message tagged ``"STOP"``.

    Unlike the other message types in this module it has no destination
    and no payload: only the sender identifier and the command tag.
    """

    # Presumably identifies the sender of the message -- confirm with the producer.
    sendid: int
    # Message-type tag; ``is_StopMsg`` compares it against ``"STOP"``.
    cmd: str = "STOP"
[docs]
class GroupMsg(NamedTuple):
    """Queue message tagged ``"GROUP"``.

    Carries a group name together with a dictionary payload. How the
    receiver uses ``group`` and ``data`` is not visible in this module.
    """

    # Presumably identifies the sender -- confirm with the producer.
    sendid: int
    # Presumably identifies the intended receiver -- confirm with the consumer.
    destinationid: int
    # Name of the group this message refers to.
    group: str
    # Dictionary payload associated with the group.
    data: dict
    # Message-type tag; ``is_GroupMsg`` compares it against ``"GROUP"``.
    cmd: str = "GROUP"
[docs]
class DatasetMsg(NamedTuple):
    """Queue message tagged ``"DATASET"``.

    Describes a named dataset inside a group (point count, attributes and
    an optional scan count). It carries no array values itself.

    ``nscans`` has no default, so it must always be passed explicitly;
    ``None`` is an accepted value according to its annotation.
    """

    # Presumably identifies the sender -- confirm with the producer.
    sendid: int
    # Presumably identifies the intended receiver -- confirm with the consumer.
    destinationid: int
    # Name of the group the dataset belongs to.
    group: str
    # Name of the dataset.
    name: str
    # Number of points of the dataset.
    npoints: int
    # Attribute dictionary associated with the dataset.
    attrs: dict
    # Number of scans, or ``None``; required positional field (no default).
    nscans: Optional[int]
    # Message-type tag; ``is_DatasetMsg`` compares it against ``"DATASET"``.
    cmd: str = "DATASET"
[docs]
class DataMsg(NamedTuple):
    """Queue message tagged ``"DATA"``.

    Carries an array value for a named dataset inside a group, plus an
    optional scan index.

    ``scan_index`` has no default, so it must always be passed explicitly;
    ``None`` is an accepted value according to its annotation.
    """

    # Presumably identifies the sender -- confirm with the producer.
    sendid: int
    # Presumably identifies the intended receiver -- confirm with the consumer.
    destinationid: int
    # Name of the group the dataset belongs to.
    group: str
    # Name of the dataset the value belongs to.
    name: str
    # Array payload (shape and dtype are not constrained here).
    value: numpy.ndarray
    # Scan index, or ``None``; required positional field (no default).
    scan_index: Optional[int]
    # Message-type tag; ``is_DataMsg`` compares it against ``"DATA"``.
    cmd: str = "DATA"
# Union of the four message types (StopMsg, GroupMsg, DatasetMsg, DataMsg);
# this is the parameter type accepted by the ``is_*Msg`` type guards.
QueueMsg = Union[StopMsg, GroupMsg, DatasetMsg, DataMsg]
[docs]
def is_StopMsg(msg: QueueMsg) -> TypeGuard[StopMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"STOP"``.

    The decision is based solely on the ``cmd`` tag of the message, not on
    its class. The ``TypeGuard`` return annotation lets static type
    checkers narrow ``msg`` to ``StopMsg`` in the true branch.

    :param msg: message to inspect; it must expose a ``cmd`` attribute.
    :returns: whether the message tag is ``"STOP"``.
    """
    return msg.cmd == "STOP"
[docs]
def is_GroupMsg(msg: QueueMsg) -> TypeGuard[GroupMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"GROUP"``.

    The decision is based solely on the ``cmd`` tag of the message, not on
    its class. The ``TypeGuard`` return annotation lets static type
    checkers narrow ``msg`` to ``GroupMsg`` in the true branch.

    :param msg: message to inspect; it must expose a ``cmd`` attribute.
    :returns: whether the message tag is ``"GROUP"``.
    """
    return msg.cmd == "GROUP"
[docs]
def is_DatasetMsg(msg: QueueMsg) -> TypeGuard[DatasetMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"DATASET"``.

    The decision is based solely on the ``cmd`` tag of the message, not on
    its class. The ``TypeGuard`` return annotation lets static type
    checkers narrow ``msg`` to ``DatasetMsg`` in the true branch.

    :param msg: message to inspect; it must expose a ``cmd`` attribute.
    :returns: whether the message tag is ``"DATASET"``.
    """
    return msg.cmd == "DATASET"
[docs]
def is_DataMsg(msg: QueueMsg) -> TypeGuard[DataMsg]:
    """Return ``True`` when ``msg.cmd`` equals ``"DATA"``.

    The decision is based solely on the ``cmd`` tag of the message, not on
    its class. The ``TypeGuard`` return annotation lets static type
    checkers narrow ``msg`` to ``DataMsg`` in the true branch.

    :param msg: message to inspect; it must expose a ``cmd`` attribute.
    :returns: whether the message tag is ``"DATA"``.
    """
    return msg.cmd == "DATA"