Source code for ewoksfluo.tasks.normalization.norm_results_stack

import logging
from typing import List
from typing import Optional

from ewokscore import Task
from ewokscore.model import BaseInputModel
from ewokscore.model import BaseOutputModel
from pydantic import Field

from ...io import hdf5
from ...io import output_uri
from .norm_utils import normalization_coefficient_stack
from .norm_utils import normalization_template
from .norm_utils import save_normalized_xrf_results

_logger = logging.getLogger(__name__)


[docs] class Inputs(BaseInputModel): bliss_scan_uris: List[str] = Field( description="Bliss scan URI's.", examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]], ) output_root_uri: str = Field( description="Target HDF5 file URI with optional data path.", examples=[ "/results/dataset.h5", "/results/dataset.h5::/1.1", "/results/dataset.h5::/1.1/norm", ], ) output_root_group: Optional[str] = Field( default=None, description="Optional group underneath ``output_root_uri``." ) xrf_results_uri: str = Field( description="Previous XRF NXprocess URI to normalize.", examples=["/results/dataset.h5::/1.1/sum/results"], ) normalization_expression: Optional[str] = Field( default=None, description="Arithmetic expression to be used for normalization." ) counter_normalization_template: Optional[str] = Field( default="np.nanmean(<instrument/{}/data>)/<instrument/{}/data>", description="Arithmetic expression to be used for normalization. " "HDF5 URI's <...> are relative to ``bliss_scan_uris``.", ) counter_name: Optional[str] = Field( default=None, description="To be used in ``counter_normalization_template``.", ) detector_normalization_template: Optional[str] = Field( default="1./<instrument/{}/live_time>", description="Arithmetic expression to be used for normalization. " "HDF5 URI's <...> are relative to ``bliss_scan_uris``.", ) detector_name: Optional[str] = Field( default=None, description="To be used in ``detector_normalization_template``.", )
[docs] class Outputs(BaseOutputModel): bliss_scan_uris: List[str] = Field( description="Bliss scan URI's.", examples=[["/data/dataset.h5::/1.1", "/data/dataset.h5::/2.1"]], ) output_root_uri: str = Field( description="Original output root URI received as input.", examples=["/results/dataset.h5::/1.1"], ) output_root_group: Optional[str] = Field( default=None, description="Original output root group received as input." ) xrf_results_uri: str = Field( description="Output NXprocess results URI.", examples=["/results/dataset.h5::/1.1/norm/results"], )
[docs] class NormalizeXrfResultsStack(Task, input_model=Inputs, output_model=Outputs): """Normalize XRF stack results with raw scan counters. Typical normalizers are beam monitors and measurement (live) time. """
[docs] def run(self): expression = self.inputs.normalization_expression detector_name = self.inputs.detector_name counter_name = self.inputs.counter_name if not (expression or detector_name or counter_name): _logger.warning( "Neither 'normalization_expression', 'counter_name', nor 'detector_name' was specified. Normalization is skipped." ) self.outputs.xrf_results_uri = self.inputs.xrf_results_uri self.outputs.bliss_scan_uris = self.inputs.bliss_scan_uris self.outputs.output_root_uri = self.inputs.output_root_uri self.outputs.output_root_group = self.inputs.output_root_group return counter_normalization_template = self.inputs.counter_normalization_template detector_normalization_template = self.inputs.detector_normalization_template normalization_expression = normalization_template( expression, counter_normalization_template, counter_name, detector_normalization_template, detector_name, ) _logger.info("Multiply XRF results with: %s", normalization_expression) coefficient = normalization_coefficient_stack( self.inputs.bliss_scan_uris, normalization_expression ) process_config = {"normalization_expression": normalization_expression} _, scan_h5path0 = hdf5.split_h5uri(self.inputs.bliss_scan_uris[0]) output_root_uri = output_uri.compose_full_output_uri( self.inputs.output_root_uri, default_output_data_path=scan_h5path0, extra_data_paths=(self.inputs.output_root_group, "norm"), ) self.outputs.xrf_results_uri = save_normalized_xrf_results( self.inputs.xrf_results_uri, coefficient, output_root_uri, process_config ) self.outputs.bliss_scan_uris = self.inputs.bliss_scan_uris self.outputs.output_root_uri = self.inputs.output_root_uri self.outputs.output_root_group = self.inputs.output_root_group