"""
Bayesian Maximum Entropy (BME) Module
======================================
This module provides functions for Bayesian Maximum Entropy spatiotemporal estimation
and analysis, including moment computation, local mean estimation, and covariance modeling.
"""
import itertools as it
import os
import numpy as np
import pandas as pd
import scipy.stats as st
[docs]
def compute_bme_moments(dfk, dfh, dfs, covmodel, covparam, nmax, dmax, order, options, path, name):
"""
Compute Bayesian Maximum Entropy moments for spatiotemporal estimation.
This function calculates statistical moments at estimation points using BME theory,
incorporating both hard (exact) and soft (probabilistic) data with spatiotemporal
covariance modeling.
Parameters
----------
dfk : pd.DataFrame
Estimation points with columns ['x', 'y', 't'] for spatial and temporal coordinates.
dfh : pd.DataFrame
Hard data (exact observations) with columns ['x', 'y', 't', 'h'].
dfs : pd.DataFrame
Soft data (probabilistic observations) with columns ['x', 'y', 't', 'h', 's'].
covmodel : str
Covariance model type (e.g., 'exponential', 'gaussian', 'spherical').
covparam : array-like
Parameters for the covariance model.
nmax : list of int
Maximum number of data points [hard_max, soft_max] to use in local neighborhoods.
dmax : list of float
Maximum distances [spatial_max, temporal_max, space_time_ratio] for data selection.
order : list of int
Polynomial regression orders [spatial_order, temporal_order] for local mean.
options : list
BME options [max_integration_intervals, num_moments, percentile].
path : str
Directory path for saving/loading cached results.
name : str
Base filename for cached results.
Returns
-------
np.ndarray
Array of shape (n_estimation_points, n_moments+1) containing estimated moments
at each estimation point. Columns include probability, mean, variance, and skewness.
Notes
-----
Results are cached to disk using NumPy's .npy format. If a cached file exists,
it will be loaded instead of recomputing.
The BME approach combines prior knowledge (covariance model) with hard and soft
data to produce optimal spatiotemporal estimates.
"""
# Import locally to avoid circular import
from environmentaltools.spatiotemporal import covariance
if os.path.isfile(os.path.join(path, name + ".npy")):
moments = np.load(os.path.join(path, name + ".npy"))
else:
# no of elements
nk = np.shape(dfk)[0]
moments = np.zeros([nk, int(options[1]) + 1]) * np.nan
# Main loop starts here
for i in range(len(dfk)):
print(
"Point no. "
+ str(i + 1).zfill(5)
+ " of "
+ str(len(dfk))
+ ". A "
+ str(np.round((i + 1) / float(len(dfk)) * 100, decimals=2)).zfill(5)
+ "% completed."
)
ck0 = pd.DataFrame([dfk.values[i]], index=[0], columns=["x", "y", "t"])
# print(ck0)
# Select the local neighbourhood for all variables
chl, zhl, dh, sumnh, _ = select_neighbours(
ck0, dfh.loc[:, ["x", "y", "t"]], dfh.loc[:, ["h"]], nmax[0], dmax
)
csl, zsl, ds, sumns, indexs = select_neighbours(
ck0, dfs.loc[:, ["x", "y", "t"]], dfs.loc[:, ["h"]], nmax[1], dmax
)
# Calculate the covariance matrices
k_kk = covariance.calculate_theoretical_covariance(covmodel, [0, 0], covparam)
k_hh = covariance.calculate_theoretical_covariance(
covmodel,
[
coordinates_to_distance(chl.loc[:, ["x", "y"]].values),
coordinates_to_distance(chl.loc[:, "t"].values),
],
covparam,
)
k_ss = covariance.calculate_theoretical_covariance(
covmodel,
[
coordinates_to_distance(csl.loc[:, ["x", "y"]].values),
coordinates_to_distance(csl.loc[:, "t"].values),
],
covparam,
)
k_kh = covariance.calculate_theoretical_covariance(covmodel, [dh[0], dh[1]], covparam)
k_ks = covariance.calculate_theoretical_covariance(covmodel, [ds[0], ds[1]], covparam)
k_sh = covariance.calculate_theoretical_covariance(
covmodel,
[
coordinates_to_distance(
csl.loc[:, ["x", "y"]].values, chl.loc[:, ["x", "y"]].values
),
coordinates_to_distance(csl.loc[:, "t"].values, chl.loc[:, "t"].values),
],
covparam,
)
if k_hh.size == 0:
khs = k_ss
else:
khs = np.vstack((np.hstack((k_hh, k_sh)), np.hstack((k_sh.T, k_ss))))
vsl = pd.DataFrame(dfs.iloc[indexs].s)
mkest, mhest, msest, _ = estimate_local_mean_bme(
ck0, chl, csl, zhl, zsl, vsl, khs, order
)
if sumnh > 0: # Subtract the mean from the hard data
zhl.h = zhl.h - mhest
if sumns > 0: # Subtract the mean from the soft data
zsl.h = zsl.h - msest
# Calculate the statistical moments
BsIFh = np.dot(k_sh.T, np.linalg.inv(k_hh))
KsIFh = k_ss - np.dot(BsIFh, k_sh)
kk_hs = np.hstack((k_kh, k_ks))
BkIFhs = np.dot(kk_hs, np.linalg.inv(khs))
KkIFhs = k_kk - np.dot(BkIFhs, kk_hs.T)
moments[i, 0], moments[i, 1], moments[i, 2], moments[i, 3] = calculate_moments(
zhl.h.values,
zsl.h.values,
vsl.values,
[sumnh, sumns],
BsIFh,
KsIFh,
BkIFhs,
KkIFhs,
options,
)
moments[i, 1] = moments[i, 1]
moments[i, 2] = moments[i, 2] ** 2
np.save(os.path.join(path, name + ".npy"), moments)
return moments
[docs]
def estimate_local_mean_bme(ck, ch, cs, zh, ms, vs, khs, order):
"""
Estimate local mean for Bayesian Maximum Entropy.
Computes local mean estimates at estimation points and data locations using
polynomial regression weighted by covariance structure.
Parameters
----------
ck : pd.DataFrame
Coordinates of estimation point with columns ['x', 'y', 't'].
ch : pd.DataFrame
Coordinates of hard data locations with columns ['x', 'y', 't'].
cs : pd.DataFrame
Coordinates of soft data locations with columns ['x', 'y', 't'].
zh : pd.DataFrame
Values of hard data.
ms : pd.DataFrame
Mean values of soft data.
vs : pd.DataFrame
Variance values of soft data.
khs : np.ndarray
Combined covariance matrix for hard and soft data.
order : list of int
Polynomial regression orders [spatial_order, temporal_order].
Returns
-------
mkest : float
Estimated mean at the estimation point.
mhest : np.ndarray
Estimated means at hard data locations.
msest : np.ndarray
Estimated means at soft data locations.
vkest : float
Variance of the estimated mean at the estimation point.
Notes
-----
Uses generalized least squares regression with covariance weighting to
estimate spatiotemporal trends in the data.
"""
nh = np.shape(zh)[0]
c = pd.concat([ch, cs], sort=False)
ind_diag = np.arange(nh, len(c))
khs[ind_diag, ind_diag] = np.diag(vs)
z = np.vstack((zh, ms))
best, vbest, mbest = estimate_bme_regression(c, z, order, khs)
mhest = mbest[:nh, 0]
msest = mbest[nh:, 0]
ck = np.vstack((ck.x.values, ck.y.values, ck.t.values)).T
x = create_design_matrix(ck, order)
mkest = np.dot(x, best)
vkest = np.dot(np.dot(x, vbest), x.T)
return mkest, mhest, msest, vkest
[docs]
def calculate_moments(zh, zs, vs, nhs, BsIFh, KsIFh, BkIFhs, KkIFhs, options):
"""
Calculate moments of the BME posterior probability density function.
Computes statistical moments (mean, variance, skewness) of the posterior
distribution using numerical integration over soft data intervals.
Parameters
----------
zh : np.ndarray
Hard data values.
zs : np.ndarray
Soft data mean values.
vs : np.ndarray
Soft data variance values.
nhs : list of int
Number of hard and soft data points [n_hard, n_soft].
BsIFh : np.ndarray
BME coefficient matrix for soft data given hard data.
KsIFh : np.ndarray
BME covariance matrix for soft data given hard data.
BkIFhs : np.ndarray
BME coefficient matrix for estimation point given all data.
KkIFhs : float
BME covariance at estimation point given all data.
options : list
Integration options [max_intervals, num_moments, percentile].
Returns
-------
tuple of float
(probability, mean, std_dev, skewness) - Four moments of the posterior PDF.
Notes
-----
Uses trapezoidal rule for numerical integration over soft data probability
intervals defined by the specified percentile bounds.
"""
nh, ns = nhs[0], nhs[1]
maxpts, nmom, perc = options[0], options[1], options[2]
Bs = np.ones(2)
moments = np.zeros(4)
if zh.size == 0:
msIFh = zs
else:
msIFh = np.dot(BsIFh, zh)
Bs[1] = np.dot(BkIFhs[:nh], zh)
per = np.tile([1 - perc, perc], (ns, 1))
zper = []
for i, j in enumerate(zs):
zper.append(st.norm.ppf(per[i], loc=j, scale=np.sqrt(vs[i])))
zper = np.asarray(zper)
As = np.zeros([ns, 2])
As[:, 1] = BkIFhs[nh:]
p = np.array([1, 1])
val = integrate_moment_vector(msIFh, vs, KsIFh, As, Bs, p, maxpts, zper)
moments[0] = val[0]
moments[1] = val[1] / val[0]
p = np.array([2, 3])
As[:, 0] = BkIFhs[nh:]
Bs[0] = Bs[1] - moments[1]
val = integrate_moment_vector(msIFh, vs, KsIFh, As, Bs, p, maxpts, zper)
moments[2] = np.sqrt(KkIFhs + val[0] / moments[0])
moments[3] = (val[1] / moments[0]) / moments[2] ** 3
return moments
[docs]
def integrate_moment_vector(ms, vs, ks, As, Bs, p, maxpts, zp):
"""
Compute moment integrals using the trapezoidal rule.
Performs numerical integration for BME moment calculations over soft data
probability intervals.
Parameters
----------
ms : np.ndarray
Mean values of soft data.
vs : np.ndarray
Variance values of soft data.
ks : np.ndarray
Covariance matrix for soft data.
As : np.ndarray
Coefficient matrix for moment calculation.
Bs : np.ndarray
Constant terms for moment calculation.
p : np.ndarray
Powers for moment orders [power1, power2].
maxpts : int
Maximum number of integration intervals.
zp : np.ndarray
Percentile bounds for integration intervals.
Returns
-------
np.ndarray
Array containing integrated moment values.
Notes
-----
Uses trapezoidal integration combined with multivariate normal distributions
to evaluate moments of the posterior distribution.
"""
maxpts = int(maxpts)
xs = np.asarray([np.linspace(zp[i, 0], zp[i, 1], maxpts) for i in range(len(zp))])
# def is_pos_def(x):
# return np.all(np.linalg.eigvals(x) > 0)
# if is_pos_def(ks):
stn = st.multivariate_normal(cov=ks)
# else:
# stn = st.multivariate_normal(cov=ks*0+np.diag(ks))
nms_c = stn.pdf(xs.T)
Bs = np.tile(Bs, (maxpts, 1)).T
if np.all(nms_c == 0):
nms_c = 1
if p[0]:
Bs[:, 1] = Bs[:, 1] * 0
elif p[0] == 2:
Bs = np.zeros(np.shape(Bs))
stn = st.norm(loc=ms, scale=np.sqrt(np.ravel(vs)))
ns = stn.pdf(xs.T)
p = np.tile(p, (maxpts, 1)).T
dxs = np.abs(xs[:, 1] - xs[:, 0])
I = np.zeros([2, maxpts])
for i in range(len(zp)):
I += (
(np.dot(As.T, xs) + Bs) ** p
* np.tile(ns[:, i], (2, 1))
* np.tile(nms_c, (2, 1))
* dxs[i]
)
results = np.sum(I, axis=1)
return results
[docs]
def apply_data_smoothing(dfh, dfs, dfk, nmax, dmax, path):
"""
Apply spatial smoothing to hard and soft data.
Performs kernel smoothing on spatiotemporal data to reduce noise and normalize
values for BME analysis.
Parameters
----------
dfh : pd.DataFrame
Hard data with columns ['x', 'y', 't', 'h'].
dfs : pd.DataFrame
Soft data with columns ['x', 'y', 't', 'h'].
dfk : pd.DataFrame
Estimation points with columns ['x', 'y', 't'].
nmax : list of int
Maximum number of neighbors [hard_max, soft_max].
dmax : list of float
Maximum distances [spatial_max, temporal_max, space_time_ratio].
path : str
Directory path for caching smoothed results.
Returns
-------
zh : np.ndarray
Smoothed hard data values.
zs : np.ndarray
Smoothed soft data values.
zk : np.ndarray
Smoothed values at estimation points.
dfh : pd.DataFrame
Normalized hard data.
dfs : pd.DataFrame
Normalized soft data.
Notes
-----
Results are cached to disk. Smoothing uses exponential distance weighting.
Data is standardized by subtracting smooth trend and dividing by standard deviation.
"""
if os.path.isfile(path + "/Smooth_h.npy"):
zh = np.load(path + "/Smooth_h.npy")
zs = np.load(path + "/Smooth_s.npy")
zk = np.load(path + "/Smooth_k.npy")
else:
zh = smooth_data(
dfh.loc[:, ["x", "y", "t"]],
dfh.loc[:, ["x", "y", "t"]],
dfh.loc[:, ["h"]],
0.2,
nmax[0],
dmax,
)
np.save(path + "/Smooth_h.npy", zh)
zs = smooth_data(
dfs.loc[:, ["x", "y", "t"]],
dfs.loc[:, ["x", "y", "t"]],
dfs.loc[:, ["h"]],
0.2,
nmax[0],
dmax,
)
np.save(path + "/Smooth_s.npy", zs)
dfhs = pd.concat([dfh, dfs], sort=False)
dfhs.reset_index(drop=True, inplace=True)
zk = smooth_data(
dfk.loc[:, ["x", "y", "t"]],
dfhs.loc[:, ["x", "y", "t"]],
dfhs.loc[:, ["h"]],
0.2,
nmax[0],
dmax,
)
np.save(path + "/Smooth_k.npy", zk)
dfh.h = (dfh.h - zh) / (dfh.h - zh).std()
dfs.h = (dfs.h - zs) / (dfs.h - zs).std()
dfhs = pd.concat([dfh, dfs], sort=False)
dfhs.reset_index(drop=True, inplace=True)
return zh, zs, zk, dfh, dfs
[docs]
def create_spatiotemporal_matrix(dfh, dfs):
"""
Create spatiotemporal data matrix.
Organizes hard and soft data into a matrix format where columns represent
unique spatial locations and rows represent time points.
Parameters
----------
dfh : pd.DataFrame
Hard data with columns ['x', 'y', 't', 'h'].
dfs : pd.DataFrame
Soft data with columns ['x', 'y', 't', 'h'].
Returns
-------
dfz : pd.DataFrame
Matrix of values where index is time and columns are spatial locations.
coups : np.ndarray
Array of unique spatial coordinate pairs [x, y].
coupt : np.ndarray
Array of unique time points.
Notes
-----
Missing data are represented as NaN values in the resulting matrix.
"""
cPt = dfh.append(dfs)
cPt.sort_index(sort=False, inplace=True)
coupt = cPt.t.unique()
coups = np.unique((cPt.x, cPt.y), axis=1).T
dfz = pd.DataFrame(np.nan, index=coupt, columns=(np.arange(len(coups))))
for i, j in enumerate(coups[:, 0]):
for t in coupt:
dfz.loc[t, i] = cPt[
((cPt.t == t) & (cPt.x == j) & (cPt.y == coups[i, 1]))
].h.values[0]
return dfz, coups, coupt
[docs]
def coordinates_to_distance(pi, *args):
"""
Compute distances between coordinate points.
Calculates pairwise distances for spatial (2D) or temporal (1D) coordinates,
or distances from points to a reference location.
Parameters
----------
pi : np.ndarray
Coordinate array. Shape (n,) for temporal or (n, 2) for spatial coordinates.
*args : tuple, optional
Reference coordinates for computing distances to a single point.
If provided, computes distances from all points in `pi` to this reference.
Returns
-------
dist : np.ndarray or pd.DataFrame
Distance matrix or array. If no args provided, returns symmetric distance
matrix between all points. If args provided, returns distances to reference.
Notes
-----
- For spatial data (2D): Euclidean distance
- For temporal data (1D): Absolute difference
- Distance matrices are symmetric with zeros on diagonal
"""
if args:
ncoor = len(np.shape(pi))
pi0 = args[0]
nl = np.shape(pi0)[0]
if nl == 1:
if ncoor == 1:
dist = np.abs(pi - pi0)
else:
dist = np.sqrt(
(pi[:, 0] - pi0[0, 0]) ** 2 + (pi[:, 1] - pi0[0, 1]) ** 2
)
else:
dist = []
if ncoor == 1:
for m in range(nl):
dist.append(np.abs(pi - pi0[m]))
else:
for m in range(nl):
dist.append(
np.sqrt(
(pi[:, 0] - pi0[m, 0]) ** 2 + (pi[:, 1] - pi0[m, 1]) ** 2
)
)
else:
ncoor = len(np.shape(pi))
df = [[] for ii in range(ncoor)]
if ncoor == 1:
pi = pi[:, np.newaxis]
for m in range(ncoor):
df[m] = pd.DataFrame(
0, index=np.arange(len(pi)), columns=np.arange(len(pi))
)
dx = []
for a, b in it.combinations_with_replacement(pi[:, m], 2):
dx.append(a - b)
dx = np.array(dx)
k, l = len(pi), 0
for i, j in enumerate(df[m].index):
df[m].loc[j, i:] = dx[l : l + k]
df[m].loc[i:, j] = dx[l : l + k]
l += k
k -= 1
if ncoor == 1:
dist = pd.DataFrame(np.abs(df[0]))
elif ncoor == 2:
dist = pd.DataFrame(np.sqrt(df[0] ** 2.0 + df[1] ** 2.0))
else:
dist = 0
return dist
[docs]
def coordinates_to_distance_angle(pi):
"""
Compute distances and angles between spatial coordinate points.
Calculates pairwise Euclidean distances and directional angles for 2D
spatial coordinates.
Parameters
----------
pi : np.ndarray
Spatial coordinate array of shape (n, 2) with columns [x, y].
Returns
-------
dist : pd.DataFrame
Symmetric matrix of Euclidean distances between all point pairs.
ang : pd.DataFrame
Matrix of angles in degrees (0-180) representing direction from
each point to every other point.
Notes
-----
Angles are computed using complex number representation and converted
to degrees in the range [0, 180).
"""
df = [[] for ii in range(2)]
for m in range(2):
df[m] = pd.DataFrame(0, index=np.arange(len(pi)), columns=np.arange(len(pi)))
dx = []
for a, b in it.combinations_with_replacement(pi[:, m], 2):
dx.append(a - b)
dx = np.array(dx)
k, l = len(pi), 0
for i, j in enumerate(df[m].index):
df[m].loc[j, i:] = dx[l : l + k]
df[m].loc[i:, j] = dx[l : l + k]
l += k
k -= 1
dist = pd.DataFrame(np.sqrt(df[0] ** 2.0 + df[1] ** 2.0))
ang = pd.DataFrame(np.angle(df[0] + 1j * df[1], deg=True))
ang[ang < 0] += 180
return dist, ang
[docs]
def find_pairs_by_distance(pi, plag, plagtol, *args):
"""
Find pairs of points separated by specified distance intervals.
Identifies point pairs whose separation falls within given distance lag
tolerances, optionally considering directional angles for spatial data.
Parameters
----------
pi : np.ndarray
Coordinate array (1D for temporal, 2D for spatial).
plag : array-like
Array of target distance lags.
plagtol : array-like
Array of distance tolerance values for each lag.
*args : tuple, optional
For directional analysis: [dlag, dlagtol] where dlag is array of
target angles and dlagtol is angular tolerance.
Returns
-------
idxpairs : list
Nested list of index tuples identifying point pairs meeting distance
criteria. Structure depends on whether directional analysis is included.
Notes
-----
Without directional args: Returns list of index arrays for each distance lag.
With directional args: Returns 2D list organized by [distance_lag][angle_bin].
"""
if not args:
nr = len(plag)
dist = coordinates_to_distance(pi)
idxpairs = [[] for i in range(0, nr)]
for ir in range(0, nr):
idxpairs[ir] = np.where(
(plag[ir] - plagtol[ir] <= dist) & (dist <= plag[ir] + plagtol[ir])
)
else:
dlag, dlagtol = args[0][0], args[0][1]
nr, nd = len(plag), len(dlag)
dist, ang = coordinates_to_distance_angle(pi)
idxpairs = [[[] for j in range(nd)] for i in range(nr)]
db = 2 * plag[1] * np.sin(dlagtol * np.pi / 360)
daI2 = np.arctan(db / (2 * dist)) * 180 / np.pi
for i in range(nd):
idxpairs[0][i] = np.where(
(plag[0] - plagtol[0] <= dist) & (dist <= plag[0] + plagtol[0])
)
idxpairs[1][i] = np.where(
(plag[1] - plagtol[1] <= dist)
& (dist <= plag[1] + plagtol[1])
& (dlag[1] - dlagtol / 2 <= ang)
& (ang <= dlag[1] + dlagtol / 2)
)
for id in range(nd):
for ir in range(2, nr):
idxpairs[ir][id] = np.where(
(plag[ir] - plagtol[ir] <= dist)
& (dist <= plag[ir] + plagtol[ir])
& (dlag[id] - daI2 <= ang)
& (ang <= dlag[id] + daI2)
)
return idxpairs
[docs]
def select_neighbours(c0, c, z, nmax, dmax):
"""
Select neighboring data points for local BME estimation.
Finds data points within specified spatiotemporal distance constraints,
limiting to a maximum number of nearest neighbors.
Parameters
----------
c0 : pd.DataFrame
Estimation point coordinates with columns ['x', 'y', 't'].
c : pd.DataFrame
Data point coordinates with columns ['x', 'y', 't'].
z : pd.DataFrame
Data values at coordinates `c`.
nmax : int
Maximum number of neighbors to select.
dmax : list of float
Maximum distances [spatial_max, temporal_max, space_time_ratio].
Returns
-------
csub : pd.DataFrame
Coordinates of selected neighbor points.
zsub : pd.DataFrame
Values at selected neighbor points.
dsub : list of np.ndarray
Distances [spatial, temporal, combined] for selected neighbors.
nsub : int
Number of selected neighbors.
index : np.ndarray
Indices of selected neighbors in original data.
Notes
-----
Combined distance uses weighted sum: ds + (space_time_ratio * dt).
If more than nmax neighbors meet distance criteria, selects nearest ones.
"""
# Computing distances
ds = coordinates_to_distance(c.loc[:, ["x", "y"]].values, c0.loc[:, ["x", "y"]].values)
dt = coordinates_to_distance(c.loc[:, "t"].values, c0.loc[:, "t"].values)
index = np.where((ds <= dmax[0]) & (dt <= dmax[1]))[0]
# Check if more data meet the conditions than allowed; select nearest
nsub = len(index)
dp = ds + dmax[2] * dt
if nsub > nmax:
di = dp[index]
dis, indexi = np.sort(di), np.argsort(di)
indexi = indexi[:nmax]
nsub = nmax
index = index[indexi]
dsub = [ds[index], dt[index], dp[index]]
if np.shape(z)[1] == 1:
zsub = z.iloc[index]
else:
zsub = z.iloc[index, :]
csub = c.iloc[index, :]
return csub, pd.DataFrame(zsub), dsub, nsub, index
[docs]
def estimate_bme_regression(c, z, order, k):
"""
Compute parameter estimates for linear regression with covariance weighting.
Performs generalized least squares regression incorporating spatiotemporal
covariance structure.
Parameters
----------
c : pd.DataFrame
Spatiotemporal coordinates with columns ['x', 'y', 't'].
z : np.ndarray or pd.DataFrame
Data values at coordinates.
order : list of int
Polynomial orders [spatial_order, temporal_order].
k : np.ndarray
Covariance matrix for the data.
Returns
-------
best : np.ndarray
Estimated regression coefficients.
vbest : np.ndarray
Covariance matrix of coefficient estimates.
zest : np.ndarray
Regression-fitted values at input coordinates.
Notes
-----
Uses the design matrix approach with polynomial terms up to specified orders.
Accounts for spatiotemporal correlation through the covariance matrix.
"""
c = np.vstack((c.x.values, c.y.values, c.t.values)).T
x = create_design_matrix(c, order)
xtinvk = np.dot(x.T, np.linalg.inv(k))
vbest = np.linalg.inv(np.dot(xtinvk, x))
best = np.dot(np.dot(vbest, xtinvk), z)
zest = np.dot(x, best)
return best, vbest, zest
[docs]
def create_design_matrix(c, order):
"""
Create design matrix for polynomial regression.
Constructs matrix of polynomial terms for spatiotemporal regression,
with separate spatial (x, y) and temporal (t) terms.
Parameters
----------
c : np.ndarray
Coordinate matrix of shape (n, 3) with columns [x, y, t].
order : list of int
Polynomial orders [spatial_order, temporal_order].
Returns
-------
x : np.ndarray
Design matrix of shape (n, n_terms) where n_terms = 1 + 2*spatial_order + temporal_order.
Column 0 is constant term, followed by spatial terms (x, y) up to specified order,
then temporal terms up to specified order.
Notes
-----
Spatial terms alternate between x and y coordinates: [1, x, y, x², y², x³, y³, ...].
Temporal terms follow: [t, t², t³, ...].
"""
n, nd = np.shape(c)
x = np.ones([n, 1 + order[0] * 2 + order[1]])
for i in range(order[0]):
x[:, 1 + i * 2 : i * 2 + 3] = c[:, 0:2] ** (i + 1)
for i in range(order[1]):
x[:, 1 + order[0] * 2 + i] = c[:, 2] ** (i + 1)
return x
[docs]
def smooth_data(ck, chs, zhs, v, nmax, dmax):
"""
Apply spatial smoothing to data using exponential distance weighting.
Performs kernel smoothing on spatiotemporal data to reduce noise.
Parameters
----------
ck : pd.DataFrame
Estimation point coordinates with columns ['x', 'y', 't'].
chs : pd.DataFrame
Data point coordinates with columns ['x', 'y', 't'].
zhs : pd.DataFrame
Data values.
v : float
Bandwidth parameter for exponential kernel.
nmax : int
Maximum number of neighbors to use.
dmax : list of float
Maximum distances [spatial_max, temporal_max, space_time_ratio].
Returns
-------
zk : np.ndarray
Smoothed values at estimation points.
Notes
-----
Uses exponential kernel: weight = exp(-distance / v).
Weights are normalized to sum to 1 for each estimation point.
"""
zk = np.zeros(len(ck))
for i in range(len(ck)):
print(
"Point no. "
+ str(i + 1).zfill(5)
+ " of "
+ str(len(ck))
+ ". A "
+ str(np.round((i + 1) / float(len(ck)) * 100, decimals=2)).zfill(5)
+ "% completed."
)
ck0 = ck[ck.index == i]
chl, zhl, dhl, sumnh, _ = select_neighbours(ck0, chs, zhs, nmax, dmax)
if sumnh > 0:
lam = np.exp(-dhl[2] / v)
lam = lam / np.sum(lam)
zk[i] = np.dot(lam.T, zhl)
return zk