Source code for kkf.koopman
"""Koopman operator approximation module.
Implements kernel-based Extended Dynamic Mode Decomposition (kEDMD)
for computing Koopman operator approximations.
"""
import warnings
from typing import Callable, Optional
import numpy as np
from numpy.typing import NDArray
from sklearn.gaussian_process import GaussianProcessRegressor
from .systems import DynamicalSystem
[docs]
class KoopmanOperator:
"""
Implementation of Koopman operator approximation using kernel-based Extended
Dynamic Mode Decomposition (kEDMD).
This class provides methods to compute finite-dimensional approximations of the
Koopman operator for nonlinear dynamical systems.
Attributes
----------
kernel_function : Callable
Kernel function for computing feature space mappings.
dynamical_system : DynamicalSystem
The underlying dynamical system.
X : Optional[np.ndarray]
Dictionary of states used for kernel computations.
phi : Optional[Callable]
Feature map function.
U : Optional[np.ndarray]
Koopman operator matrix.
G : Optional[np.ndarray]
Gram matrix.
C : Optional[np.ndarray]
Output matrix.
B : Optional[np.ndarray]
State-to-feature space transformation matrix.
Notes
-----
The Koopman operator framework lifts nonlinear dynamics to a linear setting
in a higher-dimensional feature space. This implementation uses kernel methods
to compute the necessary feature spaces and operators.
"""
def __init__(
self,
kernel_function: Callable[[NDArray[np.float64], NDArray[np.float64]], NDArray[np.float64]],
dynamical_system: DynamicalSystem,
) -> None:
self.kernel_function = kernel_function
self.dynamical_system = dynamical_system
self.X: Optional[NDArray[np.float64]] = None
self.phi: Optional[Callable] = None
self.U: Optional[NDArray[np.float64]] = None
self.G: Optional[NDArray[np.float64]] = None
self.C: Optional[NDArray[np.float64]] = None
self.B: Optional[NDArray[np.float64]] = None
[docs]
def compute_edmd(
self,
n_features: int,
optimize: bool = False,
n_restarts_optimizer: int = 10,
reg: float = 1e-10,
) -> None:
"""
Compute the kernel-based Extended Dynamic Mode Decomposition (kEDMD).
This method constructs finite-dimensional approximations of the Koopman
operator and associated matrices using kernel methods.
Parameters
----------
n_features : int
Number of features to use in the approximation.
optimize : bool
Whether to optimize the kernel function. If True, the method will
optimize the kernel function using Gaussian Process Regression. If False,
the provided kernel function will be used without optimization. Default is False
(fast; enable it explicitly to fit kernel hyperparameters).
n_restarts_optimizer : int
Number of restarts for the optimizer. If optimize is False, will be ignored. Default is 10.
reg : float
Tikhonov (jitter) regularization added to the diagonal of the Gram matrix
before inversion, scaled by its mean diagonal. Kernel Gram matrices are often
ill-conditioned; a small positive value keeps the inversion numerically stable.
Increase it if you see unstable estimates. Default is 1e-10.
Notes
-----
The method performs the following steps:
1. Generates dictionary points using the state distribution
2. Constructs the feature map using the kernel function
3. Computes the Gram matrix and its inverse
4. Constructs the Koopman operator approximation
5. Computes output and state transformation matrices
"""
# Extract system components
f, g = self.dynamical_system.f, self.dynamical_system.g
# Generate dictionary points
self.X = self.dynamical_system.sample_state(n_features)
# Optimize kernel function
if optimize:
self.optimize_kernel(X=self.X, n_restarts_optimizer=n_restarts_optimizer)
# Define feature map
self.phi = lambda x: self.kernel_function(x, self.X)[0]
# Compute Gram matrix, regularizing the diagonal to keep the inversion stable
self.G = self.kernel_function(self.X, self.X)
jitter = reg * np.mean(np.diag(self.G))
G_inv = np.linalg.inv(self.G + jitter * np.eye(self.G.shape[0]))
# Compute Koopman operator approximation
next_states = f(self.X.T).T
self.U = self.kernel_function(self.X, next_states) @ G_inv
# Compute output and state transformation matrices
self.C = g(self.X.T) @ G_inv
self.B = self.X.T @ G_inv
[docs]
def get_feature_dimension(self) -> Optional[int]:
"""
Get the dimension of the feature space.
Returns
-------
Optional[int]
Dimension of the feature space, or None if EDMD hasn't been computed.
"""
return self.X.shape[0] if self.X is not None else None
[docs]
def optimize_kernel(self, X: NDArray[np.float64], n_restarts_optimizer: int = 10) -> None:
"""
Optimize the kernel function for the Koopman operator.
Parameters
----------
X : np.ndarray
Set of points.
n_restarts_optimizer : int
Number of restarts for the optimizer. Default is 10.
Notes
-----
This method uses Gaussian Process Regression to optimize the kernel function.
"""
# Compute the output of the dynamical system
y = self.dynamical_system.f(X.T).T
# Fit the Gaussian process
gp = GaussianProcessRegressor(
kernel=self.kernel_function, n_restarts_optimizer=n_restarts_optimizer
)
gp.fit(X.T, y.T)
# Update with optimal kernel
self.kernel_function = gp.kernel_
[docs]
def opt_kernel(self, X: NDArray[np.float64], n_restarts_optimizer: int = 10) -> None:
"""Deprecated alias for :meth:`optimize_kernel`.
.. deprecated::
Use :meth:`optimize_kernel` instead. This alias will be removed in a
future major release.
"""
warnings.warn(
"KoopmanOperator.opt_kernel is deprecated; use optimize_kernel instead.",
DeprecationWarning,
stacklevel=2,
)
self.optimize_kernel(X=X, n_restarts_optimizer=n_restarts_optimizer)
# Descriptive, discoverable aliases for the single-letter matrices computed by
# ``compute_edmd``. The short names (U, G, C, B, X, phi) are kept for backward
# compatibility and mathematical convention.
@property
def koopman_matrix(self) -> Optional[NDArray[np.float64]]:
"""Koopman operator approximation (alias of ``U``)."""
return self.U
@property
def gram_matrix(self) -> Optional[NDArray[np.float64]]:
"""Kernel Gram matrix (alias of ``G``)."""
return self.G
@property
def output_matrix(self) -> Optional[NDArray[np.float64]]:
"""Feature-to-output matrix (alias of ``C``)."""
return self.C
@property
def state_matrix(self) -> Optional[NDArray[np.float64]]:
"""Feature-to-state matrix (alias of ``B``)."""
return self.B
@property
def dictionary(self) -> Optional[NDArray[np.float64]]:
"""Dictionary of sampled states (alias of ``X``)."""
return self.X
@property
def feature_map(self) -> Optional[Callable]:
"""Feature map function (alias of ``phi``)."""
return self.phi