"""Dynamical systems module.
Defines classes and utilities for representing general dynamical systems
with state transitions and measurement functions.
"""
import warnings
from typing import Callable
import numpy as np
from numpy.typing import NDArray
from scipy.stats import rv_continuous
[docs]
class DynamicalSystem:
"""
A class representing a general dynamical system with state and measurement equations.
This class encapsulates the dynamics, measurements, and associated probability
distributions for both state and measurement noise. Can be considered as:
Discrete time:
Dynamics: x_{k+1} = f(x_{k}) + w_{k}
Measurements: y_{k} = g(x_{k}) + v_{k}
Continous time:
Dynamics: x'(t) = f(x(t)) + w(t)
Measurements: y(t) = g(x(t)) + v(t)
Attributes
----------
nx : int
Dimension of the state space.
ny : int
Dimension of the measurement/output space.
f : Callable
State transition function (dynamics).
g : Callable
Measurement/output function.
dist_X : rv_continuous
Probability distribution for initial states.
dist_dyn : rv_continuous
Probability distribution for dynamics noise.
dist_obs : rv_continuous
Probability distribution for measurement noise.
discrete_time : bool
Indicates if the system is in discrete time, if False the system is in continous time.
Notes
-----
The functions f and g model the *noise-free* dynamics and measurements;
the noise terms w and v are added separately (see ``dynamics`` and
``measurements``). They should have the following signatures:
- f(x: ndarray) -> ndarray
- g(x: ndarray) -> ndarray
where x is the state vector. Both must also accept a batch of column
vectors of shape (nx, n_features), since ``KoopmanOperator`` evaluates
them on the full dictionary at once.
"""
def __init__(
self,
nx: int,
ny: int,
f: Callable[[NDArray[np.float64]], NDArray[np.float64]],
g: Callable[[NDArray[np.float64]], NDArray[np.float64]],
dist_X: rv_continuous,
dist_dyn: rv_continuous,
dist_obs: rv_continuous,
discrete_time: bool,
) -> None:
if nx <= 0:
raise ValueError(f"State dimension nx must be positive, got {nx}")
if ny <= 0:
raise ValueError(f"Measurement dimension ny must be positive, got {ny}")
self.nx = nx
self.ny = ny
self.f = f
self.g = g
self.dist_X = dist_X
self.dist_dyn = dist_dyn
self.dist_obs = dist_obs
self.discrete_time = discrete_time
[docs]
def dynamics(self, x: NDArray[np.float64], w: NDArray[np.float64]) -> NDArray[np.float64]:
"""
Apply the state transition function.
Parameters
----------
x : np.ndarray
Current state vector.
w : np.ndarray
Process noise vector.
Returns
-------
np.ndarray
Next state vector.
"""
return self.f(x) + w
[docs]
def measurements(self, x: NDArray[np.float64], v: NDArray[np.float64]) -> NDArray[np.float64]:
"""
Apply the measurement function.
Parameters
----------
x : np.ndarray
Current state vector.
v : np.ndarray
Measurement noise vector.
Returns
-------
np.ndarray
Measurement/output vector.
"""
return self.g(x) + v
[docs]
def sample_state(self, size: int = 1) -> NDArray[np.float64]:
"""
Sample from the state distribution.
Parameters
----------
size : int, optional
Number of samples to draw. Default is 1.
Returns
-------
np.ndarray
Sampled state(s).
"""
return self.dist_X.rvs(size=size).reshape((size, self.nx))
[docs]
def create_additive_system(
nx: int,
ny: int,
f: Callable[[NDArray[np.float64], NDArray[np.float64]], NDArray[np.float64]],
g: Callable[[NDArray[np.float64], NDArray[np.float64]], NDArray[np.float64]],
dist_X: rv_continuous,
dist_dyn: rv_continuous,
dist_obs: rv_continuous,
N_samples: int,
discrete_time: bool = True,
) -> DynamicalSystem:
"""
Create an additive dynamical system where noise is added to the state and observation functions.
Parameters
----------
nx : int
Dimension of the state space.
ny : int
Dimension of the measurement space.
f : Callable
State transition function (with noise).
g : Callable
Measurement function (with noise).
dist_X : rv_continuous
Initial state distribution.
dist_dyn : rv_continuous
Dynamics noise distribution.
dist_obs : rv_continuous
Measurement noise distribution.
N_samples: int
Number of samples to generate empirical mean of dynamics and observation.
discrete_time : bool, optional
Indicates if the system is in discrete time. Default is True.
Returns
-------
DynamicalSystem
New dynamical system instance with additive noise.
Notes
-----
The resulting system has the form:
x[k+1] = f(x[k]) + w[k]
y[k] = g(x[k]) + v[k]
where w and v are noise terms.
.. deprecated::
``create_additive_system`` is deprecated and will be removed in a future
major release. The state-dependent noise distributions it builds are
incompatible with the covariance sampling used by
``apply_koopman_kalman_filter`` (which calls ``dist_dyn.rvs(size=...)``
without a state argument). Construct a :class:`DynamicalSystem` directly
instead, providing ``f``/``g`` as the noise-free maps.
"""
warnings.warn(
"create_additive_system is deprecated and will be removed in a future "
"release; it is incompatible with apply_koopman_kalman_filter. Build a "
"DynamicalSystem directly instead.",
DeprecationWarning,
stacklevel=2,
)
def new_f(x: NDArray[np.float64]) -> NDArray[np.float64]:
return np.mean([f(x.reshape((len(x), 1)), dist_dyn.rvs(N_samples))], axis=1)
def new_g(x: NDArray[np.float64]) -> NDArray[np.float64]:
return np.mean([g(x.reshape((len(x), 1)), dist_obs.rvs(N_samples))], axis=1)
class DynDist:
def __init__(self) -> None:
pass
def rvs(self, x: NDArray[np.float64], size: int = 1) -> NDArray[np.float64]:
return f(x, dist_dyn.rvs(size=size)) - new_f(x)
class ObsDist:
def __init__(self) -> None:
pass
def rvs(self, x: NDArray[np.float64], size: int = 1) -> NDArray[np.float64]:
return g(x, dist_obs.rvs(size=size)) - new_g(x)
return DynamicalSystem(
nx, ny, new_f, new_g, dist_X, DynDist(), ObsDist(), discrete_time=discrete_time
)