"""
Threshold-Based Spatial Indicators Module
==========================================
This module provides functions for computing threshold-based indicators used in
spatiotemporal analysis, particularly for assessing exceedance patterns and spatial
coverage relative to environmental thresholds.
These indicators are commonly used in:
- Flood risk assessment
- Pollution exposure analysis
- Habitat suitability mapping
- Climate impact studies
References
----------
.. [1] Bogardi, I., & Duckstein, L. (1993). The fuzzy logic paradigm of risk analysis.
Risk Analysis in Water Resources Engineering, 12(1), 1-12.
"""
import numpy as np
from skimage import measure
from scipy.ndimage import uniform_filter
from environmentaltools.spatiotemporal import utils
[docs]
def fractional_exceedance_area(data, thresholds=None):
"""
Compute fractional area exceeding threshold values.
Calculates the proportion of the spatial domain where values exceed each
specified threshold. This indicator (RAEH - Ratio of Area Exceeding thresHold)
is useful for assessing the spatial extent of threshold exceedances.
Parameters
----------
data : array_like
1D array of spatial data values to analyze.
thresholds : array_like, optional
Array of threshold values to evaluate. If None, generates 100 equally
spaced thresholds from 0 to the maximum data value.
Returns
-------
thresholds : np.ndarray
Array of threshold values used in the analysis.
exceedance_fractions : np.ndarray
Fraction of area exceeding each threshold, ranging from 0 to 1.
Notes
-----
The fractional exceedance area is computed as:
.. math::
RAEH(t) = \\frac{1}{N} \\sum_{i=1}^{N} \\mathbb{1}(x_i \\geq t)
where :math:`N` is the total number of spatial points, :math:`x_i` are the
data values, :math:`t` is the threshold, and :math:`\\mathbb{1}` is the
indicator function.
Examples
--------
>>> import numpy as np
>>> data = np.random.normal(10, 2, 1000)
>>> thresholds, fractions = fractional_exceedance_area(data)
>>> # fractions[0] will be close to 1.0 (most area exceeds low threshold)
>>> # fractions[-1] will be close to 0.0 (little area exceeds high threshold)
"""
data = np.asarray(data)
n_points = len(data)
if thresholds is None:
thresholds = np.linspace(0, np.max(data), 100)
else:
thresholds = np.asarray(thresholds)
exceedance_fractions = np.zeros(len(thresholds))
for i, threshold in enumerate(thresholds):
exceedance_fractions[i] = np.sum(data >= threshold) / n_points
return thresholds, exceedance_fractions
[docs]
def mean_exceedance_over_total_area(data, thresholds=None):
"""
Compute mean value of exceedances normalized by total area.
Calculates the sum of values exceeding each threshold, normalized by the
total number of spatial points. This indicator (MEW - Mean Exceedance over
Whole domain) provides a measure of exceedance intensity averaged over the
entire spatial domain.
Parameters
----------
data : array_like
1D array of spatial data values to analyze.
thresholds : array_like, optional
Array of threshold values to evaluate. If None, generates 100 equally
spaced thresholds from 0 to the maximum data value.
Returns
-------
thresholds : np.ndarray
Array of threshold values used in the analysis.
mean_exceedances : np.ndarray
Mean exceedance values normalized by total area.
Notes
-----
The mean exceedance over total area is computed as:
.. math::
MEW(t) = \\frac{1}{N} \\sum_{i=1}^{N} x_i \\cdot \\mathbb{1}(x_i \\geq t)
This indicator represents the spatial average of values that exceed the
threshold, with non-exceedance points contributing zero.
Examples
--------
>>> import numpy as np
>>> data = np.random.exponential(5, 1000)
>>> thresholds, mean_exc = mean_exceedance_over_total_area(data)
"""
data = np.asarray(data)
n_points = len(data)
if thresholds is None:
thresholds = np.linspace(0, np.max(data), 100)
else:
thresholds = np.asarray(thresholds)
mean_exceedances = np.zeros(len(thresholds))
for i, threshold in enumerate(thresholds):
mean_exceedances[i] = np.sum(data[data >= threshold]) / n_points
return thresholds, mean_exceedances
[docs]
def mean_excess_over_total_area(data, thresholds=None):
"""
Compute mean excess (difference from threshold) normalized by total area.
Calculates the average amount by which values exceed each threshold,
normalized by the total number of spatial points. This indicator (MEDW -
Mean Excess Difference over Whole domain) measures the average magnitude
of exceedances over the entire domain.
Parameters
----------
data : array_like
1D array of spatial data values to analyze.
thresholds : array_like, optional
Array of threshold values to evaluate. If None, generates 100 equally
spaced thresholds from 0 to the maximum data value.
Returns
-------
thresholds : np.ndarray
Array of threshold values used in the analysis.
mean_excess : np.ndarray
Mean excess values (difference from threshold) normalized by total area.
Notes
-----
The mean excess over total area is computed as:
.. math::
MEDW(t) = \\frac{1}{N} \\sum_{i=1}^{N} (x_i - t) \\cdot \\mathbb{1}(x_i \\geq t)
This provides a measure of how much, on average across the entire domain,
values exceed the threshold.
Examples
--------
>>> import numpy as np
>>> data = np.random.gamma(2, 2, 1000)
>>> thresholds, excess = mean_excess_over_total_area(data)
"""
data = np.asarray(data)
n_points = len(data)
if thresholds is None:
thresholds = np.linspace(0, np.max(data), 100)
else:
thresholds = np.asarray(thresholds)
mean_excess = np.zeros(len(thresholds))
for i, threshold in enumerate(thresholds):
mean_excess[i] = np.sum(data[data >= threshold] - threshold) / n_points
return thresholds, mean_excess
[docs]
def mean_exceedance_over_exceedance_area(data, thresholds=None):
"""
Compute mean value of exceedances normalized by exceedance area only.
Calculates the average of values that exceed each threshold, considering
only the spatial points where exceedance occurs. This indicator (WMEW -
Weighted Mean Exceedance over exceedance area) provides the conditional
mean given that the threshold is exceeded.
Parameters
----------
data : array_like
1D array of spatial data values to analyze.
thresholds : array_like, optional
Array of threshold values to evaluate. If None, generates 100 equally
spaced thresholds from 0 to the maximum data value.
Returns
-------
thresholds : np.ndarray
Array of threshold values used in the analysis.
conditional_means : np.ndarray
Mean values conditional on exceeding each threshold.
Notes
-----
The mean exceedance over exceedance area is computed as:
.. math::
WMEW(t) = \\frac{\\sum_{i: x_i \\geq t} x_i}{\\sum_{i=1}^{N} \\mathbb{1}(x_i \\geq t)}
This represents the expected value conditional on exceeding the threshold:
:math:`E[X | X \\geq t]`.
Examples
--------
>>> import numpy as np
>>> data = np.random.lognormal(1, 0.5, 1000)
>>> thresholds, cond_means = mean_exceedance_over_exceedance_area(data)
"""
data = np.asarray(data)
if thresholds is None:
thresholds = np.linspace(0, np.max(data), 100)
else:
thresholds = np.asarray(thresholds)
conditional_means = np.zeros(len(thresholds))
for i, threshold in enumerate(thresholds):
exceeding_values = data[data >= threshold]
if len(exceeding_values) > 0:
conditional_means[i] = np.mean(exceeding_values)
else:
conditional_means[i] = np.nan
return thresholds, conditional_means
[docs]
def mean_excess_over_exceedance_area(data, thresholds=None):
"""
Compute mean excess (difference from threshold) over exceedance area only.
Calculates the average amount by which values exceed each threshold,
considering only the spatial points where exceedance occurs. This indicator
(WMDW - Weighted Mean excess Difference over exceedance area) provides the
conditional mean excess given that the threshold is exceeded.
Parameters
----------
data : array_like
1D array of spatial data values to analyze.
thresholds : array_like, optional
Array of threshold values to evaluate. If None, generates 100 equally
spaced thresholds from 0 to the maximum data value.
Returns
-------
thresholds : np.ndarray
Array of threshold values used in the analysis.
conditional_excess : np.ndarray
Mean excess values conditional on exceeding each threshold.
Notes
-----
The mean excess over exceedance area is computed as:
.. math::
WMDW(t) = \\frac{\\sum_{i: x_i \\geq t} (x_i - t)}{\\sum_{i=1}^{N} \\mathbb{1}(x_i \\geq t)}
This represents the expected excess conditional on exceeding the threshold:
:math:`E[X - t | X \\geq t]`, also known as the mean excess function in
extreme value theory.
Examples
--------
>>> import numpy as np
>>> data = np.random.pareto(2, 1000)
>>> thresholds, cond_excess = mean_excess_over_exceedance_area(data)
"""
data = np.asarray(data)
if thresholds is None:
thresholds = np.linspace(0, np.max(data), 100)
else:
thresholds = np.asarray(thresholds)
conditional_excess = np.zeros(len(thresholds))
for i, threshold in enumerate(thresholds):
exceeding_values = data[data >= threshold]
if len(exceeding_values) > 0:
conditional_excess[i] = np.mean(exceeding_values - threshold)
else:
conditional_excess[i] = np.nan
return thresholds, conditional_excess
[docs]
def exceedance_to_nonexceedance_ratio(data, thresholds=None):
"""
Compute ratio of exceedance area to non-exceedance area.
Calculates the ratio between the fraction of area exceeding each threshold
and the fraction not exceeding it. This indicator (AEAN - Area Exceeding to
Area Non-exceeding) becomes large when exceedances are prevalent and
approaches zero when exceedances are rare.
Parameters
----------
data : array_like
1D array of spatial data values to analyze.
thresholds : array_like, optional
Array of threshold values to evaluate. If None, generates 100 equally
spaced thresholds from 0 to the maximum data value.
Returns
-------
thresholds : np.ndarray
Array of threshold values used in the analysis.
area_ratios : np.ndarray
Ratio of exceedance area to non-exceedance area for each threshold.
Notes
-----
The exceedance to non-exceedance ratio is computed as:
.. math::
AEAN(t) = \\frac{RAEH(t)}{1 - RAEH(t)} = \\frac{N_e}{N - N_e}
where :math:`N_e` is the number of points exceeding the threshold and
:math:`N` is the total number of points.
The ratio approaches infinity as the exceedance fraction approaches 1, and
equals 0 when no points exceed the threshold.
Examples
--------
>>> import numpy as np
>>> data = np.random.beta(2, 5, 1000) * 10
>>> thresholds, ratios = exceedance_to_nonexceedance_ratio(data)
Warnings
--------
Returns infinity for thresholds where all points exceed (100% exceedance).
"""
data = np.asarray(data)
n_points = len(data)
if thresholds is None:
thresholds = np.linspace(0, np.max(data), 100)
else:
thresholds = np.asarray(thresholds)
area_ratios = np.zeros(len(thresholds))
for i, threshold in enumerate(thresholds):
exceedance_fraction = np.sum(data >= threshold) / n_points
if exceedance_fraction < 1.0:
area_ratios[i] = exceedance_fraction / (1.0 - exceedance_fraction)
else:
area_ratios[i] = np.inf
return thresholds, area_ratios
# def compute_all_indicators_and_plot(moments):
# """
# Compute all threshold-based indicators for a single point and create plots.
# Convenience function that computes all five main threshold-based indicators
# (RAEH, MEW, MEDW, WMEW, WMDW) from BME moment data and generates comparative
# visualization plots.
# Parameters
# ----------
# moments : np.ndarray
# 2D array with shape (n_points, n_columns) where the second column
# (moments[:, 1]) contains the values to analyze. Typically output from
# BME estimation at a single spatial location across multiple time points
# or ensemble members.
# Returns
# -------
# None
# Generates and displays plots using the graphics module.
# Notes
# -----
# This function extracts the mean values (second column) from the moments array
# and computes:
# - **RAEH**: Fractional area exceeding threshold
# - **MEW**: Mean exceedance over whole domain
# - **MEDW**: Mean excess difference over whole domain
# - **WMEW**: Mean exceedance over exceedance area
# - **WMDW**: Mean excess difference over exceedance area
# The results are visualized using the spatiotemporal graphics module.
# Examples
# --------
# >>> import numpy as np
# >>> from environmentaltools.spatiotemporal import compute_all_indicators_and_plot
# >>>
# >>> # Simulate BME moments for a single point
# >>> moments = np.random.normal(5, 1.5, (1000, 3))
# >>> compute_all_indicators_and_plot(moments)
# """
# # Define indicator labels
# labels = ["RAEH", "MEW", "MEDW", "WMEW", "WMDW"]
# # Initialize storage for thresholds and indicator values
# thresholds = [None] * len(labels)
# indicator_values = [None] * len(labels)
# # Extract mean values (second column) from moments
# data = moments[:, 1]
# # Compute all indicators
# thresholds[0], indicator_values[0] = fractional_exceedance_area(data)
# thresholds[1], indicator_values[1] = mean_exceedance_over_total_area(data)
# thresholds[2], indicator_values[2] = mean_excess_over_total_area(data)
# thresholds[3], indicator_values[3] = mean_exceedance_over_exceedance_area(data)
# thresholds[4], indicator_values[4] = mean_excess_over_exceedance_area(data)
# # Create visualization
# figures.indicators(thresholds, indicator_values, labels)
# return
[docs]
def mean_presence_boundary(data_cube, threshold=None):
"""
Calculate spatial boundary where temporal mean exceeds a presence threshold.
Computes the contour lines that define areas where the temporal average
of values exceeds a specified threshold. This is useful for identifying
persistent zones of influence or presence in spatiotemporal data.
Parameters
----------
data_cube : np.ndarray, xarray.DataArray, or xarray.Dataset
3D array with shape (time, lat, lon) containing spatiotemporal data.
If Dataset, uses the first data variable.
threshold : float, optional
Presence threshold. If None, uses the global mean of the temporal
average map.
Returns
-------
contours : list
List of contour coordinates defining the presence boundary.
mean_map : np.ndarray
2D map of temporal means for each spatial location.
Notes
-----
The function computes temporal means for each spatial cell, then identifies
contours where these means exceed the specified threshold. The contours
represent boundaries between areas of high and low temporal presence.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((365, 50, 50)) # Daily data for 1 year
>>> contours, mean_map = mean_presence_boundary(data_cube, threshold=0.6)
"""
# Handle xarray Dataset/DataArray
if hasattr(data_cube, 'values'):
# xarray DataArray or Dataset
if hasattr(data_cube, 'data_vars'):
# It's a Dataset, get first variable
var_name = list(data_cube.data_vars.keys())[0]
data_array = data_cube[var_name]
else:
# It's already a DataArray
data_array = data_cube
# Compute mean using xarray's dim parameter
mean_map = data_array.mean(dim='time').values
else:
# It's a numpy array
mean_map = np.mean(data_cube, axis=0)
# Presence threshold
if threshold is None:
threshold = np.mean(mean_map)
# Binary: presence where threshold is exceeded
presence_mask = mean_map >= threshold
# Extract contours
contours = measure.find_contours(presence_mask.astype(float), level=0.5)
return contours, mean_map
[docs]
def maximum_influence_extent(data_cube, percentile=95):
"""
Calculate maximum spatial extent under extreme conditions.
Determines the spatial boundary encompassing areas that experience
extreme values (defined by a percentile threshold) over the time period.
This is useful for assessing maximum impact zones or worst-case scenarios.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
percentile : float, optional
Percentile to define extreme condition (e.g., 95 for 95th percentile).
Default is 95.
Returns
-------
contours : list
List of contour coordinates defining the maximum influence extent.
extreme_map : np.ndarray
2D map of extreme values (specified percentile) for each spatial cell.
Notes
-----
The function computes the specified percentile value for each spatial
location across time, then identifies contours around areas where these
extreme values exceed the mean extreme value.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.gamma(2, 1, (100, 30, 30)) # Gamma-distributed data
>>> contours, extreme_map = maximum_influence_extent(data_cube, percentile=90)
"""
# Compute extreme value per cell
extreme_map = np.percentile(data_cube, percentile, axis=0)
# Define presence threshold (can be dynamic or fixed)
threshold = np.mean(extreme_map) # or use a physical value
# Create binary mask
mask = extreme_map >= threshold
# Extract contours
contours = measure.find_contours(mask.astype(float), level=0.5)
return contours, extreme_map
[docs]
def threshold_exceedance_frequency(data_cube, threshold):
"""
Calculate frequency of threshold exceedance for each spatial cell.
Computes how many times each spatial location exceeds a given threshold
across the time dimension. This provides a measure of temporal persistence
of exceedance conditions.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
threshold : float or np.ndarray
Fixed threshold value or 2D array of spatially-varying thresholds
with shape (lat, lon).
Returns
-------
freq_map : np.ndarray
2D map showing number of times threshold is exceeded at each location.
Notes
-----
For a fixed threshold, exceedance is computed as data_cube > threshold.
For spatially-varying thresholds, each spatial cell is compared against
its corresponding threshold value.
The frequency map provides insight into which areas are most prone to
threshold exceedances over time.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.exponential(2, (100, 20, 20))
>>> freq_map = threshold_exceedance_frequency(data_cube, threshold=3.0)
>>> print(f"Max exceedances: {freq_map.max()}")
"""
if isinstance(threshold, (int, float)):
exceedances = data_cube > threshold
else:
# Variable threshold per cell
exceedances = data_cube > threshold[np.newaxis, :, :]
freq_map = np.sum(exceedances, axis=0)
return freq_map
[docs]
def permanently_affected_zone(data_cube, threshold, persistence_ratio=0.8):
"""
Calculate permanently affected zones based on threshold exceedance persistence.
Identifies spatial areas where a variable exceeds a threshold for a specified
proportion of the time period. This is useful for identifying zones of
chronic impact or persistent environmental stress.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
threshold : float or np.ndarray
Fixed threshold value or 2D array of spatially-varying thresholds
with shape (lat, lon).
persistence_ratio : float, optional
Minimum proportion of time for a cell to be considered permanently
affected (e.g., 0.8 means 80% of the time). Default is 0.8.
Returns
-------
mask : np.ndarray
Binary map of permanently affected zones (True = permanently affected).
freq_map : np.ndarray
Map of exceedance frequency (0-1) for each spatial location.
Notes
-----
A location is considered permanently affected if its exceedance frequency
exceeds the persistence_ratio threshold. This helps distinguish between
areas with occasional versus persistent exceedances.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.lognormal(0, 1, (365, 25, 25)) # Daily data
>>> mask, freq = permanently_affected_zone(data_cube, threshold=2.0, persistence_ratio=0.75)
>>> print(f"Permanently affected area: {mask.sum()} cells")
"""
if isinstance(threshold, (int, float)):
exceedances = data_cube > threshold
else:
exceedances = data_cube > threshold[np.newaxis, :, :]
freq_map = np.sum(exceedances, axis=0) / data_cube.shape[0]
mask = freq_map >= persistence_ratio
return mask, freq_map
[docs]
def mean_representative_value(data_cube, time_window=None):
"""
Calculate representative mean value for each spatial cell.
Computes the temporal mean for each spatial location, optionally within
a specified time window. This provides a representative value that
summarizes the typical condition at each location.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
time_window : tuple of int, optional
Time indices to use (start, end). If None, uses the entire time period.
Returns
-------
mean_map : np.ndarray
2D map of representative mean values for each spatial cell.
Notes
-----
When time_window is specified, only the data within that temporal subset
is used for computing means. This is useful for analyzing seasonal patterns
or specific time periods of interest.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.normal(10, 2, (365, 30, 30)) # Daily data
>>> # Full year average
>>> mean_all = mean_representative_value(data_cube)
>>> # Summer months (June-August, assuming daily data starting Jan 1)
>>> mean_summer = mean_representative_value(data_cube, time_window=(150, 243))
"""
if time_window:
start, end = time_window
data_slice = data_cube[start:end]
else:
data_slice = data_cube
mean_map = np.mean(data_slice, axis=0)
return mean_map
from scipy.stats import genextreme
[docs]
def return_period_extreme_value(data_series, return_period):
"""
Fit Generalized Extreme Value distribution and calculate return period value.
Fits a GEV (Generalized Extreme Value) distribution to extreme value data
and computes the value associated with a specified return period. This is
commonly used in environmental risk assessment and engineering design.
Parameters
----------
data_series : np.ndarray
1D array of extreme values (e.g., annual maxima).
return_period : int
Return period in years (e.g., 100 for 100-year return period).
Returns
-------
x_T : float
Extreme value associated with the specified return period.
params : tuple
Fitted GEV parameters (shape, location, scale).
Notes
-----
The return period value is calculated as:
.. math::
x_T = F^{-1}\\left(1 - \\frac{1}{T}\\right)
where :math:`F^{-1}` is the inverse CDF of the fitted GEV distribution
and :math:`T` is the return period.
The GEV distribution is commonly used for modeling extreme values in
environmental applications such as flood analysis, wind speed extremes,
and temperature extremes.
Examples
--------
>>> import numpy as np
>>> annual_maxima = np.random.gumbel(10, 2, 50) # 50 years of data
>>> x_100, params = return_period_extreme_value(annual_maxima, 100)
>>> print(f"100-year return value: {x_100:.2f}")
"""
shape, loc, scale = genextreme.fit(data_series)
prob = 1 - 1 / return_period
x_T = genextreme.ppf(prob, shape, loc=loc, scale=scale)
return x_T, (shape, loc, scale)
[docs]
def spatial_change_rate(data_cube, dx=1, dy=1):
"""
Calculate mean spatial change rate for each cell over time.
Computes the temporal average of spatial gradient magnitudes for each
location. This measures how rapidly values change spatially and how
this spatial variability evolves over time.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
dx, dy : float, optional
Spatial resolution in X and Y directions. Default is 1.
Returns
-------
rate_map : np.ndarray
2D map of mean spatial change rates.
Notes
-----
For each time step, the function computes spatial gradients using finite
differences, then calculates the gradient magnitude. The final result is
the temporal average of these magnitudes.
High values indicate areas with consistently high spatial variability,
while low values suggest spatially smooth regions.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((50, 20, 20))
>>> rate_map = spatial_change_rate(data_cube, dx=0.1, dy=0.1)
"""
gradients = []
for t in range(data_cube.shape[0]):
grad_x, grad_y = utils.spatial_gradient(data_cube[t], dx=dx, dy=dy)
magnitude = np.sqrt(grad_x**2 + grad_y**2)
gradients.append(magnitude)
rate_map = np.mean(gradients, axis=0)
return rate_map
[docs]
def functional_area_loss(data_cube, threshold, t_start, t_end):
"""
Calculate functional area loss between two time points.
Compares functional areas (defined by threshold exceedance) between two
time points to quantify spatial losses. This is useful for assessing
habitat loss, service area degradation, or similar applications.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
threshold : float
Threshold defining functional status.
t_start, t_end : int
Time indices to compare.
Returns
-------
loss_map : np.ndarray
Binary map of loss areas (1 = loss occurred, 0 = no loss).
area_start, area_end : int
Number of functional cells at start and end times.
Notes
-----
A cell experiences functional loss if it was above threshold at t_start
but below threshold at t_end. The analysis identifies which specific
areas have lost functionality between the two time points.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((100, 25, 25))
>>> loss_map, area_start, area_end = functional_area_loss(data_cube, 0.5, 10, 90)
>>> print(f"Area lost: {loss_map.sum()} cells")
"""
mask_start = data_cube[t_start] >= threshold
mask_end = data_cube[t_end] >= threshold
loss_map = np.logical_and(mask_start, ~mask_end)
area_start = np.sum(mask_start)
area_end = np.sum(mask_end)
return loss_map.astype(int), area_start, area_end
from skimage import measure
[docs]
def critical_boundary_retreat(data_cube, threshold, t_start, t_end):
"""
Calculate critical boundary retreat between two time points.
Analyzes how the boundary of critical areas (defined by threshold exceedance)
changes between two time points. This is useful for studying phenomena like
shoreline retreat, vegetation boundary shifts, or pollution extent changes.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
threshold : float
Critical threshold defining the boundary.
t_start, t_end : int
Time indices to compare.
Returns
-------
contours_start, contours_end : list
Contour coordinates at start and end times.
retreat_mask : np.ndarray
Binary map showing retreat areas (1 = retreat occurred).
Notes
-----
The function identifies contours of the critical threshold at both time
points and computes areas that were above threshold initially but fell
below threshold later, indicating boundary retreat.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.exponential(1, (50, 30, 30))
>>> contours_start, contours_end, retreat = critical_boundary_retreat(
... data_cube, threshold=1.5, t_start=5, t_end=45)
"""
mask_start = data_cube[t_start] >= threshold
mask_end = data_cube[t_end] >= threshold
contours_start = measure.find_contours(mask_start.astype(float), level=0.5)
contours_end = measure.find_contours(mask_end.astype(float), level=0.5)
retreat_mask = np.logical_and(mask_start, ~mask_end)
return contours_start, contours_end, retreat_mask.astype(int)
[docs]
def neighbourhood_mean(data_cube, size=3):
"""
Calculate neighborhood mean value for each cell.
Applies a spatial moving average filter to compute the mean value within
a specified neighborhood around each cell. This is useful for spatial
smoothing and analyzing local context.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
size : int, optional
Neighborhood size (e.g., 3 for 3x3 neighborhood). Default is 3.
Returns
-------
cube_filtered : np.ndarray
3D array with neighborhood means, same shape as input.
Notes
-----
The function applies a uniform filter that computes the mean value within
a square neighborhood of the specified size around each cell. Edge effects
are handled using reflection mode.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((10, 20, 20))
>>> smoothed = neighbourhood_mean(data_cube, size=5)
"""
cube_filtered = uniform_filter(data_cube, size=(1, size, size), mode='reflect')
return cube_filtered
[docs]
def neighbourhood_gradient_influence(data_cube, size=3):
"""
Calculate gradient magnitude between each cell and its neighborhood.
Computes the absolute difference between each cell's value and its
neighborhood mean. This measures how much each cell deviates from its
local spatial context.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
size : int, optional
Neighborhood size. Default is 3.
Returns
-------
influence_map : np.ndarray
2D map of average gradient influence.
Notes
-----
High values indicate cells that consistently differ from their neighbors,
suggesting local anomalies or gradient boundaries. Low values indicate
cells that are well-integrated with their spatial context.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((20, 15, 15))
>>> influence_map = neighbourhood_gradient_influence(data_cube, size=5)
"""
cube_filtered = neighbourhood_mean(data_cube, size=size)
diff = data_cube - cube_filtered
influence_map = np.mean(np.abs(diff), axis=0)
return influence_map
[docs]
def environmental_convergence(data_cube, size=3):
"""
Calculate environmental convergence as reduction of neighborhood differences over time.
Measures how the difference between each cell and its neighborhood changes
over time. Negative values indicate convergence (becoming more similar to
neighbors), while positive values indicate divergence.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
size : int, optional
Neighborhood size. Default is 3.
Returns
-------
convergence_map : np.ndarray
2D map of convergence trends (negative = convergence, positive = divergence).
Notes
-----
The function computes the temporal trend of absolute differences between
each cell and its neighborhood mean. Areas with negative trends are
becoming more spatially homogeneous over time.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((50, 20, 20))
>>> convergence_map = environmental_convergence(data_cube, size=3)
"""
cube_filtered = neighbourhood_mean(data_cube, size=size)
diff = np.abs(data_cube - cube_filtered)
trend = np.polyfit(np.arange(data_cube.shape[0]), diff.reshape(data_cube.shape[0], -1), deg=1)[0]
convergence_map = trend.reshape(data_cube.shape[1], data_cube.shape[2])
return convergence_map
[docs]
def neighbourhood_polarization(data_cube, size=3):
"""
Calculate polarization as local standard deviation.
Computes the temporal standard deviation of differences between each cell
and its neighborhood mean. This measures local variability and identifies
areas of high temporal polarization.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
size : int, optional
Neighborhood size. Default is 3.
Returns
-------
polarization_map : np.ndarray
2D map of average polarization.
Notes
-----
High polarization values indicate areas where the relationship between
a cell and its neighbors varies significantly over time. Low values
suggest stable neighborhood relationships.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((30, 25, 25))
>>> polarization_map = neighbourhood_polarization(data_cube, size=3)
"""
cube_filtered = neighbourhood_mean(data_cube, size=size)
polarization = np.std(data_cube - cube_filtered, axis=0)
return polarization
[docs]
def local_persistence(data_cube, size=3):
"""
Calculate local persistence as proportion of time cell exceeds its neighborhood.
Computes the fraction of time that each cell has values higher than its
neighborhood mean. This measures local dominance and persistence patterns.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
size : int, optional
Neighborhood size. Default is 3.
Returns
-------
persistence_map : np.ndarray
2D map of persistence values (0-1 scale).
Notes
-----
Values near 1 indicate cells that consistently exceed their neighborhood
(local maxima or persistent hotspots). Values near 0 indicate cells that
consistently fall below their neighborhood (local minima or cold spots).
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.exponential(1, (40, 20, 20))
>>> persistence_map = local_persistence(data_cube, size=5)
"""
cube_filtered = neighbourhood_mean(data_cube, size=size)
dominance = data_cube > cube_filtered
persistence_map = np.mean(dominance, axis=0)
return persistence_map
[docs]
def environmental_risk(data_cube, threshold, size=3):
"""
Calculate environmental risk as frequency of extreme values combined with polarization.
Combines threshold exceedance frequency with neighborhood polarization to
assess environmental risk. Areas with both high exceedance frequency and
high polarization are considered highest risk.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
threshold : float
Threshold defining extreme/hazardous conditions.
size : int, optional
Neighborhood size for polarization calculation. Default is 3.
Returns
-------
risk_map : np.ndarray
2D map of environmental risk.
Notes
-----
The risk is computed as the product of exceedance frequency and
neighborhood polarization. This combines information about both the
likelihood of extreme events and the spatial variability of conditions.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.gamma(2, 1, (100, 30, 30))
>>> risk_map = environmental_risk(data_cube, threshold=3.0, size=3)
"""
exceedance = data_cube > threshold
freq = np.mean(exceedance, axis=0)
polarization = neighbourhood_polarization(data_cube, size=size)
risk_map = freq * polarization
return risk_map
[docs]
def directional_influence(data_cube, dx=1, dy=1):
"""
Calculate mean direction of spatial gradient for each cell.
Computes the temporal average direction and magnitude of spatial gradients.
This identifies the predominant direction of spatial influence and the
strength of directional patterns.
Parameters
----------
data_cube : np.ndarray
3D array with shape (time, lat, lon) containing spatiotemporal data.
dx, dy : float, optional
Spatial resolution in X and Y directions. Default is 1.
Returns
-------
angle_map : np.ndarray
2D map of mean gradient directions in radians.
magnitude_map : np.ndarray
2D map of mean gradient magnitudes.
Notes
-----
The function computes spatial gradients for each time step, then averages
both the direction (angle) and magnitude across time. This reveals
persistent spatial patterns and directional influences.
Examples
--------
>>> import numpy as np
>>> data_cube = np.random.random((50, 25, 25))
>>> angles, magnitudes = directional_influence(data_cube, dx=0.1, dy=0.1)
"""
grad_x_list = []
grad_y_list = []
for t in range(data_cube.shape[0]):
grad_x, grad_y = utils.spatial_gradient(data_cube[t], dx=dx, dy=dy)
grad_x_list.append(grad_x)
grad_y_list.append(grad_y)
mean_grad_x = np.mean(grad_x_list, axis=0)
mean_grad_y = np.mean(grad_y_list, axis=0)
angle_map = np.arctan2(mean_grad_y, mean_grad_x)
magnitude_map = np.sqrt(mean_grad_x**2 + mean_grad_y**2)
return angle_map, magnitude_map
[docs]
def multivariate_neighbourhood_synergy(cube_list, size=3):
"""
Calculate neighborhood synergy between multiple variables.
Computes the synergy between multiple environmental variables within
spatial neighborhoods. High synergy indicates variables that co-vary
coherently in space, while low synergy suggests independent patterns.
Parameters
----------
cube_list : list of np.ndarray
List of 3D arrays (time, lat, lon), one per variable.
size : int, optional
Neighborhood size. Default is 3.
Returns
-------
synergy_map : np.ndarray
2D map of neighborhood synergy (0-1 scale, higher = more synergistic).
Notes
-----
Synergy is computed as the inverse of the coefficient of variation across
variables within each neighborhood. High synergy means variables have
similar relative patterns within neighborhoods.
Examples
--------
>>> import numpy as np
>>> cube1 = np.random.random((20, 15, 15))
>>> cube2 = np.random.random((20, 15, 15))
>>> synergy = multivariate_neighbourhood_synergy([cube1, cube2], size=3)
"""
neigh_cubes = [neighbourhood_mean(cube, size=size) for cube in cube_list]
mean_maps = [np.mean(cube, axis=0) for cube in neigh_cubes]
stacked = np.stack(mean_maps, axis=-1) # (lat, lon, variables)
synergy_map = np.zeros(stacked.shape[:2])
for i in range(stacked.shape[0]):
for j in range(stacked.shape[1]):
vec = stacked[i, j, :]
synergy_map[i, j] = np.std(vec) / (np.mean(vec) + 1e-6) # inverse coefficient of variation
return 1 - synergy_map # higher synergy = lower dispersion
[docs]
def spatiotemporal_coupling(cube_x, cube_y):
"""
Calculate spatiotemporal coupling between two variables.
Computes the temporal correlation between two variables for each spatial
location. This identifies areas where variables are strongly coupled
versus areas where they vary independently.
Parameters
----------
cube_x, cube_y : np.ndarray
3D arrays with shape (time, lat, lon) for the two variables.
Returns
-------
coupling_map : np.ndarray
2D map of temporal correlation coefficients (-1 to 1).
Notes
-----
Correlation is computed independently for each spatial location across
the time dimension. Values near 1 indicate strong positive coupling,
values near -1 indicate strong negative coupling, and values near 0
indicate weak or no coupling.
Examples
--------
>>> import numpy as np
>>> cube_x = np.random.random((100, 20, 20))
>>> cube_y = np.random.random((100, 20, 20))
>>> coupling = spatiotemporal_coupling(cube_x, cube_y)
"""
coupling_map = np.zeros(cube_x.shape[1:])
for i in range(cube_x.shape[1]):
for j in range(cube_x.shape[2]):
x = cube_x[:, i, j]
y = cube_y[:, i, j]
if np.std(x) > 0 and np.std(y) > 0:
coupling_map[i, j] = np.corrcoef(x, y)[0, 1]
else:
coupling_map[i, j] = 0
return coupling_map
[docs]
def multivariate_threshold_exceedance(cube_list, thresholds):
"""
Calculate frequency of simultaneous threshold exceedance for multiple variables.
Computes how often all variables simultaneously exceed their respective
thresholds. This is useful for identifying areas prone to compound
environmental extremes.
Parameters
----------
cube_list : list of np.ndarray
List of 3D arrays (time, lat, lon), one per variable.
thresholds : list of float
List of threshold values, one per variable.
Returns
-------
exceedance_map : np.ndarray
2D map of simultaneous exceedance frequency (0-1 scale).
Notes
-----
The function identifies time steps when ALL variables exceed their
respective thresholds simultaneously. This is more restrictive than
individual threshold exceedances and identifies true compound events.
Examples
--------
>>> import numpy as np
>>> cube1 = np.random.exponential(2, (50, 15, 15))
>>> cube2 = np.random.gamma(2, 1, (50, 15, 15))
>>> freq_map = multivariate_threshold_exceedance([cube1, cube2], [3.0, 2.5])
"""
exceedance = np.ones_like(cube_list[0], dtype=bool)
for cube, th in zip(cube_list, thresholds):
exceedance &= cube > th
freq_map = np.mean(exceedance, axis=0)
return freq_map
[docs]
def directional_coevolution(cube_x, cube_y):
"""
Calculate directional coevolution between two variables.
Measures how often two variables change in the same direction over time.
This identifies areas where variables have coordinated temporal dynamics
versus areas where they evolve independently or oppositely.
Parameters
----------
cube_x, cube_y : np.ndarray
3D arrays with shape (time, lat, lon) for the two variables.
Returns
-------
coevolution_map : np.ndarray
2D map of directional agreement (0-1 scale).
Notes
-----
The function computes temporal differences (changes) for both variables,
then calculates the proportion of time when both variables change in
the same direction (both increase or both decrease).
Values near 1 indicate strong coevolution, values near 0 indicate
independent evolution, and values near 0.5 suggest random relationships.
Examples
--------
>>> import numpy as np
>>> cube_x = np.random.random((100, 20, 20))
>>> cube_y = np.random.random((100, 20, 20))
>>> coevolution = directional_coevolution(cube_x, cube_y)
"""
dx = np.diff(cube_x, axis=0)
dy = np.diff(cube_y, axis=0)
agreement = np.sign(dx) == np.sign(dy)
coevolution_map = np.mean(agreement, axis=0)
return coevolution_map
[docs]
def multivariate_persistence(cube_list, thresholds):
"""
Calculate multivariate persistence as proportion of time with simultaneous conditions.
Computes the temporal persistence of compound conditions where multiple
variables simultaneously exceed their thresholds. This measures the
duration and frequency of multivariate extreme states.
Parameters
----------
cube_list : list of np.ndarray
List of 3D arrays (time, lat, lon), one per variable.
thresholds : list of float
List of threshold values, one per variable.
Returns
-------
persistence_map : np.ndarray
2D map of compound persistence (0-1 scale).
Notes
-----
This is similar to multivariate_threshold_exceedance but emphasizes the
temporal persistence aspect. High values indicate areas where compound
extreme conditions persist for extended periods.
Examples
--------
>>> import numpy as np
>>> cube1 = np.random.lognormal(0, 1, (365, 20, 20)) # Daily data
>>> cube2 = np.random.gamma(2, 1, (365, 20, 20))
>>> persistence = multivariate_persistence([cube1, cube2], [2.0, 3.0])
"""
condition = np.ones_like(cube_list[0], dtype=bool)
for cube, th in zip(cube_list, thresholds):
condition &= cube > th
persistence_map = np.mean(condition, axis=0)
return persistence_map