# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0
"""
Module for handling SWIFT solar wind ensemble data.
"""
import datetime as dt
import json
import logging
import os
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
from swvo.io.utils import enforce_utc_timezone, sw_mag_propagation
logger = logging.getLogger(__name__)
logging.captureWarnings(True)
[docs]
class SWSWIFTEnsemble:
"""
This is a class for SWIFT ensemble data.
Parameters
----------
data_dir : Path | None
Data directory for the SWIFT Ensemble data. If not provided, it will be read from the environment variable
prefer_env_var : bool, optional
If True, the environment variable takes precedence over the passed data_dir argument.
If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.
Methods
-------
read
Raises
------
ValueError
Returns `ValueError` if necessary environment variable is not set.
FileNotFoundError
Returns `FileNotFoundError` if the data directory does not exist.
"""
PROTON_MASS = 1.67262192369e-27
ENV_VAR_NAME = "SWIFT_ENSEMBLE_OUTPUT_DIR"
LABEL = "swift"
def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
if prefer_env_var and self.ENV_VAR_NAME in os.environ:
data_dir = Path(os.environ[self.ENV_VAR_NAME])
elif data_dir is None:
if not self.ENV_VAR_NAME or self.ENV_VAR_NAME not in os.environ:
raise ValueError(f"Necessary environment variable {self.ENV_VAR_NAME} not set!")
data_dir = Path(os.environ[self.ENV_VAR_NAME])
self.data_dir = Path(data_dir)
logger.info(f"SWIFT ensemble data directory: {self.data_dir}")
if not self.data_dir.exists():
msg = f"Data directory {self.data_dir} does not exist! Impossible to retrieve data!"
logger.error(msg)
raise FileNotFoundError(msg)
[docs]
def read(
self,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
propagation: bool = False,
truncate: bool = True,
) -> list[pd.DataFrame]:
# It does not make sense to read SWIFT ensemble files from different dates
"""
Read SWIFT ensemble data for the requested period.
Parameters
----------
start_time : datetime
Start time of the data to read. Must be timezone-aware.
end_time : datetime
End time of the data to read. Must be timezone-aware.
If not provided, it defaults to 3 days after the start time.
If `propagation` is True, it defaults to 2 days after the start time.
If `propagation` is False, it defaults to 3 days after the start time.
propagation : bool, optional
Propagate the data from L1 to near-Earth, defaults to False.
truncate : bool, optional
If True, truncate the data to the requested period, defaults to True.
Returns
-------
list[:class:`pandas.DataFrame`]
A list of data frames containing ensemble data for the requested period.
"""
if start_time:
start_time = enforce_utc_timezone(start_time)
if end_time:
end_time = enforce_utc_timezone(end_time)
if start_time is None:
start_time = datetime.now(timezone.utc).replace(microsecond=0, minute=0, second=0)
if end_time is None:
end_time = start_time + timedelta(days=3)
if propagation:
logger.info("Shifting start day by -1 day to account for propagation")
start_time = start_time - timedelta(days=1)
if start_time > end_time:
msg = "start_time must be before end_time"
logger.error(msg)
raise ValueError(msg)
str_date = start_time.strftime("%Y%m%dt0000")
ensemble_folders = sorted(
list((self.data_dir / str_date).glob("*task*")),
key=lambda x: int(x.stem.split("task")[-1]),
)
logger.info(f"Found {len(ensemble_folders)} SWIFT tasks folders...")
gsm_s = []
if len(ensemble_folders) == 0:
msg = f"SWIFT ensemble folder for date {str_date} not found...impossible to read, returning DataFrame with NaNs"
warnings.warn(msg)
data_out = self._nan_dataframe(start_time, end_time)
gsm_s.append(data_out)
for ensemble_folder in ensemble_folders:
try:
logger.info(f"Reading ensemble file: {ensemble_folder}")
gsm_path = ensemble_folder / "SWIFT"
json_files = [f for f in (gsm_path).glob("gsm_*") if f.suffix == ".json"]
file = json_files[0] if len(json_files) > 0 else []
data_gsm = self._read_single_file(file)
if truncate:
data_gsm = data_gsm.truncate(
before=start_time - timedelta(minutes=10),
after=end_time + timedelta(minutes=10),
)
if propagation:
data_gsm = sw_mag_propagation(data_gsm)
data_gsm["file_name"] = data_gsm.apply(self._update_filename, axis=1)
gsm_s.append(data_gsm)
except (IndexError, TypeError):
msg = f"GSM SWIFT output file for date {str_date} and task {ensemble_folder} not found...impossible to read"
warnings.warn(msg)
return gsm_s
# def read_single_output(self, target_time: datetime):
# pass
def _read_single_file(self, file_name, use_old_column_names=False) -> pd.DataFrame:
"""
This function reads one of the two available JSON files of SWIFT output and extracts relevant variables, combining them into a pandas DataFrame.
Parameters
----------
file_name : str
The path of the file to read.
fields : list, optional
List of fields to extract from the DataFrame. The list needs to contain a subset of available fields. If None, all the fields available are retrieved.
Returns
-------
pd.DataFrame
A DataFrame containing the requested variables.
"""
with open(file_name) as f:
data = json.load(f)
time = list(
map(
lambda x: dt.datetime.fromtimestamp(int(x), tz=dt.timezone.utc),
data["arrays"]["Unix time"]["data"],
)
)
ux = np.array(data["arrays"]["Vx"]["data"]) / 1000.0
uy = np.array(data["arrays"]["Vy"]["data"]) / 1000.0
uz = np.array(data["arrays"]["Vz"]["data"]) / 1000.0
bx = np.array(data["arrays"]["Bx"]["data"]) * 1.0e9
by = np.array(data["arrays"]["By"]["data"]) * 1.0e9
bz = np.array(data["arrays"]["Bz"]["data"]) * 1.0e9
temperature = np.array(data["arrays"]["Temperature_ion"]["data"])
speed = np.sqrt(ux**2 + uy**2 + uz**2)
b = np.sqrt(bx**2 + by**2 + bz**2)
n = np.array(data["arrays"]["Rho"]["data"]) / self.PROTON_MASS * 1.0e-6
pdyn = 2e-6 * n * speed**2
if use_old_column_names:
df = pd.DataFrame(
{
"proton_density": n,
"speed": speed,
"b": b,
"temperature": temperature,
"bx": bx,
"by": by,
"bz": bz,
"ux": ux,
"uy": uy,
"uz": uz,
"pdyn": pdyn,
},
index=time,
)
else:
df = pd.DataFrame(
{
"proton_density": n,
"speed": speed,
"bavg": b,
"temperature": temperature,
"bx_gsm": bx,
"by_gsm": by,
"bz_gsm": bz,
"pdyn": pdyn,
},
index=time,
)
df["file_name"] = file_name
return df
def _update_filename(self, row: pd.Series) -> str:
"""Update the filename in the row.
Parameters
----------
row : pd.Series
Returns
-------
str
Updated filename.
"""
if pd.isna(row["file_name"]):
return row["file_name"]
file_date_str = Path(row["file_name"]).stem.split("_")[-1]
file_date = pd.to_datetime(file_date_str, format="%Y-%m-%dt0000").date()
index_date = row.name.date() # ty: ignore[unresolved-attribute]
return "propagated from previous SWIFT FORECAST file" if file_date != index_date else row["file_name"]
def _nan_dataframe(self, start_time, end_time):
t = pd.date_range(start_time, end_time, freq="5min", tz=timezone.utc)
data_out = pd.DataFrame(
{
"proton_density": [np.nan] * len(t),
"speed": [np.nan] * len(t),
"bavg": [np.nan] * len(t),
"temperature": [np.nan] * len(t),
"bx_gsm": [np.nan] * len(t),
"by_gsm": [np.nan] * len(t),
"bz_gsm": [np.nan] * len(t),
"pdyn": [np.nan] * len(t),
"file_name": [np.nan] * len(t),
},
index=t,
)
return data_out