# -*- coding: utf8 -*-
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Read an event catalog from a CSV file.
This code is modified from Requake (https://github.com/SeismicSource/requake)
:copyright:
2021-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
GNU General Public License v3.0 or later
(https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
import csv
import contextlib
from io import StringIO
import numpy as np
from obspy import UTCDateTime
from obspy.core.event import Origin, Event, Catalog, Magnitude
from ..utils import float_or_none, int_or_none
from .evid import generate_evid
def _read_first_lines(fp, max_lines=10):
"""Read and strip up to max_lines lines from a text file pointer."""
first_lines = []
for _ in range(max_lines):
if not (line := fp.readline()):
break
first_lines.append(line.strip())
return first_lines
def _is_csv_text_like(first_lines, delimiter=None):
"""Check if stripped text lines look like CSV-like tabular data."""
try:
first_line = first_lines[0]
except IndexError:
return False
if first_line.startswith('<?xml') or first_line.startswith('<'):
return False
delimiters =\
[delimiter] if delimiter is not None else [',', ';', '\t', ' ']
has_delimiters = any(
any(delim in line for line in first_lines[:5])
for delim in delimiters
)
if not has_delimiters:
return False
field_counts = []
for line in first_lines[:5]:
if not line:
continue
for delim in delimiters:
if delim in line:
fields = line.split() if delim == ' ' else line.split(delim)
if len(fields) > 1:
field_counts.append(len(fields))
break
if len(field_counts) < 2:
return False
min_fields = min(field_counts)
max_fields = max(field_counts)
return max_fields <= min_fields * 3
def _is_csv_like(filename, delimiter=None):
    """
    Quick plausibility check that a file holds CSV-like tabular text.

    The file is rejected if its first 1024 bytes contain a null byte, if it
    cannot be opened or decoded as UTF-8, or if its first lines do not look
    like comma-, semicolon-, tab- or space-separated values.

    :param filename: input filename
    :type filename: str
    :param delimiter: optional delimiter to check for (if provided by user)
    :type delimiter: str or None
    :return: True if file appears to be CSV-like, False otherwise
    :rtype: bool
    """
    try:
        with open(filename, 'rb') as binary_fp:
            head = binary_fp.read(1024)
        # a null byte is a strong hint that the file is binary
        if b'\x00' in head:
            return False
        with open(filename, 'r', encoding='utf8') as text_fp:
            sample_lines = _read_first_lines(text_fp)
        return _is_csv_text_like(sample_lines, delimiter=delimiter)
    except (UnicodeDecodeError, OSError):
        # unreadable or undecodable as text: not a CSV file
        return False
def _field_match_score(field, field_list):
"""
Return the length of the longest substring of field that matches any of
the field names in field_list.
:param field: field name
:type field: str
:param field_list: list of field names
:type field_list: list of str
:return: the length of the longest substring of field that matches any of
the field names in field_list
:rtype: int
"""
# return a very high score for a perfect match
if field.lower().strip() in field_list:
return 999
scores = [
len(guess)
for guess in field_list
if guess in field.lower().strip()
]
try:
return max(scores)
except ValueError:
return 0
def _remove_redundant_fields(output_fields, main_field, redundant_fields):
"""
Remove redundant fields from the output_fields dictionary.
"""
if output_fields[main_field] is not None:
for redundant_field in redundant_fields:
output_fields[redundant_field] = None
def _guess_field_names(input_fields):
    """
    Map the column names of a CSV file to the known event fields.

    Each column name is scored against lists of known names for event id,
    date, time (or separate year, month, day, hour, minute, seconds),
    latitude, longitude, depth, magnitude, magnitude type and event type.
    The best-scoring column is retained for each field. The identified
    columns are printed to standard output.

    :param input_fields: list of field names
    :type input_fields: list of str
    :return: a dictionary mapping each known field (plus a ``None`` key) to
        the matching column name, or to None if no column was matched
    :rtype: dict
    :raises ValueError: if no field can be identified, or if neither a time
        column nor the full set of year, month, day, hour, minute and
        seconds columns is found
    """
    known_names = {
        'evid': [
            'evid', 'event_id', 'eventid', 'event_id', 'id', 'evidid',
            'publicid', 'public_id', 'event_public_id', 'event_publicid',
            'orid', 'origin_id', 'originid'
        ],
        'date': [
            'date', 'orig_date', 'origin_date', 'origin_date_utc',
            'origin_date_iso'
        ],
        'time': [
            'time', 'orig_time', 'origin_time', 'origin_time_utc',
            'origin_time_iso', 'datetime'
        ],
        'year': ['year', 'yr', 'yyyy'],
        'month': ['month', 'mon', 'mo', 'mm'],
        'day': ['day', 'dy', 'dd'],
        'hour': ['hour', 'hr', 'h', 'hh'],
        'minute': ['minute', 'min'],
        'seconds': ['seconds', 'second', 'sec', 's', 'ss'],
        'lat': ['lat', 'latitude'],
        'lon': ['lon', 'longitude'],
        'depth': ['depth', 'depth_km', 'dep', 'evz'],
        'mag': ['mag', 'magnitude', 'mw', 'ml'],
        'mag_type': ['mag_type', 'magnitude_type'],
        'event_type': ['event_type', 'ev_type'],
    }
    ignored_columns = ['rms', 'gap', 'scatter_volume']
    # also accept the known names written with spaces instead of underscores
    for names in known_names.values():
        names.extend([name.replace('_', ' ') for name in names])
    # A None key must be present in the output dictionary, so that
    # unmatched fields can be looked up without a KeyError
    output_fields = {None: None}
    output_fields.update(dict.fromkeys(known_names))
    best_scores = dict.fromkeys(output_fields, 0)
    # for every known field, keep the column with the highest score
    for column in input_fields:
        if column.lower() in ignored_columns:
            continue
        for target, names in known_names.items():
            score = _field_match_score(column, names)
            if score > best_scores[target]:
                best_scores[target] = score
                output_fields[target] = column
    if all(column is None for column in output_fields.values()):
        raise ValueError('Unable to identify any field')
    # columns which have been assigned to more than one known field
    matched_columns = list(output_fields.values())
    duplicates = [
        (column, best_scores[target])
        for target, column in output_fields.items()
        if column is not None and matched_columns.count(column) > 1
    ]
    # a contested column stays only with the field(s) scoring highest
    for column, score in duplicates:
        for target, other_column in output_fields.items():
            if other_column == column and score > best_scores[target]:
                output_fields[target] = None
    # a date (time) column supersedes separate year/month/day
    # (hour/minute/seconds) columns
    _remove_redundant_fields(
        output_fields, 'date', ['year', 'month', 'day'])
    _remove_redundant_fields(
        output_fields, 'time', ['hour', 'minute', 'seconds'])
    print('Columns identified ("column name" --> "identified name"):')
    for target, column in output_fields.items():
        if target is None or column is None:
            continue
        print(f' "{column}" --> "{target}"')
    ymdhms = ('year', 'month', 'day', 'hour', 'minute', 'seconds')
    if (
        output_fields['time'] is None
        and any(output_fields[name] is None for name in ymdhms)
    ):
        raise ValueError(
            'Unable to identify all the necessary date-time fields')
    return output_fields
def _csv_file_info(filename):
"""
Determine the delimiter and the number of rows in a CSV file.
:param filename: input filename
:type filename: str
:return: a tuple with the delimiter and the number of rows
:rtype: tuple
"""
with open(filename, 'r', encoding='utf8') as fp:
nrows = sum(1 for _ in fp)
fp.seek(0)
n_first_lines = 5
first_lines = ''.join(fp.readline() for _ in range(n_first_lines))
# count the number of commas and semicolons in the first n lines
ncommas = first_lines.count(',')
nsemicolons = first_lines.count(';')
if ncommas >= n_first_lines:
delimiter = ','
elif nsemicolons >= n_first_lines:
delimiter = ';'
else:
delimiter = ' '
return delimiter, nrows
def _read_orig_time_from_ymdhms(row, fields):
    """
    Try to build a date-time field from separated year, month, day, hour,
    minute and seconds fields.

    A year smaller than 100 is assumed to belong to the 21st century.

    :param row: row from the CSV file
    :type row: dict
    :param fields: field names
    :type fields: dict
    :return: the origin time
    :rtype: obspy.UTCDateTime
    :raises ValueError: if a date-time component is missing or the origin
        time cannot be parsed
    """
    components = {
        'year': int_or_none(row[fields['year']]),
        'month': int_or_none(row[fields['month']]),
        'day': int_or_none(row[fields['day']]),
        'hour': int_or_none(row[fields['hour']]),
        'minute': int_or_none(row[fields['minute']]),
        'seconds': float_or_none(row[fields['seconds']]),
    }
    # Fail early with an explicit message, rather than letting a None
    # value trigger an obscure TypeError further down
    missing = [name for name, value in components.items() if value is None]
    if missing:
        raise ValueError(
            f'Missing date-time field(s): {", ".join(missing)}')
    year = components['year']
    # if year has two digits, assume it is in the 21st century
    if year < 100:
        year += 2000
    # Seconds are added after construction instead of being passed to the
    # constructor (presumably to tolerate values outside the 0-59 range)
    return (
        UTCDateTime(
            year=year, month=components['month'], day=components['day'],
            hour=components['hour'], minute=components['minute'], second=0
        ) + components['seconds']
    )
def _split_date_time(date_time_str):
"""
Split a date-time string into date and time components.
:param date_time_str: date-time string
:type date_time_str: str
:return: a tuple with the date and time components
:rtype: tuple of (str, str)
"""
if 'T' in date_time_str:
# ISO 8601 format, split on 'T'
return date_time_str.split('T', 1)
if ' ' in date_time_str:
val1, val2 = date_time_str.split(' ', 1)
else:
# cannot split
# return the original string as date and an empty string as time
return date_time_str, ''
# check for typical date separators
if any(c in ['-', '/'] for c in val1):
return val1, val2
if any(c in ['-', '/'] for c in val2):
return val2, val1
# check for typical time separators
if any(c in [':', '.'] for c in val1):
return val2, val1
if any(c in [':', '.'] for c in val2):
return val1, val2
# if we get here, we couldn't identify date and time components
# return the original string as date and an empty string as time
return date_time_str, ''
def _normalize_date_format(date):
"""
Normalize a date string by reversing day-month-year to year-month-day
format.
Handles both "-" and "/" as separators. Validates month (1-12),
day (1-31), and year ranges.
:param date: date string
:type date: str
:return: normalized date string
:rtype: str
"""
if not date:
return date
date = date.strip()
separator = '-' if '-' in date else ('/' if '/' in date else None)
if not separator:
return date
date_parts = date.split(separator)
if len(date_parts) != 3:
return date
# Check structure: 1-2 digit day/month, any length middle, 4 digit year
if len(date_parts[0]) > 2 or len(date_parts[2]) != 4:
return date
# Try to convert parts to integers for validation
try:
day_or_month = int(date_parts[0])
month_or_day = int(date_parts[1])
year = int(date_parts[2])
except ValueError:
return date
# Validate year is in reasonable range
if not 1800 <= year <= 2100:
return date
# Validate month is 1-12 and day is 1-31
# If first part is 1-31 and second is 1-12, it's day-month-year format
if 1 <= day_or_month <= 31 and 1 <= month_or_day <= 12:
return separator.join([date_parts[2], date_parts[1], date_parts[0]])
return date
def _read_orig_time_from_datetime(row, fields):
    """
    Build the origin time from the date and/or time columns of a row.

    If only one of the two columns holds a value, that value is split into
    date and time components. The date is then normalized to
    year-month-day order before parsing.

    :param row: row from the CSV file
    :type row: dict
    :param fields: field names
    :type fields: dict
    :return: the origin time
    :rtype: obspy.UTCDateTime
    :raises ValueError: if the origin time cannot be parsed
    """
    date_column, time_column = fields['date'], fields['time']
    date = '' if date_column is None else row[date_column]
    time = '' if time_column is None else row[time_column]
    if not (date or time):
        raise ValueError('No date or time information found')
    if not (date and time):
        # a single column holds both date and time
        date, time = _split_date_time(date or time)
    orig_time_str = f'{_normalize_date_format(date)} {time}'.strip()
    try:
        return UTCDateTime(orig_time_str)
    except ValueError:
        # Last resort: assume a YYYYMMDD.hhmmss string, replace the dot
        # with a space and pad with zeros up to 15 characters
        padded_time_str = orig_time_str.replace('.', ' ').ljust(15, '0')
        try:
            return UTCDateTime(padded_time_str)
        except ValueError as e:
            raise ValueError(
                f'Unable to parse origin time: "{orig_time_str}"'
            ) from e
def _read_orig_time_from_row(row, fields):
    """
    Read the origin time from a row.

    When no time column has been identified, the origin time is assembled
    from the separate year, month, day, hour, minute and seconds columns;
    otherwise it is parsed from the date and time columns.

    :param row: row from the CSV file
    :type row: dict
    :param fields: field names
    :type fields: dict
    :return: the origin time
    :rtype: obspy.UTCDateTime
    :raises ValueError: if the origin time cannot be parsed
    """
    if fields['time'] is None:
        return _read_orig_time_from_ymdhms(row, fields)
    return _read_orig_time_from_datetime(row, fields)
def _normalize_no_values(no_value):
"""Normalize user-provided no-value markers for fast lookups."""
values = []
if no_value is not None:
values = [no_value] if isinstance(no_value, str) else no_value
text_markers = {
str(value).strip().lower()
for value in values
if value is not None and str(value).strip() != ''
}
# Always treat these case-insensitive tokens as missing values.
text_markers.update({'none', 'null'})
numeric_markers = {
float_val for float_val in
(float_or_none(value) for value in values)
if float_val is not None
}
return text_markers, numeric_markers
def _is_no_value(value, text_markers, numeric_markers):
"""Return True when value matches configured missing-value markers."""
if isinstance(value, str):
value = value.strip()
value_str = str(value).strip().lower()
if value_str in text_markers:
return True
value_float = float_or_none(value)
return value_float is not None and value_float in numeric_markers
def _apply_no_value_markers(row, text_markers, numeric_markers):
"""Convert configured no-value markers in a CSV row to None."""
if not text_markers and not numeric_markers:
return row
out_row = {}
for key, value in row.items():
if isinstance(value, str):
stripped = value.strip()
out_row[key] = (
None
if _is_no_value(stripped, text_markers, numeric_markers)
else stripped
)
else:
out_row[key] = (
None
if _is_no_value(value, text_markers, numeric_markers)
else value
)
return out_row
def _read_csv_row(row, fields, depth_units, mag_type, extra_column_names=None):
    """
    Read a row from a CSV file.

    :param row: row from the CSV file (a ``None`` key is added to it)
    :type row: dict
    :param fields: field names
    :type fields: dict
    :param depth_units: depth units (m or km)
    :type depth_units: str
    :param mag_type: magnitude type, used when the row does not provide one
    :type mag_type: str
    :param extra_column_names: names of additional columns whose values
        are stored in the ``extra`` attribute of the event, under the
        'seiscat' namespace
    :type extra_column_names: list of str or None
    :return: an ObsPy event object
    :rtype: obspy.Event
    :raises ValueError: if the origin time cannot be parsed
    """
    # Unmatched fields have a None column name: this entry makes
    # row[fields[name]] return None for them
    row[None] = None
    # check if origin time is parsable, or die trying
    orig_time = _read_orig_time_from_row(row, fields)
    ev = Event()
    _evid = row[fields['evid']]
    ev.resource_id = (
        generate_evid(orig_time) if _evid is None
        else str(_evid).strip()
    )
    evtype = row[fields['event_type']]
    if evtype not in [None, '']:
        try:
            ev.event_type = evtype
        except ValueError:
            # report the rejected value: ev.event_type has not been set
            print(f'Ignoring unknown event type: {evtype}')
    orig = Origin()
    orig.time = orig_time
    orig.longitude = float_or_none(row[fields['lon']])
    orig.latitude = float_or_none(row[fields['lat']])
    depth = float_or_none(row[fields['depth']])
    # a missing depth must not make the whole row fail
    if depth is not None and depth_units == 'km':
        depth *= 1000
    orig.depth = depth
    ev.origins.append(orig)
    ev.preferred_origin_id = orig.resource_id
    mag = Magnitude()
    mag.magnitude_type = row[fields['mag_type']]
    if mag.magnitude_type is None:
        mag.magnitude_type = mag_type
    mag.mag = float_or_none(row[fields['mag']])
    ev.magnitudes.append(mag)
    ev.preferred_magnitude_id = mag.resource_id
    if extra_column_names:
        # Store CSV extra fields so database write can map them to columns.
        extra = getattr(ev, 'extra', {}) or {}
        for column_name in extra_column_names:
            extra[column_name] = {
                'value': row.get(column_name),
                'namespace': 'seiscat'
            }
        ev.extra = extra
    return ev
def _read_csv(
    fp, delimiter, column_names, nrows, depth_units, no_value=None,
    import_extra_columns=False,
):
    """
    Read a catalog from a CSV file.

    Rows which cannot be converted to an event (``ValueError`` or
    ``TypeError``) are reported on standard output and skipped.

    :param fp: file pointer
    :type fp: file object
    :param delimiter: CSV delimiter
    :type delimiter: str
    :param column_names: list of column names; if None, they are read from
        the first row of the file
    :type column_names: list of str or None
    :param nrows: number of rows in the CSV file (only used for progress
        messages)
    :type nrows: int
    :param depth_units: depth units (m or km)
    :type depth_units: str
    :param no_value: list of values/strings considered missing values
    :type no_value: list of str or None
    :param import_extra_columns: if True, columns which are not matched to
        a known field are stored as extra event attributes
    :type import_extra_columns: bool
    :return: an ObsPy catalog object
    :rtype: obspy.Catalog
    """
    if delimiter == ' ':
        # collapse runs of whitespace into a single space, then read from
        # an in-memory copy of the file
        collapsed_text = '\n'.join(' '.join(line.split()) for line in fp)
        fp = StringIO(collapsed_text)
    reader = csv.DictReader(
        fp, fieldnames=column_names, delimiter=delimiter,
        skipinitialspace=True)
    text_markers, numeric_markers = _normalize_no_values(no_value)
    fields = _guess_field_names(reader.fieldnames)
    extra_column_names = []
    if import_extra_columns:
        matched_columns = {
            column for column in fields.values() if column is not None
        }
        extra_column_names = [
            column for column in reader.fieldnames
            if column not in matched_columns
        ]
    if extra_column_names:
        print('Additional CSV columns to import:')
        for column in extra_column_names:
            print(f' "{column}"')
    # without a magnitude type column, a magnitude column named after a
    # magnitude type provides the type
    mag_type = None
    mag_field = fields['mag']
    if (
        fields['mag_type'] is None
        and mag_field is not None
        and mag_field.lower() in ['mw', 'ml']
    ):
        mag_type = mag_field
    if column_names is None:
        # first row is the header
        nrows -= 1
    cat = Catalog()
    for row_number, row in enumerate(reader, start=1):
        print(f'reading row {row_number}/{nrows}\r', end='')
        row = _apply_no_value_markers(row, text_markers, numeric_markers)
        try:
            ev = _read_csv_row(
                row, fields, depth_units, mag_type, extra_column_names)
        except (ValueError, TypeError) as e:
            print(f'Error at row {row_number}: {e}')
            continue
        cat.append(ev)
    # terminate the line left open by the last "reading row" message
    print()
    if extra_column_names:
        cat.seiscat_extra_column_names = extra_column_names
    return cat
def read_catalog_from_csv(config, filename=None):
    """
    Read a catalog from a CSV file.

    If no depth units are given in the arguments, they are guessed from
    the largest depth in the catalog: below 500, depths are assumed to be
    in kilometers and are converted to meters.

    :param config: configuration object
    :type config: dict
    :param filename: CSV filename (if None, uses args.fromfile[0])
    :type filename: str or None
    :return: an ObsPy catalog object
    :rtype: obspy.Catalog
    :raises FileNotFoundError: if filename does not exist
    :raises ValueError: if depth units are invalid or
        file is not CSV-like or no origin time field is found
    """
    args = config['args']
    if args.depth_units not in [None, 'km', 'm']:
        raise ValueError(f'Invalid depth_units: {args.depth_units}')
    # the explicit filename takes precedence over args.fromfile
    csv_filename = args.fromfile[0] if filename is None else filename
    # Fail fast with FileNotFoundError, before any other check
    try:
        open(csv_filename, 'r', encoding='utf8').close()
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f'CSV file not found: {csv_filename}'
        ) from e
    # Check the file format, honoring a user-provided delimiter
    if not _is_csv_like(csv_filename, delimiter=args.delimiter):
        if args.delimiter:
            delimiter_msg = f'with delimiter "{args.delimiter}"'
        else:
            delimiter_msg = 'in CSV, tab-separated, or space-separated format'
        raise ValueError(
            f'File {csv_filename} does not appear to be {delimiter_msg}'
        )
    guess_delimiter, nrows = _csv_file_info(csv_filename)
    delimiter = args.delimiter or guess_delimiter
    print(f'CSV delimiter: "{delimiter}"')
    print(f'CSV number of rows: {nrows}')
    with open(csv_filename, 'r', encoding='utf8') as fp:
        cat = _read_csv(
            fp, delimiter, args.column_names, nrows,
            args.depth_units, args.no_value,
            import_extra_columns=getattr(args, 'csv_extra_columns', False))
    if args.depth_units is not None:
        # depth units were given explicitly: nothing to guess
        return cat
    # Guess the depth units from the maximum depth of the catalog
    depths = np.array(
        [ev.origins[0].depth for ev in cat], dtype=np.float64
    )
    # no usable depth at all: skip the check
    if np.isnan(depths).all():
        return cat
    max_depth = np.nanmax(depths)
    if np.isnan(max_depth):
        return cat
    if max_depth >= 500:
        print(
            'Assuming depths are in meters, you can specify '
            '--depth_units in the command line to avoid this check')
        return cat
    print(
        'Assuming depths are in kilometers, you can specify '
        '--depth_units in the command line to avoid this check')
    for ev in cat:
        # suppress errors in case depth is missing or not a number
        with contextlib.suppress(AttributeError, TypeError):
            ev.origins[0].depth *= 1000
    return cat