Source code for swvo.io.dst.wdc

# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0

"""
Module for handling WDC Dst data.
"""

import logging
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from typing import List, Tuple

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone

logger = logging.getLogger(__name__)

logging.captureWarnings(True)


[docs] class DSTWDC(BaseIO): """This is a class for the WDC Dst data. Parameters ---------- data_dir : Path | None Data directory for the WDC Dst data. If not provided, it will be read from the environment variable Methods ------- download_and_process read Raises ------ ValueError Raises `ValueError` if necessary environment variable is not set. """ ENV_VAR_NAME = "WDC_STREAM_DIR" URL = "https://wdc.kugi.kyoto-u.ac.jp/dst_realtime/YYYYMM/" LABEL = "wdc"
[docs] def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None: """Download and process WDC Dst data files. Parameters ---------- start_time : datetime Start time of the data to download. Must be timezone-aware. end_time : datetime End time of the data to download. Must be timezone-aware. reprocess_files : bool, optional Downloads and processes the files again, defaults to False, by default False Returns ------- None """ assert start_time < end_time, "Start time must be before end time" temporary_dir = Path("./temp_wdc") temporary_dir.mkdir(exist_ok=True, parents=True) file_paths, time_intervals = self._get_processed_file_list(start_time, end_time) for file_path, time_interval in zip(file_paths, time_intervals): filename = "index.html" if file_path.exists() and not reprocess_files: continue tmp_path = file_path.with_suffix(file_path.suffix + ".tmp") URL = self.URL.replace("YYYYMM", time_interval.strftime("%Y%m")) if file_path.exists(): if reprocess_files: file_path.unlink() else: continue try: logger.debug(f"Downloading file {URL} ...") response = requests.get(URL, timeout=10) if response.status_code == 404: logger.warning(f"WDC Dst data not found at {URL}") continue response.raise_for_status() data = response.text.splitlines() with open(temporary_dir / filename, "w") as file: file.write("\n".join(data)) logger.debug("Processing file ...") processed_df = self._process_single_file( temporary_dir / filename, year=time_interval.year, month=time_interval.month, ) processed_df.to_csv(tmp_path, index=True, header=True) tmp_path.replace(file_path) except Exception as e: logger.error(f"Failed to process {file_path}: {e}") if tmp_path.exists(): tmp_path.unlink() continue rmtree(temporary_dir, ignore_errors=True)
def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]: """Get list of file paths and their corresponding time intervals. Returns ------- Tuple[List, List] List of file paths and time intervals. """ file_paths = [] time = [] current_time = datetime(start_time.year, start_time.month, 1) if end_time.month == 12 and end_time.month == 12: end_time = datetime(end_time.year + 1, 1, 1, 0, 0, 0) else: end_time = datetime(end_time.year, end_time.month + 1, 1) while current_time < end_time: # Create yearly subdirectory year_dir = self.data_dir / str(current_time.year) year_dir.mkdir(parents=True, exist_ok=True) file_path = year_dir / f"WDC_DST_{current_time.strftime('%Y%m')}.csv" file_paths.append(file_path) file_time = current_time time.append(file_time) # Increment the month if current_time.month == 12 and current_time.month == 12: current_time = datetime(current_time.year + 1, 1, 1, 0, 0, 0) else: current_time = datetime(current_time.year, current_time.month + 1, 1, 0, 0, 0) return file_paths, time def _process_single_file(self, file_path: Path, year, month) -> pd.DataFrame: """Process yearly WDC Dst file to a DataFrame. Parameters ---------- file_path : Path Path to the file. Returns ------- pd.DataFrame YearlyWDC Dst data. """ with open(file_path, "r") as file: text = file.read() data = text.split("DAY")[-1] data = data.split("<!-- vvvvv S yyyymm_part3.html vvvvv -->", 1)[0] lines = data.strip().splitlines() records = [] # Skip header and any non-data lines for line in lines: numbers = re.findall(r"[-+]?\d+", line) if not numbers: continue day = int(numbers[0]) dst_values = numbers[1:] for hour, val in enumerate(dst_values): if val.startswith("9999"): continue if len(val) > 4: val = val[:4] if not val.startswith("9999") else None try: dst = float(val) # ty:ignore[invalid-argument-type] except: # noqa: E722 continue dt = datetime(year, month, day, hour) records.append({"t": dt, "dst": dst}) df = pd.DataFrame(records) df.reset_index(drop=True, inplace=True) df["t"] = pd.to_datetime(df["t"], utc=True) df.index = df["t"] df.drop(columns=["t"], inplace=True) df.index.rename("timestamp", inplace=True) file_path.unlink() return df
[docs] def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame: """ Read WDC Dst data for the given time range. it always returns the data until the last day of the month or incase of current month, until the current day. Parameters ---------- start_time : datetime Start time of the data to read. Must be timezone-aware. end_time : datetime End time of the data to read. Must be timezone-aware. download : bool, optional Download data on the go, defaults to False. Returns ------- :class:`pandas.DataFrame` WDC Dst data. """ if start_time > end_time: msg = "start_time must be before end_time" logger.error(msg) raise ValueError(msg) start_time = enforce_utc_timezone(start_time) end_time = enforce_utc_timezone(end_time) assert start_time < end_time, "Start time must be before end time!" file_paths, _ = self._get_processed_file_list(start_time, end_time) t = pd.date_range( datetime(start_time.year, start_time.month, start_time.day), datetime(end_time.year, end_time.month, end_time.day, 23, 00, 00), freq=timedelta(hours=1), tz=timezone.utc, ) data_out = pd.DataFrame(index=t) data_out["dst"] = np.array([np.nan] * len(t)) data_out["file_name"] = np.array([None] * len(t)) for file_path in file_paths: if not file_path.exists(): if download: self.download_and_process(start_time, end_time) else: logger.warning(f"File {file_path} not found") continue if not file_path.exists(): logger.warning(f"File {file_path} not found, filling with NaNs") continue df_one_file = self._read_single_file(file_path) data_out = df_one_file.combine_first(data_out) data_out = data_out.truncate( before=start_time - timedelta(hours=0.9999), after=end_time + timedelta(hours=0.9999), ) data_out.index.rename("t", inplace=True) data_out = data_out[["dst", "file_name"]] return data_out
def _read_single_file(self, file_path: Path) -> pd.DataFrame: """Read yearlyWDC Dst file to a DataFrame. Parameters ---------- file_path : Path Path to the file. Returns ------- pd.DataFrame Data from yearly WDC Dst file. """ df = pd.read_csv(file_path) df.index = pd.to_datetime(df["timestamp"], utc=True) df.index.rename("t", inplace=True) df["file_name"] = file_path df.loc[df["dst"].isna(), "file_name"] = None return df