Source code for swvo.io.RBMDataSet.identify_orbits
# SPDX-FileCopyrightText: 2026 GFZ Helmholtz Centre for Geosciences
# SPDX-FileContributor: Bernhard Haas
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import typing
from datetime import datetime
from typing import Literal, NamedTuple
import numpy as np
import pandas as pd
from numpy.typing import NDArray
from scipy.interpolate import make_splrep
from scipy.signal import find_peaks
from swvo.io.RBMDataSet import RBMDataSet
[docs]
class Trajectory(NamedTuple):
start: int
end: int
direction: Literal["inbound", "outbound"]
def _identify_orbits(
time: list[datetime], distance: NDArray[np.floating], minimal_distance: int, *, apply_smoothing: bool
) -> list[Trajectory]:
distance_filled = pd.Series(distance).interpolate(method="linear", limit_direction="both").to_numpy()
if apply_smoothing:
timestamps = [t.timestamp() for t in time]
distance_filled = make_splrep(timestamps, distance_filled, s=0)(timestamps)
distance_filled = typing.cast("NDArray[np.floating]", distance_filled)
peaks, _ = find_peaks(distance_filled, distance=minimal_distance)
troughs, _ = find_peaks(-distance_filled, distance=minimal_distance)
extrema = np.sort(np.concatenate((peaks, troughs)))
extrema = typing.cast("NDArray[np.int32]", extrema)
diffs = np.diff(distance_filled)
in_out_bound_label = "inbound" if np.median(diffs[0 : extrema[0]]) < 0 else "outbound"
orbits: list[Trajectory] = [Trajectory(0, int(extrema[0]), in_out_bound_label)]
for i in range(1, len(extrema)):
# print(diffs[extrema[i - 1]:extrema[i]])
in_out_bound_label = "inbound" if np.median(diffs[extrema[i - 1] : extrema[i]]) < 0 else "outbound"
orbits.append(Trajectory(extrema[i - 1] + 1, extrema[i], in_out_bound_label))
in_out_bound_label = "inbound" if np.median(diffs[extrema[-1] :]) < 0 else "outbound"
orbits.append(Trajectory(extrema[-1] + 1, len(distance) - 1, in_out_bound_label))
return orbits
def identify_orbits(
self: RBMDataSet,
orbit_type: Literal["R", "L*"] = "R",
minimal_distance: int = 60,
*,
apply_smoothing: bool = True,
) -> list[Trajectory]:
dist = self.R0 if orbit_type == "R" else self.Lstar[:, -1]
return _identify_orbits(self.datetime, dist, minimal_distance, apply_smoothing=apply_smoothing)