Source code for swvo.io.kp.read_kp_from_multiple_models

# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0

"""Function to read Kp from multiple models."""

from __future__ import annotations

import logging
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
from typing import Literal

import numpy as np
import pandas as pd

from swvo.io.exceptions import ModelError
from swvo.io.kp import KpBGS, KpEnsemble, KpNiemegk, KpOMNI, KpSIDC, KpSWPC
from swvo.io.utils import (
    any_nans,
    construct_updated_data_frame,
    enforce_utc_timezone,
)

logger = logging.getLogger(__name__)

KpModel = KpEnsemble | KpNiemegk | KpOMNI | KpSWPC | KpBGS | KpSIDC

logging.captureWarnings(True)


[docs] def read_kp_from_multiple_models( # noqa: PLR0913 start_time: datetime, end_time: datetime, model_order: Sequence[KpModel] | None = None, reduce_ensemble: Literal["mean", "median"] | None = None, historical_data_cutoff_time: datetime | None = None, *, download: bool = False, recurrence: bool = False, rec_model_order: Sequence[KpOMNI | KpNiemegk | KpBGS] | None = None, fill_average: bool = False, ) -> pd.DataFrame | list[pd.DataFrame]: """Read Kp data from multiple models. The model order determines the priority of models. Data is read from the first model in the model order. If there are still NaNs in the resulting data, the next model is read, and so on. For ensemble predictions, a list of data frames is returned; otherwise, a single data frame is returned. Parameters ---------- start_time : datetime The start time of the data request. end_time : datetime The end time of the data request. model_order : Sequence or None, optional The order in which data will be read from the models. Defaults to [OMNI, Niemegk, Ensemble, SWPC]. reduce_ensemble : {"mean", "median"} or None, optional The method to reduce ensembles to a single time series. Can be "mean", "median", or None. Defaults to None. historical_data_cutoff_time : datetime or None, optional Represents "now". After this time, no data will be taken from historical models (OMNI, Niemegk). Defaults to None. download : bool, optional Flag to decide whether new data should be downloaded. Defaults to False. Also applies to recurrence filling. recurrence : bool, optional If True, fill missing values using 27-day recurrence from historical models (OMNI, Niemegk). Defaults to False. rec_model_order : Sequence[KpOMNI | KpNiemegk | KpBGS], optional The order in which historical models will be used for 27-day recurrence filling. Defaults to [OMNI, Niemegk]. fill_average : bool, optional Flag which decides whether to fill missing values with the average Kp=4, defaults to False. Returns ------- Union[:class:`pandas.DataFrame`, list[:class:`pandas.DataFrame`]] A data frame or a list of data frames containing data for the requested period. """ if start_time > end_time: msg = "start_time must be before end_time" raise ValueError(msg) start_time = enforce_utc_timezone(start_time) end_time = enforce_utc_timezone(end_time) if historical_data_cutoff_time is not None: historical_data_cutoff_time = enforce_utc_timezone(historical_data_cutoff_time) if historical_data_cutoff_time is None: historical_data_cutoff_time = min(datetime.now(timezone.utc), end_time) if model_order is None: model_order = [KpOMNI(), KpNiemegk(), KpEnsemble(), KpSWPC()] logger.warning("No model order specified, using default order: OMNI, Niemegk, Ensemble, SWPC") data_out = [pd.DataFrame()] for model in model_order: if not isinstance(model, KpModel): raise ModelError(f"Unknown or incompatible model: {type(model).__name__}") data_one_model = _read_from_model( model, start_time, end_time, historical_data_cutoff_time, reduce_ensemble, # ty:ignore[invalid-argument-type] download=download, ) # If an ensemble is already present in `data_out` and the current model is SWPC # which provides a single deterministic forecast, prefer the ensemble and skip # merging SWPC when any timestamp overlaps to avoid mismatched ensemble sizes. if isinstance(model, KpSWPC): temp_list = data_one_model if isinstance(data_one_model, list) else [data_one_model] if isinstance(data_out, list) and len(data_out) > 1 and len(temp_list) == 1: try: swpc_idx = temp_list[0].index # if any ensemble member shares an index with the SWPC forecast, keep ensemble overlap_found = any(len(df.index.intersection(swpc_idx)) > 0 for df in data_out) if overlap_found: logger.info("Keeping existing ensemble; discarding SWPC because indices overlap.") continue except Exception as e: # If any unexpected issue occurs during index comparison, fall back # to the default merging behavior. logger.error(f"Error occurred while comparing indices: {e}") pass data_out = construct_updated_data_frame(data_out, data_one_model, model.LABEL) if not any_nans(data_out): break if recurrence: logger.info( "Recurrence filling enabled. Filling missing values using 27-day recurrence from historical models." ) if rec_model_order is None: rec_model_order = [m for m in model_order if isinstance(m, (KpOMNI, KpNiemegk, KpBGS))] for i, df in enumerate(data_out): if not df.empty: data_out[i] = _recursive_fill_27d_historical(df, download, rec_model_order) if fill_average: logger.info("Average filling enabled. Filling remaining missing values with average Kp of 4.") for i, df in enumerate(data_out): if not df.empty: data_out[i] = _fill_average(df) if len(data_out) == 1: data_out = data_out[0] return data_out
def _read_from_model( # noqa: PLR0913 model: KpModel, start_time: datetime, end_time: datetime, historical_data_cutoff_time: datetime, reduce_ensemble: str, *, download: bool, ) -> list[pd.DataFrame] | pd.DataFrame: """Reads Kp data from a given model within the specified time range. Parameters ---------- model : KpModel The model from which to read the Kp data. start_time : datetime The start time of the data range. end_time : datetime The end time of the data range. historical_data_cutoff_time : datetime Represents "now". Used for defining boundaries for historical or forecast data. reduce_ensemble : str The method to reduce ensemble data (e.g., "mean"). If None, ensemble members are not reduced. download : bool, optional Whether to download new data or not. Returns ------- list[pd.DataFrame] | pd.DataFrame A single data frame or a list of data frames containing the model data. """ # Read from historical models if isinstance(model, (KpOMNI, KpNiemegk, KpBGS)): data_one_model = _read_historical_model( model, start_time, end_time, historical_data_cutoff_time, download=download, ) # Forecasting models are called with synthetic now time if isinstance(model, (KpSWPC, KpSIDC)): logger.info( f"Reading {model.LABEL} from {historical_data_cutoff_time.replace(hour=0, minute=0, second=0)} to {end_time}\noriginal historical_data_cutoff_time: {historical_data_cutoff_time}" ) if end_time - historical_data_cutoff_time > timedelta(days=3): end_time = historical_data_cutoff_time + timedelta(days=2, hours=21) logger.warning( f"end_time is more than 3 days after historical_data_cutoff_time, setting end_time to {end_time}" ) data_one_model = [ model.read( historical_data_cutoff_time.replace(hour=0, minute=0, second=0), end_time, download=download, ) ] if isinstance(model, KpEnsemble): data_one_model = _read_latest_ensemble_files(model, historical_data_cutoff_time, end_time) num_ens_members = len(data_one_model) if num_ens_members > 0 and reduce_ensemble is not None: data_one_model = _reduce_ensembles(data_one_model, reduce_ensemble) # ty: ignore[invalid-argument-type] return data_one_model def _read_historical_model( model: KpOMNI | KpNiemegk | KpBGS, start_time: datetime, end_time: datetime, historical_data_cutoff_time: datetime, *, download: bool, ) -> pd.DataFrame: """Reads Kp data from historical models (KpOMNI or KpNiemegk or KpBGS) within the specified time range. Parameters ---------- model : KpOMNI | KpNiemegk | KpBGS The historical model from which to read the data. start_time : datetime The start time of the data range. end_time : datetime The end time of the data range. historical_data_cutoff_time : datetime Represents "now". Data after this time is set to NaN. download : bool, optional Whether to download new data or not. Returns ------- pd.DataFrame A data frame containing the model data with future values (after historical_data_cutoff_time) set to NaN. Raises ------ TypeError If the provided model is not an instance of KpOMNI or KpNiemegk or KpBGS. """ if not isinstance(model, (KpOMNI, KpNiemegk, KpBGS)): msg = "Encountered invalide model type in read historical model!" raise TypeError(msg) logger.info(f"Reading {model.LABEL} from {start_time} to {end_time}") data_one_model = model.read(start_time, end_time, download=download) # set nan for 'future' values data_one_model.loc[historical_data_cutoff_time + timedelta(hours=3) : end_time] = np.nan logger.info(f"Setting NaNs in {model.LABEL} from {historical_data_cutoff_time + timedelta(hours=3)} to {end_time}") return data_one_model def _read_latest_ensemble_files( model: KpEnsemble, historical_data_cutoff_time: datetime, end_time: datetime, ) -> list[pd.DataFrame]: """ Reads the most recent Kp ensemble data file available from the specified model. If the file for the target time is not found, the function iterates backward in hourly increments, up to 3 days, until a valid file is located. Ensures that the last index of every dataframe is the next higher multiple of 3 hours than the target time. Parameters ---------- model : KpEnsemble The ensemble model from which to read the data. historical_data_cutoff_time : datetime Represents "now". The function starts searching for files from this time. end_time : datetime The end time of the data range. Returns ------- list[pd.DataFrame] A list of data frames containing ensemble data for the specified range. """ target_time = historical_data_cutoff_time data_one_model = [pd.DataFrame(data={"kp": []})] while target_time > (historical_data_cutoff_time - timedelta(days=3)) and target_time < end_time: target_time = target_time.replace(minute=0, second=0) data_one_model = model.read(target_time, end_time) if data_one_model[0]["kp"].isna().all(): target_time -= timedelta(hours=1) continue break logger.info(f"Read PAGER Kp ensemble from {target_time} to {end_time}") # Ensure the last index of every DataFrame is the next higher multiple of 3 hours than target_time adjusted_data = [] for df in data_one_model: if not df.empty or not df.isna().all().all(): if df.index[-1] < end_time and (df.index[-1] - end_time) < timedelta(hours=3): df.loc[df.index[-1] + timedelta(hours=3)] = df.loc[df.index[-1]] adjusted_data.append(df) else: adjusted_data = data_one_model return adjusted_data def _reduce_ensembles(data_ensembles: list[pd.DataFrame], method: Literal["mean", "median"]) -> pd.DataFrame: """ Reduce a list of data frames representing ensemble data to a single data frame using the provided method. Parameters ---------- data_ensembles : list[pd.DataFrame] A list of data frames containing ensemble data. method : {"mean", "median"} The method to reduce the ensemble data. Returns ------- pd.DataFrame A data frame containing the reduced ensemble data. Raises ------ NotImplementedError If the provided reduction method is not implemented. """ if method == "mean": kp_mean_ensembles = [] for it, _ in enumerate(data_ensembles[0].index): data_curr_time = [ data_one_ensemble.loc[data_one_ensemble.index[it], "kp"] for data_one_ensemble in data_ensembles ] kp_mean_ensembles.append(np.mean(data_curr_time)) data_reduced = pd.DataFrame(index=data_ensembles[0].index, data={"kp": kp_mean_ensembles}) elif method == "median": kp_median_ensembles = [] for it, _ in enumerate(data_ensembles[0].index): data_curr_time = [ data_one_ensemble.loc[data_one_ensemble.index[it], "kp"] for data_one_ensemble in data_ensembles ] kp_median_ensembles.append(np.median(data_curr_time)) data_reduced = pd.DataFrame(index=data_ensembles[0].index, data={"kp": kp_median_ensembles}) else: msg = f"This reduction method has not been implemented yet: {method}!" raise NotImplementedError(msg) return data_reduced def _recursive_fill_27d_historical(df, download, historical_models): """Recursively fill missing values in using OMNI/Niemegk for (`date` - 27 days). Parameters ---------- df : pd.DataFrame DataFrame to fill with gaps. download : bool Download new data or not. historical_models : list[KpOMNI | KpNiemegk | KpBGS] List of historical models to use for filling gaps. value_col : str, optional _description_, by default "kp" Returns ------- pd.DataFrame DataFrame with gaps filled using 27d recurrence. """ df = df.copy() value_col = "kp" missing = df.index[df[value_col].isna()] tried = set() while len(missing) > 0: fill_map = {} for idx in missing: prev_idx = idx - timedelta(days=27) if prev_idx not in tried: # Try each historical model in priority order for model in historical_models: prev_data = model.read( prev_idx - timedelta(days=3), prev_idx + timedelta(days=3), download=download ) if not prev_data.empty and not pd.isna(prev_data.loc[prev_idx, value_col]): fill_map[idx] = ( prev_data.loc[prev_idx, value_col], model.LABEL, prev_data.loc[prev_idx, "file_name"], ) break tried.add(prev_idx) for idx, (val, label, fname) in fill_map.items(): df.loc[idx, value_col] = val df.loc[idx, "model"] = f"{label}_recurrence" df.loc[idx, "file_name"] = fname # Update missing for next recursion missing = df.index[df[value_col].isna()] if not fill_map: break return df def _fill_average(df) -> pd.DataFrame: """Recursively fill missing values with average Kp=4""" df = df.copy() value_col = "kp" missing = df.index[df[value_col].isna()] df.loc[missing, value_col] = 4 df.loc[missing, "model"] = "average_fill" df.loc[missing, "file_name"] = "average_fill" return df