Source code for seiscat.plot.plot_timeline_utils

# -*- coding: utf8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Common utility functions for timeline plotting.

:copyright:
    2022-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
    GNU General Public License v3.0 or later
    (https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
import re
import numpy as np
from datetime import datetime, timezone
from .plot_utils import get_event_identifier, to_finite_float

# time constants in seconds
ONE_MINUTE_SECONDS = 60.0
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
ONE_DAY_SECONDS = 24 * ONE_HOUR_SECONDS
ONE_WEEK_SECONDS = 7 * ONE_DAY_SECONDS
AVERAGE_MONTH_DAYS = 30.4375
AVERAGE_YEAR_DAYS = 365.25
ONE_MONTH_SECONDS = AVERAGE_MONTH_DAYS * ONE_DAY_SECONDS
ONE_YEAR_SECONDS = AVERAGE_YEAR_DAYS * ONE_DAY_SECONDS


def _utcdatetime_to_datetime(utcdt):
    """Convert an obspy UTCDateTime to a timezone-aware Python datetime."""
    return datetime.fromtimestamp(float(utcdt), tz=timezone.utc)


[docs] def get_event_times_and_values(events, attribute): """ Extract event times and the value of a given attribute. Events for which the attribute is missing or non-numeric are silently skipped. :param events: EventList of Event dicts :param attribute: name of the event attribute to extract :returns: list of (datetime, float) tuples, sorted by time """ return [ (dt, val) for dt, val, _event in get_event_times_values_and_events( events, attribute ) ]
[docs] def get_event_times_values_and_events(events, attribute, colorby=None): """ Extract event times, numeric attribute values, and source events. Events for which the plotted attribute is undefined/non-numeric are skipped. If ``colorby`` is provided and differs from ``attribute``, events with undefined/non-numeric color values are also skipped. :param events: EventList of Event dicts :param attribute: name of the event attribute to extract :param colorby: optional name of the color attribute :returns: list of (datetime, float, event) tuples, sorted by time """ result = [] for event in events: attr_val = to_finite_float(event.get(attribute)) if attr_val is None: event_id = get_event_identifier(event) print( f'Skipping event "{event_id}" for timeline plot: ' f'attribute "{attribute}" is not defined.' ) continue if colorby is not None and colorby != attribute: color_val = to_finite_float(event.get(colorby)) if color_val is None: event_id = get_event_identifier(event) print( f'Skipping event "{event_id}" for timeline plot: ' f'color attribute "{colorby}" is not defined.' ) continue result.append( (_utcdatetime_to_datetime(event['time']), attr_val, event) ) result.sort(key=lambda x: x[0]) return result
# --------------------------------------------------------------------------- # Binning helpers # --------------------------------------------------------------------------- def _parse_bin_duration(spec): """ Parse a bin-width specifier string into seconds. Accepted formats: ``Nd`` (days), ``Nw`` (weeks), ``Nm`` (months, ~30.4 d), ``Ny`` (years, ~365.25 d). N may be a float. :param spec: string specifier, e.g. ``'7d'``, ``'1m'``, ``'0.5y'`` :returns: duration in seconds, or ``None`` if the string is not recognised """ m = re.match(r'^(\d+(?:\.\d+)?)\s*([dwmy])$', spec.strip().lower()) if m is None: return None value = float(m[1]) unit = m[2] mapping = { 'd': ONE_DAY_SECONDS, 'w': ONE_WEEK_SECONDS, 'm': ONE_MONTH_SECONDS, 'y': ONE_YEAR_SECONDS, } return value * mapping[unit] def _auto_bin_seconds(time_span_seconds): """ Choose an appropriate bin width (in seconds) given the total time span. Rules: * span ≤ 1 week → 1-hour bins * span ≤ 30 days → 1-day bins * span ≤ 1 year → 1-week bins * span ≤ 5 years → 1-month bins (~30.4 d) * span > 5 years → 1-year bins (~365.25 d) """ if time_span_seconds <= ONE_WEEK_SECONDS: return ONE_HOUR_SECONDS if time_span_seconds <= 30 * ONE_DAY_SECONDS: return ONE_DAY_SECONDS if time_span_seconds <= ONE_YEAR_SECONDS: return ONE_WEEK_SECONDS # sourcery skip: reintroduce-else if time_span_seconds <= 5 * ONE_YEAR_SECONDS: return ONE_MONTH_SECONDS return ONE_YEAR_SECONDS
[docs] def bin_events_by_time(events, bins_spec=None): """ Bin events into time intervals and count events in each bin. :param events: EventList of Event dicts :param bins_spec: bin specification. One of: * ``None`` / ``'auto'``: automatic bin width * integer string (e.g. ``'20'``): that many equal-width bins * duration string (e.g. ``'7d'``, ``'1m'``): fixed bin width :returns: list of ``(bin_start, bin_end, count)`` tuples where *bin_start* and *bin_end* are timezone-aware datetimes and *count* is an ``int``. """ if not events: return [] times_dt = [_utcdatetime_to_datetime(e['time']) for e in events] times_ts = np.array([dt.timestamp() for dt in times_dt]) t0_ts = times_ts.min() t1_ts = times_ts.max() total_span = t1_ts - t0_ts # --- resolve bin edges in timestamp (float seconds) --- if bins_spec is None or bins_spec == 'auto': bin_sec = ( _auto_bin_seconds(total_span) if total_span > 0 else ONE_HOUR_SECONDS ) n_bins = max(1, int(np.ceil(total_span / bin_sec))) edges = np.linspace(t0_ts, t0_ts + n_bins * bin_sec, n_bins + 1) else: # Try integer first try: n_bins = int(bins_spec) n_bins = max(n_bins, 1) edges = np.linspace( t0_ts, t1_ts if total_span > 0 else t0_ts + ONE_HOUR_SECONDS, n_bins + 1 ) except ValueError: bin_sec = _parse_bin_duration(bins_spec) if bin_sec is None: bin_sec = ( _auto_bin_seconds(total_span) if total_span > 0 else ONE_HOUR_SECONDS ) n_bins = max(1, int(np.ceil(total_span / bin_sec))) edges = np.linspace(t0_ts, t0_ts + n_bins * bin_sec, n_bins + 1) # Last edge must be strictly greater than the latest event so that the # rightmost event falls inside the last bin. if edges[-1] <= t1_ts: edges[-1] = t1_ts + 1.0 counts, _ = np.histogram(times_ts, bins=edges) result = [] for i, count in enumerate(counts): bin_start = datetime.fromtimestamp(edges[i], tz=timezone.utc) bin_end = datetime.fromtimestamp(edges[i + 1], tz=timezone.utc) result.append((bin_start, bin_end, int(count))) return result
[docs] def bin_label(bin_start, bin_end): """ Return a short human-readable label for a time bin. The label format adapts to the bin width: * ≤ 2 days → ``YYYY-MM-DD HH:MM`` * ≤ 60 days → ``YYYY-MM-DD`` * ≤ 400 days → ``YYYY-Www`` (ISO week) * otherwise → ``YYYY-MM`` """ width_days = (bin_end - bin_start).total_seconds() / ONE_DAY_SECONDS if width_days <= 2: return bin_start.strftime('%Y-%m-%d %H:%M') if width_days <= 60: return bin_start.strftime('%Y-%m-%d') if width_days <= 400: return bin_start.strftime('%Y-%b') return bin_start.strftime('%Y')
def _seconds_to_human_duration(seconds): """Convert a duration in seconds to a compact human-readable string.""" def _format_value(value): return f'{value:.1f}'.rstrip('0').rstrip('.') def _fmt(value, singular, plural): unit = singular if abs(value - 1.0) < 1e-9 else plural return f'{_format_value(value)} {unit}' if seconds < 2 * ONE_MINUTE_SECONDS: return _fmt(seconds, 'second', 'seconds') if seconds < 2 * ONE_HOUR_SECONDS: return _fmt(seconds / ONE_MINUTE_SECONDS, 'minute', 'minutes') if seconds < 2 * ONE_DAY_SECONDS: return _fmt(seconds / ONE_HOUR_SECONDS, 'hour', 'hours') if seconds < 60 * ONE_DAY_SECONDS: return _fmt(seconds / ONE_DAY_SECONDS, 'day', 'days') if seconds < 2 * ONE_YEAR_SECONDS: return _fmt(seconds / ONE_MONTH_SECONDS, 'month', 'months') return _fmt(seconds / ONE_YEAR_SECONDS, 'year', 'years')
[docs] def get_bin_size_label(bins, bins_spec=None): """ Return a concise label describing the effective bin size. :param bins: list of ``(bin_start, bin_end, count)`` tuples :param bins_spec: user-provided bin spec from CLI :returns: string such as ``'auto, 7 days'`` or ``'20 bins, 7 days each'`` """ if not bins: return 'n/a' width_seconds = (bins[0][1] - bins[0][0]).total_seconds() width_human = _seconds_to_human_duration(width_seconds) if bins_spec is None or str(bins_spec).strip().lower() == 'auto': return f'auto, {width_human}' spec = str(bins_spec).strip() try: n_bins = int(spec) except ValueError: n_bins = None if n_bins is not None: return f'{n_bins} bins, {width_human} each' # duration-like custom spec, or any non-integer string that ended up # falling back to automatic binning. seconds = _parse_bin_duration(spec) if seconds is not None: return _seconds_to_human_duration(seconds) return f'{spec}, {width_human}'
[docs] def get_cumulative_event_times_and_counts(events): """ Return cumulative event count over chronologically sorted raw event times. :param events: EventList of Event dicts :returns: tuple ``(times, counts)`` where *times* is a sorted list of timezone-aware datetimes and *counts* is ``[1, 2, ..., N]`` """ if not events: return [], [] times = sorted(_utcdatetime_to_datetime(e['time']) for e in events) counts = np.arange(1, len(times) + 1, dtype=int).tolist() return times, counts