Source code for environmentaltools.processes.compute

import os
import shutil
import subprocess
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as st
import xarray as xr
from loguru import logger
from environmentaltools.processes import load, waves, write
from environmentaltools.spatial.analysis import rotate_coords
from environmentaltools.common import utils, read, save
from scipy.interpolate import griddata, interp1d
from scipy.special import erfc


[docs] def create_db(params, data, mesh="global", time_=None, vars_=None, method="nearest"): """Create the project folder with the initialized files for SWAN and Copla models. Parameters ---------- params : dict Dictionary with the model parameters data : pd.DataFrame DataFrame with the time series of the boundary data mesh : str, optional Mesh type identifier. Default is 'global'. time_ : array-like, optional Time coordinates. Default is None. vars_ : str or list, optional Variable names to include. Default is None. method : str, optional Interpolation method ('nearest', 'linear', 'cubic'). Default is 'nearest'. Returns ------- xr.Dataset Dataset of the project with initialized grids """ lon, lat = create_mesh(params, mesh) if isinstance(vars_, str): vars_ = [vars_] db = create_xarray(lon, lat, time_=time_, vars_=vars_) if time_ is None: db["depth"][:, :] = griddata( data.loc[:, ["x", "y"]], data.loc[:, "z"].values, (lon, lat), method=method ) else: db["depth"][:, :, 0] = griddata( data.loc[:, ["x", "y"]], data.loc[:, "z"].values, (lon, lat), method="linear", ) return db
[docs] def create_mesh(params, mesh="global"): """Create a computational mesh for the model domain. Parameters ---------- params : dict Dictionary containing mesh parameters including: - {mesh}_length_x : float Domain length in x-direction - {mesh}_length_y : float Domain length in y-direction - {mesh}_nodes_x : int Number of nodes in x-direction - {mesh}_nodes_y : int Number of nodes in y-direction - {mesh}_angle : float Rotation angle in degrees - {mesh}_coords_x : float Origin x-coordinate - {mesh}_coords_y : float Origin y-coordinate mesh : str, optional Mesh identifier prefix. Default is 'global'. Returns ------- lon : np.ndarray Longitude coordinates of mesh nodes lat : np.ndarray Latitude coordinates of mesh nodes """ dx = np.linspace(0, params[mesh + "_length_x"], params[mesh + "_nodes_x"]) dy = np.linspace(0, params[mesh + "_length_y"], params[mesh + "_nodes_y"]) dx, dy = np.meshgrid(dx, dy) x, y = rotate_coords(dx, dy, params[mesh + "_angle"]) lon, lat = x + params[mesh + "_coords_x"], y + params[mesh + "_coords_y"] return lon, lat
[docs] def create_xarray(x, y, time_=None, vars_=None): """Create a 2D or 3D xarray Dataset with specified fields. Parameters ---------- x : np.ndarray X-coordinates (longitude) grid y : np.ndarray Y-coordinates (latitude) grid time_ : array-like or int, optional Time coordinates. If int, creates a single time step. Default is None. vars_ : str, list, or None, optional Variable names to include. Options: - None: default variables (depth, Hs, DirM, U, DirU, qc, ql, Setup) - 'full': extended variable list including wave parameters - list: custom list of variable names Default is None. Returns ------- xr.Dataset Dataset with initialized variables and coordinates """ dict_ = {} if vars_ is None: vars_ = "depth", "Hs", "DirM", "U", "DirU", "qc", "ql", "Setup" elif vars_ == "full": vars_ = ( "depth", "Sb", "Hs", "Tp", "DirM", "Setup", "cp", "L", "U", "DirU", "Dr", "Db", "Df", "Qst", "Qbt", "qc", "ql", ) if time_ is not None: if isinstance(time_, int): zeros = np.zeros(np.append(np.shape(x), 1)) time_ = [time_] else: zeros = np.zeros(np.append(np.shape(x), len(time_))) for i in vars_: dict_[i] = (["lon", "lat", "time"], zeros.copy()) db = xr.Dataset( dict_, coords={"x": (["lon", "lat"], x), "y": (["lon", "lat"], y), "time": time_}, ) else: zeros = np.zeros(np.shape(x)) for i in vars_: dict_[i] = (["lon", "lat"], zeros.copy()) db = xr.Dataset( dict_, coords={"x": (["lon", "lat"], x), "y": (["lon", "lat"], y)} ) return db
[docs] def slopes(data, variable="DirU"): """Compute the bottom slopes in the direction of a specified variable. Parameters ---------- data : pd.DataFrame DataFrame containing geometry data with columns 'x', 'y', 'depth', and the specified variable variable : str, optional Name of the circular direction variable in degrees. Default is 'DirU'. Returns ------- np.ndarray Array of bottom slopes in the specified direction """ xi, yi = ( (data["x"] + np.cos(data[variable] * np.pi / 180)).reshape(-1), (data["y"] + np.sin(data[variable] * np.pi / 180)).reshape(-1), ) Sb = griddata( (data["x"].reshape(-1), data["y"].reshape(-1)), data["depth"].reshape(-1), (xi, yi), ) - data["depth"].reshape(-1) Sb = Sb.reshape(np.shape(data["x"])) Sb[np.isnan(Sb)] = 0 return Sb
[docs] def sediment_transport_Kobayashi(j, data, grid, params): """Calculate suspended and bed-load transport following Kobayashi et al. (2008). This function implements the Kobayashi sediment transport model which computes both suspended load and bed load transport rates based on wave and current conditions. Parameters ---------- j : pd.DatetimeIndex Timestamp for the current computation data : pd.DataFrame DataFrame with input time series including wave parameters grid : dict Dictionary containing 2D numpy arrays with required spatial data (depth, Setup, etc.) params : dict Dictionary with model constants and parameters including: - fb : float Bottom friction coefficient - wf : float Fall velocity - gamma : float Breaking parameter - And other model-specific parameters Returns ------- dict Updated grid dictionary with computed sediment transport fields including: - Qst : Suspended load transport rate - Qbt : Bed load transport rate - qc : Cross-shore sediment transport - ql : Alongshore sediment transport """ G = 9.8091 grid["D"] = grid["depth"] + grid["Setup"] grid["D"][grid["D"] < 0.1] = np.nan # Total depth grid["cp"] = np.sqrt(G * grid["D"]) # Shallow water waves grid["sigma"] = data.loc[j, "Hs"] / np.sqrt(8) / grid["D"] * grid["cp"] grid["sigma"][np.isnan(grid["sigma"])] = 0 grid["Rs"] = (2 / params["fb"]) ** (1 / 3) * params["wf"] / grid["sigma"] grid["Rs"][np.isnan(grid["Rs"])] = 0 grid["Ps"] = 0.5 * erfc((grid["Rs"] + grid["U"]) / np.sqrt(2)) + 0.5 * erfc( (grid["Rs"] - grid["U"]) / np.sqrt(2) ) grid["Hb"] = ( 0.88 / grid["kp"] * np.tanh( params["gamma"] * grid["kp"] * grid["D"] / (0.88 * grid["cp"] * data.loc[j, "Tp"]) ) ) # Kobayashi 0.88 grid["Hb"][grid["Hb"] / grid["D"] > 0.88] = ( 0.88 * grid["D"][grid["Hb"] / grid["D"] > 0.88] ) grid["Hb"][np.isnan(grid["Hb"])] = 0 # Suspended load transport # -------------------------------------------------------------------------- grid["Dr"] = ( G * (params["rho"] * 0.9 * grid["Hb"] ** 2 * grid["cp"] / data.loc[j, "Tp"]) * np.sin(params["br"]) ) # Roller effect grid["Dr"][np.isnan(grid["Dr"])] = 0 grid["arot"] = 1 / 3 * grid["Sb"] * data.loc[j, "Tp"] * np.sqrt(G / grid["D"]) grid["arot"][np.isnan(grid["arot"])] = 0 grid["Db"] = ( params["rho"] * grid["arot"] * G * grid["Qb"] * grid["Hb"] ** 2 / (4 * data.loc[j, "Tp"]) ) # Battjes and Stive, 1985 grid["Df"] = 0.5 * params["rho"] * G * params["fb"] * grid["U"] ** 3 grid["Vs"] = ( (params["eb"] * (grid["Db"] - grid["Dr"]) + params["ef"] * grid["Df"]) * grid["Ps"] / (params["rho"] * G * (params["S"] - 1) * params["wf"]) ) grid["Qst"] = params["aK"] * grid["U"] * grid["Vs"] * np.sqrt(1 + grid["Sb"] ** 2) # Bed-load transport # -------------------------------------------------------------------------- grid["Rb"] = ( np.sqrt( 2 * G * (params["S"] - 1) * params["d50"] * params["Shic"] / params["fb"] ) / grid["sigma"] ) grid["Rb"][np.isnan(grid["Rb"])] = 0 grid["Pb"] = 0.5 * erfc((grid["Rb"] + grid["U"]) / np.sqrt(2)) + 0.5 * erfc( (grid["Rb"] - grid["U"]) / np.sqrt(2) ) grid["Gs"] = (np.tan(params["phi"]) - 2 * grid["Sb"]) / ( np.tan(params["phi"]) - grid["Sb"] ) grid["Gs"][grid["Sb"] < 0] = np.tan(params["phi"]) / ( np.tan(params["phi"]) + grid["Sb"][grid["Sb"] < 0] ) grid["Qbt"] = ( params["bK"] * grid["Pb"] * grid["Gs"] * grid["sigma"] ** 3 / (G * (params["S"] - 1)) ) # Formulación de Meyer-Peter-Mueller, modificado # Cross-shore and alongshore sediment transport # -------------------------------------------------------------------------- grid["qc"] = ( (grid["Qbt"] + grid["Qst"]) * np.sin((grid["DirU"] - params["local_angle"]) * np.pi / 180) * params["tburst"] ) # /((1.00-param['materiales']['p'])*param['malla']['local']['inc'][0]) grid["ql"] = ( (grid["Qbt"] + grid["Qst"]) * np.cos((grid["DirU"] - params["local_angle"]) * np.pi / 180) * params["tburst"] ) # /((1.00-param['materiales']['p'])*param['malla']['local']['inc'][1]) return grid
[docs] def sediment_transport_CERC(data, params, theta_c): """Calculate longshore sediment transport using CERC formula. The CERC (Coastal Engineering Research Center) formula is used to estimate longshore sediment transport rates based on breaking wave characteristics. Parameters ---------- data : pd.DataFrame DataFrame with wave parameters including: - Hr : Breaking wave height (m) - thetar : Breaking wave angle (degrees) params : dict Dictionary with sediment and water properties: - d50 : float Mean sediment grain size (m) - rho : float Fresh-water density (kg/m³) - rho_s : float Sediment density (kg/m³) - n : float Porosity - gamma : float Breaking parameter theta_c : float Beach angle (degrees) Returns ------- sign_ : np.ndarray Sign of transport direction Q : np.ndarray Longshore sediment transport rate """ data["alphar"] = np.deg2rad( np.min( np.asarray( [np.abs(data["thetar"] - theta_c), np.abs(theta_c - data["thetar"])] ), axis=0, ) ) data.loc[data["alphar"] > np.pi / 2, "alphar"] = ( 0 # el valor absolutio del ángulo entre la normal y el tren de olas no puede ser superior a 90 ) G = 9.8091 K = 1.4 * np.exp(-2.5 * params["d50"]) Q0 = ( K * ( params["rho"] * np.sqrt(G) / ( 16 * np.sqrt(params["gamma"]) * (params["rho_s"] - params["rho"]) * (1 - params["n"]) ) ) * data["Hr"] ** 2.5 ) sign_ = np.sign(data["thetar"] - theta_c) Q = sign_ * Q0 * np.sin(2 * data["alphar"]) return sign_, Q
[docs] def nesting(k, l, params, data): """Execute nested SWAN and COPLA model runs for a specific time step. This function performs a coupled wave-morphodynamic simulation by first running SWAN for wave propagation, then COPLA for profile evolution, and finally computing sediment transport using the Kobayashi method. Parameters ---------- k : int Time step index l : pd.DatetimeIndex or similar Current timestamp for the computation params : dict Dictionary with model parameters including: - cwd : str Current working directory - directory : str Project directory name - Other model-specific parameters data : pd.DataFrame DataFrame with boundary condition time series Returns ------- dict Grid dictionary containing computed fields including wave parameters, velocities, and sediment transport rates """ id_ = str(k + 1).zfill(4) write.write_swan(k, l, id_, data, params, mesh="local") current_working_directory = os.path.join(params["cwd"], params["directory"], id_) if not os.path.exists(os.path.join(current_working_directory, id_ + ".mat")): run_swan(current_working_directory) write.copla(k, l, str(k + 1).zfill(4), data, params, mesh="local") copla(current_working_directory) # join(current_working_directory) # grid = load.copla(params['directory'] + '/' + id_ + '/' + id_+ 'tot.out') grid = load.read_copla(params["directory"] + "/" + id_ + "/" + id_ + "vel.001") grid = load.read_swan(params["directory"] + "/" + id_ + "/" + id_ + ".mat", grid) grid["Sb"] = slopes(grid) # compute the slopes in the wave current direction grid = sediment_transport_Kobayashi( l, data, grid, params ) # compute the sediment transport return grid
[docs] def run_swan(swan_executable_path, working_directory): """Execute SWAN (Simulating WAves Nearshore) model. Runs the SWAN wave model in the specified working directory. SWAN is a third-generation wave model for obtaining realistic estimates of wave parameters in coastal areas. Parameters ---------- swan_executable_path : str Path to the SWAN executable file working_directory : str Path to the working directory containing SWAN input files Returns ------- None Raises ------ FileNotFoundError If SWAN executable is not found at the specified path """ # Verify that SWAN executable exists if not os.path.isfile(swan_executable_path): raise FileNotFoundError( f"SWAN executable not found at: {swan_executable_path}\n" "Please download SWAN from: https://swanmodel.sourceforge.io/download/download.htm\n" "After installation, provide the correct path to the SWAN executable." ) subprocess.run(swan_executable_path, cwd=working_directory) return
[docs] def run_copla(copla_executable_path, working_directory): """Execute COPLA (Coastal Profile Algorithm) model. Runs the COPLA morphodynamic model which simulates cross-shore profile evolution in the specified working directory. Parameters ---------- copla_executable_path : str Path to the COPLA executable file working_directory : str Path to the working directory containing COPLA input files Returns ------- None Raises ------ FileNotFoundError If COPLA executable is not found at the specified path """ # Verify that COPLA executable exists if not os.path.isfile(copla_executable_path): raise FileNotFoundError( f"COPLA executable not found at: {copla_executable_path}\n" "Please ensure COPLA is properly installed and provide the correct path to the executable." ) subprocess.run(copla_executable_path, cwd=working_directory) return
[docs] def run_cshore(cshore_executable_path, working_directory): """Execute CSHORE (Coastal Storm Hindcast of Reshaping and Erosion) model. Runs the CSHORE model for simulating beach profile evolution and storm impacts in the specified working directory. Parameters ---------- cshore_executable_path : str Path to the CSHORE executable file working_directory : str Path to the working directory containing CSHORE input files Returns ------- None Raises ------ FileNotFoundError If CSHORE executable is not found at the specified path """ # Verify that CSHORE executable exists if not os.path.isfile(cshore_executable_path): raise FileNotFoundError( f"CSHORE executable not found at: {cshore_executable_path}\n" "Please download CSHORE from: https://github.com/erdc/cshore\n" "After installation, provide the correct path to the CSHORE executable." ) subprocess.run(cshore_executable_path, cwd=working_directory) return
[docs] def run_coastalme(coastalme_executable_path, working_directory): """Run CoastalME model in the specified directory. Parameters ---------- coastalme_executable_path : str Path to the CoastalME executable file working_directory : str Path to the working directory where CoastalME will be executed Returns ------- None Raises ------ FileNotFoundError If CoastalME executable is not found at the specified path """ # Verify that CoastalME executable exists if not os.path.isfile(coastalme_executable_path): raise FileNotFoundError( f"CoastalME executable not found at: {coastalme_executable_path}\n" "Please download CoastalME from: https://github.com/coastalme/\n" "After installation, provide the correct path to the CoastalME executable." ) subprocess.run(coastalme_executable_path, cwd=working_directory) return
[docs] def save_db(time_, grid, data, params): """Create and save xarray Dataset with computed model results for a time step. Updates the database by creating an xarray Dataset with the computation results and saves it to a NetCDF file. Parameters ---------- time_ : int or pd.DatetimeIndex Time step identifier grid : dict Dictionary containing computed values for the current time step including: - depth : np.ndarray Water depth - Hs : np.ndarray Significant wave height - DirM : np.ndarray Mean wave direction - U : np.ndarray Current velocity magnitude - DirU : np.ndarray Current direction - qc, ql : np.ndarray Cross-shore and longshore sediment transport - Setup : np.ndarray Wave setup - Qb : np.ndarray Bed load transport data : pd.DataFrame DataFrame with boundary data params : dict Dictionary with model parameters including: - fileout : str Output file path prefix Returns ------- xr.Dataset Updated dataset with computed fields """ # kw = ['Hs', 'qc', 'ql', 'Setup', 'DirM', 'd', 'cp', # 'Dr', 'Db', 'Df', 'L', 'U', 'DirU', 'Qst', 'Qbt'] kw = ["depth", "Hs", "DirM", "U", "DirU", "qc", "ql", "Setup", "Qb"] db = create_db(params, data, "local", vars_=kw, time_=time_) for var_ in kw: if var_.startswith("Dir"): db[var_][:, :, 0] = np.remainder(270 - grid[var_], 360) else: db[var_][:, :, 0] = grid[var_] db.to_netcdf(params["fileout"] + "_" + str(time_).zfill(4) + ".nc") return db
[docs] def clean(params): """Remove temporary computation directory tree if deletion is enabled. Cleans up the working directory created during model execution by recursively removing all files and subdirectories. Parameters ---------- params : dict Dictionary with model parameters including: - delete_folder : bool Flag to enable/disable directory deletion - cwd : str Current working directory - directory : str Name of the directory to remove Returns ------- None """ if params["delete_folder"]: shutil.rmtree( os.path.join(params["cwd"], params["directory"]), ignore_errors=True ) return
[docs] def equilibrium_plan_shape(params, data): """Compute equilibrium planform shape using parabolic bay equation. Calculates the equilibrium beach planform shape based on wave diffraction theory and parabolic bay equation (Hsu and Evans, 1989). Useful for assessing coastal stability and headland bay beach morphology. Parameters ---------- params : dict Dictionary with parameters including: - x : float X-coordinate of diffraction point - y : float Y-coordinate of diffraction point - theta_m : float Mean wave direction (degrees) - Ts12 : float Mean wave period (s) - h : float Water depth (m) - beta_r : float, optional Parabolic coefficient (default: 2.13) data : pd.DataFrame DataFrame with profile coordinates containing: - x : X-coordinates - y : Y-coordinates Returns ------- x : np.ndarray X-coordinates of equilibrium planform y : np.ndarray Y-coordinates of equilibrium planform theta_0 : float Control angle at the reference point (radians) """ if not "beta_r" in params.keys(): params["beta_r"] = 2.13 theta_m = np.remainder(270 - params["theta_m"], 360) # compute the angles between the diffraction point and every point along the profile thetas = np.arctan2(data.x - params["x"], data.y - params["y"]) theta_0 = thetas - np.deg2rad(theta_m - 90) ds = np.sqrt((data.x - params["x"]) ** 2 + (data.y - params["y"]) ** 2) Ys = ds * np.cos(theta_0) npoint = np.argmin( np.abs(thetas) ) # plane that follows the mean energy flux and crosses the diffraction point k = waves.wave_number(params["Ts12"], params["h"]) L = 2 * np.pi / k diff, npoints, iter_ = 1, list(), 0 while diff > 1e-3: alpha_min = np.arctan( np.sqrt( params["beta_r"] ** 4 / 16 + params["beta_r"] ** 2 * Ys[npoint] / (2 * L) ) / (Ys[npoint] / L) ) npoint = np.argmin(np.abs(alpha_min - theta_0)) npoints.append(npoint) diff = np.abs(alpha_min - theta_0[npoint]) if any(npoints == npoint): break iter_ += 1 beta = np.pi / 2 - theta_0[npoint] R0 = ds[npoint] R0 = 254.8474172725602 # change this value for convex beaches coeffs = read.xlsx("parabolic_coeffs") C0 = np.interp(np.rad2deg(beta), coeffs.index, coeffs["C0"]) C1 = np.interp(np.rad2deg(beta), coeffs.index, coeffs["C1"]) C2 = np.interp(np.rad2deg(beta), coeffs.index, coeffs["C2"]) theta = np.deg2rad(np.linspace(np.rad2deg(beta), 180, 100)) R = R0 * (C0 + C1 * (beta / theta) + C2 * (beta / theta) ** 2) theta = np.pi / 2 - theta + np.deg2rad(theta_m - 90) x = -R * np.sin(theta) y = R * np.cos(theta) # print(L, np.rad2deg(theta_0[npoint])) return x + params["x"], y + params["y"], theta_0[npoint]
[docs] def coastline_evolution(coastlines, points): """Track coastline position changes over time at specific points. Computes the distance of coastline displacement at multiple monitoring points throughout a time series, measuring retreat or advance perpendicular to the initial coastline position. Parameters ---------- coastlines : dict Dictionary with time steps as keys and DataFrames with coastline coordinates as values. Each DataFrame should contain 'x' and 'y' columns. points : list List of 2D points (x, y tuples or arrays) where coastline evolution is tracked Returns ------- pd.DataFrame DataFrame with coastline evolution distances at each monitoring point. Positive values indicate advance, negative values indicate retreat. """ no_points = len(points) no_steps = len(coastlines) evolution = pd.DataFrame(-1, index=range(1, no_steps), columns=range(no_points)) for ind_, point in enumerate(points): index_0 = utils.nearest(coastlines[0], point) for time_ in coastlines.keys(): index_ = utils.nearest( coastlines[time_], [coastlines[0].loc[index_0, "x"], coastlines[0].loc[index_0, "y"]], ) if ( coastlines[time_].loc[index_, "y"] - coastlines[0].loc[index_0, "y"] ) > 0: constant = -1 else: constant = 1 evolution.loc[time_, ind_] = constant * np.sqrt( (coastlines[time_].loc[index_, "x"] - coastlines[0].loc[index_0, "x"]) ** 2 + (coastlines[time_].loc[index_, "y"] - coastlines[0].loc[index_0, "y"]) ** 2 ) return evolution
[docs] def precipitation_to_flow(data, info): """Convert precipitation time series to streamflow using SCS Curve Number method. Transforms rainfall data into runoff hydrographs using the NRCS (formerly SCS) Curve Number method with unit hydrograph theory. Accounts for soil moisture conditions, infiltration, and routing. Parameters ---------- data : pd.Series or pd.DataFrame Precipitation time series with datetime index info : dict Dictionary with watershed and model parameters: - lon : float Main river length (km) - slope : float Main river slope (m/m) - area : float Watershed area (km²) - cn2 : float SCS Curve Number for average moisture conditions (0-100) - k_ac : float, optional Aquifer storage constant (hours). Default: 360 - f_abs : float, optional Absorption factor. Default: 1 - rainning_pattern : str, optional SCS precipitation pattern ID. Default: '1_24h' - events : bool, optional Whether data contains discrete events. Default: False - freq_raw_data : str, optional Frequency of raw data ('D', 'H', etc.). Default: 'D' - dt : float, optional Time step (hours). Default: 1 - model : str Concentration time model: 'SCS', 'Temez', 'Kirpich', 'Kirpich-natural_slope' Returns ------- pd.DataFrame DataFrame with computed hydrological variables including: - pr : Distributed precipitation - cumulative : Cumulative precipitation by events - cn : Dynamic curve number - net_pr : Effective precipitation (runoff) - sup_flow : Surface flow (m³/s) - base_flow : Base flow (m³/s) - total_flow : Total streamflow (m³/s) - infiltration : Infiltrated water depth - Mass balance statistics Raises ------ ValueError If required parameters are missing or if concentration time model is not recognized """ if ((not "lon") | (not "slope") | (not "area") | (not "cn2")) in info.keys(): raise ValueError( "Watershed area and mean curve number, main river length and slope are required" ) if not "k_ac" in info.keys(): info["k_ac"] = 360 if not "f_abs" in info.keys(): info["f_abs"] = 1 if not "rainning_pattern" in info.keys(): info["rainning_pattern"] = "1_24h" logger.info( "The rainning pattern was not defined. Assuming a 24-hours evenly distributed pattern." ) if not "events" in info.keys(): info["events"] = False if not "freq_raw_data" in info.keys(): info["freq_raw_data"] = "D" if not "dt" in info.keys(): info["dt"] = 1 if not "model" in info.keys(): raise ValueError( "Model of evaluation of concretation times is required. Options are SCS, Temez, Kirpich, and Kirpich-natural_slope" ) info["cn3"] = 23 * info["cn2"] / (10 + 0.13 * info["cn2"]) info["cn1"] = 4.2 * info["cn2"] / (10 - 0.058 * info["cn2"]) data = data * info["f_abs"] pattern_path = os.path.join( os.path.dirname(os.path.realpath(__file__)), "src", "patrones_SCS", ) # Read one of the SCS precipitation patterns pattern = read.xlsx(pattern_path)[info["rainning_pattern"]].values pattern[1:] = pattern[1:] - pattern[:-1] # Create the dataframe for the whole period ini = datetime( data.index[0].year, data.index[0].month, data.index[0].day, data.index[0].hour ) end = datetime(data.index[-1].year, data.index[-1].month, data.index[-1].day, 23) index = pd.date_range(ini, end, freq="H") df = pd.DataFrame(0, index=index, columns=["pr"]) # Distribute the precipitation along the day df = distribute_precipitation(data, pattern, df, info) # Compute the cumulative sum by events df["cumulative"] = cumulative_by_events(df) window_size = 5 + 1 # 5 hours humedo = ( df["cumulative"] .rolling(window_size) .apply(wet_soil, args=(info["cn3"],), raw=True) ) window_size = (5 * 24) + 1 # 5 days into hours seco = ( df["cumulative"] .rolling(window_size) .apply(dry_soil, args=(info["cn1"],), raw=True) ) df["cn"] = 0 df["cn"] = humedo.fillna(info["cn2"]) df["cn"].loc[seco.notnull()] = seco df["f_a"] = 254.0 * (100 / df["cn"] - 1) # Fraction of cumulative abstraction df["i_a"] = 0.2 * df["f_a"] # Abstraccion Inicial --> A partir de SCS df["effec_cum"] = 0 df["effec_cum"].loc[df["cumulative"] >= df["i_a"]] = ( df["cumulative"].loc[df["cumulative"] >= df["i_a"]] - df["i_a"].loc[df["cumulative"] >= df["i_a"]] ) ** 2 / (df["cumulative"] + 0.8 * df["f_a"].loc[df["cumulative"] >= df["i_a"]]) df["net_pr"] = 0 diff = np.diff(df["effec_cum"]) df["net_pr"].iloc[1:] = np.where(diff > 0, diff, 0) df = unit_hydrograph_model(df, info) # BASE FLOW # Computing the infiltrated flow df["infiltration"] = df["pr"] - df["net_pr"] df["infil_flow"] = ( df["infiltration"] * info["area"] * 1e6 / (1000 * 3600) ) # mm/h --> m3/s df["base_flow"] = 0 df["base_flow"].acumulado = df["base_flow"].iloc[0] window_size = 2 df["base_flow"] = ( df["base_flow"] .rolling(window_size) .apply(base_flow, args=(df, info), raw=False) ) df["base_flow"].iloc[0] = 0 df["total_flow"] = df["base_flow"] + df["sup_flow"] df["total_sup_vol"] = ( df["sup_flow"].sum() * 3600 / 1e6 ) # Total volume of runoff (m3) df["total_infil_vol"] = ( df["infil_flow"].sum() * 3600 / 1e6 ) # Total volume of rain (m3) df["total_input_vol"] = (df["pr"] * info["area"] * 1e6 / 1000).sum() / 1e6 df["mass_balance"] = ( df["total_sup_vol"] + df["total_infil_vol"] - df["total_input_vol"] ) df["error"] = 100 - ( (df["total_sup_vol"] + df["total_infil_vol"]) / df["total_input_vol"] * 100 ) return df
[docs] def wet_soil(window, cn): """Determine if soil is in wet condition based on rainfall history. Evaluates soil moisture dynamics using a rolling window. Classifies soil as wet if continuous rainfall occurred within the last 5 hours. Parameters ---------- window : np.ndarray Rolling window of cumulative precipitation values cn : float Curve Number value for wet soil conditions (CN3) Returns ------- float Curve Number for wet conditions if all values in window are non-zero, otherwise returns NaN Notes ----- Soil moisture classification criteria: - More than 5 hours of continuous rain: wet soil (returns cn) - More than 5 days without rain: dry soil (see dry_soil function) - Between 1-5 hours of rain: average conditions (CN2) """ if np.count_nonzero(~np.isclose(window, 0)) == window.size: value = cn else: value = np.NaN return value
[docs] def dry_soil(window, cn): """Determine if soil is in dry condition based on antecedent dry period. Evaluates soil moisture dynamics using a rolling window. Classifies soil as dry if no rainfall occurred within the last 5 days. Parameters ---------- window : np.ndarray Rolling window of cumulative precipitation values (typically 5 days = 120 hours) cn : float Curve Number value for dry soil conditions (CN1) Returns ------- float Curve Number for dry conditions if all values in window are zero, otherwise returns NaN """ if np.count_nonzero(np.isclose(window, 0)) == window.size: value = cn else: value = np.NaN return value
[docs] def unit_hydrograph_model(df, info): """Transform rainfall to discharge using SCS Unit Hydrograph method. Applies unit hydrograph theory to convert effective precipitation into surface runoff hydrographs. Supports multiple concentration time formulas. Parameters ---------- df : pd.DataFrame DataFrame with effective precipitation ('net_pr' column) info : dict Dictionary with watershed parameters: - model : str Concentration time model ('SCS', 'Temez', 'Kirpich', 'Kirpich-natural_slope') - lon : float Main river length (km) - slope : float Main river slope (m/m) - cn2 : float Curve Number for average conditions - area : float Watershed area (km²) - dt : float Time step (hours) Returns ------- pd.DataFrame Updated DataFrame with 'sup_flow' column containing surface flow (m³/s) Raises ------ ValueError If the specified unit hydrograph model is not implemented Notes ----- Concentration time formulas: - SCS: tc = 0.071 * L^0.8 / (S^0.25) * (1000/CN - 9)^0.7 - Temez: tc = 0.3 * (L / S^0.25)^0.76 - Kirpich: tc = 0.066 * L^0.77 / S^0.385 - Kirpich-natural_slope: tc = 0.13 * L^0.77 / S^0.385 """ if info["model"] == "SCS": tc = ( 0.071 * info["lon"] ** (0.8) / (info["slope"] ** (1 / 4)) * (1000 / info["cn2"] - 9) ** (0.7) ) elif info["model"] == "Temez": tc = 0.3 * (info["lon"] / info["slope"] ** (1 / 4)) ** (0.76) elif info["model"] == "Kirpich": tc = 0.066 * (info["lon"] ** (0.77) / info["slope"] ** (0.385)) elif info["model"] == "Kirpich-natural_slope": tc = 0.13 * (info["lon"] ** (0.77) / info["slope"] ** (0.385)) else: raise ValueError( "The unit hydrograph model given ({}) is not yet implemented".format( info["model"] ) ) t_p = 0.5 * info["dt"] + 0.6 * tc t_b = 2.67 * t_p q_p = info["area"] / (1.8 * t_b) hu1 = np.array([[0, 0], [t_p, q_p], [t_b, 0]]) hu2 = interp1d(hu1[:, 0], hu1[:, 1])(np.arange(1, np.ceil(t_b))) # Desarrollamos la matriz de convolucion discreta df["sup_flow"] = np.convolve(df["net_pr"], hu2)[0 : len(df["net_pr"])] return df
[docs] def base_flow(window, df, info): """Compute base flow using exponential recession model. Calculates subsurface flow contribution to total streamflow using an exponential recession equation with aquifer storage dynamics. Parameters ---------- window : pd.Series Rolling window containing current and previous time step df : pd.DataFrame DataFrame containing flow computations with 'base_flow' and 'infil_flow' columns info : dict Dictionary with parameters: - dt : float Time step (hours) - k_ac : float Aquifer storage constant (hours) Returns ------- float Updated base flow value at current time step (m³/s) Notes ----- Uses exponential recession: Q_base(t) = Q_base(t-1) * exp(-dt/k) + Q_infil * (1 - exp(-dt/k)) where k is the aquifer storage constant controlling recession rate. """ ex = np.exp(-info["dt"] / info["k_ac"]) df["base_flow"].acumulado = df["base_flow"].acumulado * ex + df["infil_flow"][ window.index[1] ] * (1 - ex) return df["base_flow"].acumulado
[docs] def distribute_precipitation(data, pattern, df, info): """Distribute precipitation data temporally according to SCS rainfall pattern. Disaggregates daily or multi-hour precipitation data into hourly values following standard SCS (Soil Conservation Service) rainfall distribution patterns. Parameters ---------- data : pd.Series Precipitation time series with datetime index pattern : np.ndarray SCS precipitation distribution pattern (cumulative fractions) df : pd.DataFrame Empty DataFrame with hourly datetime index to fill info : dict Dictionary with parameters: - freq_raw_data : str Frequency of input data ('D' for daily, 'H' for hourly, 'nH' for n-hour) - events : bool Whether data represents discrete storm events Returns ------- pd.DataFrame DataFrame with distributed hourly precipitation in 'pr' column Notes ----- Saves distributed precipitation to 'distributed_precipitation.zip' CSV file. """ if info["events"]: dates = [] for ind_, _ in enumerate(data.index): dates.append( data.index[ind_] - pd.Timedelta( seconds=data.index[ind_].minute * 60 + data.index[ind_].second + data.index[ind_].microsecond / 1e6 ) ) data.index = dates if info["freq_raw_data"] == "D": df["pr"] = np.outer(data.values, pattern).ravel() elif info["freq_raw_data"] == "H": df["pr"] = data.values else: hours = data.index.hour - 1 k, group = 0, 0 group_len = int(info["freq_raw_data"].split("H")[0]) while k < len(hours): if group == 0: locs = [hours[k] + gr for gr in range(group_len)] indexs = np.fmod(locs, 24) mult_ = pattern[hours[indexs]] / np.sum(pattern[hours[indexs]]) date = data.index[k] df.loc[date + pd.Timedelta(hours=group)] = data.loc[date] * mult_[group] if group == group_len - 1: group = 0 k += 1 else: group += 1 save.to_csv(df, "distributed_precipitation.zip") return df
[docs] def cumulative_by_events(df, eps: float = 1e-6): """Compute cumulative sum of precipitation resetting between storm events. Calculates running cumulative precipitation that resets to zero during periods without rainfall, effectively separating discrete storm events. Parameters ---------- df : pd.DataFrame DataFrame with 'pr' column containing precipitation values eps : float, optional Threshold below which values are considered zero. Default: 1e-6 Returns ------- pd.Series Cumulative precipitation with resets at dry periods Notes ----- This function is essential for the SCS Curve Number method as it tracks precipitation accumulation within individual storm events for proper infiltration and runoff calculations. """ pr_hr = df.values[:, 0] cumsum = np.cumsum(pr_hr) cumsum_reset = cumsum[pr_hr == 0] cumsum_reset = np.insert(cumsum_reset, 0, 0) correction = np.diff(cumsum_reset) p_horaria_corrected = pr_hr.copy() p_horaria_corrected[pr_hr == 0] -= correction df_cumulative = pd.Series(p_horaria_corrected.cumsum(), index=df.index) df_cumulative[df_cumulative < eps] = 0 return df_cumulative
[docs] def hydraulic_radius(channel_info, type_="rectangular"): """Calculate hydraulic properties of open channel cross-sections. Computes wetted area, wetted perimeter, hydraulic radius, and water surface width for various channel geometries used in open channel flow calculations. Parameters ---------- channel_info : dict Dictionary with channel geometry parameters: - b : float Channel bottom width (m) - Additional parameters depending on channel type type_ : str, optional Channel cross-section type. Options: 'rectangular', 'trapezoidal', 'triangular', 'circular', 'parabolic'. Default: 'rectangular' Returns ------- pd.DataFrame DataFrame with computed hydraulic properties: - wet_area : Wetted cross-sectional area (m²) - wet_perimeter : Wetted perimeter (m) - rh : Hydraulic radius (m) - water_mirror : Water surface width (m) Raises ------ ValueError If channel type is not implemented Notes ----- Hydraulic radius is defined as: Rh = A / P where A is the wetted area and P is the wetted perimeter. This parameter is fundamental for Manning's equation and other open channel flow formulas. Warning ------- This function has a bug: variable 'y' (water depth) is not defined as parameter """ df = df.copy() # TODO: Check that b and y can be inside the channel_info dictionary if type_ == "rectangular": df["wet_area"] = channel_info["b"] * channel_info["y"] df["wet_perimeter"] = channel_info["b"] + 2 * channel_info["y"] df["rh"] = channel_info["b"] * channel_info["y"] / (channel_info["b"] + 2 * channel_info["y"]) df["water_mirror"] = channel_info["b"] elif type_ == "trapezoidal": df["wet_area"] = (channel_info["b"] + channel_info["y"]) * channel_info["y"] df["wet_perimeter"] = channel_info["b"] + 2 * channel_info["y"] df["rh"] = channel_info["b"] * channel_info["y"] / (channel_info["b"] + 2 * channel_info["y"]) df["water_mirror"] = channel_info["b"] elif type_ == "triangular": df["wet_area"] = channel_info["b"] * channel_info["y"] df["wet_perimeter"] = channel_info["b"] + 2 * channel_info["y"] df["rh"] = channel_info["b"] * channel_info["y"] / (channel_info["b"] + 2 * channel_info["y"]) df["water_mirror"] = channel_info["b"] elif type_ == "circular": df["wet_area"] = channel_info["b"] * channel_info["y"] df["wet_perimeter"] = channel_info["b"] + 2 * channel_info["y"] df["rh"] = channel_info["b"] * channel_info["y"] / (channel_info["b"] + 2 * channel_info["y"]) df["water_mirror"] = channel_info["b"] elif type_ == "parabolic": df["wet_area"] = channel_info["b"] * channel_info["y"] df["wet_perimeter"] = channel_info["b"] + 2 * channel_info["y"] df["rh"] = channel_info["b"] * channel_info["y"] / (channel_info["b"] + 2 * channel_info["y"]) df["water_mirror"] = channel_info["b"] else: raise ValueError("Type of section not yet implemented") return df
[docs] def water_elevation(type_="rectangular"): """Generate objective function for water depth calculation using Manning's equation. Creates and returns a function that computes the residual between observed discharge and Manning's equation prediction for different channel geometries. Used for iterative water depth calculation given discharge. Parameters ---------- type_ : str, optional Channel cross-section type. Options: - 'rectangular' : Rectangular channel - 'trapezoidal' : Trapezoidal channel - 'triangular' : Triangular channel - 'circular' : Circular pipe/culvert - 'parabolic' : Parabolic channel Default: 'rectangular' Returns ------- callable Objective function f(h, Q, channel_params) that returns the absolute residual between actual discharge Q and Manning's equation prediction. Used with numerical solvers to find water depth h. Raises ------ ValueError If channel section type is not implemented Notes ----- Manning's equation: Q = (1/n) * A * R^(2/3) * S^(1/2) where: - Q : Discharge (m³/s) - n : Manning's roughness coefficient - A : Wetted area (m²) - R : Hydraulic radius (m) - S : Channel slope (m/m) The returned function should be minimized (e.g., using scipy.optimize.minimize_scalar) to find the water depth that produces the given discharge. channel_params dictionary should contain: - n : Manning's roughness coefficient - w : Channel width or bottom width (m) - S : Channel bed slope (m/m) - z : Side slope (for trapezoidal/triangular, horizontal:vertical) - D : Diameter (for circular) - T : Top width (for parabolic) - theta : Central angle (for circular) """ if type_ == "rectangular": def rectangular(h, Q, channel_params): return np.abs( Q - 1 / channel_params["n"] * (channel_params["w"] * h) * (channel_params["w"] * h / (channel_params["w"] + 2 * h)) ** (2 / 3) * channel_params["S"] ** 0.5 ) fun = rectangular elif type_ == "trapezoidal": def trapezoidal(h, Q, channel_params): return np.abs( Q - 1 / channel_params["n"] * (channel_params["w"] * channel_params["z"] * h) * h / (channel_params["w"] + 2 * h * np.sqrt(1 + channel_params["z"] ** 2)) ** (2 / 3) * channel_params["S"] ** 0.5 ) fun = trapezoidal elif type_ == "triangular": def triangular(h, Q, channel_params): return np.abs( Q - 1 / channel_params["n"] * (channel_params["z"] * h) / (2 * np.sqrt(1 + channel_params["z"] ** 2)) ** (2 / 3) * channel_params["S"] ** 0.5 ) fun = triangular elif type_ == "circular": def circular(h, Q, channel_params): return np.abs( Q - 1 / channel_params["n"] * ( (1 - np.sin(np.rad2deg(channel_params["theta"])) / np.rad2deg(channel_params["theta"])) * channel_params["D"] / 4 ) ** (2 / 3) * channel_params["S"] ** 0.5 ) fun = circular elif type_ == "parabolic": def parabolic(h, Q, channel_params): return np.abs( Q - 1 / channel_params["n"] * ((2 * channel_params["T"] ** 2 * h) / (3 * channel_params["T"] ** 2 + 8 * h**2)) ** (2 / 3) * channel_params["S"] ** 0.5 ) fun = parabolic else: raise ValueError("Type of section not yet implemented") return fun
[docs] def settling_velocity(ds, info, type_="Rubey"): """Calculate sediment particle settling velocity in water. Computes the terminal fall velocity of sediment particles using empirical formulas that account for particle size, shape, and fluid properties. Parameters ---------- ds : float or np.ndarray Sediment grain diameter (m) info : dict Dictionary with fluid and sediment properties: - nu : float Kinematic viscosity of water (m²/s) - sg : float Specific gravity of sediment (dimensionless, typically 2.65 for quartz) - Sp : float Sediment shape factor (required for Wu_Wang method) type_ : str, optional Settling velocity formula: 'Rubey' or 'Wu_Wang'. Default: 'Rubey' Returns ------- float or np.ndarray Settling velocity (m/s) Raises ------ ValueError If settling velocity formula is not implemented Notes ----- Available formulas: - Rubey (1933): Valid for natural sediments across wide size range - Wu & Wang (2006): Accounts for particle shape effects """ g = 9.81 if type_ == "Rubey": F = (2 / 3 + 36 * info["nu"] ** 2 / (g * (info["sg"] - 1) * ds**3)) ** ( 1 / 2 ) - (36 * info["nu"] ** 2 / (g * (info["sg"] - 1) * ds**3)) ** (1 / 2) w_s = F * (g * (info["sg"] - 1) * ds) ** (0.5) elif type_ == "Wu_Wang": M = 53.5 * np.exp(-0.65 * info["Sp"]) N = 5.65 * np.exp(-2.5 * info["Sp"]) n = 0.7 + 0.9 * info["Sp"] w_s = ( M * info["nu"] / (N * d) * (np.sqrt(0.25 + (4 * N / (3 * M**2) * d_ast**3) ** (1 / n)) - 0.5) ** n ) else: raise ValueError("Settling velocity formula not implemented.") return w_s
[docs] def river_sediment_transport(df, info, type_="meyer-peter-muller"): """Calculate river bed-load sediment transport rate using various formulas. Computes sediment transport in rivers using established empirical and semi-empirical formulas. Supports multiple methods for different grain sizes and flow conditions. Parameters ---------- df : pd.DataFrame DataFrame with hydraulic variables: - h : Water depth (m) info : dict Dictionary with sediment and channel properties: - rho_w : float Water density (kg/m³) - rho_s : float Sediment density (kg/m³) - sg : float Specific gravity of sediment (dimensionless) - d50 : float Median grain diameter (m) - S : float Channel bed slope (m/m) - w : float Channel width (m) - nu : float Kinematic viscosity (m²/s, for some methods) - ds_substrate : array-like Grain size distribution (for Wilcock-Crowe, Yang, Brownlie) - frac_substrate : array-like Fraction of each grain size class (for Wilcock-Crowe) - n : float Manning's roughness coefficient (for some methods) type_ : str, optional Sediment transport formula. Options: - 'meyer-peter-muller' : Meyer-Peter & Müller (1948) for gravel rivers - 'einstein-brown' : Einstein & Brown (1942) probabilistic approach - 'wilcock-crowe' : Wilcock & Crowe (2003) for mixed-size sediments - 'bagnold' : Bagnold (1966) stream power approach - 'yang' : Yang (1973) for sand-bed rivers - 'brownlie' : Brownlie (1981) comprehensive formula Default: 'meyer-peter-muller' Returns ------- pd.Series or np.ndarray Bed-load sediment transport rate Qb (m³/hour or m³/s depending on formula) Notes ----- Meyer-Peter-Müller formula: qb* = 8 * (τ* - τ*c)^1.5 Valid for gravel-bed rivers with uniform sediment Einstein-Brown formula: Uses probability approach with three transport regimes based on τ* Wilcock-Crowe formula: Accounts for hiding/exposure effects in mixed-size sediments Requires grain size distribution input Bagnold formula: Based on stream power concept: qb ∝ τ * U Yang formula: Empirical formula for sand transport in terms of unit stream power Brownlie formula: General formula with dimensionless critical Shields parameter References ---------- - Meyer-Peter, E., & Müller, R. (1948) - Einstein, H. A., & Brown, C. B. (1942) - Wilcock, P. R., & Crowe, J. C. (2003) - Bagnold, R. A. (1966) - Yang, C. T. (1973) - Brownlie, W. R. (1981) """ g = 9.81 tau = info["rho_w"] * g * df["h"] * info["S"] tau_shi = tau / (g * info["d50"] * info["rho_w"] * (info["sg"] - 1)) tau_c_shi = 0.047 if type_ == "meyer-peter-muller": logger.info( "Based on Meyer-Peter-Müller (1948) for gravel rivers with it uses d50" " of the superficial bed layers" ) df["qb"] = 0 motion = tau_shi >= tau_c_shi df["qb"][motion] = ( 8 * (tau_shi[motion] - tau_c_shi) ** (1.5) * (info["d50"] * (g * (info["sg"] - 1) * info["d50"]) ** 0.5) ) df["Qb"] = df["qb"] * 3600 * info["w"] elif type_ == "einstein-brown": w_s = settling_velocity(info["d50"], info) low_motion = tau_shi < 0.18 mid_motion = (tau_shi > 0.18) & (tau_shi < 0.52) high_motion = tau_shi > 0.52 df["qb"] = 0 df["qb"][low_motion] = 2.15 * np.exp(-0.391 / tau_shi[low_motion]) df["qb"][mid_motion] = 40 * tau_shi[mid_motion] ** 3 df["qb"][high_motion] = 15 * tau_shi[high_motion] ** (1.5) df["qb"] = df["qb"] * w_s * info["d50"] df["Qb"] = df["qb"] * 3600 * info["w"] # m3 elif type_ == "wilcock-crowe": dm = np.sum(info["ds_substrate"] * info["frac_substrate"]) if not "Fs" in info.keys(): Fs = 0.2 # Porcentaje de arena en la capa de superficie y substrato logger.info("Percentage of sands if not given. Using Fs equals to 0.2.") tau_rm_shi = 0.021 + 0.015 * np.exp(-20 * Fs) tau_rm = tau_rm_shi / (g * info["d50"] * info["rho_w"] * (info["sg"] - 1)) weight = np.zeros(len(tau)) df["qb"] = 0 for i in range(0, len(info["ds_substrate"]) - 1): b = 0.67 / (1 + np.exp(1.5 - info["ds_substrate"][i] / dm)) # bug aquí tau_ri = tau_rm * (info["ds_substrate"][i] / info["d50"]) ** b Fi = tau / tau_ri mask = Fi < 1.35 weight[mask] = 0.002 * Fi[mask] ** (7.5) weight[~mask] = 14 * (1 - 0.894 / ((Fi[~mask]) ** 0.5)) ** 4.5 df["qb"] = df["qb"] + weight * info["frac_substrate"][i] * info["w"] * ( g * df["h"] * info["S"] ) ** (3 / 2) * info["rho_s"] / ( (info["sg"] - 1) * g ) # o sobra un rho_s df["Qb"] = df["qb"] * 3600 / info["rho_s"] # o falta un rho_s elif type_ == "bagnold": if not "d50" in info.keys(): raise ValueError("d50 is not given.") w_s = settling_velocity(info["d50"], info) U = 1 / info["n"] * df["h"] ** (2 / 3) * info["S"] ** 0.5 if not "eb" in info.keys(): info["eb"] = 0.15 logger.info("Bagnold's effitienty parameter not given. Using 0.15.") df["Qb"] = ( tau * U / (info["sg"] - 1) * (info["eb"] + 0.01 * U / w_s) * info["w"] / info["rho_s"] * 3600 ) elif type_ == "yang": if not "ds_substrate" in info.keys(): raise ValueError("ds is required for Yan method.") w_s = settling_velocity(info["ds_substrate"], info) U = 1 / info["n"] * df["h"] ** (2 / 3) * info["S"] ** 0.5 Q = U * info["w"] * df["h"] uc = (g * df["h"] * info["S"]) ** 0.5 landa = np.zeros((len(info["ds_substrate"]), len(h))) Cm2 = np.zeros((len(info["ds_substrate"]), len(h))) # concentración másica Qv2 = np.zeros((len(info["ds_substrate"]), len(h))) # caudal sólido for i in range(0, len(info["ds_substrate"]) - 1): fac1 = np.zeros(len(U)) mask = (info["ds_substrate"][i] > 0) & ( uc * info["ds_substrate"][i] / info["nu"] > 1.2 & uc * info["ds_substrate"][i] / info["nu"] < 70 ) fac1[mask] = ( 2.5 / (np.log(uc[mask] * info["ds_substrate"][i] / info["nu"]) - 0.06) + 0.66 ) mask = (info["ds_substrate"][i] > 0) & ( uc * info["ds_substrate"][i] / info["nu"] >= 70 ) fac1[mask] = 2.05 mask = (info["ds_substrate"][i] > 0) & (info["ds_substrate"][i] < 0.002) landa[i][mask] = ( 5.435 - 0.286 * np.log(w_s[i] * info["ds"][i] / info["nu"]) - 0.457 * np.log(uc[mask] / w_s[i]) ) +( 1.799 - 0.409 * np.log(w_s[i] * info["ds_substrate"][i] / info["nu"]) - 0.314 * np.log(uc[mask] / w_s[i]) ) * np.log(U[mask] * info["S"] / w_s[i] - fac1[mask] * info["S"]) mask = info["ds_substrate"][i] > 0.002 landa[i][mask] = ( 6.681 - 0.633 * np.log(w_s[i] * info["ds_substrate"][i] / info["nu"]) - 4.816 * np.log(uc[mask] / w_s[i]) ) +( 2.784 - 0.305 * np.log(w_s[i] * info["ds_substrate"][i] / info["nu"]) - 0.282 * np.log(uc[mask] / w_s[i]) ) * np.log(U[mask] * info["S"] / w_s[i] - fac1[mask] * info["S"]) mask = info["ds_substrate"][i] == 0 landa[i][mask] = 0 Cm2[i] = 10 ** landa[i] Qv2[i] = Cm2[i] * 0.001 * Q / info["rho_s"] elif type_ == "browlie": U = 1 / info["n"] * df["h"] ** (2 / 3) * info["S"] ** 0.5 Q = U * info["w"] * df["h"] d_adim = info["ds_substrate"] * ((info["sg"] - 1) * g / (info["nu"] ** 2)) ** ( 1 / 3 ) if "tau_adim" in info: if info["tau_adim"] == "Wu_Wang": tau_adim_c = np.zeros((len(d_adim))) mask = d_adim <= 1.5 tau_adim_c[mask] = 0.126 * d_adim[mask] ** (-0.44) mask = (d_adim > 1.5) & (d_adim <= 10) tau_adim_c[mask] = 0.131 * d_adim[mask] ** (-0.55) mask = (d_adim > 10) & (d_adim <= 20) tau_adim_c[mask] = 0.0685 * d_adim[mask] ** (-0.27) mask = (d_adim > 20) & (d_adim <= 40) tau_adim_c[mask] = 0.0173 * d_adim[mask] ** (0.19) mask = (d_adim > 40) & (d_adim <= 150) tau_adim_c[mask] = 0.0115 * d_adim[mask] ** (0.30) mask = d_adim > 150 tau_adim_c[mask] = 0.052 else: logger.info("Browlie's tau adimensional parameter.") dens_apa = 1600 Y_bw = ( g**0.5 * (dens_apa / info["rho_w"]) ** 0.5 * info["d50"] ** (3 / 2) / info["nu"] ) ** (-0.6) tau_adim_c = 0.22 * Y_bw + 0.06 * 10 ** (7.7 * Y_bw) df["Cppm"] = 0 df["Qb"] = 0 cb = 1.268 Uc = ( ((info["sg"] - 1) * g * info["d50"]) ** 0.5 * 4.596 * tau_adim_c**0.529 * info["S"] ** (-0.1405) * np.std(info["ds_substrate"]) ** (-0.1606) ) df["Cppm"] = ( 7115 * cb * ((U - Uc) / ((info["sg"] - 1) * g * info["d50"]) ** 0.5) ** 1.978 * info["S"] ** 0.6601 * (df["h"] / info["d50"]) ** (-0.3301) ) df["Qb"] = df["Cppm"] * 0.001 * Q * 3600 / info["rho_s"] return df["Qb"]
[docs] def storm_surge_from_waves(data: pd.DataFrame, location: str, var_name: str = "Hm0"): """Compute storm surge elevation from significant wave height. Estimates storm surge (sea level anomaly) based on wave conditions using empirical relationships from the Spanish Flooding Atlas (Atlas de Inundación Español). Parameters ---------- data : pd.DataFrame DataFrame with wave parameters location : str Location identifier for parameter selection. Options: 'Huelva', 'Malaga' var_name : str, optional Column name for significant wave height (m). Default: 'Hm0' Returns ------- pd.DataFrame Input DataFrame with added 'mm' column containing storm surge elevation (m) Notes ----- Uses location-specific polynomial relationships between wave height and surge: - Location parameter (mu): mean surge level - Scale parameter (sigma): surge variability Storm surge is computed stochastically using normal distribution with wave-height-dependent parameters. References ---------- Atlas de Inundación de la Costa Española """ parametros = { "Malaga": { "loc": [-0.0334122, 0.08969, -0.0148889], "esc": [0.0875315, -0.0238893, 0.0473336, -0.0122261], }, "Huelva": { "loc": [-0.0218071, -0.00208886, 0.0286215], "esc": [0.0558449, 0.0446334, -0.0106856, -0.000660628], }, "info": "Parametros del Atlas de Inundacion", } mu = np.polyval(parametros[location]["loc"][::-1], data[var_name]) esc = np.polyval(parametros[location]["esc"][::-1], data[var_name]) # Replace negative variance esc[esc < 0] = np.random.rand(len(esc[esc < 0])) * 1e-3 p_Hs = np.random.rand(len(data)) eta = st.norm.ppf(p_Hs, mu, esc) data["mm"] = data[var_name] * 0 + eta return data
[docs] def flood_fill(c, r, mask): """Perform flood fill algorithm on a binary mask to identify connected regions. Implements a non-recursive 8-way connectivity flood fill algorithm. Starting from a seed cell, identifies all connected cells with value 1 in the mask. Useful for delineating inundation areas or connected coastal regions. Parameters ---------- c : int Starting column index (x-coordinate) r : int Starting row index (y-coordinate) mask : np.ndarray Binary 2D array containing only 1 and 0 values, where 1 represents cells to potentially fill Returns ------- np.ndarray Binary array of same shape as mask, with 1 values for all cells connected to the starting point, 0 elsewhere Notes ----- This algorithm uses 8-way connectivity (includes diagonal neighbors). It's non-recursive to avoid stack overflow issues with large connected regions. The algorithm is commonly used for: - Identifying inundation extents from water level data - Finding connected coastal segments - Delineating watersheds or drainage basins Complexity: O(n) where n is the number of connected cells """ # cells already filled filled = set() # cells to fill fill = set() fill.add((c, r)) width = mask.shape[1] - 1 height = mask.shape[0] - 1 # Our output inundation array flood = np.zeros_like(mask, dtype=np.int8) # Loop through and modify the cells which need to be checked. while fill: # Grab a cell x, y = fill.pop() if (x <= height) & (y <= width) & (x >= 0) & (y >= 0): # Don't fill # continue if (mask[x][y] == 1) & ((x, y) not in filled): # Do fill flood[x][y] = 1 filled.add((x, y)) # Check neighbors for 1 values west = (x - 1, y) east = (x + 1, y) north = (x, y - 1) south = (x, y + 1) northwest = (x - 1, y - 1) northeast = (x + 1, y - 1) southwest = (x - 1, y + 1) southeast = (x + 1, y + 1) if west not in filled: fill.add(west) if east not in filled: fill.add(east) if north not in filled: fill.add(north) if south not in filled: fill.add(south) if northwest not in filled: fill.add(northwest) if northeast not in filled: fill.add(northeast) if southwest not in filled: fill.add(southwest) if southeast not in filled: fill.add(southeast) return flood
[docs] def EOS_sea_water(T, S): """Compute density of seawater using equation of state (EOS). Calculates seawater density as a function of temperature and salinity using the standard mean ocean water equation of state with pure water as reference. Includes pressure correction at 1 meter depth. Parameters ---------- T : pd.DataFrame or np.ndarray Water temperature (°C) S : pd.DataFrame or np.ndarray Salinity (psu - practical salinity units) Returns ------- pd.DataFrame or np.ndarray Seawater density (kg/m³) at given temperature and salinity Notes ----- The calculation follows the standard seawater equation of state: 1. Computes pure water density as function of temperature 2. Adjusts for salinity effects 3. Applies high pressure correction (p = 0.10073 bar at 1m depth) The equation uses empirical polynomial coefficients derived from oceanographic measurements and is valid for typical ocean conditions. Examples -------- >>> import pandas as pd >>> T = pd.Series([15, 20, 25]) # Temperature in °C >>> S = pd.Series([35, 35, 35]) # Salinity in psu >>> rho = EOS_sea_water(T, S) >>> print(rho) """ p = 0.10073 # bar at -1m rho_w = ( 999.842594 + 6.793952 * 1e-2 * T - 9.095290 * 1e-3 * T**2 + 1.001685 * 1e-4 * T**3 - 1.120083 * 1e-6 * T**4 + 6.536332 * 1e-9 * T**5 ) # Density of sea water at one standard atmosphere (p=0) rho_S_T_0 = ( rho_w + ( 8.24493e-1 - 4.0899e-3 * T + 7.6438e-5 * T**2 - 8.2467e-7 * T**3 + 5.3875e-9 * T**4 ) * S + (-5.72466e-3 + 1.0227e-4 * T - 1.6546e-6 * T**2) * (S ** (3 / 2)) + 4.8314e-4 * S**2 ) # Density of sea water at high pressure is: rho_S_T_p Kw = ( 19652.21 + 148.4206 * T - 2.327105 * T**2 + 1.360477e-2 * T**3 - 5.155288e-5 * T**4 ) K_S_T_0 = ( Kw + (54.6746 - 0.603459 * T + 1.09987e-2 * T**2 - 6.1670e-5 * T**3) * S + (7.944e-2 + 1.6483e-2 * T - 5.3009e-4 * T**2) * S ** (3 / 2) ) Aw = 3.239908 + 1.43713e-3 * T + 1.16092e-4 * T**2 - 5.77905e-7 * T**3 A = ( Aw + (2.2838e-3 - 1.0981e-5 * T - 1.6078e-6 * T**2) * S + 1.91075e-4 * S ** (3 / 2) ) Bw = 8.50935e-5 - 6.12293e-6 * T + 5.2787e-8 * T**2 B = Bw + (-9.9348e-7 + 2.0816e-8 * T + 9.1697e-10 * T**2) * S K_S_T_p = K_S_T_0 + A * p + B * p**2 rho = rho_S_T_0 / (1 - p / (K_S_T_p)) return rho
[docs] def bulk_fluid_density(T, S, C, rhos=2650): """Compute bulk fluid density from suspended sediment concentration. Calculates the density of water-sediment mixture accounting for temperature, salinity, and suspended sediment concentration (turbidity). Parameters ---------- T : pd.DataFrame or np.ndarray Water temperature (°C) S : pd.DataFrame or np.ndarray Salinity (psu - practical salinity units) C : pd.DataFrame or np.ndarray Turbidity, suspended sediment concentration (FNU - Formazin Nephelometric Units) rhos : float, optional Sediment particle density (kg/m³). Default: 2650 (typical quartz sand) Returns ------- rho : pd.DataFrame or np.ndarray Clear water density (kg/m³) from temperature and salinity rho_bulk : pd.DataFrame or np.ndarray Bulk fluid density (kg/m³) including suspended sediment effects Notes ----- Conversion factor: Multiply turbidity C by 1.6015e-3 to convert FNU to g/L. The bulk density accounts for: - Base seawater density from EOS (temperature and salinity effects) - Volume displacement by sediment particles - Mass contribution from suspended sediment Formula: ρ_bulk = ρ_water + (1 - ρ_water/ρ_sediment) * C * 1.6015e-3 Examples -------- >>> import pandas as pd >>> T = pd.Series([20, 20, 20]) >>> S = pd.Series([35, 35, 35]) >>> C = pd.Series([0, 100, 500]) # Turbidity in FNU >>> rho, rho_bulk = bulk_fluid_density(T, S, C) >>> print(f"Clear water: {rho[0]:.2f} kg/m³") >>> print(f"With sediment: {rho_bulk[2]:.2f} kg/m³") """ rho = EOS_sea_water(T, S) rho0_prof_1 = rho + (1 - (rho / rhos)) * C * 1.6015 * 1e-3 return rho, rho0_prof_1