ParticleDA.jl
ParticleDA.jl
is a Julia package to perform data assimilation with particle filters, distributed using MPI.
Installation
To install the latest stable version of the package, open the Julia REPL, enter the package manager with ]
, then run the command
add ParticleDA.jl
If you plan to develop the package (make changes, submit pull requests, etc), in the package manager mode run this command
dev ParticleDA
This will automatically clone the repository to your local directory ~/.julia/dev/ParticleDA
.
You can exit from the package manager mode by pressing CTRL + C
or, alternatively, the backspace key when there is no input in the prompt.
Usage
After installing the package, you can start using it in Julia's REPL with
using ParticleDA
The run the particle filter you can use the function run_particle_filter
:
ParticleDA.run_particle_filter
— Functionrun_particle_filter(
init_model,
input_file_path,
observation_file_path,
filter_type=BootstrapFilter,
summary_stat_type=MeanAndVarSummaryStat;
rng=Random.TaskLocalRNG()
) -> Tuple{Matrix, Union{NamedTuple, Nothing}}
Run particle filter. init_model
is the function which initialise the model, input_file_path
is the path to the YAML file with the input parameters. observation_file_path
is the path to the HDF5 file containing the observation sequence to perform filtering for. filter_type
is the particle filter type to use. See ParticleFilter
for the possible values. summary_stat_type
is a type specifying the summary statistics of the particles to compute at each time step. See AbstractSummaryStat
for the possible values. rng
is a random number generator to use to generate random variates while filtering - a seeded random number generator may be specified to ensure reproducible results. If running with multiple threads a thread-safe generator such as Random.TaskLocalRNG
(the default) must be used.
Returns a tuple containing the state particles representing an estimate of the filtering distribution at the final observation time (with each particle a column of the returned matrix) and a named tuple containing the estimated summary statistics of this final filtering distribution. If running on multiple ranks using MPI, the returned states array will correspond only to the particles local to this rank and the summary statistics will be returned only on the master rank with all other ranks returning nothing
for their second return value.
To simulate observations from the model (which can be used to for example test the filtering algorithms) you can use the function simulate_observations_from_model
ParticleDA.simulate_observations_from_model
— Functionsimulate_observations_from_model(
init_model, input_file_path, output_file_path; rng=Random.TaskLocalRNG()
) -> Matrix
Simulate observations from the state space model initialised by the init_model
function with parameters specified by the model
key in the input YAML file at input_file_path
and save the simulated observation and state sequences to a HDF5 file at output_file_path
. rng
is a random number generator to use to generate random variates while simulating from the model - a seeded random number generator may be specified to ensure reproducible results.
The input YAML file at input_file_path
should have a simulate_observations
key with value a dictionary with keys seed
and n_time_step
corresponding to respectively the number of time steps to generate observations for from the model and the seed to use to initialise the state of the random number generator used to simulate the observations.
The simulated observation sequence is returned as a matrix with columns corresponding to the observation vectors at each time step.
The filter_type
argument to run_particle_filter
should be a concrete subtype of the ParticleFilter
abstract type.
ParticleDA.ParticleFilter
— TypeParticleFilter
Abstract type for particle filters. Currently implemented subtypes are:
ParticleDA.BootstrapFilter
— TypeBootstrapFilter <: ParticleFilter
Singleton type BootstrapFilter
. This can be used as argument of run_particle_filter
to select the bootstrap filter.
ParticleDA.OptimalFilter
— TypeOptimalFilter <: ParticleFilter
Singleton type OptimalFilter
. This can be used as argument of run_particle_filter
to select the optimal proposal filter (for conditionally linear-Gaussian models).
The summary_stat_type
argument to run_particle_filter
should be a concrete subtype of the AbstractSummaryStat
abstract type.
ParticleDA.AbstractSummaryStat
— TypeAbstractSummaryStat
Abstract type for summary statistics of particle ensemble. Concrete subtypes can be passed as the filter_type
argument to run_particle_filter
to specify which summary statistics to record and how they are computed.
See also: AbstractSumReductionSummaryStat
, AbstractCustomReductionSummaryStat
.
ParticleDA.AbstractCustomReductionSummaryStat
— TypeAbstractCustomReductionSummaryStat <: AbstractSummaryStat
Abstract type for summary statistics computed using custom MPI reductions. Allows greater flexibility in computing statistics which can support more numerically stable implementations, but at a cost of not being compatible with all CPU architectures. In particular, MPI.jl
does not currently support custom operators on Power PC and ARM architecures.
ParticleDA.AbstractSumReductionSummaryStat
— TypeAbstractSumReductionSummaryStat <: AbstractSummaryStat
Abstract type for summary statistics computed using standard MPI sum reductions. Compatible with a wider range of CPU architectures but may require less numerically stable implementations.
ParticleDA.MeanAndVarSummaryStat
— TypeMeanAndVarSummaryStat <: AbstractCustomReductionSummaryStat
Custom reduction based summary statistic type which computes the means and variances o the particle ensemble for each state dimension. On CPU architectures which do not support custom reductions NaiveMeanAndVarSummaryStat
can be used instead.
ParticleDA.MeanSummaryStat
— TypeMeanSummaryStat <: AbstractCustomReductionSummaryStat
Custom reduction based summary statistic type which computes the means of the particle ensemble for each state dimension. On CPU architectures which do not support custom reductions NaiveMeanSummaryStat
can be used instead.
ParticleDA.NaiveMeanAndVarSummaryStat
— TypeNaiveMeanAndVarSummaryStat <: AbstractSumReductionSummaryStat
Sum reduction based summary statistic type which computes the means and variances of the particle ensemble for each state dimension. The mean and variance are computed by directly accumulating the sums of the particle values, the squared particle values and number of particles on each rank, with the variance computed as the scaled difference between the sum of the squares and square of the sums. This 'naive' implementation avoids custom MPI reductions but can be numerically unstable for large ensembles or state components with large values. If custom reductions are supported by the CPU architecture in use the more numerically stable MeanAndVarSummaryStat
should be used instead.
ParticleDA.NaiveMeanSummaryStat
— TypeNaiveMeanSummaryStat <: AbstractSumReductionSummaryStat
Sum reduction based summary statistic type which computes the means of the particle ensemble for each state dimension. The mean is computed by directly accumulating the sums of the particle values and number of particles on each rank. If custom reductions are supported by the CPU architecture in use the more numerically stable MeanSummaryStat
should be used instead.
The next section details how to write the interface between the model and the particle filter.
Interfacing the model
The model needs to define a custom data structure and a few functions, that will be used by run_particle_filter
:
- a custom structure which holds the data about the model. This will be used to dispatch the methods to be defined, listed below;
- an initialisation function with the following signature:
withinit(params_dict::Dict, n_tasks::Integer) -> model
params_dict
a dictionary with the parameters of the model andn_tasks
an integer specifying the maximum number tasks (coroutines) parallelisable operations will be scheduled over. This initialisation function should create an instance of the model data structure and return it. The value ofn_tasks
can be used to create task-specific buffers for writing to when computing the model updates to avoid reallocating data structures on each function call. As tasks may be run in parallel over multiple threads, any buffers used in functions called within tasks should be unique to the task; to facilitate this functions in the model interface (see below) which may be called within tasks scheduled in parallel are passed atask_index
argument which is a integer index in1:n_tasks
which is guaranteed to be unique to a particular task and so can be used to index in to task specific buffers. - The model needs to extend the following methods, using the model type for dispatch:
ParticleDA.get_state_dimension
— FunctionParticleDA.get_state_dimension(model) -> Integer
Return the positive integer dimension of the state vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.get_observation_dimension
— FunctionParticleDA.get_observation_dimension(model) -> Integer
Return the positive integer dimension of the observation vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.get_state_eltype
— FunctionParticleDA.get_state_eltype(model) -> Type
Return the element type of the state vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.get_observation_eltype
— FunctionParticleDA.get_observation_eltype(model) -> Type
Return the element type of the observation vector which is assumed to be fixed for all time steps.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.sample_initial_state!
— FunctionParticleDA.sample_initial_state!(state, model, rng, task_index=1)
Sample value for state vector from its initial distribution for model described by model
using random number generator rng
to generate random draws and writing to state
argument.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.update_state_deterministic!
— FunctionParticleDA.update_state_deterministic!(state, model, time_index, task_index=1)
Apply the deterministic component of the state time update at discrete time index time_index
for the model described by model
for the state vector state
writing the updated state back to the state
argument.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.update_state_stochastic!
— FunctionParticleDA.update_state_stochastic!(state, model, rng, task_index=1)
Apply the stochastic component of the state time update for the model described by model
for the state vector state
, using random number generator rng
to generate random draws and writing the updated state back to the state
argument.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.sample_observation_given_state!
— FunctionParticleDA.sample_observation_given_state!(
observation, state, model, rng, task_index=1
)
Simulate noisy observations of the state state
of model described by model
and write to observation
array using rng
to generate any random draws.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.get_log_density_observation_given_state
— FunctionParticleDA.get_log_density_observation_given_state(
observation, state, model, task_index=1
) -> Real
Return the logarithm of the probability density of an observation vector observation
given a state vector state
for the model associated with model
. Any additive terms that are constant with respect to the state may be neglected.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
ParticleDA.write_model_metadata
— FunctionParticleDA.write_model_metadata(file::HDF5.File, model)
Write metadata for with the model described by model
to the HDF5 file file
.
This method is intended to be extended by the user with the above signature, specifying the type of model
.
- Optionally, if the model has additive Gaussian observation and state noise, it may also extend the following methods, again using the model type for dispatch, to allow using the more statistically efficient
OptimalFilter
for filtering
ParticleDA.get_observation_mean_given_state!
— FunctionParticleDA.get_observation_mean_given_state!(
observation_mean, state, model, task_index=1
)
Compute the mean of the multivariate normal distribution on the observations given the current state and write to the first argument.
This method is intended to be extended by the user with the above signature, specifying the type of model
. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter
.
ParticleDA.get_covariance_state_noise
— FunctionParticleDA.get_covariance_state_noise(model, i, j) -> Real
Return covariance cov(U[i], U[j])
between components of the zero-mean Gaussian state noise vector U
.
This method is intended to be extended by the user with the above signature, specifying the type of model
. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter
.
ParticleDA.get_covariance_observation_noise
— FunctionParticleDA.get_covariance_observation_noise(model, i, j) -> Real
Return covariance cov(V[i], V[j])
between components of the zero-mean Gaussian observation noise vector V
.
This method is intended to be extended by the user with the above signature, specifying the type of model
. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter
.
ParticleDA.get_covariance_state_observation_given_previous_state
— FunctionParticleDA.get_covariance_state_observation_given_previous_state(
model, i, j
) -> Real
Return the covariance cov(X[i], Y[j])
between components of the state vector X = F(x) + U
and observation vector Y = H * X + V
where H
is the linear observation operator, F
the (potentially non-linear) forward operator describing the deterministic state dynamics, U
is a zero-mean Gaussian state noise vector, V
is a zero-mean Gaussian observation noise vector and x
is the state at the previous observation time.
This method is intended to be extended by the user with the above signature, specifying the type of model
. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter
.
ParticleDA.get_covariance_observation_observation_given_previous_state
— FunctionParticleDA.get_covariance_observation_observation_given_previous_state(
model, i, j
) -> Real
Return covariance cov(Y[i], Y[j])
between components of the observation vector Y = H * (F(x) + U) + V
where H
is the linear observation operator, F
the (potentially non-linear) forward operator describing the deterministic state dynamics, U
is a zero-mean Gaussian state noise vector, V
is a zero-mean Gaussian observation noise vector and x
is the state at the previous observation time.
This method is intended to be extended by the user with the above signature, specifying the type of model
. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter
.
Other function you may want to extend
ParticleDA.get_state_indices_correlated_to_observations
— FunctionParticleDA.get_state_indices_correlated_to_observations(
model
) -> AbstractVector{Int}
Return the vector containing the indices of the state vector X
which at least one of the observations Y
are correlated to, that is
i ∈ 1:getstatedimension(model)such that
cov(X[i], Y[j]) > 0for at least one
j ∈ 1:getobservationdimension(model). This is used to avoid needing to compute and store zero covariance terms. Defaults to returning all state indices which will always give correct results but will be inefficient if there are zero blocks in
cov(X, Y)`.
ParticleDA.get_initial_state_mean!
— FunctionParticleDA.get_initial_state_mean!(state_mean, model)
Compute the mean of the multivariate normal distribution on the initial state and write to the first argument.
This method is intended to be extended by the user with the above signature, specifying the type of model
. Only required for filtering linear-Gaussian models with the Kalman filter for testing.
ParticleDA.write_snapshot
— FunctionParticleDA.write_snapshot(
output_filename, model, filter_data, states, time_index, save_states
)
Write a snapshot of the model and filter states to the HDF5 file output_filename
for the model and filters described by model
and filter_data
respectively at time index time_index
, optionally saving the current ensemble of state particles represented by the two-dimensional array states
(first axis state component, second particle index) if save_states == true
. time_index == 0
corresponds to the initial model and filter states before any updates and non-time dependent model data will be written out when called with this value of time_index
.
ParticleDA.write_state
— FunctionParticleDA.write_state(
file::HDF5.File,
state::AbstractVector{T},
time_index::Int,
group_name::String,
model
)
Write the model state at time index time_index
represented by the vector state
to the HDF5 file file
with group_name
for the model represented by model
.
ParticleDA.get_covariance_initial_state
— FunctionParticleDA.get_covariance_initial_state(model, i, j) -> Real
Return covariance cov(X[i], X[j])
between components of the initial state vector X
.
This method is intended to be extended by the user with the above signature, specifying the type of model
. Only required for filtering linear-Gaussian models with the Kalman filter for testing.
ParticleDA.init_filter
— FunctionParticleDA.init_filter(filter_params, model, nprt_per_rank, ::T) -> NamedTuple
Initialise any data structures required by filter of type T
, with filtering specific parameters specified by filter_params
, state space model to perform filtering with described by model
and nprt_per_rank
particles per MPI rank.
New filter implementations should extend this method specifying T
as the appropriate singleton type for the new filter.
ParticleDA.sample_proposal_and_compute_log_weights!
— FunctionParticleDA.sample_proposal_and_compute_log_weights!(
states, log_weights, observation, model, filter_data, ::T, rng
)
Sample new values for two-dimensional array of state vectors states
from proposal distribution writing in-place to states
array and compute logarithm of unnormalized particle weights writing to log_weights
given observation vector observation
, with state space model described by model
, named tuple of filter specific data structures filter_data
, filter type T
and random number generator rng
used to generate any random draws.
New filter implementations should extend this method specifying T
as the appropriate singleton type for the new filter.
ParticleDA.write_observation
— FunctionParticleDA.write_observation(
file::HDF5.File,
observation::AbstractVector,
time_index::Int,
model
)
Write the observations at time index time_index
represented by the vector observation
to the HDF5 file file
for the model represented by model
.
ParticleDA.write_weights
— FunctionParticleDA.write_weights(
file::HDF5.File,
weights::AbstractVector{T},
time_index::Int,
model
)
Write the particle weights at time index time_index
represented by the vector weights
to the HDF5 file file
for the model represented by model
.
ParticleDA.get_initial_state_mean
— FunctionParticleDA.get_initial_state_mean(model)
Compute the mean of the multivariate normal distribution on the initial state.
Input parameters
You can store the input parameters in an YAML file with the following structure
filter:
key1: value1
model:
model_name1:
key2: value2
key3: value3
model_name2:
key4: value4
key5: value5
The parameters under filter
are related to the particle filter, under model
you can specify the parameters for different models.
The particle filter parameters are saved in the following data structure:
ParticleDA.FilterParameters
— TypeFilterParameters()
Parameters for ParticleDA run. Keyword arguments:
master_rank
: Id of MPI rank that performs serial computationsverbose::Bool
: Flag to control whether to write outputoutput_filename::String
: Name of output filenprt::Int
: Number of particles for particle filterenable_timers::Bool
: Flag to control run time measurementsparticle_save_time_indices
: Set of time indices to save particles atseed
: Seed to initialise state of random number generator used for filteringn_tasks
: Number of tasks to use for running parallelisable operations. Positive integers indicate the number of tasks directly, while the absolute value of negative integers indicate the number of task to use per-thread (as reported byThreads.nthreads()
). Using multiple tasks per thread will improve the ability of the scheduler to balance load across threads but potentially increase overheads. If simulation of the model being filtered use multiple threads then it may be beneficial to set then_tasks = 1
to avoid too much contention between threads.
Example: estimating the state in a tsunami model
A full example of a model interfacing ParticleDA
is available in test/models/llw2d.jl
. This model represents a simple two dimensional simulation of tsunami dynamics and is partly based on the tsunami data assimilation code by Takuto Maeda. The particle filter can be run with observations simulated from the model as follows
# Load ParticleDA
using ParticleDA
# Save some variables for later use
test_dir = joinpath(dirname(pathof(ParticleDA)), "..", "test")
llw2d_src = joinpath(test_dir, "models", "llw2d.jl")
input_file = joinpath(test_dir, "integration_test_1.yaml")
observation_file = tempname()
# Instantiate the test environment
using Pkg
Pkg.activate(test_dir)
Pkg.instantiate()
# Include the sample model source code and load it
include(llw2d_src)
using .LLW2d
# Simulate observations from the model to use
simulate_observations_from_model(LLW2d.init, input_file, observation_file)
# Run the (optimal proposal) particle filter with simulated observations computing the
# mean and variance of the particle ensemble. On non-Intel architectures you may need
# to use NaiveMeanAndVarSummaryStat instead
final_states, final_statistics = run_particle_filter(
LLW2d.init, input_file, observation_file, OptimalFilter, MeanAndVarSummaryStat
)
Parameters of the test model
The Input parameters section shows how to pass the input parameters for the filter and the model. For the model included in the test suite, called llw2d
, you can set the following parameters:
Main.LLW2d.LLW2dModelParameters
— TypeLLW2dModelParameters()
Parameters for the linear long wave two-dimensional (LLW2d) model. Keyword arguments:
nx::Int
: Number of grid points in the x directionny::Int
: Number of grid points in the y directionx_length::AbstractFloat
: Domain size in metres in the x directiony_length::AbstractFloat
: Domain size in metres in the y directiondx::AbstractFloat
: Distance in metres between grid points in the x directiondy::AbstractFloat
: Distance in metrtes between grid points in the y directionstation_filename::String
: Name of input file for station coordinatesn_stations_x::Int
: Number of observation stations in the x direction (if using regular grid)n_stations_y::Int
: Number of observation stations in the y direction (if using regular grid)station_distance_x::Float
: Distance in metres between stations in the x direction (if using regular grid)station_distance_y::Float
: Distance in metres between stations in the y direction (if using regular grid)station_boundary_x::Float
: Distance in metres between bottom left edge of box and first station in the x direction (if using regular grid)station_boundary_y::Float
: Distance in metres between bottom left edge of box and first station in the y direction (if using regular grid)n_integration_step::Int
: Number of sub-steps to integrate the forward model per time steptime_step::AbstractFloat
: Time step length in secondspeak_position::Vector{AbstractFloat}
: The `[x, y] coordinates in metres of the initial wave peakpeak_height::AbstractFloat
: The height in metres of the initial wave peaksource_size::AbstractFloat
: Cutoff distance in metres from the peak for the initial wavebathymetry_setup::AbstractFloat
: Bathymetry set-uplambda::AbstractFloat
: Length scale for Matérn covariance kernel in background noisenu::AbstractFloat
: Smoothess parameter for Matérn covariance kernel in background noisesigma::AbstractFloat
: Marginal standard deviation for Matérn covariance kernel in background noiselambda_initial_state::AbstractFloat
: Length scale for Matérn covariance kernel in initial state of particlesnu_initial_state::AbstractFloat
: Smoothess parameter for Matérn covariance kernel in initial state of particlessigma_initial_state::AbstractFloat
: Marginal standard deviation for Matérn covariance kernel in initial state of particlespadding::Int
: Min padding for circulant embedding gaussian random field generatorprimes::Int
: Whether the size of the minimum circulant embedding of the covariance matrix can be written as a product of small primes (2, 3, 5 and 7). Default istrue
.use_peak_initial_state_mean::Bool
: Whether to set mean of initial height field to a wave peak (true) or to all zeros (false). In both cases the initial mean of the other state variables is zero.absorber_thickness_fraction::Float
: Thickness of absorber for sponge absorbing boundary conditions, fraction of grid sizeboundary_damping::Float
: Damping for boundariescutoff_depth::Float
: Shallowest water depthobs_noise_std::Vector
: Standard deviations of noise added to observations of the true stateobserved_state_var_indices::Vector
: Vector containing the indices of the observed state variables (1: height, 2: velocity x-component, 3: velocity y-component)
Observation station coordinates
The coordinates of the observation stations can be set in two different ways.
Provide the coordinates in an input file. Set the parameter
station_filename
to the name of your input file. The input file is in plain text, the format is one row per station containing x, y - coordinates in metres. Here is a simple example with two stations# Comment lines starting with '#' will be ignored by the code # This file contains two stations: at [1km, 1km] and [2km, 2km] 1.0e3, 1.0e3 2.0e3, 2.0e3
Provide parameters for an equispaced rectilinear grid of observation stations. The values of these parameters should then be set:
n_stations_x
: Number of observation stations in the x directionn_stations_y
: Number of observation stations in the y directionstation_distance_x
: Distance between stations in the x direction (m)station_distance_y
: Distance between stations in the y direction (m)station_boundary_x
: Distance between bottom left edge of box and first station in the x direction (m)station_boundary_y
: Distance between bottom left edge of box and first station in the y direction (m)
As an example, one could set
n_stations_x=2, n_stations_y=2, station_distance_x=1.0e3, station_distance_y=1.0e3, station_boundary_x=10.0e3, station_boundary_y=10.0e3,
to generate 4 stations at
[10km, 10km]
,[10km, 11km]
,[11km, 10km]
and[11km, 11km]
.
Output
If the filter parameter verbose
is set to true
, run_particle_filter
will produce an HDF5 file in the run directory. The file name is particle_da.h5
by default (this is configurable using the output_filename
filter parameter). The file contains the summary statistics of the estimate state distribution (by default the mean and variance), particle weights, parameters used, and other metadata at each time step observation were assimilated. To read the output file, use the HDF5 library.
A basic plotting tool is provided in a Jupyter notebook. This is intended as a template to build more sophisticated postprocessing tools, but can be used for some simple analysis. Set the variable timestamp
in the third cell to plot different time slices from the output file. More functionality may be added as the package develops.
Running in parallel
The particle state update is parallelised using both MPI and threading. According to our preliminary tests both methods work well at small scale. To use the threading, set the environment variable JULIA_NUM_THREADS
to the number of threads you want to use before starting Julia and then call the run_particle_filter
function normally. You can check the number of threads julia has available by calling in Julia's REPL
Threads.nthreads()
To use the MPI parallelisation, write a Julia script that calls the run_particle_filter
function for the relevant model and observations and run it in an Unix shell with
mpirun -np <your_number_of_processes> julia <your_julia_script>
Note that the parallel performance may vary depending on the performance of the algorithm. In general, a degeneracy of the particle weights will lead to poor load balance and parallel performance. See this issue for more details.
Testing
We have a basic test suite for ParticleDA.jl
. You can run the tests by entering the package manager mode in Julia's REPL with ]
and running the command
test ParticleDA
License
The ParticleDA.jl
package is licensed under the MIT "Expat" License.