Source code for seiscat.database.editdb

# -*- coding: utf8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Edit functions for seiscat.

:copyright:
    2022-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
    GNU General Public License v3.0 or later
    (https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
from ..utils import err_exit
from .dbfunctions import (
    read_fields_and_rows_from_db, replicate_event_in_db,
    delete_event_from_db, update_event_in_db, increment_event_in_db,
    add_column_to_db, delete_column_from_db, rename_column_in_db,
    DEFAULT_COLUMNS)


def _are_you_sure(msg):
    """
    Ask user if she is sure to proceed.

    :param msg: message to print
    """
    print(msg)
    answer = input('Are you sure? [y/N] ')
    if answer.lower() != 'y':
        err_exit('Aborted')


def _parse_set_arg(arg):
    """
    Parse "--set" argument.

    :param arg: argument string
    :return: key, value
    """
    if '=' not in arg:
        err_exit(
            f'Invalid argument "{arg}" for "--set". '
            'Argument must be in the form "key=value"')
    key, val = arg.split('=')
    # sanitize key and val
    key = key.strip()
    val = val.strip()
    return key, val


def _replicate(config, fields, rows):
    """
    Replicate event in database.

    :param config: config object
    :param fields: list of fields
    :param rows: list of rows
    """
    for row in rows:
        eventid = row[fields.index('evid')]
        version = row[fields.index('ver')]
        replicate_event_in_db(config, eventid, version)


def _delete(config, fields, rows, eventid, version, args):
    """
    Delete event from database.

    :param config: config object
    :param fields: list of fields
    :param rows: list of rows
    :param eventid: event ID
    :param version: event version
    :param args: parsed arguments
    """
    if eventid:
        version = rows[0][fields.index('ver')]
        if not args.force:
            _are_you_sure(
                f'Delete event {eventid} version {version} from database?')
    elif version and not args.force:
        _are_you_sure(
            f'Delete ALL events of version {version} from database?')
    elif not version and not args.force:
        _are_you_sure('Delete ALL events from database?')
    delete_event_from_db(config, eventid, version)


def _set(config, fields, rows, key_values, args):
    """
    Set key-value pairs in database.

    :param config: config object
    :param fields: list of fields
    :param rows: list of rows
    :param key_values: list of key-value pairs
    :param args: parsed arguments
    """
    if len(rows) == 1 and not args.force:
        eventid = rows[0][fields.index('evid')]
        version = rows[0][fields.index('ver')]
        _are_you_sure(
            f'Update event {eventid} version {version} in database?')
    elif not args.force:
        _are_you_sure(
            f'Update {len(rows)} events in database?')
    for row in rows:
        eventid = row[fields.index('evid')]
        version = row[fields.index('ver')]
        key_values = [_parse_set_arg(arg) for arg in args.set]
        for key, val in key_values:
            update_event_in_db(config, eventid, version, key, val)


def _increment(config, fields, rows, key_values, args):
    """
    Increment key-value pairs in database.

    :param config: config object
    :param fields: list of fields
    :param rows: list of rows
    :param key_values: list of key-value pairs
    :param args: parsed arguments
    """
    if len(rows) == 1 and not args.force:
        eventid = rows[0][fields.index('evid')]
        version = rows[0][fields.index('ver')]
        _are_you_sure(
            f'Update event {eventid} version {version} in database?')
    elif not args.force:
        _are_you_sure(
            f'Update {len(rows)} events in database?')
    for row in rows:
        eventid = row[fields.index('evid')]
        version = row[fields.index('ver')]
        key_values = [_parse_set_arg(arg) for arg in args.increment]
        for key, val in key_values:
            increment_event_in_db(config, eventid, version, key, val)


def _edit_columns(config, args):
    """Edit table columns (add, delete, rename)."""
    if args.eventid is not None or args.event_version is not None:
        err_exit(
            'Column operations act on the whole table and do not accept '
            'eventid/event_version arguments')
    if args.where is not None:
        err_exit('Column operations do not accept --where filters')
    if args.add_column:
        if not args.force:
            _are_you_sure(f'Add column "{args.add_column}" to database?')
        add_column_to_db(config, args.add_column)
    elif args.delete_column:
        column_name = args.delete_column.strip()
        if column_name in DEFAULT_COLUMNS:
            err_exit(
                f'Column "{column_name}" is protected and cannot be deleted')
        if not args.force:
            _are_you_sure(
                f'Delete column "{args.delete_column}" from database?')
        delete_column_from_db(config, args.delete_column)
    elif args.rename_column:
        if '=' not in args.rename_column:
            err_exit(
                f'Invalid argument "{args.rename_column}" '
                'for "--rename-column". '
                'Argument must be in the form "old_name=new_name"')
        old_name = args.rename_column.split('=', 1)[0].strip()
        if old_name in DEFAULT_COLUMNS:
            err_exit(
                f'Column "{old_name}" is protected and cannot be renamed')
        if not args.force:
            _are_you_sure(
                f'Rename column "{args.rename_column}" in database?')
        rename_column_in_db(config, args.rename_column)


[docs] def editdb(config): """ Edit database. :param config: config object """ args = config['args'] actions = [ bool(args.replicate), bool(args.delete), bool(args.set), bool(args.increment), bool(args.add_column), bool(args.delete_column), bool(args.rename_column), ] if sum(actions) > 1: err_exit('Only one edit action can be specified at a time') if sum(actions) == 0: err_exit('No action specified. See "seiscat editdb -h" for help') if args.add_column or args.delete_column or args.rename_column: try: _edit_columns(config, args) return except ValueError as msg: err_exit(msg) eventid = args.eventid if eventid == 'ALL': eventid = None version = args.event_version try: fields, rows = read_fields_and_rows_from_db(config) except (FileNotFoundError, ValueError) as msg: err_exit(msg) if not rows: if eventid and version: msg = f'Event {eventid} version {version} not found in database' elif eventid: msg = f'Event {eventid} not found in database' else: msg = 'No events found in database' err_exit(msg) if eventid and len(rows) > 1: err_exit( f'Event {eventid} has {len(rows)} versions, ' 'please specify the version number after the event ID' ) try: if args.replicate: _replicate(config, fields, rows) elif args.delete: _delete(config, fields, rows, eventid, version, args) elif args.set: _set(config, fields, rows, args.set, args) elif args.increment: _increment(config, fields, rows, args.increment, args) except ValueError as msg: err_exit(msg)