Chunked Postprocessing
Here we demonstrate how to post-process data in simulator output files, using the chunked processing infrastructure.
import logging
logging.basicConfig(level=logging.WARNING)
import numpy as np
First, we create a result file as an example.
from lisainstrument import Instrument
from lisainstrument import SchedulerConfigParallel, SchedulerConfigSerial
instru = Instrument(size=25000, noises_f_min_hz=5e-4, clock_f_min_hz=5e-4)
# use large cutoff frequencies in this example to reduce noise filter burn-in periods
WARNING:lisainstrument.instru.instru_fplan:Using default set of locking beatnote frequencies might cause interferometric beatnote frequencies to fall outside the requirement range of 5..25 MHz
path = "import_demo.h5"
cfg_parallel = SchedulerConfigParallel(chunk_size=100000, num_chunks=2, num_workers=4)
instru.export_hdf5(path=path, overwrite=True, cfgscheduler=cfg_parallel)
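The SchedulerConfigSerial class imported above configures single-process chunked processing instead, which can be useful for debugging. A minimal sketch, assuming its chunking parameters mirror the parallel variant (check the actual signature):
# hypothetical: parameters assumed analogous to SchedulerConfigParallel
cfg_serial = SchedulerConfigSerial(chunk_size=100000, num_chunks=2)
# instru.export_hdf5(path=path, overwrite=True, cfgscheduler=cfg_serial)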
Interface to simulator result files
There is no need to work directly with HDF5; the information in result files is available via an interface.
from lisainstrument import sim_result_file
reader = sim_result_file(path)
To find out whether the file contains the basic or the extended set of quantities, use
print(reader.is_extended)
False
If only the metadata is needed, one can use this interface to get an object with attributes for all simulation parameters.
metadata = reader.metadata
# dir(metadata)  # lists all available parameter attributes
print(metadata.seed)
print(metadata.mosa_jitter_eta_asds)
3003495066
{'12': 2e-09, '13': 2e-09, '21': 2e-09, '23': 2e-09, '31': 2e-09, '32': 2e-09}
When reading results into memory, as shown in the next section, the metadata is also part of the resulting data object and does not need to be passed around separately.
Read to memory
For small simulations, we can read all results back into memory, obtaining the same object we would have gotten from the export_numpy method of the Instrument class.
We recall that there are two categories of results. Here we read the core (aka basic) set of results.
results1 = reader.read_core()
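For instance, the seed printed earlier should be available directly on the result object, assuming its meta_data attribute (used further below) mirrors the reader's metadata:
print(results1.meta_data.seed)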
There is also an option to load just a portion of the results within a given time window, as shown below.
tmin2 = results1.t[0] + 1010
tmax2 = results1.t[-1] - 523
results2 = reader.read_core(t_min=tmin2, t_max=tmax2)
It is important to understand that the relation between indices and timestamps is the same for restricted and full results. The start time in the metadata is identical:
print(results1.meta_data.t0, results2.meta_data.t0)
75.0 75.0
The difference between the two is the index range of the arrays, which can be obtained as shown below. This can be a source of errors, since the index ranges of most quantities normally start at zero.
print(results1.mosa_ranges.ref_carrier_fluctuations)
print(results2.mosa_ranges.ref_carrier_fluctuations)
(0, 25000)
(4040, 22907)
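As a cross-check, the timestamps of the restricted result can be reconstructed from its index range and the metadata. A sketch, assuming samples are located at t0 + i * dt, with dt and t0 as used for timestamp_stream further below:
i0, i1 = results2.mosa_ranges.ref_carrier_fluctuations
t_reconstructed = results2.meta_data.t0 + np.arange(i0, i1) * results2.meta_data.dt
print(np.max(np.abs(t_reconstructed - results2.t)))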
The timestamp properties are also correct for the restricted result. Below, we use them to plot both the full and the restricted data.
%matplotlib inline
from matplotlib import pyplot as plt
plt.close()
plt.plot(results1.t, results1.mosa_data.ref_carrier_fluctuations["12"], "k-")
plt.plot(results2.t, results2.mosa_data.ref_carrier_fluctuations["12"], "g-")
[<matplotlib.lines.Line2D at 0x7f3773b15950>]
Postprocessing large data
We now demonstrate how to read data from a simulation result file, compute some function involving data and time, and save to another file, all in a chunked fashion that works with arbitrarily large datasets.
from lisainstrument.streams import (
    stream_expression,
    timestamp_stream,
    store_bundle_numpy,
)
First, we define the expression we want to compute. For this we just provide a function working with ordinary numpy arrays, and then apply a decorator to turn it into an expression that accepts streams.
@stream_expression(dtype=np.float64)
def example_stream_expr(a: np.ndarray, b: np.ndarray, *, p: float) -> np.ndarray:
    return a * p + b
Next, we represent the time series in our example result file as a stream bundle.
stb = reader.as_stream_bundle()
We can get the streams for each dataset using their dataset identifiers.
dsid12 = ("debug", "ref_carrier_fluctuations", "12")
s12 = stb.stream_from_dsid(dsid12)
Just as an example, we define a stream that maps the stream index to regularly spaced sample times.
sts = timestamp_stream(reader.metadata.dt, reader.metadata.t0)
Now we apply our expression to the two streams. Note how keyword-only arguments are used for parameters that are not streams.
par_p = 2.0e-6
spp = example_stream_expr(sts, s12, p=par_p)
Next, we have to add any stream we want to output to the stream bundle. Intermediate results don’t need to be added unless they should be stored as well. This step defines a permanent identifier for the stream, which will be used in files. We also specify the index range on which each stream has to be evaluated. Here we can just use the full range of the input streams, since we do not apply any delays or filters; otherwise, we would have to add margins, as sketched after the code below.
rg = reader.range_by_dataset_id(dsid12)
dsidpp = ("post", "example")
stb.add(dsidpp, spp, rg)
dsidts = ("post", "time")
stb.add(dsidts, sts, rg)
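To illustrate margins, here is a hypothetical sketch, assuming index ranges behave like the (start, stop) tuples printed earlier (range_by_dataset_id may return a dedicated range type, in which case adapt accordingly):
# hypothetical: a filter with a burn-in of n samples makes the first n
# outputs invalid, so the stored range would be shrunk accordingly
n_burn_in = 100
rg_margin = (rg[0] + n_burn_in, rg[1])
# stb.add(("post", "filtered_example"), some_filtered_stream, rg_margin)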
Before showing the chunked processing workflow, we first evaluate our new streams entirely in memory, as a dictionary mapping dataset identifiers to numpy arrays.
ppdatasets = {dsidpp, dsidts}
store = store_bundle_numpy(stb, datasets=ppdatasets, cfgscheduler=cfg_parallel)
stored = store.as_dict()
Plotting our result shows what we expect.
plt.close()
t = stored[dsidts]
plt.plot(t, stored[dsidpp], "k-")
plt.plot(t, par_p * t, "r--")
[<matplotlib.lines.Line2D at 0x7f377174a990>]
Now we write the result directly to file, evaluated chunk by chunk, without the need to fit everything into memory. The function used for this creates HDF5 files structured similarly to the simulator result files. One can also provide metadata, which will be stored as HDF5 file attributes.
from lisainstrument.streams import store_bundle_hdf5
pppath = "postp_demo.h5"
ppmeta = {"p": par_p}
store_bundle_hdf5(pppath, stb, ppmeta, ppdatasets, overwrite=True)
Note that one can also call store_bundle_hdf5 with an h5py.File instance in place of the path, which allows one to directly add further data or attributes in addition to those written by store_bundle_hdf5, as sketched below.
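A sketch of this variant (the file name and the extra attribute are purely illustrative):
import h5py

with h5py.File("postp_demo_manual.h5", "w") as ppf:
    store_bundle_hdf5(ppf, stb, ppmeta, ppdatasets)
    # manually add a further attribute alongside those written by store_bundle_hdf5
    ppf.attrs["comment"] = "postprocessing example"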
Reading back postprocessing result files
We can also use the file we just created as input for further stream processing. For reading generic HDF5 files, we use the function shown below.
import h5py
from lisainstrument.streams import generic_hdf5_file_as_stream_bundle
with h5py.File(pppath) as ppfile:
    stb2 = generic_hdf5_file_as_stream_bundle(ppfile, ppdatasets)
    store2 = store_bundle_numpy(stb2, datasets=ppdatasets)
    stored2 = store2.as_dict()
Here, we read the whole dataset into memory again, but we could just as well continue processing directly into yet another file, as already demonstrated; for stream processing, it does not matter where the streams originate from.
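For instance, further processing could be chained like this (a sketch, assuming the generic stream bundle exposes the same stream_from_dsid and add methods used above):
with h5py.File(pppath) as ppfile:
    stb3 = generic_hdf5_file_as_stream_bundle(ppfile, ppdatasets)
    # apply the expression once more to the read-back streams
    s_chain = example_stream_expr(
        stb3.stream_from_dsid(dsidts), stb3.stream_from_dsid(dsidpp), p=par_p
    )
    dsid_chain = ("post", "example_chained")
    stb3.add(dsid_chain, s_chain, rg)
    store_bundle_hdf5("postp_demo_chained.h5", stb3, ppmeta, {dsid_chain}, overwrite=True)
Finally, we show that the read-back results are indeed identical.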
print(np.max(np.abs(stored[dsidpp] - stored2[dsidpp])))
0.0
Note that when reading HDF5 files created by store_bundle_hdf5, one can also use the function instru_hdf5_file_as_stream_bundle in place of generic_hdf5_file_as_stream_bundle. The advantage is that the stream index ranges written as dataset attributes by store_bundle_hdf5 are read back, while generic_hdf5_file_as_stream_bundle assumes indices starting at zero unless instructed otherwise (via the ranges parameter).
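A sketch of reading back with this variant (assuming it accepts the same arguments as the generic function):
from lisainstrument.streams import instru_hdf5_file_as_stream_bundle

with h5py.File(pppath) as ppfile:
    # index ranges stored as dataset attributes are restored automatically
    stb4 = instru_hdf5_file_as_stream_bundle(ppfile, ppdatasets)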