from datetime import date
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as st
from scipy.interpolate import Rbf
from scipy.optimize import differential_evolution, minimize
from scipy.signal import savgol_filter
from sklearn import svm
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from environmentaltools.common import save
from environmentaltools.temporal import analysis
from environmentaltools.temporal import core
[docs]
def max_moving_window(data: pd.DataFrame, duration: int) -> pd.DataFrame:
"""Select peaks from time series using a moving window.
Identifies peak values that occur at the center of a moving window,
effectively filtering out values that are not local maxima within
the specified duration.
Args:
data (pd.DataFrame): Time series data.
duration (int): Duration of the moving window (in index units).
Returns:
pd.DataFrame: DataFrame containing only the detected peaks with
their original timestamps.
"""
n = len(data)
id_ = [] # List to store indices of detected peaks
# Slide a window of length 'duration' across the data
for k in range(0, n - duration):
# Find the index of the maximum value within the current window
idx = data.iloc[k : k + duration + 1].idxmax()
# Only select the peak if it is at the center of the window
if idx == data.index[k + int(duration / 2)]:
id_.append(idx)
if id_ is []:
for k in range(0, n - duration):
# Find the index of the minimum value within the current window
idx = data.iloc[k : k + duration + 1].idxmin()
# Only select the peak if it is at the center of the window
if idx == data.index[k + int(duration / 2)]:
id_.append(idx)
# Create a DataFrame with the selected peaks
results = pd.DataFrame(data.loc[id_], index=id_)
return results
[docs]
def gaps(data: pd.DataFrame, variables: str | list, file_name: str = "gaps", buoy: bool = False):
"""Create summary table of data gaps for time series variables.
Analyzes time series to identify gaps, sampling frequency, and data quality
metrics. Saves results to Excel file.
Args:
data (pd.DataFrame): Time series data with datetime index.
variables (str or list): Variable name(s) to analyze for gaps.
file_name (str): Output filename (without extension) for the gap summary
table. Defaults to "gaps".
buoy (bool): If True, includes quality control metrics assuming 'Qc_e'
column exists. Defaults to False.
Returns:
pd.DataFrame: Summary table with columns including cadency, accuracy,
period, number of years, gap percentage, median gap, and maximum gap.
"""
if not isinstance(variables, list):
variables = [variables] # Ensure variables is a list
# Define columns for the output table depending on whether it's buoy data
if not buoy:
columns_ = [
"Cadency (min)",
"Accuracy*",
"Period",
"No. years",
"Gaps (%)",
"Med. gap (hr)",
"Max. gap (d)",
]
else:
columns_ = [
"Cadency (min)",
"Accuracy*",
"Period",
"No. years",
"Gaps (%)",
"Med. gap (hr)",
"Max. gap (d)",
"Quality data (%)",
]
# Initialize the output DataFrame
tbl_gaps = pd.DataFrame(
0,
columns=columns_,
index=variables,
)
tbl_gaps.index.name = "var"
for i in variables:
dt_nan = data[i].dropna() # Remove NaNs for the variable
if buoy:
# Count good quality data points if buoy
quality = np.sum(data.loc[dt_nan.index, "Qc_e"] <= 2)
# Calculate time differences (in hours) between consecutive non-NaN samples
dt0 = (dt_nan.index[1:] - dt_nan.index[:-1]).total_seconds() / 3600
# Identify gaps as intervals significantly larger than the median
dt = dt0[dt0 > np.median(dt0) + 0.1].values
if dt.size == 0:
dt = 0 # No gaps found
# Calculate the most common sampling interval (accuracy)
acc = st.mode(np.diff(dt_nan.sort_values().unique()))[0]
# Fill the summary table for this variable
tbl_gaps.loc[i, "Cadency (min)"] = np.round(st.mode(dt0)[0]*60, decimals=2)
tbl_gaps.loc[i, "Accuracy*"] = np.round(acc, decimals=2)
tbl_gaps.loc[i, "Period"] = str(dt_nan.index[0]) + "-" + str(dt_nan.index[-1])
tbl_gaps.loc[i, "No. years"] = dt_nan.index[-1].year - dt_nan.index[0].year
tbl_gaps.loc[i, "Gaps (%)"] = np.round(
np.sum(dt) / data[i].shape[0] * 100, decimals=2
)
tbl_gaps.loc[i, "Med. gap (hr)"] = np.round(np.median(dt), decimals=2)
tbl_gaps.loc[i, "Max. gap (d)"] = np.round(np.max(dt)/24, decimals=2)
if buoy:
# Add percentage of good quality data for buoys
tbl_gaps.loc[i, "Quality data (%)"] = np.round(
quality / len(dt_nan) * 100, decimals=2
)
# Save the table to Excel file
save.to_xlsx(tbl_gaps, file_name)
return tbl_gaps
[docs]
def nonstationary_ecdf(
data: pd.DataFrame,
variable: str,
wlen: float = 14 / 365.25,
equal_windows: bool = False,
pemp: list = None,
):
"""Computes empirical percentiles using a moving window.
Args:
data (pd.DataFrame): Time series.
variable (str): Name of the variable.
wlen (float): Length of window in years (default 14 days).
equal_windows (bool): If True, use equal window size.
pemp (list, optional): Empirical percentiles to use.
Returns:
pd.DataFrame: Values of the given non-stationary percentiles.
list: Chosen empirical percentiles.
"""
timestep = 1 / 365.25 # Default timestep: daily in years
if equal_windows:
timestep = wlen # Use window length as timestep if requested
# Choose default percentiles if not provided, with special cases for some variables
if pemp is None:
pemp = np.array([0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99])
if variable.lower().startswith("d") | (variable == "Wd"):
pemp = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])
if (variable == "Hs") | (variable == "Hm0"):
pemp = np.array([0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995])
# Create result DataFrame: rows are unique normalized times, columns are percentiles
res = pd.DataFrame(0, index=data.n.unique(), columns=pemp)
# For each time index, compute percentiles in a moving window
for i in res.index:
if i >= (1 - wlen):
# Handle window at the end of the time series (wrap-around)
final_offset = i + wlen - 1
mask = ((data["n"] >= i - wlen) & (data["n"] <= i + wlen)) | (
data["n"] <= final_offset
)
elif i <= wlen:
# Handle window at the start of the time series (wrap-around)
initial_offset = i - wlen
mask = ((data["n"] >= i - wlen) & (data["n"] <= i + wlen)) | (
data["n"] >= 1 + initial_offset
)
else:
# Standard window in the middle
mask = (data["n"] >= i - wlen) & (data["n"] <= i + wlen)
# Compute percentiles for the window
res.loc[i, pemp] = data[variable].loc[mask].quantile(q=pemp).values
return res, pemp
[docs]
def best_params(data: pd.DataFrame, bins: int, distrib: str, tail: bool = False):
"""Computes the best parameters of a simple probability model based on the RMSE of the PDF.
Args:
data (pd.DataFrame): Raw time series.
bins (int): Number of bins for the histogram.
distrib (str): Name of the probability model.
tail (bool, optional): If True, fit only the tail. Defaults to False.
Returns:
list: The estimated parameters.
"""
dif_, sser = 1e2, 1e3
nlen = int(len(data) / 200)
data = data.sort_values(ascending=True).values
while (dif_ > 1) & (sser > 30) & (0.95 * nlen < len(data)):
# Fit the distribution using the statistical_fit module
results = fit_(data, bins, distrib)
sse, params = results[0], results[1:]
dif_, sser = np.abs(sser - sse), sse
if tail:
data = data[int(nlen / 4) :]
else:
data = data[nlen:-nlen]
return params
def fit_(data: pd.DataFrame, bins: int, model: str):
"""Fits a simple probability model and computes the sse with the empirical pdf
Args:
* data (pd.DataFrame): raw time series
* bins (int): no. of bins for the histogram
* model (string): name of the probability model
Returns:
* results (np.array): the parameters computed
"""
y, x = np.histogram(data, bins=bins, density=True)
xq = np.diff(x)
x = (x + np.roll(x, -1))[:-1] / 2.0
yq = np.cumsum(xq * y)
if model is st.genpareto:
params = model.fit(data, 0.01, loc=np.min(data))
else:
params = model.fit(data)
cdf = model.cdf(x, loc=params[-2], scale=params[-1], *params[:-2])
sse = np.sum((yq - cdf) ** 2)
if np.isnan(sse):
sse = 1e10
results = np.zeros(len(params) + 1)
results[: int(len(params) + 1)] = np.append(sse, params)
return results
[docs]
def ecdf(df: pd.DataFrame, variable: str, num_percentiles: int | bool = False) -> pd.DataFrame:
"""Compute the empirical cumulative distribution function (ECDF).
Calculates non-exceedance probabilities for the variable values. Can
optionally interpolate to a specified number of percentiles.
Args:
df (pd.DataFrame): Raw time series data.
variable (str): Name of the variable column to analyze.
num_percentiles (int | bool, optional): Number of empirical percentiles
to interpolate. If False, returns all data points. Defaults to False.
Returns:
pd.DataFrame: DataFrame with variable values and their non-exceedance
probabilities. Index represents probability values.
"""
dfs = df[variable].sort_values().to_frame()
dfs.index = np.arange(1, len(dfs) + 1) / (len(dfs) + 1)
if not isinstance(num_percentiles, bool):
percentiles = np.linspace(1 / num_percentiles, 1 - (1 / num_percentiles), num_percentiles)
values = np.interp(percentiles, dfs.index, dfs[variable])
dfs = pd.DataFrame(values, columns=[variable], index=percentiles)
return dfs
[docs]
def nonstationary_epdf(
data: pd.DataFrame, variable: str, wlen: float = 14 / 365.25, no_values: int = 14
):
"""Computes the empirical PDF using a moving window.
Args:
data (pd.DataFrame): Time series.
variable (str): Name of the variable.
wlen (float): Length of window in years (default 14 days).
no_values (int): Number of values for the PDF.
Returns:
pd.DataFrame: Values of the non-stationary PDF.
"""
nlen = len(data)
ndates = np.arange(0, 1, 1 / 365.25)
values_ = np.linspace(data[variable].min(), data[variable].max(), no_values)
pdf_ = pd.DataFrame(-1, index=ndates, columns=(values_[:-1] + values_[1:]) / 2)
columns = pdf_.columns
for ind_, col_ in enumerate(columns[:-1]):
for i in pdf_.index:
if i > (1 - wlen):
final_offset = i + wlen - 1
mask = (
(data["n"] > i - wlen)
& (data["n"] <= i + wlen)
& (data[variable] > col_)
& (data[variable] <= columns[ind_ + 1])
) | (data["n"] <= final_offset)
elif i < wlen:
initial_offset = i - wlen
mask = (
(data["n"] >= i - wlen)
& (data["n"] <= i + wlen)
& (data[variable] > col_)
& (data[variable] <= columns[ind_ + 1])
) | (data["n"] >= 1 + initial_offset)
else:
mask = (
(data["n"] >= i - wlen)
& (data["n"] <= i + wlen)
& (data[variable] > col_)
& (data[variable] <= columns[ind_ + 1])
)
pdf_.loc[i, col_] = np.sum(mask) / nlen
return pdf_
[docs]
def epdf(df: pd.DataFrame, variable: str, num_bins: int = 14) -> pd.DataFrame:
"""Compute the empirical probability distribution function (PDF).
Creates a histogram-based empirical PDF by binning the variable values
and calculating probability densities for each bin.
Args:
df (pd.DataFrame): Raw time series data.
variable (str): Name of the variable column to analyze.
num_bins (int, optional): Number of bins for the histogram. Defaults to 14.
Returns:
pd.DataFrame: DataFrame with bin centers as index and 'prob' column
containing probability densities.
"""
dfs = df[variable].sort_values().to_frame()
dfs["prob"] = np.arange(1, len(dfs) + 1)
count_ = pd.DataFrame(-1, index=dfs[variable].unique(), columns=["prob"])
for _, ind_ in enumerate(count_.index):
count_.loc[ind_] = np.sum(dfs[variable] == ind_)
values_ = np.linspace(df[variable].min(), df[variable].max(), num_bins)
pdf_ = pd.DataFrame(-1, index=(values_[:-1] + values_[1:]) / 2, columns=["prob"])
for ind_, index_ in enumerate(pdf_.index):
# range_ = np.interp(values_, pdf_["prob"], pdf_[variable])
val_ = np.sum(
count_.loc[
((count_.index < values_[ind_ + 1]) & (count_.index > values_[ind_]))
].values
)
pdf_.loc[index_, "prob"] = val_
pdf_.loc[:, "prob"] = pdf_["prob"] / (np.sum(pdf_).values * np.diff(values_))
return pdf_
[docs]
def acorr(data: np.ndarray | pd.Series, max_lags: int = 24):
"""Compute autocorrelation function of a time series.
Calculates the normalized autocorrelation for a range of lags using
matplotlib's autocorrelation function.
Args:
data (np.ndarray or pd.Series): Input time series data.
max_lags (int): Maximum number of lags to compute. Defaults to 24.
Returns:
tuple: (lags, autocorrelation) - Arrays of lag values and corresponding
autocorrelation coefficients.
"""
lags, c_, _, _ = plt.acorr(data, usevlines=False, maxlags=max_lags, normed=True)
plt.close()
# lags, c_ = lags[-maxlags:], c_[-maxlags:]
return lags, c_
[docs]
def bidimensional_ecdf(data1: np.ndarray, data2: np.ndarray, num_bins: int):
"""Compute empirical 2D cumulative distribution function (ECDF).
Calculates the joint empirical CDF for two variables using a 2D histogram
approach with cumulative summation.
Args:
data1 (np.ndarray): Values of the first variable.
data2 (np.ndarray): Values of the second variable.
num_bins (int): Number of bins for the 2D histogram in each dimension.
Returns:
tuple: (x_mesh, y_mesh, ecdf_values) - 2D meshgrids of bin centers and
corresponding cumulative probability values.
"""
f, xedges, yedges = np.histogram2d(data1, data2, bins=num_bins)
Fe = np.cumsum(np.cumsum(f, axis=0), axis=1) / (np.sum(f) + 1)
Fe = np.flipud(np.rot90(Fe))
xmid, ymid = (xedges[0:-1] + xedges[1:]) / 2, (yedges[0:-1] + yedges[1:]) / 2
xe, ye = np.meshgrid(xmid, ymid)
return xe, ye, Fe
[docs]
def bias_adjustment(
obs,
hist,
rcp,
variable,
funcs=["gumbel_l", "gumbel_r"],
quantiles=[0.1, 0.9],
params=None,
):
"""
Bias adjustment for climate data using parametric quantile mapping.
Args:
obs (pd.DataFrame): Observed data.
hist (pd.DataFrame): Historical simulation data.
rcp (pd.DataFrame): Scenario/projection data.
variable (str): Variable name to adjust.
funcs (list, optional): List of distribution names. Defaults to ["gumbel_l", "gumbel_r"].
quantiles (list, optional): Quantiles for tail adjustment. Defaults to [0.1, 0.9].
params (dict, optional): Precomputed distribution parameters. Defaults to None.
Returns:
tuple: (hist, rcp) with bias-adjusted values in column 'unbiased'.
"""
funcs = funcs.copy()
for index, fun in enumerate(funcs):
funcs[index] = getattr(st, fun)
hist["unbiased"] = 0
rcp["unbiased"] = 0
# Low tail
low_tail_hist = hist.loc[
hist[variable] <= hist[variable].quantile(quantiles[0]), variable
]
if isinstance(params, dict):
if "obs_low" in params:
params_obs_low = params["obs_low"]
else:
params_obs_low = funcs[0].fit(
obs.loc[obs[variable] <= obs[variable].quantile(quantiles[0]), variable]
)
if isinstance(params, dict):
if "hist_low" in params:
params_hist_low = params["hist_low"]
else:
params_hist_low = funcs[0].fit(low_tail_hist)
hist.loc[hist[variable] <= hist[variable].quantile(quantiles[0]), "unbiased"] = (
funcs[0].ppf(funcs[0].cdf(low_tail_hist, *params_hist_low), *params_obs_low)
)
low_tail_rcp = rcp.loc[
rcp[variable] <= rcp[variable].quantile(quantiles[0]), variable
]
rcp.loc[rcp[variable] <= rcp[variable].quantile(quantiles[0]), "unbiased"] = funcs[
0
].ppf(funcs[0].cdf(low_tail_rcp, *params_hist_low), *params_obs_low)
# High tail
high_tail_hist = hist.loc[
hist[variable] >= hist[variable].quantile(quantiles[1]), variable
]
if isinstance(params, dict):
if "obs_high" in params:
params_obs_high = params["obs_high"]
else:
params_obs_high = funcs[1].fit(
obs.loc[obs[variable] >= obs[variable].quantile(quantiles[1]), variable]
)
if isinstance(params, dict):
if "hist_high" in params:
params_hist_high = params["hist_high"]
else:
params_hist_high = funcs[1].fit(high_tail_hist)
hist.loc[hist[variable] >= hist[variable].quantile(quantiles[1]), "unbiased"] = (
funcs[1].ppf(funcs[1].cdf(high_tail_hist, *params_hist_high), *params_obs_high)
)
high_tail_rcp = rcp.loc[
rcp[variable] >= rcp[variable].quantile(quantiles[1]), variable
]
rcp.loc[rcp[variable] >= rcp[variable].quantile(quantiles[1]), "unbiased"] = funcs[
1
].ppf(funcs[1].cdf(high_tail_rcp, *params_hist_high), *params_obs_high)
# Body
n_obs = len(obs)
obs_sort = np.sort(obs[variable])
# body_obs_quantile = np.linspace(quantiles[0], quantiles[1], 20)
# cdf_obs = ecdf(obs, variable)
# body_obs_values = obs[variable].quantile(body_obs_quantile)
cdf_hist = ecdf(hist, variable)
# body_hist_values = hist[variable].quantile(body_obs_quantile)
values_body_hist = hist.loc[
(hist[variable] > hist[variable].quantile(quantiles[0]))
& (hist[variable] < hist[variable].quantile(quantiles[1])),
variable,
]
hist_adj_list = list()
Fe_Fhist_hist = np.interp(values_body_hist, cdf_hist[variable], cdf_hist["prob"])
for vari_Fe in Fe_Fhist_hist:
pos = int(n_obs * vari_Fe)
if pos >= n_obs:
print(pos)
pos = n_obs - 1
hist_adj_list.append(obs_sort[pos])
hist.loc[
(hist[variable] > hist[variable].quantile(quantiles[0]))
& (hist[variable] < hist[variable].quantile(quantiles[1])),
"unbiased",
] = np.asarray(hist_adj_list)
values_body_rcp = rcp.loc[
(rcp[variable] > rcp[variable].quantile(quantiles[0]))
& (rcp[variable] < rcp[variable].quantile(quantiles[1])),
variable,
]
rcp_adj_list = list()
Fe_Frcp_hist = np.interp(values_body_rcp, cdf_hist[variable], cdf_hist["prob"])
for vari_Fe in Fe_Frcp_hist:
pos = int(n_obs * vari_Fe)
if pos >= n_obs:
print(pos)
pos = n_obs - 1
rcp_adj_list.append(obs_sort[int(pos)])
rcp.loc[
(rcp[variable] > rcp[variable].quantile(quantiles[0]))
& (rcp[variable] < rcp[variable].quantile(quantiles[1])),
"unbiased",
] = np.asarray(rcp_adj_list)
fit_params = {
"hist_low": params_hist_low,
"hist_high": params_hist_high,
"obs_low": params_obs_low,
"obs_high": params_obs_high,
}
return hist, rcp, fit_params
[docs]
def probability_mapping(obs, hist, rcp, variable, func):
"""
Apply parametric probability mapping for bias correction.
Args:
obs (pd.DataFrame): Observed data.
hist (pd.DataFrame): Historical simulation data.
rcp (pd.DataFrame): Scenario/projection data.
variable (str): Variable name to adjust.
func (str): Name of the distribution to use.
Returns:
tuple: (hist, rcp) with bias-adjusted values in column 'unbiased'.
"""
func = getattr(st, func)
params_obs = func.fit(obs[variable])
params_hist = func.fit(hist[variable])
rcp["unbiased"] = func.ppf(func.cdf(rcp[variable], *params_hist), *params_obs)
hist["unbiased"] = func.ppf(func.cdf(hist[variable], *params_hist), *params_obs)
return hist, rcp
[docs]
def empirical_cdf_mapping(obs, hist, rcp, variable):
"""
Apply empirical CDF mapping for bias correction.
Args:
obs (pd.DataFrame): Observed data.
hist (pd.DataFrame): Historical simulation data.
rcp (pd.DataFrame): Scenario/projection data.
variable (str): Variable name to adjust.
Returns:
tuple: (hist, rcp) with bias-adjusted values in column 'unbiased'.
"""
n_obs = len(obs)
obs_sort = np.sort(obs[variable])
cdf_hist = ecdf(hist, variable)
hist_adj_list = list()
Fe_Fhist_hist = np.interp(hist[variable], cdf_hist[variable], cdf_hist["prob"])
for vari_Fe in Fe_Fhist_hist:
pos = int(n_obs * vari_Fe)
if pos >= n_obs:
print(pos)
pos = n_obs - 1
hist_adj_list.append(obs_sort[pos])
hist["unbiased"] = np.asarray(hist_adj_list)
rcp_adj_list = list()
Fe_Frcp_hist = np.interp(rcp[variable], cdf_hist[variable], cdf_hist["prob"])
for vari_Fe in Fe_Frcp_hist:
pos = int(n_obs * vari_Fe)
if pos >= n_obs:
print(pos)
pos = n_obs - 1
rcp_adj_list.append(obs_sort[pos])
rcp["unbiased"] = np.asarray(rcp_adj_list)
return hist, rcp
[docs]
def rotate_geo2nav(ang):
"""
Convert angles from geographic (0°=East, 90°=North) to navigational (0°=North, 90°=East).
Args:
ang (array-like): Array or Series of angles in degrees.
Returns:
np.ndarray: Rotated angles in degrees.
"""
ang = np.fmod(270 - ang + 360, 360)
return ang
[docs]
def uv_to_magnitude_angle(u: pd.Series | np.ndarray, v: pd.Series | np.ndarray, labels: list = ["magnitude", "angle"]):
"""Convert u, v vector components to magnitude and direction.
Transforms Cartesian velocity/wind components (u, v) to polar form
(magnitude, direction) using standard meteorological convention.
Args:
u (pd.Series or np.ndarray): Zonal (east-west) component.
v (pd.Series or np.ndarray): Meridional (north-south) component.
labels (list): Output column names for [magnitude, direction].
Defaults to ["magnitude", "angle"].
Returns:
pd.DataFrame: DataFrame with two columns containing magnitude (sqrt(u²+v²))
and angle in degrees [0, 360).
"""
ang = np.fmod(np.arctan2(v, u) * 180 / np.pi + 360, 360)
data = pd.DataFrame(
np.vstack([np.sqrt(u**2 + v**2), ang]).T, columns=labels, index=ang.index
)
return data
[docs]
def optimize_rbf_epsilon(coords, data, n_train, method="gaussian", smooth=0.5, eps0=1, optimizer="local", metric="rmse"):
"""
Optimize epsilon and smooth parameters for RBF by minimizing validation error (RMSE or MAE).
Allows local (SLSQP) or global (differential_evolution) optimization.
Args:
coords (np.ndarray): Input coordinates (n_samples, n_features).
data (np.ndarray): Target values (n_samples,).
n_train (int): Number of samples for training (rest for validation).
method (str, optional): RBF function type. Default 'gaussian'.
smooth (float, optional): Initial smooth value. Default 0.5.
eps0 (float, optional): Initial epsilon value. Default 1.
optimizer (str, optional): 'local' (SLSQP) or 'global' (differential_evolution).
metric (str, optional): 'rmse' or 'mae'.
Returns:
tuple: (epsilon_opt, smooth_opt)
"""
min_epsilon = 1e-3
max_epsilon = 1e5
min_smooth = 1e-6
max_smooth = 10.0
bounds = [(min_epsilon, max_epsilon), (min_smooth, max_smooth)]
# Selección aleatoria de muestras para ajuste y validación
rng = np.random.default_rng()
indices = np.arange(coords.shape[0])
rng.shuffle(indices)
train_idx = indices[:n_train]
valid_idx = indices[n_train:]
def objective(params):
return rbf_error_metric(params, coords, data, train_idx, valid_idx, method, metric)
try:
if optimizer == "global":
res_ = differential_evolution(objective, bounds, polish=True, tol=1e-5, maxiter=100)
else:
res_ = minimize(
objective,
[eps0, smooth],
bounds=bounds,
method="SLSQP",
options={"ftol": 1e-7, "eps": 1e-4, "maxiter": 1e4},
)
if isinstance(res_["x"], (np.ndarray, list)) and len(res_["x"]) == 2:
epsilon_opt, smooth_opt = float(res_["x"][0]), float(res_["x"][1])
else:
epsilon_opt, smooth_opt = float(res_["x"]), smooth
print(f"[optimize_rbf_epsilon] {optimizer} | metric={metric} | Success: {res_.get('success', False)}, epsilon_opt={epsilon_opt}, smooth_opt={smooth_opt}, fun={res_.get('fun', None)}")
return epsilon_opt, smooth_opt
except Exception as e:
print(f"Warning: optimize_rbf_epsilon failed ({e}), using epsilon=1.0, smooth={smooth}")
return 1.0, smooth
[docs]
def rbf_error_metric(params, coords, data, train_idx, valid_idx, method, metric="rmse"):
"""
Compute the error of an RBF for given epsilon and smooth values.
Args:
params (list): [epsilon, smooth].
coords (np.ndarray): Input coordinates.
data (np.ndarray): Target values.
train_idx (array): Indices for training samples.
valid_idx (array): Indices for validation samples.
method (str): RBF function type.
metric (str): 'rmse' or 'mae'.
Returns:
float: Error value (RMSE or MAE).
"""
epsilon, smooth = params
# Reordenar los conjuntos de entrenamiento y validación
func = Rbf(
*coords[train_idx, :].T, data[train_idx], function=method, smooth=smooth, epsilon=epsilon
)
validation = func(*coords[valid_idx, :].T)
if metric == "mae":
error = np.mean(np.abs(validation - data[valid_idx]))
else:
error = np.sqrt(np.mean((validation - data[valid_idx]) ** 2))
if np.isnan(error).any():
error = 1e10 # Penalización por NaN
return error
[docs]
def outliers_detection(
data, outliers_fraction, method="Local Outlier Factor", scaler_method="MinMaxScaler"
):
"""
Detect outliers in data using various sklearn algorithms.
Args:
data (array-like): Input data (2D array expected).
outliers_fraction (float): Fraction of outliers to detect (contamination parameter).
method (str, optional): Outlier detection method. Options are:
- "Robust covariance": Uses EllipticEnvelope
- "One-Class SVM": Uses OneClassSVM
- "Isolation Forest": Uses IsolationForest
- "Local Outlier Factor": Uses LocalOutlierFactor (default)
scaler_method (str, optional): Scaling method to apply before detection.
If None, no scaling is applied. Defaults to "MinMaxScaler".
Returns:
np.ndarray: Boolean mask indicating outliers (True) and inliers (False).
"""
algorithms = {
"Robust covariance": EllipticEnvelope(contamination=outliers_fraction),
"One-Class SVM": svm.OneClassSVM(
nu=outliers_fraction, kernel="rbf", gamma="scale"
),
"Isolation Forest": IsolationForest(
contamination=outliers_fraction, behaviour="new"
),
"Local Outlier Factor": LocalOutlierFactor(
n_neighbors=25, contamination=outliers_fraction
),
}
# Apply scaling if requested
if scaler_method:
transformed_data, _ = scaler(data, method=scaler_method)
else:
transformed_data = data.copy()
# Select the algorithm and fit/predict
algorithm = algorithms[method]
if method == "Local Outlier Factor":
# LocalOutlierFactor returns -1 for outliers, 1 for inliers
y_pred = algorithm.fit_predict(transformed_data)
else:
# Other methods use fit().predict()
y_pred = algorithm.fit(transformed_data).predict(transformed_data)
# Create boolean mask: True for inliers, False for outliers
inliers = y_pred == 1
return ~inliers # Return True for outliers
[docs]
def scaler(data, method="MinMaxScaler", transform=True, scale=False):
"""
Scale or inverse-scale data using sklearn scalers.
Args:
data (array-like or pd.DataFrame): Data to scale.
method (str, optional): Scaling method. Defaults to "MinMaxScaler".
transform (bool, optional): If True, transform; if False, inverse transform. Defaults to True.
scale (sklearn scaler, optional): Pre-fitted scaler to use. Defaults to False.
Returns:
tuple: (transformed_data, scaler)
"""
algorithms = {
"MinMaxScaler": MinMaxScaler(),
"StandardScaler": StandardScaler(),
"RobustScaler": RobustScaler(),
}
np_array = False
if not isinstance(data, pd.DataFrame):
np_array = True
if transform & (not scale):
scale = algorithms[method].fit(data)
if not transform:
if np_array:
transformed_data = scale.inverse_transform(data)
else:
transformed_data = pd.DataFrame(
scale.inverse_transform(data), index=data.index, columns=data.columns
)
scale = None
else:
if np_array:
transformed_data = scale.transform(data)
else:
transformed_data = pd.DataFrame(
scale.transform(data), index=data.index, columns=data.columns
)
return transformed_data, scale
[docs]
def string_to_function(param: dict, variable: str = None):
"""Convert string function names to scipy.stats function objects.
Replaces string representations of statistical distribution names with
actual scipy.stats function objects in parameter dictionary.
Args:
param (dict): Parameter dictionary containing 'fun' key with function
name strings to convert.
variable (str, optional): Specific variable name to process. If None,
processes all entries in param['fun']. Defaults to None.
Returns:
dict: Updated parameter dictionary with function objects instead of strings.
"""
if variable is None:
for key in param["fun"].keys():
if isinstance(param["fun"][key], str):
if param["fun"][key] == "wrap_norm":
param["fun"][key] = core.wrap_norm()
else:
param["fun"][key] = getattr(st, param["fun"][key])
else:
for key in param[variable]["fun"].keys():
if isinstance(param[variable]["fun"][key], str):
if param[variable]["fun"][key] == "wrap_norm":
param[variable]["fun"][key] = core.wrap_norm()
else:
param[variable]["fun"][key] = getattr(st, param[variable]["fun"][key])
return param
[docs]
def data_over_threshold(data: pd.DataFrame, variable: str, threshold: float, duration: float):
"""Extract extreme events exceeding a threshold for minimum duration.
Identifies continuous periods where variable values exceed a threshold and
persist for at least the specified duration. Counts events per year.
Args:
data (pd.DataFrame): Time series data with datetime index.
variable (str): Column name of the variable to analyze.
threshold (float): Threshold value to identify extreme events.
duration (float): Minimum duration (in time units matching the index)
required to classify as an extreme event.
Returns:
tuple: (events, events_per_year)
- events (pd.DataFrame): Time series of all values during threshold
exceedance events.
- events_per_year (pd.DataFrame): Count of events per year with
'eventno' column.
"""
asc_roots = data.loc[
((data.iloc[:-1].shift(1) < threshold) & (data.iloc[1:] >= threshold)), variable
]
dec_roots = data.loc[
((data.iloc[:-1] >= threshold) & (data.iloc[1:].shift(-1) < threshold)),
variable,
]
if asc_roots.index[0] > dec_roots.index[0]:
dec_roots.drop(index=dec_roots.index[0])
if asc_roots.index[-1] > dec_roots.index[-1]:
asc_roots.drop(index=asc_roots.index[-1])
events = pd.DataFrame(columns=[variable])
eventsno = pd.DataFrame(0, index=asc_roots.index.year.unique(), columns=["eventno"])
for indexdate in range(len(asc_roots)):
aux = pd.DataFrame(
data.loc[asc_roots.index[indexdate] : dec_roots.index[indexdate]],
columns=[variable],
)
if (aux.index[-1] - aux.index[0]).total_seconds() / 3600 >= duration:
events = pd.concat([events, aux])
eventsno.loc[aux.index[0].year] += 1
return events, eventsno
[docs]
def pre_ensemble_plot(models: list, param: dict, variable: str, file_name: str = None):
"""Compute ensemble statistics for multiple probability models across percentiles.
Evaluates multiple probability models at various percentile levels,
computes their mean and standard deviation across a normalized time grid,
and prepares data for ensemble visualization.
Args:
models (list): List of model names to evaluate.
param (dict): Dictionary containing parameters for each model, including:
- 'fun': Probability distribution functions for each variable
- Model-specific parameters for statistical fitting
variable (str): Variable name to analyze (e.g., 'Hs', 'Tp', or direction variables).
file_name (str, optional): Filename for saving results. Defaults to None.
Returns:
dict: Dictionary where keys are percentile strings (e.g., '0.05', '0.5', '0.95')
and values are DataFrames containing:
- 'n': Normalized time coordinate [0, 1]
- 'prob': Probability value for this percentile
- One column per model with computed values
- 'mean': Mean across all models
- 'std': Standard deviation across all models
"""
probs = [0.05, 0.01, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995]
if variable.lower().startswith("d"):
probs = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
n = np.linspace(0, 1, 24 * 365.25)
ppfs = dict()
for prob in probs:
df = pd.DataFrame(np.ones(len(n)) * prob, index=n, columns=["prob"])
df["n"] = n
ppfs[str(prob)] = df.copy()
for model in models:
param[variable][model] = string_to_function(param[variable][model], None)
res = core.ppf(df.copy(), param[variable][model])
# Transformed timeserie if required
if param[variable][model]["transform"]["make"]:
res[variable] = analysis.inverse_transform(res[[variable]], param[variable][model])
ppfs[str(prob)].loc[:, model] = res[variable]
ppfs[str(prob)]["mean"] = ppfs[str(prob)].loc[:, models].mean(axis=1)
ppfs[str(prob)]["std"] = ppfs[str(prob)].loc[:, models].std(axis=1)
return ppfs
[docs]
def smooth_1d(data: np.ndarray, window_length: int = None, poly_order: int = 3) -> np.ndarray:
"""Apply Savitzky-Golay filter for 1D data smoothing.
Uses the Savitzky-Golay filter to smooth 1D data by fitting successive
sub-sets of adjacent data points with a low-degree polynomial.
Args:
data (np.ndarray): 1D array of data values to smooth.
window_length (int, optional): Length of the filter window (number of coefficients).
Must be a positive odd integer. If None, defaults to len(data)/51.
Defaults to None.
poly_order (int, optional): Order of the polynomial used to fit the samples.
Must be less than window_length. Defaults to 3.
Returns:
np.ndarray: Smoothed data array with the same length as input data.
"""
if window_length is None:
# TODO: Replace 51 with a target value based on analysis
window_length = int(len(data) / 51)
if not window_length % 2:
window_length += 1
sm_data = savgol_filter(data, window_length, poly_order)
return sm_data
[docs]
def find_nearest_point(data, point: tuple):
"""Find the nearest point in a spatial dataset to a given coordinate.
Uses Euclidean distance to identify the closest point in the dataset
to the specified target coordinates.
Args:
data (xr.Dataset or pd.DataFrame): Dataset with 'x' and 'y' coordinates.
point (tuple): Target point coordinates as (x, y).
Returns:
int: Index of the nearest point in the dataset.
"""
idx = np.argmin(np.sqrt((data.x - point[0]) ** 2 + (data.y - point[1]) ** 2))
return idx
[docs]
def date_to_julian(dates: list, calendar: str = "julian") -> np.ndarray:
"""Convert datetime objects to Julian dates.
Transforms a list of datetime objects into Julian date numbers based on
the specified calendar system.
Args:
dates (list): List of datetime objects to convert.
calendar (str, optional): Calendar system to use for conversion.
Currently only 'julian' is supported. Defaults to 'julian'.
Returns:
np.ndarray: Array of Julian date numbers.
"""
if calendar == "julian":
julian_dates = np.zeros(len(dates))
for i, date_ in enumerate(dates):
julian_dates[i] = (
date_.toordinal()
- date(1, 1, 1).toordinal()
+ date_.hour / (24)
+ date_.second / (3600 * 24)
+ 367
)
dates = julian_dates
elif calendar == "gregorian":
if isinstance(dates, pd.DatetimeIndex):
dates = dates.to_julian_date()
elif isinstance(dates, pd.DataFrame):
dates.index = dates.index.to_julian_date()
else:
raise ValueError("DatetimeIndex or DataFrame are required.")
else:
raise ValueError("Calendar can be Julian or Gregorian.")
return dates
[docs]
def mean_dt_param(B, Q):
nmodels = len(B)
norders = np.max([np.shape(Bm) for Bm in B], axis=0)
Bs = np.zeros([norders[0], norders[1], nmodels])
# Qs = np.zeros([norders[0], norders[0], nmodels])
for i in range(nmodels):
Bs[:, : np.shape(B[i])[1], i] = B[i]
B, Q = np.mean(Bs, axis=2), np.mean(Q, axis=0)
return B, Q, int((norders[1] - 1) / norders[0])
[docs]
def rmse(a, b):
"""Calculate Root Mean Square Error between two arrays.
The RMSE measures the square root of the average of squared differences
between predicted and observed values. Lower values indicate better fit.
Args:
a (array-like): First array (e.g., observed values).
b (array-like): Second array (e.g., predicted values).
Returns:
float: The root mean square error value.
"""
len_ = len(a)
value_ = np.sqrt(np.sum((a - b) ** 2) / len_)
return value_
[docs]
def maximum_absolute_error(a, b):
"""Calculate Maximum Absolute Error between two arrays.
The MAE measures the largest absolute difference between predicted
and observed values. Useful for identifying worst-case errors.
Args:
a (array-like): First array (e.g., observed values).
b (array-like): Second array (e.g., predicted values).
Returns:
float: The maximum absolute error value.
"""
len_ = len(a)
value_ = np.max(np.abs(a - b))
return value_
[docs]
def mean_absolute_error(a, b):
"""Calculate Mean Absolute Error between two arrays.
The Mean Absolute Error (MAE) measures the average magnitude of errors
between predicted and observed values without considering their direction.
Args:
a (array-like): First array (e.g., observed values).
b (array-like): Second array (e.g., predicted values).
Returns:
float: The mean absolute error value.
"""
len_ = len(a)
value_ = np.sum(np.abs(a - b)) / len_
return value_
[docs]
def xrnearest(
ds, lat, lon, lat_name=None, lon_name=None, variable_mask=None, time_mask=0
):
"""Find the nearest grid point to specified coordinates in xarray dataset.
Locates the dataset grid point closest to the given latitude and longitude
coordinates, optionally applying a mask to exclude invalid data points.
Args:
ds (xarray.Dataset): Input dataset with coordinate information.
lat (float): Target latitude coordinate.
lon (float): Target longitude coordinate.
lat_name (str, optional): Name of latitude variable. Auto-detected if None.
lon_name (str, optional): Name of longitude variable. Auto-detected if None.
variable_mask (str, optional): Variable name to use for masking NaN values.
time_mask (int): Time index to use for masking. Defaults to 0.
Returns:
xarray.Dataset: Subset of dataset at the nearest grid point.
Raises:
Exception: If coordinate dimensions are not found in dataset.
"""
if not lat_name or not lon_name:
lat_name, lon_name = coords_name(ds)
lats, lons = latslons_values(ds, lat_name, lon_name)
if variable_mask:
mask = np.isnan(ds[variable_mask].isel(time=time_mask).values)
lats = np.ma.masked_array(lats, mask)
lons = np.ma.masked_array(lons, mask)
ilat, ilon = find_indexes(lats, lons, lat, lon)
if "rlat" in ds.dims and "rlon" in ds.dims:
poi = ds.isel(rlat=ilat, rlon=ilon)
elif lat_name in ds.dims and lon_name in ds.dims:
poi = ds.isel(**{lat_name: ilat, lon_name: ilon})
elif not ilat and not ilon:
poi = ds
else:
raise Exception("Dimensions coordinates not found in dataset")
return poi
[docs]
def latslons_values(ds, lat_name, lon_name):
"""Extract latitude and longitude values from dataset.
Retrieves coordinate values, creating 2D meshgrid if coordinates are 1D arrays.
Args:
ds (xarray.Dataset): Input dataset containing coordinate variables.
lat_name (str): Name of latitude coordinate variable.
lon_name (str): Name of longitude coordinate variable.
Returns:
tuple: (lats, lons) - Arrays of latitude and longitude values.
"""
if len(ds[lat_name].dims) == 1 and len(ds[lon_name] == 1):
lats, lons = create_lat_lon_matrix(ds[lat_name].values, ds[lon_name].values)
else:
lats, lons = ds[lat_name].values, ds[lon_name].values
return lats, lons
[docs]
def find_indexes(latvar, lonvar, lat0, lon0):
"""Find array indices of coordinates nearest to target location.
Uses great circle distance (tunnel through Earth) as the distance metric
for finding the closest grid point.
Args:
latvar (np.ndarray): Array of latitude values (degrees).
lonvar (np.ndarray): Array of longitude values (degrees).
lat0 (float): Target latitude (degrees).
lon0 (float): Target longitude (degrees).
Returns:
tuple: (lat_index, lon_index) - Array indices of nearest point, or (None, None)
if arrays are empty.
References:
https://www.unidata.ucar.edu/blogs/developer/en/entry/accessing_netcdf_data_by_coordinates
"""
indexes = None, None
if latvar.size > 1 and lonvar.size > 1:
rad_factor = np.pi / 180.0
latvals = latvar * rad_factor
lonvals = lonvar * rad_factor
lat_rad = lat0 * rad_factor
lon_rad = lon0 * rad_factor
clat, clon = np.cos(latvals), np.cos(lonvals)
slat, slon = np.sin(latvals), np.sin(lonvals)
delX = np.cos(lat_rad) * np.cos(lon_rad) - clat * clon
delY = np.cos(lat_rad) * np.sin(lon_rad) - clat * slon
delZ = np.sin(lat_rad) - slat
dist_sq = delX**2 + delY**2 + delZ**2
minindex_1d = dist_sq.argmin()
indexes = np.unravel_index(minindex_1d, latvals.shape)
return indexes
[docs]
def create_lat_lon_matrix(lat, lon):
"""Create 2D coordinate meshgrid from 1D coordinate vectors.
Converts separate latitude and longitude vectors into 2D coordinate matrices
suitable for spatial operations and interpolation.
Args:
lat (np.ndarray): 1D array of latitude values.
lon (np.ndarray): 1D array of longitude values.
Returns:
tuple: (lat_matrix, lon_matrix) - 2D arrays of shape (len(lat), len(lon)).
"""
lat_matrix = np.tile(lat, (lon.shape[0], 1)).T
lon_matrix = np.tile(lon, (lat.shape[0], 1))
return lat_matrix, lon_matrix
[docs]
def coords_name(ds):
"""Detect standard coordinate variable names in dataset.
Identifies latitude and longitude coordinate names from common conventions.
Args:
ds (xarray.Dataset): Input dataset to inspect.
Returns:
tuple: (lat_name, lon_name) - Names of latitude and longitude coordinates.
Raises:
Exception: If standard coordinate names are not found in dataset.
"""
if "lat" in ds.coords and "lon" in ds.coords:
lat_name = "lat"
lon_name = "lon"
elif "latitude" in ds.coords and "longitude" in ds.coords:
lat_name = "latitude"
lon_name = "longitude"
else:
raise Exception("Latitudes and longitudes not found in dataset")
return lat_name, lon_name