Source code for lisainstrument.sigpro.dynamic_delay_numpy

"""Functions for applying dynamic real-valued shifts to numpy arrays using Lagrange interpolation

The main class in this module is DynamicShiftNumpy. It allows to perform a time-shifting
opration on a numpy array, with time-dependent time shift. A similar functionality is implemented
in the streams.delay module for chunked processing. Both are using the same core
interpolation methods.

The interpolation method to be used is provided by the user in form of an object
implementing the RegularInterpolator protocol defined in the module regular_interpolators.
This interpolation engine is based on numpy arrays. The RegularInterpolator protocol is
not responsible for setting up boundary conditions, which is the responsability of
DynamicShiftNumpy.

The other parameters that determine the DynamicShiftNumpy behavior are collected
in a class DynShiftCfg. It contains the left and right boundary conditions. The available
options are defined by the ShiftBC enum class. DynShiftCfg also contains limits
for the allowable minimum and maximum time shift. Those have to be supplied by the user
because they cannot be determined from the data in DynamicShiftDask and DynamicShiftNumpy
is required to behave exactly like DynamicShiftDask.

The convenience functions make_dynamic_shift_lagrange_numpy() and
make_dynamic_shift_linear_numpy() return a DynamicShiftNumpy instance
employing Lagrange or linear interpolation, respectively. For testing,
there is another Lagrange interpolator based on the legacy dsp.timeshift
code. Use lisainstrument.legacy.dynamic_delay_dsp.make_dynamic_shift_dsp_numpy()
to obtain a corresponding DynamicShiftNumpy instance.

Example use:

>>> op = make_dynamic_shift_lagrange_numpy(
        order=31,
        min_delay=-2., max_delay=21.,
        left_bound=ShiftBC.FLAT, right_bound=ShiftBC.EXCEPTION
    )
>>> delay = np.linspace(-1.2,20.4,100)
>>> data = np.linspace(0,1, len(delay)
>>> shifted_data = op(data, -delay)


Internally, the module works as follows. DynamicShiftNumpy contains a
user-provided RegularInterpolator. The latter can interpolate to points
within the given data, minus a margin size defined by RegularInterpolator
implementations. Before calling RegularInterpolator, DynamicShiftNumpy
extends the data by suitable margins filled according to the selected
boundary conditions. The margin size is computed from the margins needed
by the interpolator as well as the fixed limits specified for the timeshift.
"""

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Final

import numpy as np
from typing_extensions import assert_never

from lisainstrument.sigpro.regular_interpolators import (
    RegularInterpolator,
    make_regular_interpolator_lagrange,
    make_regular_interpolator_linear,
)
from lisainstrument.sigpro.types_numpy import NumpyArray1D


[docs] class ShiftBC(Enum): """Enum for various methods of handling boundaries in dynamic shift ZEROPAD: Assume all data outside given range is zero. FLAT: Assume all data outside given range is equal to value on nearest boundary EXCEPTION: raise exception if data outside range is needed """ EXCEPTION = 1 ZEROPAD = 2 FLAT = 3
[docs] @dataclass(frozen=True) class DynShiftCfg: """Config class for dynamic shift interpolation Attributes: min_delay: Assume that any shift > -max_delay max_delay: Assume that any shift < -min_delay left_bound: Treatment of left boundary right_bound: Treatment of right boundary """ min_delay: float max_delay: float left_bound: ShiftBC right_bound: ShiftBC @property def min_delay_int(self) -> int: """Minimum delay in integer index space""" return int(np.floor(self.min_delay)) @property def max_delay_int(self) -> int: """Maximum delay in integer index space""" return int(np.ceil(self.max_delay)) def __post__init__(self): if self.min_delay > self.max_delay: msg = f"Invalid delay range for dynamic shift ({self.min_delay}, {self.max_delay})" raise ValueError(msg)
[docs] class DynamicShiftNumpy: """Interpolate to locations given by a non-const shift. This allows to interpolate samples in a numpy array to locations specified by a shift given by another numpy array of same size. The shift is specified in units of the array index, i.e. there is no separate coordinate array. A positive shift refers to values right of a given sample, negative shifts to values on the left. The boundary treatment can be specified for each boundary in terms of ShiftBC enums. The interpolation method is not fixed but provided via an interpolator instance implementing the RegularInterpMethod protocol. For technical reasons, the shift values have to be within some bound that has to be provided. """ def __init__( self, cfg: DynShiftCfg, interp: RegularInterpolator, ): """Set up interpolator. Arguments: cfg: limits of delay and boundary treatment interp: interpolation method """ self._cfg: Final = cfg self._interp_np: Final = interp @property def margin_left(self) -> int: """Left margin size. If positive, specifies how many samples on the left have to be added by boundary conditions. If negative, specifies how many samples on the left are unused. """ return self._interp_np.margin_left + self._cfg.max_delay_int @property def margin_right(self) -> int: """Right margin size. If positive, specifies how many samples on the right have to be added by boundary conditions. If negative, specifies how many samples on the right are unused. """ return self._interp_np.margin_right - self._cfg.min_delay_int def __call__(self, samples: np.ndarray, shift: np.ndarray) -> NumpyArray1D: """Apply shift given by numpy array to samples in another numpy array. The shift and sample arrays need to have the same size, and each shift provides the interpolation location relative to the sample with the same index. Shifts are floating point values. A shift of +1 refers to the sample on the right, -1 the sample on the left, etc. All arrays have to be 1D. Arguments: samples: 1D numpy array with data samples shift: 1D numpy array with shifts Returns: Numpy array with interpolated samples """ out_size = len(shift) npad_left = 0 padv_left = 0.0 if self.margin_left > 0: match self._cfg.left_bound: case ShiftBC.ZEROPAD: npad_left = self.margin_left padv_left = 0.0 case ShiftBC.FLAT: npad_left = self.margin_left padv_left = samples[0] case ShiftBC.EXCEPTION: msg = ( f"DynamicShiftNumpy: left edge handling {self._cfg.left_bound.name} not " f"possible for given max delay {self._cfg.max_delay}." ) raise RuntimeError(msg) case _ as unreachable: assert_never(unreachable) npad_right = 0 padv_right = 0.0 if self.margin_right > 0: match self._cfg.right_bound: case ShiftBC.ZEROPAD: npad_right = self.margin_right padv_right = 0.0 case ShiftBC.FLAT: npad_right = self.margin_right padv_right = samples[-1] case ShiftBC.EXCEPTION: msg = ( f"DynamicShiftNumpy: right edge handling {self._cfg.right_bound.name} not " f"possible for given min delay {self._cfg.min_delay=}." ) raise RuntimeError(msg) case _ as unreachable: assert_never(unreachable) if npad_left > 0 or npad_right > 0: samples = np.pad( samples, (npad_left, npad_right), mode="constant", constant_values=(padv_left, padv_right), ) n_size = npad_left + out_size + self.margin_right n_first = npad_left - self.margin_left samples_needed = samples[n_first : n_first + n_size] return self._interp_np.apply_shift(samples_needed, shift, self.margin_left)
[docs] def make_dynamic_shift_lagrange_numpy( order: int, min_delay: float, max_delay: float, left_bound: ShiftBC, right_bound: ShiftBC, ) -> DynamicShiftNumpy: """Set up DynamicShiftNumpy instance with Lagrange interpolation method. Arguments: order: Order of the Lagrange polynomials min_delay: Assume that any shift > -max_delay max_delay: Assume that any shift < -min_delay left_bound: Treatment of left boundary right_bound: Treatment of right boundary Returns: Interpolation function of type DynamicShiftNumpy """ interp = make_regular_interpolator_lagrange(order) cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound) return DynamicShiftNumpy(cfg, interp)
[docs] def make_dynamic_shift_linear_numpy( min_delay: float, max_delay: float, left_bound: ShiftBC, right_bound: ShiftBC, ) -> DynamicShiftNumpy: """Set up DynamicShiftNumpy instance with linear interpolation method. Arguments: min_delay: Assume that any shift > -max_delay max_delay: Assume that any shift < -min_delay left_bound: Treatment of left boundary right_bound: Treatment of right boundary Returns: Interpolation function of type DynamicShiftNumpy """ interp = make_regular_interpolator_linear() cfg = DynShiftCfg(min_delay, max_delay, left_bound, right_bound) return DynamicShiftNumpy(cfg, interp)