ParticleDA.jl
ParticleDA.jl is a Julia package to perform data assimilation with particle filters, distributed using MPI.
Installation
To install the latest stable version of the package, open the Julia REPL, enter the package manager with ], then run the command
add ParticleDA.jlIf you plan to develop the package (make changes, submit pull requests, etc), in the package manager mode run this command
dev ParticleDAThis will automatically clone the repository to your local directory ~/.julia/dev/ParticleDA.
You can exit from the package manager mode by pressing CTRL + C or, alternatively, the backspace key when there is no input in the prompt.
Usage
After installing the package, you can start using it in Julia's REPL with
using ParticleDAThe run the particle filter you can use the function run_particle_filter:
ParticleDA.run_particle_filter — Functionrun_particle_filter(
    init_model,
    input_file_path,
    observation_file_path,
    filter_type=BootstrapFilter,
    summary_stat_type=MeanAndVarSummaryStat;
    rng=Random.TaskLocalRNG()
) -> Tuple{Matrix, Union{NamedTuple, Nothing}}Run particle filter. init_model is the function which initialise the model, input_file_path is the path to the YAML file with the input parameters. observation_file_path is the path to the HDF5 file containing the observation sequence to perform filtering for. filter_type is the particle filter type to use.   See ParticleFilter for the possible values. summary_stat_type is a type  specifying the summary statistics of the particles to compute at each time step. See  AbstractSummaryStat for the possible values. rng is a random number generator to use to generate random variates while filtering - a seeded random  number generator may be specified to ensure reproducible results. If running with multiple threads a thread-safe generator such as Random.TaskLocalRNG (the default) must be used.
Returns a tuple containing the state particles representing an estimate of the filtering distribution at the final observation time (with each particle a column of the returned matrix) and a named tuple containing the estimated summary statistics of this final  filtering distribution. If running on multiple ranks using MPI, the returned states array will correspond only to the particles local to this rank and the summary statistics will be returned only on the master rank with all other ranks returning nothing for their second return value.
To simulate observations from the model (which can be used to for example test the filtering algorithms) you can use the function simulate_observations_from_model
ParticleDA.simulate_observations_from_model — Functionsimulate_observations_from_model(
    init_model, input_file_path, output_file_path; rng=Random.TaskLocalRNG()
) -> MatrixSimulate observations from the state space model initialised by the init_model function with parameters specified by the model key in the input YAML file at  input_file_path and save the simulated observation and state sequences to a HDF5 file at output_file_path. rng is a random number generator to use to generate random variates while simulating from the model - a seeded random number generator may be specified to ensure reproducible results.
The input YAML file at input_file_path should have a simulate_observations key with value a dictionary with keys seed and n_time_step corresponding to respectively the number of time steps to generate observations for from the model and the seed to use to initialise the state of the random number generator used to simulate the observations.
The simulated observation sequence is returned as a matrix with columns corresponding to the observation vectors at each time step.
The filter_type argument to run_particle_filter should be a concrete subtype of the ParticleFilter abstract type.
ParticleDA.ParticleFilter — TypeParticleFilterAbstract type for particle filters. Currently implemented subtypes are:
ParticleDA.BootstrapFilter — TypeBootstrapFilter <: ParticleFilterSingleton type BootstrapFilter.  This can be used as argument of  run_particle_filter to select the bootstrap filter.
ParticleDA.OptimalFilter — TypeOptimalFilter <: ParticleFilterSingleton type OptimalFilter.  This can be used as argument of  run_particle_filter to select the optimal proposal filter (for conditionally linear-Gaussian models).
The summary_stat_type argument to run_particle_filter should be a concrete subtype of the AbstractSummaryStat abstract type.
ParticleDA.AbstractSummaryStat — TypeAbstractSummaryStatAbstract type for summary statistics of particle ensemble. Concrete subtypes can be passed as the filter_type argument to run_particle_filter to specify which summary statistics to record and how they are computed.
See also: AbstractSumReductionSummaryStat,  AbstractCustomReductionSummaryStat.
ParticleDA.AbstractCustomReductionSummaryStat — TypeAbstractCustomReductionSummaryStat <: AbstractSummaryStatAbstract type for summary statistics computed using custom MPI reductions. Allows greater flexibility in computing statistics which can support more numerically stable implementations, but at a cost of not being compatible with all CPU architectures. In particular, MPI.jl does not currently support custom operators  on Power PC and ARM architecures.
ParticleDA.AbstractSumReductionSummaryStat — TypeAbstractSumReductionSummaryStat <: AbstractSummaryStatAbstract type for summary statistics computed using standard MPI sum reductions. Compatible with a wider range of CPU architectures but may require less numerically stable implementations.
ParticleDA.MeanAndVarSummaryStat — TypeMeanAndVarSummaryStat <: AbstractCustomReductionSummaryStatCustom reduction based summary statistic type which computes the means and variances o the particle ensemble for each state dimension. On CPU architectures which do not support custom reductions NaiveMeanAndVarSummaryStat can be used instead.
ParticleDA.MeanSummaryStat — TypeMeanSummaryStat <: AbstractCustomReductionSummaryStatCustom reduction based summary statistic type which computes the means of the particle ensemble for each state dimension. On CPU architectures which do not support custom reductions NaiveMeanSummaryStat can be used instead.
ParticleDA.NaiveMeanAndVarSummaryStat — TypeNaiveMeanAndVarSummaryStat <: AbstractSumReductionSummaryStatSum reduction based summary statistic type which computes the means and variances of the particle ensemble for each state dimension. The mean and variance are computed by directly accumulating the  sums of the particle values, the squared particle values and number of particles on each rank, with the variance computed as the scaled difference between the sum of the squares and square of the sums. This 'naive' implementation avoids custom MPI reductions but can be numerically unstable for large ensembles or state components with large values. If custom reductions are supported by the CPU architecture in use the more numerically stable MeanAndVarSummaryStat should be used instead.
ParticleDA.NaiveMeanSummaryStat — TypeNaiveMeanSummaryStat <: AbstractSumReductionSummaryStatSum reduction based summary statistic type which computes the means of the particle ensemble for each state dimension. The mean is computed by directly accumulating the  sums of the particle values and number of particles on each rank. If custom reductions  are supported by the CPU architecture in use the more numerically stable  MeanSummaryStat should be used instead.
The next section details how to write the interface between the model and the particle filter.
Interfacing the model
The model needs to define a custom data structure and a few functions, that will be used by run_particle_filter:
- a custom structure which holds the data about the model. This will be used to dispatch the methods to be defined, listed below;
 - an initialisation function with the following signature:
withinit(params_dict::Dict, n_tasks::Integer) -> modelparams_dicta dictionary with the parameters of the model andn_tasksan integer specifying the maximum number tasks (coroutines) parallelisable operations will be scheduled over. This initialisation function should create an instance of the model data structure and return it. The value ofn_taskscan be used to create task-specific buffers for writing to when computing the model updates to avoid reallocating data structures on each function call. As tasks may be run in parallel over multiple threads, any buffers used in functions called within tasks should be unique to the task; to facilitate this functions in the model interface (see below) which may be called within tasks scheduled in parallel are passed atask_indexargument which is a integer index in1:n_taskswhich is guaranteed to be unique to a particular task and so can be used to index in to task specific buffers. - The model needs to extend the following methods, using the model type for dispatch:
 
ParticleDA.get_state_dimension — FunctionParticleDA.get_state_dimension(model) -> IntegerReturn the positive integer dimension of the state vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.get_observation_dimension — FunctionParticleDA.get_observation_dimension(model) -> IntegerReturn the positive integer dimension of the observation vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.get_state_eltype — FunctionParticleDA.get_state_eltype(model) -> TypeReturn the element type of the state vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.get_observation_eltype — FunctionParticleDA.get_observation_eltype(model) -> TypeReturn the element type of the observation vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.sample_initial_state! — FunctionParticleDA.sample_initial_state!(state, model, rng, task_index=1)Sample value for state vector from its initial distribution for model described by  model using random number generator rng to generate random draws and writing to state argument.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.update_state_deterministic! — FunctionParticleDA.update_state_deterministic!(state, model, time_index, task_index=1)Apply the deterministic component of the state time update at discrete time index  time_index for the model described by model for the state vector state writing the updated state back to the state argument.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.update_state_stochastic! — FunctionParticleDA.update_state_stochastic!(state, model, rng, task_index=1)Apply the stochastic component of the state time update for the model described by model for the state vector state, using random number generator rng to generate random draws and writing the updated state back to the state argument.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.sample_observation_given_state! — FunctionParticleDA.sample_observation_given_state!(
    observation, state, model, rng, task_index=1
)Simulate noisy observations of the state state of model described by model and write to observation array using rng to generate any random draws.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.get_log_density_observation_given_state — FunctionParticleDA.get_log_density_observation_given_state(
    observation, state, model, task_index=1
) -> RealReturn the logarithm of the probability density of an observation vector observation given a state vector state for the model associated with model. Any additive  terms that are constant with respect to the state may be neglected.
This method is intended to be extended by the user with the above signature, specifying the type of model.
ParticleDA.write_model_metadata — FunctionParticleDA.write_model_metadata(file::HDF5.File, model)Write metadata for with the model described by model to the HDF5 file file.
This method is intended to be extended by the user with the above signature, specifying the type of model.
- Optionally, if the model has additive Gaussian observation and state noise, it may also extend the following methods, again using the model type for dispatch, to allow using the more statistically efficient 
OptimalFilterfor filtering 
ParticleDA.get_observation_mean_given_state! — FunctionParticleDA.get_observation_mean_given_state!(
    observation_mean, state, model, task_index=1
)Compute the mean of the multivariate normal distribution on the observations given the current state and write to the first argument.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with  the optimal proposal filter implementation in OptimalFilter.
ParticleDA.get_covariance_state_noise — FunctionParticleDA.get_covariance_state_noise(model, i, j) -> RealReturn covariance cov(U[i], U[j]) between components of the zero-mean Gaussian state  noise vector U.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with  the optimal proposal filter implementation in OptimalFilter.
ParticleDA.get_covariance_observation_noise — FunctionParticleDA.get_covariance_observation_noise(model, i, j) -> RealReturn covariance cov(V[i], V[j]) between components of the zero-mean Gaussian  observation noise vector V.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with  the optimal proposal filter implementation in OptimalFilter.
ParticleDA.get_covariance_state_observation_given_previous_state — FunctionParticleDA.get_covariance_state_observation_given_previous_state(
    model, i, j
) -> RealReturn the covariance cov(X[i], Y[j]) between components of the state vector  X = F(x) + U and observation vector Y = H * X + V where H is the linear  observation operator, F the (potentially non-linear) forward operator describing the  deterministic state dynamics, U is a zero-mean Gaussian state noise vector, V is a  zero-mean Gaussian observation noise vector and x is the state at the previous  observation time.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with  the optimal proposal filter implementation in OptimalFilter.
ParticleDA.get_covariance_observation_observation_given_previous_state — FunctionParticleDA.get_covariance_observation_observation_given_previous_state(
    model, i, j
) -> RealReturn covariance cov(Y[i], Y[j]) between components of the observation vector  Y = H * (F(x) + U) + V where H is the linear observation operator, F the  (potentially non-linear) forward operator describing the deterministic state dynamics, U is a zero-mean Gaussian state noise vector, V is a zero-mean Gaussian observation noise vector and x is the state at the previous observation time.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with  the optimal proposal filter implementation in OptimalFilter.
Other function you may want to extend
ParticleDA.get_state_indices_correlated_to_observations — FunctionParticleDA.get_state_indices_correlated_to_observations(
    model
) -> AbstractVector{Int}Return the vector containing the indices of the state vector X which at least one of  the observations Yare correlated to, that isi ∈ 1:getstatedimension(model)such thatcov(X[i], Y[j]) > 0for at least onej ∈ 1:getobservationdimension(model). This is used to avoid needing to compute and store zero covariance terms. Defaults to returning all state indices which will always give correct results but will be inefficient if there are zero blocks incov(X, Y)`.
ParticleDA.get_initial_state_mean! — FunctionParticleDA.get_initial_state_mean!(state_mean, model)Compute the mean of the multivariate normal distribution on the initial state and write to the first argument.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering linear-Gaussian models with the Kalman filter for testing.
ParticleDA.write_snapshot — FunctionParticleDA.write_snapshot(
    output_filename, model, filter_data, states, time_index, save_states
)Write a snapshot of the model and filter states to the HDF5 file output_filename for the model and filters described by model and filter_data respectively at time index time_index, optionally saving the current ensemble of state particles represented by the two-dimensional array states (first axis state component, second particle index) if save_states == true. time_index == 0 corresponds to the initial model and filter states before any updates and non-time dependent model data will be written out when called with this value of time_index.
ParticleDA.write_state — FunctionParticleDA.write_state(
    file::HDF5.File,
    state::AbstractVector{T},
    time_index::Int,
    group_name::String,
    model
)Write the model state at time index time_index represented by the vector state to  the HDF5 file file with group_name for the model represented by model.
ParticleDA.get_covariance_initial_state — FunctionParticleDA.get_covariance_initial_state(model, i, j) -> RealReturn covariance cov(X[i], X[j]) between components of the initial state vector X.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering linear-Gaussian models with the Kalman filter for testing.
ParticleDA.init_filter — FunctionParticleDA.init_filter(filter_params, model, nprt_per_rank, ::T) -> NamedTupleInitialise any data structures required by filter of type T, with filtering specific parameters specified by filter_params, state space model to perform filtering with described by model and nprt_per_rank particles per MPI rank.
New filter implementations should extend this method specifying T as the appropriate singleton type for the new filter.
ParticleDA.sample_proposal_and_compute_log_weights! — FunctionParticleDA.sample_proposal_and_compute_log_weights!(
    states, log_weights, observation, model, filter_data, ::T, rng
)Sample new values for two-dimensional array of state vectors states from proposal distribution writing in-place to states array and compute logarithm of unnormalized  particle weights writing to log_weights given observation vector observation, with state space model described by model, named tuple of filter specific data structures filter_data, filter type T and random number generator rng used to generate any random draws.
New filter implementations should extend this method specifying T as the appropriate singleton type for the new filter.
ParticleDA.write_observation — FunctionParticleDA.write_observation(
    file::HDF5.File,
    observation::AbstractVector,
    time_index::Int,
    model
)Write the observations at time index time_index represented by the vector observation to the HDF5 file file for the model represented by model.
ParticleDA.write_weights — FunctionParticleDA.write_weights(
    file::HDF5.File,
    weights::AbstractVector{T},
    time_index::Int,
    model
)Write the particle weights at time index time_index represented by the vector  weights to the HDF5 file file for the model represented by model.
ParticleDA.get_initial_state_mean — FunctionParticleDA.get_initial_state_mean(model)Compute the mean of the multivariate normal distribution on the initial state.
Input parameters
You can store the input parameters in an YAML file with the following structure
filter:
  key1: value1
model:
  model_name1:
    key2: value2
    key3: value3
  model_name2:
    key4: value4
    key5: value5The parameters under filter are related to the particle filter, under model you can specify the parameters for different models.
The particle filter parameters are saved in the following data structure:
ParticleDA.FilterParameters — TypeFilterParameters()Parameters for ParticleDA run. Keyword arguments:
master_rank: Id of MPI rank that performs serial computationsverbose::Bool: Flag to control whether to write outputoutput_filename::String: Name of output filenprt::Int: Number of particles for particle filterenable_timers::Bool: Flag to control run time measurementsparticle_save_time_indices: Set of time indices to save particles atseed: Seed to initialise state of random number generator used for filteringn_tasks: Number of tasks to use for running parallelisable operations. Positive integers indicate the number of tasks directly, while the absolute value of negative integers indicate the number of task to use per-thread (as reported byThreads.nthreads()). Using multiple tasks per thread will improve the ability of the scheduler to balance load across threads but potentially increase overheads. If simulation of the model being filtered use multiple threads then it may be beneficial to set then_tasks = 1to avoid too much contention between threads.
Example: estimating the state in a tsunami model
A full example of a model interfacing ParticleDA is available in test/models/llw2d.jl.  This model represents a simple two dimensional simulation of tsunami dynamics and is  partly based on the tsunami data assimilation code by  Takuto Maeda. The particle filter can be run with observations simulated from the model as follows
# Load ParticleDA
using ParticleDA
# Save some variables for later use
test_dir = joinpath(dirname(pathof(ParticleDA)), "..", "test")
llw2d_src = joinpath(test_dir, "models", "llw2d.jl")
input_file = joinpath(test_dir, "integration_test_1.yaml")
observation_file = tempname()
# Instantiate the test environment
using Pkg
Pkg.activate(test_dir)
Pkg.instantiate()
# Include the sample model source code and load it
include(llw2d_src)
using .LLW2d
# Simulate observations from the model to use 
simulate_observations_from_model(LLW2d.init, input_file, observation_file)
# Run the (optimal proposal) particle filter with simulated observations computing the
# mean and variance of the particle ensemble. On non-Intel architectures you may need
# to use NaiveMeanAndVarSummaryStat instead
final_states, final_statistics = run_particle_filter(
  LLW2d.init, input_file, observation_file, OptimalFilter, MeanAndVarSummaryStat
)Parameters of the test model
The Input parameters section shows how to pass the input parameters for the filter and the model.  For the model included in the test suite, called llw2d, you can set the following parameters:
Main.LLW2d.LLW2dModelParameters — TypeLLW2dModelParameters()Parameters for the linear long wave two-dimensional (LLW2d) model. Keyword arguments:
nx::Int: Number of grid points in the x directionny::Int: Number of grid points in the y directionx_length::AbstractFloat: Domain size in metres in the x directiony_length::AbstractFloat: Domain size in metres in the y directiondx::AbstractFloat: Distance in metres between grid points in the x directiondy::AbstractFloat: Distance in metrtes between grid points in the y directionstation_filename::String: Name of input file for station coordinatesn_stations_x::Int: Number of observation stations in the x direction (if using regular grid)n_stations_y::Int: Number of observation stations in the y direction (if using regular grid)station_distance_x::Float: Distance in metres between stations in the x direction (if using regular grid)station_distance_y::Float: Distance in metres between stations in the y direction (if using regular grid)station_boundary_x::Float: Distance in metres between bottom left edge of box and first station in the x direction (if using regular grid)station_boundary_y::Float: Distance in metres between bottom left edge of box and first station in the y direction (if using regular grid)n_integration_step::Int: Number of sub-steps to integrate the forward model per time steptime_step::AbstractFloat: Time step length in secondspeak_position::Vector{AbstractFloat}: The `[x, y] coordinates in metres of the initial wave peakpeak_height::AbstractFloat: The height in metres of the initial wave peaksource_size::AbstractFloat: Cutoff distance in metres from the peak for the initial wavebathymetry_setup::AbstractFloat: Bathymetry set-uplambda::AbstractFloat: Length scale for Matérn covariance kernel in background noisenu::AbstractFloat: Smoothess parameter for Matérn covariance kernel in background noisesigma::AbstractFloat: Marginal standard deviation for Matérn covariance kernel in background noiselambda_initial_state::AbstractFloat: Length scale for Matérn covariance kernel in initial state of particlesnu_initial_state::AbstractFloat: Smoothess parameter for Matérn covariance kernel in initial state of particlessigma_initial_state::AbstractFloat: Marginal standard deviation for Matérn covariance kernel in initial state of particlespadding::Int: Min padding for circulant embedding gaussian random field generatorprimes::Int: Whether the size of the minimum circulant embedding of the covariance matrix can be written as a product of small primes (2, 3, 5 and 7). Default istrue.use_peak_initial_state_mean::Bool: Whether to set mean of initial height field to a wave peak (true) or to all zeros (false). In both cases the initial mean of the other state variables is zero.absorber_thickness_fraction::Float: Thickness of absorber for sponge absorbing boundary conditions, fraction of grid sizeboundary_damping::Float: Damping for boundariescutoff_depth::Float: Shallowest water depthobs_noise_std::Vector: Standard deviations of noise added to observations of the true stateobserved_state_var_indices::Vector: Vector containing the indices of the observed state variables (1: height, 2: velocity x-component, 3: velocity y-component)
Observation station coordinates
The coordinates of the observation stations can be set in two different ways.
Provide the coordinates in an input file. Set the parameter
station_filenameto the name of your input file. The input file is in plain text, the format is one row per station containing x, y - coordinates in metres. Here is a simple example with two stations# Comment lines starting with '#' will be ignored by the code # This file contains two stations: at [1km, 1km] and [2km, 2km] 1.0e3, 1.0e3 2.0e3, 2.0e3Provide parameters for an equispaced rectilinear grid of observation stations. The values of these parameters should then be set:
n_stations_x: Number of observation stations in the x directionn_stations_y: Number of observation stations in the y directionstation_distance_x: Distance between stations in the x direction (m)station_distance_y: Distance between stations in the y direction (m)station_boundary_x: Distance between bottom left edge of box and first station in the x direction (m)station_boundary_y: Distance between bottom left edge of box and first station in the y direction (m)
As an example, one could set
n_stations_x=2, n_stations_y=2, station_distance_x=1.0e3, station_distance_y=1.0e3, station_boundary_x=10.0e3, station_boundary_y=10.0e3,to generate 4 stations at
[10km, 10km],[10km, 11km],[11km, 10km]and[11km, 11km].
Output
If the filter parameter verbose is set to true, run_particle_filter will produce an HDF5 file in the run directory. The file name is particle_da.h5 by default (this is configurable using the output_filename filter parameter). The file contains the summary statistics of the estimate state distribution (by default the mean and variance), particle weights, parameters used, and other metadata at each time step observation were assimilated. To read the output file, use the HDF5 library.
A basic plotting tool is provided in a Jupyter notebook. This is intended as a template to build more sophisticated postprocessing tools, but can be used for some simple analysis. Set the variable timestamp in the third cell to plot different time slices from the output file. More functionality may be added as the package develops.
Running in parallel
The particle state update is parallelised using both MPI and threading. According to our preliminary tests both methods work well at small scale. To use the threading, set the environment variable JULIA_NUM_THREADS to the number of threads you want to use before starting Julia and then call the run_particle_filter function normally. You can check the number of threads julia has available by calling in Julia's REPL
Threads.nthreads()To use the MPI parallelisation, write a Julia script that calls the run_particle_filter function for the relevant model and observations and run it in an Unix shell with
mpirun -np <your_number_of_processes> julia <your_julia_script>Note that the parallel performance may vary depending on the performance of the algorithm. In general, a degeneracy of the particle weights will lead to poor load balance and parallel performance. See this issue for more details.
Testing
We have a basic test suite for ParticleDA.jl.  You can run the tests by entering the package manager mode in Julia's REPL with ] and running the command
test ParticleDALicense
The ParticleDA.jl package is licensed under the MIT "Expat" License.