# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0
"""
Module for handling Niemegk Kp data.
"""
import logging
import os
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from typing import List, Tuple
import numpy as np
import pandas as pd
import requests
from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone
logger = logging.getLogger(__name__)
logging.captureWarnings(True)
[docs]
class KpNiemegk(BaseIO):
"""A class to handle Niemegk Kp data.
Parameters
----------
data_dir : Path | None
Data directory for the Niemegk Kp data. If not provided, it will be read from the environment variable
Methods
-------
download_and_process
read
Raises
------
ValueError
Returns `ValueError` if necessary environment variable is not set.
"""
ENV_VAR_NAME = "RT_KP_NIEMEGK_STREAM_DIR"
URL = "https://kp.gfz.de/fileadmin/files_for_gfz_cms/"
NAME = "Kp_ap_nowcast.txt"
DAYS_TO_SAVE_EACH_FILE = 3
LABEL = "niemegk"
[docs]
def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
"""Download and process Niemegk Kp data file.
Parameters
----------
start_time : datetime
Start time of the data to download and process.
end_time : datetime
End time of the data to download and process.
reprocess_files : bool, optional
Downloads and processes the files again, defaults to False, by default False
Raises
------
FileNotFoundError
Raise `FileNotFoundError` if the file is not downloaded successfully.
"""
if start_time < datetime.now(timezone.utc) - timedelta(days=30):
logger.info("We can only download and process a Kp Niemegk file from the last 30 days!")
return
temporary_dir = Path("./temp_kp_niemegk_wget")
temporary_dir.mkdir(exist_ok=True, parents=True)
logger.debug(f"Downloading file {self.URL + self.NAME} ...")
file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
for file_path, time_interval in zip(file_paths, time_intervals):
if file_path.exists() and not reprocess_files:
continue
tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
try:
self._download(temporary_dir)
# check if download was successfull
if os.stat(str(temporary_dir / self.NAME)).st_size == 0:
raise FileNotFoundError(f"Error while downloading file: {self.URL + self.NAME}!")
logger.debug("Processing file ...")
processed_df = self._process_single_file(temporary_dir)
data_single_file = processed_df[
(processed_df.index >= time_interval[0]) & (processed_df.index <= time_interval[1])
]
if len(data_single_file.index) == 0:
continue
file_path.parent.mkdir(parents=True, exist_ok=True)
data_single_file.to_csv(tmp_path, index=True, header=False)
tmp_path.replace(file_path)
logger.debug(f"Saving processed file {file_path}")
except Exception as e:
logger.error(f"Failed to process {file_path}: {e}")
if tmp_path.exists():
tmp_path.unlink()
continue
rmtree(temporary_dir, ignore_errors=True)
def _download(self, temporary_dir):
response = requests.get(self.URL + self.NAME)
response.raise_for_status()
with open(temporary_dir / self.NAME, "w") as f:
f.write(response.text)
[docs]
def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame:
"""Read Niemegk Kp data for the specified time range.
Parameters
----------
start_time : datetime
Start time of the data to read.
end_time : datetime
End time of the data to read.
download : bool, optional
Download data on the go, defaults to False.
Returns
-------
:class:`pandas.DataFrame`
Niemegk Kp dataframe.
"""
if start_time > end_time:
msg = "start_time must be before end_time"
logger.error(msg)
raise ValueError(msg)
start_time = enforce_utc_timezone(start_time)
end_time = enforce_utc_timezone(end_time)
file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
# initialize data frame with NaNs
t = pd.date_range(
datetime(start_time.year, start_time.month, start_time.day),
datetime(end_time.year, end_time.month, end_time.day, 23, 59, 59),
freq=timedelta(hours=3),
)
data_out = pd.DataFrame(index=t)
data_out.index = enforce_utc_timezone(data_out.index)
data_out["kp"] = np.array([np.nan] * len(t))
data_out["file_name"] = np.array([None] * len(t))
for file_path, time_interval in zip(file_paths, time_intervals):
if not file_path.exists():
if download:
self.download_and_process(start_time, end_time)
# if we request a date in the future, the file will still not be found here
if not file_path.exists():
warnings.warn(f"File {file_path} not found")
continue
df_one_file = self._read_single_file(file_path)
# combine the new file with the old ones, replace all values present in df_one_file in data_out
data_out = df_one_file.combine_first(data_out)
data_out = data_out.truncate(
before=start_time - timedelta(hours=2.9999),
after=end_time + timedelta(hours=2.9999),
)
return data_out
def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]:
"""Get list of file paths and their corresponding time intervals.
Returns
-------
Tuple[List, List]
List of file paths and time intervals.
"""
file_paths = []
time_intervals = []
current_time = datetime(
start_time.year,
start_time.month,
start_time.day,
0,
0,
0,
tzinfo=timezone.utc,
)
end_time = datetime(end_time.year, end_time.month, end_time.day, 0, 0, 0, tzinfo=timezone.utc) + timedelta(
days=1
)
while current_time <= end_time:
file_path = (
self.data_dir
/ current_time.strftime("%Y/%m")
/ f"NIEMEGK_KP_NOWCAST_{current_time.strftime('%Y%m%d')}.csv"
)
file_paths.append(file_path)
interval_start = current_time - timedelta(days=self.DAYS_TO_SAVE_EACH_FILE - 1)
interval_end = datetime(
current_time.year,
current_time.month,
current_time.day,
23,
59,
59,
tzinfo=timezone.utc,
)
time_intervals.append((interval_start, interval_end))
current_time += timedelta(days=1)
return file_paths, time_intervals
def _read_single_file(self, file_path) -> pd.DataFrame:
"""Read Nimegk Kp file to a DataFrame.
Parameters
----------
file_path : Path
Path to the file.
Returns
-------
pd.DataFrame
Data from Nimegk Kp file.
"""
df = pd.read_csv(file_path, names=["t", "kp"])
df["t"] = pd.to_datetime(df["t"])
df.index = df["t"]
df.drop(labels=["t"], axis=1, inplace=True)
df.index = enforce_utc_timezone(df.index)
df["file_name"] = file_path
df.loc[df["kp"].isna(), "file_name"] = None
return df
def _process_single_file(self, temporary_dir: Path) -> pd.DataFrame:
"""Process Nimegk Kp file to a DataFrame.
Parameters
----------
file_path : Path
Path to the file.
Returns
-------
pd.DataFrame
Nimegk Kp data.
"""
header = [
"#YYY",
"MM",
"DD",
"hh.h",
"hh._m",
"days",
"days_m",
"Kp",
"ap",
"D",
]
data = pd.read_csv(temporary_dir / self.NAME, names=header, sep=r"\s+", comment="#")
data["t"] = pd.to_datetime(
data[["#YYY", "MM", "DD", "hh.h"]].astype(str).agg("-".join, axis=1),
format="%Y-%m-%d-%H.%f",
)
data["kp"] = data["Kp"]
data.drop(
labels=header,
axis=1,
inplace=True,
)
data.index.rename("t", inplace=True)
data.index = data["t"]
data.index = enforce_utc_timezone(data.index)
data.drop(labels=["t"], axis=1, inplace=True)
data.dropna(inplace=True)
data = data[data["kp"] != -1.0]
data["kp"] = np.round(data["kp"], decimals=2)
return data