Streams
Overview
The central building block of the streams package is called a stream. A stream represents a mapping from integer indices to real numbers, which can be evaluated on any finite index range. A stream may or may not depend on other streams.
Further, streams can be stateful or stateless. For a stateless stream, a given element does not depend on any element of the same stream with a lower index. It is, however, allowed for a stateless stream to depend on a stateful stream. Examples of stateless streams are expressions computed from the index alone or arithmetic expressions combining other streams. Examples of stateful streams are infinite impulse response filters or cumulative summation.
An important concept is the domain of dependence. To compute the value at some index \(k\), a stream requires the values at indices \(k R + D_\mathrm{F} \ldots k R + D_\mathrm{L}\) from a given dependency. The integer \(R\) is the downsampling ratio, and the integers \(D_\mathrm{F}\) and \(D_\mathrm{L}\) are offsets defining the first and last required elements. These parameters do not depend on the index and must be known in advance when creating the stream. Note that the design only allows integer downsampling, not upsampling or non-integer sampling ratios.
The streams framework makes no assumptions about the meaning of the index. The only relation between the index spaces of different streams is given by the domains of dependence. It is also worth pointing out that the definition of a stream does not involve any boundaries for the indices, in stark contrast to working with arrays.
The concepts above are implemented by an abstract base class StreamBase, which stores the dependencies of the stream and the domain of dependence for each dependency (collected in an immutable dataclass StreamDependency). In addition, StreamBase defines the data type of the stream elements and a boolean property indicating whether the stream is stateful or stateless.
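As an illustration, a minimal sketch of such a dependency record could look as follows; the actual attributes of StreamDependency may differ, and the input_range helper is hypothetical.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class StreamDependency:
    ratio: int  # downsampling ratio R
    first: int  # offset D_F of the first required element
    last: int   # offset D_L of the last required element

    def input_range(self, k_first: int, k_last: int) -> tuple[int, int]:
        """Index range of the dependency needed to compute output
        indices k_first..k_last, following the formula above."""
        return (k_first * self.ratio + self.first,
                k_last * self.ratio + self.last)
```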
Most importantly, StreamBase defines an abstract method generate that must be implemented by any concrete stream class. The purpose of this method is to compute a segment of the stream defined by a first and last index. It requires as input segments already computed from its dependency streams, each covering the domain of dependence for the segment to be computed. Further, the method returns an internal state which has to be passed back into it when evaluating the next segment. It is mandatory that the generate method depends deterministically on its inputs alone (and immutable instance attributes), with no internal state except the one exposed by the interface.
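This contract can be summarized by the following hedged sketch; the exact signature of generate in StreamBase differs in detail.

```python
import abc

class StreamBase(abc.ABC):
    @abc.abstractmethod
    def generate(self, first: int, last: int, inputs, state):
        """Compute stream elements first..last.

        inputs: one segment per dependency, each covering the domain
        of dependence for the requested range.
        state: the state returned by the previous call, or None when
        computing the first segment.
        Returns the computed segment together with the new state.
        Must depend deterministically on the arguments and immutable
        instance attributes only.
        """
```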
Another important element of the streams package is the class StreamBundle. Its purpose is to collect streams that one might wish to evaluate at some point, and furnish each one with the desired index range and a human-readable unique identifier to be used when storing the results.
Identifiers are given by DatasetIdentifier, which is just a type alias for a tuple of strings. The motivation for this choice is the main use case, where results are stored in HDF5 datasets within nested HDF5 groups.
The job of evaluating the streams in a StreamBundle is handled by the scheduler.store_bundle function. Besides the StreamBundle, one also has to specify a place to store the results; see the section below. Broadly speaking, store_bundle chops the streams into chunks and collects the tasks for computing chunks from the chunks they depend on. It then calls an object implementing the ExecutorProtocol protocol to actually execute those tasks. Currently, there are two choices, a serial executor and a parallel one.
Data storage
Any form of storage is represented by the DataStorage abstract base class. There are different implementations of this interface, most notably DataStorageHDF5 for storing results to HDF5 files. The DataStorage interface has an abstract method dataset. This method takes a DatasetIdentifier and an index range, and returns an ArrayTarget. ArrayTarget is a protocol that allows storing a numpy array with results for a given index range. DataStorage also has an abstract property valid_identifiers, which provides the set of DatasetIdentifiers that need to be stored.
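A minimal sketch of these interfaces is shown below; dataset and valid_identifiers are as described above, while the ArrayTarget method name and the exact signatures are assumptions for illustration.

```python
import abc
from typing import Protocol

import numpy as np

DatasetIdentifier = tuple[str, ...]  # type alias, as described above

class ArrayTarget(Protocol):
    def store(self, data: np.ndarray) -> None:
        """Store results covering the index range the target was
        created for (method name is illustrative)."""

class DataStorage(abc.ABC):
    @abc.abstractmethod
    def dataset(self, ident: DatasetIdentifier,
                first: int, last: int) -> ArrayTarget:
        """Return a target accepting results for the given index range."""

    @property
    @abc.abstractmethod
    def valid_identifiers(self) -> set[DatasetIdentifier]:
        """The set of identifiers that need to be stored."""
```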
Another ingredient is the stream type StreamStore, a regular stream that depends on a single other stream, whose output is just the input at the same index. Its side effect is the whole point: whenever a part of it is evaluated, it stores the result into an ArrayTarget. Results are trimmed first, storing nothing outside the requested output range even if the stream is evaluated on a larger range. It also ensures that the whole output range is covered, without overlaps.
The scheduler.store_bundle function creates a StreamStore for each stream in the StreamBundle that needs to be stored (as determined by the DataStorage). It then evaluates those StreamStore streams on the required ranges, triggering the storage of the results.
The motivation for this design is to reduce the complexity of the scheduler core, which now only needs to know how to evaluate streams but nothing about data storage.
Stream types
Below we describe the streams available in the streams package. Generally, the interface provides simple functions to create streams for a given purpose, which one should use instead of setting up stream instances directly. The reason is that those functions can return different stream types based on the types of the input streams. This allows, for example, optimizing the case of constant streams.
Constant streams
The simplest stream is StreamConst, which just represents a constant value. This stream plays an important role in optimization. All functions that provide operations on streams, such as filtering or arithmetic expressions, check for StreamConst inputs and try to propagate constantness. For example, applying a FIR filter to a StreamConst results in another StreamConst, with value given by the original constant times the filter gain.
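For a FIR filter, the relevant gain is the sum of the filter coefficients (the DC gain), as the following numpy snippet illustrates (coefficients are arbitrary examples):

```python
import numpy as np

h = np.array([0.25, 0.5, 0.25])  # example FIR coefficients
c = 3.0                          # the constant stream value
# Filtering a constant signal yields the constant times the DC gain:
filtered = np.convolve(np.full(10, c), h, mode="valid")
assert np.allclose(filtered, c * h.sum())
```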
Index streams
The StreamIndices stream’s value at a given index is simply the index itself plus a constant integer offset. It is used for some unit tests.
Time grids
The StreamTimeGrid stream provides a regularly spaced time grid. It is created using the timestamp_stream function. It is important to understand that, unlike arrays with sample times, a time stream has no start or end time; it simply associates a timestamp with a given index. The t0 argument to timestamp_stream gives the time for index 0, but one can evaluate the stream on any index range.
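The mapping from index to timestamp can be sketched as follows (a toy stand-in for what StreamTimeGrid computes; argument names are illustrative):

```python
def timestamp(k: int, t0: float, dt: float) -> float:
    """Timestamp associated with index k on a regular time grid with
    spacing dt; t0 is the time at index 0, and k may be negative."""
    return t0 + k * dt
```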
Downsampling
The StreamDownsample stream provides a downsampled version of a single input stream, with an integer downsampling ratio and an optional offset. It does not apply any antialiasing filters.
Given a downsampling ratio, the stream_downsample function creates an operator which can be applied to a stream and returns the downsampled stream. Applying the operator to a constant stream results in another constant stream with the same constant.
Note
It is tempting to simply reuse the same stream instance to represent a downsampled constant stream, but this would cause the scheduler to abort. Each stream is assumed to have exactly one sample rate, and the scheduler figures out this rate by investigating the relations between interdependent streams.
Stream expressions
The StreamExpression stream type is used to apply functions operating on numpy arrays to streams. To compute an index range of the output, it takes the same index range from each input stream and passes the results to a user-provided function that operates on numpy arrays and returns a numpy array. This function has to operate element-wise, i.e. any element of the output array depends only on elements of the input arrays with the same index.
As an optimization, StreamExpression passes scalar constants instead of arrays to the function for any input stream that is a StreamConst.
This stream type is the most used one in the simulator. To simplify building stream expressions, there is a decorator stream_expression. This decorator is applied to a function accepting ordinary numpy arrays or scalars, and turns it into a function operating on streams. This means one can simply apply the decorated numpy-based function on streams to obtain a StreamExpression. If all input streams are of type StreamConst, the function is evaluated directly with those constants and the result is turned into another StreamConst stream. When applying the decorator, one has to specify the resulting data type as argument to the decorator function, for example @stream_expression(np.float64). For convenience, one can use keyword arguments to pass non-stream arguments to the decorated function, which are passed through.
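As an illustration, a decorated function might look like the following sketch; the import path and all names here are assumptions, only the decorator itself and its dtype argument are as described above.

```python
import numpy as np

from streams import stream_expression  # import path assumed

@stream_expression(np.float64)
def scaled_sum(x, y, gain=1.0):
    """Element-wise weighted sum of two streams; x and y arrive as
    numpy arrays, or as scalars for constant input streams."""
    return gain * (x + y)

# Applying it to streams yields a StreamExpression (or a StreamConst
# if both inputs are constant); gain is passed through as a keyword:
# result = scaled_sum(stream_a, stream_b, gain=2.0)
```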
Currently, streams do not implement arithmetic operators, and StreamExpression is used to create arithmetic expressions of streams. This might change in the future. However, defining a dedicated function for each expression has advantages. First, it keeps the stream graph simpler, since there is no separate StreamExpression for each arithmetic operation in an expression. This helps reduce the memory requirement for evaluating streams, which roughly scales with the number of streams. Second, it provides a place to document the expressions.
An important application of StreamExpression is to evaluate functions of time as streams. The function stream_func_of_time turns a function of time (given as a FuncOfTimeTypes type) into a function operating on streams of type StreamTimeGrid, returning a StreamExpression that evaluates the FuncOfTime for the sample times from the StreamTimeGrid. If applied to a ConstFuncOfTime, stream_func_of_time returns a StreamConst.
Filtering streams
The stream types StreamFIR and StreamIIR allow applying FIR and IIR filters to streams. These stream types are simple wrappers around the filter functionality from the sigpro package. The user-facing interface is given by the functions stream_filter_iir, stream_filter_iir_chain, and stream_filter_fir. Those functions turn a filter definition into a filter operator working on streams.
There is an important difference to the numpy-based sigpro tools. Since FIR filters have a domain of dependence wider than a single element, they require a margin around the input. The FIR filter tools in sigpro have the option to apply boundary conditions to fill in such margins. For streams, boundary conditions are not used at all. Instead, the stream to be filtered is evaluated on a larger range than the filtered output stream, to account for the margins. This is important for the use in the simulator, given the design goal that all output should be valid, eliminating the need to trim margins from results.
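As a concrete example of this margin rule (with illustrative offsets), consider a 5-tap centered FIR filter with domain of dependence \(D_\mathrm{F} = -2\) to \(D_\mathrm{L} = 2\) and ratio \(R = 1\):

```python
# To produce valid output for indices 100..199, the input stream is
# simply evaluated on a larger range; no boundary conditions are used
# and no margins have to be trimmed from the output.
first, last = 100, 199
d_first, d_last = -2, 2
input_range = (first + d_first, last + d_last)  # (98, 201)
```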
For IIR filters, one still has to specify how to set up the initial filter state, exactly as in sigpro. This is because one cannot simply evaluate the input stream on infinitely many preceding points. In practice, IIR filters produce a certain number of samples impacted by the choice of initial conditions. To account for this, the streams framework has an option to force a stream to be evaluated a given number of samples before the elements actually required by any dependent stream. This is what the prefix_size argument to StreamBase is about. The stream_filter_iir and stream_filter_iir_chain functions have a burn_in_size argument that sets this prefix size.
Time delays
There are two stream types for handling time shifts. Constant time shifts are handled by the StreamDelayFix stream. The StreamDelayDyn allows shifting one stream by an amount given by another stream. Both stream types assume that the input streams provide samples for regularly spaced sample times. The sample rate has to be provided when creating the streams. For StreamDelayDyn, it is also assumed that the time shift stream and the sample stream share the same timestamps.
The interpolation is performed by the interpolators provided by the sigpro package. For StreamDelayDyn, one chooses the interpolation method by providing a RegularInterpolator; see the sigpro documentation. Further, one has to provide the minimum and maximum supported delays in advance. The reason is that the domain of dependence for the stream must cover all valid delays, and it has to be provided when creating the stream. For StreamDelayFix, one has to specify an interpolator specialized to fixed shifts, by providing a FixedShiftFactory (see the sigpro package).
The only use case in the simulator is Lagrange interpolation. For this, there is a function stream_delay_lagrange. Given the interpolation order, the sample rate, and the minimum and maximum allowed delays, it returns an operator accepting two streams as input. Based on the input stream types, it returns either a StreamDelayDyn, a StreamDelayFix, or a StreamConst. In detail, delaying a constant stream always results in the same stream, delaying a non-constant stream by a StreamConst results in a StreamDelayFix, and delaying a non-constant stream by another non-constant stream results in a StreamDelayDyn.
Time shifts can be positive or negative; there is no concept of causality. Evaluating a shifted stream simply triggers the evaluation of the input streams on the required interval. The required interval also includes any margins needed by the interpolation.
Unlike the sigpro package, the streams package provides only relative time shifts, but no interpolation to arbitrary absolute time coordinates. This is a limitation due to the fixed domain of dependence for streams.
Finally, there is a stream type StreamShiftInv for inverting a given time shift. This is only an approximation, using the same fixed-point iteration as the corresponding functions in the sigpro package. The difference is that the streams version applies the iteration on each evaluated chunk instead of globally. For a specified error tolerance, the number of iterations at a given element can therefore differ between the two implementations. As for time shifts, there is a convenience function stream_shift_inv_lagrange specifically for using Lagrange interpolation. The stream operator created by this function also has an optimization if the time shift to be inverted is a StreamConst.
Noise streams
The noisy package provides classes for noise definitions as well as methods for generating noise samples as numpy arrays. The streams package builds upon this to provide two different ways of creating noise streams. Those are described in the noise generation chapter.
One option to build a noise stream is the streams.noise.stream_noise function; the alternative is the streams.noise_alt.stream_noise function. Both have the same signature, creating a noise stream from a noise definition and a seed value for random number generation. There is also a shortcut, streams.stream_noise, which currently defaults to streams.noise_alt.stream_noise. The two are equivalent with regard to the results, but may differ slightly in performance.
Note
One difference from previous noise handling in the simulator is the following. Since streams are evaluated on whatever range is needed by dependent streams, there is no explicit control over the start index of the evaluation. Some noise types employ time integration of other noises to modify the resulting PSD. Therefore, the integration constant implicitly depends on the evaluated index range. In effect, such noises have a constant part that is not under direct control. For example, changing the interpolation order of some time delay may change the required margins, hence the evaluated range of some noise stream, and hence the constant part of the noise. In practice, this is only a problem for noise models that are a bit sketchy to begin with. Technically, time integration of a given stationary noise may result in non-stationary noise, e.g. noise that can grow without bounds. It is planned to replace time integration by more sophisticated methods for shaping PSDs.
Evaluating streams
Evaluating streams is handled by the store_bundle function defined in the scheduler module. The internal operation can be divided into three parts.
The first part consists of turning the dependencies between the streams into a directed acyclic graph and analysing this graph using standard algorithms. Since we only need a few basic graph algorithms, the streams package contains its own implementations in the graph_util module instead of using graph libraries such as NetworkX. The StreamGraph data class describes the basic graph of stream dependencies and in addition keeps track of the domains of dependence in the time direction. The scheduler uses the graph_util module to extract all graph properties needed for its purposes, and collects those in a StreamPlan class.
First, the scheduler determines which streams are required to evaluate the requested output streams, and removes everything else from the graph. It is therefore not wasteful to define a StreamBundle with everything that can be computed while evaluating only a subset.
Next, the scheduler splits the graph into connected components. If there are disconnected components, they are evaluated one after the other, reducing the peak memory requirement. This case is relevant for pure data transfer streams, which copy data from one storage location to another. It could also arise in postprocessing tasks, allowing one to collect all tasks in one big StreamBundle without memory penalties.
For each connected component, the scheduler orders the graph into what we call layers. A layer is a set of streams and a rank, such that all streams in a layer depend only on streams contained in layers of lower rank. The lowest rank contains all streams that have no dependencies, i.e. the roots of the graph. The highest rank contains streams not needed by other streams, but for which output was requested (an output stream can have lower rank if it is also needed to compute other streams).
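A minimal sketch of this layer ranking, for a graph given as a mapping from each stream to its set of dependencies (a stand-in for the actual graph_util implementation):

```python
def layer_ranks(deps: dict) -> dict:
    """Assign each node a rank one higher than its highest-ranked
    dependency; roots (no dependencies) get rank 0."""
    ranks: dict = {}

    def rank(node):
        if node not in ranks:
            ranks[node] = 1 + max((rank(d) for d in deps[node]),
                                  default=-1)
        return ranks[node]

    for node in deps:
        rank(node)
    return ranks

print(layer_ranks({"a": set(), "b": {"a"}, "c": {"a", "b"}}))
# {'a': 0, 'b': 1, 'c': 2}
```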
Further, the scheduler determines the minimal range of each stream required to evaluate all output streams on the requested ranges. It also defines a sort of sampling rate for each stream, such that the downsampling ratio for a stream dependency is the ratio of those sample rates. The unit is irrelevant but chosen such that all rates are integers. This integer sample rate is used to divide time into chunks. The user-provided chunk size is defined with respect to the highest integer sample rate.
The second part of scheduling consists of defining tasks for computing individual chunks of each stream, and the dependencies between those tasks. All those compute tasks share a common structure. For each stream dependency, they require a segment of input data that covers the range required to compute the single output data segment. The core of each task is the generate method of the corresponding stream, which is called with the input segments. Another type of task deals with the logistics of memory management, copying the portion of a segment still needed by any remaining tasks and discarding the rest.
The scheduler uses the following algorithm to define the tasks. We will phrase it as if the tasks were executed directly by the scheduler, and describe the technicalities later. First, the scheduler calls the generate method for each stream in the layer of rank zero. The segment sizes are based on the integer sample rates, such that each stream segment covers more or less the same time interval. The first segment of a given stream starts at the beginning of the full required range. After all output segments for layer zero are generated, the generate methods for the streams in the next layer are called. For each stream, the new output segment is computed up to the maximum index possible given the available input segments from the previous step. This is repeated for each layer in turn. Before repeating the above for the next chunk in time, the produced segments are culled, discarding any data that is not needed for computing the next chunk. When computing a chunk, any of those culled segments from the previous chunk are joined to the newly computed segments for a given stream, and the joined segment is used in the evaluation of dependent streams. The generate methods of streams can handle input segments covering more than the range required to evaluate the specified output range, which can happen if the same segment is used as input to more than one stream.
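The following toy reproduces the essence of this loop for a two-layer graph: layer zero produces raw samples, layer one is a causal 3-tap moving average with domain of dependence \(-2 \ldots 0\), so culling keeps the last two samples of the input for the next chunk (all names and sizes are illustrative):

```python
import numpy as np

chunk, total = 4, 12
kept = np.empty(0)  # culled tail of the layer-zero stream
output = []
for start in range(0, total, chunk):
    a_new = np.arange(start, start + chunk, dtype=float)  # layer zero
    a = np.concatenate([kept, a_new])  # join culled and new segments
    # Layer one: compute up to the maximum index allowed by the input.
    output.append(np.convolve(a, np.ones(3) / 3, mode="valid"))
    kept = a[-2:]  # cull: keep only what the next chunk still needs
print(np.concatenate(output))  # running mean for indices 2..11
```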
The third part of the scheduling is the task execution. The above tasks are not executed directly by the scheduler. Instead, it calls corresponding methods of a low-level engine called the executor. The executor is an instance implementing the ExecutorProtocol protocol. Currently, there are two executor implementations. The first simply executes each task immediately and manages the storage of segments. This executor is purely serial, using only a single CPU core. The other implementation is based on the dask package. When called by the scheduler, it creates a dask delayed task, building up a dask task graph. This graph can then be executed by dask in parallel, using multithreading.
So far, we have omitted an important operation called checkpointing. Checkpointing means that all tasks need to be completed before moving beyond the checkpoint, and only the minimum amount of data needed for the computations after the checkpoint is stored. The scheduler defines a checkpoint every few chunks, as specified by a user parameter. For the serial executor, checkpointing is a trivial no-op. For the dask-based executor, it is crucial to prevent uncontrolled growth of the required memory or piling up of tasks. At a checkpoint, the dask graph is executed, storing the checkpointed data in numpy arrays. After each checkpoint, a new dask graph is started. The module streams.scheduler_serial defines the serial executor, and the module streams.scheduler_dask the dask-based one.
Another detail omitted above is that streams can have internal state (for example, IIR filters). This state is managed similarly to the output segments. The task of computing a segment also depends on the state returned when computing the previous segment. One special case is the first segment, for which an initial state has to be computed. This is implemented by the scheduler passing None as the state. The generate methods of the streams check for this case and call their initial-state methods.
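Cumulative summation makes a simple illustration of this convention (a toy, not the actual StreamBase interface):

```python
import numpy as np

def generate_cumsum(data: np.ndarray, state):
    """Toy stateful generate: running sum across segments."""
    if state is None:      # first segment: establish the initial state
        state = 0.0
    out = state + np.cumsum(data)
    return out, out[-1]    # the segment and the state for the next call

seg1, state = generate_cumsum(np.ones(4), None)   # [1. 2. 3. 4.]
seg2, state = generate_cumsum(np.ones(4), state)  # [5. 6. 7. 8.]
```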
To manage segments, we use data classes that contain both the data and the index range this data represents within the stream. These implement a protocol Segment. There are two Segment implementations, SegmentArray and SegmentConst. The former stores the data as a numpy array. The latter represents constant data and just stores a scalar. SegmentConst is used to optimize the case of StreamConst. The generate methods of the stream types need to accept both SegmentArray and SegmentConst as input.
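A hedged sketch of the two implementations (actual attribute names may differ):

```python
from dataclasses import dataclass

import numpy as np

@dataclass
class SegmentArray:
    first: int        # stream index of the first element
    data: np.ndarray  # one value per index

@dataclass
class SegmentConst:
    first: int        # first stream index covered
    last: int         # last stream index covered
    value: float      # all elements share this value
```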
Note
Earlier designs for introducing chunked processing in the simulator (aka the daskification) did not use streams, but dask arrays, which are conceptually quite similar but more general. In particular, they can be multi-dimensional and there is no explicit time coordinate along which to divide the work. As it turns out, the heuristic scheduling algorithm used in dask was not suitable for processing long simulations. Dask kept starting new parallel tasks for layer zero along the time direction, piling up segments still needed in the other layers and eventually exhausting all memory. In the current design, dask is not essential. It is not used at all with the serial executor, and the parallel executor only uses it to execute tasks in parallel. This might be done using other multithreading methods in the future.