# -*- coding: utf8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Database functions for seiscat.
:copyright:
2022-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
GNU General Public License v3.0 or later
(https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
import os
import re
import shutil
import sqlite3
import numpy as np
from .data_types import Event, EventList
# Current supported DB version.
# It is written to the SQLite "user_version" pragma when a database is
# initialized and compared against it when an existing database is opened.
# Increment this number when changing the DB schema
DB_VERSION = 1
# Default columns are part of the core schema and cannot be renamed/deleted:
# the column management functions reject any name listed in this tuple.
# 'raw_evid' is optional (only present when keep_raw_evid = True in the config)
# and is meant to be protected when it exists.
# NOTE(review): 'raw_evid' is not listed here, so a membership test against
# DEFAULT_COLUMNS alone does not cover it -- confirm where it is protected.
DEFAULT_COLUMNS = (
'evid',
'ver',
'time',
'lat',
'lon',
'depth',
'mag',
'mag_type',
'event_type',
)
def _get_db_connection(config, initdb=False):
    """
    Get database connection.

    :param config: config object
    :param initdb: if True, skip the existence check, so that a missing
        database file can be created by ``sqlite3.connect()``
    :return: database connection
    :raises ValueError: if db_file is not set in config file
    :raises FileNotFoundError: if database file does not exist
    """
    db_file = config.get('db_file', None)
    if db_file is None:
        raise ValueError('db_file not set in config file')
    if not initdb:
        # sqlite3.connect() would silently create a missing file, so probe
        # the file first. The context manager closes the probe handle right
        # away instead of leaving it to the garbage collector.
        try:
            with open(db_file, 'rb'):
                pass
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f'Database file "{db_file}" not found.') from e
    return sqlite3.connect(db_file)
def _check_db_version(cursor, config):
    """
    Make sure that the database is not older than the supported version.

    The version is read from the SQLite ``user_version`` pragma.

    :param cursor: database cursor
    :param config: config object
    :raises ValueError: if the database version is lower than DB_VERSION
    """
    found_version = cursor.execute('PRAGMA user_version').fetchone()[0]
    db_path = config['db_file']
    if found_version >= DB_VERSION:
        return
    raise ValueError(
        f'"{db_path}" has an old database version: '
        f'"{found_version}" Current supported version is "{DB_VERSION}".\n'
        'Remove or rename your old database file, '
        'so that a new one can be created.'
    )
def _set_db_version(cursor):
    """
    Store DB_VERSION in the ``user_version`` pragma of the database.

    :param cursor: database cursor
    """
    pragma_statement = f'PRAGMA user_version = {DB_VERSION:d}'
    cursor.execute(pragma_statement)
[docs]
def backup_db(config, move=False):
    """
    Create a backup of the current database file.

    The backup is written next to the database file, with a ``.bak``
    suffix appended to its name. An existing backup file is overwritten.

    :param config: config object
    :param move: if True, move db file to backup path. If False, copy it.
    :returns: backup file path
    :raises ValueError: if db_file is not set in config file
    :raises FileNotFoundError: if database file does not exist
    """
    db_file = config.get('db_file', None)
    if db_file is None:
        raise ValueError('db_file not set in config file')
    if not os.path.exists(db_file):
        raise FileNotFoundError(
            f'Database file "{db_file}" does not exist.\n'
            'Run "seiscat initdb" first.'
        )
    bak_file = f'{db_file}.bak'
    if move:
        # os.replace() overwrites a previous backup on every platform,
        # whereas os.rename() fails on Windows if the destination exists
        os.replace(db_file, bak_file)
    else:
        shutil.copy2(db_file, bak_file)
    print(f'Backup of "{db_file}" saved to "{bak_file}"')
    return bak_file
[docs]
def check_db_exists(config, initdb):
    """
    Check the database file against the requested operation.

    When initializing, an existing file is moved to a backup after asking
    the user for confirmation. Otherwise the file is required to exist.

    :param config: config object
    :param initdb: if True, create new database file
    :raises ValueError: if db_file is not set in config file
    :raises RuntimeError: if user does not want to overwrite existing database
    :raises FileNotFoundError: if database file does not exist
    """
    db_path = config.get('db_file', None)
    if db_path is None:
        raise ValueError('db_file not set in config file')
    db_found = os.path.exists(db_path)
    if not initdb:
        if not db_found:
            raise FileNotFoundError(
                f'Database file "{db_path}" does not exist.\n'
                'Run "seiscat initdb" first.'
            )
        return
    if not db_found:
        # nothing to overwrite
        return
    answer = input(
        f'"{db_path}" already exists. '
        'Do you want to overwrite it?\n'
        f'(Current database will be saved as "{db_path}.bak") [y/N] '
    )
    if answer != 'y' and answer != 'Y':
        raise RuntimeError(
            'Existing database file will not be overwritten. Exiting.')
    backup_db(config, move=True)
def _same_values(values1, values2, skip_begin=0, skip_end=0):
    """
    Check if two sequences of values hold the same values.

    Each pair is compared with ``np.isclose()``; when that raises a
    ``TypeError`` (e.g. for strings), ``==`` is used instead.

    :param values1: first sequence of values
    :param values2: second sequence of values
    :param skip_begin: number of fields to skip at the beginning of values
    :param skip_end: number of fields to skip at the end of values
    :returns: True if the two sequences have the same values, False otherwise
    """
    def _matches(first, second):
        try:
            return np.isclose(first, second)
        except TypeError:
            return first == second

    stop = len(values1) - skip_end
    return all(
        _matches(values1[pos], values2[pos])
        for pos in range(skip_begin, stop)
    )
def _event_exists(cursor, values, skip_begin=0, skip_end=0):
    """
    Tell whether the database already holds an event with these values.

    All the rows sharing the evid (first item of ``values``) are compared
    with ``values``.

    :param cursor: database cursor
    :param values: list of values
    :param skip_begin: number of fields to skip at the beginning of values
    :param skip_end: number of fields to skip at the end of values
    :returns: True if event exists, False otherwise
    """
    cursor.execute('SELECT * FROM events WHERE evid = ?', (values[0],))
    candidates = cursor.fetchall()
    return any(
        _same_values(values, candidate, skip_begin, skip_end)
        for candidate in candidates
    )
def _get_evid(resource_id):
    """
    Extract the evid from a resource_id.

    The text after the last ``/`` and the last ``?`` is kept, then it is
    cut at the first ``&`` and only what follows the last ``=`` is returned.

    :param resource_id: resource_id string
    :returns: evid string
    """
    # (r)partition returns the whole string when the separator is missing,
    # so no membership test is needed before each step
    tail = resource_id.rpartition('/')[2]
    tail = tail.rpartition('?')[2]
    first_parameter = tail.partition('&')[0]
    return first_parameter.rpartition('=')[2]
def _get_db_field_definitions(config):
    """
    Build the column definitions of the events table.

    Standard fields come first, followed by the optional ``raw_evid`` field
    (counted as a standard field) and by the extra fields from the config.

    :param config: config object
    :returns: list of field definitions, number of standard fields,
        number of extra fields
    """
    definitions = [
        'evid TEXT',
        'ver INTEGER',
        'time TEXT',
        'lat REAL',
        'lon REAL',
        'depth REAL',
        'mag REAL',
        'mag_type TEXT',
        'event_type TEXT',
    ]
    if config.get('keep_raw_evid', False):
        definitions += ['raw_evid TEXT']
    standard_count = len(definitions)
    names = config['extra_field_names'] or []
    dbtypes = config['extra_field_types'] or []
    for name, dbtype in zip(names, dbtypes):
        definitions.append(f'{name} {dbtype}')
    return definitions, standard_count, len(names)
def _get_db_values_from_event(ev, config):
    """
    Build the list of database values for an obspy event object.

    The values are ordered as: standard fields, optional raw evid,
    extra fields.

    :param ev: obspy event object
    :param config: config object
    :returns: list of values
    :raises ValueError: if event has no origin
    """
    raw_evid = str(ev.resource_id.id)
    evid = _get_evid(raw_evid)
    try:
        origin = ev.preferred_origin() or ev.origins[0]
    except IndexError as e:
        raise ValueError(f'Event {evid} has no origin') from e
    origin_time = str(origin.time)
    latitude = origin.latitude
    longitude = origin.longitude
    try:
        depth_km = origin.depth / 1e3  # km
    except TypeError:
        # depth is not set
        depth_km = None
    try:
        magnitude = ev.preferred_magnitude() or ev.magnitudes[0]
        mag = magnitude.mag
        mag_type = magnitude.magnitude_type
    except IndexError:
        # event has no magnitude
        mag = mag_type = None
    # every event read from a catalog starts at version 1
    values = [
        evid, 1, origin_time, latitude, longitude, depth_km,
        mag, mag_type, ev.event_type,
    ]
    if config.get('keep_raw_evid', False):
        values.append(raw_evid)
    # extra fields: the per-event value wins over the configured default
    names = config['extra_field_names'] or []
    defaults = config['extra_field_defaults'] or []
    per_event = getattr(ev, 'extra', {}) or {}
    n_defaults = len(defaults)
    for pos, name in enumerate(names):
        item = per_event.get(name)
        if isinstance(item, dict):
            item = item.get('value')
        if item is None and pos < n_defaults:
            item = defaults[pos]
        values.append(item)
    return values
def _merge_extra_fields_in_config(config, extra_column_names):
    """
    Merge runtime extra columns into config extra field lists.

    Columns not yet listed in ``extra_field_names`` are appended with type
    ``TEXT`` and default ``None``. The three lists in ``config`` are replaced
    by the merged copies.

    :param config: config object, modified in place
    :param extra_column_names: names of the columns to merge
    """
    names = list(config['extra_field_names'] or [])
    dbtypes = list(config['extra_field_types'] or [])
    defaults = list(config['extra_field_defaults'] or [])
    for column in extra_column_names:
        if column not in names:
            names.append(column)
            dbtypes.append('TEXT')
            defaults.append(None)
    merged = (
        ('extra_field_names', names),
        ('extra_field_types', dbtypes),
        ('extra_field_defaults', defaults),
    )
    for key, merged_list in merged:
        config[key] = merged_list
def _add_runtime_columns_to_db(config, column_names):
    """
    Add runtime-discovered columns to events table.

    Columns already in the table, or repeated in ``column_names``, are
    added only once.

    :param config: config object
    :param column_names: names of the columns to add
    """
    known_columns = get_db_columns(config)
    for name in column_names:
        if name not in known_columns:
            add_column_to_db(config, name)
            known_columns.append(name)
def _validate_column_name(column_name):
    """
    Validate a database column name.

    :param column_name: column name to check
    :raises ValueError: if the name is not made of letters, numbers and
        underscores only, or if it starts with a number
    """
    if re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', column_name) is not None:
        return
    raise ValueError(
        f'Invalid column name "{column_name}". '
        'Use only letters, numbers, and underscores, '
        'and do not start with a number.'
    )
def _parse_column_definition(column_definition):
    """
    Parse a column definition string in the form NAME[:TYPE].

    The type defaults to ``TEXT`` and is converted to upper case.

    :param column_definition: column definition string
    :returns: column name, column type
    :raises ValueError: if the name or the type is not valid
    """
    name, separator, col_type = column_definition.partition(':')
    if not separator:
        col_type = 'TEXT'
    name = name.strip()
    col_type = col_type.strip().upper()
    _validate_column_name(name)
    allowed_types = {'TEXT', 'INTEGER', 'REAL', 'NUMERIC', 'BLOB'}
    if col_type in allowed_types:
        return name, col_type
    raise ValueError(
        f'Invalid column type "{col_type}". '
        f'Allowed types: {", ".join(sorted(allowed_types))}'
    )
[docs]
def get_db_columns(config):
    """
    Return the list of column names in the events table.

    :param config: config object
    :returns: list of column names
    :raises ValueError: if db_file is not set in config file
    :raises FileNotFoundError: if database file does not exist
    """
    conn = _get_db_connection(config)
    # close the connection even if the query fails
    try:
        cursor = conn.cursor()
        cursor.execute('PRAGMA table_info(events)')
        # the column name is the second item of each table_info row
        return [row[1] for row in cursor.fetchall()]
    finally:
        conn.close()
[docs]
def add_column_to_db(config, column_definition):
    """
    Add a new column to the events table.

    :param config: config object
    :param column_definition: column definition, in the form NAME[:TYPE]
    :raises ValueError: if the definition is not valid or if the column
        already exists
    """
    name, col_type = _parse_column_definition(column_definition)
    if name in get_db_columns(config):
        raise ValueError(f'Column "{name}" already exists in database')
    conn = _get_db_connection(config)
    # close the connection even if the statement fails
    try:
        cursor = conn.cursor()
        # name and type were validated by _parse_column_definition(),
        # which is why they can be interpolated in the statement
        cursor.execute(f'ALTER TABLE events ADD COLUMN {name} {col_type}')
        conn.commit()
    finally:
        conn.close()
    print(f'Added column "{name}" with type {col_type}')
[docs]
def delete_column_from_db(config, column_name):
    """
    Delete a column from the events table.

    :param config: config object
    :param column_name: name of the column to delete
    :raises ValueError: if the name is not valid, if the column is
        protected or not found, or if the database refuses to drop it
    """
    column_name = column_name.strip()
    _validate_column_name(column_name)
    # 'raw_evid' is not part of DEFAULT_COLUMNS (it is optional), but it
    # sits among the standard fields and must be protected as well
    if column_name in DEFAULT_COLUMNS or column_name == 'raw_evid':
        raise ValueError(
            f'Column "{column_name}" is protected and cannot be deleted')
    if column_name not in get_db_columns(config):
        raise ValueError(f'Column "{column_name}" not found in database')
    conn = _get_db_connection(config)
    # close the connection on success and on failure
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(f'ALTER TABLE events DROP COLUMN {column_name}')
        except sqlite3.OperationalError as e:
            raise ValueError(
                f'Cannot delete column "{column_name}": {e.args[0]}') from e
        conn.commit()
    finally:
        conn.close()
    print(f'Deleted column "{column_name}"')
[docs]
def rename_column_in_db(config, rename_definition):
    """
    Rename a column in the events table using OLD=NEW syntax.

    :param config: config object
    :param rename_definition: string in the form "old_name=new_name"
    :raises ValueError: if the definition or one of the names is not valid,
        if the old column is protected or not found, if the new name is
        reserved or already used, or if the database refuses the renaming
    """
    if '=' not in rename_definition:
        raise ValueError(
            f'Invalid argument "{rename_definition}" for "--rename-column". '
            'Argument must be in the form "old_name=new_name"'
        )
    old_name, new_name = rename_definition.split('=', 1)
    old_name = old_name.strip()
    new_name = new_name.strip()
    _validate_column_name(old_name)
    _validate_column_name(new_name)
    # 'raw_evid' is not part of DEFAULT_COLUMNS (it is optional), but it
    # sits among the standard fields and must be protected as well
    if old_name in DEFAULT_COLUMNS or old_name == 'raw_evid':
        raise ValueError(
            f'Column "{old_name}" is protected and cannot be renamed')
    if new_name in DEFAULT_COLUMNS:
        raise ValueError(
            f'Column name "{new_name}" is reserved and cannot be used')
    columns = get_db_columns(config)
    if old_name not in columns:
        raise ValueError(f'Column "{old_name}" not found in database')
    if new_name in columns:
        raise ValueError(f'Column "{new_name}" already exists in database')
    conn = _get_db_connection(config)
    # close the connection on success and on failure
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                f'ALTER TABLE events RENAME COLUMN {old_name} TO {new_name}')
        except sqlite3.OperationalError as e:
            raise ValueError(
                f'Cannot rename column "{old_name}" to "{new_name}": '
                f'{e.args[0]}') from e
        conn.commit()
    finally:
        conn.close()
    print(f'Renamed column "{old_name}" to "{new_name}"')
def _add_event_to_db(
        config, ev, cursor, initdb, n_standard_fields, n_extra_fields):
    """
    Add an obspy event to the database.

    Nothing is written if an event with the same evid and the same standard
    values is already stored. Otherwise, depending on ``initdb`` and on the
    ``overwrite_updated_events`` config option, the event either replaces
    the stored one (the stored extra field values are kept) or is added as
    a new version.

    :param config: config object
    :param ev: obspy event object
    :param cursor: database cursor
    :param initdb: if True, create new database file
    :param n_standard_fields: number of standard fields
    :param n_extra_fields: number of extra fields
    :returns: number of events created, number of events updated
    :raises ValueError: if event has no origin
    """
    values = _get_db_values_from_event(ev, config)
    evid, version = values[:2]
    ncreated = nupdated = 0
    # skip evid and ver fields as well as extra fields
    if _event_exists(cursor, values, skip_begin=2, skip_end=n_extra_fields):
        return ncreated, nupdated
    if initdb or config['overwrite_updated_events']:
        cursor.execute(
            'SELECT * FROM events WHERE evid = ? AND ver = ?',
            (evid, version))
        row = cursor.fetchone()
        ev_exists = row is not None
        if ev_exists:
            # merge event values with existing extra field values
            values = values[:n_standard_fields] + list(row)[n_standard_fields:]
        # add events to table, replace events that already exist
        cursor.execute(
            'INSERT OR REPLACE INTO events VALUES '
            f'({", ".join("?" * len(values))})', values)
        if ev_exists:
            nupdated = cursor.rowcount
        else:
            ncreated = cursor.rowcount
        return ncreated, nupdated
    # add event to table, increment version if evid already exists.
    # The flag must live outside the loop: resetting it at each iteration
    # would make every new version be counted as a created event.
    ev_exists = False
    while True:
        try:
            cursor.execute(
                'INSERT INTO events VALUES '
                f'({", ".join("?" * len(values))})', values)
        except sqlite3.IntegrityError:
            # evid and ver already exist:
            # increment version in the values list and try again
            ev_exists = True
            values[1] += 1
        else:
            break
    if ev_exists:
        nupdated = cursor.rowcount
    else:
        ncreated = cursor.rowcount
    return ncreated, nupdated
[docs]
def write_catalog_to_db(cat, config, initdb):
    """
    Write catalog to database.

    The events table is created if needed, then each event of the catalog
    is added. Events raising a ``ValueError`` are reported and skipped.
    A summary of the created and updated events is printed.

    :param cat: obspy Catalog object
    :param config: config object
    :param initdb: if True, create new database file
    :raises ValueError: if db_file is not set in config file or if the
        database version is not supported
    :raises FileNotFoundError: if database file does not exist
    """
    conn = _get_db_connection(config, initdb)
    try:
        cursor = conn.cursor()
        if initdb:
            _set_db_version(cursor)
        else:
            _check_db_version(cursor, config)
        field_definitions, _, _ = _get_db_field_definitions(config)
        # create table if it doesn't exist, use evid and ver as primary key
        cursor.execute(
            'CREATE TABLE IF NOT EXISTS events '
            f'({", ".join(field_definitions)}, PRIMARY KEY (evid, ver))')
        conn.commit()
    finally:
        conn.close()
    if initdb and config.get('_csv_extra_columns'):
        # these helpers open their own connections,
        # which is why the one above is closed first
        _add_runtime_columns_to_db(config, config['_csv_extra_columns'])
        _merge_extra_fields_in_config(config, config['_csv_extra_columns'])
    events_created = 0
    events_updated = 0
    conn = _get_db_connection(config)
    # close the connection even if an event cannot be written
    try:
        cursor = conn.cursor()
        _check_db_version(cursor, config)
        _, n_standard_fields, n_extra_fields =\
            _get_db_field_definitions(config)
        for ev in cat:
            try:
                _ncreated, _nupdated = _add_event_to_db(
                    config, ev, cursor, initdb,
                    n_standard_fields, n_extra_fields)
            except ValueError as e:
                print(e)
                continue
            events_created += _ncreated
            events_updated += _nupdated
        conn.commit()
    finally:
        conn.close()
    if events_created:
        plural = 's' if events_created > 1 else ''
        print(
            f'{events_created} new event{plural} added to the database '
            f'"{config["db_file"]}"'
        )
    if events_updated:
        plural = 's' if events_updated > 1 else ''
        print(
            f'{events_updated} event{plural} updated in the database '
            f'"{config["db_file"]}"'
        )
    if not events_created and not events_updated:
        print('No new events added or updated in the database')
def _process_where_option(where_str):
    """
    Turn the `where` option into a SQL WHERE filter with "?" placeholders
    and the list of the matching values.

    Comparisons with ``none`` or ``null`` (any case) using an equality or
    inequality operator become ``IS NULL`` or ``IS NOT NULL`` tests.

    :param where_str: string passed to the `where` option
    :returns: SQL WHERE filter, list of values
    """
    # key-operator-value triplets, with optional spaces around the operator.
    # Possible operators are =, <, >, <=, >=, !=
    comparison = re.compile(r'(\w+)\s*([><!=]+)\s*([\w\d.]+)')
    null_tests = {
        '=': 'IS NULL',
        '==': 'IS NULL',
        '!=': 'IS NOT NULL',
        '<>': 'IS NOT NULL',
    }
    values = []

    def _to_placeholder(match):
        key, op, value = match.groups()
        if value.lower() in ('none', 'null') and op in null_tests:
            return f'{key} {null_tests[op]}'
        # collect the value and leave a placeholder in the filter
        values.append(value)
        return f'{key}{op}?'

    return comparison.sub(_to_placeholder, where_str), values
def _build_query(
        cursor, config, eventid=None, version=None, field_list=None,
        honor_where_filter=True, honor_sortby=True):
    """
    Build a query to read events from the database.

    :param cursor: database cursor
    :param config: config object
    :param eventid: limit to events with this evid. If ``None``, then the
        eventid is taken from the command line arguments via
        ``config['args']``
    :param version: limit to events with this version. If ``None``, then the
        version is taken from the command line arguments via
        ``config['args']``
    :param field_list: list of fields to read from the database
    :param honor_where_filter: if ``True``, honor the ``where`` option
    :param honor_sortby: if ``True``, honor the ``sortby`` option
    :returns: query, query values, fields, fields added for sorting
    :raises ValueError: if eventid is not found in database
    :note: if a ``field_list`` is provided, the ``sortby`` and ``ver`` fields
        are always added to the query, for sorting purposes.
    """
    args = config['args']
    if eventid is None:
        eventid = getattr(args, 'eventid', None)
    if eventid == 'ALL':
        eventid = None
    if eventid is not None:
        # raise ValueError if eventid is not in database
        cursor.execute('SELECT * FROM events WHERE evid = ?', (eventid,))
        if cursor.fetchone() is None:
            raise ValueError(f'Event {eventid} not found in database')
    if version is None:
        version = getattr(args, 'event_version', None)
    where = getattr(args, 'where', None) if honor_where_filter else None
    sortby = 'time'
    if honor_sortby:
        # an option which is present but unset falls back to time as well
        sortby = getattr(args, 'sortby', None) or 'time'
    # Track which fields were added for sorting purposes only
    added_fields = []
    if field_list is not None:
        # always query sortby field and version, for sorting
        fields = list(field_list)
        for sort_field in (sortby, 'ver'):
            if sort_field not in fields:
                fields.append(sort_field)
                added_fields.append(sort_field)
        query = f'SELECT {", ".join(fields)} FROM events'
    else:
        # read field names
        cursor.execute('PRAGMA table_info(events)')
        # we just need the field names, which are in the second column
        fields = [f[1] for f in cursor.fetchall()]
        query = 'SELECT * FROM events'
    conditions = []
    query_values = []
    if where is not None and where.strip():
        where_filter, values = _process_where_option(where)
        # parentheses keep an OR inside the user filter from being split
        # by the AND conditions appended below
        conditions.append(f'({where_filter})')
        query_values += values
    if eventid is not None:
        conditions.append('evid = ?')
        query_values.append(eventid)
    if version is not None:
        conditions.append('ver = ?')
        query_values.append(version)
    if conditions:
        query = f'{query} WHERE {" AND ".join(conditions)}'
    return query, query_values, fields, added_fields
def _keep_latest_version(rows, fields):
    """
    Keep only the row with the highest version for each evid.

    The kept rows are returned in decreasing evid order.

    :param rows: list of rows
    :param fields: list of fields
    :returns: list of kept rows
    """
    evid_pos = fields.index('evid')
    ver_pos = fields.index('ver')

    def _evid_and_version(row):
        return row[evid_pos], row[ver_pos]

    # rows are visited from the highest version down, so that the first row
    # seen for an evid is the one to keep
    latest = {}
    for row in sorted(rows, key=_evid_and_version, reverse=True):
        latest.setdefault(row[evid_pos], row)
    return list(latest.values())
def _sort_rows(rows, fields, sortby='time', reverse=False, added_fields=None):
    """
    Sort rows by a given field and version; reverse if needed.

    Rows whose sort field is ``None`` come first (last if ``reverse`` is
    True). ``rows`` is sorted in place.

    :param rows: list of rows
    :param fields: list of fields
    :param sortby: field name to sort by (default: 'time'). If the field
        is not in ``fields``, 'time' is used
    :param reverse: if True, sort in reverse order
    :param added_fields: list of fields that were added for sorting purposes
        and should be removed after sorting
    :returns: fields and sorted rows, both without the added fields
    """
    if added_fields is None:
        added_fields = []
    try:
        sortby_index = fields.index(sortby)
    except ValueError:
        # If sortby field doesn't exist, fall back to time
        sortby_index = fields.index('time')
    ver_index = fields.index('ver')

    def _sort_key(row):
        value = row[sortby_index]
        # None cannot be ordered against numbers or strings: the leading
        # flag settles the comparison before the value itself is compared
        return value is not None, value, row[ver_index]

    rows.sort(key=_sort_key, reverse=reverse)
    # Remove fields that were added for sorting purposes only.
    # They are removed only as long as they sit at the end of fields.
    n_to_remove = 0
    for field in reversed(added_fields):
        if not fields or fields[-1] != field:
            break
        n_to_remove += 1
        fields = fields[:-1]
    if n_to_remove > 0:
        rows = [r[:-n_to_remove] for r in rows]
    return fields, rows
[docs]
def read_fields_and_rows_from_db(
        config, eventid=None, version=None, field_list=None,
        honor_where_filter=True, honor_sortby=True, honor_reverse=True):
    """
    Read fields and rows from database. Return a list of fields and a list of
    rows. The rows are sorted by the ``sortby`` field (time by default)
    and version.

    Only the latest version of each event is returned when the
    ``allversions`` option in ``config['args']`` is set to a false value.

    :param config: config object
    :param eventid: limit to events with this evid. If ``None``, then the
        eventid is taken from the command line arguments via
        ``config['args']``
    :param version: limit to events with this version. If ``None``, then the
        version is taken from the command line arguments via
        ``config['args']``
    :param field_list: list of fields to read from the database
    :param honor_where_filter: if ``True``, honor the ``where`` option
    :param honor_sortby: if ``True``, honor the ``sortby`` option
    :param honor_reverse: if ``True``, honor the ``reverse`` option
    :returns: list of fields, list of rows
    :raises ValueError: if eventid or field is not found in database,
        or if the query cannot be run
    """
    conn = _get_db_connection(config)
    # close the connection even if the query cannot be built or run
    try:
        cursor = conn.cursor()
        query, query_values, fields, added_fields = _build_query(
            cursor, config, eventid, version, field_list, honor_where_filter,
            honor_sortby)
        try:
            cursor.execute(query, query_values)
        except sqlite3.OperationalError as e:
            error_msg = str(e.args[0])
            if 'no such column' not in error_msg:
                # e.g. a syntax error in the where filter: do not report
                # it as a missing field
                raise ValueError(f'Invalid query: {error_msg}') from e
            field = error_msg.split()[-1]
            raise ValueError(f'Field "{field}" not found in database') from e
        rows = cursor.fetchall()
    finally:
        conn.close()
    if not getattr(config['args'], 'allversions', True):
        rows = _keep_latest_version(rows, fields)
    sortby = getattr(config['args'], 'sortby', 'time')\
        if honor_sortby else 'time'
    reverse = getattr(config['args'], 'reverse', False)\
        if honor_reverse else False
    return _sort_rows(rows, fields, sortby, reverse, added_fields)
[docs]
def replicate_event_in_db(config, eventid, version=1):
    """
    Replicate an event in the database. The new event will have the same
    evid as the original event, but a different version: the first free
    version number above the original one.

    :param config: config object
    :param eventid: event id of the original event
    :param version: version of the original event
    :raises ValueError: if eventid/version is not found in database
    """
    fields, rows = read_fields_and_rows_from_db(
        config, eventid=eventid, version=version)
    if not rows:
        raise ValueError(
            f'Event {eventid} version {version} not found in database')
    row = list(rows[0])
    # increment version
    ver_index = fields.index('ver')
    row[ver_index] += 1
    placeholders = ', '.join('?' * len(row))
    conn = _get_db_connection(config)
    # close the connection even if the event cannot be written
    try:
        c = conn.cursor()
        while True:
            try:
                c.execute(
                    f'INSERT INTO events VALUES ({placeholders})', row)
                break
            except sqlite3.IntegrityError:
                # version already exists, increment version and try again
                row[ver_index] += 1
        conn.commit()
    finally:
        conn.close()
    print(f'Added event {eventid} version {row[ver_index]} to database')
[docs]
def delete_event_from_db(config, eventid, version=None):
    """
    Delete an event from the database.

    If both ``eventid`` and ``version`` are None, all the events are
    deleted.

    :param config: config object
    :param eventid: event id of the event to delete
        (if None, delete all events for the given version)
    :param version: version of the event to delete
        (if None, delete all versions of the event)
    """
    # a single chain covers the four eventid/version combinations
    if eventid is None and version is None:
        query, query_values = 'DELETE FROM events', ()
        msg = 'All events deleted from database'
    elif eventid is None:
        query, query_values = 'DELETE FROM events WHERE ver = ?', (version,)
        msg = f'All events of version {version} deleted from database'
    elif version is None:
        query, query_values = 'DELETE FROM events WHERE evid = ?', (eventid,)
        msg = f'Event {eventid} deleted from database'
    else:
        query = 'DELETE FROM events WHERE evid = ? AND ver = ?'
        query_values = (eventid, version)
        msg = f'Event {eventid} version {version} deleted from database'
    conn = _get_db_connection(config)
    # close the connection even if the deletion fails
    try:
        conn.cursor().execute(query, query_values)
        conn.commit()
    finally:
        conn.close()
    print(msg)
[docs]
def update_event_in_db(config, eventid, version, field, value):
    """
    Update an event in the database.

    :param config: config object
    :param eventid: event id of the event to update
    :param version: version of the event to update
    :param field: field to update
    :param value: new value
    :raises ValueError: if field is not found in database
    """
    conn = _get_db_connection(config)
    # close the connection on success and on failure
    try:
        c = conn.cursor()
        try:
            # the field name cannot be a bound parameter,
            # the values are passed as parameters
            c.execute(
                f'UPDATE events SET {field} = ? WHERE evid = ? AND ver = ?',
                (value, eventid, version))
        except sqlite3.OperationalError as e:
            raise ValueError(f'Field "{field}" not found in database') from e
        conn.commit()
    finally:
        conn.close()
    print(
        f'Updated field "{field}={value}" '
        f'for event {eventid} version {version}')
[docs]
def increment_event_in_db(config, eventid, version, field, value):
    """
    Increment a numeric field of an event in the database.

    :param config: config object
    :param eventid: event id of the event to update
    :param version: version of the event to update
    :param field: field to update
    :param value: value to increment, must be a number
    :raises ValueError: if field is not found in database,
        if eventid/version is not found in database,
        if the field value is not a number (or not set),
        or if value is not a number
    """
    # check if value is numeric, before opening the database
    try:
        value = float(value)
    except (TypeError, ValueError) as e:
        raise ValueError(f'Value "{value}" is not a number') from e
    # if value is an integer, convert it to int.
    # is_integer() is False for inf and nan, which int() cannot convert
    if value.is_integer():
        value = int(value)
    conn = _get_db_connection(config)
    # close the connection on success and on failure
    try:
        c = conn.cursor()
        # read old value from database
        try:
            c.execute(
                f'SELECT {field} FROM events WHERE evid = ? AND ver = ?',
                (eventid, version))
        except sqlite3.OperationalError as e:
            raise ValueError(f'Field "{field}" not found in database') from e
        row = c.fetchone()
        if row is None:
            raise ValueError(
                f'Event {eventid} version {version} not found in database')
        # check if old value is numeric; float(None) raises TypeError
        try:
            new_value = float(row[0]) + value
        except (TypeError, ValueError) as e:
            raise ValueError(f'Field "{field}" is not a number') from e
        if new_value.is_integer():
            new_value = int(new_value)
        # update database
        try:
            c.execute(
                f'UPDATE events SET {field} = ? '
                'WHERE evid = ? AND ver = ?',
                (new_value, eventid, version))
        except sqlite3.OperationalError as e:
            raise ValueError(f'Field "{field}" not found in database') from e
        conn.commit()
    finally:
        conn.close()
    print(
        f'Field "{field}" incremented by "{value}" '
        f'for event {eventid} version {version}')
[docs]
def read_events_from_db(config, eventid=None, version=None):
    """
    Read events from database and return them as an EventList.

    Each row read from the database becomes an Event built from
    (field, value) pairs; the row order is kept.

    :param config: config object
    :param eventid: limit to events with this evid
    :param version: limit to events with this version
    :returns: list of events, each event is a dictionary-like object
    """
    field_names, db_rows = read_fields_and_rows_from_db(
        config, eventid, version)
    catalog = EventList()
    for db_row in db_rows:
        catalog.append(Event(zip(field_names, db_row)))
    return catalog
[docs]
def read_evids_and_versions_from_db(config):
    """
    Get a list of event ids and versions from the database.

    The ``where`` option is not honored; the ``allversions``, ``sortby``
    and ``reverse`` options are.

    :param config: config object
    :returns: list of tuples (evid, version)
    """
    wanted_fields = ['evid', 'ver']
    _, evid_version_rows = read_fields_and_rows_from_db(
        config, field_list=wanted_fields, honor_where_filter=False)
    return evid_version_rows
[docs]
def get_catalog_stats(config):
    """
    Get a string with catalog statistics.

    The string reports the number of events and their time range and,
    if at least one event has a magnitude, the magnitude range.

    :param config: config object
    :returns: string with catalog statistics
    """
    events = read_events_from_db(config)
    nevents = len(events)
    if not nevents:
        # min() and max() raise ValueError on an empty catalog
        return '0 events'
    times = [event['time'] for event in events]
    tmin = min(times).strftime('%Y-%m-%dT%H:%M:%S')
    tmax = max(times).strftime('%Y-%m-%dT%H:%M:%S')
    stats_str = f'{nevents} events from {tmin} to {tmax}'
    if mags := [event['mag'] for event in events if event['mag'] is not None]:
        mag_min = min(mags)
        mag_max = max(mags)
        stats_str += f'\nMagnitude range: {mag_min:.1f} -- {mag_max:.1f}'
    return stats_str