import numpy as np
import pandas as pd
[docs]
def extreme_events(data, var_name, threshold, minimum_interarrival_time, minimum_cycle_length, interpolation=False,
interpolation_method='linear', interpolation_freq='1min', truncate=False, extra_info=False):
"""Extract storm and calm cycles from time series data using threshold-based detection.
Identifies independent extreme events (storms) and calm periods by detecting
threshold crossings, merging nearby events, and filtering short-duration events.
Optionally interpolates crossing times for improved temporal resolution.
Parameters
----------
data : pd.DataFrame
Time series data with datetime index
var_name : str
Name of the variable column to analyze
threshold : float
Threshold value separating storm (cycle) and calm periods
minimum_interarrival_time : pd.Timedelta
Minimum time between storms to ensure statistical independence
minimum_cycle_length : pd.Timedelta
Minimum storm duration to consider it valid
interpolation : bool, default=False
Enable interpolation to estimate precise threshold crossing times
interpolation_method : str, default='linear'
Interpolation method: 'linear', 'cubic', 'nearest', etc.
interpolation_freq : str, default='1min'
Frequency for interpolation grid (e.g., '1min', '10s', '1h')
truncate : bool, default=False
Clip storm values below threshold and calm values above threshold
to the threshold value
extra_info : bool, default=False
Return additional diagnostic information
Returns
-------
cycles : list of pd.Series
List of storm cycles, each as a Series with values and timestamps
calm_periods : list of pd.Series
List of calm periods between storms
info : dict, optional
Additional information (only if extra_info=True):
- 'data_cycles' : pd.DataFrame
Data for all storm periods
- 'data_calm_periods' : pd.DataFrame
Data for all calm periods
- 'cycles_indexes_clipped' : list
Indices where storm values were clipped (if truncate=True)
- 'calm_periods_indexes_clipped' : list
Indices where calm values were clipped (if truncate=True)
- 'interpolation_indexes' : np.ndarray
Indices of interpolated threshold crossings (if interpolation=True)
Notes
-----
The algorithm follows these steps:
1. Identify initial threshold crossings
2. Optionally interpolate to refine crossing times
3. Merge nearby storms (within minimum_interarrival_time)
4. Remove short storms (< minimum_cycle_length)
5. Optionally truncate values to threshold
Storm independence is crucial for extreme value analysis. The
minimum_interarrival_time should be chosen based on the decorrelation
time of the process (typically 3-5 times the characteristic timescale).
Examples
--------
>>> import pandas as pd
>>> df = pd.DataFrame({
... 'Hs': [1.2, 2.5, 3.1, 2.8, 1.5, 0.8, 1.0, 2.9, 3.5, 2.1]
... }, index=pd.date_range('2020', periods=10, freq='6h'))
>>> cycles, calms = extreme_events(
... df, 'Hs', threshold=2.0,
... minimum_interarrival_time=pd.Timedelta('12h'),
... minimum_cycle_length=pd.Timedelta('6h')
... )
>>> print(f"Found {len(cycles)} storm events")
"""
full_data = data.copy()
# noinspection PyUnresolvedReferences
var_data = full_data[var_name]
cycles_start, cycles_end = values_over_threshold(var_data, threshold)
# If data ends with a cycle, it is necessary to remove it
# noinspection PyTypeChecker
if len(var_data.index) == cycles_end[-1]:
var_data = var_data[:cycles_start[-1]]
cycles_start = cycles_start[:-1]
cycles_end = cycles_end[:-1]
new_cycles_limits_indexes = None
if interpolation:
new_cycles_limits_indexes = interpolation_boundaries_index(var_data, cycles_start, cycles_end,
interpolation_freq, interpolation_method,
threshold)
new_cycles_limits = pd.DataFrame(threshold, index=new_cycles_limits_indexes, columns=[var_name])
# noinspection PyUnresolvedReferences
full_data = full_data.combine_first(new_cycles_limits)
full_data = full_data.interpolate(interpolation_method)
var_data = full_data[var_name]
cycles_start, cycles_end = values_over_threshold(var_data, threshold)
# Find near cycles (not independent cycles)
near_cycles = near_events(var_data, cycles_start, cycles_end, minimum_interarrival_time, interpolation)
while np.any(near_cycles):
# Join near cycles
cycles_start = cycles_start[np.append(True, ~near_cycles)]
cycles_end = cycles_end[np.append(~near_cycles, True)]
# Find if there are more near cycles
near_cycles = near_events(var_data, cycles_start, cycles_end, minimum_interarrival_time,
interpolation)
# Remove short cycles
cycles_length = var_data.index[cycles_end - 1] - var_data.index[cycles_start]
short_cycles = cycles_length < minimum_cycle_length
cycles_start = cycles_start[~short_cycles]
cycles_end = cycles_end[~short_cycles]
cycles_indexes_clipped = None
calm_periods_indexes_clipped = None
cycles_mask = extreme_indexes(var_data, cycles_start, cycles_end)
if truncate:
lower_values_clipped = var_data[cycles_mask].clip(lower=threshold)
upper_values_clipped = var_data[~cycles_mask].clip(upper=threshold)
cycles_indexes_clipped = lower_values_clipped != var_data[cycles_mask]
calm_periods_indexes_clipped = upper_values_clipped != var_data[~cycles_mask]
# noinspection PyUnresolvedReferences
cycles_indexes_clipped = cycles_indexes_clipped[cycles_indexes_clipped].index.tolist()
# noinspection PyUnresolvedReferences
calm_periods_indexes_clipped = calm_periods_indexes_clipped[calm_periods_indexes_clipped].index.tolist()
var_data[cycles_mask] = lower_values_clipped
var_data[~cycles_mask] = upper_values_clipped
# Split cycles and calm periods
cross_indexes = np.sort(np.concatenate([cycles_start, cycles_end]))
cross_indexes = cross_indexes[cross_indexes != 0] # avoid splitting by 0 index
data_splitted = np.split(var_data, cross_indexes)
# Check if var_data starts with a cycle or a calm period
if not np.any(cycles_start):
cycles = []
calm_periods = []
cycles_indexes = []
calm_periods_indexes = []
elif cycles_start[0] == 0:
cycles = data_splitted[0::2]
calm_periods = data_splitted[1::2]
cycles_indexes = var_data[cycles_mask].index
calm_periods_indexes = var_data[~cycles_mask].index
else:
cycles = data_splitted[1::2]
calm_periods = data_splitted[2::2]
cycles_indexes = var_data[cycles_mask].index
calm_periods_indexes = var_data[~cycles_mask].index.difference(data_splitted[0].index)
if extra_info:
info = dict()
info['data_cycles'] = full_data.loc[cycles_indexes]
info['data_calm_periods'] = full_data.loc[calm_periods_indexes]
if truncate:
info['cycles_indexes_clipped'] = cycles_indexes_clipped
info['calm_periods_indexes_clipped'] = calm_periods_indexes_clipped
if interpolation:
info['interpolation_indexes'] = new_cycles_limits_indexes
return cycles, calm_periods, info
else:
return cycles, calm_periods
[docs]
def events_duration(events):
"""Calculate duration of each event (storm or calm period).
Computes the time span of each event from its first to last timestamp.
Parameters
----------
events : list of pd.Series
List of events where each element is a Series with values and
datetime index representing the event time evolution
Returns
-------
pd.Series
Duration of each event (as timedelta) with the event start time
as index
Examples
--------
>>> events = [
... pd.Series([2.5, 3.0, 2.8], index=pd.date_range('2020-01-01', periods=3, freq='1h')),
... pd.Series([2.2, 2.7], index=pd.date_range('2020-01-05', periods=2, freq='1h'))
... ]
>>> durations = events_duration(events)
>>> print(durations)
"""
events_stacked = pd.DataFrame(events).T
start = events_stacked.apply(pd.Series.first_valid_index)
end = events_stacked.apply(pd.Series.last_valid_index)
events_length = end - start
duration = pd.Series(events_length.values, index=start.values)
return duration
[docs]
def values_over_threshold(data, threshold):
"""Identify indices where data exceeds threshold using sparse arrays.
Efficiently detects contiguous blocks of values above threshold using
pandas sparse arrays for memory efficiency.
Parameters
----------
data : pd.Series
Time series data
threshold : float
Threshold value for event detection
Returns
-------
cycles_start : np.ndarray
Integer indices marking the start of each exceedance block
cycles_end : np.ndarray
Integer indices marking the end (exclusive) of each exceedance block
Notes
-----
Uses pandas sparse block arrays for efficient storage and detection
of contiguous regions above threshold.
Examples
--------
>>> data = pd.Series([1.0, 2.5, 3.0, 2.8, 1.5, 0.8, 2.9, 3.1])
>>> start, end = values_over_threshold(data, threshold=2.0)
>>> print(f"Exceedance blocks: {list(zip(start, end))}")
"""
# Find cycles starting and ending positions
# noinspection PyTypeChecker
cycles_data = pd.arrays.SparseArray(data.where(data >= threshold), kind='block')
cycles_start = cycles_data.sp_index.blocs
cycles_duration = cycles_data.sp_index.blengths
cycles_end = cycles_start + cycles_duration
return cycles_start, cycles_end
[docs]
def interpolation_series(data, index_start, index_end):
"""Prepare data for interpolation between cycle boundaries.
Creates a DataFrame with start and end values grouped for subsequent
interpolation across threshold crossings.
Parameters
----------
data : pd.Series
Time series data
index_start : array-like
Indices just before threshold crossings
index_end : array-like
Indices just after threshold crossings
Returns
-------
pd.DataFrame
Combined data with 'value' and 'group' columns for interpolation
Notes
-----
Groups are created to interpolate each crossing independently.
"""
start = data[index_start]
end = data[index_end]
group = start.index.asi8
interpolation_start = start.to_frame('value')
interpolation_start['group'] = group
interpolation_end = end.to_frame('value')
interpolation_end['group'] = group
interpolation = pd.concat([interpolation_start, interpolation_end])
return interpolation
[docs]
def interpolation_boundaries_index(data, cycles_start, cycles_end, interpolation_freq, interpolation_method, threshold):
"""Find precise threshold crossing times through interpolation.
Interpolates between data points surrounding threshold crossings to
estimate the exact time when the threshold was crossed.
Parameters
----------
data : pd.Series
Time series data with datetime index
cycles_start : np.ndarray
Indices of cycle start positions
cycles_end : np.ndarray
Indices of cycle end positions
interpolation_freq : str
Temporal frequency for interpolation grid (e.g., '1min', '10s')
interpolation_method : str
Interpolation method: 'linear', 'cubic', 'quadratic', etc.
threshold : float
Threshold value to find crossing times for
Returns
-------
np.ndarray
DatetimeIndex values of estimated threshold crossing times
Notes
-----
The function:
1. Creates dense time grid at interpolation_freq around crossings
2. Interpolates data values onto this grid
3. Finds timestamps closest to threshold value
This provides sub-sample temporal resolution for event boundaries.
"""
if cycles_start[0] == 0:
interpolation_start = interpolation_series(data, cycles_start[1:]-1, cycles_start[1:])
else:
interpolation_start = interpolation_series(data, cycles_start-1, cycles_start)
interpolation_end = interpolation_series(data, cycles_end-1, cycles_end)
interpolation_df = pd.concat([interpolation_start, interpolation_end])
interpolation_df.index.name = 'time'
interpolation_groups = interpolation_df.groupby('group').resample(interpolation_freq).asfreq()
interpolation = interpolation_groups['value'].interpolate(interpolation_method)
new_cycles_limits_indexes = interpolation.reset_index('time').groupby(level='group', group_keys=False).apply(
interpolation_nearest, threshold)['time'].values
new_cycles_limits_indexes = np.unique(new_cycles_limits_indexes)
return new_cycles_limits_indexes
[docs]
def interpolation_nearest(group, threshold):
"""Find nearest interpolated value to threshold.
Helper function to locate the timestamp where interpolated values
are closest to the threshold value.
Parameters
----------
group : pd.DataFrame
DataFrame with 'value' column containing interpolated data
threshold : float
Target threshold value
Returns
-------
pd.DataFrame
Single-row DataFrame with the value closest to threshold
Notes
-----
Uses absolute difference to find the nearest match.
Returns the row with minimum absolute difference (value - threshold).
"""
return group.iloc[(group['value']-threshold).abs().argsort()[:1]]
[docs]
def extreme_indexes(data, extreme_start, extreme_end):
"""Create boolean mask for extreme event periods.
Generates a boolean array indicating which time steps fall within
any extreme event period.
Parameters
----------
data : pd.Series
Time series data (used for length reference)
extreme_start : np.ndarray
Array of event start indices
extreme_end : np.ndarray
Array of event end indices (exclusive)
Returns
-------
np.ndarray
Boolean array where True indicates time steps within extreme events
Notes
-----
The function:
1. Creates condition matrix checking if each index falls within any event
2. Uses broadcasting: (n_events, n_timesteps) comparison
3. Returns True if index >= start AND < end for any event
Examples
--------
>>> data = pd.Series(range(100))
>>> starts = np.array([10, 50])
>>> ends = np.array([20, 60])
>>> mask = extreme_indexes(data, starts, ends)
>>> mask[15] # Within first event
True
>>> mask[30] # Between events
False
"""
indexes = np.arange(len(data))
condition = (indexes >= extreme_start[:, np.newaxis]) & (indexes < extreme_end[:, np.newaxis])
extreme = np.any(condition, axis=0)
return extreme
[docs]
def near_events(data, cycles_start, cycles_end, minimum_interarrival_time, interpolation):
"""Identify non-independent events for merging.
Determines which consecutive events are separated by less than the
minimum interarrival time, indicating they should be merged into
single events.
Parameters
----------
data : pd.Series
Time series data with datetime index
cycles_start : np.ndarray
Array of cycle start indices
cycles_end : np.ndarray
Array of cycle end indices
minimum_interarrival_time : pd.Timedelta
Minimum time gap required between independent events
interpolation : bool
Whether interpolation was used for threshold crossings
Returns
-------
np.ndarray
Boolean array indicating which events are too close to the previous event
Notes
-----
Implements the independence criterion for extreme events:
- Events separated by less than minimum_interarrival_time are considered
part of the same storm system
- Typically set based on decorrelation time scale of the process
The interpolation flag adjusts index selection:
- True: Uses exact crossing times (cycles_end - 1)
- False: Uses data sample indices (cycles_start - 1, cycles_end)
Examples
--------
>>> minimum_gap = pd.Timedelta(hours=72)
>>> near = near_events(data, starts, ends, minimum_gap, interpolation=False)
>>> # Merge events where near is True
"""
if interpolation:
near_cycles = data.index[cycles_start[1:]] - data.index[cycles_end[:-1] - 1] < minimum_interarrival_time
else:
near_cycles = data.index[cycles_start[1:] - 1] - data.index[cycles_end[:-1]] < minimum_interarrival_time
return near_cycles