Source code for swvo.io.kp.swpc

# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0

"""
Module for handling SWPC Kp data.
"""

import logging
import re
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from typing import Optional

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone

logger = logging.getLogger(__name__)

logging.captureWarnings(True)


[docs] class KpSWPC(BaseIO): """ A class for handling SWPC Kp data. In SWPC data, the file for current day always contains the forecast for the next 3 days. Keep this in mind when using the `read` and `download_and_process` methods. Parameters ---------- data_dir : Path | None Data directory for the SWPC Kp data. If not provided, it will be read from the environment variable Methods ------- download_and_process read Raises ------ ValueError Returns `ValueError` if necessary environment variable is not set """ ENV_VAR_NAME = "RT_KP_SWPC_STREAM_DIR" URL = "https://services.swpc.noaa.gov/text/" NAME = "3-day-forecast.txt" LABEL = "swpc"
[docs] def download_and_process(self, target_date: datetime, reprocess_files: bool = False) -> None: """ Download and process SWPC Kp data file. Parameters ---------- target_date : datetime Target date for the Kp data, reprocess_files : bool, optional Downloads and processes the files again, defaults to False, by default False Raises ------ ValueError Raises `ValueError` if the target date is in the past. """ if target_date.date() < datetime.now(timezone.utc).date(): raise ValueError("We can only download and progress a Kp SWPC file for the current day!") file_path = ( self.data_dir / target_date.strftime("%Y/%m") / f"SWPC_KP_FORECAST_{target_date.strftime('%Y%m%d')}.csv" ) if file_path.exists() and not reprocess_files: return tmp_path = file_path.with_suffix(file_path.suffix + ".tmp") temporary_dir = Path("./temp_kp_swpc_wget") temporary_dir.mkdir(exist_ok=True, parents=True) try: logger.debug(f"Downloading file {self.URL + self.NAME} ...") self._download(temporary_dir, self.NAME) logger.debug("Processing file ...") processed_df = self._process_single_file(temporary_dir) file_path.parent.mkdir(parents=True, exist_ok=True) processed_df.to_csv(tmp_path, index=False, header=False) tmp_path.replace(file_path) logger.debug(f"Saving processed file {file_path}") except Exception as e: logger.error(f"Failed to download and process {file_path}: {e}") if tmp_path.exists(): tmp_path.unlink() raise finally: rmtree(temporary_dir, ignore_errors=True)
def _download(self, temporary_dir: Path, filename: str) -> None: """Download a file from SWPC server. Parameters ---------- temporary_dir : Path Temporary directory to store the downloaded file. filename : str Name of the file to download. Raises ------ requests.HTTPError If the HTTP request fails. """ response = requests.get(self.URL + filename, timeout=10) response.raise_for_status() with open(temporary_dir / filename, "wb") as f: f.write(response.content)
[docs] def read(self, start_time: datetime, end_time: Optional[datetime] = None, download: bool = False) -> pd.DataFrame: """ Read Kp data for the specified time range. Parameters ---------- start_time : datetime Start time of the data to read. end_time : datetime, optional End time of the data to read. If not provided, it will be set to 3 days after `start_time`. download : bool, optional Download data on the go, defaults to False. Returns ------- pd.DataFrame SWPC Kp dataframe. Raises ------ ValueError Raises `ValueError` if the time range is more than 3 days. """ start_time = enforce_utc_timezone(start_time) if end_time is not None: end_time = enforce_utc_timezone(end_time) if end_time is None: end_time = start_time + timedelta(days=3) if (end_time - start_time).days > 3: msg = "The difference between `end_time` and `start_time` should be less than 3 days. We can only read 3 days at a time of Kp SWPC!" logger.error(msg) raise ValueError(msg) if start_time > end_time: msg = "start_time must be before end_time" logger.error(msg) raise ValueError(msg) t = pd.date_range( datetime(start_time.year, start_time.month, start_time.day), datetime(end_time.year, end_time.month, end_time.day, 23, 59, 59), freq=timedelta(hours=3), ) data_out = pd.DataFrame(index=t) data_out.index = enforce_utc_timezone(data_out.index) data_out["kp"] = np.array([np.nan] * len(t)) data_out["file_name"] = np.array([np.nan] * len(t)) file_name_time = start_time file_path = ( self.data_dir / file_name_time.strftime("%Y/%m") / f"SWPC_KP_FORECAST_{file_name_time.strftime('%Y%m%d')}.csv" ) logger.info(f"Reading from {file_path}") if not file_path.exists() and download: self.download_and_process(start_time) if file_path.exists(): df_one_file = self._read_single_file(file_path) data_out = df_one_file.combine_first(data_out) else: warnings.warn(f"File {file_path} not found") data_out = data_out.truncate( before=start_time - timedelta(hours=2.9999), after=end_time + timedelta(hours=2.9999), ) return data_out
def _read_single_file(self, file_path: Path) -> pd.DataFrame: """Read Kp file to a DataFrame. Parameters ---------- file_path : Path Path to the file. Returns ------- pd.DataFrame Data from Kp file. """ df = pd.read_csv(file_path, names=["t", "kp"]) df["t"] = pd.to_datetime(df["t"], utc=True) df.index = df["t"] df.drop(labels=["t"], axis=1, inplace=True) df["file_name"] = file_path df.loc[df["kp"].isna(), "file_name"] = None return df def _process_single_file(self, temporary_dir: Path) -> pd.DataFrame: """Process Kp file to a DataFrame. Parameters ---------- file_path : Path Path to the file. Returns ------- pd.DataFrame Kp data. """ first_line = None dates = [] year = None kp_data = [] with open(temporary_dir / self.NAME) as f: lines = f.readlines() for line in lines: if ":Issued:" in line: year = int(re.search(r"(\d{4})", line).group(1)) # ty: ignore[unresolved-attribute] break for i, line in enumerate(lines): if "NOAA Kp index breakdown" in line: first_line = i + 2 break headers = lines[first_line].split() # ty: ignore[invalid-argument-type] headers = [headers[i] + " " + headers[i + 1] for i in range(0, len(headers), 2)] for d in headers: try: if any("Dec" in month for month in headers) and "Jan" in d: parsed_date = self._parse_date(d, year + 1) # ty: ignore[unsupported-operator] else: parsed_date = self._parse_date(d, year) dates.append(parsed_date) except ValueError: raise for line in lines[first_line + 1 : first_line + 9]: # ty: ignore[unsupported-operator] values = [float(val) for val in line.split()[1:] if re.match(r"^\d+\.\d+$", val)] kp_data.append(values) kp = [] timestamp = [] for i, day in enumerate(dates): for j in range(8): timestamp.append(day + timedelta(hours=3 * j)) kp.append(kp_data[j][i]) time_in = [timestamp[0]] * 24 df = pd.DataFrame({"t_forecast": timestamp}, index=time_in) df["kp"] = kp df.index.rename("t", inplace=True) return df def _parse_date(self, date_str, year): return datetime.strptime(f"{year} {date_str}", "%Y %b %d")