Source code for ewoksfluo.xrffit.utils
from multiprocessing import Queue
from typing import Optional
from contextlib import contextmanager
from .handlers import NexusOutputHandler
from .handlers import QueueOutputHandler
from .buffers import PyMcaOutputBuffer
from .buffers import ExternalOutputBuffer
[docs]
@contextmanager
def queue_outputbuffer_context(
queue: Queue,
sendid: int,
destinationid: int,
nscans: Optional[int] = None,
scan_index: Optional[int] = None,
diagnostics: bool = False,
figuresofmerit: bool = True,
):
with QueueOutputHandler(
queue, sendid, destinationid, nscans, scan_index
) as handler:
yield ExternalOutputBuffer(
handler, diagnostics=diagnostics, figuresofmerit=figuresofmerit
)
[docs]
@contextmanager
def outputbuffer_context(
output_uri: str,
diagnostics: bool = False,
figuresofmerit: bool = True,
output_handler: str = "nexus",
open_options: Optional[dict] = None,
):
if open_options is None:
open_options = {}
if output_handler == "nexus":
if diagnostics:
default_group = "fit"
else:
default_group = "parameters"
with NexusOutputHandler(
output_uri, default_group=default_group, **open_options
) as handler:
yield ExternalOutputBuffer(
handler, diagnostics=diagnostics, figuresofmerit=figuresofmerit
)
return
if output_handler == "pymca":
# Handler and buffer are coupled
yield PyMcaOutputBuffer(
output_uri,
diagnostics=diagnostics,
figuresofmerit=figuresofmerit,
**open_options,
)
return