Source code for ewoksfluo.xrffit.outputbuffer_utils
from multiprocessing import Queue
from typing import Optional
from contextlib import contextmanager
from .handlers import NexusOutputHandler
from .handlers import QueueOutputHandler
from .buffers import PyMcaOutputBuffer
from .buffers import ExternalOutputBuffer
[docs]
@contextmanager
def queue_outputbuffer_context(
queue: Queue,
sendid: int,
destinationid: int,
nscans: Optional[int] = None,
scan_index: Optional[int] = None,
diagnostics: bool = False,
figuresofmerit: bool = True,
):
with QueueOutputHandler(
queue, sendid, destinationid, nscans, scan_index
) as handler:
yield ExternalOutputBuffer(
handler, diagnostics=diagnostics, figuresofmerit=figuresofmerit
)
[docs]
@contextmanager
def outputbuffer_context(
output_root_uri: str,
diagnostics: bool = False,
figuresofmerit: bool = True,
output_handler: str = "nexus",
open_options: Optional[dict] = None,
):
if open_options is None:
open_options = {}
if output_handler == "nexus":
if diagnostics:
default_group = "fit"
else:
default_group = "parameters"
with NexusOutputHandler(
output_root_uri, default_group=default_group, **open_options
) as handler:
yield ExternalOutputBuffer(
handler, diagnostics=diagnostics, figuresofmerit=figuresofmerit
)
elif output_handler == "pymca":
# Handler and buffer are coupled
yield PyMcaOutputBuffer(
output_root_uri,
diagnostics=diagnostics,
figuresofmerit=figuresofmerit,
**open_options,
)
else:
raise ValueError(f"Unknown output handler {output_handler}")