import re
import numpy as np
import pandas as pd
import xarray as xr
from environmentaltools.common import read, save
from scipy.io import loadmat as ldm
[docs]
def create_mesh_dictionary(file_name: str, sheet_name: str = None):
"""Read Excel file and create mesh parameter dictionary.
Loads mesh configuration from an Excel file and optionally extracts
a specific sheet as a dictionary.
Args:
file_name (str): Path to the Excel file with mesh parameters.
sheet_name (str, optional): Name of specific sheet to extract as dictionary.
If None, returns the entire DataFrame. Defaults to None.
Returns:
dict or pd.DataFrame: Dictionary of parameters if sheet_name specified,
otherwise returns full DataFrame.
"""
info = read.xlsx(file_name)
if sheet_name is not None:
params = info[sheet_name].to_dict()
else:
params = info
return params
[docs]
def cshore_config():
"""Create default CSHORE model configuration parameters.
Generates a dictionary with default configuration parameters for the CSHORE
(Cross-shore) numerical model, including morphology, wave, and sediment
transport settings.
Returns:
dict: Dictionary containing CSHORE configuration parameters including:
- Model flags (iline, iprofl, isedav, etc.)
- Physical parameters (gamma, sporo, sg, etc.)
- Boundary conditions (timebc_wave, swlbc, etc.)
- Sediment properties (tanphi, blp, slp, etc.)
"""
# Common configuration file
props = {}
props["iline"] = 1 # 1 = single line
props["iprofl"] = (
1 # 0 = no morph, 1 = run morph, 1.1 = run morph without initial smoothing
)
props["isedav"] = 0 # 0 = unlimited sand, 1 = hard bottom
props["iperm"] = 0 # 0 = no permeability, 1 = permeable
props["iover"] = 1 # 0 = no overtopping , 1 = include overtopping
props["infilt"] = 0 # 1 = include infiltration landward of dune crest
props["iwtran"] = (
0 # 0 = no standing water landward of crest, 1 = wave transmission due to overtopping
)
props["ipond"] = 0 # 0 = no ponding seaward of SWL
props["iwcint"] = 1 # 0 = no W & C interaction , 1 = include W & C interaction
props["iroll"] = 1 # 0 = no roller, 1 = roller
props["iwind"] = 0 # 0 = no wind effect
props["itide"] = 0 # 0 = no tidal effect on currents
props["iclay"] = 0 # Clay layer option
props["iveg"] = 0 # Vegetation effect (0 = no vegetation, 1 = include vegetation)
props["veg_Cd"] = 1 # Vegetation drag coefficient
props["veg_n"] = 100 # Vegetation density (stems per m²)
props["veg_dia"] = 0.01 # Vegetation stem diameter (m)
props["veg_ht"] = 0.20 # Vegetation height (m)
props["veg_rod"] = 0.1 # Vegetation erosion limit below sand for failure (m)
props["veg_extent"] = [
0.7,
1,
] # Vegetation coverage as fraction of total domain length
props["gamma"] = 0.8 # shallow water ratio of wave height to water depth
props["sporo"] = 0.4 # sediment porosity
props["sg"] = 2.65 # specific gravity
props["effb"] = 0.005 # suspension efficiency due to breaking eB
props["efff"] = 0.01 # Suspension efficiency due to friction (ef)
props["slp"] = 0.5 # Suspended load parameter
props["slpot"] = 0.1 # Overtopping suspended load parameter
props["tanphi"] = 0.630 # Tangent of sediment friction angle (degrees)
props["blp"] = 0.001 # Bedload transport parameter
props["rwh"] = 0.02 # Numerical runup wire height (m)
props["ilab"] = 1 # Controls boundary condition timing (don't change)
props["fric_fac"] = 0.015 # Bottom friction factor
# Boundary conditions and timing parameters
props["timebc_wave"] = 3600 # Wave boundary condition time interval (seconds)
props["timebc_surg"] = props["timebc_wave"] # Surge boundary condition time
props["nwave"] = 1 # Number of wave time steps
props["nsurg"] = props["nwave"] # Number of surge time steps
props["Wsetup"] = 0 # Wave setup at seaward boundary (meters)
props["swlbc"] = 0.0 # Water level at seaward boundary (meters)
return props
[docs]
def read_cshore(file_type: str, path: str, skiprows: int = 1):
"""Read CSHORE model output files.
Loads and parses output files from the CSHORE numerical model, supporting
various output types (profiles, transport rates, energy, velocities, etc.).
Args:
file_type (str): Type of CSHORE output file to read. Options include:
'bprof', 'bsusl', 'cross', 'energ', 'longs', 'param', 'rolle',
'setup', 'swase', 'timse', 'xmome', 'xvelo', 'ymome', 'yvelo'.
path (str): Directory path containing CSHORE output files.
skiprows (int): Number of rows to skip when reading file. Defaults to 1.
Returns:
pd.DataFrame: Parsed CSHORE output data with appropriate column names
and spatial index (x-distance in meters).
"""
header = {
"bprof": ["z"],
"bsusl": [r"$P_b$", r"$P_s$", r"$V_s$"],
"cross": [r"$Q_{b,x}$", r"$Q_{s,x}$", r"$Q_{b,x} + Q_{s,x}$"],
"crvol": [],
"energ": [r"Eflux (m3/s)", "Db (m2/s)", "Df (m2/s)"],
"longs": [r"$Q_{b,y}$", r"$Q_{s,y}$", r"$Q_{b,y} + Q_{s,y}$"],
"lovol": [],
"param": ["T (s)", r"$Q_b$ (nondim)", "Sigma* (nondim)"],
"rolle": ["Rq (m2/s)"],
"setup": [r"$\eta + S_{tide}$ (m)", "d (m)", r"$\sigma_{eta}$ (m)"],
"swase": ["de (m)", "Uxe (m/s)", "Qxe (m2/s)"],
"timse": ["t (id)", "t (s)", "q0 (m2/s)", "qbx,lw (m2/s)", "qsx,lw (m2/s)"],
"xmome": ["Sxx (m2)", "taubx (m)"],
"xvelo": [r"$U_x$", r"$U_{x,std}$"],
"ymome": ["Sxx (m2)", "taubx (m)"],
"yvelo": ["sin theta (unitary)", r"$U_y$", r"$U_{y,std}$"],
}
# TODO: include morphology options
# EWD: Output exceedance probability 0.015
# q0: wave overtopping rate, qbx,lw: cross-shore bedload transport rate at the landward end of the computation domain
# Construct output filename (CSHORE prepends 'O' to output files)
filename = path + "/" + "O" + file_type.upper()
if file_type == "bprof":
# For beach profile, read number of valid points from header
fid = open(filename, "rb")
properties = fid.readline()
id_ = int(properties.split()[1])
df = pd.read_csv(
filename,
sep="\s+",
skiprows=skiprows,
index_col=0,
names=header[file_type],
)
# Trim to actual number of profile points
df = df.iloc[:id_, :]
else:
# Standard reading for other output types
df = pd.read_csv(
filename,
sep="\s+",
skiprows=skiprows,
index_col=0,
names=header[file_type],
)
# Index represents x distance in meters
# Ensure column names are strings
df.columns = df.columns.astype("str")
return df
[docs]
def read_copla(file_name: str, grid: dict = None):
"""Read COPLA model velocity output files.
Loads velocity field data from COPLA
model output and computes magnitude and direction on a 2D grid with ghost cells.
Args:
file_name (str): Path to COPLA velocity output file.
grid (dict, optional): Existing grid dictionary to update. If None,
creates new dictionary. Defaults to None.
Returns:
dict: Dictionary containing velocity components and derived fields:
- 'u': East-west velocity component (m/s) with ghost cells
- 'v': North-south velocity component (m/s) with ghost cells
- 'U': Velocity magnitude (m/s)
- 'DirU': Velocity direction (degrees, nautical convention)
"""
data = pd.read_csv(
file_name,
skiprows=7,
delim_whitespace=True,
header=None,
index_col=0,
names=["x", "y", "u", "v"],
)
# Create meshgrid for spatial dimensions
_, x = np.meshgrid(data.y.unique(), data.x.unique())
if grid is None:
grid = {}
grid = dict()
nx, ny = np.shape(x)
# Create velocity arrays with ghost cells (boundary padding)
for var_ in ["u", "v"]:
grid[var_] = np.zeros([nx + 2, ny + 2])
# Fill interior cells with data
grid[var_][1:-1, 1:-1] = data[var_].to_numpy().reshape([nx, ny])
# Compute velocity magnitude
grid["U"] = np.sqrt(grid["u"] ** 2 + grid["v"] ** 2)
# Compute velocity direction (nautical convention: 0° = North, clockwise)
grid["DirU"] = np.fmod(np.rad2deg(np.arctan2(grid["v"], grid["u"])) + 90, 360)
return grid
[docs]
def read_swan(file_name: str, grid: dict = None, variables: list = None):
"""Read SWAN model output from MATLAB file format.
Loads wave field data from SWAN (Simulating WAves Nearshore) model output
stored in MATLAB format and computes wave number from wavelength.
Args:
file_name (str): Path to MATLAB (.mat) file containing SWAN output.
grid (dict, optional): Existing grid dictionary to update. If None,
creates new dictionary. Defaults to None.
variables (list, optional): List of output variable names in order:
[x, y, depth, Qb, L, Setup, Hs, DirM]. If None, uses default names.
Defaults to None.
Returns:
dict: Dictionary containing wave parameters on grid:
- 'x', 'y': Coordinates
- 'depth': Water depth (m)
- 'Qb': Wave breaking dissipation
- 'L': Wavelength (m)
- 'Setup': Wave setup (m)
- 'Hs': Significant wave height (m)
- 'DirM': Mean wave direction (degrees)
- 'kp': Wave number (2π/L) computed from wavelength
"""
if not variables:
variables = ["x", "y", "depth", "Qb", "L", "Setup", "Hs", "DirM"]
if grid is None:
grid = {}
# Load MATLAB file with SWAN output
swan_dictionary = ldm(file_name)
# Map SWAN variable names to user-specified names
for ind_, var_ in enumerate(
["Xp", "Yp", "Depth", "Qb", "Wlen", "Setup", "Hsig", "Dir"]
):
grid[variables[ind_]] = swan_dictionary[var_]
# Replace NaN values with small number to avoid numerical issues
grid[variables[ind_]][np.isnan(grid[variables[ind_]])] = 1e-6
# Compute wave number from wavelength
grid["kp"] = 2 * np.pi / grid["L"]
return grid
[docs]
def delft_raw_files_point(
point: list,
mesh_filename: str,
folder: str,
variables: list,
num_cases: int,
filename: str = "seastates"
):
"""Extract time series data at a specific point from Delft3D model outputs.
Reads Delft3D raw output files and extracts time series at the nearest
grid point to specified coordinates. Saves results to compressed CSV.
Args:
point (list): [x, y] coordinates of extraction point.
mesh_filename (str): Path to Delft3D mesh file containing grid coordinates.
folder (str): Directory containing case subdirectories with model outputs.
variables (list): Variable names to extract (e.g., ['hs', 'tp', 'eta']).
num_cases (int): Number of model cases to process.
filename (str): Output filename prefix. Defaults to "seastates".
Returns:
None: Saves extracted data to ZIP file with format:
{filename}{point[0]}_{point[1]}.zip
"""
cases = np.arange(1, num_cases + 1)
# Read mesh file and parse coordinate data
fid = open(mesh_filename, "r")
data = fid.readlines()
readed, kline = [], -1
# Extract coordinate lines starting at line 8
for i in range(8, len(data)):
if data[i].startswith(" ETA= 1 "):
readed.append(data[i])
kline += 1
else:
readed[kline] += data[i]
# Regular expression to extract numeric values (including scientific notation)
numeric_const_pattern = (
"[-+]? (?: (?: \d* \. \d+ ) | (?: \d+ \.? ) )(?: [Ee] [+-]? \d+ ) ?"
)
rx = re.compile(numeric_const_pattern, re.VERBOSE)
# Extract x and y coordinates from first two coordinate lines
x, y = rx.findall(readed[0]), rx.findall(readed[1])
# Convert string coordinates to floats
for i, j in enumerate(x):
x[i], y[i] = float(x[i]), float(y[i])
# Determine grid structure from coordinate patterns
idx = np.where(np.isclose(x, 2))[0][0]
nlen = int(len(x) / idx)
idxs = np.arange(0, len(x), idx, dtype=int)
# Remove separator values
for i in idxs[::-1]:
del x[i], y[i]
# Reshape coordinates into 2D grid
x, y = np.reshape(np.array(x), (nlen, idx - 1)), np.reshape(
np.array(y), (nlen, idx - 1)
)
# Find nearest grid point to requested location
ids = np.where(
np.min(np.sqrt((x - point[0]) ** 2 + (y - point[1]) ** 2))
== np.sqrt((x - point[0]) ** 2 + (y - point[1]) ** 2)
)
# If water level (eta) requested, find nearest point in trim file grid
if "eta" in variables:
datax = xr.open_mfdataset(
folder + "/case0001/trim-guad.nc", combine="by_coords"
)
x = datax.XCOR.compute().data
y = datax.YCOR.compute().data
ids_trim = np.where(
np.min(np.sqrt((x - point[0]) ** 2 + (y - point[1]) ** 2))
== np.sqrt((x - point[0]) ** 2 + (y - point[1]) ** 2)
)
# Initialize output DataFrame
data = pd.DataFrame(-1, index=cases, columns=[variables])
# Loop through all cases and extract data at point
for i in cases:
# Read first variable file to get grid dimensions
fid = open(folder + "/case" + str(i).zfill(4) + "/" + variables[0] + ".txt", "r")
info = fid.readlines()
nodesxt, nodesy, nodest = [int(nodes) for nodes in rx.findall(info[3])]
nodesx = int(nodesxt / nodest)
# Extract each requested variable
for var_ in variables:
if var_ == "eta":
# Water level requires NetCDF trim file
datax = xr.open_mfdataset(
folder + "/case" + str(i).zfill(4) + "/trim-guad.nc",
combine="by_coords",
)
z = datax.S1.compute().data # S1 = water level
z = z[-1, :, :] # Extract last time step
data.loc[i, "eta"] = z[ids_trim]
else:
# Other variables from text files
data.loc[i, var_] = np.loadtxt(
folder + "/case" + str(i).zfill(4) + "/" + var_ + ".txt",
skiprows=nodesxt - nodesx + 4,
)[ids[1][0], ids[0][0]]
# Save extracted time series to compressed CSV
save.to_csv(data, filename + "_" + str(point[0]) + "_" + str(point[1]) + ".zip")
return
[docs]
def delft_raw_files(folder: str, variables: dict, case_id: str):
"""Load Delft3D raw output files for a specific case.
Reads multiple variable output files from Delft3D model for a given case,
handling both communication (vars_com_guad) and wave (vars_wavm) variables.
Args:
folder (str): Path to directory containing case subdirectories.
variables (dict): Dictionary with keys 'vars_com_guad' and/or 'vars_wavm',
each containing list of variable names to load.
case_id (str): Case identifier (e.g., 'case0001').
Returns:
dict: Dictionary where keys are variable names and values are numpy
arrays containing the spatial data for each variable.
"""
# Regular expression to extract numeric values (including scientific notation)
numeric_const_pattern = (
"[-+]? (?: (?: \d* \. \d+ ) | (?: \d+ \.? ) )(?: [Ee] [+-]? \d+ ) ?"
)
rx = re.compile(numeric_const_pattern, re.VERBOSE)
dic = {}
for var_ in variables:
if var_ == "vars_com_guad":
# Load communication variables (COM module outputs)
fid = open(folder / f"{case_id}" / f"{variables['vars_com_guad'][0]}.txt", "r")
info = fid.readlines()
nodesxt, nodesyt, nodest = [int(nodes) for nodes in rx.findall(info[3])]
nodesx = int(nodesxt / nodest)
for j in variables["vars_com_guad"]:
dic[str(j)] = np.loadtxt(
folder / f"{case_id}" / f"{j}.txt", skiprows=nodesxt - nodesx + 4
)
else:
# Load wave variables (WAVE module outputs)
fid = open(folder / f"{case_id}" / f"{variables['vars_wavm'][0]}.txt", "r")
info = fid.readlines()
nodesxt, nodesyt, nodest = [int(nodes) for nodes in rx.findall(info[3])]
nodesx = int(nodesxt / nodest)
for j in variables["vars_wavm"]:
dic[str(j)] = np.loadtxt(
folder / f"{case_id}" / f"{j}.txt", skiprows=nodesxt - nodesx + 4
)
return dic