Source code for ewoksfluo.tasks.normalization.norm_results_stack

import logging
from typing import List
from typing import Optional

from ewokscore import Task
from ewokscore.model import BaseOutputModel
from pydantic import Field

from ..models import FluoInputs
from .norm_utils import normalization_coefficient_stack
from .norm_utils import normalization_template
from .norm_utils import save_normalized_xrf_results

_logger = logging.getLogger(__name__)


[docs] class Inputs(FluoInputs): bliss_scan_uris: List[str] = Field( description="Bliss scan URI's.", examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]], ) output_root_uri: str = Field( description="Output root URI.", examples=[ "/results/dataset.h5", "/results/dataset.h5::/1.1", "/results/dataset.h5::/1.1/norm", ], ) xrf_results_uri: Optional[str] = Field( description="Previous XRF NXprocess URI to normalize.", examples=["/results/dataset.h5::/1.1/sum/results"], ) normalization_expression: Optional[str] = Field( default=None, description="Arithmetic expression to be used for normalization." ) counter_normalization_template: str = Field( default="np.nanmean(<instrument/{}/data>)/<instrument/{}/data>", description="Arithmetic expression to be used for normalization.", ) counter_name: Optional[str] = Field( default=None, description="To be used in :math:`counter_normalization_template`.", ) detector_normalization_template: str = Field( default="1./<instrument/{}/live_time>", description="Arithmetic expression to be used for normalization.", ) detector_name: Optional[str] = Field( default=None, description="To be used in :math:`detector_normalization_template`.", )
[docs] class Outputs(BaseOutputModel): bliss_scan_uris: List[str] = Field( description="Bliss scan URI's.", examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]], ) output_root_uri: str = Field( description="Output NXentry URI.", examples=["/results/dataset.h5::/1.1"] ) xrf_results_uri: str = Field( description="Output NXprocess results URI.", examples=["/results/dataset.h5::/1.1/norm/results"], )
[docs] class NormalizeXrfResultsStack(Task, input_model=Inputs, output_model=Outputs): """Normalize XRF stack results with raw scan counters. Typical normalizers are beam monitors and measurement (live) time. """
[docs] def run(self): expression = self.inputs.normalization_expression detector_name = self.inputs.detector_name counter_name = self.inputs.counter_name if not (expression or detector_name or counter_name): _logger.warning( "Neither 'normalization_expression', 'counter_name', nor 'detector_name' was specified. Normalization is skipped." ) self.outputs.xrf_results_uri = self.inputs.xrf_results_uri self.outputs.bliss_scan_uris = self.inputs.bliss_scan_uris self.outputs.output_root_uri = self.inputs.output_root_uri return counter_normalization_template = self.inputs.counter_normalization_template detector_normalization_template = self.inputs.detector_normalization_template normalization_expression = normalization_template( expression, counter_normalization_template, counter_name, detector_normalization_template, detector_name, ) _logger.info("Multiply XRF results with: %s", normalization_expression) coefficient = normalization_coefficient_stack( self.inputs.bliss_scan_uris, normalization_expression ) process_config = {"normalization_expression": normalization_expression} self.outputs.xrf_results_uri = save_normalized_xrf_results( self.inputs.xrf_results_uri, coefficient, self.inputs.output_root_uri, process_config, ) self.outputs.bliss_scan_uris = self.inputs.bliss_scan_uris self.outputs.output_root_uri = self.inputs.output_root_uri