# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import json
import logging
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from typing import Optional

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone
# Module-level logger named after this module; used by the Hp classes below.
logger = logging.getLogger(__name__)
class HpGFZ(BaseIO):
    """Base class for Hp index data (Hp30, Hp60) retrieved from the GFZ web API.

    Data are stored as one CSV file per calendar year at
    ``<data_dir>/<index>/Hp<N>_GFZ_<YEAR>.csv``.

    Parameters
    ----------
    index : str
        Hp index. Possible options are: hp30, hp60.
    data_dir : Path | None
        Data directory for the Hp data. If not provided, it will be read from the environment
        variable (see ``ENV_VAR_NAME``).
    prefer_env_var : bool, optional
        If True, the environment variable takes precedence over the passed data_dir argument.
        If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.

    Methods
    -------
    download_and_process
    read

    Raises
    ------
    ValueError
        If ``index`` is not one of the supported options, or if the necessary environment
        variable is not set.
    """

    ENV_VAR_NAME = "RT_HP_GFZ_STREAM_DIR"
    # Earliest year handled; read() clamps earlier start times to January 1st of this year.
    START_YEAR = 1985
    API_URL = "https://kp.gfz.de/app/json/"
    LABEL = "gfz"

    def __init__(self, index: str, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
        """Initialize HpGFZ.

        Parameters
        ----------
        index : str
            Hp index. Possible options are: hp30, hp60.
        data_dir : Path | None
            Data directory for the Hp data. If not provided, it will be read from the environment variable.
        prefer_env_var : bool, optional
            If True, the environment variable takes precedence over the passed data_dir argument, by default False

        Raises
        ------
        ValueError
            If ``index`` is not "hp30" or "hp60".
        """
        self.index = index
        if self.index not in ("hp30", "hp60"):
            msg = f"Encountered invalid index: {self.index}. Possible options are: hp30, hp60!"
            raise ValueError(msg)

        super().__init__(data_dir=data_dir, prefer_env_var=prefer_env_var)

        # "hp30" -> 30: the cadence in minutes, also used in file names and the API index parameter.
        self.index_number: int = int(index[2:])

        self.data_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"{self.index.upper()} GFZ data directory: {self.data_dir}")
        (self.data_dir / str(self.index)).mkdir(exist_ok=True)

    def download_and_process(
        self,
        start_time: datetime,
        end_time: datetime,
        *,
        reprocess_files: bool = False,
    ) -> None:
        """Download and process HpGFZ data.

        One API request is issued per calendar year from the year of ``start_time`` to the year
        of ``end_time`` (inclusive), and each year is stored in its own CSV file. A year whose
        file already exists is skipped unless ``reprocess_files`` is True.

        A failure for a single year is logged and does not abort the remaining years. If a year
        returns no data, nothing is written for it, so it is retried on the next call.

        Parameters
        ----------
        start_time : datetime
            Start time of the data to be downloaded (only its year is used).
        end_time : datetime
            End time of the data to be downloaded (only its year is used).
        reprocess_files : bool, optional
            Download and process the files again even if they already exist, by default False

        Returns
        -------
        None
        """
        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)

        # Private scratch directory per call: concurrent runs cannot overwrite or delete each
        # other's downloads, and it is removed even if an exception escapes the loop.
        with tempfile.TemporaryDirectory(prefix="temp_hp_") as scratch:
            temporary_dir = Path(scratch)

            for file_path, (interval_start, interval_end) in zip(file_paths, time_intervals):
                if file_path.exists() and not reprocess_files:
                    continue

                # Write to a sibling temp file and rename it into place, so a failure never
                # leaves a partially written CSV at the final path.
                tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
                try:
                    self._download(temporary_dir, interval_start, interval_end)
                    processed_df = self._process_single_file(temporary_dir, interval_start)

                    if processed_df.empty:
                        # Writing an empty file would make this year look "done" and block
                        # any future download attempt.
                        logger.warning(f"No data received for {file_path}, skipping")
                        continue

                    file_path.parent.mkdir(parents=True, exist_ok=True)
                    processed_df.to_csv(tmp_path, index=True, header=False)
                    tmp_path.replace(file_path)
                except Exception as e:
                    # Best effort: report and continue with the next year.
                    logger.error(f"Failed to process {file_path}: {e}")
                    if tmp_path.exists():
                        tmp_path.unlink()

    def _download(self, temporary_dir: Path, start_time: datetime, end_time: datetime) -> None:
        """Download data from the GFZ API and save the JSON response.

        The response is written to ``temporary_dir / f"hp_data_{start_time.year}.json"``,
        which is the file :meth:`_process_single_file` reads.

        Parameters
        ----------
        temporary_dir : Path
            Temporary directory to store the downloaded file.
        start_time : datetime
            Start time for the data request.
        end_time : datetime
            End time for the data request.

        Raises
        ------
        requests.exceptions.RequestException
            If the API request fails (connection error, timeout, HTTP error status).
        """
        # ISO 8601 timestamps with the UTC offset written as "Z".
        start_str = start_time.isoformat().replace("+00:00", "Z")
        end_str = end_time.isoformat().replace("+00:00", "Z")

        # The API index parameter is "Hp30" / "Hp60".
        index_param = f"Hp{self.index_number}"
        url = f"{self.API_URL}?start={start_str}&end={end_str}&index={index_param}&status=def"
        logger.debug(f"Downloading data from {url} ...")

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {e}")
            raise

        # For a dict response, count timestamps rather than top-level keys.
        n_records = len(data.get("datetime", [])) if isinstance(data, dict) else len(data)
        logger.debug(f"Downloaded data: {n_records} records")

        # Save the JSON response to a temporary file for processing
        output_file = temporary_dir / f"hp_data_{start_time.year}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f)

    def read(self, start_time: datetime, end_time: datetime, *, download: bool = False) -> pd.DataFrame:
        """Read HpGFZ data for the given time range.

        Parameters
        ----------
        start_time : datetime
            Start time of the data to read. Must be timezone-aware.
        end_time : datetime
            End time of the data to read. Must be timezone-aware.
        download : bool, optional
            Download missing yearly files on the go, defaults to False.

        Returns
        -------
        :class:`pandas.DataFrame`
            HpGFZ data for the given time range. The DataFrame has a UTC DatetimeIndex and a single
            column named after the index (e.g. ``"hp30"``). Time steps without data are NaN.

        Raises
        ------
        ValueError
            If ``start_time`` is not before ``end_time``, or if ``end_time`` does not lie after
            January 1st of ``START_YEAR``.
        """
        # Normalise both inputs before comparing them; comparing a naive with an aware
        # datetime raises TypeError.
        start_time = enforce_utc_timezone(start_time)
        end_time = enforce_utc_timezone(end_time)

        if start_time >= end_time:
            msg = "start_time must be before end_time"
            logger.error(msg)
            raise ValueError(msg)

        first_available = datetime(self.START_YEAR, 1, 1, tzinfo=timezone.utc)
        if start_time < first_available:
            logger.warning(
                "Start date chosen falls behind the mission starting year. Moving start date to first"
                " available mission files..."
            )
            start_time = first_available
            # Explicit check instead of an assert (asserts are stripped under ``python -O``).
            if start_time >= end_time:
                msg = f"end_time must be after {first_available.isoformat()}, the first available date"
                logger.error(msg)
                raise ValueError(msg)

        cadence = timedelta(minutes=self.index_number)
        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)

        # NaN-filled grid spanning whole days from start_time's date to end_time's date.
        time_grid = pd.date_range(
            datetime(start_time.year, start_time.month, start_time.day, tzinfo=timezone.utc),
            datetime(end_time.year, end_time.month, end_time.day, 23, 59, 59, tzinfo=timezone.utc),
            freq=cadence,
        )
        data_out = pd.DataFrame(index=time_grid)
        data_out[self.index] = np.full(len(time_grid), np.nan)

        for file_path, (interval_start, interval_end) in zip(file_paths, time_intervals):
            logger.info(f"Processing file {file_path} ...")

            if download and not file_path.expanduser().exists():
                # Download only this missing year. Requesting the whole range here would
                # retry every still-missing year once per missing file.
                self.download_and_process(interval_start, interval_end)

            # if we request a date in the future, the file will still not be found here
            if not file_path.expanduser().exists():
                logger.warning(f"File {file_path} not found, filling with NaNs")
                continue

            df_one_file = self._read_single_file(file_path)
            # Values present in the file take precedence over the NaN placeholders.
            data_out = df_one_file.combine_first(data_out)

        # Trim to the requested window, padded by about one cadence step on each side.
        return data_out.truncate(
            before=start_time - timedelta(minutes=self.index_number - 0.01),
            after=end_time + timedelta(minutes=self.index_number + 0.01),
        )

    def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> tuple[list, list]:
        """Get the yearly file paths and time intervals covering ``start_time``..``end_time``.

        Only the years of ``start_time`` and ``end_time`` matter. Each calendar year between
        them (inclusive) yields one entry, and nothing is returned if the start year is after
        the end year.

        Returns
        -------
        tuple[list, list]
            1. File paths ``<data_dir>/<index>/Hp<N>_GFZ_<YEAR>.csv``.
            2. The matching ``(Jan 1 00:00:00, Dec 31 23:59:59)`` UTC intervals.

            Both lists are in chronological order.
        """
        file_paths = []
        time_intervals = []
        for year in range(start_time.year, end_time.year + 1):
            file_paths.append(self.data_dir / self.index / f"Hp{self.index_number}_GFZ_{year}.csv")
            time_intervals.append(
                (
                    datetime(year, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
                    datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc),
                )
            )
        return file_paths, time_intervals

    def _process_single_file(self, temp_dir: Path, start_time: datetime) -> pd.DataFrame:
        """Convert a downloaded JSON response into a DataFrame.

        Parameters
        ----------
        temp_dir : Path
            Temporary directory containing the JSON file.
        start_time : datetime
            Start time of the request; its year identifies the JSON file.

        Returns
        -------
        pd.DataFrame
            The data are built from the JSON ``"datetime"`` list and the ``"Hp<N>"`` list.
            The DataFrame has a UTC DatetimeIndex and one column ``"Hp<N>"``.
            An empty DataFrame is returned if the JSON file does not exist.
        """
        json_file = temp_dir / f"hp_data_{start_time.year}.json"
        if not json_file.exists():
            logger.warning(f"JSON file {json_file} not found")
            return pd.DataFrame()

        with open(json_file, encoding="utf-8") as f:
            json_data = json.load(f)

        column = f"Hp{self.index_number}"
        data_total = pd.DataFrame(
            {column: json_data[column]},
            index=pd.to_datetime(json_data["datetime"], utc=True),
        )
        data_total.index = enforce_utc_timezone(data_total.index)
        return data_total

    def _read_single_file(self, file_path: str | Path) -> pd.DataFrame:
        """Read a processed yearly HpGFZ CSV file into a DataFrame.

        Parameters
        ----------
        file_path : str | Path
            Path to a headerless CSV file with two columns: timestamp and value.

        Returns
        -------
        pd.DataFrame
            The DataFrame is indexed by UTC timestamps (index name ``"t"``).
            It has one column named after the index (e.g. ``"hp30"``).
        """
        hp_df = pd.read_csv(file_path, names=["t", str(self.index)])
        hp_df["t"] = pd.to_datetime(hp_df["t"], utc=True)
        return hp_df.set_index("t")
class Hp30GFZ(HpGFZ):
    """Hp30 index data from GFZ.

    Convenience subclass of :class:`HpGFZ` with the index fixed to ``"hp30"``.

    Parameters
    ----------
    data_dir : Path | None, optional
        Data directory for the Hp30 data. If not provided, it will be read from the environment variable.
    prefer_env_var : bool, optional
        If True, the environment variable takes precedence over the passed data_dir argument.
        If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.
    """

    def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
        super().__init__(index="hp30", data_dir=data_dir, prefer_env_var=prefer_env_var)
class Hp60GFZ(HpGFZ):
    """A class to handle Hp60 data from GFZ.

    Convenience subclass of :class:`HpGFZ` with the index fixed to ``"hp60"``.

    Parameters
    ----------
    data_dir : Path | None, optional
        Data directory for the Hp60 data. If not provided, it will be read from the environment variable.
    prefer_env_var : bool, optional
        If True, the environment variable takes precedence over the passed data_dir argument.
        If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.
    """

    def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
        super().__init__("hp60", data_dir, prefer_env_var=prefer_env_var)