Source code for ewoksfluo.tasks.regrid_data.scatter_utils
from typing import Sequence, Tuple, List
import numpy
[docs]
def scan_scatter_coordinates(
positions: Sequence[numpy.ndarray],
) -> Tuple[Sequence[numpy.ndarray], List[int]]:
"""Re-shape C-order flattened motor positions and sort from slow axis to fast axis.
The provided positions are the actual motor positions of an nD-dimensional scan
of shape `(n0, n1, ...)`, flattened in C-order to a shape of `(nPoints,).`
This algorithm works as long as the deviations of the motor positions from a
regular grid are smaller than 30% of the step size of the scan.
:param positions: `shape = (nD, nPoints)` where `nPoints` is a C-order
flattened form of `(n0, n1, ...)`
:returns: array of `shape = (nD, n0, n1, ...)`, axes indices from slow to fast
"""
ndim = len(positions)
if ndim <= 1:
return positions, list(range(ndim))
for threshold in numpy.linspace(0.5, 0.8, 10):
lst = sorted(
_motor_reset_size(values, threshold=threshold) for values in positions
)
ncumprod, _ = zip(*lst)
# For example 4 dimensions: ncumprod = [n0, n0*n1, n0*n1*n2, n0*n1*n2]
ncumprod = list(ncumprod)
ntotal = positions[0].size
ncumprod[-1] = ntotal
# For example 4 dimensions: ncumprod = [n0, n0*n1, n0*n1*n2, n0*n1*n2*n3]
denom = [1] + ncumprod[:-1]
shape_forder = [numi // denomi for numi, denomi in zip(ncumprod, denom)]
ntotal2 = int(numpy.prod(shape_forder))
if ntotal == ntotal2:
break
if ntotal != ntotal2:
raise RuntimeError("Could not determine scan shape from motor positions")
shape_forder = tuple(shape_forder)
indices_forder = [None] * ndim
remaining_indices = list(range(ndim))
for faxis, n in enumerate(ncumprod):
lst = sorted(
(_smooth_motion_metric(positions[i][:n]), i) for i in remaining_indices
)
metric, index = lst[0]
if not numpy.isfinite(metric):
continue
indices_forder[faxis] = index
remaining_indices.remove(index)
if len(remaining_indices) == 1:
break
if len(remaining_indices) != 1:
raise RuntimeError("Could not determine motor order")
indices_forder[indices_forder.index(None)] = remaining_indices[0]
shape_corder = shape_forder[::-1]
indices_corder = indices_forder[::-1]
positions = [arr.reshape(shape_corder) for arr in positions]
return positions, indices_corder
def _motor_reset_size(
values: numpy.ndarray, threshold: float = 0.5
) -> Tuple[int, float]:
"""Get the median and relative stddev of the number of points between motor resets.
:returns: median, stdev
"""
nvalues = len(values)
if nvalues == 0:
# No motor positions
return 0, numpy.inf
if nvalues == 1:
# Only one motor position
return 1, numpy.inf
steps = numpy.abs(numpy.diff(values))
stepmin = steps.min()
stepmax = steps.max()
if stepmin == stepmax:
# All motor positions are equal
return 1, numpy.inf
mid = (stepmax - stepmin) * threshold
resets = numpy.where(steps > mid)[0]
if resets.size == 1:
# Only one motor reset
return resets[0] + 1, numpy.inf
resetdiff = numpy.diff(resets)
resetsize = int(numpy.median(resetdiff))
resetstdev = numpy.std(resetdiff)
return resetsize, resetstdev / nvalues
def _smooth_motion_metric(values: numpy.ndarray) -> float:
"""Calculate a value which is smaller when the values represent a smooth motion"""
vmin = values.min()
vmax = values.max()
if vmin == vmax:
# All motor positions are equal
return numpy.inf
return numpy.std(values - numpy.linspace(vmin, vmax, values.size)) / abs(
vmax - vmin
)