# Source code for swvo.io.f10_7.swpc

# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0

"""
Module for handling F10.7 data from SWPC.
"""

from __future__ import annotations

import logging
import shutil
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Redirect messages issued through the ``warnings`` module to the logging
# system. NOTE: this is a process-wide setting applied at import time.
logging.captureWarnings(True)


class F107SWPC(BaseIO):
    """This is a class for the SWPC F107 data.

    Processed data are kept as one CSV file per calendar year, named
    ``SWPC_F107_{year}.csv``, inside ``self.data_dir``.

    NOTE(review): ``data_dir`` is not assigned in this class; it is presumably
    provided by :class:`BaseIO` based on ``ENV_VAR_NAME`` -- confirm.

    Methods
    -------
    download_and_process
    read

    Raises
    ------
    ValueError
        Returns `ValueError` if necessary environment variable is not set.
    """

    # Name of the environment variable associated with this data source.
    ENV_VAR_NAME = "RT_SWPC_F107_DIR"
    # Base URL the product file is downloaded from.
    URL = "https://services.swpc.noaa.gov/text/"
    # File name of the F10.7 product, appended to ``URL``.
    NAME_F107 = "daily-solar-indices.txt"
    LABEL = "swpc"
    # Seconds to wait for the SWPC server before giving up on a request.
    REQUEST_TIMEOUT = 30

    def _is_within_download_range(self, target_date: datetime) -> bool:
        """Check if a date is within the last 30 days.

        Parameters
        ----------
        target_date : datetime
            Date to check. Must be timezone-aware, because it is compared
            against the current time in UTC.

        Returns
        -------
        bool
            True if the date is within the last 30 days (or in the future),
            False otherwise.
        """
        thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)
        return target_date >= thirty_days_ago

    def _get_processed_file_list(
        self, start_time: datetime, end_time: datetime
    ) -> tuple[list[Path], list[tuple[datetime, datetime]]]:
        """Get list of file paths and their corresponding time intervals.

        One entry is produced for every calendar year from ``start_time.year``
        to ``end_time.year`` (both inclusive).

        Parameters
        ----------
        start_time : datetime
            Start of the requested range; only its year is used.
        end_time : datetime
            End of the requested range; only its year is used.

        Returns
        -------
        Tuple[List, List]
            List of file paths and time intervals. Each interval spans from
            1 January 00:00:00 to 31 December 23:59:59 UTC of the year.
        """
        years_needed = range(start_time.year, end_time.year + 1)

        file_paths = [self.data_dir / f"SWPC_F107_{year}.csv" for year in years_needed]
        time_intervals = [
            (
                datetime(year, 1, 1, tzinfo=timezone.utc),
                datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc),
            )
            for year in years_needed
        ]

        return file_paths, time_intervals

    def download_and_process(self) -> None:
        """Download and process the latest 30-day F10.7 data.

        The product file is downloaded into the temporary directory
        ``./temp_f107`` (relative to the current working directory), parsed,
        and merged year by year into the yearly CSV files in ``self.data_dir``.
        The temporary directory is removed afterwards, also when the download
        or the parsing fails.

        Returns
        -------
        None

        Raises
        ------
        requests.RequestException
            If the download fails (see :meth:`_download`).
        FileNotFoundError
            If the downloaded file is empty.
        """
        temp_dir = Path("./temp_f107")
        temp_dir.mkdir(exist_ok=True)

        try:
            logger.debug("Downloading F10.7 data...")
            self._download(temp_dir, self.NAME_F107)

            logger.debug("Processing F10.7 data...")
            new_data = self._process_single_file(temp_dir / self.NAME_F107)

            # The download may straddle a year boundary, so split it by year
            # and update each yearly file independently.
            for year, year_data in new_data.groupby(new_data.date.dt.year):
                self._update_yearly_file(year, year_data)
        finally:
            # Always clean up, so a failed run does not leave the directory behind.
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _update_yearly_file(self, year: int, year_data: pd.DataFrame) -> None:
        """Merge the records of one year into the corresponding yearly CSV file.

        Failures are logged and swallowed so that one broken yearly file does
        not prevent the other years from being updated.

        Parameters
        ----------
        year : int
            Year the records belong to; selects the target file.
        year_data : pd.DataFrame
            New records with the columns ``date`` and ``f107``.
        """
        # Expand "~" once so that the existence check, the read, the write and
        # the final rename all refer to the same location.
        file_path = (self.data_dir / f"SWPC_F107_{year}.csv").expanduser()
        tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")

        try:
            if file_path.exists():
                logger.debug(f"Updating {file_path}...")

                existing_data = pd.read_csv(file_path, parse_dates=["date"])
                # Drop any timezone information so the stored dates can be
                # compared with the timezone-naive dates of the new records.
                existing_data["date"] = pd.to_datetime(existing_data["date"]).dt.tz_localize(None)

                # New records come last, so ``keep="last"`` lets them override
                # existing entries for the same date.
                combined_data = (
                    pd.concat([existing_data, year_data])
                    .drop_duplicates(subset=["date"], keep="last")
                    .sort_values("date")
                )

                new_records = len(combined_data) - len(existing_data)
                logger.debug(f"Added {new_records} new records to {year}")
            else:
                logger.debug(f"Creating new file for {year}")
                combined_data = year_data

            # Write to a temporary file first and rename it afterwards, so the
            # real file is never left half-written.
            combined_data.to_csv(tmp_path, index=False)
            tmp_path.replace(file_path)

        except Exception as e:
            logger.exception(f"Failed to process file for year {year}: {e}")
            if tmp_path.exists():
                tmp_path.unlink()

    def _download(self, temp_dir: Path, filename: str) -> None:
        """Download a file from SWPC server.

        Parameters
        ----------
        temp_dir : Path
            Temporary directory to store the downloaded file.
        filename : str
            Name of the file to download.

        Raises
        ------
        requests.HTTPError
            If the HTTP request fails.
        requests.Timeout
            If the server does not answer within ``REQUEST_TIMEOUT`` seconds.
        FileNotFoundError
            If the downloaded file is empty.
        """
        url = self.URL + filename

        # Without a timeout a stalled server would block the caller forever.
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()

        target = temp_dir / filename
        target.write_bytes(response.content)

        if target.stat().st_size == 0:
            msg = f"Error downloading file: {url}"
            raise FileNotFoundError(msg)

    def _process_single_file(self, file_path: Path) -> pd.DataFrame:
        """Read and process the F10.7 data file.

        The first 13 lines of the file are skipped and the first four
        whitespace-separated columns are read as year, month, day and F10.7.
        NOTE(review): the fixed count of 13 assumes the header layout of the
        SWPC product does not change -- confirm against the upstream file.

        Parameters
        ----------
        file_path : Path
            Path to the file.

        Returns
        -------
        pd.DataFrame
            Data with the columns ``date`` (timezone-naive, at 00:00) and
            ``f107``.
        """
        data = pd.read_csv(
            file_path,
            sep=r"\s+",
            skiprows=13,
            usecols=[0, 1, 2, 3],
            names=["year", "month", "day", "f107"],
        )
        data["date"] = pd.to_datetime(data[["year", "month", "day"]].assign(hour=0))

        return data[["date", "f107"]]

    def read(self, start_time: datetime, end_time: datetime, *, download: bool = False) -> pd.DataFrame:
        """Read F10.7 SWPC data for the given time range.

        Parameters
        ----------
        start_time : datetime
            Start time of the data to read. Must be timezone-aware.
        end_time : datetime
            End time of the data to read. Must be timezone-aware.
        download : bool, optional
            Download data on the go, defaults to False. A download is only
            attempted for a missing yearly file whose year ends within the
            last 30 days or in the future.

        Returns
        -------
        :class:`pandas.DataFrame`
            F10.7 data with the columns ``f107`` and ``file_name``, indexed by
            date. Days without data contain NaN / None.

        Raises
        ------
        ValueError
            Raises ValueError if `start_time` is `after end_time`.
        """
        # Normalise both bounds before comparing them, so a mix of naive and
        # timezone-aware inputs is handled by the helper instead of failing in
        # the comparison (assumes the helper returns timezone-aware datetimes).
        start_time = enforce_utc_timezone(start_time)
        end_time = enforce_utc_timezone(end_time)

        if start_time > end_time:
            msg = "start_time must be before end_time"
            logger.error(msg)
            raise ValueError(msg)

        file_paths, _ = self._get_processed_file_list(start_time, end_time)

        # Daily, timezone-naive skeleton covering the requested days; it is
        # filled from the yearly files below.
        t = pd.date_range(
            datetime(start_time.year, start_time.month, start_time.day),
            datetime(end_time.year, end_time.month, end_time.day),
            freq=timedelta(days=1),
        )
        data_out = pd.DataFrame(index=t)
        data_out["f107"] = np.array([np.nan] * len(t))
        data_out["date"] = data_out.index
        data_out["file_name"] = np.array([None] * len(t))

        for file_path in file_paths:
            if not file_path.exists():
                if not download:
                    warnings.warn(f"File {file_path} not found", stacklevel=2)
                    continue

                year = int(file_path.stem.split("_")[-1])
                year_end = datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
                if not self._is_within_download_range(year_end):
                    logger.warning(
                        f"Cannot download data for year {year}. "
                        f"Only data from the last 30 days can be downloaded from SWPC."
                    )
                    continue

                self.download_and_process()

                # The download only covers the most recent days, so it may not
                # contain any record for this year (e.g. right after New Year).
                if not file_path.exists():
                    warnings.warn(f"File {file_path} not found after download", stacklevel=2)
                    continue

            df_one_file = self._read_single_file(file_path)
            # Values from the file take precedence over the NaN skeleton.
            data_out = df_one_file.combine_first(data_out)

        if not data_out.empty:
            data_out.index = enforce_utc_timezone(data_out.index)
            data_out.drop("date", axis=1, inplace=True)
            # The padding of just under one day keeps the daily sample of the
            # start day when start_time is not at midnight.
            data_out = data_out.truncate(
                before=start_time - timedelta(hours=23.9999),
                after=end_time + timedelta(hours=23.9999),
            )

        return data_out

    def _read_single_file(self, file_path: Path) -> pd.DataFrame:
        """Read yearly F107 file to a DataFrame.

        Parameters
        ----------
        file_path : Path
            Path to the file.

        Returns
        -------
        pd.DataFrame
            Data from the yearly F10.7 file, indexed by ``date``. The column
            ``file_name`` holds ``file_path`` for rows with an F10.7 value and
            None for rows where ``f107`` is NaN.
        """
        df = pd.read_csv(file_path)

        df["date"] = pd.to_datetime(df["date"])
        df.index = df["date"]

        df["file_name"] = file_path
        # Rows without a value must not claim to originate from this file.
        df.loc[df["f107"].isna(), "file_name"] = None

        return df