Source code for ewoksfluo.math.optimal_grid

import logging
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple

import numpy
from scipy.optimize import minimize

from . import grid_utils
from . import rounding

logger = logging.getLogger(__name__)


[docs] def optimal_grid_axes( scatter_coordinates: Sequence[numpy.ndarray], fix_resolution: bool = True, fix_limits: bool = True, resolution: Optional[Sequence[float]] = None, ) -> List[numpy.ndarray]: """ Compute optimal grid axes from scattered coordinates. Degenerate dimensions (dimensions with zero data range) are handled. :param scatter_coordinates: shape `(Nscatter, Ndim)` :param fix_resolution: :param fix_limits: :param resolution: :returns: `Ndim` 1D arrays with length `N0`, `N1`, ... """ scatter_coordinates = numpy.asarray(scatter_coordinates) # Validate parameters if scatter_coordinates.ndim != 2: raise ValueError( "Scatter coordinates is not a 2D array with shape (Nscatter, Ndim)" ) num_data_points, Ndim = scatter_coordinates.shape if resolution is None: resolution = [None] * Ndim # Determine data ranges min_limits = numpy.min(scatter_coordinates, axis=0) max_limits = numpy.max(scatter_coordinates, axis=0) max_data_range = max_limits - min_limits zero_dimension_mask = max_data_range == 0 if zero_dimension_mask.any(): zero_dimensions = numpy.where(zero_dimension_mask)[0].tolist() for dim in zero_dimensions: logger.debug( "Dimension %d collapsed to singleton axis at value %s", dim, float(min_limits[dim]), ) # Initial grid start_stop_size = ( _estimate_start_stop_size(scatter_coordinates[:, i], resolution=resolutioni) for i, resolutioni in enumerate(resolution) ) initial_start_limits, initial_stop_limits, initial_grid_shape = zip( *start_stop_size ) logger.debug("Initial grid shape: %s", initial_grid_shape) initial_start_limits = numpy.array(initial_start_limits, dtype=float) initial_stop_limits = numpy.array(initial_stop_limits, dtype=float) initial_grid_shape = numpy.array(initial_grid_shape, dtype=int) initial_data_range = initial_stop_limits - initial_start_limits # Initial resolution initial_resolution = numpy.zeros(Ndim, dtype=float) non_degenerate_mask = initial_grid_shape > 1 initial_resolution[non_degenerate_mask] = initial_data_range[ non_degenerate_mask ] / (initial_grid_shape[non_degenerate_mask] - 1) logger.info("Initial resolution: %s", tuple(map(float, initial_resolution))) # Prepare parameters of optimization initial_guess = [] bounds = [] fixed_resolution_values = numpy.zeros(Ndim, dtype=float) fixed_resolution_mask = numpy.zeros(Ndim, dtype=bool) fixed_limits_values = numpy.zeros(2 * Ndim, dtype=float) fixed_limits_mask = numpy.zeros(2 * Ndim, dtype=bool) # Compile parameters of optimization: resolution if fix_resolution: fixed_resolution_values[:] = initial_resolution fixed_resolution_mask[:] = True else: for i in range(Ndim): if zero_dimension_mask[i]: # Degenerate dimensions always fixed fixed_resolution_values[i] = 0.0 fixed_resolution_mask[i] = True continue initial_guess.append(initial_resolution[i]) bounds.append( ( max_data_range[i] / max(num_data_points - 1, 1), max_data_range[i], ) ) # Compile parameters of optimization: limits if fix_limits: fixed_limits_values[:Ndim] = initial_start_limits fixed_limits_values[Ndim:] = initial_stop_limits fixed_limits_mask[:] = True else: # Start limits for i in range(Ndim): if zero_dimension_mask[i]: fixed_limits_values[i] = initial_start_limits[i] fixed_limits_mask[i] = True else: initial_guess.append(initial_start_limits[i]) bounds.append((min_limits[i], max_limits[i])) # Stop limits for i in range(Ndim): idx = Ndim + i if zero_dimension_mask[i]: fixed_limits_values[idx] = initial_stop_limits[i] fixed_limits_mask[idx] = True else: initial_guess.append(initial_stop_limits[i]) bounds.append((max_limits[i], max_limits[i])) # Do optimization unless all parameters are fixed if initial_guess: logger.info("Grid optimization applied") initial_guess = numpy.asarray(initial_guess, dtype=float) result = minimize( _scatter_grid_distance_measure, initial_guess, args=( scatter_coordinates, fixed_resolution_values, fixed_resolution_mask, fixed_limits_values, fixed_limits_mask, ), method="Nelder-Mead", bounds=bounds, ) fit_parameters = result.x else: logger.info("No grid optimization applied") fit_parameters = None grid_axes = _fit_parameters_to_grid_axes( Ndim, fit_parameters=fit_parameters, fixed_resolution=fixed_resolution_values, fixed_resolution_mask=fixed_resolution_mask, fixed_limits=fixed_limits_values, fixed_limits_mask=fixed_limits_mask, ) final_grid_shape = tuple(len(arr) for arr in grid_axes) final_resolution = tuple( arr[1] - arr[0] if arr.size > 1 else 0.0 for arr in grid_axes ) logger.info("Final grid shape: %s", final_grid_shape) logger.info("Final resolution: %s", tuple(map(float, final_resolution))) return grid_axes
def _estimate_start_stop_size( array: numpy.ndarray, resolution: Optional[float] = None ) -> Tuple[float, float, int]: """ Estimate grid start, stop, and number of points. Degenerate arrays return a singleton axis. :params array: 1D array :param resolution: bin width :returns: start value, stop value, number of points """ if len(array) == 0: return numpy.nan, numpy.nan, 0 if len(array) == 1: return array[0], array[0], 1 array = numpy.sort(array) # Degenerate dimension if numpy.all(array == array[0]): return array[0], array[0], 1 if resolution is None: distances = numpy.diff(array) min_distance = numpy.min(distances) mid_distance = min_distance + (numpy.max(distances) - min_distance) / 2 mask = distances > mid_distance if mask.any(): resolution = numpy.median(distances[mask]) else: resolution = numpy.median(distances) resolution = rounding.round_to_significant(resolution) start, stop, nbins = rounding.round_range(array[0], array[-1], resolution) npoints = nbins + 1 return start, stop, npoints def _scatter_grid_distance_measure( fit_parameters: numpy.ndarray, scatter_coordinates: numpy.ndarray, fixed_resolution: Optional[numpy.ndarray] = None, fixed_resolution_mask: Optional[numpy.ndarray] = None, fixed_limits: Optional[numpy.ndarray] = None, fixed_limits_mask: Optional[numpy.ndarray] = None, ) -> float: """ Distance measure between scattered points and a grid with a resolution and start and stop limits that are either refined or fixed. :param fit_parameters: :param scatter_coordinates: `(Nscatter, Ndim)` :param fixed_resolution: :param fixed_resolution_mask: :param fixed_limits: :param fixed_limits_mask: :returns: sum of closest distances of grid nodes with scatter_coordinates """ _, Ndim = scatter_coordinates.shape grid_axes = _fit_parameters_to_grid_axes( Ndim, fit_parameters=fit_parameters, fixed_resolution=fixed_resolution, fixed_resolution_mask=fixed_resolution_mask, fixed_limits=fixed_limits, fixed_limits_mask=fixed_limits_mask, ) expanded_grid_coordinates = grid_utils.expanded_grid_coordinates(grid_axes) # Closest grid point for each scatter point _, scatter_distances = grid_utils.closest_point( expanded_grid_coordinates, scatter_coordinates ) scatter_coverage = numpy.sum(scatter_distances) # Closest scatter point for each grid point _, grid_distances = grid_utils.closest_point( scatter_coordinates, expanded_grid_coordinates ) grid_coverage = numpy.sum(grid_distances) return scatter_coverage + grid_coverage def _fit_parameters_to_grid_axes( Ndim: int, fit_parameters: Optional[numpy.ndarray] = None, fixed_resolution: Optional[numpy.ndarray] = None, fixed_resolution_mask: Optional[numpy.ndarray] = None, fixed_limits: Optional[numpy.ndarray] = None, fixed_limits_mask: Optional[numpy.ndarray] = None, ) -> List[numpy.ndarray]: """ Convert optimization parameters to grid axes. Degenerate dimensions produce singleton axes. :param fit_parameters: :param fixed_resolution: :param fixed_resolution_mask: :param fixed_limits: :param fixed_limits_mask: :returns: `Ndim` arrays with shape `(N0,)`, `(N1,)`, ... """ params_offset = 0 grid_resolution = numpy.zeros(Ndim, dtype=float) grid_start_limits = numpy.zeros(Ndim, dtype=float) grid_stop_limits = numpy.zeros(Ndim, dtype=float) # Extract resolution from fit parameters and fixed values for i in range(Ndim): if fixed_resolution_mask is not None and fixed_resolution_mask[i]: grid_resolution[i] = fixed_resolution[i] else: grid_resolution[i] = fit_parameters[params_offset] params_offset += 1 # Extract start limits from fit parameters and fixed values for i in range(Ndim): if fixed_limits_mask is not None and fixed_limits_mask[i]: grid_start_limits[i] = fixed_limits[i] else: grid_start_limits[i] = fit_parameters[params_offset] params_offset += 1 # Extract stop limits from fit parameters and fixed values for i in range(Ndim): idx = Ndim + i if fixed_limits_mask is not None and fixed_limits_mask[idx]: grid_stop_limits[i] = fixed_limits[idx] else: grid_stop_limits[i] = fit_parameters[params_offset] params_offset += 1 grid_axes = [] for i, (start, stop, step) in enumerate( zip(grid_start_limits, grid_stop_limits, grid_resolution) ): if not numpy.isfinite(start) or not numpy.isfinite(stop): raise ValueError( f"The grid along dimension {i} cannot be determined " f"({start}{stop})" ) # Degenerate dimension -> singleton axis if start == stop: grid_axes.append(numpy.array([start], dtype=float)) continue if step == 0 or not numpy.isfinite(step): raise ValueError( f"The grid step along dimension {i} is invalid " f"({step}) for limits ({start}{stop})" ) grid_axes.append(_linspace_fixed_stop(start, stop, step)) return grid_axes def _linspace_fixed_step(start: float, stop: float, step: float) -> numpy.ndarray: return numpy.arange(start, stop, step) def _linspace_fixed_stop(start: float, stop: float, step: float) -> numpy.ndarray: n = max(int((stop - start) / step + 1.5), 1) return numpy.linspace(start, stop, n)