"""
Spatiotemporal Raster Analysis Module
====================================
This module provides comprehensive tools for spatiotemporal raster analysis in
environmental applications, with a focus on coastal management and marine spatial
analysis. The module handles NetCDF-based environmental data cubes and generates
binary masks for threshold-based analysis.
Main Components
---------------
**Configuration Management**
- JSON-based configuration file loading and validation
- Input data verification and consistency checking
- Output directory structure creation
**Temporal Analysis**
- Temporal difference calculation between elevation layers
- Change statistics computation across space and time
- Multi-simulation temporal pattern analysis
**Binary Matrix Generation**
- Threshold-based binary mask creation
- Spatiotemporal data cube processing
- NetCDF output generation with metadata
**Preprocessing and Refinement**
- Data preprocessing for spatial analysis
- Grid refinement and coordinate transformation
- Level data processing and statistics
Functions
---------
calculate_temporal_differences : Analyze temporal changes in elevation data
check_inputs : Validate input data and setup processing parameters
post_treatment : Perform preprocessing steps for analysis
binary_matrix : Generate binary masks from threshold comparison
analysis : Execute complete spatiotemporal raster analysis workflow
Examples
--------
Basic usage for coastal management analysis:
>>> from pathlib import Path
>>> from environmentaltools.spatiotemporal.raster import analysis
>>>
>>> # Setup configuration file path
>>> config_path = Path("config/coastal_analysis.json")
>>>
>>> # Execute complete analysis workflow
>>> results = analysis(config_path)
>>> print(f"Processed {len(results['datacube_filenames'])} simulations")
The configuration file should contain:
- project: Input/output paths and simulation parameters
- temporal: Time-related processing settings
- parameters: Analysis parameters including refinement options
- seasons: Seasonal definitions for temporal analysis
Notes
-----
This module is designed for processing large spatiotemporal datasets and
includes memory usage monitoring and optimization features. It supports
multiple simulation scenarios and analysis indices for comprehensive
environmental assessment.
"""
import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path
import json
from loguru import logger
from datetime import datetime
from time import time
import geopandas as gpd
from shapely import LineString
from pyproj import CRS
from skimage.measure import find_contours
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import psutil
from environmentaltools.spatiotemporal import indicators
GPKG_ACTIVE = False # Set to True if Fiona and GeoPackage support are available (for GeoPackage output instead of Shapefile)
[docs]
def largest_region_from_field(batches):
"""
Selects the largest region from a continuous field and returns its contour.
Parameters
----------
batches : tuple
Tuple containing:
- field: 2D ndarray of continuous values.
- refinement: bool, whether to use refined contour extraction.
Returns
-------
contours : list of ndarray
List of (y, x) coordinates of the interpolated contour for the largest region.
"""
field, refinement = batches
if not refinement:
# 1. Create binary mask: True where field <= 0, False otherwise
field = field <= 0
contours = find_contours(field.astype(bool), level=0)
else:
# Use float field for refined contour extraction
contours = find_contours(field.astype(float), level=0)
# Filter contours to keep only the one corresponding to the largest region
lengths = [c.shape[0] for c in contours]
idx_long = int(np.argmax(lengths))
contours = [contours[idx_long]]
return contours
[docs]
def preprocess(ds):
"""
Expand the dataset by adding a 'simulation' dimension using the 'simulation' attribute.
Parameters
----------
ds : xarray.Dataset
Input dataset, expected to have a 'simulation' attribute.
Returns
-------
xarray.Dataset
Dataset with an added 'simulation' dimension labeled by the simulation ID.
"""
sim_id = ds.attrs.get("simulation", "unknown")
return ds.expand_dims({"simulation": [sim_id]})
[docs]
def compute_contours(continuous_stack, info):
"""
Compute the largest connected contour for each time slice in a 3D raster stack.
This function processes a stack of 2D continuous raster fields (e.g., thresholded or probability maps)
and extracts the largest connected region (contour) from each time slice. The extraction is performed
in parallel for efficiency. The refinement parameter is passed to the region extraction function.
Parameters
----------
continuous_stack : np.ndarray
3D array of shape (T, Y, X), where T is the number of time slices, and Y, X are spatial dimensions.
Each slice continuous_stack[t] is a 2D field to be processed.
info : dict
Configuration dictionary containing analysis parameters. Must include:
- "refinement" : bool
Whether to apply grid refinement in contour extraction.
- "n_workers" : int
Number of parallel workers to use for processing.
Returns
-------
contours : list
List of extracted contours (one per time slice), as returned by largest_region_from_field.
"""
start_time = time()
# Number of time slices to process (T axis of the stack)
contours = []
n_to_process = int(continuous_stack.shape[0])
# Build a list of per-slice arguments for largest_region_from_field.
# Each element is a tuple (field_2d, refinement_flag)
slice_args = []
for t in range(n_to_process):
field2d = continuous_stack[t]
# Each field2d is a time slice to be processed for contour extraction
slice_args.append((field2d, info["refinement"]))
# Process slices in parallel and report progress with ETA (estimated time of arrival)
total_slices = len(slice_args)
with ProcessPoolExecutor(max_workers=info["n_workers"]) as ex:
for slice_idx, slice_res in enumerate(ex.map(largest_region_from_field, slice_args), start=1):
contours.append(slice_res)
elapsed = time() - start_time
print(
f"Slice {slice_idx}/{total_slices}"
f", elapsed {elapsed:.1f}s.",
end="\r", flush=True,
)
return contours
[docs]
def analysis(info=None):
"""
Execute the complete spatiotemporal raster analysis workflow.
This is the main entry point for the spatiotemporal raster processing pipeline. It orchestrates
configuration loading, input validation, simulation and index processing, and output generation.
The function supports multi-simulation and multi-index workflows for environmental and coastal analysis.
Parameters
----------
info : dict
Configuration dictionary containing all project parameters, input/output paths, and analysis settings.
Expected structure matches the validated configuration (see check_inputs docstring).
Returns
-------
info : dict
Updated configuration dictionary, including processed results, output file paths, and metadata.
Notes
-----
The analysis workflow includes:
1. Configuration loading and validation
2. Input data verification and preprocessing
3. Binary matrix and contour generation for each simulation and index
4. NetCDF output file creation with metadata
5. Progress logging and error handling
The function processes all simulations and analysis indices as specified in the configuration,
creating separate output files for each combination. Results and metadata are stored in the returned info dict.
Examples
--------
>>> from pathlib import Path
>>> import json
>>> with open("config/coastal_analysis.json") as f:
... config = json.load(f)
>>> results = analysis(config)
>>> print(f"Processed {len(results['datacube_filenames'])} simulations")
"""
logger.info("="*60)
logger.info("STARTING SPATIOTEMPORAL RASTER ANALYSIS")
logger.info("="*60)
# Validate input data and setup processing parameters
check_inputs(info)
# Process each analysis index specified in configuration
# (for multi-index workflows, loop over indices here)
logger.info(f"Processing project index: {info['project']['index']}.")
if info["index"] == "presence_boundary":
# Allocate a continuous data stack (original variable values) to use for percentile calculations
data_stack = np.ndarray(info["dimensions"], dtype=np.float32)
# Process each simulation for the current index (loop over simulations)
for sim_no in range(info["project"]["no_sims"]):
# Log simulation progress (show fraction completed for user feedback)
print(f"Processing simulation {sim_no + 1}/{info['project']['no_sims']} for index {info['project']['index']}.", end="\r", flush=True)
# Load spatiotemporal data cube for current simulation (NetCDF file)
file_path = info["datacube_filenames"][sim_no]
# Extract the specified environmental variable from NetCDF file
data_cube = xr.open_dataset(file_path)[info[info["index"]]["variable"]]
# Store continuous values for later interpolation (time, y, x)
data_stack[sim_no, :, :, :] = data_cube.values
# Load threshold levels from CSV file for current simulation
levels = pd.read_csv(
info['level_filenames'][sim_no],
sep=",",
index_col=0,
)
for j, date in enumerate(levels.index):
data_stack[sim_no, j, :, :] -= levels.loc[date, info[info["index"]]["temporal"]]
logger.info("="*60)
logger.info("OBTAINING CONTOURS")
logger.info("="*60)
contours = [[] for _ in info['percentiles']]
for i_perc, p in enumerate(info['percentiles']):
# Compute the continuous percentile stack from elevation data across the simulation axis (axis=0)
continuous_stack = np.percentile(data_stack, 100-float(p), axis=0)
# Compute contours for the current percentile
contours[i_perc] = compute_contours(continuous_stack, info)
logger.info(f"Completed percentile {p}%.")
if info["project"]["index"] == "influence_extent":
contours = [[] for _ in info["influence_extent"]["horizon_times"]]
for i_ht, horizon_time in enumerate(info["influence_extent"]["horizon_times"]):
# Allocate a continuous data stack (original variable values) to use for percentiles
len_times = np.sum(info["dates"] <= horizon_time)
data_stack = np.ndarray((info["dimensions"][0], len_times, info["dimensions"][2], info["dimensions"][3]), dtype=np.float32)
# Process each simulation for the current index
for sim_no in range(info["project"]["no_sims"]):
# Log simulation progress (show fraction completed)
print(f"Processing simulation {sim_no + 1}/{info['project']['no_sims']} for index {info['project']['index']}.", end="\r", flush=True)
# Load spatiotemporal data cube for current simulation
file_path = info["datacube_filenames"][sim_no]
# Extract the specified environmental variable from NetCDF file
data_cube = xr.open_dataset(file_path)[info[info["index"]]["variable"]]
dates = [d for d in info["dates"] if horizon_time >= d]
data_cube_sel = data_cube.sel(time=dates)
data_stack[sim_no, :, :, :] = data_cube_sel.values - info["influence_extent"]["temporal"][i_ht]
if i_ht == 0:
logger.info("="*60)
logger.info(f"OBTAINING CONTOURS")
logger.info("="*60)
data_stack = np.max(data_stack, axis=0)
# Compute contours for the current percentile
contours[i_ht] = compute_contours(data_stack, info)
logger.info(f"Completed horizon time {horizon_time}.")
if info["project"]["index"] == "directional_influence":
# Allocate a continuous data stack (original variable values) to use for percentiles
data_stack = np.ndarray(info["dimensions"], dtype=np.float32)
# Process each simulation for the current index
for sim_no in range(info["project"]["no_sims"]):
# Log simulation progress (show fraction completed)
print(f"Processing simulation {sim_no + 1}/{info['project']['no_sims']} for index {info['project']['index']}.", end="\r", flush=True)
# Load spatiotemporal data cube for current simulation
file_path = info["datacube_filenames"][sim_no]
# Extract the specified environmental variable from NetCDF file
data_cube = xr.open_dataset(file_path)[info[info["index"]]["variable"]]
# Store continuous values for later interpolation (time, y, x)
data_stack[sim_no, :, :, :] = data_cube.values
# Load threshold levels from CSV file for current simulation
levels = pd.read_csv(
info['level_filenames'][sim_no],
sep=",",
index_col=0,
)
for j, date in enumerate(levels.index):
data_stack[sim_no, j, :, :] -= levels.loc[date, info[info["index"]]["temporal"]]
results = [[] for _ in info["percentiles"]]
for i_perc, p in enumerate(info['percentiles']):
# Compute the continuous percentile stack from elevation data across the simulation axis (axis=0)
continuous_stack = np.percentile(data_stack, 100-float(p), axis=0)
angle_map, magnitude_map = indicators.directional_influence(continuous_stack)
results[i_perc] = [angle_map, magnitude_map]
if info["contour"]:
save_contours(contours, info)
else:
save_arrays_to_netcdf(results, info)
return
[docs]
def obtain_geometry(contour, info):
"""
Convert a contour represented by pixel indices to a real-world geometry.
Maps a sequence of (row, col) pixel indices from a raster contour to real-world coordinates
using the x and y coordinate arrays provided in the info dictionary. Returns a LineString
geometry representing the contour in spatial coordinates.
Parameters
----------
contour : np.ndarray
Array of shape (N, 2) containing (row, col) pixel indices of the contour.
info : dict
Configuration dictionary containing coordinate arrays under info['coords']['x'] and info['coords']['y'].
Returns
-------
geom : shapely.geometry.LineString
LineString geometry representing the contour in real-world coordinates.
"""
# contour is an (N,2) array with (row, col)
# Attempt to map pixel indices to real-world coordinates
x_coords = np.asarray(info.get("coords", {}).get("x", []))
y_coords = np.asarray(info.get("coords", {}).get("y", []))
nx = x_coords.size
ny = y_coords.size
col_idx = np.arange(nx)
row_idx = np.arange(ny)
row_vals = contour[:, 0]
col_vals = contour[:, 1]
xs = np.interp(col_vals, col_idx, x_coords)
ys = np.interp(row_vals, row_idx, y_coords)
coords_list = list(zip(xs.astype(float).tolist(), ys.astype(float).tolist()))
geom = LineString(coords_list)
return geom
[docs]
def save_layer(records, info):
"""
Save a set of geometry records as a spatial layer (GeoPackage or Shapefile).
Converts a list of geometry records into a GeoDataFrame, assigns the correct CRS,
and writes the layer to disk. The output format is determined by the GPKG_ACTIVE flag:
if True, writes to a GeoPackage (GPKG); otherwise, writes to an ESRI Shapefile.
Parameters
----------
records : list of dict
List of records, each containing geometry and attribute data for the layer.
info : dict
Configuration dictionary containing output directory, CRS, and layer name (index).
Returns
-------
None
The function writes files to disk and does not return a value.
"""
# Use CRS from info['coords']
# Normalize and assign CRS
gdf = gpd.GeoDataFrame(records)
crs_obj = CRS.from_user_input(info["coords"]["crs"])
auth = crs_obj.to_authority()
gdf = gdf.set_crs(CRS.from_epsg(int(auth[1])), allow_override=True)
layer_name = info["index"]
if GPKG_ACTIVE is True:
# Write to GeoPackage
if first_layer:
gdf.to_file(f"{layer_name}.gpkg", driver="GPKG", layer=layer_name)
first_layer = False
else:
gdf.to_file(f"{layer_name}.gpkg", driver="GPKG", layer=layer_name, mode="a")
else:
# Write a shapefile per-percentile for compatibility with CRS
shp_path = Path(info["output_dir"]) / f"{layer_name}.shp"
gdf.to_file(shp_path, driver="ESRI Shapefile")
return
[docs]
def save_contours(contours, info):
"""
Save extracted contour geometries to disk as a spatial layer (GeoPackage or Shapefile).
Converts a nested list of contour arrays (per percentile or horizon time, per time slice)
into real-world geometries and saves them using the save_layer function. The output format
is determined by the configuration and the GPKG_ACTIVE flag.
Parameters
----------
contours : list
Nested list of contour arrays. For each percentile or horizon time, contains a list of
per-time-slice contour arrays (each array is an (N, 2) array of pixel indices).
info : dict
Configuration dictionary with output path, coordinate arrays, and index-specific keys
(e.g., 'percentiles' or 'horizon_times').
Returns
-------
None
The function writes files to disk and does not return a value.
"""
if info["index"] == "presence_boundary":
key_ = "percentiles"
elif info["index"] == "influence_extent":
key_ = "horizon_times"
records = []
for i_key, key in enumerate(info[info["index"]][key_]):
for t_idx, slice_contours in enumerate(contours[i_key]):
for contour in slice_contours:
geom = obtain_geometry(contour, info)
if info["index"] == "presence_boundary":
records.append({"geometry": geom, "percentile": str(key), "time": info["dates"][t_idx]})
elif info["index"] == "influence_extent":
records.append({"geometry": geom, "horizon_time": str(key)})
save_layer(records, info)
return
[docs]
def save_arrays_to_netcdf(arrays, info):
"""
Save a list of arrays (per percentile) to a NetCDF file with spatial coordinates and metadata.
This function writes angle and magnitude arrays (per percentile) to a NetCDF file, including
CF-Convention metadata and CRS (EPSG:25830 or as specified) for compatibility with QGIS/MDAL.
The output includes coordinate variables, grid mapping, and global attributes for spatial reference.
Parameters
----------
arrays : list of [angle, magnitude]
List of [angle, magnitude] arrays for each percentile. Each array should be 2D or 3D (optionally 4D with time).
info : dict
Configuration dictionary containing coordinate arrays, percentiles, CRS, and output directory.
Returns
-------
outpath : Path
Path to the saved NetCDF file.
"""
# Spatial coordinates
x = np.asarray(info['coords']['x'])
y = np.asarray(info['coords']['y'])
coords = {'y': y, 'x': x}
# Separate angle and magnitude arrays
angle_list = [arrs[0] for arrs in arrays]
magnitude_list = [arrs[1] for arrs in arrays]
angle_arr = np.stack(angle_list, axis=0)
magnitude_arr = np.stack(magnitude_list, axis=0)
# Detect array dimensions to set NetCDF variable dimensions
if angle_arr.ndim == 4:
dims = ['percentile', 'time', 'y', 'x']
time = np.asarray(info['dates'])
coords['time'] = time
elif angle_arr.ndim == 3:
dims = ['percentile', 'y', 'x']
else:
raise ValueError('Each array must be 2D or 3D')
coords['percentile'] = np.array(info["percentiles"], dtype=float)
# Standard attributes for coordinates (CF conventions)
x_attrs = {
"standard_name": "projection_x_coordinate",
"long_name": "x coordinate of projection",
"units": "m"
}
y_attrs = {
"standard_name": "projection_y_coordinate",
"long_name": "y coordinate of projection",
"units": "m"
}
# CRS auxiliary variable (CF-1.8): contains projection information for QGIS/MDAL compatibility
crs_code = info['coords']['crs']
crs_obj = CRS.from_user_input(crs_code)
epsg = crs_obj.to_epsg()
wkt = crs_obj.to_wkt()
crs_attrs = {
"grid_mapping_name": "transverse_mercator" if crs_obj.to_dict().get("proj") == "tmerc" else "latitude_longitude",
"epsg_code": f"EPSG:{epsg}" if epsg else "",
"crs_wkt": wkt
}
# Extra global attributes for QGIS/MDAL compatibility
proj4 = crs_obj.to_proj4() if hasattr(crs_obj, 'to_proj4') else ""
global_attrs = {
"crs": wkt,
"spatial_ref": wkt,
"crs_wkt": wkt,
"proj4": proj4
}
# Auxiliary variable spatial_ref (string type, WKT value, with attributes)
spatial_ref_attrs = {
"spatial_ref": wkt,
"crs_wkt": wkt,
"grid_mapping_name": crs_attrs["grid_mapping_name"],
"epsg_code": crs_attrs["epsg_code"]
}
wkt_bytes = wkt.encode('utf-8')
strlen = len(wkt_bytes)
spatial_ref_char = np.frombuffer(wkt_bytes, dtype='S1')
ds = xr.Dataset(
{
'angle': (dims, angle_arr, {"grid_mapping": "spatial_ref"}),
'magnitude': (dims, magnitude_arr, {"grid_mapping": "spatial_ref"}),
'spatial_ref': (('strlen',), spatial_ref_char, spatial_ref_attrs)
},
coords={
'x': ('x', x, x_attrs),
'y': ('y', y, y_attrs),
'percentile': ('percentile', np.array(info["percentiles"], dtype=float)),
**({'time': ('time', time)} if angle_arr.ndim == 4 else {}),
'strlen': np.arange(strlen)
},
attrs=global_attrs
)
# Guardar
outpath = Path(info["output_dir"]) / (str(info["index"]) + ".nc")
ds.to_netcdf(outpath)
return outpath
[docs]
def load_results(results_path):
"""
Load indicator analysis results from a results directory (JSON + NPZ format).
This function reconstructs the results of a spatiotemporal indicator analysis from a directory
containing a metadata.json file and an arrays.npz file. It returns a dictionary mapping each
index name to a list of (contours, mean_map) tuples for each simulation.
Parameters
----------
results_path : str or Path
Path to the results directory or directly to the metadata.json file.
Returns
-------
results : dict
Dictionary containing indicator results for each index.
Format: {index_name: [(contours, mean_map), ...]}
Examples
--------
>>> from environmentaltools.spatiotemporal import raster
>>> results = raster.load_results("results/indicator_results_20250112_143022")
>>> # Access results for specific index
>>> for contours, mean_map in results['mean_presence_boundary']:
... print(f"Found {len(contours)} contours")
"""
from pathlib import Path
results_path = Path(results_path)
# Handle both directory and metadata.json file paths
if results_path.is_file() and results_path.name == "metadata.json":
results_dir = results_path.parent
metadata_file = results_path
else:
results_dir = results_path
metadata_file = results_dir / "metadata.json"
arrays_file = results_dir / "arrays.npz"
if not metadata_file.exists():
raise FileNotFoundError(f"Metadata file not found: {metadata_file}")
if not arrays_file.exists():
raise FileNotFoundError(f"Arrays file not found: {arrays_file}")
# Load metadata
with open(metadata_file, 'r') as f:
metadata = json.load(f)
# Load arrays
arrays = np.load(arrays_file)
# Reconstruct results dictionary
results = {}
for index_name, index_meta in metadata["indices"].items():
results[index_name] = []
for sim_idx in range(index_meta["n_simulations"]):
# Load mean_map
mean_map_key = index_meta["mean_map_keys"][sim_idx]
mean_map = arrays[mean_map_key]
# Load contours
contours = []
for contour_key in index_meta["contour_keys"][sim_idx]:
contours.append(arrays[contour_key])
results[index_name].append((contours, mean_map))
logger.info(f"Results loaded from: {results_dir}")
return results