Source code for ewoksfluo.xrffit.legacy.outputbuffer_utils
from contextlib import contextmanager
from multiprocessing import Queue
from typing import Generator
from typing import Optional
from .buffers import AbstractOutputBuffer
from .buffers import ExternalOutputBuffer
from .buffers import PyMcaOutputBuffer
from .handlers import NexusOutputHandler
from .handlers import QueueOutputHandler
[docs]
@contextmanager
def queue_outputbuffer_context(
queue: Queue,
sendid: int,
destinationid: int,
nscans: Optional[int] = None,
scan_index: Optional[int] = None,
diagnostics: bool = False,
figuresofmerit: bool = True,
) -> Generator[ExternalOutputBuffer, None, None]:
with QueueOutputHandler(
queue, sendid, destinationid, nscans, scan_index
) as handler:
yield ExternalOutputBuffer(
handler, diagnostics=diagnostics, figuresofmerit=figuresofmerit
)
[docs]
@contextmanager
def outputbuffer_context(
output_root_uri: str,
diagnostics: bool = False,
figuresofmerit: bool = True,
output_handler: str = "nexus",
open_options: Optional[dict] = None,
) -> Generator[AbstractOutputBuffer, None, None]:
if open_options is None:
open_options = {}
if output_handler == "nexus":
if diagnostics:
default_group = "fit"
else:
default_group = "parameters"
with NexusOutputHandler(
output_root_uri, default_group=default_group, **open_options
) as handler:
yield ExternalOutputBuffer(
handler, diagnostics=diagnostics, figuresofmerit=figuresofmerit
)
elif output_handler == "pymca":
# Handler and buffer are coupled
yield PyMcaOutputBuffer(
output_root_uri,
diagnostics=diagnostics,
figuresofmerit=figuresofmerit,
**open_options,
)
else:
raise ValueError(f"Unknown output handler {output_handler}")