Source code for kkf.covariances
from typing import Any
import numpy as np
from numpy.typing import NDArray
from scipy.stats import rv_continuous
[docs]
def compute_initial_covariance(
    x: NDArray[np.float64],
    n_features: int,
    initial_distribution: rv_continuous,
    koopman_operator: Any,
    n_samples: int,
) -> NDArray[np.float64]:
    """
    Estimate the feature-space covariance of the initial state distribution.

    States are drawn from ``initial_distribution``, lifted one at a time
    through ``koopman_operator.phi``, and the sample covariance of the lifted
    points is returned.

    Parameters
    ----------
    x : np.ndarray
        Reference state. Only ``len(x)`` is used here: it fixes the number of
        columns the drawn samples are reshaped to.
    n_features : int
        Length of the feature vector produced by ``koopman_operator.phi``.
    initial_distribution : scipy.stats.rv_continuous
        Distribution of the initial state; must provide ``rvs(size=...)``
        whose output can be reshaped to ``(n_samples, len(x))``.
    koopman_operator : object
        Object exposing ``phi(state)``, the feature map applied to each
        sampled state.
    n_samples : int
        Number of states drawn for the estimate.

    Returns
    -------
    np.ndarray
        Sample covariance of the lifted states, always two-dimensional
        (``(n_features, n_features)``).

    Notes
    -----
    Columns are treated as variables and rows as observations
    (``rowvar=False``), with ``np.cov``'s default normalisation.
    """
    # Draw every state in one call, then lay the draws out one state per row.
    draws = initial_distribution.rvs(size=n_samples)
    states = draws.reshape((n_samples, len(x)))

    # Every row is overwritten below, so the buffer needs no initial values.
    lifted = np.empty((n_samples, n_features))
    for row, state in zip(lifted, states):
        row[:] = koopman_operator.phi(state)

    # np.cov collapses to a 0-d array for a single feature; restore 2-D.
    covariance = np.cov(lifted, rowvar=False)
    return np.atleast_2d(covariance)
[docs]
def compute_dynamics_covariance(
    x: NDArray[np.float64], n_features: int, dynamics: Any, koopman_operator: Any, n_samples: int
) -> NDArray[np.float64]:
    """
    Estimate the feature-space covariance induced by the process noise.

    Noise realisations are drawn from ``dynamics.dist_dyn``; for each one the
    state ``x`` is propagated with ``dynamics.dynamics(x, noise)`` and the
    result is lifted through ``koopman_operator.phi``. The sample covariance
    of the lifted points is returned.

    Parameters
    ----------
    x : np.ndarray
        State the dynamics are evaluated at. ``len(x)`` also fixes the number
        of columns the noise draws are reshaped to.
    n_features : int
        Length of the feature vector produced by ``koopman_operator.phi``.
    dynamics : object
        Dynamical system object providing:

        - ``dist_dyn``: distribution with ``rvs(size=...)`` for the process
          noise, reshapeable to ``(n_samples, len(x))``
        - ``dynamics(x, w)``: one-step state transition under noise ``w``
    koopman_operator : object
        Object exposing ``phi(state)``, the feature map applied to each
        propagated state.
    n_samples : int
        Number of noise realisations drawn for the estimate.

    Returns
    -------
    np.ndarray
        Sample covariance of the lifted propagated states, always
        two-dimensional (``(n_features, n_features)``).

    Notes
    -----
    Columns are treated as variables and rows as observations
    (``rowvar=False``), with ``np.cov``'s default normalisation.
    """
    # Draw all noise realisations at once, one realisation per row.
    draws = dynamics.dist_dyn.rvs(size=n_samples)
    noise = draws.reshape((n_samples, len(x)))

    # Every row is overwritten below, so the buffer needs no initial values.
    lifted = np.empty((n_samples, n_features))
    for row, w in zip(lifted, noise):
        next_state = dynamics.dynamics(x, w)
        row[:] = koopman_operator.phi(next_state)

    # np.cov collapses to a 0-d array for a single feature; restore 2-D.
    covariance = np.cov(lifted, rowvar=False)
    return np.atleast_2d(covariance)
[docs]
def compute_observation_covariance(
    x: NDArray[np.float64], n_outputs: int, dynamics: Any, n_samples: int
) -> NDArray[np.float64]:
    """
    Estimate the covariance of the measurement process at a given state.

    Noise realisations are drawn from ``dynamics.dist_obs``; for each one the
    measurement ``dynamics.measurements(x, noise)`` is evaluated, and the
    sample covariance of the resulting measurements is returned.

    Parameters
    ----------
    x : np.ndarray
        State the measurement function is evaluated at.
    n_outputs : int
        Number of measurement outputs. The noise draws are also reshaped to
        ``(n_samples, n_outputs)``, so each noise realisation must have
        ``n_outputs`` components.
    dynamics : object
        Dynamical system object providing:

        - ``dist_obs``: distribution with ``rvs(size=...)`` for the
          measurement noise
        - ``measurements(x, w)``: measurement of state ``x`` under noise ``w``
    n_samples : int
        Number of noise realisations drawn for the estimate.

    Returns
    -------
    np.ndarray
        Sample covariance of the measurements, always two-dimensional
        (``(n_outputs, n_outputs)``).

    Notes
    -----
    Columns are treated as variables and rows as observations
    (``rowvar=False``), with ``np.cov``'s default normalisation.
    """
    # Draw all noise realisations at once, one realisation per row.
    draws = dynamics.dist_obs.rvs(size=n_samples)
    noise = draws.reshape((n_samples, n_outputs))

    # Every row is overwritten below, so the buffer needs no initial values.
    outputs = np.empty((n_samples, n_outputs))
    for row, w in zip(outputs, noise):
        row[:] = dynamics.measurements(x, w)

    # np.cov collapses to a 0-d array for a single output; restore 2-D.
    covariance = np.cov(outputs, rowvar=False)
    return np.atleast_2d(covariance)