from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
[docs]
class DataType(NamedTuple):
nmca: int
nmcasum: int
nscans: int
ndim0: int
ndim1: int
mosaic: bool
partial_mca_sum: bool
npts: int = 3000
nchan: int = 1024
dim0_name: str = "sampz"
dim1_name: str = "sampy"
fit_elementgroups: List[str] = ["Ca_K", "Ce_L", "Fe_K", "Si_K"]
fit_parameters: List[str] = [
"Ca_K",
"Ce_L",
"Fe_K",
"Si_K",
"Scatter_Compton000",
"Scatter_Peak000",
]
mca_meta_counters: List[str] = [
"roi1",
"roi2",
"triggers",
"trigger_live_time",
"trigger_count_rate",
"events",
"live_time",
"event_count_rate",
"elapsed_time",
"fractional_dead_time",
]
@property
def flat_result_shape(self) -> Tuple[int, ...]:
if self.nscans == 1:
return (self.npts,)
return (self.nscans, self.npts)
@property
def flat_spectra_shape(self) -> Tuple[int, ...]:
if self.nscans == 1:
return (self.npts, self.nchan)
return (self.nscans, self.npts, self.nchan)
@property
def regridded_result_shape(self) -> Tuple[int, ...]:
if self.nscans == 1:
return (self.ndim0, self.ndim1)
return (self.nscans, self.ndim0, self.ndim1)
@property
def derivate_shape(self) -> Tuple[int, ...]:
if self.nscans == 1:
return (self.nchan,)
return (self.nscans, self.nchan)
@property
def mca_sum_names(self) -> List[str]:
if self.nmcasum == 0:
return []
if self.nmcasum == 1:
return ["mcasum"]
return [f"mcasum{i}" for i in range(self.nmcasum)]
[docs]
def build_expected(data_type: DataType) -> Dict[str, Any]:
return _nx_class(
"NXroot",
had_default=True,
**{"1.1": _nx_entry(data_type=data_type)},
)
def _nx_entry(data_type: DataType) -> Dict[str, Any]:
time_meta = {"start_time@shape": (), "end_time@shape": ()}
optional = {}
fit_results = {}
if data_type.nmcasum:
optional["instrument"] = _build_instrument(data_type=data_type)
optional["measurement"] = _build_measurement(data_type=data_type)
optional["sumspectra"] = _build_sumspectra_process(data_type=data_type)
for name in data_type.mca_sum_names:
fit_results[name] = _build_fit_process(data_type=data_type)
if data_type.partial_mca_sum:
optional["sum"] = _build_sumresults_process(data_type=data_type)
else:
for i in range(data_type.nmca):
fit_results[f"mca{i}"] = _build_fit_process(data_type=data_type)
if data_type.nmca > 1:
optional["sum"] = _build_sumresults_process(data_type=data_type)
return _nx_class(
"NXentry",
had_default=True,
**time_meta,
fit=_nx_class("NXcollection", had_default=True, **fit_results),
merge=_build_merge_process(data_type=data_type),
norm=_build_norm_process(data_type=data_type),
regrid=_build_regrid_process(data_type=data_type),
**optional,
)
def _build_fit_process(data_type: DataType) -> Dict[str, Any]:
optional = {}
if data_type.nscans == 1:
optional["diagnostics"] = _build_diagnostics(data_type=data_type)
results = _nx_class(
"NXcollection",
had_default=True,
derivatives=_build_derivatives(data_type=data_type),
fit=_build_fit_block(data_type=data_type),
massfractions=_build_massfractions(data_type=data_type),
parameters=_build_parameters(data_type=data_type),
uncertainties=_build_uncertainties(data_type=data_type),
**optional,
)
return _nx_class(
"NXprocess",
had_default=True,
**_build_program(),
configuration=_build_configuration(),
results=results,
)
def _build_merge_process(data_type: DataType) -> Dict[str, Any]:
rawunits = {
f"{data_type.dim0_name}@units": "um",
f"{data_type.dim1_name}@units": "um",
}
if data_type.nscans == 1:
skip = []
else:
skip = ["energy"]
rawsignals = [
k for k in _raw_counters(data_type=data_type, with_motors=True) if k not in skip
]
rawcounters = {f"{k}@shape": data_type.flat_result_shape for k in rawsignals}
rawcounters.update(rawunits)
results = _nx_class(
"NXcollection",
had_default=True,
massfractions=_build_massfractions(data_type=data_type),
parameters=_build_parameters(data_type=data_type),
rawcounters=_nx_data(signals=rawsignals, **rawcounters),
uncertainties=_build_uncertainties(data_type=data_type),
)
return _nx_class(
"NXprocess",
**_build_program(),
had_default=True,
configuration=_build_configuration(),
results=results,
)
def _build_norm_process(data_type: DataType) -> Dict[str, Any]:
results = _nx_class(
"NXcollection",
had_default=True,
massfractions=_build_massfractions(data_type=data_type),
parameters=_build_parameters(data_type=data_type),
uncertainties=_build_uncertainties(data_type=data_type),
)
return _nx_class(
"NXprocess",
**_build_program(),
had_default=True,
configuration=_build_configuration(),
results=results,
)
def _build_sumresults_process(data_type: DataType) -> Dict[str, Any]:
results = _nx_class(
"NXcollection",
had_default=True,
massfractions=_build_massfractions(data_type=data_type),
parameters=_build_parameters(data_type=data_type),
uncertainties=_build_uncertainties(data_type=data_type),
)
return _nx_class(
"NXprocess",
had_default=True,
**_build_program(),
configuration=_build_configuration(),
results=results,
)
def _build_regrid_process(data_type: DataType) -> Dict[str, Any]:
return _nx_class(
"NXprocess",
**_build_program(),
had_default=True,
configuration=_build_configuration(),
results=_build_regrid_results(data_type=data_type),
)
def _build_sumspectra_process(data_type: DataType) -> Dict[str, Any]:
groups = {
name: _nx_data(
signals=["data"],
**{
"data@shape": data_type.flat_spectra_shape,
"data@interpretation": "spectrum",
},
)
for name in data_type.mca_sum_names
}
return _nx_class(
"NXprocess",
had_default=True,
**_build_program(),
configuration=_build_configuration(),
**groups,
)
def _build_program() -> Dict[str, Any]:
return {"program@shape": (), "version@shape": ()}
def _build_configuration() -> Dict[str, Any]:
return _nx_class("NXnote", **{"data@shape": (), "date@shape": (), "type@shape": ()})
def _build_derivatives(data_type: DataType) -> Dict[str, Any]:
children = {
f"{s}@shape": data_type.derivate_shape for s in data_type.fit_parameters
}
children["energy@shape"] = (data_type.nchan,)
children["energy@units"] = "keV"
return _nx_data(signals=data_type.fit_parameters, axes=["energy"], **children)
def _build_diagnostics(data_type: DataType) -> Dict[str, Any]:
return _nx_data(
signals=["nFreeParameters", "nObservations"],
**{
"nFreeParameters@shape": data_type.flat_result_shape,
"nObservations@shape": data_type.flat_result_shape,
},
)
def _build_fit_block(data_type: DataType) -> Dict[str, Any]:
return _nx_data(
signals=["data", "model", "residuals"],
axes=[".", "energy"],
**{
"data@shape": data_type.flat_spectra_shape,
"model@shape": data_type.flat_spectra_shape,
"residuals@shape": data_type.flat_spectra_shape,
"energy@shape": (data_type.nchan,),
"energy@units": "keV",
},
)
def _build_massfractions(data_type: DataType) -> Dict[str, Any]:
return _nx_data(
signals=data_type.fit_elementgroups,
**{
f"{k}@shape": data_type.flat_result_shape
for k in data_type.fit_elementgroups
},
)
def _build_parameters(data_type: DataType) -> Dict[str, Any]:
return _nx_data(
signals=data_type.fit_parameters,
**{f"{k}@shape": data_type.flat_result_shape for k in data_type.fit_parameters},
)
def _build_uncertainties(data_type: DataType) -> Dict[str, Any]:
return _build_parameters(data_type=data_type)
def _build_regrid_results(data_type: DataType) -> Dict[str, Any]:
if data_type.nscans == 1:
axes = [data_type.dim0_name, data_type.dim1_name]
common = {
f"{data_type.dim0_name}@shape": (data_type.ndim0,),
f"{data_type.dim1_name}@shape": (data_type.ndim1,),
f"{data_type.dim0_name}@units": "um",
f"{data_type.dim1_name}@units": "um",
f"{data_type.dim0_name}_indices@shape": (),
f"{data_type.dim1_name}_indices@shape": (),
f"{data_type.dim0_name}@long_name": f"{data_type.dim0_name} (um)",
f"{data_type.dim1_name}@long_name": f"{data_type.dim1_name} (um)",
}
else:
axes = ["energy", data_type.dim0_name, data_type.dim1_name]
common = {
"energy@shape": (data_type.nscans,),
f"{data_type.dim0_name}@shape": (data_type.ndim0,),
f"{data_type.dim1_name}@shape": (data_type.ndim1,),
"energy@units": "keV",
f"{data_type.dim0_name}@units": "um",
f"{data_type.dim1_name}@units": "um",
"energy_indices@shape": (),
f"{data_type.dim0_name}_indices@shape": (),
f"{data_type.dim1_name}_indices@shape": (),
"energy@long_name": "energy (keV)",
f"{data_type.dim0_name}@long_name": f"{data_type.dim0_name} (um)",
f"{data_type.dim1_name}@long_name": f"{data_type.dim1_name} (um)",
}
def children(names: List[str]) -> Dict[str, Any]:
datasets = {f"{k}@shape": data_type.regridded_result_shape for k in names}
return {**datasets, **common}
groups = dict(
massfractions=_nx_data(
signals=data_type.fit_elementgroups,
axes=axes,
**children(data_type.fit_elementgroups),
),
parameters=_nx_data(
signals=data_type.fit_parameters,
axes=axes,
**children(data_type.fit_parameters),
),
rawcounters=_nx_data(
signals=_raw_counters(data_type, with_motors=False),
axes=axes,
**children(list(_raw_counters(data_type=data_type, with_motors=False))),
),
uncertainties=_nx_data(
signals=data_type.fit_parameters,
axes=axes,
**children(data_type.fit_parameters),
),
coordinates=_build_coordinates(data_type=data_type),
)
return _nx_class("NXcollection", had_default=True, **{"title@shape": ()}, **groups)
def _build_coordinates(data_type: DataType) -> Dict[str, Any]:
def generate(name: str, n: int):
return {
"title@shape": (),
"signals": [name],
f"{name}@shape": (n,),
"axes": [data_type.dim1_name, data_type.dim0_name],
f"{data_type.dim0_name}@shape": (n,),
f"{data_type.dim1_name}@shape": (n,),
f"{data_type.dim0_name}@units": "um",
f"{data_type.dim1_name}@units": "um",
f"{data_type.dim0_name}@long_name": f"{data_type.dim0_name} (um)",
f"{data_type.dim1_name}@long_name": f"{data_type.dim1_name} (um)",
}
groups = dict(
scatter_coordinates=_nx_data(**generate("indices", data_type.npts)),
closest_grid_distance=_nx_data(**generate("distance", data_type.npts)),
grid_coordinates=_nx_data(
**generate("has_data", data_type.ndim0 * data_type.ndim1)
),
closest_scatter_distance=_nx_data(
**generate("distance", data_type.ndim0 * data_type.ndim1)
),
coordinates=_nx_data(
**generate(
"coordinate_type", data_type.npts + data_type.ndim0 * data_type.ndim1
)
),
)
return _nx_class("NXcollection", **groups)
def _build_instrument(data_type: DataType) -> Dict[str, Any]:
energy_scalar = {"energy@shape": (), "energy@units": "keV"}
mot0 = {
f"{data_type.dim0_name}@shape": data_type.flat_result_shape,
f"{data_type.dim0_name}@units": "um",
}
mot1 = {
f"{data_type.dim1_name}@shape": data_type.flat_result_shape,
f"{data_type.dim1_name}@units": "um",
}
mot0_scalar = {
f"{data_type.dim0_name}@shape": (),
f"{data_type.dim0_name}@units": "um",
}
mot1_scalar = {
f"{data_type.dim1_name}@shape": (),
f"{data_type.dim1_name}@units": "um",
}
positioners = {
name: _nx_class(
"NXpositioner",
**{"value@shape": data_type.flat_result_shape, "value@units": "um"},
)
for name in [data_type.dim0_name, data_type.dim1_name]
}
counters = {
"I0": _nx_class("NXdetector", **{"data@shape": data_type.flat_result_shape})
}
mcas = {
f"mca{i}": _build_mca_detector(data_type=data_type)
for i in range(data_type.nmca)
}
mcasums = {
name: _build_mca_detector_sum(data_type=data_type)
for name in data_type.mca_sum_names
}
if data_type.mosaic:
positioners_start = _nx_class("NXcollection", **energy_scalar, **mot0, **mot1)
positioners_end = _nx_class("NXcollection", **energy_scalar, **mot0, **mot1)
else:
positioners_start = _nx_class(
"NXcollection", **energy_scalar, **mot0_scalar, **mot1_scalar
)
positioners_end = _nx_class(
"NXcollection", **energy_scalar, **mot0_scalar, **mot1_scalar
)
return _nx_class(
"NXinstrument",
**counters,
**mcas,
**mcasums,
**positioners,
positioners=_nx_class("NXcollection", **energy_scalar, **mot0, **mot1),
positioners_start=positioners_start,
positioners_end=positioners_end,
)
def _build_measurement(data_type: DataType) -> Dict[str, Any]:
counters = {
f"{name}@shape": data_type.flat_result_shape
for name in _raw_counters(data_type=data_type, with_motors=True)
}
counters.update(
{
f"{data_type.dim0_name}@units": "um",
f"{data_type.dim1_name}@units": "um",
}
)
mcas = {
f"mca{i}@shape": data_type.flat_spectra_shape for i in range(data_type.nmca)
}
mcasums = {}
for name in data_type.mca_sum_names:
mcasums[f"{name}@shape"] = data_type.flat_spectra_shape
mcasums[f"{name}@interpretation"] = "spectrum"
return _nx_class("NXcollection", **counters, **mcas, **mcasums)
def _build_mca_detector(data_type: DataType) -> Dict[str, Any]:
meta_counters = {
f"{ctr}@shape": data_type.flat_result_shape
for ctr in data_type.mca_meta_counters
}
return _nx_class(
"NXdetector",
**{
"data@shape": data_type.flat_spectra_shape,
"spectrum@shape": data_type.flat_spectra_shape,
**meta_counters,
},
)
def _build_mca_detector_sum(data_type: DataType) -> Dict[str, Any]:
return _nx_class(
"NXdetector",
**{
"data@shape": data_type.flat_spectra_shape,
"data@interpretation": "spectrum",
},
)
def _raw_counters(data_type: DataType, with_motors: bool) -> List[str]:
counters = ["I0"]
if with_motors:
if data_type.nscans > 1:
counters.append("energy")
counters += [data_type.dim0_name, data_type.dim1_name]
for i in range(data_type.nmca):
counters.extend([f"mca{i}_{ctr}" for ctr in data_type.mca_meta_counters])
return counters
def _nx_class(
nx_class: str, *, had_default: bool = False, **children
) -> Dict[str, Any]:
node = {"@NX_class": nx_class}
if had_default:
node["@extra_attrs"] = {"default"}
node.update(children)
return node
def _nx_data(
*,
signals: List[str],
axes: Optional[List[str]] = None,
had_default: bool = False,
**children: Dict[str, Any],
) -> Dict[str, Any]:
node = _nx_class("NXdata", had_default=had_default)
node["@signals"] = set(signals)
if axes:
node["@axes"] = list(axes)
if children:
node.update(children)
return node