Source code for environmentaltools.spatiotemporal.covariance

"""
Spatiotemporal Covariance Module
=================================

This module provides functions for computing and modeling spatiotemporal covariance
structures used in Bayesian Maximum Entropy estimation.

Functions include empirical covariance calculation, directional covariance analysis,
theoretical covariance models, and model fitting.
"""

import warnings as warn

import numpy as np
from environmentaltools.spatiotemporal.bme import (
    find_pairs_by_distance,
    create_spatiotemporal_matrix,
)


[docs] def compute_spatiotemporal_covariance(dfh, dfs, slag, tlag): """ Compute empirical spatiotemporal covariance. Calculates the covariance function for spatiotemporal data at specified spatial and temporal lag distances. Parameters ---------- dfh : pd.DataFrame Hard data with columns ['x', 'y', 't', 'h']. dfs : pd.DataFrame Soft data with columns ['x', 'y', 't', 'h']. slag : np.ndarray Vector of spatial lag distances at which to compute covariance. tlag : np.ndarray Vector of temporal lag distances at which to compute covariance. Returns ------- empcovst : np.ndarray Matrix of empirical spatiotemporal covariance values, shape (n_spatial, n_temporal). pairsnost : np.ndarray Number of valid data pairs at each spatiotemporal lag, shape (n_spatial, n_temporal). covdists : np.ndarray Spatial distance grid for covariance values. covdistt : np.ndarray Temporal distance grid for covariance values. Notes ----- The covariance at lag (0,0) is set to the variance of the data. Computes sample covariance: Cov(X,Y) = E[XY] - E[X]E[Y]. """ slagtol = np.hstack((0, np.diff(slag))) tlagtol = np.hstack((0, np.diff(tlag))) dfz, coups, coupt = create_spatiotemporal_matrix(dfh, dfs) ns, nt = len(slag), len(tlag) idxpairs = find_pairs_by_distance(coups, slag, slagtol) idtpairs = find_pairs_by_distance(coupt, tlag, tlagtol) # Computing covariance empcovst = np.ones((ns, nt)) * np.nan pairsnost = np.zeros((ns, nt)) xmean, ymean, xymean = ( np.ones((ns, nt)) * np.nan, np.ones((ns, nt)) * np.nan, np.ones((ns, nt)) * np.nan, ) for inds in range(ns): # Test if we have data for this spatial lag if np.shape(idxpairs[inds])[1] != 0: for indt in range(0, nt): # Test if we have data for this temporal lag if np.shape(idtpairs[indt])[1] != 0: xcol = dfz.loc[idtpairs[indt][0], idxpairs[inds][0]].values xrow = dfz.loc[idtpairs[indt][1], idxpairs[inds][1]].values idxValid = ~np.isnan(xcol) & ~np.isnan(xrow) pairsnost[inds, indt] = np.sum(idxValid) if pairsnost[inds, indt] != 0: xcol = xcol[idxValid] xrow = xrow[idxValid] xmean[inds, indt] = np.mean(xcol) ymean[inds, indt] = np.mean(xrow) xymean[inds, indt] = np.mean(xcol * xrow) empcovst[inds, indt] = ( xymean[inds, indt] - xmean[inds, indt] * ymean[inds, indt] ) empcovst[0, 0] = dfz.stack().var() covdists = np.tile(slag, (nt, 1)).T covdistt = np.tile(tlag, (ns, 1)) return empcovst, pairsnost, covdists, covdistt
[docs] def compute_directional_covariance(dfh, dfs, slag, tlag, dinfo): """ Compute spatiotemporal covariance for multiple directional bins. Calculates anisotropic (direction-dependent) covariance structures by analyzing data pairs grouped by their spatial direction. Parameters ---------- dfh : pd.DataFrame Hard data with columns ['x', 'y', 't', 'h']. dfs : pd.DataFrame Soft data with columns ['x', 'y', 't', 'h']. slag : np.ndarray Vector of spatial lag distances. tlag : np.ndarray Vector of temporal lag distances. dinfo : list Directional information [dlag, dlagtol] where: - dlag: array of directional angles (degrees) - dlagtol: angular tolerance for binning Returns ------- empcovst : np.ndarray 3D array of empirical covariance, shape (n_spatial, n_directions, n_temporal). pairsnost : np.ndarray 3D array of pair counts, shape (n_spatial, n_temporal, n_directions). covdist : np.ndarray Spatial distance grid. covdistd : np.ndarray Directional angle grid. covdistt : np.ndarray Temporal distance vector. Notes ----- Useful for detecting and modeling spatial anisotropy in spatiotemporal fields. Directions are typically binned (e.g., 0°, 45°, 90°, 135°) to capture orientation-dependent correlation structures. """ slagtol = np.hstack((0, np.diff(slag))) tlagtol = np.hstack((0, np.diff(tlag))) dlag, dlagtol = dinfo[0], dinfo[1] dfz, coups, coupt = create_spatiotemporal_matrix(dfh, dfs) ns, nt, nd = len(slag), len(tlag), len(dlag) idxpairs = find_pairs_by_distance(coups, slag, slagtol, [dlag, dlagtol]) idtpairs = find_pairs_by_distance(coupt, tlag, tlagtol) # Computing directional covariance empcovst = np.ones((ns, nd, nt)) * np.nan pairsnost = np.zeros((ns, nt, nd)) xmean, ymean, xymean = ( np.ones((ns, nt, nd)) * np.nan, np.ones((ns, nt, nd)) * np.nan, np.ones((ns, nt, nd)) * np.nan, ) for indd in range(nd): print("indd: " + str(indd)) for inds in range(ns): print("inds: " + str(inds)) # Test if we have data for this spatial lag if np.shape(idxpairs[inds])[1] > 1: for indt in range(nt): print("indt: " + str(indt)) # Test if we have data for this temporal lag if np.shape(idtpairs[indt])[1] > 1: xcol = dfz.loc[ idtpairs[indt][0], idxpairs[inds][indd][0] ].values xrow = dfz.loc[ idtpairs[indt][1], idxpairs[inds][indd][1] ].values idxValid = ~np.isnan(xcol) & ~np.isnan(xrow) pairsnost[inds, indt, indd] = np.sum(idxValid) if pairsnost[inds, indt, indd] != 0: xcol = xcol[idxValid] xrow = xrow[idxValid] xmean[inds, indt, indd] = np.mean(xcol) ymean[inds, indt, indd] = np.mean(xrow) xymean[inds, indt, indd] = np.mean(xcol * xrow) empcovst[inds, indd, indt] = ( xymean[inds, indt, indd] - xmean[inds, indt, indd] * ymean[inds, indt, indd] ) empcovst[0, 0, 0] = dfz.stack().var() covdist, covdistd, covdistt = np.tile(slag, (nd, 1)), np.tile(dlag, (ns, 1)).T, tlag return empcovst, pairsnost, covdist, covdistd, covdistt
[docs] def calculate_theoretical_covariance(name, D, param): """ Compute theoretical spatiotemporal covariance from parametric models. Evaluates covariance functions from theoretical models at specified spatiotemporal separation distances. Parameters ---------- name : str Covariance model family. Supported models: - 'exponentialST': Separable exponential model - 'exponentialSTC': Non-separable exponential model with interaction term D : list of np.ndarray Separation distances [spatial_distances, temporal_distances]. param : array-like Model parameters. Length depends on model: - 'exponentialST': [sill, spatial_range, temporal_range, nugget] - 'exponentialSTC': [sill, spatial_range, temporal_range, interaction_range, nugget] Returns ------- res : np.ndarray Theoretical covariance values at the specified separations. Raises ------ Warning If requested model is not implemented. Notes ----- **Exponential ST Model (separable)**: C(d,t) = sill * exp(-d/range_s - t/range_t) - nugget **Exponential STC Model (non-separable with interaction)**: C(d,t) = sill * exp(-√d/range_s - t/range_t - √d*t/range_st) - nugget The nugget effect represents measurement error or micro-scale variability. """ d, t = np.asarray(D[0]), np.asarray(D[1]) if name == "exponentialST": res = param[0] * np.exp(-d / param[1] - t / param[2]) - param[3] elif name == "exponentialSTC": res = ( param[0] * np.exp(-(d ** 0.5) / param[1] - t / param[2] - d ** 0.5 * t / param[3]) - param[4] ) else: warn.warn("Sorry! This model is not implemented yet.") return res
[docs] def fit_covariance_model(param, empcovst, dist, name): """ Compute sum of squared errors between theoretical and empirical covariance. Objective function for least squares fitting of covariance model parameters to empirical covariance estimates. Parameters ---------- param : array-like Covariance model parameters to optimize. empcovst : np.ndarray Empirical spatiotemporal covariance matrix. dist : list of np.ndarray Spatiotemporal separation distances [spatial, temporal]. name : str Covariance model family name (e.g., 'exponentialST', 'exponentialSTC'). Returns ------- float Sum of squared errors between theoretical and empirical covariance. Notes ----- This function is typically used with scipy.optimize.minimize or similar optimization routines to find optimal model parameters. The objective is: SSE = Σ(C_empirical - C_theoretical)² """ cov = calculate_theoretical_covariance(name, dist, param) return np.sum(np.sum(((empcovst - cov) ** 2), axis=1), axis=0)