Source code for lisainstrument.sigpro.iir_filters_numpy

"""Utilities for defining IIR filters and filtering 1D numpy arrays

This module provides basic definitions for IIR filters and functionality
to apply them to 1D numpy arrays. The streams.iirfilter module uses
this module for applying IIR filter to streams, i.e. chunked sata processing.

To define an IIR filter, use the class DefFilterIIR. The filters are specified
by their coefficients. There are convinience functions for setting up various
iir filter types: make_iir_def_derivative(), make_iir_def_cumsum(), and
make_iir_def_trapezoidal().

The filter definitions can be used to create corresponding functions operating
on numpy arrays. For this, use the `make_filter_iir_numpy` function.

When applying IIR filters, one needs to specify initial conditions to handle
the left boundary. The available options are specified using the IIRFilterIC
enum class.

Example use:

>>> input_data = np.linspace(0, 100, 103)
>>> iir_def = make_iir_def_cumsum()
>>> iir_filt = make_filter_iir_numpy(iir_def, IIRFilterIC.ZEROPAD)
>>> filtered_data = iir_filt(input_data)


Internally, the module is organized as follows.
The main engine is the IIRCoreOp class. It implements a protocol
SequentialNumpyMapType which describes an operation that can be applied
repeatedly to numpy arrays and has an internal state that can change
on each call. An important constraint is the convention that the result
of applying a SequentialNumpyMapType operation sequentially on chunks of
a larger array does not depend on the chunk sizes, but only on the array
and the initial internal state. Another aspect of the SequentialNumpyMapType is
to provide an internal state for use with the first array, i.e. the initial
condition.

The IIRCoreOp implements the SequentialNumpyMapType for IIR filters, using
scipy's IIR filter functions. The internal state is the initial condition
for the iir filter. This generalization SequentialNumpyMapType is useful when
implementing such operations for chunked processing.

The filter_iir_numpy() function uses IIRCoreOp to apply a DefFilterIIR filter
definition to a whole numpy array, setting up initial conditions according
to boundary condition specified using the IIRFilterIC enum. Finally, the
make_filter_iir_numpy() function turns the above into an unary operator,
binding all arguments except the array to be filtered.
"""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Iterable, Protocol

import numpy as np
from attrs import field, frozen
from scipy import signal

from lisainstrument.sigpro.types_numpy import NumpyArray1D, make_numpy_array_1d

if TYPE_CHECKING:
    from numpy.typing import ArrayLike


[docs] @frozen class DefFilterIIR: r"""This dataclass defines a IIR filter The infinite impulse response filter is given by .. math:: y_n = \frac{1}{a_0} \left( b_0 x_n + b_1 x_{n-1} + \ldots + b_M x_{n-M} - a_1 y_{n-1} - \ldots - a_N y_{n-N} \right) Attributes: coeffs_a: Filter coefficients :math:`a_i` coeffs_b: Filter coefficients :math:`b_i` """ coeffs_a: list[float] = field(converter=lambda lst: [float(e) for e in lst]) coeffs_b: list[float] = field(converter=lambda lst: [float(e) for e in lst])
[docs] @coeffs_a.validator def check(self, _, value): """Validate filter coefficients a""" if len(value) < 1: msg = "IIR filter coeff_a needs at least one entry" raise ValueError(msg) if value[0] == 0: msg = "IIR filter coeff_a[0] cannot be zero" raise ValueError(msg)
[docs] def make_iir_def_derivative(dx: float) -> DefFilterIIR: """Return IIR filter definition for time derivative as first order finite differences Arguments: dx: constant grid spacing """ coeffs_a = [1.0] coeffs_b = [1 / dx, -1 / dx] return DefFilterIIR(coeffs_b=coeffs_b, coeffs_a=coeffs_a)
[docs] def make_iir_def_cumsum() -> DefFilterIIR: """Return IIR filter definition equivalent to cumulative sum""" coeffs_a = [1.0, -1.0] coeffs_b = [1.0] return DefFilterIIR(coeffs_b=coeffs_b, coeffs_a=coeffs_a)
[docs] def make_iir_def_trapezoidal(dx: float) -> DefFilterIIR: """Return IIR filter definition equivalent to integration with trapezoidal rule Arguments: dx: constant grid spacing """ coeffs_a = [1.0, -1.0] h = 0.5 * dx coeffs_b = [h, h] return DefFilterIIR(coeffs_b=coeffs_b, coeffs_a=coeffs_a)
[docs] class IIRFilterIC(Enum): """Enum for various methods of constructing initial state in IIR filters STEADY: Initial conditions chosen such that, given some constant, applying the filter to a constant sequence of the same value will result in a constant output. Note this is not possible for all filters. It will fail for a cumsum filter. For the 1st order finite difference filter on the other hand, filtering the constant sequence will result in a sequence of zeroes. ZEROPAD: Initial conditions chosen as if preceeding data is all zero """ STEADY = 1 ZEROPAD = 2
[docs] class SequentialNumpyMapType(Protocol): # pylint: disable = too-few-public-methods """This protocol represents a transformation of arrays that can be applied to consecutive segments. The transformation is computed by calling the `apply` method in ascending order on contiguous segments of the whole array. The transformation exposes an internal state that is obtained from the `initial state` and `apply` methods and needs to be passed along in each call to `apply`. The convention is that the transform result does not depend on the choice of segments boundaries. The internal state should not be used or manipulated in any way by the caller. The transformation is not allowed to have any internal state in addition to the exposed one, and is not allowed to have side effects (except for testing and debugging purposes). """
[docs] def initial_state(self, first: float) -> Any: """Compute initial conditions Arguments: first: first element of data to be filtered """
[docs] def apply(self, chunk: NumpyArray1D, fstate: Any) -> tuple[NumpyArray1D, Any]: """The filter function Arguments: chunk: Segment of larger 1D array, either the first one or the one following the segment from the previous call. fstate: filter state Returns: Tuple with transformed array and new filter state """
[docs] class IIRCoreOp(SequentialNumpyMapType): # pylint: disable = too-few-public-methods """Function class for applying IIR filter to numpy array The object can be called either on a whole array or sequentially on smaller, contiguous, non-overlapping segments in strictly ascending order. For the first case, use the `__call__` method. for the latter, first call the `initial state` method to get initial conditions for the first segment. Then use the `apply` method to process each segment and obtain initial conditions for processing the next segment. """ def __init__(self, fdef: DefFilterIIR, ic: IIRFilterIC): """Constructor Arguments: fdef: The definition of the IIR filter to be employed ic: Which prescription to use for initial filter state """ self._fdef = fdef self._ic = ic self._a = np.array(fdef.coeffs_a) self._b = np.array(fdef.coeffs_b) def _initial_state_zero(self) -> np.ndarray: """Initial IIR filter state according to ZEROPAD prescription""" return np.zeros(max(len(self._a), len(self._b)) - 1) def _initial_state_steady(self, x0: float) -> np.ndarray: """Initial IIR filter state according to STEADY prescription Arguments: x0: value for which input constant sequence will yield constant output """ return signal.lfilter_zi(self._b, self._a) * x0
[docs] def initial_state(self, first: float) -> Any: """Set up initial state for IIR filter. Arguments: first: First element of data to be filtered Returns: Filter initial conditions """ match self._ic: case IIRFilterIC.STEADY: fstate = self._initial_state_steady(first) case IIRFilterIC.ZEROPAD: fstate = self._initial_state_zero() case _: msg = f"Initial filter state {self._ic} not implemented" raise RuntimeError(msg) return fstate
[docs] def apply(self, chunk: NumpyArray1D, fstate: Any) -> tuple[NumpyArray1D, Any]: """Apply IIR filter to a segment of a 1D numpy array. The filtered data has the same length as the input. Arguments: chunk: 1D array segment to be filtered fstate: filter initial conditions for this segment Returns: Tuple with filtered data and initial conditions for next segment """ y, fstate = signal.lfilter(self._b, self._a, chunk, zi=fstate) return make_numpy_array_1d(y), fstate
def __call__(self, data: NumpyArray1D) -> NumpyArray1D: """Apply filter to whole 1D numpy array at once. Arguments: data: The array to be filtered Returns: The filtered data, with same length as input """ fstate = self.initial_state(data[0]) y, _ = self.apply(data, fstate) return y
[docs] class IIRChainCoreOp( SequentialNumpyMapType ): # pylint: disable = too-few-public-methods """Function class for applying a chain of IIR filters to a numpy array The object can be called either on a whole array or sequentially on smaller, contiguous, non-overlapping segments in strictly ascending order. For the first case, use the `__call__` method. for the latter, first call the `initial state` method to get initial conditions for the first segment. Then use the `apply` method to process each segment and obtain initial conditions for processing the next segment. """ def __init__(self, fdef: Iterable[DefFilterIIR], ic: IIRFilterIC): """Constructor. The initial condition type is the same for all filters. An empty list of filters is legal and correponds to not applying any filter, i.e. the identity. Arguments: fdef: The definition of the IIR filter to be employed ic: Which prescription to use for initial filter state """ self._filters = [IIRCoreOp(d, ic) for d in fdef]
[docs] def initial_state(self, first: float) -> Any: """Set up initial state for IIR filter chain. Arguments: first: First element for first filter in the chain. Returns: Filter chain initial conditions """ fstate = [] for filti in self._filters: fsi = filti.initial_state(first) first = filti.apply(make_numpy_array_1d(np.array([first])), fsi)[0][0] fstate.append(fsi) return np.array(fstate)
[docs] def apply(self, chunk: NumpyArray1D, fstate: Any) -> tuple[NumpyArray1D, Any]: """Apply IIR filter chain to a segment of a 1D numpy array. The filtered data has the same length as the input. Arguments: chunk: 1D array segment to be filtered fstate: filter initial conditions for this segment Returns: Tuple with filtered data and initial conditions for next segment """ fstate_new = [] y = chunk for filti, fsi in zip(self._filters, fstate): y, nfsi = filti.apply(y, fsi) fstate_new.append(nfsi) return make_numpy_array_1d(y), np.array(fstate_new)
def __call__(self, data: NumpyArray1D) -> NumpyArray1D: """Apply filter to whole 1D numpy array at once. Arguments: data: The array to be filtered Returns: The filtered data, with same length as input """ fstate = self.initial_state(data[0]) y, _ = self.apply(data, fstate) return y
[docs] def filter_iir_numpy( fdef: DefFilterIIR, ic: IIRFilterIC, data: np.ndarray ) -> NumpyArray1D: """Filter a 1D numpy array using an IIR filter. Arguments: fdef: The definition of the IIR filter ic: Which prescription to use for initial filter state data: The numpy array to be filtered Returns: Filtered numpy array """ fmap = IIRCoreOp(fdef, ic) return fmap(make_numpy_array_1d(data))
[docs] def make_filter_iir_numpy( fdef: DefFilterIIR, ic: IIRFilterIC ) -> Callable[[np.ndarray], NumpyArray1D]: """Create a filter function that accepts a 1D numpy array and returns the array filtered according to the given filter definition. Arguments: fdef: The definition of the IIR filter ic: Which prescription to use for initial filter state Returns: Filter function """ def op(data: np.ndarray) -> NumpyArray1D: """Apply IIR filter to 1D numpy array""" return filter_iir_numpy(fdef, ic, make_numpy_array_1d(data)) return op
[docs] def get_iir_steady_state(fdef: DefFilterIIR, const: float) -> float: """Compute constant output when applying IIR filter to infinite constant sequence Not all filter coefficients have a steady state, this raises an exception if not. Arguments: fdef: The IIR filter definition to get steady state for const: the constant value of the assumed input sequence Returns: The constant value of the output sequence """ op = IIRCoreOp(fdef, IIRFilterIC.STEADY) state = op.initial_state(const) ca = np.empty(2) ca[:] = const return op.apply(ca, state)[0][-1]
[docs] def get_iir_chain_steady_state(fdef: Iterable[DefFilterIIR], const: float) -> float: """Like get_iir_steady_state but for filter chains Not all filter coefficients have a steady state, this raises an exception if not. Arguments: fdef: IIR filters of the chain in order of application const: the constant value of the assumed input sequence Returns: The constant value of the output sequence """ for d in fdef: const = get_iir_steady_state(d, const) return const