Source code for kkf.systems

"""Dynamical systems module.

Defines classes and utilities for representing general dynamical systems
with state transitions and measurement functions.
"""

import warnings
from typing import Callable

import numpy as np
from numpy.typing import NDArray
from scipy.stats import rv_continuous


[docs] class DynamicalSystem: """ A class representing a general dynamical system with state and measurement equations. This class encapsulates the dynamics, measurements, and associated probability distributions for both state and measurement noise. Can be considered as: Discrete time: Dynamics: x_{k+1} = f(x_{k}) + w_{k} Measurements: y_{k} = g(x_{k}) + v_{k} Continous time: Dynamics: x'(t) = f(x(t)) + w(t) Measurements: y(t) = g(x(t)) + v(t) Attributes ---------- nx : int Dimension of the state space. ny : int Dimension of the measurement/output space. f : Callable State transition function (dynamics). g : Callable Measurement/output function. dist_X : rv_continuous Probability distribution for initial states. dist_dyn : rv_continuous Probability distribution for dynamics noise. dist_obs : rv_continuous Probability distribution for measurement noise. discrete_time : bool Indicates if the system is in discrete time, if False the system is in continous time. Notes ----- The functions f and g model the *noise-free* dynamics and measurements; the noise terms w and v are added separately (see ``dynamics`` and ``measurements``). They should have the following signatures: - f(x: ndarray) -> ndarray - g(x: ndarray) -> ndarray where x is the state vector. Both must also accept a batch of column vectors of shape (nx, n_features), since ``KoopmanOperator`` evaluates them on the full dictionary at once. """ def __init__( self, nx: int, ny: int, f: Callable[[NDArray[np.float64]], NDArray[np.float64]], g: Callable[[NDArray[np.float64]], NDArray[np.float64]], dist_X: rv_continuous, dist_dyn: rv_continuous, dist_obs: rv_continuous, discrete_time: bool, ) -> None: if nx <= 0: raise ValueError(f"State dimension nx must be positive, got {nx}") if ny <= 0: raise ValueError(f"Measurement dimension ny must be positive, got {ny}") self.nx = nx self.ny = ny self.f = f self.g = g self.dist_X = dist_X self.dist_dyn = dist_dyn self.dist_obs = dist_obs self.discrete_time = discrete_time
[docs] def dynamics(self, x: NDArray[np.float64], w: NDArray[np.float64]) -> NDArray[np.float64]: """ Apply the state transition function. Parameters ---------- x : np.ndarray Current state vector. w : np.ndarray Process noise vector. Returns ------- np.ndarray Next state vector. """ return self.f(x) + w
[docs] def measurements(self, x: NDArray[np.float64], v: NDArray[np.float64]) -> NDArray[np.float64]: """ Apply the measurement function. Parameters ---------- x : np.ndarray Current state vector. v : np.ndarray Measurement noise vector. Returns ------- np.ndarray Measurement/output vector. """ return self.g(x) + v
[docs] def sample_state(self, size: int = 1) -> NDArray[np.float64]: """ Sample from the state distribution. Parameters ---------- size : int, optional Number of samples to draw. Default is 1. Returns ------- np.ndarray Sampled state(s). """ return self.dist_X.rvs(size=size).reshape((size, self.nx))
[docs] def create_additive_system( nx: int, ny: int, f: Callable[[NDArray[np.float64], NDArray[np.float64]], NDArray[np.float64]], g: Callable[[NDArray[np.float64], NDArray[np.float64]], NDArray[np.float64]], dist_X: rv_continuous, dist_dyn: rv_continuous, dist_obs: rv_continuous, N_samples: int, discrete_time: bool = True, ) -> DynamicalSystem: """ Create an additive dynamical system where noise is added to the state and observation functions. Parameters ---------- nx : int Dimension of the state space. ny : int Dimension of the measurement space. f : Callable State transition function (with noise). g : Callable Measurement function (with noise). dist_X : rv_continuous Initial state distribution. dist_dyn : rv_continuous Dynamics noise distribution. dist_obs : rv_continuous Measurement noise distribution. N_samples: int Number of samples to generate empirical mean of dynamics and observation. discrete_time : bool, optional Indicates if the system is in discrete time. Default is True. Returns ------- DynamicalSystem New dynamical system instance with additive noise. Notes ----- The resulting system has the form: x[k+1] = f(x[k]) + w[k] y[k] = g(x[k]) + v[k] where w and v are noise terms. .. deprecated:: ``create_additive_system`` is deprecated and will be removed in a future major release. The state-dependent noise distributions it builds are incompatible with the covariance sampling used by ``apply_koopman_kalman_filter`` (which calls ``dist_dyn.rvs(size=...)`` without a state argument). Construct a :class:`DynamicalSystem` directly instead, providing ``f``/``g`` as the noise-free maps. """ warnings.warn( "create_additive_system is deprecated and will be removed in a future " "release; it is incompatible with apply_koopman_kalman_filter. Build a " "DynamicalSystem directly instead.", DeprecationWarning, stacklevel=2, ) def new_f(x: NDArray[np.float64]) -> NDArray[np.float64]: return np.mean([f(x.reshape((len(x), 1)), dist_dyn.rvs(N_samples))], axis=1) def new_g(x: NDArray[np.float64]) -> NDArray[np.float64]: return np.mean([g(x.reshape((len(x), 1)), dist_obs.rvs(N_samples))], axis=1) class DynDist: def __init__(self) -> None: pass def rvs(self, x: NDArray[np.float64], size: int = 1) -> NDArray[np.float64]: return f(x, dist_dyn.rvs(size=size)) - new_f(x) class ObsDist: def __init__(self) -> None: pass def rvs(self, x: NDArray[np.float64], size: int = 1) -> NDArray[np.float64]: return g(x, dist_obs.rvs(size=size)) - new_g(x) return DynamicalSystem( nx, ny, new_f, new_g, dist_X, DynDist(), ObsDist(), discrete_time=discrete_time )