# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0
"""
Module for handling F10.7 data from SWPC.
"""
from __future__ import annotations
import logging
import shutil
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
import numpy as np
import pandas as pd
import requests
from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone
# Module-level logger, named after this module so handlers/filters can target it.
logger = logging.getLogger(__name__)
# Redirect messages issued through the ``warnings`` module into the logging
# system (they are emitted on the ``py.warnings`` logger).
logging.captureWarnings(True)
class F107SWPC(BaseIO):
    """Read and update F10.7 data published by SWPC.

    The class downloads the SWPC ``daily-solar-indices.txt`` product, merges
    it into per-year CSV files named ``SWPC_F107_<year>.csv`` inside
    ``self.data_dir`` and reads those files back for a requested time range.
    ``self.data_dir`` is not defined in this class; it is expected to be
    provided by :class:`BaseIO`.

    Methods
    -------
    download_and_process
    read

    Raises
    ------
    ValueError
        Returns `ValueError` if necessary environment variable is not set.
    """

    ENV_VAR_NAME = "RT_SWPC_F107_DIR"
    URL = "https://services.swpc.noaa.gov/text/"
    NAME_F107 = "daily-solar-indices.txt"
    LABEL = "swpc"
    # Seconds to wait for the SWPC server; without a timeout ``requests`` may
    # block indefinitely on an unresponsive connection.
    REQUEST_TIMEOUT = 30

    def _is_within_download_range(self, target_date: datetime) -> bool:
        """Check that a date is not older than 30 days before now (UTC).

        Only a lower bound is applied, so dates in the future also return
        ``True``. ``read`` relies on this when it passes the last second of
        the current year.

        Parameters
        ----------
        target_date : datetime
            Date to check. Must be timezone-aware, because it is compared
            against an aware UTC timestamp.

        Returns
        -------
        bool
            True if the date is within the last 30 days (or later), False otherwise.
        """
        now = datetime.now(timezone.utc)
        thirty_days_ago = now - timedelta(days=30)
        return target_date >= thirty_days_ago

    def _get_processed_file_list(
        self, start_time: datetime, end_time: datetime
    ) -> tuple[list[Path], list[tuple[datetime, datetime]]]:
        """Get list of yearly file paths and their corresponding time intervals.

        Parameters
        ----------
        start_time : datetime
            Start of the requested range; only its year is used.
        end_time : datetime
            End of the requested range; only its year is used.

        Returns
        -------
        Tuple[List, List]
            One file path per calendar year from ``start_time.year`` to
            ``end_time.year`` (inclusive), and the matching
            ``(Jan 1 00:00:00, Dec 31 23:59:59)`` UTC interval for each year.
        """
        years_needed = range(start_time.year, end_time.year + 1)
        file_paths = [self.data_dir / f"SWPC_F107_{year}.csv" for year in years_needed]
        time_intervals = [
            (
                datetime(year, 1, 1, tzinfo=timezone.utc),
                datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc),
            )
            for year in years_needed
        ]
        return file_paths, time_intervals

    def download_and_process(self) -> None:
        """Download and process the latest 30-day F10.7 data.

        The raw file is stored in a scratch directory ``./temp_f107``, parsed,
        split by calendar year and merged into the yearly CSV files. A failure
        while updating one year is logged and does not stop the other years.
        The scratch directory is removed even when downloading or parsing
        raises.

        Returns
        -------
        None
        """
        temp_dir = Path("./temp_f107")
        temp_dir.mkdir(exist_ok=True)
        try:
            logger.debug("Downloading F10.7 data...")
            self._download(temp_dir, self.NAME_F107)

            logger.debug("Processing F10.7 data...")
            new_data = self._process_single_file(temp_dir / self.NAME_F107)

            for year, year_data in new_data.groupby(new_data.date.dt.year):
                self._update_yearly_file(year, year_data)
        finally:
            # Clean up on every exit path so a failed download does not leave
            # the scratch directory behind.
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _update_yearly_file(self, year: int, year_data: pd.DataFrame) -> None:
        """Merge one year of freshly downloaded records into its CSV file.

        Parameters
        ----------
        year : int
            Calendar year the records belong to.
        year_data : pd.DataFrame
            New records for that year with ``date`` and ``f107`` columns.

        Notes
        -----
        Errors are logged rather than raised so that one bad year does not
        prevent the remaining years from being written.
        """
        file_path = self.data_dir / f"SWPC_F107_{year}.csv"
        # Write to a sibling temporary file first and rename it afterwards, so
        # an interrupted write never leaves a truncated yearly file.
        tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
        try:
            if file_path.expanduser().exists():
                logger.debug(f"Updating {file_path}...")
                existing_data = pd.read_csv(file_path, parse_dates=["date"])
                # Stored dates are compared against tz-naive new dates below.
                existing_data["date"] = pd.to_datetime(existing_data["date"]).dt.tz_localize(None)

                combined_data = pd.concat([existing_data, year_data])
                # ``keep="last"`` lets the new download override stored values.
                combined_data = combined_data.drop_duplicates(subset=["date"], keep="last")
                combined_data = combined_data.sort_values("date")

                new_records = len(combined_data) - len(existing_data)
                logger.debug(f"Added {new_records} new records to {year}")
            else:
                logger.debug(f"Creating new file for {year}")
                combined_data = year_data

            combined_data.to_csv(tmp_path, index=False)
            tmp_path.replace(file_path)
        except Exception as e:
            logger.error(f"Failed to process file for year {year}: {e}")
            if tmp_path.exists():
                tmp_path.unlink()

    def _download(self, temp_dir: Path, filename: str) -> None:
        """Download a file from SWPC server.

        Parameters
        ----------
        temp_dir : Path
            Temporary directory to store the downloaded file.
        filename : str
            Name of the file to download; appended to ``URL``.

        Raises
        ------
        requests.HTTPError
            If the HTTP request fails.
        requests.Timeout
            If the server does not respond within ``REQUEST_TIMEOUT`` seconds.
        FileNotFoundError
            If the downloaded file is empty.
        """
        url = self.URL + filename
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()

        target = temp_dir / filename
        target.write_bytes(response.content)

        if target.stat().st_size == 0:
            msg = f"Error downloading file: {url}"
            raise FileNotFoundError(msg)

    def _process_single_file(self, file_path: Path) -> pd.DataFrame:
        """Read and process the F10.7 data file.

        Parameters
        ----------
        file_path : Path
            Path to the downloaded text file.

        Returns
        -------
        pd.DataFrame
            Frame with a tz-naive ``date`` column (midnight of each day) and
            an ``f107`` column.
        """
        data = pd.read_csv(
            file_path,
            sep=r"\s+",
            # The first 13 lines are skipped as header; only the first four
            # whitespace-separated columns (year, month, day, flux) are kept.
            skiprows=13,
            usecols=[0, 1, 2, 3],
            names=["year", "month", "day", "f107"],
        )
        data["date"] = pd.to_datetime(data[["year", "month", "day"]].assign(hour=0))
        return data[["date", "f107"]]

    def read(self, start_time: datetime, end_time: datetime, *, download: bool = False) -> pd.DataFrame:
        """Read F10.7 SWPC data for the given time range.

        Parameters
        ----------
        start_time : datetime
            Start time of the data to read. Must be timezone-aware.
        end_time : datetime
            End time of the data to read. Must be timezone-aware.
        download : bool, optional
            Download data on the go, defaults to False.

        Returns
        -------
        :class:`pandas.DataFrame`
            F10.7 data indexed by day, with columns ``f107`` and
            ``file_name``. Days without data hold NaN / None.

        Raises
        ------
        ValueError
            Raises ValueError if `start_time` is `after end_time`.
        """
        # Normalise before comparing: comparing a naive with an aware datetime
        # raises TypeError. NOTE(review): assumes enforce_utc_timezone accepts
        # naive inputs and preserves ordering -- confirm against its definition.
        start_time = enforce_utc_timezone(start_time)
        end_time = enforce_utc_timezone(end_time)

        if start_time > end_time:
            msg = "start_time must be before end_time"
            logger.error(msg)
            raise ValueError(msg)

        file_paths, _ = self._get_processed_file_list(start_time, end_time)

        # Daily, tz-naive skeleton covering the requested range; it is filled
        # from the yearly files below.
        t = pd.date_range(
            datetime(start_time.year, start_time.month, start_time.day),
            datetime(
                end_time.year,
                end_time.month,
                end_time.day,
            ),
            freq=timedelta(days=1),
        )
        data_out = pd.DataFrame(index=t)
        data_out["f107"] = np.array([np.nan] * len(t))
        data_out["date"] = data_out.index
        data_out["file_name"] = np.array([None] * len(t))

        for file_path in file_paths:
            if not file_path.exists():
                if not download:
                    warnings.warn(f"File {file_path} not found", stacklevel=2)
                    continue

                year = int(file_path.stem.split("_")[-1])
                year_end = datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
                if not self._is_within_download_range(year_end):
                    logger.warning(
                        f"Cannot download data for year {year}. "
                        f"Only data from the last 30 days can be downloaded from SWPC."
                    )
                    continue

                self.download_and_process()

                # The download only creates files for years present in the
                # 30-day product (and per-year failures are merely logged),
                # so the requested file may still be missing.
                if not file_path.exists():
                    warnings.warn(f"File {file_path} not found", stacklevel=2)
                    continue

            df_one_file = self._read_single_file(file_path)
            # Values from the file take precedence; the skeleton fills gaps.
            data_out = df_one_file.combine_first(data_out)

        if not data_out.empty:
            data_out.index = enforce_utc_timezone(data_out.index)
            data_out.drop("date", axis=1, inplace=True)
            # Widen the window by just under a day on both sides so the daily
            # records of the first and last requested day are kept.
            data_out = data_out.truncate(
                before=start_time - timedelta(hours=23.9999),
                after=end_time + timedelta(hours=23.9999),
            )

        return data_out

    def _read_single_file(self, file_path: Path) -> pd.DataFrame:
        """Read yearly F107 file to a DataFrame.

        Parameters
        ----------
        file_path : Path
            Path to the file.

        Returns
        -------
        pd.DataFrame
            Data from the yearly F10.7 SWPC file, indexed by ``date``, with a
            ``file_name`` column that is ``None`` where ``f107`` is missing.
        """
        df = pd.read_csv(file_path)
        df["date"] = pd.to_datetime(df["date"])
        df.index = df["date"]
        df["file_name"] = file_path
        # Rows without a value should not claim a source file.
        df.loc[df["f107"].isna(), "file_name"] = None
        return df