Source code for swvo.io.dst.read_dst_from_multiple_models
# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
from collections.abc import Sequence
from datetime import datetime, timezone
import numpy as np
import pandas as pd
from swvo.io.dst import DSTOMNI, DSTWDC
from swvo.io.exceptions import ModelError
from swvo.io.utils import (
any_nans,
construct_updated_data_frame,
enforce_utc_timezone,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Union of the model classes accepted in ``model_order``; it is used both in
# the type annotation and in the ``isinstance`` check of
# ``read_dst_from_multiple_models``.
DSTModel = DSTOMNI | DSTWDC
# Redirect warnings issued through the ``warnings`` module to the logging system.
logging.captureWarnings(True)
[docs]
def read_dst_from_multiple_models(
    start_time: datetime,
    end_time: datetime,
    model_order: Sequence[DSTModel] | None = None,
    historical_data_cutoff_time: datetime | None = None,
    *,
    download: bool = False,
) -> pd.DataFrame:
    """
    Read DST data from multiple models.

    The model order represents the priorities of models. The first model in the
    model order is read. If there are still NaNs in the resulting data, the next
    model will be read, and so on. For ensemble predictions, a list will be
    returned; otherwise, a plain data frame will be returned.

    Parameters
    ----------
    start_time : datetime
        Start time of the data request.
    end_time : datetime
        End time of the data request.
    model_order : Sequence or None, optional
        Order in which data will be read from the models. Defaults to
        [OMNI, WDC]; a warning is logged when the default is used.
    historical_data_cutoff_time : datetime or None, optional
        Time representing "now". The ``dst`` value of every row later than this
        time is set to NaN. Defaults to None, in which case the earlier of the
        current UTC time and ``end_time`` is used.
    download : bool, optional
        Flag indicating whether new data should be downloaded. Defaults to False.

    Returns
    -------
    :class:`pandas.DataFrame`
        A data frame containing data for the requested period.

    Raises
    ------
    ValueError
        If ``start_time`` is later than ``end_time``.
    ModelError
        If an entry of ``model_order`` is not an instance of a supported DST
        model class.
    """
    # Normalise the time zones before comparing: comparing a naive with an
    # aware datetime raises TypeError, which would mask the real validation.
    start_time = enforce_utc_timezone(start_time)
    end_time = enforce_utc_timezone(end_time)

    if start_time > end_time:
        msg = "start_time must be before end_time"
        raise ValueError(msg)

    if historical_data_cutoff_time is None:
        historical_data_cutoff_time = min(datetime.now(timezone.utc), end_time)
    else:
        historical_data_cutoff_time = enforce_utc_timezone(historical_data_cutoff_time)

    if model_order is None:
        model_order = [DSTOMNI(), DSTWDC()]
        logger.warning("No model order specified, using default order: OMNI, WDC")

    # The hourly target grid depends only on the requested window, so it is
    # built once instead of once per model.
    index_range = pd.date_range(
        start=pd.Timestamp(start_time).ceil("h"),
        end=pd.Timestamp(end_time).floor("h"),
        freq="h",
        name="t",
    )

    data_out = pd.DataFrame()

    for model in model_order:
        if not isinstance(model, DSTModel):
            raise ModelError(f"Unknown or incompatible model: {type(model).__name__}")

        logger.info(f"Reading {model.LABEL} from {start_time} to {end_time}")
        data_one_model = model.read(start_time, end_time, download=download)

        # Keep only the first row per timestamp; reindexing requires a unique index.
        data_one_model = data_one_model.loc[~data_one_model.index.duplicated(keep="first")]
        # Align to the hourly grid; timestamps the model did not provide become NaN rows.
        data_one_model = data_one_model.reindex(index_range)

        # Discard values that lie after the cutoff ("now").
        data_one_model.loc[data_one_model.index > historical_data_cutoff_time, "dst"] = np.nan
        data_one_model = data_one_model.fillna({"file_name": np.nan})
        logger.info(f"Setting NaNs in {model.LABEL} from {historical_data_cutoff_time} to {end_time}")

        data_out = construct_updated_data_frame(data_out, data_one_model, model.LABEL)

        # if no NaNs are present anymore, we don't have to read backups
        if not any_nans(data_out):
            break

    # NOTE(review): presumably construct_updated_data_frame returns a list of
    # frames, so a single-element result is unwrapped here - confirm against
    # the helper's implementation.
    if len(data_out) == 1:
        data_out = data_out[0]

    return data_out