# -*- coding: utf8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
"""
FDSN webservices functions for seiscat.
:copyright:
2022-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
GNU General Public License v3.0 or later
(https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
from datetime import timedelta
from obspy import UTCDateTime
from obspy import Catalog
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.header import FDSNNoDataException
[docs]
def open_fdsn_connection(config):
"""
Open FDSN connection. Return a FDSN client object.
:param config: config object
:returns: FDSN client object
"""
fdsn_event_url = config.get('fdsn_event_url')
if fdsn_event_url is None:
raise ValueError('FDSN event URL not set.')
user = config.get('fdsn_event_user')
password = config.get('fdsn_event_password')
kwargs = {}
if user is not None:
kwargs['user'] = user
if password is not None:
kwargs['password'] = password
return Client(fdsn_event_url, **kwargs)
def _to_utc_datetime(time):
"""
Convert time to UTCDateTime object.
:param time: time in string format
:returns: UTCDateTime object or None
"""
if time is None:
return None
if time.strip() == '':
raise ValueError('Empty time string.')
try:
return UTCDateTime(time)
except TypeError:
try:
time_interval = _parse_time_interval(time)
return UTCDateTime() + time_interval
except ValueError as e:
raise ValueError(
f'Invalid time format: {time}.\n'
'Please use YYYY-MM-DDTHH:MM:SS or '
'a time interval (typically in the past),\n'
'e.g., -1 day, -2 hours, -5 minutes, -10 seconds.'
) from e
def _parse_time_interval(time_interval):
"""
Parse time interval string.
:param time_interval: time interval in string format
:returns: timedelta object or None
"""
if time_interval is None:
return None
parts = time_interval.split()
if len(parts) != 2:
raise ValueError(f'Invalid time interval: {time_interval}.')
value = int(parts[0])
unit = parts[1]
if unit.endswith('s'):
# remvove plural form
unit = unit[:-1]
if unit == 'day':
return timedelta(days=value)
if unit == 'hour':
return timedelta(hours=value)
if unit == 'minute':
return timedelta(minutes=value)
if unit == 'second':
return timedelta(seconds=value)
raise ValueError(f'Invalid time unit: {unit}')
[docs]
class InvalidQuery(Exception):
"""Invalid query exception."""
[docs]
class QueryArgs():
"""Build query arguments for FDSN client."""
def __init__(self, config, suffix, first_query):
"""
Initialize query arguments.
:param config: config object
:param suffix: suffix to be added to the config keys
:param first_query: True if this is the first query
"""
query_keys = [
'start_time', 'end_time', 'recheck_period',
'lat_min', 'lat_max', 'lon_min', 'lon_max',
'lat0', 'lon0', 'radius_min', 'radius_max',
'depth_min', 'depth_max',
'mag_min', 'mag_max',
'event_type', 'event_type_exclude'
]
if suffix:
query_keys = [k + suffix for k in query_keys]
try:
if all(config[k] is None for k in query_keys):
raise InvalidQuery(
'All query parameters are None. Please set at least one.')
except KeyError as e:
raise InvalidQuery('Not all query parameters are set.') from e
self.starttime = _to_utc_datetime(config[f'start_time{suffix}'])
self.endtime = _to_utc_datetime(config[f'end_time{suffix}'])
recheck_period = _parse_time_interval(
config[f'recheck_period{suffix}'])
if not first_query and self.endtime is None and recheck_period:
self.starttime = max(
self.starttime, UTCDateTime() - recheck_period)
self.minlatitude = config[f'lat_min{suffix}']
self.maxlatitude = config[f'lat_max{suffix}']
self.minlongitude = config[f'lon_min{suffix}']
self.maxlongitude = config[f'lon_max{suffix}']
self.latitude = config[f'lat0{suffix}']
self.longitude = config[f'lon0{suffix}']
self.minradius = config[f'radius_min{suffix}']
self.maxradius = config[f'radius_max{suffix}']
self.mindepth = config[f'depth_min{suffix}']
self.maxdepth = config[f'depth_max{suffix}']
self.minmagnitude = config[f'mag_min{suffix}']
self.maxmagnitude = config[f'mag_max{suffix}']
[docs]
def get_query(self):
"""
Return query arguments as a dictionary.
:returns: dictionary of query arguments
"""
return {
'starttime': self.starttime, 'endtime': self.endtime,
'minlatitude': self.minlatitude, 'maxlatitude': self.maxlatitude,
'minlongitude': self.minlongitude,
'maxlongitude': self.maxlongitude,
'latitude': self.latitude, 'longitude': self.longitude,
'minradius': self.minradius, 'maxradius': self.maxradius,
'mindepth': self.mindepth, 'maxdepth': self.maxdepth,
'minmagnitude': self.minmagnitude,
'maxmagnitude': self.maxmagnitude,
}
def _query_box_or_circle(client, config, suffix=None, first_query=True):
"""
Query events from FDSN client based on box or circle criteria in config.
:param client: FDSN client object
:param config: config object
:param suffix: suffix to be added to the config keys
:param first_query: True if this is the first query
:returns: obspy Catalog object
"""
suffix = '' if suffix is None else suffix
query_args = QueryArgs(config, suffix, first_query)
kwargs = query_args.get_query()
try:
cat = client.get_events(**kwargs)
except FDSNNoDataException:
cat = Catalog()
# filter in included event types
if event_type := config[f'event_type{suffix}']:
cat = Catalog([
ev for ev in cat
if ev.event_type in event_type
])
# filter out excluded event types
if event_type_exclude := config[f'event_type_exclude{suffix}']:
cat = Catalog([
ev for ev in cat
if ev.event_type not in event_type_exclude
])
return cat
[docs]
def query_events(client, config, first_query=True):
"""
Query events from FDSN client based on criteria in config.
:param client: FDSN client object
:param config: config object
:param first_query: True if this is the first query
:returns: obspy Catalog object
"""
print(f'Querying events from FDSN server "{config["fdsn_event_url"]}"...')
cat = _query_box_or_circle(client, config, first_query=first_query)
# see if there are additional queries to be done
n = 1
while True:
try:
_cat = _query_box_or_circle(
client, config, suffix=f'_{n}', first_query=first_query)
except InvalidQuery:
break
cat += _cat
n += 1
print(f'Found {len(cat)} events.')
return cat