Source code for environmentaltools.common.write

import numpy as np
from environmentaltools.common import save
from scipy.io import loadmat as ldm
import os


[docs] def write_cshore_input(properties: dict, output_folder: str): """Write CSHORE model input file. Creates the infile for the CSHORE coastal hydrodynamics model with the specified configuration parameters. Parameters ---------- properties : dict Dictionary containing CSHORE model parameters including: - header: Model header text - iline, iprofl, isedav, iperm, iover, iwtran, ipond, infilt, iwcint, iroll, iwind, itide, iveg: Integer flags for model options - dx: Spatial grid spacing - gamma: Wave breaking parameter - d50, wf, sg: Sediment parameters (grain size, fall velocity, specific gravity) - effb, efff, slp: Efficiency and slope parameters - and other model-specific parameters output_folder : str Path to folder where infile will be created. """ fid = open(output_folder + "/infile", "w") fid.write("3 \n") fid.write("{} \n".format(str(properties["header"]))) fid.write( "{} ->ILINE\n".format( str(properties["iline"]) ) ) fid.write( "{} ->IPROFL\n".format( str(properties["iprofl"]) ) ) fid.write( "{} ->ISEDAV\n".format( str(properties["isedav"]) ) ) fid.write( "{} ->IPERM\n".format( str(properties["iperm"]) ) ) fid.write( "{} ->IOVER\n".format( str(properties["iover"]) ) ) fid.write( "{} ->IWTRAN\n".format( str(properties["iwtran"]) ) ) fid.write( "{} ->IPOND\n".format( str(properties["ipond"]) ) ) fid.write( "{} ->INFILT\n".format( str(properties["infilt"]) ) ) fid.write( "{} ->IWCINT\n".format( str(properties["iwcint"]) ) ) fid.write( "{} ->IROLL \n".format( str(properties["iroll"]) ) ) fid.write( "{} ->IWIND \n".format( str(properties["iwind"]) ) ) fid.write( "{} ->ITIDE \n".format( str(properties["itide"]) ) ) fid.write( "{} ->IVEG \n".format( str(properties["iveg"]) ) ) # fid.write('{} ->ICLAY \n'.format(str(properties['iclay']))) fid.write( "{:11.4f} ->DX\n".format(properties["dx"]) ) fid.write( "{:11.4f} ->GAMMA \n".format( properties["gamma"] ) ) fid.write( "{:11.4f}{:11.4f}{:11.4f} ->D50 WF SG\n".format( properties["d50"], properties["wf"], properties["sg"] ) ) fid.write( "{:11.4f}{:11.4f}{:11.4f}{:11.4f} ->EFFB EFFF SLP\n".format( properties["effb"], properties["efff"], properties["slp"], properties["slpot"], ) ) fid.write( "{:11.4f}{:11.4f} ->TANPHI BLP\n".format( properties["tanphi"], properties["blp"] ) ) fid.write( "{:11.4f} ->RWH \n".format( properties["rwh"] ) ) fid.write( "{} ->ILAB\n".format( str(properties["ilab"]) ) ) fid.write( "{} ->NWAVE \n".format( str(properties["nwave"]) ) ) fid.write( "{} ->NSURGE \n".format( str(properties["nsurg"]) ) ) fid.write( "{:11.2f}{:11.4f}{:11.4f}{:11.4f}{:11.4f}{:11.4f}\n".format( properties["timebc_wave"], properties["Tp"], properties["Hrms"], properties["Wsetup"], properties["swlbc"], properties["angle"], ) ) fid.write( "{} ->NBINP \n".format(str(len(properties["x"]))) ) # if properties.iperm==1|properties.isedav >= 1: # fid.write('{} ->NPINP \n',length(properties.x_p)) fid.close() fid = open(output_folder + "/infile", "a") dum = np.vstack([properties["x"], properties["zb"], properties["fw"]]) for line in range(dum.shape[1]): fid.write("{:11.4f}{:11.4f}{:11.4f}\n".format(*dum[:, line])) # if properties.iperm == 1 | properties.isedav >= 1: # dum = [properties.x_p(:) properties.zb_p(:)] # fid.write('#11.4f#11.4f\n', dum) if properties["iwind"]: fid.write("1 \n") fid.write( "{:11.1f}{:11.4f}{:11.4f}\n".format( 0, properties["VelV"], properties["DirV"] ) ) fid.write( "{:11.1f}{:11.4f}{:11.4f}\n".format( properties["timebc_wave"], properties["VelV"], properties["DirV"] ) ) if properties["itide"]: fid.write("1 \n") fid.write("{:11.1f}{:13.8f}\n".format(0, properties["slgradient"])) fid.write( "{:11.1f}{:13.8f}\n".format( properties["timebc_wave"], properties["slgradient"] ) ) fid.close() return
[docs] def write_swan_input(case_index: int, time_index, case_id: str, data, params: dict, mesh: str = "global", local: bool = False, nested: bool = False): """Write SWAN wave model input files. Creates initialization and input files for the SWAN (Simulating WAves Nearshore) model for a specific case. Args: case_index (int): Sequential case number (0-based index). time_index: Timestamp from the data index for this case. case_id (str): String identifier for this case (e.g., '0001', '0002'). data (pd.DataFrame): Time series of boundary condition data. params (dict): Dictionary with model parameters including: - directory: Root directory for output files - Mesh configuration parameters - Wave and wind parameters mesh (str, optional): Mesh type identifier ('global' or 'local'). Defaults to 'global'. local (bool, optional): Whether to use local mesh configuration. Defaults to False. nested (bool, optional): Whether to enable nested grid mode. Defaults to False. """ swfile = open(params["directory"] + "/" + case_id + "/swaninit", "w") swfile.write("4 version of initialisation file\n") swfile.write("GDFA name of institute\n") swfile.write("3 command file ref. number\n") swfile.write("input_" + mesh + "_" + case_id + ".swn\n") swfile.write("4 print file ref. number\n") swfile.write("print_" + mesh + "_" + case_id + ".prt\n") swfile.write("4 test file ref. number\n") swfile.write(" test file name\n") swfile.write("6 screen ref. number\n") swfile.write("99 highest file ref. number\n") swfile.write("$ comment identifier\n") swfile.write(" TAB character\n") swfile.write("/ dir sep char in input file\n") swfile.write( "/ dir sep char replacing previous one\n" ) swfile.write("1 default time coding option\n") swfile.close() imswfile = open( params["directory"] + "/" + case_id + "/input_" + mesh + "_" + case_id + ".swn", "w" ) imswfile.write( "$*******************************HEADING******************************************\n" ) imswfile.write("$\n") imswfile.write("PROJ '" + params["project_name"] + "' '" + case_id + "' \n") imswfile.write("$Caso " + mesh + "_" + case_id + "\n") imswfile.write( "$***************************** MODEL INPUT ****************************************\n" ) imswfile.write( "SET LEVEL " + str(np.round(data.loc[time_index, "eta"], decimals=2)) + "\n" ) imswfile.write("$ xpc ypc alpc xlenc ylenc mxc myc \n") imswfile.write("$\n") imswfile.write( "CGRID REGULAR " + str(params[mesh + "_coords_x"]) + " " + str(params[mesh + "_coords_y"]) + " " + str(np.round(np.remainder(params[mesh + "_angle"], 360), decimals=2)) + " " + str(params[mesh + "_length_x"]) + " " + str(params[mesh + "_length_y"]) + " " + str(params[mesh + "_nodes_x"] - 1) + " " + str(params[mesh + "_nodes_y"] - 1) + " CIRCLE 36 0.05 0.50\n" ) imswfile.write("$\n") imswfile.write("$xpinp ypinp alpinp mxinp myinp dxinp dyinp\n") imswfile.write( "INPGRID BOTTOM " + str(params[mesh + "_coords_x"]) + " " + str(params[mesh + "_coords_y"]) + " " + str(np.round(np.remainder(params[mesh + "_angle"], 360), decimals=2)) + " " + str(params[mesh + "_nodes_x"] - 1) + " " + str(params[mesh + "_nodes_y"] - 1) + " " + str(params[mesh + "_inc_x"]) + " " + str(params[mesh + "_inc_y"]) + "\n" ) imswfile.write("$\n") imswfile.write( "$ fac fname idla nhedf formato ((mxinp+1)FN.d)\n" ) imswfile.write("$\n") imswfile.write( "READINP BOTTOM -1. '" + case_id + "_" + mesh + ".dat' 3 0 FORMAT '(" + str(params[mesh + "_nodes_x"]) + "F9.3)'\n" ) imswfile.write("$\n") imswfile.write( "WIND " + str(np.round(data.loc[time_index, "Vv"], decimals=2)) + " " + str(np.round(np.remainder(270 - data.loc[time_index, "DirV"], 360), decimals=2)) + "\n" ) imswfile.write("$\n") if mesh == "global": # Here, direction has PdE convention (waves coming from N: 0º, E: 90º) if data.loc[time_index, "DirM"] < 90: side = ["N", "E"] elif (data.loc[time_index, "DirM"] < 180) & (data.loc[time_index, "DirM"] >= 90): side = ["S", "E"] elif (data.loc[time_index, "DirM"] < 270) & (data.loc[time_index, "DirM"] >= 180): side = ["S", "W"] else: side = ["N", "W"] imswfile.write("BOUN SHAPESPEC JONSWAP PEAK DSPR POWER\n") imswfile.write("$ Hs Tp Dir dd(spreading power)\n") imswfile.write( "BOUN SIDE " + side[0] + " CONSTANT PAR " + str(np.round(data.loc[time_index, "Hs"], decimals=2)) + " " + str(np.round(data.loc[time_index, "Tp"], decimals=2)) + " " + str( np.round(np.remainder(270 - data.loc[time_index, "DirM"], 360), decimals=2) ) + " 2.00\n" ) imswfile.write( "BOUN SIDE " + side[1] + " VARIABLE PAR 200 " + str(np.round(data.loc[time_index, "Hs"], decimals=2)) + " " + str(np.round(data.loc[time_index, "Tp"], decimals=2)) + " " + str( np.round(np.remainder(270 - data.loc[time_index, "DirM"], 360), decimals=2) ) + " 2.00\n" ) else: imswfile.write("BOUNdnest1 NEST '" + case_id + ".bnd' CLOSED\n") imswfile.write("$*************************WIND GROWTH**************************\n") imswfile.write("GEN3 AGROW\n") imswfile.write( "$*************************WAVE-WAVE INTERACTION**************************\n" ) imswfile.write("TRIAD\n") imswfile.write("QUAD\n") imswfile.write("$*************************DISSIPATION**************************\n") imswfile.write("BREAKING\n") imswfile.write("WCAP\n") imswfile.write("$*************************************************************\n") imswfile.write("SETUP\n") imswfile.write("DIFFRACTION\n") imswfile.write("NUM ACCUR 0.005 0.01 0.005 99.5 STAT 50 0.1\n") if nested: imswfile.write( "$*************************OUPUT LOCATIONS**************************\n" ) imswfile.write( "NGRID '" + case_id + ".dat" + "' " + str(params["local_coords_x"]) + " " + str(params["local_coords_y"]) + " " + str(np.round(np.remainder(params["local_angle"], 360), decimals=2)) + " " + str(params["local_length_x"]) + " " + str(params["local_length_y"]) + " " + str(params["local_nodes_x"] - 1) + " " + str(params["local_nodes_y"] - 1) + " \n" ) imswfile.write( "$***************************** MODEL OUTPUT LOCATIONS***************************************\n" ) imswfile.write("NESTOUT '" + case_id + ".dat" + "' '" + case_id + ".bnd' \n") else: imswfile.write( "$***************************** MODEL COMPUTACIONAL GRID OUTPUT ***************************************\n" ) imswfile.write( "BLOCK 'COMPGRID' NOHEAD '" + case_id + ".mat' LAY 3 XP YP HSIGN TPS DIR QB WLEN DEPTH SETUP\n" ) imswfile.write( "$***************************** COMPUTATIONS ***************************************\n" ) imswfile.write("COMPUTE\n") imswfile.write("STOP\n") imswfile.close() return
[docs] def write_copla_input(case_index: int, time_index, case_id: str, data, params: dict, mesh: str = "local"): """Write COPLA wave propagation model input files. Creates input files for the COPLA (Coastal Propagation of LArge waves) model, using SWAN output as boundary conditions. Args: case_index (int): Sequential case number (0-based index). time_index: Timestamp from the data index for this case. case_id (str): String identifier for this case (e.g., '0001', '0002'). data (pd.DataFrame): Time series of boundary condition data. params (dict): Dictionary with model parameters including: - directory: Root directory for input/output files - Mesh configuration parameters mesh (str, optional): Mesh type identifier. Defaults to 'local'. """ fSwan = ldm(params["directory"] + "/" + case_id + "/" + case_id + ".mat") Hsig, h_CoPla = fSwan["Hsig"], fSwan["Depth"] Dir = ( fSwan["Dir"] - 90 ) # lo giro para adapatarlo al eje de Copla, los ejes de copla son Xc = -Ys, Yc = Xs (c: copla, s:swan) nr, mr = params[mesh + "_nodes_x"], params[mesh + "_nodes_y"] xp, yp = ( np.arange(0, mr) * params["local_inc_x"], np.arange(0, nr) * params["local_inc_y"], ) # Confirmar que es correcto Hsig[np.isnan(Hsig)], Dir[np.isnan(Dir)], h_CoPla[np.isnan(h_CoPla)] = ( 1e-6, 1e-6, 1e-6, ) fp = open(params["directory"] + "/" + case_id + "/" + case_id + "out.dat", "w") fp.write(str(nr) + " " + str(mr) + " 1\n") for col in range(0, nr - 1): fp.write(str(yp[col]) + " ") fp.write(str(yp[-1]) + "\n") for row in range(0, mr): fp.write(str(xp[row]) + "\n") for col in range(0, nr - 1): fp.write("%8.3f" % h_CoPla[row, col] + " ") fp.write("%8.3f" % h_CoPla[row, -1] + "\n") for col in range(0, nr - 1): fp.write("%8.3f" % (Hsig[row, col] / 2) + " ") fp.write("%8.3f" % (Hsig[row, -1] / 2) + "\n") if row != 0: for col in range(0, nr - 1): fp.write("%8.3f" % (Dir[row, col]) + " ") fp.write("%8.3f" % (Dir[row, -1]) + "\n") fp.close() fp = open(params["directory"] + "/" + case_id + "/CLAVE.DAT", "w") fp.write(case_id + "\n") fp.close() # Write input file fp = open(params["directory"] + "/" + case_id + "/" + case_id + "in.dat", "w") htol, nd = 50, 1 fp.write("nx ny\n") fp.write(str(nr) + " " + str(mr) + " 1\n") fp.write("iu ntype icur ibc\n") fp.write(" 1 1 0 1\n") fp.write("dx dy htol\n") fp.write( str(params["local_inc_y"]) + " " + str(params["local_inc_x"]) + " " + str(htol) + "\n" ) fp.write("nd\n") fp.write(str(nd) + "\n") fp.write("if1 if2 if3\n") fp.write("1 0 0\n") fp.write("iinput ioutput\n") fp.write("1 1\n") fp.write("T marea\n") fp.write(str(data.loc[time_index, "Tp"]) + " " + str(data.loc[time_index, "eta"]) + "\n") fp.write("amp dir(grados)\n") fp.write( str(data.loc[time_index, "Hs"] / 2) + " " + str(180 - data.loc[time_index, "DirM"]) + " \n" ) fp.close() fp = open(params["directory"] + "/" + case_id + "/" + case_id + "dat", "w") fp.write("*\n") fp.write("*\n") fp.write("* FICHERO DE DATOS PARA CALIBRACION\n") fp.write("*\n") fp.write("F(2F10.3,3I5)\n") fp.write("* IT = INTERVALO DE TIEMPO\n") fp.write("* ROZA = RUGOSIDAD DE CHEZY --> 1/Mannig\n") fp.write("* NT = NUMERO DE ESCRITURAS EN FICHERO\n") fp.write("* REPE = NUMERO DE ITERACIONES ENTRE LAS ESCRITURAS\n") fp.write("* IESDAO = NUMERO DE REPEs HASTA LA PRIMERA ESCRITURA\n") fp.write("*\n") fp.write("* EN TOTAL LAS ITERACIONES SON --> ((NT-1)*REPE + IESDAO*REPE)\n") fp.write( "* HAY QUE CUMPLIR LA CONDICION --> (NN >= (NT-1)*REPE + IESDAO*REPE)\n" ) fp.write("*\n") fp.write("* IT ROZA NT REPE IESDAO\n") fp.write("******.***######.###*****#####*****\n") fp.write(" 0.500 15.000 1 1000 1\n") fp.write("*\n") fp.write("* EDDY = FACTOR EDDY VISCOSITY\n") fp.write("* CORI = FACTOR DE CORIOLIS\n") fp.write("* NINTER= NUMERO ITERACIONES EN TERMINOS NO LINEALES\n") fp.write("F(2F10.3,I5)\n") fp.write("* EDDY CORI NINTER\n") fp.write(" 30.000 0.000 3\n") fp.write("* \n") fp.write("* IANL = TERMINOS NO LINEALES (SI = 1)\n") fp.write("* IAGUA = INUNDACION DE CELDAS (SI = 1)\n") fp.write("* ISLIP = CONTORNOS SIN FRICCION (SI = 1)\n") fp.write("F(3I5)\n") fp.write("* IANL IAGUA ISLIP\n") fp.write(" 1 0 0\n") fp.write("*\n") fp.write("*\n") fp.write( "* COORDENADAS DE PUNTOS DONDE SE DESEE TENER UN FICHERO EN EL TIEMPO \n" ) fp.write("* DE SUPERFICIE LIBRE (ETA), VELOCIDAD (U), VELOCIDAD (V). \n") fp.write("F(I5)\n") fp.write("*NUMERO DE PUNTOS (MAXIMO 30 PUNTOS)\n") fp.write(" 0\n") return
[docs] def create_project_directory(params: dict, data, global_db, local_db): """Create project folder structure with initialized files for SWAN and COPLA models. Generates a directory structure with subdirectories for each time step, containing bathymetry files and SWAN input files. Args: params (dict): Dictionary with model parameters including: - directory: Root directory path for the project data (pd.DataFrame): Time series of boundary condition data with datetime index. global_db (xr.Dataset): Global mesh bathymetry dataset with 'depth' variable. local_db (xr.Dataset): Local mesh bathymetry dataset with 'depth' variable. Returns: None: Creates directory structure and files as side effect. """ os.makedirs(params["directory"], exist_ok=True) for ind_, time in enumerate(data.index): case_id = str(ind_ + 1).zfill(4) os.makedirs(params["directory"] + "/" + case_id, exist_ok=True) save.to_txt( params["directory"] + "/" + case_id + "/" + case_id + "_global.dat", global_db["depth"].data[:, :], format="%9.3f", ) save.to_txt( params["directory"] + "/" + case_id + "/" + case_id + "_local.dat", local_db["depth"].data[:, :], format="%9.3f", ) write_swan_input(ind_, time, case_id, data, params, mesh="global", nested=True) return