Source code for kkf.filter

"""Koopman Kalman Filter algorithm implementation.

Combines Koopman operator theory with Kalman filtering for nonlinear
state estimation.
"""

from typing import TYPE_CHECKING, Any, Callable, Tuple

import numpy as np
from numpy.linalg import pinv
from numpy.typing import NDArray

from .covariances import (
    compute_dynamics_covariance,
    compute_initial_covariance,
    compute_observation_covariance,
)
from .solution import KoopmanKalmanFilterSolution

if TYPE_CHECKING:
    from .koopman import KoopmanOperator
    from .systems import DynamicalSystem


[docs] def apply_koopman_kalman_filter( koopman_operator: "KoopmanOperator", observations: NDArray[np.float64], initial_distribution: Any, n_features: int, optimize: bool = False, n_restarts_optimizer: int = 10, noise_samples: int = 100, reg: float = 1e-10, ) -> KoopmanKalmanFilterSolution: """ Implementation of the Koopman-Kalman Filter algorithm. This function combines Koopman operator theory with Kalman filtering to perform state estimation for nonlinear dynamical systems. It operates by lifting the state space to a higher-dimensional feature space where the dynamics are approximately linear. Parameters ---------- koopman_operator : object Object containing Koopman operator methods and attributes: - compute_edmd(n_features): Extended Dynamic Mode Decomposition method - dynamical_system: Dynamical system object - U, B, C: Matrices for Koopman approximation - phi: Feature map function observations : np.ndarray Array of shape (n_timesteps, n_outputs) containing system measurements. initial_distribution : object Distribution object for initial state with methods: - mean: Returns mean of initial state - rvs: Random sampling method n_features : int Number of features in the lifted space. optimize : bool, optional Whether to optimize the kernel parameters. Default is False. n_restarts_optimizer : int, optional Number of restarts for kernel optimization. Default is 10. noise_samples : int, optional Number of samples for noise covariance estimation. Default is 100. reg : float, optional Jitter regularization for the Koopman Gram-matrix inversion. Default is 1e-10. Returns ------- KoopmanKalmanFilterSolution Object containing all filter estimates and covariances. Notes ----- The algorithm consists of three main phases: 1. Initialization: - Sets up initial states and covariances - Computes Koopman approximation 2. Prediction Step: - Projects state forward using system dynamics - Updates covariances using Koopman operator 3. Update Step: - Incorporates new measurements - Updates state and covariance estimates The filter operates in both the original state space (x) and the lifted feature space (z), maintaining estimates and covariances in both spaces. """ # Compute Koopman approximation koopman_operator.compute_edmd(n_features, optimize, n_restarts_optimizer, reg) dynamical_system = koopman_operator.dynamical_system # Extract system and Koopman components state_dynamics = dynamical_system.f measurement_model = dynamical_system.g U, B, C = koopman_operator.U, koopman_operator.B, koopman_operator.C phi = koopman_operator.phi # --- Initialization --- mean_val = initial_distribution.mean # Initial state estimate x0 = np.atleast_1d(np.asarray(mean_val() if callable(mean_val) else mean_val)) z0 = phi(x0) # Initial state in feature space # Get problem dimensions n_timesteps, n_states = observations.shape[0], len(x0) n_outputs = observations.shape[1] # Initialize state arrays x_minus, x_plus = _initialize_state_arrays(n_timesteps, n_states) z_minus, z_plus = _initialize_state_arrays(n_timesteps, n_features) # Initialize covariance arrays Px_minus, Px_plus = _initialize_covariance_arrays(n_timesteps, n_states) Pz_minus, Pz_plus = _initialize_covariance_arrays(n_timesteps, n_features) # Initialize filter matrices S = np.zeros((n_timesteps, n_outputs, n_outputs)) # Innovation covariance K = np.zeros((n_timesteps, n_features, n_outputs)) # Kalman gain # Set initial conditions x_minus[0, :], x_plus[0, :] = x0, x0 z_minus[0, :], z_plus[0, :] = z0, z0 # Initialize covariances initial_covariance = compute_initial_covariance( x_minus[0, :], n_features, initial_distribution, koopman_operator, noise_samples ) Pz_minus[0, :, :], Pz_plus[0, :, :] = initial_covariance, initial_covariance # --- Main filter loop --- for t in range(1, n_timesteps): # Prediction Step x_minus[t], z_minus[t], Pz_minus[t] = _prediction_step( x_plus[t - 1], z_plus[t - 1], Pz_plus[t - 1], state_dynamics, phi, U, dynamical_system, koopman_operator, n_features, noise_samples, ) # Update Step x_plus[t], z_plus[t], Pz_plus[t], S[t], K[t] = _update_step( x_minus[t], z_minus[t], Pz_minus[t], observations[t], measurement_model, C, B, dynamical_system, noise_samples, ) # Update state-space covariances Px_minus[t], Px_plus[t] = _update_state_covariances(Pz_minus[t], Pz_plus[t], B) return KoopmanKalmanFilterSolution(x_plus, x_minus, Pz_plus, Pz_minus, Px_plus, Px_minus, S, K)
def _initialize_state_arrays( n_timesteps: int, n_dim: int ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Initialize arrays for state estimates.""" return (np.zeros((n_timesteps, n_dim)), np.zeros((n_timesteps, n_dim))) def _initialize_covariance_arrays( n_timesteps: int, n_dim: int ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Initialize arrays for covariance matrices.""" return (np.zeros((n_timesteps, n_dim, n_dim)), np.zeros((n_timesteps, n_dim, n_dim))) def _prediction_step( x_prev: NDArray[np.float64], z_prev: NDArray[np.float64], Pz_prev: NDArray[np.float64], state_dynamics: Callable, phi: Callable, U: NDArray[np.float64], dynamical_system: "DynamicalSystem", koopman_operator: "KoopmanOperator", n_features: int, noise_samples: int, ) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]: """Perform the prediction step of the filter.""" # Compute dynamics covariance Qz = compute_dynamics_covariance( x_prev, n_features, dynamical_system, koopman_operator, noise_samples ) # Predict states x_pred = state_dynamics(x_prev) z_pred = phi(x_pred) # Predict covariance Pz_pred = U @ Pz_prev @ U.T + Qz return x_pred, z_pred, Pz_pred def _update_step( x_minus: NDArray[np.float64], z_minus: NDArray[np.float64], Pz_minus: NDArray[np.float64], observation: NDArray[np.float64], measurement_model: Callable, C: NDArray[np.float64], B: NDArray[np.float64], dynamical_system: "DynamicalSystem", noise_samples: int, ) -> Tuple[ NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], ]: """Perform the update step of the filter.""" # Compute measurement covariance Rz = compute_observation_covariance(x_minus, len(observation), dynamical_system, noise_samples) # Compute innovation innovation = observation - measurement_model(x_minus) # Compute innovation covariance S = C @ Pz_minus @ C.T + Rz # Compute Kalman gain K = Pz_minus @ C.T @ pinv(S) # Update states z_plus = z_minus + K @ innovation x_plus = B @ z_plus # Update covariance Pz_plus = (np.eye(len(z_minus)) - K @ C) @ Pz_minus return x_plus, z_plus, Pz_plus, S, K def _update_state_covariances( Pz_minus: NDArray[np.float64], Pz_plus: NDArray[np.float64], B: NDArray[np.float64] ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Update covariances in the state space.""" Px_minus = B @ Pz_minus @ B.T Px_plus = B @ Pz_plus @ B.T return Px_minus, Px_plus