Source code for swvo.io.sme.supermag

# SPDX-FileCopyrightText: 2026 GFZ Helmholtz Centre for Geosciences
# SPDX-FileContributor: Dmitrii Gurev
#
# SPDX-License-Identifier: Apache-2.0

"""
Module for handling SuperMAG SME data.
"""

import json
import logging
import re
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from typing import List, Tuple

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone

logger = logging.getLogger(__name__)

logging.captureWarnings(True)


[docs] class SMESuperMAG(BaseIO): """Class for SuperMAG SME data. Parameters ---------- username : str SuperMAG username used for authenticated data access (register at the SuperMAG website to obtain one) data_dir : Path | None Data directory for the SuperMAG SME data. If not provided, it will be read from the environment variable prefer_env_var : bool, optional If True, the environment variable takes precedence over the passed data_dir argument. If False (default), the passed data_dir is used if provided, otherwise the environment variable is used. Methods ------- download_and_process read Raises ------ ValueError Raised if the required environment variable is not set. """ ENV_VAR_NAME = "SUPERMAG_STREAM_DIR" LABEL = "supermag" def __init__(self, username: str, data_dir: Path | None = None, prefer_env_var: bool = False) -> None: super().__init__(data_dir, prefer_env_var) self.username = username
[docs] def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None: """Download and process SuperMAG SME data files. Parameters ---------- start_time : datetime Start time of the data to download. Must be timezone-aware. end_time : datetime End time of the data to download. Must be timezone-aware. reprocess_files : bool, optional Download and process files again. Defaults to False. Returns ------- None """ assert start_time < end_time, "Start time must be before end time" temporary_dir = Path("./temp_supermag") temporary_dir.mkdir(exist_ok=True, parents=True) file_paths, time_intervals = self._get_processed_file_list(start_time, end_time) for file_path, time_interval in zip(file_paths, time_intervals): if file_path.exists() and not reprocess_files: continue tmp_path = file_path.with_suffix(file_path.suffix + ".tmp") try: start_str = time_interval.strftime("%Y-%m-%dT%H:%M") extent = int(timedelta(days=1).total_seconds()) url = ( "https://supermag.jhuapl.edu/services/indices.php" f"?python&nohead&logon={self.username}" f"&start={start_str}" f"&extent={extent}" "&indices=sme" ) logger.debug(f"Downloading data from {url} ...") response = requests.get(url) response.raise_for_status() data = response.text.splitlines() if data[0].startswith("ERROR"): err = f"SuperMAG {data[0]}" raise ValueError(err) filename = "index.html" with open(temporary_dir / filename, "w") as file: file.write("\n".join(data)) logger.debug("Processing file ...") processed_df = self._process_single_file(temporary_dir / filename) processed_df.to_csv(tmp_path, index=True, header=True) tmp_path.replace(file_path) except Exception as e: logger.error(f"Failed to process {file_path}: {e}") if tmp_path.exists(): tmp_path.unlink() continue rmtree(temporary_dir, ignore_errors=True)
def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]: """Get a list of file paths and their corresponding time intervals. Returns ------- Tuple[List, List] List of file paths and time intervals. """ file_paths = [] time = [] current_time = start_time.replace(hour=0, minute=0, second=0, microsecond=0) end_time = end_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) while current_time < end_time: file_path = self.data_dir / f"SuperMAG_SME_{current_time.strftime('%Y%m%d')}.csv" file_paths.append(file_path) file_time = current_time time.append(file_time) # Increment the day current_time = current_time + timedelta(days=1) return file_paths, time def _process_single_file(self, file_path: Path) -> pd.DataFrame: """Process daily SuperMAG SME file into a DataFrame. Parameters ---------- file_path : Path Path to the file. Returns ------- pd.DataFrame Processed SuperMAG SME data. Raises ------ ValueError If no JSON object/array is found in the downloaded file. """ with open(file_path, "r") as file: text = file.read() match = re.search(r"(\{.*\}|\[.*\])", text, re.S) if match is None: raise ValueError("No JSON object/array found in file") json_text = match.group(1) data = json.loads(json_text) df = pd.DataFrame(data) df["timestamp"] = pd.to_datetime(df["tval"], unit="s", utc=True) df.index = df["timestamp"] df.drop(columns=["timestamp", "tval"], inplace=True) df = df.rename(columns={"SME": "sme"}) mask = df["sme"] >= 999998 df.loc[mask, "sme"] = np.nan return df
[docs] def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame: """ Read SuperMAG SME data for a given time range. Parameters ---------- start_time : datetime Start time of the data to read. Must be timezone-aware. end_time : datetime End time of the data to read. Must be timezone-aware. download : bool, optional Download missing data files on demand. Defaults to False. Returns ------- :class:`pandas.DataFrame` SuperMAG SME data. """ if start_time > end_time: msg = "start_time must be before end_time" logger.error(msg) raise ValueError(msg) start_time = enforce_utc_timezone(start_time) end_time = enforce_utc_timezone(end_time) file_paths, _ = self._get_processed_file_list(start_time, end_time) t = pd.date_range( datetime(start_time.year, start_time.month, start_time.day), datetime(end_time.year, end_time.month, end_time.day, 23, 59, 00), freq=timedelta(minutes=1), tz=timezone.utc, ) data_out = pd.DataFrame(index=t) data_out["sme"] = np.array([np.nan] * len(t)) data_out["file_name"] = np.array([None] * len(t)) for file_path in file_paths: if not file_path.exists() and download: self.download_and_process(start_time, end_time) if not file_path.exists(): warnings.warn(f"File {file_path} not found") continue df_one_file = self._read_single_file(file_path) data_out = df_one_file.combine_first(data_out) data_out = data_out.truncate( before=start_time - timedelta(minutes=0.9999), after=end_time + timedelta(minutes=0.9999), ) return data_out
def _read_single_file(self, file_path: Path) -> pd.DataFrame: """Read a daily SuperMAG SME file into a DataFrame. Parameters ---------- file_path : Path Path to the file. Returns ------- pd.DataFrame Data from daily SuperMAG SME file. """ df = pd.read_csv(file_path) df.index = pd.to_datetime(df["timestamp"], utc=True) df.drop(columns=["timestamp"], inplace=True) df.index.name = None df["file_name"] = file_path df.loc[df["sme"].isna(), "file_name"] = None return df