import logging
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
import numpy
from scipy.optimize import minimize
from . import grid_utils
from . import rounding
logger = logging.getLogger(__name__)
[docs]
def optimal_grid_axes(
scatter_coordinates: Sequence[numpy.ndarray],
fix_resolution: bool = True,
fix_limits: bool = True,
resolution: Optional[Sequence[float]] = None,
) -> List[numpy.ndarray]:
"""
Compute optimal grid axes from scattered coordinates.
Degenerate dimensions (dimensions with zero data range) are handled.
:param scatter_coordinates: shape `(Nscatter, Ndim)`
:param fix_resolution:
:param fix_limits:
:param resolution:
:returns: `Ndim` 1D arrays with length `N0`, `N1`, ...
"""
scatter_coordinates = numpy.asarray(scatter_coordinates)
# Validate parameters
if scatter_coordinates.ndim != 2:
raise ValueError(
"Scatter coordinates is not a 2D array with shape (Nscatter, Ndim)"
)
num_data_points, Ndim = scatter_coordinates.shape
if resolution is None:
resolution = [None] * Ndim
# Determine data ranges
min_limits = numpy.min(scatter_coordinates, axis=0)
max_limits = numpy.max(scatter_coordinates, axis=0)
max_data_range = max_limits - min_limits
zero_dimension_mask = max_data_range == 0
if zero_dimension_mask.any():
zero_dimensions = numpy.where(zero_dimension_mask)[0].tolist()
for dim in zero_dimensions:
logger.debug(
"Dimension %d collapsed to singleton axis at value %s",
dim,
float(min_limits[dim]),
)
# Initial grid
start_stop_size = (
_estimate_start_stop_size(scatter_coordinates[:, i], resolution=resolutioni)
for i, resolutioni in enumerate(resolution)
)
initial_start_limits, initial_stop_limits, initial_grid_shape = zip(
*start_stop_size
)
logger.debug("Initial grid shape: %s", initial_grid_shape)
initial_start_limits = numpy.array(initial_start_limits, dtype=float)
initial_stop_limits = numpy.array(initial_stop_limits, dtype=float)
initial_grid_shape = numpy.array(initial_grid_shape, dtype=int)
initial_data_range = initial_stop_limits - initial_start_limits
# Initial resolution
initial_resolution = numpy.zeros(Ndim, dtype=float)
non_degenerate_mask = initial_grid_shape > 1
initial_resolution[non_degenerate_mask] = initial_data_range[
non_degenerate_mask
] / (initial_grid_shape[non_degenerate_mask] - 1)
logger.info("Initial resolution: %s", tuple(map(float, initial_resolution)))
# Prepare parameters of optimization
initial_guess = []
bounds = []
fixed_resolution_values = numpy.zeros(Ndim, dtype=float)
fixed_resolution_mask = numpy.zeros(Ndim, dtype=bool)
fixed_limits_values = numpy.zeros(2 * Ndim, dtype=float)
fixed_limits_mask = numpy.zeros(2 * Ndim, dtype=bool)
# Compile parameters of optimization: resolution
if fix_resolution:
fixed_resolution_values[:] = initial_resolution
fixed_resolution_mask[:] = True
else:
for i in range(Ndim):
if zero_dimension_mask[i]:
# Degenerate dimensions always fixed
fixed_resolution_values[i] = 0.0
fixed_resolution_mask[i] = True
continue
initial_guess.append(initial_resolution[i])
bounds.append(
(
max_data_range[i] / max(num_data_points - 1, 1),
max_data_range[i],
)
)
# Compile parameters of optimization: limits
if fix_limits:
fixed_limits_values[:Ndim] = initial_start_limits
fixed_limits_values[Ndim:] = initial_stop_limits
fixed_limits_mask[:] = True
else:
# Start limits
for i in range(Ndim):
if zero_dimension_mask[i]:
fixed_limits_values[i] = initial_start_limits[i]
fixed_limits_mask[i] = True
else:
initial_guess.append(initial_start_limits[i])
bounds.append((min_limits[i], max_limits[i]))
# Stop limits
for i in range(Ndim):
idx = Ndim + i
if zero_dimension_mask[i]:
fixed_limits_values[idx] = initial_stop_limits[i]
fixed_limits_mask[idx] = True
else:
initial_guess.append(initial_stop_limits[i])
bounds.append((max_limits[i], max_limits[i]))
# Do optimization unless all parameters are fixed
if initial_guess:
logger.info("Grid optimization applied")
initial_guess = numpy.asarray(initial_guess, dtype=float)
result = minimize(
_scatter_grid_distance_measure,
initial_guess,
args=(
scatter_coordinates,
fixed_resolution_values,
fixed_resolution_mask,
fixed_limits_values,
fixed_limits_mask,
),
method="Nelder-Mead",
bounds=bounds,
)
fit_parameters = result.x
else:
logger.info("No grid optimization applied")
fit_parameters = None
grid_axes = _fit_parameters_to_grid_axes(
Ndim,
fit_parameters=fit_parameters,
fixed_resolution=fixed_resolution_values,
fixed_resolution_mask=fixed_resolution_mask,
fixed_limits=fixed_limits_values,
fixed_limits_mask=fixed_limits_mask,
)
final_grid_shape = tuple(len(arr) for arr in grid_axes)
final_resolution = tuple(
arr[1] - arr[0] if arr.size > 1 else 0.0 for arr in grid_axes
)
logger.info("Final grid shape: %s", final_grid_shape)
logger.info("Final resolution: %s", tuple(map(float, final_resolution)))
return grid_axes
def _estimate_start_stop_size(
array: numpy.ndarray, resolution: Optional[float] = None
) -> Tuple[float, float, int]:
"""
Estimate grid start, stop, and number of points.
Degenerate arrays return a singleton axis.
:params array: 1D array
:param resolution: bin width
:returns: start value, stop value, number of points
"""
if len(array) == 0:
return numpy.nan, numpy.nan, 0
if len(array) == 1:
return array[0], array[0], 1
array = numpy.sort(array)
# Degenerate dimension
if numpy.all(array == array[0]):
return array[0], array[0], 1
if resolution is None:
distances = numpy.diff(array)
min_distance = numpy.min(distances)
mid_distance = min_distance + (numpy.max(distances) - min_distance) / 2
mask = distances > mid_distance
if mask.any():
resolution = numpy.median(distances[mask])
else:
resolution = numpy.median(distances)
resolution = rounding.round_to_significant(resolution)
start, stop, nbins = rounding.round_range(array[0], array[-1], resolution)
npoints = nbins + 1
return start, stop, npoints
def _scatter_grid_distance_measure(
fit_parameters: numpy.ndarray,
scatter_coordinates: numpy.ndarray,
fixed_resolution: Optional[numpy.ndarray] = None,
fixed_resolution_mask: Optional[numpy.ndarray] = None,
fixed_limits: Optional[numpy.ndarray] = None,
fixed_limits_mask: Optional[numpy.ndarray] = None,
) -> float:
"""
Distance measure between scattered points and a grid with a resolution and
start and stop limits that are either refined or fixed.
:param fit_parameters:
:param scatter_coordinates: `(Nscatter, Ndim)`
:param fixed_resolution:
:param fixed_resolution_mask:
:param fixed_limits:
:param fixed_limits_mask:
:returns: sum of closest distances of grid nodes with scatter_coordinates
"""
_, Ndim = scatter_coordinates.shape
grid_axes = _fit_parameters_to_grid_axes(
Ndim,
fit_parameters=fit_parameters,
fixed_resolution=fixed_resolution,
fixed_resolution_mask=fixed_resolution_mask,
fixed_limits=fixed_limits,
fixed_limits_mask=fixed_limits_mask,
)
expanded_grid_coordinates = grid_utils.expanded_grid_coordinates(grid_axes)
# Closest grid point for each scatter point
_, scatter_distances = grid_utils.closest_point(
expanded_grid_coordinates, scatter_coordinates
)
scatter_coverage = numpy.sum(scatter_distances)
# Closest scatter point for each grid point
_, grid_distances = grid_utils.closest_point(
scatter_coordinates, expanded_grid_coordinates
)
grid_coverage = numpy.sum(grid_distances)
return scatter_coverage + grid_coverage
def _fit_parameters_to_grid_axes(
Ndim: int,
fit_parameters: Optional[numpy.ndarray] = None,
fixed_resolution: Optional[numpy.ndarray] = None,
fixed_resolution_mask: Optional[numpy.ndarray] = None,
fixed_limits: Optional[numpy.ndarray] = None,
fixed_limits_mask: Optional[numpy.ndarray] = None,
) -> List[numpy.ndarray]:
"""
Convert optimization parameters to grid axes.
Degenerate dimensions produce singleton axes.
:param fit_parameters:
:param fixed_resolution:
:param fixed_resolution_mask:
:param fixed_limits:
:param fixed_limits_mask:
:returns: `Ndim` arrays with shape `(N0,)`, `(N1,)`, ...
"""
params_offset = 0
grid_resolution = numpy.zeros(Ndim, dtype=float)
grid_start_limits = numpy.zeros(Ndim, dtype=float)
grid_stop_limits = numpy.zeros(Ndim, dtype=float)
# Extract resolution from fit parameters and fixed values
for i in range(Ndim):
if fixed_resolution_mask is not None and fixed_resolution_mask[i]:
grid_resolution[i] = fixed_resolution[i]
else:
grid_resolution[i] = fit_parameters[params_offset]
params_offset += 1
# Extract start limits from fit parameters and fixed values
for i in range(Ndim):
if fixed_limits_mask is not None and fixed_limits_mask[i]:
grid_start_limits[i] = fixed_limits[i]
else:
grid_start_limits[i] = fit_parameters[params_offset]
params_offset += 1
# Extract stop limits from fit parameters and fixed values
for i in range(Ndim):
idx = Ndim + i
if fixed_limits_mask is not None and fixed_limits_mask[idx]:
grid_stop_limits[i] = fixed_limits[idx]
else:
grid_stop_limits[i] = fit_parameters[params_offset]
params_offset += 1
grid_axes = []
for i, (start, stop, step) in enumerate(
zip(grid_start_limits, grid_stop_limits, grid_resolution)
):
if not numpy.isfinite(start) or not numpy.isfinite(stop):
raise ValueError(
f"The grid along dimension {i} cannot be determined "
f"({start} → {stop})"
)
# Degenerate dimension -> singleton axis
if start == stop:
grid_axes.append(numpy.array([start], dtype=float))
continue
if step == 0 or not numpy.isfinite(step):
raise ValueError(
f"The grid step along dimension {i} is invalid "
f"({step}) for limits ({start} → {stop})"
)
grid_axes.append(_linspace_fixed_stop(start, stop, step))
return grid_axes
def _linspace_fixed_step(start: float, stop: float, step: float) -> numpy.ndarray:
return numpy.arange(start, stop, step)
def _linspace_fixed_stop(start: float, stop: float, step: float) -> numpy.ndarray:
n = max(int((stop - start) / step + 1.5), 1)
return numpy.linspace(start, stop, n)