# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, Optional, TypeVar
import numpy as np
import pandas as pd
from swvo.io.utils import enforce_utc_timezone
logger = logging.getLogger(__name__)
Number = TypeVar("Number", int, float)
[docs]
class HpEnsemble:
"""This is a base class for Hp ensemble data.
Parameters
----------
index : str
Hp index Possible options are: hp30, hp60.
data_dir : Path | None
Data directory for the Hp data. If not provided, it will be read from the environment variable
prefer_env_var : bool, optional
If True, the environment variable takes precedence over the passed data_dir argument.
If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.
Methods
-------
read
Raises
------
ValueError
Returns `ValueError` if necessary environment variable is not set.
FileNotFoundError
Returns `FileNotFoundError` if the data directory does not exist.
"""
ENV_VAR_NAME: str = "" # Must be set by subclasses
LABEL: str = "ensemble"
def __init__(self, index: str, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
"""Initialize HpEnsemble.
Parameters
----------
index : str
Hp index. Possible options are: hp30, hp60.
data_dir : Path | None
Data directory for the Hp data. If not provided, it will be read from the environment variable
prefer_env_var : bool, optional
If True, the environment variable takes precedence over the passed data_dir argument, by default False
"""
self.index = index
if self.index not in ("hp30", "hp60"):
msg = f"Encountered invalid index: {self.index}. Possible options are: hp30, hp60!"
raise ValueError(msg)
if prefer_env_var and self.ENV_VAR_NAME in os.environ:
data_dir = Path(os.environ[self.ENV_VAR_NAME])
elif data_dir is None:
if not self.ENV_VAR_NAME or self.ENV_VAR_NAME not in os.environ:
raise ValueError(f"Necessary environment variable {self.ENV_VAR_NAME} not set!")
data_dir = Path(os.environ[self.ENV_VAR_NAME])
self.data_dir = Path(data_dir)
logger.info(f"{self.index.upper()} Ensemble data directory: {self.data_dir}")
if not self.data_dir.exists():
msg = f"Data directory {self.data_dir} does not exist! Impossible to retrieve data!"
logger.error(msg)
raise FileNotFoundError(msg)
self.index_number: int = int(index[2:])
[docs]
def read(self, start_time: datetime, end_time: datetime) -> list[pd.DataFrame]:
"""
Read Hp ensemble data for the requested period.
Parameters
----------
start_time : datetime
Start time of the data to read. Must be timezone-aware.
end_time : datetime
End time of the data to read. Must be timezone-aware.
Returns
-------
list[:class:`pandas.DataFrame`]
List of ensemble data frames containing data for the requested period.
Raises
------
FileNotFoundError
Returns `FileNotFoundError` if no ensemble file is found for the requested date.
"""
if start_time is not None:
start_time = enforce_utc_timezone(start_time)
if end_time is not None:
end_time = enforce_utc_timezone(end_time)
if start_time is None:
start_time = datetime.now(timezone.utc)
if end_time is None:
end_time = start_time + timedelta(days=3)
start_date = start_time.replace(microsecond=0, minute=0, second=0)
str_date = start_date.strftime("%Y%m%dT%H0000")
if start_time > end_time:
msg = "start_time must be before end_time"
logger.error(msg)
raise ValueError(msg)
file_list = self._ensemble_file_list(str_date)
data = []
if len(file_list) == 0:
msg = f"No {self.index} ensemble file found for requested date {start_date}"
logger.error(msg)
raise FileNotFoundError(msg)
for file in file_list:
hp_df = pd.read_csv(file, names=["t", self.index])
hp_df["t"] = pd.to_datetime(hp_df["t"], utc=True)
hp_df.index = hp_df["t"]
hp_df = hp_df.drop(labels=["t"], axis=1)
hp_df["file_name"] = file
hp_df.loc[hp_df[self.index].isna(), "file_name"] = None
hp_df = hp_df.truncate(
before=start_time - timedelta(minutes=int(self.index_number) - 0.01),
after=end_time + timedelta(minutes=int(self.index_number) + 0.01),
)
data.append(hp_df)
return data
def _ensemble_file_list(self, str_date: str) -> list[Path]:
"""Check for the existence of ensemble files for a given date.
Parameters
----------
str_date : str
Date string in the format YYYYMMDDTHH0000.
Returns
-------
list[Path]
A list of file paths for the ensemble files.
Warnings
--------
DeprecationWarning
Warns if deprecated file naming convention is used.
"""
file_list_old_name = sorted(
self.data_dir.glob(
f"{str_date[:4]}/{str_date[4:6]}/{str_date[6:8]}/FORECAST_{self.index.title()}_{str_date}_ensemble_*.csv"
),
key=lambda x: int(x.stem.split("_")[-1]),
)
file_list_new_name = sorted(
self.data_dir.glob(
f"{str_date[:4]}/{str_date[4:6]}/{str_date[6:8]}/FORECAST_{self.index.upper()}_SWIFT_DRIVEN_swift_{str_date}_ensemble_*.csv"
),
key=lambda x: int(x.stem.split("_")[-1]),
)
file_list: list[Path]
if len(file_list_new_name) == 0 and len(file_list_old_name) == 0:
file_list = []
elif len(file_list_new_name) > 0:
file_list = file_list_new_name
elif len(file_list_old_name) > 0:
warnings.warn(
"The use of FORECAST_HP*_SWIFT_DRIVEN_swift_* files is deprecated. However since we still have these files from the PAGER project with this prefix, this will be supported",
DeprecationWarning,
)
file_list = file_list_old_name
return file_list
def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: Number) -> list[pd.DataFrame]:
if start_time is not None:
start_time = enforce_utc_timezone(start_time)
if end_time is not None:
end_time = enforce_utc_timezone(end_time)
if start_time > end_time:
msg = "start_time must be before end_time"
logger.error(msg)
raise ValueError(msg)
if not (0 <= horizon <= 72):
raise ValueError("Horizon must be between 0 and 72 hours")
if self.index == "hp30":
freq = "0.5h"
if horizon % 0.5 != 0:
raise ValueError("Horizon for hp30 must be in 0.5 hour increments")
elif self.index == "hp60":
freq = "1h"
if horizon % 1 != 0:
raise ValueError("Horizon for hp60 must be in 1 hour increments")
align_start_to_hp_hr = start_time.replace(hour=start_time.hour, minute=0, second=0, microsecond=0)
align_end_to_hp_hr = end_time.replace(hour=end_time.hour, minute=0, second=0, microsecond=0)
full_time_range = pd.date_range(align_start_to_hp_hr, align_end_to_hp_hr, freq=freq, tz=timezone.utc)
file_offsets, time_indices = self._get_file_offsets_and_time_indices(full_time_range, horizon)
max_ensembles = 30 # Maximum number of ensemble files to check
ensemble_dfs = [pd.DataFrame(index=full_time_range) for _ in range(max_ensembles)]
for file_time, file_offset, iloc in zip(full_time_range, file_offsets, time_indices):
str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000")
file_list = self._ensemble_file_list(str_date)
for ensemble_idx in range(max_ensembles):
df = ensemble_dfs[ensemble_idx]
if ensemble_idx < len(file_list):
data = pd.read_csv(
file_list[ensemble_idx],
names=["Forecast Time", self.index],
parse_dates=["Forecast Time"],
).iloc[iloc]
data["source"] = str_date
data["Forecast Time"] = enforce_utc_timezone(data["Forecast Time"])
df.loc[file_time, "Forecast Time"] = data["Forecast Time"]
df.loc[file_time, self.index] = data[self.index]
df.loc[file_time, "source"] = file_list[ensemble_idx].stem
else:
df.loc[file_time, self.index] = np.nan
for df in ensemble_dfs:
df["horizon"] = horizon
df.index.name = "Time"
ensemble_dfs = [df for df in ensemble_dfs if not df[self.index].isna().all()]
if len(ensemble_dfs) == 0:
msg = f"No ensemble data found for the requested period {start_time} to {end_time} and horizon {horizon} hours. Check the data directory {self.data_dir} for available data."
logger.error(msg)
raise FileNotFoundError(msg)
return ensemble_dfs
def _get_file_offsets_and_time_indices(
self, file_time_range: Iterable, forecast_horizon: float
) -> tuple[list[int], list[int]]:
"""
Compute file offsets and time indices for a given forecast horizon.
Parameters
----------
file_time_range : iterable
Available file time steps.
forecast_horizon : float
Forecast horizon in hours (can be fractional, e.g., 0.5, 1.5, 72.0).
Returns
-------
file_offsets : list[int]
Offsets from the given time steps indicating which files to read.
time_indices : list[int]
Time indices to use for each file.
"""
file_offsets = []
time_indices = []
for current_time in file_time_range:
current_hour = current_time.hour
current_minute = current_time.minute
current_fractional_hour = current_hour + current_minute / 60.0
# Target forecast time (fractional hours from same midnight)
target_fractional_hour = current_fractional_hour + forecast_horizon
# Find the file base hour (multiple of 3) that contains this target
# Files are grouped by 3s: [0,1,2] use base 0, [3,4,5] use base 3, etc.
# Each file covers 72 hours from its base hour
# Determine which 3-hour group the target falls into
target_hour_int = int(target_fractional_hour)
target_base_hour = (target_hour_int // 3) * 3
if target_fractional_hour <= target_base_hour + 72:
file_base_hour = target_base_hour
else:
# use next group
file_base_hour = target_base_hour + 3
# Determine which file in the group (base, base+1, base+2) to use
# Use the file that was created at or before current_time
if current_hour >= file_base_hour:
# Use the latest file in the group that's <= current_hour
file_hour = min(current_hour, file_base_hour + 2)
else:
file_hour = file_base_hour
file_offset = file_hour - current_hour
file_offsets.append(file_offset)
hours_from_file_start = target_fractional_hour - file_base_hour
resolution = 0.5 if self.index == "hp30" else 1.0
time_index = int(hours_from_file_start / resolution)
time_indices.append(time_index)
return file_offsets, time_indices
[docs]
class Hp30Ensemble(HpEnsemble):
"""A class to handle Hp30 ensemble data.
Parameters
----------
data_dir : str | Path, optional
Data directory for the Hp30 ensemble data. If not provided, it will be read from the environment variable
"""
ENV_VAR_NAME = "HP30_ENSEMBLE_FORECAST_DIR"
def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
super().__init__("hp30", data_dir, prefer_env_var)
[docs]
def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: float) -> list[pd.DataFrame]:
"""Read Ensemble Hp30 forecast data for a given time range and forecast horizon.
Parameters
----------
start_time : datetime
Start time of the period for which to read the data.
end_time : datetime
End time of the period for which to read the data.
horizon : float
Forecast horizon (in hours).
Returns
-------
list[:class:`pandas.DataFrame`]
A list of data frames containing ensemble data for the requested period.
Raises
------
ValueError
Raises `ValueError` if the horizon is not between 0 and 72 hours.
ValueError
Raises `ValueError` if the horizon is not in 0.5 hour increments.
"""
return super().read_with_horizon(start_time, end_time, horizon)
[docs]
class Hp60Ensemble(HpEnsemble):
"""A class to handle Hp60 ensemble data.
Parameters
----------
data_dir : str | Path, optional
Data directory for the Hp60 ensemble data. If not provided, it will be read from the environment variable
"""
ENV_VAR_NAME = "HP60_ENSEMBLE_FORECAST_DIR"
def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
super().__init__("hp60", data_dir, prefer_env_var)
[docs]
def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: int) -> list[pd.DataFrame]:
"""Read Ensemble Hp60 forecast data for a given time range and forecast horizon.
Parameters
----------
start_time : datetime
Start time of the period for which to read the data.
end_time : datetime
End time of the period for which to read the data.
horizon : int
Forecast horizon (in hours).
Returns
-------
list[:class:`pandas.DataFrame`]
A list of data frames containing ensemble data for the requested period.
Raises
------
ValueError
Raises `ValueError` if the horizon is not between 0 and 72 hours.
ValueError
Raises `ValueError` if the horizon is not in 1 hour increments.
"""
return super().read_with_horizon(start_time, end_time, horizon)