Source code for seiscat.database.dbfunctions

# -*- coding: utf8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Database functions for seiscat.

:copyright:
    2022-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
    GNU General Public License v3.0 or later
    (https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
import os
import re
import shutil
import sqlite3
import numpy as np
from .data_types import Event, EventList

# Current supported DB version
# Increment this number when changing the DB schema
DB_VERSION = 1


# Default columns are part of the core schema and cannot be renamed/deleted.
# 'raw_evid' is optional (only present when keep_raw_evid = True in the config)
# but is protected when it exists.
DEFAULT_COLUMNS = (
    'evid',
    'ver',
    'time',
    'lat',
    'lon',
    'depth',
    'mag',
    'mag_type',
    'event_type',
)


def _get_db_connection(config, initdb=False):
    """
    Get database connection.

    :param config: config object
    :return: database connection

    :raises ValueError: if db_file is not set in config file
    :raises FileNotFoundError: if database file does not exist
    """
    db_file = config.get('db_file', None)
    if db_file is None:
        raise ValueError('db_file not set in config file')
    if not initdb:
        try:
            open(db_file, 'r', encoding='utf8')
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f'Database file "{db_file}" not found.') from e
    return sqlite3.connect(db_file)


def _check_db_version(cursor, config):
    """
    Check if database version is compatible with current version.

    :param cursor: database cursor
    :param config: config object

    :raises ValueError: if db_file version is not supported
    """
    db_version = cursor.execute('PRAGMA user_version').fetchone()[0]
    db_file = config['db_file']
    if db_version < DB_VERSION:
        msg = (
            f'"{db_file}" has an old database version: '
            f'"{db_version}" Current supported version is "{DB_VERSION}".\n'
            'Remove or rename your old database file, '
            'so that a new one can be created.'
        )
        raise ValueError(msg)


def _set_db_version(cursor):
    """
    Set the database version.

    :param cursor: database cursor
    """
    cursor.execute(f'PRAGMA user_version = {DB_VERSION:d}')


[docs] def backup_db(config, move=False): """ Create a backup of the current database file. :param config: config object :param move: if True, move db file to backup path. If False, copy it. :returns: backup file path :raises ValueError: if db_file is not set in config file :raises FileNotFoundError: if database file does not exist """ db_file = config.get('db_file', None) if db_file is None: raise ValueError('db_file not set in config file') if not os.path.exists(db_file): raise FileNotFoundError( f'Database file "{db_file}" does not exist.\n' 'Run "seiscat initdb" first.' ) bak_file = f'{db_file}.bak' if move: os.rename(db_file, bak_file) else: shutil.copy2(db_file, bak_file) print(f'Backup of "{db_file}" saved to "{bak_file}"') return bak_file
[docs] def check_db_exists(config, initdb): """ Check if database file exists. :param config: config object :param initdb: if True, create new database file :raises ValueError: if db_file is not set in config file :raises RuntimeError: if user does not want to overwrite existing database :raises FileNotFoundError: if database file does not exist """ db_file = config.get('db_file', None) if db_file is None: raise ValueError('db_file not set in config file') if initdb and os.path.exists(db_file): ans = input( f'"{db_file}" already exists. ' 'Do you want to overwrite it?\n' f'(Current database will be saved as "{db_file}.bak") [y/N] ' ) if ans not in ['y', 'Y']: raise RuntimeError( 'Existing database file will not be overwritten. Exiting.') backup_db(config, move=True) if not initdb and not os.path.exists(db_file): raise FileNotFoundError( f'Database file "{db_file}" does not exist.\n' 'Run "seiscat initdb" first.' )
def _same_values(values1, values2, skip_begin=0, skip_end=0): """ Check if two lists of values have the same values. :param event1: first event :param event2: second event :param skip_begin: number of fields to skip at the beginning of values :param skip_end: number of fields to skip at the end of values :returns: True if the two lists have the same values, False otherwise """ for idx in range(skip_begin, len(values1) - skip_end): try: # Use np.isclose() for numbers match = np.isclose(values1[idx], values2[idx]) except TypeError: # Use == for strings match = values1[idx] == values2[idx] if not match: return False return True def _event_exists(cursor, values, skip_begin=0, skip_end=0): """ Check if an event exists in the database, based on values. :param cursor: database cursor :param values: list of values :param skip_begin: number of fields to skip at the beginning of values :param skip_end: number of fields to skip at the end of values :returns: True if event exists, False otherwise """ evid = values[0] cursor.execute('SELECT * FROM events WHERE evid = ?', (evid,)) rows = cursor.fetchall() rows_with_same_values = [ row for row in rows if _same_values(values, row, skip_begin, skip_end) ] return len(rows_with_same_values) > 0 def _get_evid(resource_id): """ Get evid from resource_id. :param resource_id: resource_id string :returns: evid string """ evid = resource_id if '/' in evid: evid = resource_id.split('/')[-1] if '?' in evid: evid = resource_id.split('?')[-1] if '&' in evid: evid = evid.split('&')[0] if '=' in evid: evid = evid.split('=')[-1] return evid def _get_db_field_definitions(config): """ Get a list of database fields, number of standard fields, and number of extra fields. :param config: config object :returns: list of field definitions, number of standard fields, number of extra fields """ field_definitions = [ 'evid TEXT', 'ver INTEGER', 'time TEXT', 'lat REAL', 'lon REAL', 'depth REAL', 'mag REAL', 'mag_type TEXT', 'event_type TEXT', ] if config.get('keep_raw_evid', False): field_definitions.append('raw_evid TEXT') n_standard_fields = len(field_definitions) extra_field_names = config['extra_field_names'] or [] extra_field_types = config['extra_field_types'] or [] n_extra_fields = len(extra_field_names) field_definitions.extend( f'{name} {dbtype}' for name, dbtype in zip(extra_field_names, extra_field_types)) return field_definitions, n_standard_fields, n_extra_fields def _get_db_values_from_event(ev, config): """ Get a list of values from an obspy event object. :param ev: obspy event object :param config: config object :returns: list of values :raises ValueError: if event has no origin """ evid = _get_evid(str(ev.resource_id.id)) version = 1 try: orig = ev.preferred_origin() or ev.origins[0] except IndexError as e: raise ValueError(f'Event {evid} has no origin') from e time = str(orig.time) lat = orig.latitude lon = orig.longitude try: depth = orig.depth / 1e3 # km except TypeError: depth = None try: magnitude = ev.preferred_magnitude() or ev.magnitudes[0] mag = magnitude.mag mag_type = magnitude.magnitude_type except IndexError: mag = None mag_type = None event_type = ev.event_type values = [ evid, version, time, lat, lon, depth, mag, mag_type, event_type] if config.get('keep_raw_evid', False): values.append(str(ev.resource_id.id)) # add extra fields (configured defaults and/or per-event values) extra_field_names = config['extra_field_names'] or [] extra_field_defaults = config['extra_field_defaults'] or [] ev_extra = getattr(ev, 'extra', {}) or {} for idx, field_name in enumerate(extra_field_names): extra_value = ev_extra.get(field_name) if isinstance(extra_value, dict): extra_value = extra_value.get('value') if extra_value is None and idx < len(extra_field_defaults): extra_value = extra_field_defaults[idx] values.append(extra_value) return values def _merge_extra_fields_in_config(config, extra_column_names): """Merge runtime extra columns into config extra field lists.""" extra_field_names = list(config['extra_field_names'] or []) extra_field_types = list(config['extra_field_types'] or []) extra_field_defaults = list(config['extra_field_defaults'] or []) for column_name in extra_column_names: if column_name in extra_field_names: continue extra_field_names.append(column_name) extra_field_types.append('TEXT') extra_field_defaults.append(None) config['extra_field_names'] = extra_field_names config['extra_field_types'] = extra_field_types config['extra_field_defaults'] = extra_field_defaults def _add_runtime_columns_to_db(config, column_names): """Add runtime-discovered columns to events table.""" existing_columns = get_db_columns(config) for column_name in column_names: if column_name in existing_columns: continue add_column_to_db(config, column_name) existing_columns.append(column_name) def _validate_column_name(column_name): """Validate a database column name.""" if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', column_name): raise ValueError( f'Invalid column name "{column_name}". ' 'Use only letters, numbers, and underscores, ' 'and do not start with a number.' ) def _parse_column_definition(column_definition): """Parse a column definition string in the form NAME[:TYPE].""" if ':' in column_definition: name, col_type = column_definition.split(':', 1) else: name, col_type = column_definition, 'TEXT' name = name.strip() col_type = col_type.strip().upper() _validate_column_name(name) allowed_types = {'TEXT', 'INTEGER', 'REAL', 'NUMERIC', 'BLOB'} if col_type not in allowed_types: raise ValueError( f'Invalid column type "{col_type}". ' f'Allowed types: {", ".join(sorted(allowed_types))}' ) return name, col_type
[docs] def get_db_columns(config): """Return the list of column names in the events table.""" conn = _get_db_connection(config) cursor = conn.cursor() cursor.execute('PRAGMA table_info(events)') columns = [row[1] for row in cursor.fetchall()] conn.close() return columns
[docs] def add_column_to_db(config, column_definition): """Add a new column to the events table.""" name, col_type = _parse_column_definition(column_definition) columns = get_db_columns(config) if name in columns: raise ValueError(f'Column "{name}" already exists in database') conn = _get_db_connection(config) cursor = conn.cursor() cursor.execute(f'ALTER TABLE events ADD COLUMN {name} {col_type}') conn.commit() conn.close() print(f'Added column "{name}" with type {col_type}')
[docs] def delete_column_from_db(config, column_name): """Delete a column from the events table.""" column_name = column_name.strip() _validate_column_name(column_name) if column_name in DEFAULT_COLUMNS: raise ValueError( f'Column "{column_name}" is protected and cannot be deleted') columns = get_db_columns(config) if column_name not in columns: raise ValueError(f'Column "{column_name}" not found in database') conn = _get_db_connection(config) cursor = conn.cursor() try: cursor.execute(f'ALTER TABLE events DROP COLUMN {column_name}') except sqlite3.OperationalError as e: raise ValueError( f'Cannot delete column "{column_name}": {e.args[0]}') from e conn.commit() conn.close() print(f'Deleted column "{column_name}"')
[docs] def rename_column_in_db(config, rename_definition): """Rename a column in the events table using OLD=NEW syntax.""" if '=' not in rename_definition: raise ValueError( f'Invalid argument "{rename_definition}" for "--rename-column". ' 'Argument must be in the form "old_name=new_name"' ) old_name, new_name = rename_definition.split('=', 1) old_name = old_name.strip() new_name = new_name.strip() _validate_column_name(old_name) _validate_column_name(new_name) if old_name in DEFAULT_COLUMNS: raise ValueError( f'Column "{old_name}" is protected and cannot be renamed') if new_name in DEFAULT_COLUMNS: raise ValueError( f'Column name "{new_name}" is reserved and cannot be used') columns = get_db_columns(config) if old_name not in columns: raise ValueError(f'Column "{old_name}" not found in database') if new_name in columns: raise ValueError(f'Column "{new_name}" already exists in database') conn = _get_db_connection(config) cursor = conn.cursor() try: cursor.execute( f'ALTER TABLE events RENAME COLUMN {old_name} TO {new_name}') except sqlite3.OperationalError as e: raise ValueError( f'Cannot rename column "{old_name}" to "{new_name}": ' f'{e.args[0]}') from e conn.commit() conn.close() print(f'Renamed column "{old_name}" to "{new_name}"')
def _add_event_to_db( config, ev, cursor, initdb, n_standard_fields, n_extra_fields): """ Add an obspy event to the database. :param config: config object :param ev: obspy event object :param cursor: database cursor :param initdb: if True, create new database file :param n_standard_fields: number of standard fields :param n_extra_fields: number of extra fields :returns: number of events created, number of events updated """ values = _get_db_values_from_event(ev, config) evid, version = values[:2] ncreated = nupdated = 0 # skip evid and ver fields as well as extra fields if _event_exists(cursor, values, skip_begin=2, skip_end=n_extra_fields): return ncreated, nupdated if initdb or config['overwrite_updated_events']: # if the event exists, get the values of extra fields ev_exists = False cursor.execute( 'SELECT * FROM events WHERE evid = ? AND ver = ?', (evid, version)) row = cursor.fetchone() if row is not None: ev_exists = True row = list(row) # merge event values with existing extra field values values = values[:n_standard_fields] + row[n_standard_fields:] # add events to table, replace events that already exist cursor.execute( 'INSERT OR REPLACE INTO events VALUES ' f'({", ".join("?" * len(values))})', values) if ev_exists: nupdated = cursor.rowcount else: ncreated = cursor.rowcount return ncreated, nupdated # add event to table, increment version if evid already exists while True: ev_exists = False try: cursor.execute( 'INSERT INTO events VALUES ' f'({", ".join("?" * len(values))})', values) if ev_exists: nupdated = cursor.rowcount else: ncreated = cursor.rowcount break except sqlite3.IntegrityError: # evid and ver already exist ev_exists = True # increment version in the values list values[1] += 1 return ncreated, nupdated
[docs] def write_catalog_to_db(cat, config, initdb): """ Write catalog to database. :param cat: obspy Catalog object :param config: config object :param initdb: if True, create new database file """ conn = _get_db_connection(config, initdb) cursor = conn.cursor() if initdb: _set_db_version(cursor) else: _check_db_version(cursor, config) field_definitions, _, _ = _get_db_field_definitions(config) # create table if it doesn't exist, use evid and ver as primary key cursor.execute( 'CREATE TABLE IF NOT EXISTS events ' f'({", ".join(field_definitions)}, PRIMARY KEY (evid, ver))') conn.commit() conn.close() if initdb and config.get('_csv_extra_columns'): _add_runtime_columns_to_db(config, config['_csv_extra_columns']) _merge_extra_fields_in_config(config, config['_csv_extra_columns']) conn = _get_db_connection(config) cursor = conn.cursor() _check_db_version(cursor, config) _, n_standard_fields, n_extra_fields = _get_db_field_definitions(config) events_created = 0 events_updated = 0 for ev in cat: try: _ncreated, _nupdated = _add_event_to_db( config, ev, cursor, initdb, n_standard_fields, n_extra_fields) except ValueError as e: print(e) continue events_created += _ncreated events_updated += _nupdated # close database connection conn.commit() if events_created: plural = 's' if events_created > 1 else '' print( f'{events_created} new event{plural} added to the database ' f'"{config["db_file"]}"' ) if events_updated: plural = 's' if events_updated > 1 else '' print( f'{events_updated} event{plural} updated in the database ' f'"{config["db_file"]}"' ) if not events_created and not events_updated: print('No new events added or updated in the database')
def _process_where_option(where_str): """ Process the `where` option to create a SQL WHERE filter with "?" placeholders and a list of values. :param where_str: string passed to the `where` option :returns: SQL WHERE filter, list of values """ # Define a regular expression pattern to match key-value pairs # with optional spaces around operators. Possible operators are # =, <, >, <=, >=, != pattern = re.compile(r'(\w+)\s*([><!=]+)\s*([\w\d.]+)') values = [] def _replace(match): key, op, value = match.groups() val_lower = value.lower() if val_lower in ('none', 'null'): if op in ('=', '=='): return f'{key} IS NULL' if op in ('!=', '<>'): return f'{key} IS NOT NULL' values.append(value) return f'{key}{op}?' # Create the where filter by replacing the key-op-value pattern with # key-op-? to create a placeholder for the value. where_filter = pattern.sub(_replace, where_str) return where_filter, values def _build_query( cursor, config, eventid=None, version=None, field_list=None, honor_where_filter=True, honor_sortby=True): """ Build a query to read events from the database. :param config: config object :param eventid: limit to events with this evid. If ``None``, then the eventid is taken from the command line arguments via ``config['args']`` :param version: limit to events with this version. If ``None``, then the version is taken from the command line arguments via ``config['args']`` :param field_list: list of fields to read from the database :param honor_where_filter: if ``True``, honor the ``where`` option :param honor_sortby: if ``True``, honor the ``sortby`` option :param cursor: database cursor :returns: query, query values, fields :note: if a ``field_list`` is provided, a ``sortby`` field is always added to the query, for sorting purposes. """ args = config['args'] if eventid is None: eventid = getattr(args, 'eventid', None) if eventid == 'ALL': eventid = None if eventid is not None: # raise ValueError if eventid is not in database cursor.execute('SELECT * FROM events WHERE evid = ?', (eventid,)) if not cursor.fetchall(): raise ValueError(f'Event {eventid} not found in database') if version is None: version = getattr(args, 'event_version', None) where = getattr(args, 'where', None) if honor_where_filter else None sortby = getattr(args, 'sortby', 'time') if honor_sortby else 'time' # Track which fields were added for sorting purposes only added_fields = [] if field_list is not None: # always query sortby field and version, for sorting fields = field_list[:] if sortby not in fields: fields.append(sortby) added_fields.append(sortby) if 'ver' not in fields: fields.append('ver') added_fields.append('ver') query = f'SELECT {", ".join(fields)} FROM events' else: # read field names cursor.execute('PRAGMA table_info(events)') # we just need the field names, which are in the second column fields = [f[1] for f in cursor.fetchall()] query = 'SELECT * FROM events' query_values = [] if where is not None: where_filter, values = _process_where_option(where) query = f'{query} WHERE {where_filter}' query_values += values if eventid is not None: query += ' AND evid = ?' if 'WHERE' in query else ' WHERE evid = ?' query_values.append(eventid) if version is not None: query += ' AND ver = ?' if 'WHERE' in query else ' WHERE ver = ?' query_values.append(version) return query, query_values, fields, added_fields def _keep_latest_version(rows, fields): """ Keep only the latest version of each event in the list of rows. :param rows: list of rows :param fields: list of fields :returns: list of kept rows """ evid_index = fields.index('evid') ver_index = fields.index('ver') evids = set() rows_to_keep = [] for row in sorted( rows, key=lambda r: (r[evid_index], r[ver_index]), reverse=True ): evid = row[evid_index] if evid not in evids: rows_to_keep.append(row) evids.add(evid) return rows_to_keep def _sort_rows(rows, fields, sortby='time', reverse=False, added_fields=None): """ Sort rows by a given field and version; reverse if needed. :param rows: list of rows :param fields: list of fields :param sortby: field name to sort by (default: 'time') :param reverse: if True, sort in reverse order :param added_fields: list of fields that were added for sorting purposes and should be removed after sorting :returns: sorted fields, sorted rows """ if added_fields is None: added_fields = [] try: sortby_index = fields.index(sortby) except ValueError: # If sortby field doesn't exist, fall back to time sortby = 'time' sortby_index = fields.index(sortby) ver_index = fields.index('ver') rows.sort(key=lambda r: (r[sortby_index], r[ver_index]), reverse=reverse) # Remove fields that were added for sorting purposes only if added_fields: # Count how many fields to remove from the end n_to_remove = 0 for field in reversed(added_fields): if not fields or fields[-1] != field: break n_to_remove += 1 fields = fields[:-1] if n_to_remove > 0: rows = [r[:-n_to_remove] for r in rows] return fields, rows
[docs] def read_fields_and_rows_from_db( config, eventid=None, version=None, field_list=None, honor_where_filter=True, honor_sortby=True, honor_reverse=True): """ Read fields and rows from database. Return a list of fields and a list of rows. The rows are sorted by time and version. :param config: config object :param eventid: limit to events with this evid. If ``None``, then the eventid is taken from the command line arguments via ``config['args']`` :param version: limit to events with this version. If ``None``, then the version is taken from the command line arguments via ``config['args']`` :param field_list: list of fields to read from the database :param honor_where_filter: if ``True``, honor the ``where`` option :param honor_sortby: if ``True``, honor the ``sortby`` option :param honor_reverse: if ``True``, honor the ``reverse`` option :returns: list of fields, list of rows :raises ValueError: if field is not found in database """ conn = _get_db_connection(config) cursor = conn.cursor() query, query_values, fields, added_fields = _build_query( cursor, config, eventid, version, field_list, honor_where_filter, honor_sortby) try: cursor.execute(query, query_values) except sqlite3.OperationalError as e: field = e.args[0].split()[-1] raise ValueError(f'Field "{field}" not found in database') from e rows = cursor.fetchall() conn.close() if not getattr(config['args'], 'allversions', True): rows = _keep_latest_version(rows, fields) sortby = getattr(config['args'], 'sortby', 'time')\ if honor_sortby else 'time' reverse = getattr(config['args'], 'reverse', False)\ if honor_reverse else False return _sort_rows(rows, fields, sortby, reverse, added_fields)
[docs] def replicate_event_in_db(config, eventid, version=1): """ Replicate an event in the database. The new event will have the same evid as the original event, but a different version. :param config: config object :param eventid: event id of the original event :param version: version of the original event :raises ValueError: if eventid/version is not found in database """ fields, rows = read_fields_and_rows_from_db( config, eventid=eventid, version=version) if not rows: raise ValueError( f'Event {eventid} version {version} not found in database') row = list(rows[0]) # increment version ver_index = fields.index('ver') row[ver_index] += 1 conn = _get_db_connection(config) c = conn.cursor() while True: try: c.execute( 'INSERT INTO events VALUES ' f'({", ".join("?" * len(row))})', row) break except sqlite3.IntegrityError: # version already exists, increment version and try again row[ver_index] += 1 # close database connection conn.commit() print(f'Added event {eventid} version {row[ver_index]} to database')
[docs] def delete_event_from_db(config, eventid, version=None): """ Delete an event from the database. :param config: config object :param eventid: event id of the event to delete (if None, delete all events for the given version) :param version: version of the event to delete (if None, delete all versions of the event) """ msg = None conn = _get_db_connection(config) c = conn.cursor() if eventid is None and version is None: c.execute('DELETE FROM events') msg = 'All events deleted from database' elif eventid is None: c.execute('DELETE FROM events WHERE ver = ?', (version,)) msg = f'All events of version {version} deleted from database' if eventid is not None and version is not None: c.execute( 'DELETE FROM events WHERE evid = ? AND ver = ?', (eventid, version)) msg = f'Event {eventid} version {version} deleted from database' elif eventid is not None: c.execute('DELETE FROM events WHERE evid = ?', (eventid,)) msg = f'Event {eventid} deleted from database' if msg is None: # this should never happen raise ValueError('Invalid combination of eventid and version') # close database connection conn.commit() print(msg)
[docs] def update_event_in_db(config, eventid, version, field, value): """ Update an event in the database. :param config: config object :param eventid: event id of the event to update :param version: version of the event to update :param field: field to update :param value: new value :raises ValueError: if field is not found in database """ conn = _get_db_connection(config) c = conn.cursor() try: c.execute( f'UPDATE events SET {field} = ? WHERE evid = ? AND ver = ?', (value, eventid, version)) except sqlite3.OperationalError as e: raise ValueError(f'Field "{field}" not found in database') from e # close database connection conn.commit() print( f'Updated field "{field}={value}" ' f'for event {eventid} version {version}')
[docs] def increment_event_in_db(config, eventid, version, field, value): """ Increment an event in the database. :param config: config object :param eventid: event id of the event to update :param version: version of the event to update :param field: field to update :param value: value to increment, must be a number :raises ValueError: if field is not found in database, or if value is not a number """ conn = _get_db_connection(config) c = conn.cursor() # check if value is numeric try: value = float(value) except ValueError as e: raise ValueError(f'Value "{value}" is not a number') from e # if value is an integer, convert it to int if value == int(value): value = int(value) # read old value from database and check if it is numeric try: c.execute( f'SELECT {field} FROM events WHERE evid = ? AND ver = ?', (eventid, version)) old_value = c.fetchone()[0] try: new_value = float(old_value) + value if new_value == int(new_value): new_value = int(new_value) except ValueError as e: raise ValueError(f'Field "{field}" is not a number') from e except sqlite3.OperationalError as e: raise ValueError(f'Field "{field}" not found in database') from e # update database try: c.execute( f'UPDATE events SET {field} = ? ' 'WHERE evid = ? AND ver = ?', (new_value, eventid, version)) except sqlite3.OperationalError as e: raise ValueError(f'Field "{field}" not found in database') from e # close database connection conn.commit() print( f'Field "{field}" incremented by "{value}" ' f'for event {eventid} version {version}')
[docs] def read_events_from_db(config, eventid=None, version=None): """ Read events from database. Return a list of events. :param config: config object :param eventid: limit to events with this evid :param version: limit to events with this version :returns: list of events, each event is a dictionary-like object """ # get fields and rows from database # rows are sorted by time and version and reversed if requested fields, rows = read_fields_and_rows_from_db(config, eventid, version) events_list = EventList() for event in rows: event_dict = Event(zip(fields, event)) events_list.append(event_dict) return events_list
[docs] def read_evids_and_versions_from_db(config): """ Get a list of event ids and versions from the database. This function only onors the ``allversions`` option but not the ``where`` option. :param config: config object :returns: list of tuples (evid, version) """ _fields, rows = read_fields_and_rows_from_db( config, field_list=['evid', 'ver'], honor_where_filter=False) return rows
[docs] def get_catalog_stats(config): """ Get a string with catalog statistics. :param config: config object :returns: string with catalog statistics """ events = read_events_from_db(config) nevents = len(events) tmin = min(event['time'] for event in events) tmax = max(event['time'] for event in events) tmin = tmin.strftime('%Y-%m-%dT%H:%M:%S') tmax = tmax.strftime('%Y-%m-%dT%H:%M:%S') stats_str = f'{nevents} events from {tmin} to {tmax}' if mags := [event['mag'] for event in events if event['mag'] is not None]: mag_min = min(mags) mag_max = max(mags) stats_str += f'\nMagnitude range: {mag_min:.1f} -- {mag_max:.1f}' return stats_str