Source code for environmentaltools.temporal.simulation

import warnings
from datetime import datetime, timedelta

import matplotlib.dates as mdates
import numpy as np
import pandas as pd
import scipy.stats as st
import os
from loguru import logger

from environmentaltools.temporal import core
from environmentaltools.temporal.copula import Copula as Copula
from environmentaltools.common import utils, save

"""This file is part of environmentaltools.

environmentaltools is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

environmentaltools is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with environmentaltools.  If not, see <https://www.gnu.org/licenses/>.
"""


def show_init_message():
    message = (
        "\n"
        + "=============================================================================\n"
        + " Initializing environmentaltools.temporal, v.1.0.0\n"
        + "=============================================================================\n"
        + "Copyright (C) 2021 Environmental Fluid Dynamics Group (University of Granada)\n"
        + "=============================================================================\n"
        + "This program is free software; you can redistribute it and/or modify it under\n"
        + "the terms of the GNU General Public License as published by the Free Software\n"
        + "Foundation; either version 3 of the License, or (at your option) any later \n"
        + "version.\n"
        + "This program is distributed in the hope that it will be useful, but WITHOUT \n"
        + "ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS\n"
        + "FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.\n"
        + "You should have received a copy of the GNU General Public License along with\n"
        + "this program; if not, write to the Free Software Foundation, Inc., 675 Mass\n"
        + "Ave, Cambridge, MA 02139, USA.\n"
        + "============================================================================="
    )
    return message


[docs] def simulation( param: dict, tidalConstituents: dict = None, dur_storm_calms: pd.DataFrame = None, seed: int = np.random.randint(1e6), include: bool = False, ): """Simulates time series with the probability model given in param and the temporal dependency given in df_dt. Parameters ---------- param : dict The probability models of every variable tidalConstituents : dict, optional Tidal constituents required to compute the tidal elevation and the mean sea level (Códiga, 2011). Defaults to None. dur_storm_calms : pd.DataFrame, optional Time series of storm events seed : int, optional A value to create a non-random simulation (mainly for debugging actions). Default is a random integer. include : bool, optional Include some deterministic time series given by the user. Default is False. Returns ------- None Saves files with simulated time series Examples -------- >>> params = {"Hs": {... see environmentaltools.temporal.analysis.fit_marginal_distribution}, ... "TD": {... environmentaltools.temporal.see analysis.dependency}, ... "TS": {}} """ np.random.seed(seed) # Check the parameters param = check_parameters(param) # Start the simulation algorithm for nosim in range(param["TS"]["nosim"]): logger.info( "Simulation no. %s of %s" % (str(nosim + 1).zfill(4), str(param["TS"]["nosim"]).zfill(4)) ) # Make storms simulation (Lira et al, 2019) if param["TS"]["events"]: # Initializa storm duration and calms dictionary durs_by_seasons = { i: {"storms": [], "calms": []} for i in param["TS"]["season"] } index = {} # Fill storm and calm durations using Copula parameters for season in param["TS"]["season"].keys(): cop = Copula( dur_storm_calms.loc[ dur_storm_calms["season"] == season, "dur_storm" ].values[:], dur_storm_calms.loc[ dur_storm_calms["season"] == season, "dur_calms" ].values[:], param["TS"]["family"], ) # cop.theta, cop.tau = param["TS"]["season"][season] cop.generate_xy(n=10000) ( durs_by_seasons[season]["storms"], durs_by_seasons[season]["calms"], ) = ( cop.X1, cop.Y1, ) index[season] = 0 # Remove outlayers if any(durs_by_seasons[season]["storms"] < 0): indexs = np.where(durs_by_seasons[season]["storms"] > 0) durs_by_seasons[season]["storms"] = durs_by_seasons[season][ "storms" ][indexs] durs_by_seasons[season]["calms"] = durs_by_seasons[season]["calms"][ indexs ] warnings.warn( "Some storms duration at {} are negative. Removed from events database. Please, check the copula parameters.".format( season ) ) # Remove outlayers if any(durs_by_seasons[season]["calms"] < 0): indexs = np.where(durs_by_seasons[season]["calms"] > 0) durs_by_seasons[season]["storms"] = durs_by_seasons[season][ "storms" ][indexs] durs_by_seasons[season]["calms"] = durs_by_seasons[season]["calms"][ indexs ] warnings.warn( "Some calms duration at {} are negative. Removed from eventsdatabase. Please, check the copula parameters.".format( season ) ) ini, end = ( datetime.strptime(param["TS"]["start"], "%Y/%m/%d %H:%M:%S"), datetime.strptime(param["TS"]["end"], "%Y/%m/%d %H:%M:%S"), ) df = pd.DataFrame() calms, storms = [], [] season = class_seasons(ini, type_=param["TS"]["class_type"]) calms.append(durs_by_seasons[season]["calms"][0]) ini = ini + timedelta(hours=durs_by_seasons[season]["calms"][0]) # Start the simulation while ini < end: # Look for season and retrieve the durations season = class_seasons(ini, type_=param["TS"]["class_type"]) dstorm, dcalm = ( durs_by_seasons[season]["storms"][index[season]], durs_by_seasons[season]["calms"][index[season]], ) calms.append(dcalm) storms.append(dstorm) # Locate the start/end of the i-storm if "D" in param["TS"]["freq"]: timedelta_storm = timedelta(days=dstorm) end_i = ini + timedelta(days=dstorm + dcalm) if param["TS"]["freq"] == "D": factor = 1 else: factor = int(param["TS"]["freq"].split("D")[0]) elif "H" in param["TS"]["freq"]: timedelta_storm = timedelta(hours=dstorm) end_i = ini + timedelta(hours=dstorm + dcalm) if param["TS"]["freq"] == "H": factor = 1 else: factor = int(param["TS"]["freq"].split("H")[0]) # Initialize the normalize simulation for the i-storm df_zsim = pd.DataFrame( -1, index=pd.date_range( start=ini, end=ini + timedelta_storm, freq=param["TS"]["freq"], ), columns=param["TS"]["vars"], ) # Generate the i-storm zsim = var_simulation(param["TD"], int(dstorm / factor) + 1, "normal") df_zsim.loc[:, param["TS"]["vars"]] = zsim df = pd.concat([df, df_zsim]) index[season] = index[season] + 1 ini = end_i # Create the normalized date df["n"] = ( (df.index.dayofyear + df.index.hour / 24.0 - 1) / pd.to_datetime( {"year": df.index.year, "month": 12, "day": 31, "hour": 23} ).dt.dayofyear ).values # Return the value at time t and the parameters associated for ind_, var_ in enumerate(param["TS"]["vars"]): # Read the NS parameters of the PMs if param["TS"]["ensemble"]: for model in param["TS"]["models"]: param[var_] = utils.string_to_function(param[var_], model) else: param = utils.string_to_function(param, var_) # Compute the inverse Gaussian for the "var_" if param["TS"]["conditional"] & (param["TS"]["mvar"] == var_): # If it is the main variable of the analysis, compute the # conditional probability reached the threshold df_var = pd.DataFrame( np.ones(len(df["n"])) * param["TS"]["threshold"], index=df.index, columns=["data"], ) df_var[var_] = df[var_].values df_var["n"] = df["n"].values # Compute the NS-CDF if param["TS"]["ensemble"]: if param["EA"]["weights"] == "equal": cdfu = 0 for model in param["TS"]["models"]: cdfu_model = core.cdf(df_var, param[var_][model]) cdfu += cdfu_model cdfu = cdfu / len(param["TS"]["models"]) else: cdfu = 0 for i, model in enumerate(param["TS"]["models"]): cdfu_model = core.cdf(df_var, param[var_][model]) * ( param["EA"]["weights"][i] ) cdfu += cdfu_model else: cdfu = core.cdf(df_var, param[ind_]) cdfu = pd.DataFrame(cdfu) # Return the values cdfj = ( st.norm.cdf(df.loc[:, var_]) * (1 - cdfu["prob"].values[:]) + cdfu["prob"].values[:] ) dfj = pd.DataFrame(cdfj, index=df.index, columns=["prob"]) else: # Compute the inverse Gaussian for the "var_" dfj = pd.DataFrame( st.norm.cdf(df.loc[:, var_].values[:]), index=df.index, columns=["prob"], ) dfj["n"] = df["n"].copy() # Compute the ppf of the PM for "var_" if param["TS"]["ensemble"]: res = core.ensemble_ppf(dfj, param, var_, param["TS"]["nodes"]) else: res = pd.DataFrame( core.ppf(dfj, param[var_]), #, ppf=True), index=dfj.index, columns=[var_], ) df[var_] = res[var_].values # Compute the inverse of the power transform if it is required # Transformed timeserie if param[var_]["transform"]["make"]: if "scale" in param: df[var_] = df[var_] * param[var_]["scale"] df[var_] = df[var_] + param[var_]["transform"]["min"] df[var_] = core.inverse_transform( df[[var_]], param[var_]) # if param["EA"]["make"]: # if "scale" in param: # df[var_] = df[var_] * param[var_]["scale"] # df[var_] = df[var_] + param["EA"]["min_ensemble"] # df[var_] = core.inverse_transform(df[[var_]], param["EA"], True) # elif "scale" in param: # df[var_] = df[var_] * param[var_]["scale"] else: # Make the full simulation ini, end = ( datetime.strptime(param["TS"]["start"], "%Y/%m/%d %H:%M:%S"), datetime.strptime(param["TS"]["end"], "%Y/%m/%d %H:%M:%S"), ) df = pd.DataFrame( index=pd.date_range( start=ini, end=end, freq=param["TS"]["freq"], ) ) # Create the normalized date df["n"] = ( (df.index.dayofyear + df.index.hour / 24.0 - 1) / pd.to_datetime( {"year": df.index.year, "month": 12, "day": 31, "hour": 23} ).dt.dayofyear ).values # Create the stationary full simulation zsim = var_simulation(param["TD"], len(df), "normal") for ind_, var_ in enumerate(param["TD"]["vars"]): # Compute the inverse of the Gaussian PM dfj = pd.DataFrame( st.norm.cdf(zsim[:, ind_]), index=df.index, columns=["prob"] ) dfj["n"] = df["n"].copy() # Compute the ppf of the PMs for "var_" if param["TS"]["ensemble"]: for model in param["TS"]["models"]: param[var_] = utils.string_to_function(param[var_], model) df[var_] = core.ensemble_ppf(dfj, param, var_, param["TS"]["nodes"]) # Transformed timeserie if param[var_]["transform"]["make"]: if "scale" in param: df[var_] = df[var_] * param[var_]["scale"] if "min" in param: df[var_] = df[var_] + param[var_]["transform"]["min"] df[var_] = core.inverse_transform(df[[var_]], param[var_]) elif "scale" in param[var_]: df[var_] = df[var_] * param[var_]["scale"] else: param = utils.string_to_function(param, var_) df[var_] = pd.DataFrame( core.ppf(dfj, param[var_]), index=dfj.index, columns=[var_], ) # Transformed timeseries if param[var_]["transform"]["make"]: if "scale" in param[var_]: df[var_] = df[var_] * param[var_]["scale"] df[var_] = df[var_] + param[var_]["transform"]["min"] df[var_] = core.inverse_transform(df[[var_]], param[var_]) elif "scale" in param[var_]: df[var_] = df[var_] * param[var_]["scale"] # Include tidal level if added if tidalConstituents is not None: # The tidal level is deterministic so just one time per simulations is # reconstructed if nosim == 0: from utide import reconstruct time = mdates.date2num(df.index.to_pydatetime()) tidalLevel = reconstruct(time, tidalConstituents) df["ma"] = tidalLevel["h"] - tidalConstituents["mean"] if "mm" in df.columns: df["eta"] = tidalLevel["h"] + df["mm"] # Transform radians to angles (circular variables) # for var_ in param["TD"]["vars"]: # if param[var_]["circular"] == True: # df[var_] = np.rad2deg(df[var_]) # Include any deterministic timeseries if given if "include" in param["TS"]: if param["TS"]["include"]: df[include.name] = include logger.info( "Saving simulation no. %s of %s" % (str(nosim + 1).zfill(4), str(param["TS"]["nosim"]).zfill(4)) ) # Create the folder for simulations os.makedirs(param["TS"]["folder"], exists_ok=True) # Save simulation file if param["TS"]["save_z"]: save.to_txt( zsim, param["TS"]["folder"] + "/simulation_z_" + str(nosim + 1).zfill(4) + ".csv", ) save.to_csv( df, param["TS"]["folder"] + "/simulation_" + str(nosim + 1).zfill(4) + ".zip", ) # If storm analysis, save the storm and calms duration too if param["TS"]["events"]: dstorm = pd.DataFrame(storms) save.to_csv( dstorm, param["TS"]["folder"] + "/durs_storm_" + str(nosim + 1).zfill(4) + ".zip", ) dcalms = pd.DataFrame(calms) save.to_csv( dcalms, param["TS"]["folder"] + "/durs_calms_" + str(nosim + 1).zfill(4) + ".zip", ) del df return
[docs] def check_parameters(param): """_summary_ TODO Args: param (_type_): _description_ Raises: ValueError: _description_ ValueError: _description_ ValueError: _description_ Returns: _type_: _description_ """ if param["TS"]["nosim"] <= 0: raise ValueError( "The number of simulation should be a positive integer. Got {}.".format( param["TS"]["nosim"] ) ) # Required for the numerical computation of non-stationary F for ensembles if not "nodes" in param["TS"].keys(): param["TS"]["nodes"] = [4383, 250] if not "conditional" in param["TS"].keys(): param["TS"]["conditional"] = False param["TS"]["mvar"] = None else: if not "mvar" in param["TS"].keys(): raise ValueError( "The main variable (mvar) is required for conditional analysis." ) if not "threshold" in param["TS"].keys(): raise ValueError( "The threshold parameter is required for conditional analysis." ) if not "include" in param["TS"].keys(): param["TS"]["include"] = False if not "F_files" in param["TS"].keys(): param["TS"]["F_files"] = "nonst_F_ensemble_" if not "folder" in param["TS"].keys(): param["TS"]["folder"] = "simulations" if not "events" in param["TS"].keys(): param["TS"]["events"] = False else: if param["TS"]["events"]: if not "family" in param["TS"].keys(): raise ValueError("Family is required for events simulation.") if not "class_type" in param["TS"].keys(): param["TS"]["class_type"] = "WSSF" param["TS"]["season"] = {"winter": 1, "spring": 2, "fall": 3, "summer": 4} if not "ensemble" in param["TS"].keys(): param["TS"]["ensemble"] = False if not "save_z" in param["TS"].keys(): param["TS"]["save_z"] = False return param
[docs] def var_simulation(par: dict, lsim: int, distribution: str): """Creates new normalized multivariate time series Args: * par (dict): paramaters of the VAR adjustment * lsim (int): length of the new time series * distribution (string): name of the probability model acccording to scipy.stats Returns: * zsim (np.ndarray): a multivariate time series """ dim = par["dim"] ord_ = par["id"]+1 zsim = np.zeros([dim, lsim]) if distribution == "normal": # TODO: some other non-normal multivariate analysis if dim == 1: y = np.random.normal(np.zeros(dim), np.sqrt(par["Q"]), lsim) y = y[:, np.newaxis].T else: y = np.random.multivariate_normal(np.zeros(dim), par["Q"], lsim).T # Initialize the simulation matrix zsim[:, 0:ord_] = y[:, 0:ord_] for i in range(ord_, lsim): z = np.fliplr(zsim[:, i - ord_ : i]) z1 = np.vstack((1, np.reshape(z.T, (ord_ * dim, 1)))) zsim[:, i] = np.dot(par["B"], z1).T + y[:, i] return zsim.T
[docs] def class_seasons(date, type_="WSSF"): """Obtains the season of any date Args: * date (datetime): starting date of the event Returns: * season (string): the season """ if type_ == "WSSF": if ( ((date.month == 12) & (date.day >= 21)) | (date.month == 1) | (date.month == 2) | ((date.month == 3) & (date.day < 21)) ): season = "winter" elif ( ((date.month == 3) & (date.day >= 21)) | (date.month == 4) | (date.month == 5) | ((date.month == 6) & (date.day < 21)) ): season = "spring" elif ( ((date.month == 6) & (date.day >= 21)) | (date.month == 7) | (date.month == 8) | ((date.month == 9) & (date.day < 21)) ): season = "summer" else: season = "fall" elif type_ == "WS": if ( ((date.month == 12) & (date.day >= 21)) | (date.month == 1) | (date.month == 2) | (date.month == 3) | (date.month == 4) | (date.month == 5) | ((date.month == 6) & (date.day < 21)) ): season = "WS" else: season = "SF" elif type_ == "SF": if ( ((date.month == 3) & (date.day >= 21)) | (date.month == 4) | (date.month == 5) | (date.month == 6) | (date.month == 7) | (date.month == 8) | ((date.month == 9) & (date.day < 21)) ): season = "SS" else: season = "FW" return season