# -*- coding: utf8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Configuration functions for seiscat.
:copyright:
2022-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
GNU General Public License v3.0 or later
(https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
import os
import sys
from .configobj import ConfigObj
from .configobj.validate import Validator
from ..utils import err_exit, ExceptionExit, write_ok
def parse_configspec():
    """
    Load the configspec file shipped alongside this module.

    The file ``configspec.conf`` is looked up in the directory that contains
    this module and is read through ``read_config`` with no configspec
    argument.

    :returns: configspec object
    """
    module_dir = os.path.dirname(__file__)
    return read_config(os.path.join(module_dir, 'configspec.conf'))
def write_sample_config(configspec, progname):
    """
    Generate a sample configuration file named ``<progname>.conf``.

    An empty config object is built from the configspec and validated, then
    the comments of the configspec are copied onto it before it is written
    to the current working directory. Nothing is written if ``write_ok``
    refuses the target file name.

    :param configspec: configspec object
    :param progname: program name, used as the base name of the output file
    """
    sample = ConfigObj(configspec=configspec, default_encoding='utf8')
    sample.validate(Validator())
    # NOTE(review): presumably clearing ``defaults`` makes the values filled
    # in by validation appear in the written file -- confirm against the
    # configobj documentation.
    sample.defaults = []
    # Carry over the explanatory comments from the configspec
    sample.initial_comment = configspec.initial_comment
    sample.comments = configspec.comments
    sample.final_comment = configspec.final_comment
    configfile = f'{progname}.conf'
    if not write_ok(configfile):
        return
    with open(configfile, 'wb') as fp:
        sample.write(fp)
    print(f'Sample config file written to: "{configfile}"')
def read_config(config_file, configspec=None):
    """
    Read a config file, or a configspec file when no configspec is given.

    Top-level values equal to the string ``'None'`` are converted to
    ``None``. When a configspec is provided, the config is also validated
    and its path options are resolved against the config file's directory.

    :param config_file: path to the config file
    :param configspec: configspec object, or ``None`` when ``config_file``
        is itself a configspec
    :returns: config object
    """
    reading_spec = configspec is None
    options = {
        'configspec': configspec,
        'file_error': True,
        'default_encoding': 'utf8',
    }
    if reading_spec:
        # Extra parser options used only when the file is a configspec
        options.update(interpolation=False, list_values=False, _inspec=True)
    with ExceptionExit(additional_msg=f'Unable to read "{config_file}"'):
        config_obj = ConfigObj(config_file, **options)
    # Replace the literal string 'None' with the None object
    none_keys = [key for key, value in config_obj.items() if value == 'None']
    for key in none_keys:
        config_obj[key] = None
    if reading_spec:
        return config_obj
    _validate_config(config_obj)
    _resolve_path_keys(config_obj, config_file)
    return config_obj
def _validate_config(config_obj):
    """
    Validate the config object against its configspec.

    Before validation, the configspec is extended with ``<key>_<n>`` copies
    of every original spec key, for n = 1, 2, ... as long as the config
    contains at least one key ending with ``_<n>``.

    On validation failure, every failing top-level entry is reported on
    stderr and the program exits with status 1. An entry flagged as failed
    but absent from the config (e.g. a required option with no default) is
    reported as missing instead of raising ``KeyError``.

    :param config_obj: config object, with its ``configspec`` attribute set
    """
    configspec = config_obj.configspec
    config_obj_keys = list(config_obj.keys())
    # Snapshot the original spec keys, since configspec grows in the loop
    configspec_keys = list(configspec.keys())
    # Extend configspec with keys ending with _n, if present in config_obj
    n = 1
    while any(k.endswith(f'_{n}') for k in config_obj_keys):
        for k in configspec_keys:
            configspec[f'{k}_{n}'] = configspec[k]
        n += 1
    config_obj.configspec = configspec
    test = config_obj.validate(Validator())
    # Per the configobj documentation, validate() returns True when all
    # checks pass, False when all fail, and a dict of per-entry results
    # otherwise.
    if isinstance(test, dict):
        for entry in test:
            if test[entry]:
                continue
            if entry in config_obj:
                sys.stderr.write(
                    f'Invalid value for "{entry}": "{config_obj[entry]}"\n')
            else:
                # The failed entry has no value to show
                sys.stderr.write(f'Missing value for "{entry}"\n')
        sys.exit(1)
    if not test:
        err_exit('No configuration value present!')
# Keys whose values are file/directory paths that should be resolved
# relative to the config file's directory when they are not absolute.
# waveform_dir and station_dir are event-subdirectory names and must stay
# relative to each per-event folder.
_PATH_KEYS = ('db_file', 'event_dir')


def _resolve_path_keys(config_obj, config_file):
    """
    Anchor relative path options to the directory of the config file.

    For each key listed in ``_PATH_KEYS``, a non-empty, non-absolute value is
    replaced in place by that value joined to the absolute directory of
    ``config_file``. This lets seiscat work regardless of the current
    working directory (e.g. when launched by launchd/systemd).

    :param config_obj: validated config object (modified in place)
    :param config_file: path to the config file that was read
    """
    base_dir = os.path.dirname(os.path.abspath(config_file))
    for path_key in _PATH_KEYS:
        path_value = config_obj.get(path_key)
        # Leave unset/empty values and already-absolute paths untouched
        if not path_value or os.path.isabs(path_value):
            continue
        config_obj[path_key] = os.path.join(base_dir, path_value)