Signal processing

The sigpro subpackage contains low-level signal processing tools. The functions operate on numpy arrays and do not offer chunked processing. The design is, however, geared to support the streams package for chunked processing.

The purpose of this package is to provide the signal processing algorithms needed in the stream package. It is, however, generic enough to be useful outside the context of the simulator.

Interpolation

Functions for interpolating numpy arrays with 1D regularly spaced data

This provides a generic interface RegularInterpolator as well as two interpolation methods, linear and Lagrange. The latter is written from scratch; see the module regular_interpolator_dsp for another implementation based on the dsp.timeshift Lagrange interpolator.

class lisainstrument.sigpro.regular_interpolators.RegularInterpCore(*args, **kwargs)[source]

Protocol for interpolator engine interface for regularly spaced samples

This defines an interface for core functionality of interpolating regularly spaced samples in 1D, using numpy arrays. It is not intended for direct use but for use in the RegularInterpolator class.

Boundary treatment is not part of this protocol. Implementations should only accept locations that can be interpolated to without using any form of boundary conditions, and raise an exception otherwise. The margin sizes required by the interpolation method are exposed as properties.

apply(samples: ndarray[tuple[int], dtype[number]], locations: ndarray[tuple[int], dtype[number]], int_offsets: ndarray[tuple[int], dtype[number]] | int = 0) ndarray[source]

Interpolate regularly spaced data to locations in index-space

The locations to interpolate to are the sum of the locations and int_offsets arguments. The locations argument is a 1D array with arbitrary floating point locations, and the int_offsets argument is an integer or integer array with additional integer offsets. The locations argument is not restricted, i.e. it does not need to contain only residual fractional locations but can have arbitrary values.

The locations refer to indices of the array with the sampled data, starting at 0. The length of the samples array does not have to match the size of the location arrays. If int_offsets is an array, it needs to have the same size as the locations array. All arrays need to be one-dimensional.

Implementations do not need to check the array dimensionality, size consistency, and types. This is done in RegularInterpolator.

Parameters:
  • samples – 1D numpy array with sampled data

  • locations – real-valued 1D numpy array with locations to interpolate to

  • int_offsets – integer or integer 1D array with additional offsets to the locations

Returns:

Interpolated samples
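The location convention above can be sketched with a minimal numpy stand-in. This is an illustration only, not the library code: `interp_index_space` is a hypothetical helper using linear interpolation as the engine, and the sample coordinates are simply the array indices.

```python
import numpy as np

def interp_index_space(samples, locations, int_offsets=0):
    """Hypothetical sketch of the apply() convention: interpolate `samples`
    at index-space positions `locations + int_offsets`, using linear
    interpolation as a stand-in engine."""
    pos = np.asarray(locations, dtype=float) + int_offsets
    # The sample coordinates are simply the array indices 0 .. len(samples)-1.
    return np.interp(pos, np.arange(len(samples)), samples)

samples = np.array([0.0, 1.0, 4.0, 9.0, 16.0])  # y = x**2 sampled at x = 0..4
# Target position 2.5, given as fractional location 0.5 plus integer offset 2:
out = interp_index_space(samples, np.array([0.5]), int_offsets=2)  # -> [6.5]
```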

apply_shift(samples: ndarray[tuple[int], dtype[number]], shift: ndarray[tuple[int], dtype[number]], shift_offset: int) ndarray[source]

Interpolate to locations specified in terms of shifts instead of absolute locations

The locations are specified via an array s of real-valued shifts. For the element s[i] of the shift array with array index i, the absolute location within the index space of the input samples is given by i + s[i] + ofs, where ofs is a constant integer offset. A zero shift means the output sample with index i is the input sample with index i+ofs. The offset can be positive or negative. Shift values that would require samples not in the input are not allowed. The output should be the same as for

apply(samples, shift, shift_offset + np.arange(shift.shape[0]))

Parameters:
  • samples – 1D numpy array with sampled data

  • shift – 1D float numpy array with shifts

  • shift_offset – constant integer offset

Returns:

Interpolated samples
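The stated equivalence between apply_shift and apply can be checked with a small sketch (hypothetical helpers with linear interpolation standing in for the engine):

```python
import numpy as np

def apply(samples, locations, int_offsets=0):
    # Linear stand-in for an interpolation engine (hypothetical helper).
    pos = np.asarray(locations, dtype=float) + int_offsets
    return np.interp(pos, np.arange(len(samples)), samples)

def apply_shift(samples, shift, shift_offset):
    # Element i is interpolated at absolute index i + shift[i] + shift_offset.
    return apply(samples, shift, shift_offset + np.arange(shift.shape[0]))

samples = np.sin(np.linspace(0.0, 1.0, 20))
shift = np.full(5, 0.25)
out1 = apply_shift(samples, shift, 3)
out2 = apply(samples, shift, 3 + np.arange(5))  # identical by definition
```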

property margin_left: int

Margin size (>= 0) on the left boundary

The interpolator cannot be called with locations within this margin from the leftmost sample.

property margin_right: int

Margin size (>= 0) on the right boundary

The interpolator cannot be called with locations within this margin from the rightmost sample.

class lisainstrument.sigpro.regular_interpolators.RegularInterpLagrange(order: int)[source]

Class implementing interpolation of regularly spaced 1D data using Lagrange polynomials.

The algorithm uses Lagrange interpolation specialized to regularly spaced data. The coefficients of the Lagrange polynomials are computed in advance and converted to a set of FIR filters. The FIR filters are applied to the samples, and the results at the integer locations are multiplied with the corresponding powers of the fractional locations.

The formulation in terms of FIR filters works as follows. In general each interpolation location \(x\) will use a Lagrange polynomial centered around a sample index we denote \(n(x)\). The interpolated function can be written as

\[y(x) = \sum_{a=0}^P (x-x_{n(x)})^a C_{n(x)}^a\]

where the \(x_n\) are the sample point locations and \(C_{n}^a\) are the coefficients of the interpolating polynomial centered around \(n\). The latter in turn depend linearly on the data samples, as

\[C_n^a = \sum_{k=D}^{D+N-1} y_{n+k} L_k^a\]

For fixed \(a\), the above obviously describes a FIR filter, where \(D\) is the offset and \(N\) the length of the convolution kernel. Thus, to evaluate the interpolation function, we apply \(P+1\) different FIR filters to the sample data, and compute a weighted sum with weights that depend on the interpolation locations.

For the Lagrange polynomials, we have \(N = P + 1\). The choices for \(n(x)\) and for the offset \(D\) are related. The goal is to use the polynomial built from a stencil whose center is closest to the given interpolation location. For odd length, the center point is obtained by rounding the location, such that the remaining fractional shift is within \([-1/2,1/2]\). For even length, the center point is the floor of the location, with the remaining fractional shift within \([0,1)\).
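The FIR formulation above can be sketched as follows. This is a minimal numpy illustration with hypothetical helper names, not the library implementation: the Lagrange basis coefficients \(L_k^a\) are built once, the FIR step is a dot product per power \(a\), and the final value is a weighted sum over powers of the fractional location.

```python
import numpy as np
from numpy.polynomial import polynomial as P

def lagrange_fir_kernels(length, offset):
    """Hypothetical helper: coefficients L_k^a of the Lagrange basis
    polynomials for nodes at offset .. offset+length-1 (relative to n)."""
    nodes = offset + np.arange(length)
    kernels = []
    for k in range(length):
        others = np.delete(nodes, k)
        # Polynomial with roots at the other nodes, normalized to 1 at nodes[k]
        kernels.append(P.polyfromroots(others) / float(np.prod(nodes[k] - others)))
    return np.array(kernels)  # shape (length, length): kernels[k][a] = L_k^a

N, D = 4, -1                          # kernel length and offset
L = lagrange_fir_kernels(N, D)
y = np.arange(10, dtype=float) ** 2   # quadratic data, exact for order 3
n, frac = 5, 0.3                      # center sample n(x) and fractional part
# FIR step: C^a = sum_k y[n+D+k] * L_k^a (one dot product per power a)
C = y[n + D : n + D + N] @ L
# Weighted sum over powers of the fractional location:
val = sum(C[a] * frac**a for a in range(N))  # interpolated y(5.3) = 5.3**2
```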

See RegularInterpCore for general properties not specific to the interpolation method.

apply(samples: ndarray[tuple[int], dtype[number]], locations: ndarray[tuple[int], dtype[number]], int_offsets: ndarray[tuple[int], dtype[number]] | int = 0) ndarray[source]

Interpolate regularly spaced data to locations in index-space

See RegularInterpCore.apply()

Parameters:
  • samples – real-valued 1D numpy array with sampled data

  • locations – real-valued 1D numpy array with locations to interpolate to

  • int_offsets – integer or integer 1D array with additional offsets to the locations.

Returns:

Interpolated samples.

apply_shift(samples: ndarray[tuple[int], dtype[number]], shift: ndarray[tuple[int], dtype[number]], shift_offset: int) ndarray[source]

Interpolate to locations specified in terms of shifts instead of absolute locations

See RegularInterpCore.apply_shift().

Parameters:
  • samples – 1D numpy array with sampled data

  • shift – 1D float numpy array with shifts

  • shift_offset – constant integer offset

Returns:

Interpolated samples

property margin_left: int

Margin size (>= 0) on the left boundary

The interpolator cannot be called with locations within this margin from the leftmost sample.

property margin_right: int

Margin size (>= 0) on the right boundary

The interpolator cannot be called with locations within this margin from the rightmost sample.

class lisainstrument.sigpro.regular_interpolators.RegularInterpLinear(*args, **kwargs)[source]

Class implementing interpolation of regularly spaced 1D data using linear interpolation.

See RegularInterpCore for general properties not specific to the interpolation method.

apply(samples: ndarray[tuple[int], dtype[number]], locations: ndarray[tuple[int], dtype[number]], int_offsets: ndarray[tuple[int], dtype[number]] | int = 0) ndarray[source]

Interpolate regularly spaced data to locations in index-space

See RegularInterpCore.apply()

Parameters:
  • samples – 1D numpy array with sampled data

  • locations – real-valued 1D numpy array with locations to interpolate to

  • int_offsets – integer or integer 1D array with additional offsets to the locations.

Returns:

Interpolated samples.

apply_shift(samples: ndarray[tuple[int], dtype[number]], shift: ndarray[tuple[int], dtype[number]], shift_offset: int) ndarray[source]

Interpolate to locations specified in terms of shifts instead of absolute locations

See RegularInterpCore.apply_shift().

Parameters:
  • samples – 1D numpy array with sampled data

  • shift – 1D float numpy array with shifts

  • shift_offset – constant integer offset

Returns:

Interpolated samples

property margin_left: int

Margin size (= 0) on the left boundary

The linear interpolator can be called for all locations within the sample range.

property margin_right: int

Margin size (= 0) on the right boundary

The linear interpolator can be called for all locations within the sample range.

class lisainstrument.sigpro.regular_interpolators.RegularInterpolator(core: RegularInterpCore)[source]

User-facing class for interpolation of regularly spaced data

The interpolation method is not fixed but given by an interpolation engine. The main purpose of this class is to provide the parameter checks common to all interpolation methods.

apply_shift(samples_: ndarray, shift_: ndarray, shift_offset: int) ndarray[tuple[int], dtype[number]][source]

Interpolate to locations specified in terms of shifts instead of absolute locations

See RegularInterpCore.apply_shift().

Parameters:
  • samples – 1D numpy array with sampled data

  • shifts – 1D float numpy array with shifts

  • shift_offset – constant integer offset

Returns:

Interpolated samples

property margin_left: int

Margin size (>= 0) on the left boundary

The interpolator cannot be called with locations within this margin from the leftmost sample.

property margin_right: int

Margin size (>= 0) on the right boundary

The interpolator cannot be called with locations within this margin from the rightmost sample.

lisainstrument.sigpro.regular_interpolators.make_lagrange_polynomials(length: int, offset: int) list[Polynomial][source]

Construct lagrange interpolating polynomials

This constructs Lagrange interpolation polynomials with given order, specialized to regularly spaced coordinates with spacing of one, with a center specified in terms of an integer offset.

This produces \(N\) polynomials \(p_j(x)\) of order \(N-1\), which satisfy \(p_j(j+D) = 1\) and \(p_j(k+D) = 0\) for integers \(k = 0 \ldots N-1\), \(k \neq j\).

Parameters:
  • length – The number \(N\) of polynomials of order \(N-1\)

  • offset – The offset \(D\)

Returns:

List of polynomials \(p_j\) given as numpy Polynomial objects
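The defining Kronecker property can be illustrated with a minimal sketch. This is illustrative only (not the library code); `make_lagrange_sketch` is a hypothetical stand-in for make_lagrange_polynomials:

```python
import numpy as np
from numpy.polynomial import Polynomial

def make_lagrange_sketch(length, offset):
    """Minimal sketch of the construction described above: p_j is 1 at
    node j+offset and 0 at the other nodes."""
    nodes = offset + np.arange(length)
    polys = []
    for j in range(length):
        others = np.delete(nodes, j)
        polys.append(Polynomial.fromroots(others) / float(np.prod(nodes[j] - others)))
    return polys

polys = make_lagrange_sketch(4, -1)  # nodes at -1, 0, 1, 2
```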

lisainstrument.sigpro.regular_interpolators.make_regular_interpolator_lagrange(order: int) RegularInterpolator[source]

Create an interpolator using Lagrange interpolation

See RegularInterpLagrange for details of the method.

Parameters:

order – order of the interpolating polynomials

Returns:

Interpolation function

lisainstrument.sigpro.regular_interpolators.make_regular_interpolator_linear() RegularInterpolator[source]

Create an interpolator using linear interpolation

Returns:

Interpolation function

Dynamic time shift

Functions for applying dynamic real-valued shifts to numpy arrays using Lagrange interpolation

The main class in this module is DynamicShiftNumpy. It allows performing a time-shifting operation on a numpy array, with a time-dependent time shift. Similar functionality is implemented in the streams.delay module for chunked processing. Both use the same core interpolation methods.

The interpolation method to be used is provided by the user in the form of an object implementing the RegularInterpolator protocol defined in the module regular_interpolators. This interpolation engine is based on numpy arrays. The RegularInterpolator protocol is not responsible for setting up boundary conditions, which is the responsibility of DynamicShiftNumpy.

The other parameters that determine the behavior of DynamicShiftNumpy are collected in the DynShiftCfg class. It contains the left and right boundary conditions; the available options are defined by the ShiftBC enum class. DynShiftCfg also contains limits for the minimum and maximum allowed time shift. Those have to be supplied by the user because they cannot be determined from the data in DynamicShiftDask, and DynamicShiftNumpy is required to behave exactly like DynamicShiftDask.

The convenience functions make_dynamic_shift_lagrange_numpy() and make_dynamic_shift_linear_numpy() return a DynamicShiftNumpy instance employing Lagrange or linear interpolation, respectively. For testing, there is another Lagrange interpolator based on the legacy dsp.timeshift code. Use lisainstrument.legacy.dynamic_delay_dsp.make_dynamic_shift_dsp_numpy() to obtain a corresponding DynamicShiftNumpy instance.

Example use:

>>> op = make_dynamic_shift_lagrange_numpy(
        order=31,
        min_delay=-2., max_delay=21.,
        left_bound=ShiftBC.FLAT, right_bound=ShiftBC.EXCEPTION
    )
>>> delay = np.linspace(-1.2,20.4,100)
>>> data = np.linspace(0, 1, len(delay))
>>> shifted_data = op(data, -delay)

Internally, the module works as follows. DynamicShiftNumpy contains a user-provided RegularInterpolator. The latter can interpolate to points within the given data, minus a margin size defined by RegularInterpolator implementations. Before calling RegularInterpolator, DynamicShiftNumpy extends the data by suitable margins filled according to the selected boundary conditions. The margin size is computed from the margins needed by the interpolator as well as the fixed limits specified for the timeshift.
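The internal flow described above can be sketched with numpy, using a FLAT left boundary and linear interpolation as an example. All names here are illustrative assumptions, not the actual implementation:

```python
import numpy as np

def dynamic_shift_flat_left(data, shift, margin_left):
    """Sketch of the internal flow described above: pad the data according
    to the boundary condition, then interpolate in padded index space."""
    # Extend the data on the left so all requested locations can be interpolated.
    padded = np.concatenate([np.full(margin_left, data[0]), data])
    # Output element i samples the padded data at index i + shift[i] + margin_left.
    pos = np.arange(len(data)) + shift + margin_left
    return np.interp(pos, np.arange(len(padded)), padded)

data = np.arange(5, dtype=float)
shift = np.full(5, -1.5)  # shifting by -1.5 samples needs 2 left margin points
out = dynamic_shift_flat_left(data, shift, margin_left=2)
# -> [0.0, 0.0, 0.5, 1.5, 2.5]
```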

class lisainstrument.sigpro.dynamic_delay_numpy.DynShiftCfg(min_delay: float, max_delay: float, left_bound: ShiftBC, right_bound: ShiftBC)[source]

Config class for dynamic shift interpolation

min_delay

Assume that any shift < -min_delay

Type:

float

max_delay

Assume that any shift > -max_delay

Type:

float

left_bound

Treatment of left boundary

Type:

lisainstrument.sigpro.dynamic_delay_numpy.ShiftBC

right_bound

Treatment of right boundary

Type:

lisainstrument.sigpro.dynamic_delay_numpy.ShiftBC

property max_delay_int: int

Maximum delay in integer index space

property min_delay_int: int

Minimum delay in integer index space

class lisainstrument.sigpro.dynamic_delay_numpy.DynamicShiftNumpy(cfg: DynShiftCfg, interp: RegularInterpolator)[source]

Interpolate to locations given by a non-const shift.

This allows interpolating samples in a numpy array to locations specified by a shift given by another numpy array of the same size. The shift is specified in units of the array index, i.e. there is no separate coordinate array. A positive shift refers to values right of a given sample, a negative shift to values on the left.

The boundary treatment can be specified for each boundary in terms of ShiftBC enums.

The interpolation method is not fixed but provided via an interpolator instance implementing the RegularInterpMethod protocol.

For technical reasons, the shift values have to stay within bounds that have to be provided.

property margin_left: int

Left margin size.

If positive, specifies how many samples on the left have to be added by boundary conditions. If negative, specifies how many samples on the left are unused.

property margin_right: int

Right margin size.

If positive, specifies how many samples on the right have to be added by boundary conditions. If negative, specifies how many samples on the right are unused.

class lisainstrument.sigpro.dynamic_delay_numpy.ShiftBC(value)[source]

Enum for various methods of handling boundaries in dynamic shift

ZEROPAD: Assume all data outside the given range is zero.
FLAT: Assume all data outside the given range is equal to the value on the nearest boundary.
EXCEPTION: Raise an exception if data outside the range is needed.
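The ZEROPAD and FLAT options correspond to familiar numpy padding modes. A small illustration (the padding itself happens internally in the library):

```python
import numpy as np

# Sketch of the boundary options described above in terms of numpy padding:
data = np.array([1.0, 2.0, 3.0])
zeropad = np.pad(data, 2)            # ZEROPAD: zeros outside the data range
flat = np.pad(data, 2, mode="edge")  # FLAT: repeat the nearest boundary value
# EXCEPTION corresponds to raising an error instead of padding.
```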

lisainstrument.sigpro.dynamic_delay_numpy.make_dynamic_shift_lagrange_numpy(order: int, min_delay: float, max_delay: float, left_bound: ShiftBC, right_bound: ShiftBC) DynamicShiftNumpy[source]

Set up DynamicShiftNumpy instance with Lagrange interpolation method.

Parameters:
  • order – Order of the Lagrange polynomials

  • min_delay – Assume that any shift < -min_delay

  • max_delay – Assume that any shift > -max_delay

  • left_bound – Treatment of left boundary

  • right_bound – Treatment of right boundary

Returns:

Interpolation function of type DynamicShiftNumpy

lisainstrument.sigpro.dynamic_delay_numpy.make_dynamic_shift_linear_numpy(min_delay: float, max_delay: float, left_bound: ShiftBC, right_bound: ShiftBC) DynamicShiftNumpy[source]

Set up DynamicShiftNumpy instance with linear interpolation method.

Parameters:
  • min_delay – Assume that any shift < -min_delay

  • max_delay – Assume that any shift > -max_delay

  • left_bound – Treatment of left boundary

  • right_bound – Treatment of right boundary

Returns:

Interpolation function of type DynamicShiftNumpy

Fixed time shift

Functions for applying fixed real-valued shifts to numpy arrays using Lagrange interpolation

This provides a generic interface FixedShiftCore as well as an implementation using Lagrange interpolation. The latter is written from scratch, see module fixed_shift_dsp for another one based on the dsp.timeshift Lagrange interpolator.

class lisainstrument.sigpro.fixed_shift_numpy.FixedShiftCore(*args, **kwargs)[source]

Protocol for applying fixed shift to regularly spaced samples

This defines an interface for applying a fixed shift to regularly spaced samples in 1D, provided as numpy arrays.

Boundary treatment is not part of this protocol. Implementations only compute locations that can be interpolated to without using any form of boundary conditions. The corresponding margin sizes required by the interpolation method are exposed as properties.

Arbitrary shifts are valid, but the main use case is shifts of magnitude around 1. For large shifts, the total required margin size increases, in which case it might be more efficient to handle the integer part of the shift first and then apply only the remaining fractional shift.

apply(samples: ndarray) ndarray[tuple[int], dtype[number]][source]

Apply the fractional shift to a 1D numpy array.

The output is the shifted input with left and right margins removed. Denoting the input data as \(y_i\) with \(i=0 \ldots N-1\), and the interpolated input data as \(y(t)\), such that \(y(i)=y_i\), the output \(z_k\) is given by \(z_k = y(k + M_L + s), k=0 \ldots N - 1 - M_L - M_R\), where \(M_L\) and \(M_R\) are the left and right margin sizes.

Parameters:

samples – 1D numpy array with sampled data

Returns:

Interpolated samples
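The index relation \(z_k = y(k + M_L + s)\) can be sketched with a linear stand-in engine. This is a hypothetical illustration, not the library code; for a linear engine with \(0 < s < 1\) the margins are assumed to be \(M_L = 0\), \(M_R = 1\):

```python
import numpy as np

def fixed_shift_linear(y, s):
    """Sketch of the apply() semantics above with a linear engine.
    Returns z_k = y(k + M_L + s) for k = 0 .. N-1 - M_L - M_R,
    with M_L = 0 and M_R = 1."""
    pos = np.arange(len(y) - 1) + s  # k + M_L + s with M_L = 0
    return np.interp(pos, np.arange(len(y)), y)

y = np.array([0.0, 2.0, 4.0, 6.0])
z = fixed_shift_linear(y, 0.5)  # -> [1.0, 3.0, 5.0]
```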

property margin_left: int

Margin size (>= 0) needed on the left boundary

property margin_right: int

Margin size (>= 0) needed on the right boundary

property shift: float

The shift

class lisainstrument.sigpro.fixed_shift_numpy.FixedShiftLagrange(order: int, shift: float)[source]

Class implementing fixed shift of regularly spaced 1D data using Lagrange interpolation.

The algorithm uses Lagrange interpolation, applying a FIR filter based on Lagrange polynomials evaluated at the fixed fractional shift. This uses a stencil with center as close to the location as possible. For odd length, the center point is obtained by rounding the location, such that the remaining fractional shift is within \([-1/2,1/2]\). For even length, the center point is the floor of the location, with the remaining fractional shift within \([0,1)\).

See FixedShiftCore for general properties not specific to the interpolation method.

apply(samples: ndarray) ndarray[tuple[int], dtype[number]][source]

Apply shift, see FixedShiftCore.apply()

Parameters:

samples – 1D numpy array with sampled data

Returns:

Shifted input excluding margins

static factory(order: int) Callable[[float], FixedShiftCore][source]

Factory for making FixedShiftLagrange instances

Parameters:

order – The order of the Lagrange polynomials to use

Returns:

Factory function for making FixedShiftLagrange instances from shift

property margin_left: int

Margin size (>= 0) needed on the left boundary

property margin_right: int

Margin size (>= 0) needed on the right boundary

property shift: float

The shift

class lisainstrument.sigpro.fixed_shift_numpy.FixedShiftNumpy(left_bound: ShiftBC, right_bound: ShiftBC, interp_fac: Callable[[float], FixedShiftCore])[source]

Interpolate to locations given by a const shift.

This allows interpolating samples in a numpy array to locations specified by a fixed shift. The shift is specified in units of the array index, i.e. there is no separate coordinate array. A positive shift refers to values right of a given sample, a negative shift to values on the left.

The boundary treatment can be specified for each boundary in terms of ShiftBC enums.

The interpolation method is not fixed but provided via an interpolator instance implementing the FixedShiftCore protocol.

lisainstrument.sigpro.fixed_shift_numpy.make_fir_lagrange_fixed(length: int, frac_shift: float) DefFilterFIR[source]

Create FIR filter corresponding to non-integer shift using Lagrange interpolation

This creates a FIR filter with coefficients given by a Lagrange interpolation polynomial evaluated at a fixed location \(s\).

We work completely in index space of the input sequence, i.e. the coordinate of a given index is the index. The location \(s\) also refers to index space.

Thus, we specialize Lagrange interpolation to the case where the sample locations are \(t_j = j + D\), with \(j=0 \ldots L-1\), and \(L\) being the number of points to use. The integer offset \(D\) determines which points to use, and should be chosen such that the center \(D + L/2\) lies near \(s\). In other words, the input sequence indices \(D \ldots D+L-1\) are used to interpolate to the (fractional) index \(s\).

So far we described interpolation to one location. The FIR filter is defined by simple translation, such that the element with index 0 in the output sequence corresponds to the input sequence interpolated to fractional index \(s\), and any element \(a\) to \(s+a\).

\[\begin{split}y_a &= \sum_{j=0}^{L-1} K_j x_{a+j+D} \\ K_j &= \prod_{\substack{m=0\\m \neq j}}^{L-1} \frac{s-D-m}{j-m}\end{split}\]

The offset is chosen to center the stencil around a shift \(s=0\) for odd length and \(s=1/2\) for even length. The shift should not significantly exceed the bounds \(-1/2 < s < 1/2\) for odd length or \(0 < s < 1\) for even length, to avoid the large overshoots that inherently occur off-center for high-order Lagrange interpolation.

Parameters:
  • length – Number of FIR coefficients (=Lagrange order + 1)

  • frac_shift – The shift \(s\)

Returns:

FIR filter definition
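The coefficient formula \(K_j\) above can be evaluated directly with numpy. This is an illustrative sketch, not the library's make_fir_lagrange_fixed; an order-2 stencil reproduces a quadratic signal exactly:

```python
import numpy as np

def lagrange_fir_coeffs(length, offset, s):
    """Sketch of the K_j formula above:
    K_j = prod_{m != j} (s - offset - m) / (j - m)."""
    K = np.empty(length)
    for j in range(length):
        m = np.delete(np.arange(length), j)
        K[j] = np.prod((s - offset - m) / (j - m))
    return K

# Length 3 (order 2), offset D = -1, fractional shift s = 0.25:
K = lagrange_fir_coeffs(3, -1, 0.25)
x = (np.arange(8) - 3.0) ** 2  # quadratic signal, reproduced exactly
a = 4
# y_a = sum_j K_j x_{a+j+D} interpolates x at fractional index a + s
y_a = sum(K[j] * x[a + j - 1] for j in range(3))  # -> (4.25 - 3)**2 = 1.5625
```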

lisainstrument.sigpro.fixed_shift_numpy.make_fixed_shift_lagrange_numpy(left_bound: ShiftBC, right_bound: ShiftBC, order: int) FixedShiftNumpy[source]

Create a FixedShiftNumpy instance that uses Lagrange interpolator

Parameters:
  • left_bound – boundary treatment on the left

  • right_bound – boundary treatment on the right

  • order – Order of the Lagrange polynomials

Returns:

Fixed shift interpolator

lisainstrument.sigpro.fixed_shift_numpy.make_numpy_array_1d_float(a: ndarray) ndarray[tuple[int], dtype[number]][source]

Ensure numpy array is 1D and floating point type

Generic time shift

Time shift operator employing dynamic or fixed shift to numpy arrays

AdaptiveShiftNumpy is a wrapper that stores a fixed and a dynamic time shift operator and uses the appropriate one when called.

class lisainstrument.sigpro.adaptive_delay_numpy.AdaptiveShiftNumpy(delay_const: Callable[[ndarray, float], ndarray], delay_dynamic: Callable[[ndarray, ndarray], ndarray], fsample: float)[source]

Time shifting that accepts fixed or dynamic shifts as well as constant data

Instances act as interpolator function, which delegates to interpolation methods for fixed or dynamic time shifting. The specialised interpolation functions are provided during construction.

In addition, instances store a fixed sample rate that is assumed for any interpolated data, allowing time shifts to be given in time units.

delay(x: ndarray | float, delay_time: ndarray | float) ndarray | float[source]

Like shift, but specified in terms of a delay, with opposite sign

A positive delay means the result depends on data to the left (aka the past).

Parameters:
  • x – the data to be shifted

  • delay_time – time delay (= -shift_time) [s]

Returns:

The delayed data

dynamic(x: ndarray | float, shift_time: ndarray) ndarray | float[source]

Apply dynamic time shift

The shift is given in time units, and the data is assumed to be sampled with rate fsample given in the constructor.

Both data and time shift can be scalar or 1D numpy arrays. Scalars are interpreted as constant arrays. In case of scalar data, the same scalar is returned. In case of scalar shift, a more efficient algorithm is used, which should yield identical results as for a const shift array.

Parameters:
  • x – the data to be shifted, as scalar or 1D array

  • shift_time – time shift [s], as 1D array

Returns:

The shifted data

fixed(x: ndarray | float, shift_time: float) ndarray | float[source]

Apply fixed timeshift

Parameters:
  • x – the data to be shifted, as scalar or 1D array

  • shift_time – scalar time shift [s]

shift(x: ndarray | float, shift_time: ndarray | float) ndarray | float[source]

Apply adaptive time shift to a sequence, choosing the most efficient method

The shift is given in time units, and the data is assumed to be sampled with rate fsample given in the constructor. A positive shift means the result depends on data to the right.

Both data and time shift can be constant or non-constant sequences, represented by float values or 1D numpy arrays, respectively. Shifting a constant sequence results in the same constant sequence, as constant sequences are interpreted as infinite (no boundaries). Shifting a non-constant sequence applies the most efficient method depending on whether the shift is variable, constant, or zero.

Parameters:
  • x – the data to be shifted

  • shift_time – time shift [s]

Returns:

The shifted data

Spline interpolation

This module allows performing spline interpolation in a chunked fashion.

The main use case is interpolation of arbitrarily large data read from file in the context of chunked data processing. This module has no chunking functionality itself but provides useful low-level infrastructure. The source of data is abstracted as a function that provides data samples covering a given time interval. For example, it could read a partial dataset from file. The idea is that this function will be called sequentially for consecutive time intervals. Time series are represented by the TimeSeriesNumpy class, and the above callable by the type alias TimeSeriesSource. The functions make_chunked_spline_interpolator and make_global_spline_interpolator turn a TimeSeriesSource into an interpolation function. The difference is that the first constructs spline interpolators on the fly for each segment, while the second uses a fixed interpolation spline covering a fixed time interval.

The resulting interpolating functions are returned as FuncOfTime instances which can be used by stream_func_of_time from the streams.sampling module to create streams of sampled data.
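The source-function pattern described above can be sketched with numpy, using linear interpolation as a stand-in for a spline. All names here are hypothetical illustrations, not the module's actual classes:

```python
import numpy as np

def make_chunked_interpolator_sketch(source):
    """Sketch of the pattern described above: `source(t0, t1)` returns a pair
    (times, values) covering [t0, t1] plus margins; the returned function
    builds an interpolant on the fly for each batch of sample times."""
    def func(t):
        times, values = source(t.min(), t.max())
        return np.interp(t, times, values)
    return func

def toy_source(t0, t1):
    # Toy data source covering the requested interval plus a margin.
    times = np.arange(np.floor(t0) - 1.0, np.ceil(t1) + 2.0)
    return times, 2.0 * times

f = make_chunked_interpolator_sketch(toy_source)
out = f(np.array([0.5, 1.5, 2.5]))  # called once per chunk of sample times
```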

lisainstrument.sigpro.chunked_splines.make_chunked_spline_interpolator(func: Callable[[float, float], TimeSeriesNumpy | None], order: int) FuncOfTime[source]

Turns a source of time series into a chunked spline interpolation function

The input is a function that returns a time series covering a time interval given as arguments. The time series is assumed to include a margin sufficient for spline interpolation on the requested interval. The output is a function of time that interpolates the time series on the fly: given an array of sample times, the input function is used to obtain a time series covering the same interval; this time series is used to construct a spline interpolator, which is then evaluated at the sample times. The resulting interpolation function can be evaluated anywhere and zero-pads data outside the range of the available data.

Parameters:
  • func – Callable providing time series for a given interval

  • order – The order of the spline interpolation

Returns:

Interpolation function

lisainstrument.sigpro.chunked_splines.make_global_spline_interpolator(func: Callable[[float, float], TimeSeriesNumpy | None], tmin: float, tmax: float, order: int) FuncOfTime | ConstFuncOfTime[source]

Turns a source of time series into a global spline interpolation function

The input is a function that returns a timeseries covering a time interval given as arguments. The timeseries is assumed to include a margin sufficient to be used for spline interpolation on the requested interval. The output is a fixed interpolation function covering the specified interval.

Parameters:
  • func – Callable providing time series for a given interval

  • tmin – Start of interval to be covered by interpolation function

  • tmax – End of interval to be covered by interpolation function

  • order – The order of the spline interpolation

Returns:

Interpolation function

lisainstrument.sigpro.chunked_splines.make_spline_interpolator(func: Callable[[float, float], TimeSeriesNumpy | None] | float, tmin: float, tmax: float, order: int, chunked: bool) FuncOfTime | ConstFuncOfTime[source]

Wrapper presenting spline-interpolated or const function as FuncOfTime or ConstFuncOfTime.

This sets up a spline interpolator based on a data source, with an option to choose between chunked and global interpolation. Further, providing a scalar float instead of a data source is interpreted as constant data with no need for interpolation, and a ConstFuncOfTime is returned instead of an interpolation function.

Parameters:
  • func – Scalar or callable source for time series

  • tmin – Start of interval that has to be covered (used only when chunked==False)

  • tmax – End of interval that has to be covered (used only when chunked==False)

  • order – Order of the interpolation splines

  • chunked – Whether to use chunked or global interpolation

Returns:

Interpolation function as FuncOfTime or ConstFuncOfTime

FIR filters

Utilities for defining FIR filters and filtering 1D numpy arrays

This module provides basic definitions for FIR filters and functionality to apply them to simple 1D numpy arrays. This is used in the streams.firfilter module to apply FIR filters to streams in chunked data processing.

To define a FIR filter, use the class DefFilterFIR. The filters are specified by their coefficients and the center location. Instances of DefFilterFIR provide the required margin sizes (domain of dependence). To set up such a filter where the center location corresponds to what is called a causal filter, use make_fir_causal_normal_from_coeffs(). Further, there is a convenience function make_fir_causal_kaiser() to set up a FIR definition for a Kaiser window.

The filter definitions can be used to create corresponding functions operating on numpy arrays. For this, use the make_filter_fir_numpy function.

When applying FIR filters, one needs to specify boundary conditions. The available options are specified using the EdgeHandling enum class.

Example use:

>>> input_data = np.linspace(0, 100, 103)
>>> fir_coeffs = [0.1, 0.7, 0.1, 0.1]
>>> fir_def = DefFilterFIR(filter_coeffs=fir_coeffs, offset=-1)
>>> fir_filt = make_filter_fir_numpy(fir_def, EdgeHandling.ZEROPAD, EdgeHandling.VALID)
>>> filtered_data = fir_filt(input_data)

Internally, the module is organized as follows. The application of a FIR filter without handling boundary conditions is implemented in a class FIRCoreOp. It requires that the input arrays are already suitably padded with margins. This requirement is not exclusive to FIR filters. It is encoded in the SemiLocalMapType protocol which is implemented by FIRCoreOp. This generalization is useful when implementing filters in chunked processing.

Applying a transform of the SemiLocalMapType to a numpy array is the purpose of the semilocal_map_numpy() function. This also takes care of the boundary conditions by padding its input accordingly. The special case of applying a FIR filter is handled by the convenience function filter_fir_numpy(). Finally, the make_filter_fir_numpy() function turns the above into a unary operator, binding all arguments except the array to be filtered.

class lisainstrument.sigpro.fir_filters_numpy.DefFilterFIR(filter_coeffs, offset: int)[source]

This dataclass defines a FIR filter

The finite impulse response filter is given by

\[y_a = \sum_{i=0}^{L-1} K_i x_{a + i + D} = \sum_{k=a+D}^{a+D+L-1} x_{k} K_{k-a-D}\]

Note that there are different conventions for the order of coefficients. In standard convolution notation, the convolution kernel is given by the coefficients above in reversed order.
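The reversed-order relation to standard convolution can be checked directly with plain numpy. This is an illustrative sketch, not the library's implementation; the coefficient values are chosen arbitrarily:

```python
import numpy as np

# Direct evaluation of y_a = sum_i K_i x_{a+i+D} for all valid a,
# compared against np.convolve with the reversed kernel.
K = np.array([0.25, 0.5, 0.25])  # coefficients K_i
D = -1                           # offset D
x = np.arange(10.0)              # input samples x_k

L = len(K)
a_min = max(0, -D)               # need a + D >= 0
a_max = len(x) - L - D           # need a + D + L - 1 <= len(x) - 1
y_direct = np.array(
    [np.dot(K, x[a + D : a + D + L]) for a in range(a_min, a_max + 1)]
)

# Standard convolution uses the coefficients in reversed order
y_conv = np.convolve(x, K[::-1], mode="valid")
assert np.allclose(y_direct, y_conv)
```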

filter_coeffs

Filter coefficients \(K_i\)

Type:

list[float]

offset

Offset \(D\)

Type:

int

check(_, value)[source]

Validate filter coefficients

property domain_of_dependence: tuple[int, int]

The domain of dependence

A point with index \(a\) in the output sequence depends on indices \(a+D \ldots a+D+L-1\). This property provides the domain of dependence for \(a=0\).
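As a small illustration of the formula above (values chosen for illustration only):

```python
# For L = 4 coefficients and offset D = -3 (a fully causal filter),
# output sample a = 0 depends on input samples D .. D + L - 1 = -3 .. 0.
L, D = 4, -3
dod = (D, D + L - 1)
assert dod == (-3, 0)
```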

property gain: float

The gain factor for a constant signal

The gain factor is defined as the sum of the filter coefficients,

\[g = \sum_{i=0}^{L-1} K_i\]

which is the factor by which a constant input signal is scaled.

property length: int

The length of the domain of dependence of a given output sample on the input samples. This does not take into account zeros anywhere in the coefficients. Thus the result is simply the number of coefficients.

class lisainstrument.sigpro.fir_filters_numpy.EdgeHandling(value)[source]

Enum for various methods of handling boundaries in filters.

VALID: Use only valid points that can be computed from the given data,

without zero padding or similar

ZEROPAD: Compute output for every input point, pad input with suitable

number of zeros before applying filter.
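The effect of the two modes on the output size can be illustrated with np.convolve (a stand-in sketch, not the library's own code path):

```python
import numpy as np

# VALID keeps only fully covered points; ZEROPAD keeps the input length.
x = np.ones(10)
k = np.array([0.25, 0.5, 0.25])

y_valid = np.convolve(x, k, mode="valid")   # length len(x) - len(k) + 1
y_zeropad = np.convolve(x, k, mode="same")  # zero-padded, length len(x)

assert len(y_valid) == len(x) - len(k) + 1
assert len(y_zeropad) == len(x)
```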

class lisainstrument.sigpro.fir_filters_numpy.FIRCoreOp(fdef: DefFilterFIR)[source]

Function class applying FIR to numpy array

This does not include boundary treatment and only returns valid points. It does provide the margin sizes corresponding to invalid boundary points on each side.

property margin_left: int

How many points at the left boundary are missing in the output

property margin_right: int

How many points at the right boundary are missing in the output

class lisainstrument.sigpro.fir_filters_numpy.SemiLocalMapType(*args, **kwargs)[source]

Protocol for semi-local maps of 1D numpy arrays

This is used to describe array operations which require boundary points

property margin_left: int

How many points at the left boundary are missing in the output

property margin_right: int

How many points at the right boundary are missing in the output

lisainstrument.sigpro.fir_filters_numpy.filter_fir_numpy(fdef: DefFilterFIR, bound_left: EdgeHandling, bound_right: EdgeHandling, data: ndarray) ndarray[tuple[int], dtype[number]][source]

Apply FIR filter to 1D numpy array

Parameters:
  • fdef – The definition of the FIR filter

  • bound_left – Boundary treatment on left side

  • bound_right – Boundary treatment on right side

  • data – The 1D numpy array to be filtered

Returns:

Filtered 1D numpy array.

lisainstrument.sigpro.fir_filters_numpy.make_filter_fir_numpy(fdef: DefFilterFIR, bound_left: EdgeHandling, bound_right: EdgeHandling) Callable[[ndarray], ndarray[tuple[int], dtype[number]]][source]

Create a function that applies a given FIR filter to numpy arrays, employing the specified boundary treatment.

Parameters:
  • fdef – The definition of the FIR filter

  • bound_left – Boundary treatment on left side

  • bound_right – Boundary treatment on right side

Returns:

Function which accepts a single 1D numpy array as input and returns the filtered array.

lisainstrument.sigpro.fir_filters_numpy.make_fir_causal_kaiser(fsamp: float, attenuation: float, freq1: float, freq2: float) DefFilterFIR[source]

Create FIR filter definition for Kaiser window with given attenuation and transition band

This creates FIR coefficients from a Kaiser window specified by the desired properties of attenuation and transition band. The filter offset is set to describe a completely causal filter.

Parameters:
  • fsamp – Sampling rate [Hz]. Must be strictly positive.

  • attenuation – Required stop-band attenuation [dB]. Must be greater than zero.

  • freq1 – Start of transition band [Hz]

  • freq2 – End of transition band / start of stop band [Hz]. Must be below the Nyquist frequency.

Returns:

The FIR definition
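A typical Kaiser design of this kind can be sketched with scipy's kaiserord and firwin. This is an assumption about the internals, and all parameter values here are purely illustrative:

```python
import numpy as np
from scipy.signal import firwin, kaiserord

fsamp, attenuation, freq1, freq2 = 16.0, 60.0, 2.0, 4.0

# Kaiser parameters from attenuation [dB] and transition width
# normalized to the Nyquist frequency fsamp / 2
numtaps, beta = kaiserord(attenuation, (freq2 - freq1) / (0.5 * fsamp))
coeffs = firwin(numtaps, 0.5 * (freq1 + freq2), window=("kaiser", beta), fs=fsamp)

assert np.isclose(np.sum(coeffs), 1.0)  # unity gain at DC
```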

lisainstrument.sigpro.fir_filters_numpy.make_fir_causal_normal_from_coeffs(coeffs: ArrayLike) DefFilterFIR[source]

Create causal, unity-gain FIR filter definition from coefficients.

This creates a FIR filter definition from the given coefficients. The coefficients are first normalized such that the filter has unity gain. The filter offset is set to describe a completely causal filter.

Parameters:

coeffs – filter coefficients

Returns:

The FIR definition

lisainstrument.sigpro.fir_filters_numpy.semilocal_map_numpy(op: SemiLocalMapType, bound_left: EdgeHandling, bound_right: EdgeHandling, data: ndarray[tuple[int], dtype[number]]) ndarray[tuple[int], dtype[number]][source]

Apply a semi local map to numpy array and employ boundary conditions.

Parameters:
  • op – the semi local mapping

  • bound_left – Boundary treatment on left side

  • bound_right – Boundary treatment on right side

  • data – the 1D array to be mapped

Returns:

The mapped data. The size is the same as the input if both boundary conditions are ZEROPAD. A boundary condition VALID reduces the output size by the corresponding margin size of the semilocal map.

IIR filters

Utilities for defining IIR filters and filtering 1D numpy arrays

This module provides basic definitions for IIR filters and functionality to apply them to 1D numpy arrays. The streams.iirfilter module uses this module for applying IIR filters to streams, i.e. chunked data processing.

To define an IIR filter, use the class DefFilterIIR. The filters are specified by their coefficients. There are convenience functions for setting up various IIR filter types: make_iir_def_derivative(), make_iir_def_cumsum(), and make_iir_def_trapezoidal().

The filter definitions can be used to create corresponding functions operating on numpy arrays. For this, use the make_filter_iir_numpy function.

When applying IIR filters, one needs to specify initial conditions to handle the left boundary. The available options are specified using the IIRFilterIC enum class.

Example use:

>>> import numpy as np
>>> input_data = np.linspace(0, 100, 103)
>>> iir_def = make_iir_def_cumsum()
>>> iir_filt = make_filter_iir_numpy(iir_def, IIRFilterIC.ZEROPAD)
>>> filtered_data = iir_filt(input_data)

Internally, the module is organized as follows. The main engine is the IIRCoreOp class. It implements a protocol SequentialNumpyMapType which describes an operation that can be applied repeatedly to numpy arrays and has an internal state that can change on each call. An important constraint is the convention that the result of applying a SequentialNumpyMapType operation sequentially on chunks of a larger array does not depend on the chunk sizes, but only on the array and the initial internal state. Another aspect of the SequentialNumpyMapType is to provide an internal state for use with the first array, i.e. the initial condition.

The IIRCoreOp implements the SequentialNumpyMapType for IIR filters, using scipy's IIR filter functions. The internal state is the initial condition for the IIR filter. The SequentialNumpyMapType generalization is useful when implementing such operations for chunked processing.

The filter_iir_numpy() function uses IIRCoreOp to apply a DefFilterIIR filter definition to a whole numpy array, setting up initial conditions according to the boundary condition specified using the IIRFilterIC enum. Finally, the make_filter_iir_numpy() function turns the above into a unary operator, binding all arguments except the array to be filtered.

class lisainstrument.sigpro.iir_filters_numpy.DefFilterIIR(coeffs_a, coeffs_b)[source]

This dataclass defines an IIR filter

The infinite impulse response filter is given by

\[y_n = \frac{1}{a_0} \left( b_0 x_n + b_1 x_{n-1} + \ldots + b_M x_{n-M} - a_1 y_{n-1} - \ldots - a_N y_{n-N} \right)\]
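The difference equation can be checked directly against scipy.signal.lfilter, which follows the same convention (the coefficient values here are chosen purely for illustration):

```python
import numpy as np
from scipy.signal import lfilter

b = np.array([0.2, 0.2])   # coefficients b_i
a = np.array([1.0, -0.6])  # coefficients a_i, with a_0 = 1
x = np.sin(np.linspace(0.0, 4.0, 50))

# Direct evaluation of the difference equation (zero initial history)
y = np.zeros_like(x)
for n in range(len(x)):
    acc = b[0] * x[n]
    if n >= 1:
        acc += b[1] * x[n - 1] - a[1] * y[n - 1]
    y[n] = acc / a[0]

assert np.allclose(y, lfilter(b, a, x))
```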
coeffs_a

Filter coefficients \(a_i\)

Type:

list[float]

coeffs_b

Filter coefficients \(b_i\)

Type:

list[float]

check(_, value)[source]

Validate filter coefficients a

class lisainstrument.sigpro.iir_filters_numpy.IIRChainCoreOp(fdef: Iterable[DefFilterIIR], ic: IIRFilterIC)[source]

Function class for applying a chain of IIR filters to a numpy array

The object can be called either on a whole array or sequentially on smaller, contiguous, non-overlapping segments in strictly ascending order. For the first case, use the __call__ method. For the latter, first call the initial_state method to get initial conditions for the first segment. Then use the apply method to process each segment and obtain initial conditions for processing the next segment.

apply(chunk: ndarray[tuple[int], dtype[number]], fstate: Any) tuple[ndarray[tuple[int], dtype[number]], Any][source]

Apply IIR filter chain to a segment of a 1D numpy array.

The filtered data has the same length as the input.

Parameters:
  • chunk – 1D array segment to be filtered

  • fstate – filter initial conditions for this segment

Returns:

Tuple with filtered data and initial conditions for next segment

initial_state(first: float) Any[source]

Set up initial state for IIR filter chain.

Parameters:

first – First element for first filter in the chain.

Returns:

Filter chain initial conditions

class lisainstrument.sigpro.iir_filters_numpy.IIRCoreOp(fdef: DefFilterIIR, ic: IIRFilterIC)[source]

Function class for applying IIR filter to numpy array

The object can be called either on a whole array or sequentially on smaller, contiguous, non-overlapping segments in strictly ascending order. For the first case, use the __call__ method. For the latter, first call the initial_state method to get initial conditions for the first segment. Then use the apply method to process each segment and obtain initial conditions for processing the next segment.

apply(chunk: ndarray[tuple[int], dtype[number]], fstate: Any) tuple[ndarray[tuple[int], dtype[number]], Any][source]

Apply IIR filter to a segment of a 1D numpy array.

The filtered data has the same length as the input.

Parameters:
  • chunk – 1D array segment to be filtered

  • fstate – filter initial conditions for this segment

Returns:

Tuple with filtered data and initial conditions for next segment

initial_state(first: float) Any[source]

Set up initial state for IIR filter.

Parameters:

first – First element of data to be filtered

Returns:

Filter initial conditions

class lisainstrument.sigpro.iir_filters_numpy.IIRFilterIC(value)[source]

Enum for various methods of constructing initial state in IIR filters

STEADY: Initial conditions chosen such that, given some constant, applying the filter to a constant sequence of the same value will result in a constant output. Note this is not possible for all filters. It will fail for a cumsum filter. For the 1st order finite difference filter on the other hand, filtering the constant sequence will result in a sequence of zeroes.

ZEROPAD: Initial conditions chosen as if preceding data is all zero
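The STEADY prescription can be sketched with scipy's lfilter_zi (an assumption about the internals; the coefficients are illustrative and chosen to have unity DC gain, so a steady state exists):

```python
import numpy as np
from scipy.signal import lfilter, lfilter_zi

b = np.array([0.25, 0.25])
a = np.array([1.0, -0.5])    # sum(b) / sum(a) == 1, i.e. unity DC gain
const = 3.0
x = np.full(20, const)

zi = lfilter_zi(b, a) * const    # steady initial state for this constant
y, _ = lfilter(b, a, x, zi=zi)
assert np.allclose(y, const)     # output stays constant, no transient
```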

class lisainstrument.sigpro.iir_filters_numpy.SequentialNumpyMapType(*args, **kwargs)[source]

This protocol represents a transformation of arrays that can be applied to consecutive segments.

The transformation is computed by calling the apply method in ascending order on contiguous segments of the whole array. The transformation exposes an internal state that is obtained from the initial_state and apply methods and needs to be passed along in each call to apply. The convention is that the transform result does not depend on the choice of segment boundaries. The internal state should not be used or manipulated in any way by the caller. The transformation is not allowed to have any internal state in addition to the exposed one, and is not allowed to have side effects (except for testing and debugging purposes).
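The chunk-independence convention can be illustrated with scipy's lfilter, whose zi argument plays the role of the exposed internal state (a stand-in sketch, not the protocol implementation itself):

```python
import numpy as np
from scipy.signal import lfilter

b, a = [0.3, 0.3], [1.0, -0.4]
x = np.random.default_rng(0).normal(size=100)

# Whole-array result with all-zero initial state
y_whole, _ = lfilter(b, a, x, zi=np.zeros(1))

# Same computation, applied sequentially on two segments,
# passing the state from one call to the next
state = np.zeros(1)
y1, state = lfilter(b, a, x[:37], zi=state)
y2, state = lfilter(b, a, x[37:], zi=state)

# Same result regardless of how the array was split
assert np.allclose(np.concatenate([y1, y2]), y_whole)
```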

apply(chunk: ndarray[tuple[int], dtype[number]], fstate: Any) tuple[ndarray[tuple[int], dtype[number]], Any][source]

The filter function

Parameters:
  • chunk – Segment of larger 1D array, either the first one or the one following the segment from the previous call.

  • fstate – filter state

Returns:

Tuple with transformed array and new filter state

initial_state(first: float) Any[source]

Compute initial conditions

Parameters:

first – first element of data to be filtered

lisainstrument.sigpro.iir_filters_numpy.filter_iir_numpy(fdef: DefFilterIIR, ic: IIRFilterIC, data: ndarray) ndarray[tuple[int], dtype[number]][source]

Filter a 1D numpy array using an IIR filter.

Parameters:
  • fdef – The definition of the IIR filter

  • ic – Which prescription to use for initial filter state

  • data – The numpy array to be filtered

Returns:

Filtered numpy array

lisainstrument.sigpro.iir_filters_numpy.get_iir_chain_steady_state(fdef: Iterable[DefFilterIIR], const: float) float[source]

Like get_iir_steady_state but for filter chains

Not all filter definitions have a steady state; an exception is raised if none exists.

Parameters:
  • fdef – IIR filters of the chain in order of application

  • const – the constant value of the assumed input sequence

Returns:

The constant value of the output sequence

lisainstrument.sigpro.iir_filters_numpy.get_iir_steady_state(fdef: DefFilterIIR, const: float) float[source]

Compute constant output when applying IIR filter to infinite constant sequence

Not all filter definitions have a steady state; an exception is raised if none exists.

Parameters:
  • fdef – The IIR filter definition to get steady state for

  • const – the constant value of the assumed input sequence

Returns:

The constant value of the output sequence

lisainstrument.sigpro.iir_filters_numpy.make_filter_iir_numpy(fdef: DefFilterIIR, ic: IIRFilterIC) Callable[[ndarray], ndarray[tuple[int], dtype[number]]][source]

Create a filter function that accepts a 1D numpy array and returns the array filtered according to the given filter definition.

Parameters:
  • fdef – The definition of the IIR filter

  • ic – Which prescription to use for initial filter state

Returns:

Filter function

lisainstrument.sigpro.iir_filters_numpy.make_iir_def_cumsum() DefFilterIIR[source]

Return IIR filter definition equivalent to cumulative sum

lisainstrument.sigpro.iir_filters_numpy.make_iir_def_derivative(dx: float) DefFilterIIR[source]

Return IIR filter definition for time derivative as first order finite differences

Parameters:

dx – constant grid spacing

lisainstrument.sigpro.iir_filters_numpy.make_iir_def_trapezoidal(dx: float) DefFilterIIR[source]

Return IIR filter definition equivalent to integration with trapezoidal rule

Parameters:

dx – constant grid spacing
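The three convenience definitions above can be checked with a direct evaluation of the IIR difference equation. The concrete coefficient lists below are illustrative assumptions consistent with the stated mathematics, not necessarily the library's internal values:

```python
import numpy as np

def apply_iir(b, a, x):
    """Direct evaluation of the IIR difference equation."""
    y = np.zeros(len(x))
    for n in range(len(x)):
        acc = sum(b[i] * x[n - i] for i in range(len(b)) if n >= i)
        acc -= sum(a[i] * y[n - i] for i in range(1, len(a)) if n >= i)
        y[n] = acc / a[0]
    return y

dx = 0.5
x = np.arange(6.0)

# Cumulative sum: y_n = x_n + y_{n-1}
assert np.array_equal(apply_iir([1.0], [1.0, -1.0], x), np.cumsum(x))
# First-order derivative: y_n = (x_n - x_{n-1}) / dx
assert np.allclose(apply_iir([1 / dx, -1 / dx], [1.0], x)[1:], 2.0)
# Trapezoidal integration: y_n = y_{n-1} + dx/2 * (x_n + x_{n-1})
assert np.allclose(apply_iir([dx / 2, dx / 2], [1.0, -1.0], x), 0.25 * x**2)
```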

Time shift inversion

Time coordinate inversion

Functions for inverting a 1D (time) coordinate transform given as a numpy array

The inversion function internally requires an interpolation operator implementing the RegularInterpolator interface, which is provided by the user. Use make_shift_inverse_lagrange_numpy to create an inversion operator employing Lagrange interpolation.

class lisainstrument.sigpro.shift_inversion_numpy.ShiftInverseNumpy(fsample: float, max_abs_shift: float, interp: RegularInterpolator, max_iter: int, tolerance: float)[source]

Invert time coordinate transformation given as numpy array

The purpose of this class is the inversion of a 1D coordinate transform \(v \rightarrow U(v)\) between some coordinates \(u\) and \(v\). The inverse is denoted \(u \rightarrow V(u)\), i.e. \(V(U(v)) = v\).

The transform is expressed as a shift \(\delta U(v) = U(v) - v\). It will be provided as samples \(\delta U_k = \delta U(v_k)\) at regularly spaced sample locations \(v_k = v_0 + k \Delta v\).

The inverse will be expressed as \(\delta V(u) = u - V(u)\). It will be computed for locations \(u_l = u_0 + l \Delta u\) regularly spaced with respect to the \(u\)-coordinate, i.e. the output will be the sequence \(\delta V_l = u_l - V(u_l)\).

Currently, we restrict to the special case where \(u_k = v_k\), i.e. identical offsets \(v_0 = u_0\) and spacings \(\Delta u = \Delta v\).

By convention, the coordinates refer to times, and coordinate shift, tolerance, and sample rate are all given in SI units.
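The inversion idea can be sketched with a fixed-point iteration at the grid points \(u_k = v_k\), solving \(\delta V(u) = \delta U(u - \delta V(u))\). Here np.interp stands in for the RegularInterpolator (the library uses Lagrange interpolation), and the shift function is chosen for illustration:

```python
import numpy as np

fsample = 10.0
v = np.arange(100) / fsample
dU = 0.02 * np.sin(0.5 * v)          # small, smooth shift dU(v)

# Fixed-point iteration: dV_k = dU(v_k - dV_k), interpolated on the grid
dV = np.zeros_like(v)
for _ in range(50):
    dV_next = np.interp(v - dV, v, dU)
    err = np.max(np.abs(dV_next - dV))
    dV = dV_next
    if err < 1e-13:
        break

# Consistency check V(U(v)) = v, equivalently dV(U(v)) = dU(v)
assert np.allclose(np.interp(v + dU, v, dV), dU, atol=1e-4)
```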

property margin_left: int

Left margin size.

Specifies how many samples on the left have to be added by boundary conditions.

property margin_right: int

Right margin size.

Specifies how many samples on the right have to be added by boundary conditions.

lisainstrument.sigpro.shift_inversion_numpy.fixed_point_iter(f: Callable[[ndarray[tuple[int], dtype[number]]], ndarray[tuple[int], dtype[number]]], ferr: Callable[[ndarray[tuple[int], dtype[number]], ndarray[tuple[int], dtype[number]]], float], x: ndarray[tuple[int], dtype[number]], tolerance: float, max_iter: int) ndarray[tuple[int], dtype[number]][source]

Perform a fixed point iteration for functions operating on a 1D array.

This uses fixed-point iteration to find a solution for

\[x = f(x)\]

where \(x\) is a 1D array and \(f\) returns an array of the same size.

The convergence criterion is provided by the user via a function \(r(x_{n+1}, x_n)\) that maps two successive iterates to a scalar error measure. The iteration is performed until \(r < \epsilon\). If convergence is not achieved within a given number of iterations, an exception is raised.

Parameters:
  • f – The function \(f(x)\)

  • ferr – The error measure \(r(x)\)

  • x – The initial data for the iteration.

  • tolerance – The tolerance \(\epsilon\)

  • max_iter – Maximum number of iterations

Returns:

Array with solution
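A minimal sketch of this iteration scheme (an illustrative helper, not the library function itself), applied to the classic fixed point \(x = \cos x\):

```python
import numpy as np

def fixed_point_iter(f, ferr, x, tolerance, max_iter):
    """Iterate x <- f(x) until ferr(x_next, x) < tolerance."""
    for _ in range(max_iter):
        x_next = f(x)
        err = ferr(x_next, x)
        x = x_next
        if err < tolerance:
            return x
    raise RuntimeError("fixed-point iteration did not converge")

# Solve x = cos(x) elementwise; the fixed point is ~0.7390851
sol = fixed_point_iter(
    np.cos,
    lambda a, b: float(np.max(np.abs(a - b))),
    np.zeros(4),
    1e-12,
    200,
)
assert np.allclose(sol, 0.7390851332151607)
```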

lisainstrument.sigpro.shift_inversion_numpy.make_shift_inverse_lagrange_numpy(order: int, fsample: float, max_abs_shift: float, max_iter: int, tolerance: float) ShiftInverseNumpy[source]

Set up ShiftInverseNumpy instance with Lagrange interpolation method.

Parameters:
  • order – Order of the Lagrange polynomials

  • fsample – Sample rate \(f_s > 0\) [Hz]

  • max_abs_shift – Upper limit \(S_\mathrm{max} \ge 0\) [s] for coordinate shift

  • max_iter – Maximum iterations before fail

  • tolerance – Maximum absolute error of result

Returns:

Inversion function of type ShiftInverseNumpy