Source code for swvo.io.hp.gfz

# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import json
import logging
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from typing import Optional

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone

logger = logging.getLogger(__name__)


class HpGFZ(BaseIO):
    """Base class for Hp indices (Hp30, Hp60) provided by GFZ.

    Data are requested from the GFZ Kp/Hp JSON web service (``API_URL``) and
    cached as one headerless CSV file per calendar year under
    ``<data_dir>/<index>/Hp<N>_GFZ_<year>.csv``.

    Parameters
    ----------
    index : str
        Hp index. Possible options are: hp30, hp60.
    data_dir : Path | None
        Data directory for the Hp data. If not provided, it will be read from the environment variable
    prefer_env_var : bool, optional
        If True, the environment variable takes precedence over the passed data_dir argument.
        If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.

    Methods
    -------
    download_and_process
    read

    Raises
    ------
    ValueError
        If ``index`` is not a supported option, or (per the base class) if the
        necessary environment variable is not set.
    """

    ENV_VAR_NAME = "RT_HP_GFZ_STREAM_DIR"
    START_YEAR = 1985
    API_URL = "https://kp.gfz.de/app/json/"
    LABEL = "gfz"

    def __init__(self, index: str, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
        """Initialize HpGFZ.

        Parameters
        ----------
        index : str
            Hp index. Possible options are: hp30, hp60.
        data_dir : Path | None
            Data directory for the Hp data. If not provided, it will be read from the environment variable
        prefer_env_var : bool, optional
            If True, the environment variable takes precedence over the passed data_dir argument, by default False

        Raises
        ------
        ValueError
            If ``index`` is not one of ``"hp30"`` or ``"hp60"``.
        """
        self.index = index
        if self.index not in ("hp30", "hp60"):
            msg = f"Encountered invalid index: {self.index}. Possible options are: hp30, hp60!"
            raise ValueError(msg)

        super().__init__(data_dir=data_dir, prefer_env_var=prefer_env_var)

        # "hp30" -> 30, "hp60" -> 60: the index cadence in minutes.
        self.index_number: int = int(index[2:])

        self.data_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"{self.index.upper()} GFZ data directory: {self.data_dir}")
        (self.data_dir / str(self.index)).mkdir(exist_ok=True)

    def download_and_process(
        self,
        start_time: datetime,
        end_time: datetime,
        *,
        reprocess_files: bool = False,
    ) -> None:
        """Download and process HpGFZ data.

        One CSV file is produced per calendar year touched by
        ``[start_time, end_time]``. Existing files are skipped unless
        ``reprocess_files`` is True. A failure for one year is logged and does
        not abort the remaining years. A year for which the API returns no data
        is not written, so a later call can retry it.

        Parameters
        ----------
        start_time : datetime
            Start time of the data to be downloaded.
        end_time : datetime
            End time of the data to be downloaded.
        reprocess_files : bool, optional
            Downloads and processes the files again, by default False

        Returns
        -------
        None
        """
        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)

        # Private scratch directory per call. The intermediate JSON file name only
        # encodes the year (not the index), so a shared fixed directory would let
        # concurrent Hp30/Hp60 downloads overwrite or delete each other's files.
        temporary_dir = Path(tempfile.mkdtemp(prefix="temp_hp_"))
        try:
            for file_path, (interval_start, interval_end) in zip(file_paths, time_intervals):
                # NOTE(review): a file for the still-running year is never refreshed
                # unless reprocess_files=True; confirm this is intended for real-time use.
                if file_path.exists() and not reprocess_files:
                    continue

                tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
                try:
                    self._download(temporary_dir, interval_start, interval_end)
                    processed_df = self._process_single_file(temporary_dir, interval_start)

                    if processed_df.empty:
                        # An empty frame would be written as an empty CSV, which
                        # pd.read_csv later refuses to parse (EmptyDataError).
                        logger.warning(f"No data received for {file_path}, not writing file")
                        continue

                    file_path.parent.mkdir(parents=True, exist_ok=True)
                    processed_df.to_csv(tmp_path, index=True, header=False)
                    # Write to a sibling temp file, then rename over the target so an
                    # interrupted write never leaves a truncated CSV in place.
                    tmp_path.replace(file_path)
                except Exception as e:
                    logger.error(f"Failed to process {file_path}: {e}")
                    if tmp_path.exists():
                        tmp_path.unlink()
        finally:
            rmtree(temporary_dir, ignore_errors=True)

    def _download(self, temporary_dir: Path, start_time: datetime, end_time: datetime) -> None:
        """Download data from the GFZ API and store the JSON response.

        The response is written to ``temporary_dir / f"hp_data_{start_time.year}.json"``.
        ``_process_single_file`` expects to find the data under this name.

        Parameters
        ----------
        temporary_dir : Path
            Temporary directory to store the downloaded file.
        start_time : datetime
            Start time for the data request (expected to be UTC).
        end_time : datetime
            End time for the data request (expected to be UTC).

        Raises
        ------
        requests.exceptions.RequestException
            If the API request fails or returns an HTTP error status.
        """
        # The API expects ISO 8601 timestamps with a "Z" suffix for UTC.
        start_str = start_time.isoformat().replace("+00:00", "Z")
        end_str = end_time.isoformat().replace("+00:00", "Z")

        index_param = f"Hp{self.index_number}"
        url = f"{self.API_URL}?start={start_str}&end={end_str}&index={index_param}&status=def"

        logger.debug(f"Downloading data from {url} ...")
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {e}")
            raise

        # The payload is a dict of parallel lists; count timestamps, not keys.
        if isinstance(data, dict):
            n_records = len(data.get("datetime", []))
        else:
            n_records = len(data)
        logger.debug(f"Downloaded data: {n_records} records")

        output_file = temporary_dir / f"hp_data_{start_time.year}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f)

    def read(self, start_time: datetime, end_time: datetime, *, download: bool = False) -> pd.DataFrame:
        """Read HpGFZ data for the given time range.

        Parameters
        ----------
        start_time : datetime
            Start time of the data to read. Must be timezone-aware.
        end_time : datetime
            End time of the data to read. Must be timezone-aware.
        download : bool, optional
            Download data on the go, defaults to False.

        Returns
        -------
        :class:`pandas.DataFrame`
            HpGFZ data for the given time range, on a regular grid with the
            index cadence; missing values are NaN.

        Raises
        ------
        ValueError
            If ``start_time`` is after ``end_time``, or if the requested range
            ends before the first available year (``START_YEAR``).
        """
        # Normalise first: comparing a naive with an aware datetime raises TypeError.
        start_time = enforce_utc_timezone(start_time)
        end_time = enforce_utc_timezone(end_time)

        if start_time > end_time:
            msg = "start_time must be before end_time"
            logger.error(msg)
            raise ValueError(msg)

        first_available = datetime(self.START_YEAR, 1, 1, tzinfo=timezone.utc)
        if start_time < first_available:
            logger.warning(
                "Start date chosen falls behind the mission starting year. Moving start date to first"
                " available mission files..."
            )
            start_time = first_available

        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if start_time > end_time:
            msg = f"Requested time range ends before the first available data ({first_available})"
            logger.error(msg)
            raise ValueError(msg)

        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)

        # Initialise an all-NaN frame on the index cadence covering whole days.
        t = pd.date_range(
            datetime(start_time.year, start_time.month, start_time.day, tzinfo=timezone.utc),
            datetime(end_time.year, end_time.month, end_time.day, 23, 59, 59, tzinfo=timezone.utc),
            freq=timedelta(minutes=int(self.index_number)),
        )
        data_out = pd.DataFrame(index=t)
        data_out[self.index] = np.full(len(t), np.nan)

        for file_path, (interval_start, interval_end) in zip(file_paths, time_intervals):
            logger.info(f"Processing file {file_path} ...")
            if download and not file_path.expanduser().exists():
                # Fetch only the year this file covers, not the whole requested range.
                self.download_and_process(interval_start, interval_end)

            # if we request a date in the future, the file will still not be found here
            if not file_path.expanduser().exists():
                logger.warning(f"File {file_path} not found, filling with NaNs")
                continue

            df_one_file = self._read_single_file(file_path)
            # Values present in the file take precedence over the NaN placeholders.
            data_out = df_one_file.combine_first(data_out)

        # Keep one cadence step (minus/plus a small epsilon) of margin around the range.
        data_out = data_out.truncate(
            before=start_time - timedelta(minutes=int(self.index_number) - 0.01),
            after=end_time + timedelta(minutes=int(self.index_number) + 0.01),
        )

        return data_out  # noqa: RET504

    def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> tuple[list, list]:
        """Get list of file paths and their corresponding time intervals.

        There is one entry per calendar year from ``start_time.year`` to
        ``end_time.year`` (inclusive). Each interval spans Jan 1 00:00:00 to
        Dec 31 23:59:59 UTC of that year.

        Returns
        -------
        Tuple[List, List]
            List of file paths and time intervals.
        """
        file_paths: list[Path] = []
        time_intervals: list[tuple[datetime, datetime]] = []

        for year in range(start_time.year, end_time.year + 1):
            file_paths.append(self.data_dir / self.index / f"Hp{self.index_number}_GFZ_{year}.csv")
            time_intervals.append(
                (
                    datetime(year, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
                    datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc),
                )
            )

        return file_paths, time_intervals

    def _process_single_file(self, temp_dir: Path, start_time: datetime) -> pd.DataFrame:
        """Process HpGFZ data from JSON response to a DataFrame.

        Parameters
        ----------
        temp_dir : Path
            Temporary directory containing the JSON file.
        start_time : datetime
            Start time to identify the correct JSON file (only the year is used).

        Returns
        -------
        pd.DataFrame
            HpGFZ data indexed by UTC timestamps, with a single ``Hp<N>`` column.
            This is an empty DataFrame if the JSON file is missing.
        """
        data_total = pd.DataFrame()

        json_file = temp_dir / f"hp_data_{start_time.year}.json"
        if not json_file.exists():
            logger.warning(f"JSON file {json_file} not found")
            return data_total

        with open(json_file, encoding="utf-8") as f:
            json_data = json.load(f)

        data_total = pd.DataFrame(
            {f"Hp{self.index_number}": json_data[f"Hp{self.index_number}"]},
            index=pd.to_datetime(json_data["datetime"], utc=True),
        )
        data_total.index = enforce_utc_timezone(data_total.index)

        return data_total

    def _read_single_file(self, file_path: str | Path) -> pd.DataFrame:
        """Read HpGFZ file to a DataFrame.

        Parameters
        ----------
        file_path : str | Path
            Path to a headerless CSV with rows of ``timestamp,value``.

        Returns
        -------
        pd.DataFrame
            HpGFZ data indexed by UTC timestamps, with a single column named after ``self.index``.
        """
        hp_df = pd.read_csv(file_path, names=["t", str(self.index)])

        hp_df["t"] = pd.to_datetime(hp_df["t"], utc=True)
        hp_df.index = hp_df["t"]
        hp_df = hp_df.drop(labels=["t"], axis=1)

        return hp_df  # noqa: RET504
class Hp30GFZ(HpGFZ):
    """Hp30 index from GFZ (30-minute cadence).

    Convenience subclass of :class:`HpGFZ` that fixes ``index`` to ``"hp30"``.

    Parameters
    ----------
    data_dir : str | Path, optional
        Data directory for the Hp30 data. If not provided, it will be read from the environment variable
    prefer_env_var : bool, optional
        If True, the environment variable takes precedence over the passed data_dir argument.
        If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.
    """

    def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
        super().__init__(index="hp30", data_dir=data_dir, prefer_env_var=prefer_env_var)
class Hp60GFZ(HpGFZ):
    """A class to handle Hp60 data from GFZ (60-minute cadence).

    Convenience subclass of :class:`HpGFZ` that fixes ``index`` to ``"hp60"``.

    Parameters
    ----------
    data_dir : str | Path, optional
        Data directory for the Hp60 data. If not provided, it will be read from the environment variable
    prefer_env_var : bool, optional
        If True, the environment variable takes precedence over the passed data_dir argument.
        If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.
    """

    def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
        super().__init__("hp60", data_dir, prefer_env_var=prefer_env_var)