# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0
"""
Module for handling OMNI high resolution data.
"""
import calendar
import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple
import pandas as pd
import requests
from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone
logger = logging.getLogger(__name__)
[docs]
class OMNIHighRes(BaseIO):
"""This is a class for the OMNI High Resolution data.
Parameters
----------
data_dir : Path | None
Data directory for the OMNI High Resolution data. If not provided, it will be read from the environment variable
Methods
-------
download_and_process
read
Raises
------
ValueError
Returns `ValueError` if necessary environment variable is not set.
"""
ENV_VAR_NAME = "OMNI_HIGH_RES_STREAM_DIR"
URL = "https://omniweb.gsfc.nasa.gov/cgi/nx1.cgi"
START_YEAR = 1981
LABEL = "omni"
PARALLEL_DOWNLOAD_THRESHOLD = 10
MAX_PARALLEL_DOWNLOADS = 10
[docs]
def download_and_process(
self,
start_time: datetime,
end_time: datetime,
cadence_min: int = 1,
reprocess_files: bool = False,
) -> None:
"""Download and process OMNI High Resolution data files.
Parameters
----------
start_time : datetime
Start time for data download.
end_time : datetime
End time for data download.
cadence_min : int, optional
Cadence of the data in minutes, defaults to 1
reprocess_files : bool, optional
Downloads and processes the files again, defaults to False, by default False
Returns
-------
None
Raises
------
AssertionError
Raises `AssertionError` if the cadence is not 1 or 5 minutes.
"""
assert cadence_min == 1 or cadence_min == 5, (
"Only 1 or 5 minute cadence can be chosen for high resolution omni data."
)
file_paths, time_intervals = self._get_processed_file_list(start_time, end_time, cadence_min)
download_tasks = []
for file_path, time_interval in zip(file_paths, time_intervals):
if file_path.exists() and not reprocess_files:
continue
download_tasks.append((file_path, time_interval))
if len(download_tasks) > self.PARALLEL_DOWNLOAD_THRESHOLD:
max_workers = min(self.MAX_PARALLEL_DOWNLOADS, len(download_tasks))
logger.info(f"Downloading {len(download_tasks)} OMNI high resolution files with {max_workers} workers.")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(self._download_and_process_single_file, file_path, time_interval, cadence_min)
for file_path, time_interval in download_tasks
]
for future in as_completed(futures):
future.result()
return
for file_path, time_interval in download_tasks:
self._download_and_process_single_file(file_path, time_interval, cadence_min)
def _download_and_process_single_file(
self,
file_path,
time_interval: Tuple[datetime, datetime],
cadence_min: int,
) -> None:
"""Download and process one monthly OMNI High Resolution file."""
# Create directory structure if it doesn't exist
file_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
try:
data = self._get_data_from_omni(
start=time_interval[0],
end=time_interval[1],
cadence=cadence_min,
)
logger.debug("Processing file ...")
processed_df = self._process_single_month(data, original_end=time_interval[1], cadence_min=cadence_min)
# Do not save empty DataFrames — no data available for this interval
if processed_df.empty:
logger.warning(f"Skipping save for {file_path}: no data available for this interval.")
return
processed_df.to_csv(tmp_path, index=True, header=True)
tmp_path.replace(file_path)
except Exception as e:
logger.error(f"Failed to process {file_path}: {e}")
if tmp_path.exists():
tmp_path.unlink()
[docs]
def read(
self,
start_time: datetime,
end_time: datetime,
cadence_min: int = 1,
download: bool = False,
) -> pd.DataFrame:
"""
Read OMNI High Resolution data for the given time range.
Parameters
----------
start_time : datetime
Start time for reading data.
end_time : datetime
End time for reading data.
cadence_min : int, optional
Cadence of the data in minutes, defaults to 1
download : bool, optional
Download data on the go, defaults to False.
Returns
-------
:class:`pandas.DataFrame`
OMNI High Resolution data.
Raises
------
AssertionError
Raises `AssertionError` if the cadence is not 1 or 5 minutes.
"""
assert cadence_min == 1 or cadence_min == 5, (
"Only 1 or 5 minute cadence can be chosen for high resolution omni data."
)
if start_time > end_time:
msg = "start_time must be before end_time"
logger.error(msg)
raise ValueError(msg)
start_time = enforce_utc_timezone(start_time)
end_time = enforce_utc_timezone(end_time)
if start_time < datetime(self.START_YEAR, 1, 1, tzinfo=timezone.utc):
logger.warning(
"Start date chosen falls behind the existing data. Moving start date to first"
" available mission files..."
)
start_time = datetime(self.START_YEAR, 1, 1, tzinfo=timezone.utc)
assert start_time < end_time
file_paths, _ = self._get_processed_file_list(start_time, end_time, cadence_min)
dfs = []
for file_path in file_paths:
if not file_path.exists():
if download:
self.download_and_process(start_time, end_time, cadence_min=cadence_min)
else:
logger.warning(f"File {file_path} not found")
continue
# Re-check after attempted download — file may not exist if no data
# was available for this interval (e.g. beyond the OMNI data range)
if not file_path.exists():
logger.warning(f"File {file_path} not available after download attempt, skipping.")
continue
dfs.append(self._read_single_file(file_path))
data_out = pd.concat(dfs, ignore_index=False)
if not data_out.empty:
data_out.index = enforce_utc_timezone(data_out.index)
data_out = data_out.truncate(
before=start_time - timedelta(minutes=cadence_min - 0.0000001),
after=end_time + timedelta(minutes=cadence_min + 0.0000001),
)
return data_out
def _get_processed_file_list(
self, start_time: datetime, end_time: datetime, cadence_min: float
) -> Tuple[List, List]:
"""Get list of file paths and their corresponding time intervals.
Parameters
----------
start_time : datetime
Start time for the data.
end_time : datetime
End time for the data.
cadence_min : float
Cadence of the data in minutes.
Returns
-------
Tuple[List, List]
List of file paths and time intervals.
"""
file_paths = []
time_intervals = []
# Start from the first day of the start_time month
current_date = start_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
# Check if end_time is within cadence_min of the next month boundary
# This ensures we include the next month's file if needed
end_year = end_time.year
end_month = end_time.month
# Calculate next month start
if end_month == 12:
next_month_start = datetime(end_year + 1, 1, 1, 0, 0, 0, tzinfo=end_time.tzinfo)
else:
next_month_start = datetime(end_year, end_month + 1, 1, 0, 0, 0, tzinfo=end_time.tzinfo)
time_diff_to_next_month = (next_month_start - end_time).total_seconds() / 3600
# If end_time is within `cadence_min` of next month, include the next month
cadence_hours = cadence_min / 60
include_next_month = time_diff_to_next_month <= cadence_hours
while current_date <= end_time or (include_next_month and current_date == next_month_start):
year = current_date.year
month = current_date.month
# directory: YYYY/
year_dir = self.data_dir / f"{year:04d}"
# Create file path
file_path = year_dir / f"OMNI_HIGH_RES_{cadence_min}min_{year:04d}{month:02d}.csv"
file_paths.append(file_path)
# Create time interval for current month
interval_start = datetime(year, month, 1, 0, 0, 0, tzinfo=current_date.tzinfo)
# Get last day of the month
last_day = calendar.monthrange(year, month)[1]
interval_end = datetime(year, month, last_day, 23, 59, 59, tzinfo=current_date.tzinfo)
time_intervals.append((interval_start, interval_end))
# Move to next month
if month == 12:
current_date = current_date.replace(year=year + 1, month=1)
else:
current_date = current_date.replace(month=month + 1)
# Break condition to avoid infinite loop
if include_next_month and current_date > next_month_start:
break
return file_paths, time_intervals
def _process_single_month(
self, data: list[str], original_end: Optional[datetime] = None, cadence_min: int = 1
) -> pd.DataFrame:
"""Process monthly OMNI High Resolution data to a DataFrame.
Parameters
----------
data : list[str]
Raw data lines from the OMNI service.
original_end : datetime, optional
The original requested end time. Used to build a NaN-filled DataFrame
when no data is available (e.g. the interval is beyond the OMNI data range).
Returns
-------
pd.DataFrame
Monthly OMNI High Resolution data. Returns a NaN-filled DataFrame
up to ``original_end`` if no data is available, or an empty DataFrame
if ``original_end`` is not provided.
"""
columns = ["bavg", "bx_gsm", "by_gsm", "bz_gsm", "speed", "proton_density", "temperature", "pdyn", "sym-h"]
# Empty data list signals that no data is available for this interval
if not data:
if original_end is None:
return pd.DataFrame(columns=columns)
# Build a NaN-filled DataFrame with the correct timestamps up to original_end
index = pd.date_range(
start=original_end.replace(day=1, hour=0, minute=0, second=0, microsecond=0),
end=original_end,
freq=pd.tseries.frequencies.to_offset(f"{cadence_min}min"),
tz=original_end.tzinfo,
)
return pd.DataFrame(pd.NA, index=index, columns=columns)
header_line = next(line for line in data if line.strip().startswith("YYYY"))
columns = header_line.split()
data_lines = [line for line in data if line.strip().startswith(("19", "20"))]
if not data_lines:
msg = "DataFrame is empty."
logger.error(msg)
raise ValueError(msg)
df = pd.DataFrame([line.split() for line in data_lines], columns=columns)
df = df.apply(pd.to_numeric)
df["timestamp"] = df["YYYY"].map(str).apply(lambda x: x + "-01-01 ") + df["HR"].map(str).apply(
lambda x: x.zfill(2)
)
df["timestamp"] += df["MN"].map(str).apply(lambda x: ":" + x.zfill(2) + ":00")
df["timestamp"] = pd.to_datetime(df["timestamp"])
df["timestamp"] = df["timestamp"] + df["DOY"].apply(lambda x: timedelta(days=int(x) - 1))
df.drop(columns=["YYYY", "HR", "MN", "DOY"], inplace=True)
df.set_index("timestamp", inplace=True)
maxes = {
"bavg": 9999.9,
"bx_gsm": 9999.9,
"by_gsm": 9999.9,
"bz_gsm": 9999.9,
"speed": 99999.8,
"proton_density": 999.8,
"temperature": 9999998.0,
"pdyn": 99.0,
"sym-h": 99999.0,
}
df.columns = maxes.keys()
for col, max_val in maxes.items():
df[col] = df[col].where(df[col] < max_val, other=pd.NA)
if df.empty:
msg = "DataFrame is empty after processing the month."
logger.error(msg)
raise ValueError(msg)
return df
def _read_single_file(self, file_path) -> pd.DataFrame:
"""Read monthly OMNI High Resolution file to a DataFrame.
Parameters
----------
file_path : Path
Path to the file.
Returns
-------
pd.DataFrame
Data from monthly High Resolution file.
"""
df = pd.read_csv(file_path, index_col=0)
df.index = pd.to_datetime(df.index, utc=True)
nan_mask = df.isna().all(axis=1)
df["file_name"] = file_path
df.loc[nan_mask, "file_name"] = None
return df
def _get_data_from_omni(self, start: datetime, end: datetime, cadence: int = 1) -> list:
"""
Fetches data from NASA's OMNIWeb service.
If an invalid date range error is returned, it automatically finds the
suggested valid end date and retries the request. If the suggested end
date falls before the start date, an empty list is returned to signal
that no data is available for the requested interval.
"""
payload = {
"activity": "retrieve",
"start_date": start.strftime("%Y%m%d"),
"end_date": end.strftime("%Y%m%d"),
}
common_vars = {"vars": ["13", "14", "17", "18", "21", "25", "26", "27", "41"]}
if cadence == 1:
params = {"res": "min", "spacecraft": "omni_min"}
payload.update(params)
payload.update(common_vars) # ty: ignore[no-matching-overload]
elif cadence == 5:
params = {"res": "5min", "spacecraft": "omni_5min"}
payload.update(params)
payload.update(common_vars) # ty: ignore[no-matching-overload]
else:
msg = f"Invalid cadence: {cadence}. Only 1 or 5 minutes are supported."
logger.error(msg)
raise ValueError(msg)
logger.debug(f"Fetching data from {self.URL} with payload: {payload}")
response = requests.post(self.URL, data=payload)
response.raise_for_status()
data = response.text.splitlines()
if data and "Error" in data[0]:
logger.warning("Received an error response from the server.")
for line in data:
if "correct range" in line:
# Use regex to find the valid date range (e.g., YYYYMMDD - YYYYMMDD)
match = re.search(r"correct range: \d{8} - (\d{8})", line)
if match:
new_end_date_str = match.group(1)
new_end_date = datetime.strptime(new_end_date_str, "%Y%m%d").replace(tzinfo=timezone.utc)
logger.warning(
f"Invalid date range detected. Found suggested end date: {new_end_date.strftime('%Y-%m-%d')}"
)
# If the suggested end date is before the start date, no data is available
# for this range — return an empty list to signal an empty DataFrame
if new_end_date < start:
logger.warning(
f"Suggested end date {new_end_date.strftime('%Y-%m-%d')} is before "
f"start date {start.strftime('%Y-%m-%d')}. No data available for this range."
)
return []
# Recursively call the function with the original start date and the new end date
return self._get_data_from_omni(start=start, end=new_end_date, cadence=cadence)
msg = f"An unspecified error occurred: {data}"
logger.error(msg)
raise ValueError(msg)
return data