Source code for EQTransformer.core.tester

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Apr 25 17:44:14 2018

@author: mostafamousavi
last update: 05/27/2021

"""

from __future__ import print_function
import os
os.environ['KERAS_BACKEND']='tensorflow'
from tensorflow.keras import backend as K
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import numpy as np
import csv
import h5py
import time
import shutil
from .EqT_utils import f1, SeqSelfAttention, FeedForward, LayerNormalization
from .EqT_utils import generate_arrays_from_file, picker
from .EqT_utils import DataGeneratorTest, PreLoadGeneratorTest
np.warnings.filterwarnings('ignore')
import datetime
from tqdm import tqdm
from tensorflow.python.util import deprecation
deprecation._PRINT_DEPRECATION_WARNINGS = False


[docs]def tester(input_hdf5=None, input_testset=None, input_model=None, output_name=None, detection_threshold=0.20, P_threshold=0.1, S_threshold=0.1, number_of_plots=100, estimate_uncertainty=True, number_of_sampling=5, loss_weights=[0.05, 0.40, 0.55], loss_types=['binary_crossentropy', 'binary_crossentropy', 'binary_crossentropy'], input_dimention=(6000, 3), normalization_mode='std', mode='generator', batch_size=500, gpuid=None, gpu_limit=None): """ Applies a trained model to a windowed waveform to perform both detection and picking at the same time. Parameters ---------- input_hdf5: str, default=None Path to an hdf5 file containing only one class of "data" with NumPy arrays containing 3 component waveforms each 1 min long. input_testset: npy, default=None Path to a NumPy file (automaticaly generated by the trainer) containing a list of trace names. input_model: str, default=None Path to a trained model. output_dir: str, default=None Output directory that will be generated. output_probabilities: bool, default=False If True, it will output probabilities and estimated uncertainties for each trace into an HDF file. detection_threshold : float, default=0.3 A value in which the detection probabilities above it will be considered as an event. P_threshold: float, default=0.1 A value which the P probabilities above it will be considered as P arrival. S_threshold: float, default=0.1 A value which the S probabilities above it will be considered as S arrival. number_of_plots: float, default=10 The number of plots for detected events outputed for each station data. estimate_uncertainty: bool, default=False If True uncertainties in the output probabilities will be estimated. number_of_sampling: int, default=5 Number of sampling for the uncertainty estimation. loss_weights: list, default=[0.03, 0.40, 0.58] Loss weights for detection, P picking, and S picking respectively. loss_types: list, default=['binary_crossentropy', 'binary_crossentropy', 'binary_crossentropy'] Loss types for detection, P picking, and S picking respectively. input_dimention: tuple, default=(6000, 3) Loss types for detection, P picking, and S picking respectively. normalization_mode: str, default='std' Mode of normalization for data preprocessing, 'max', maximum amplitude among three components, 'std', standard deviation. mode: str, default='generator' Mode of running. 'pre_load_generator' or 'generator'. batch_size: int, default=500 Batch size. This wont affect the speed much but can affect the performance. A value beteen 200 to 1000 is recommanded. gpuid: int, default=None Id of GPU used for the prediction. If using CPU set to None. gpu_limit: int, default=None Set the maximum percentage of memory usage for the GPU. Returns -------- ./output_name/X_test_results.csv: A table containing all the detection, and picking results. Duplicated events are already removed. ./output_name/X_report.txt: A summary of the parameters used for prediction and performance. ./output_name/figures: A folder containing plots detected events and picked arrival times. Notes -------- Estimating the uncertainties requires multiple predictions and will increase the computational time. """ args = { "input_hdf5": input_hdf5, "input_testset": input_testset, "input_model": input_model, "output_name": output_name, "detection_threshold": detection_threshold, "P_threshold": P_threshold, "S_threshold": S_threshold, "number_of_plots": number_of_plots, "estimate_uncertainty": estimate_uncertainty, "number_of_sampling": number_of_sampling, "loss_weights": loss_weights, "loss_types": loss_types, "input_dimention": input_dimention, "normalization_mode": normalization_mode, "mode": mode, "batch_size": batch_size, "gpuid": gpuid, "gpu_limit": gpu_limit } if args['gpuid']: os.environ['CUDA_VISIBLE_DEVICES'] = '{}'.format(args['gpuid']) tf.Session(config=tf.ConfigProto(log_device_placement=True)) config = tf.ConfigProto() config.gpu_options.allow_growth = True config.gpu_options.per_process_gpu_memory_fraction = float(args['gpu_limit']) K.tensorflow_backend.set_session(tf.Session(config=config)) save_dir = os.path.join(os.getcwd(), str(args['output_name'])+'_outputs') save_figs = os.path.join(save_dir, 'figures') if os.path.isdir(save_dir): shutil.rmtree(save_dir) os.makedirs(save_figs) test = np.load(args['input_testset']) print('Loading the model ...', flush=True) model = load_model(args['input_model'], custom_objects={'SeqSelfAttention': SeqSelfAttention, 'FeedForward': FeedForward, 'LayerNormalization': LayerNormalization, 'f1': f1 }) model.compile(loss = args['loss_types'], loss_weights = args['loss_weights'], optimizer = Adam(lr = 0.001), metrics = [f1]) print('Loading is complete!', flush=True) print('Testing ...', flush=True) print('Writting results into: " ' + str(args['output_name'])+'_outputs'+' "', flush=True) start_training = time.time() csvTst = open(os.path.join(save_dir,'X_test_results.csv'), 'w') test_writer = csv.writer(csvTst, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) test_writer.writerow(['network_code', 'ID', 'earthquake_distance_km', 'snr_db', 'trace_name', 'trace_category', 'trace_start_time', 'source_magnitude', 'p_arrival_sample', 'p_status', 'p_weight', 's_arrival_sample', 's_status', 's_weight', 'receiver_type', 'number_of_detections', 'detection_probability', 'detection_uncertainty', 'P_pick', 'P_probability', 'P_uncertainty', 'P_error', 'S_pick', 'S_probability', 'S_uncertainty', 'S_error' ]) csvTst.flush() plt_n = 0 list_generator = generate_arrays_from_file(test, args['batch_size']) pbar_test = tqdm(total= int(np.ceil(len(test)/args['batch_size']))) for _ in range(int(np.ceil(len(test) / args['batch_size']))): pbar_test.update() new_list = next(list_generator) if args['mode'].lower() == 'pre_load_generator': params_test = {'dim': args['input_dimention'][0], 'batch_size': len(new_list), 'n_channels': args['input_dimention'][-1], 'norm_mode': args['normalization_mode']} test_set={} fl = h5py.File(args['input_hdf5'], 'r') for ID in new_list: if ID.split('_')[-1] == 'EV': dataset = fl.get('data/'+str(ID)) elif ID.split('_')[-1] == 'NO': dataset = fl.get('data/'+str(ID)) test_set.update( {str(ID) : dataset}) test_generator = PreLoadGeneratorTest(new_list, test_set, **params_test) if args['estimate_uncertainty']: pred_DD = [] pred_PP = [] pred_SS = [] for mc in range(args['number_of_sampling']): predD, predP, predS = model.predict_generator(test_generator) pred_DD.append(predD) pred_PP.append(predP) pred_SS.append(predS) pred_DD = np.array(pred_DD).reshape(args['number_of_sampling'], len(new_list), params_test['dim']) pred_DD_mean = pred_DD.mean(axis=0) pred_DD_std = pred_DD.std(axis=0) pred_PP = np.array(pred_PP).reshape(args['number_of_sampling'], len(new_list), params_test['dim']) pred_PP_mean = pred_PP.mean(axis=0) pred_PP_std = pred_PP.std(axis=0) pred_SS = np.array(pred_SS).reshape(args['number_of_sampling'], len(new_list), params_test['dim']) pred_SS_mean = pred_SS.mean(axis=0) pred_SS_std = pred_SS.std(axis=0) else: pred_DD_mean, pred_PP_mean, pred_SS_mean = model.predict_generator(test_generator) pred_DD_mean = pred_DD_mean.reshape(pred_DD_mean.shape[0], pred_DD_mean.shape[1]) pred_PP_mean = pred_PP_mean.reshape(pred_PP_mean.shape[0], pred_PP_mean.shape[1]) pred_SS_mean = pred_SS_mean.reshape(pred_SS_mean.shape[0], pred_SS_mean.shape[1]) pred_DD_std = np.zeros((pred_DD_mean.shape)) pred_PP_std = np.zeros((pred_PP_mean.shape)) pred_SS_std = np.zeros((pred_SS_mean.shape)) for ts in range(pred_DD_mean.shape[0]): evi = new_list[ts] dataset = test_set[evi] try: spt = int(dataset.attrs['p_arrival_sample']); except Exception: spt = None try: sst = int(dataset.attrs['s_arrival_sample']); except Exception: sst = None matches, pick_errors, yh3 = picker(args, pred_DD_mean[ts], pred_PP_mean[ts], pred_SS_mean[ts], pred_DD_std[ts], pred_PP_std[ts], pred_SS_std[ts], spt, sst) _output_writter_test(args, dataset, evi, test_writer, csvTst, matches, pick_errors) if plt_n < args['number_of_plots']: _plotter(ts, dataset, evi, args, save_figs, pred_DD_mean[ts], pred_PP_mean[ts], pred_SS_mean[ts], pred_DD_std[ts], pred_PP_std[ts], pred_SS_std[ts], matches) plt_n += 1 else: params_test = {'file_name': str(args['input_hdf5']), 'dim': args['input_dimention'][0], 'batch_size': len(new_list), 'n_channels': args['input_dimention'][-1], 'norm_mode': args['normalization_mode']} test_generator = DataGeneratorTest(new_list, **params_test) if args['estimate_uncertainty']: pred_DD = [] pred_PP = [] pred_SS = [] for mc in range(args['number_of_sampling']): predD, predP, predS = model.predict_generator(generator=test_generator) pred_DD.append(predD) pred_PP.append(predP) pred_SS.append(predS) pred_DD = np.array(pred_DD).reshape(args['number_of_sampling'], len(new_list), params_test['dim']) pred_DD_mean = pred_DD.mean(axis=0) pred_DD_std = pred_DD.std(axis=0) pred_PP = np.array(pred_PP).reshape(args['number_of_sampling'], len(new_list), params_test['dim']) pred_PP_mean = pred_PP.mean(axis=0) pred_PP_std = pred_PP.std(axis=0) pred_SS = np.array(pred_SS).reshape(args['number_of_sampling'], len(new_list), params_test['dim']) pred_SS_mean = pred_SS.mean(axis=0) pred_SS_std = pred_SS.std(axis=0) else: pred_DD_mean, pred_PP_mean, pred_SS_mean = model.predict_generator(generator=test_generator) pred_DD_mean = pred_DD_mean.reshape(pred_DD_mean.shape[0], pred_DD_mean.shape[1]) pred_PP_mean = pred_PP_mean.reshape(pred_PP_mean.shape[0], pred_PP_mean.shape[1]) pred_SS_mean = pred_SS_mean.reshape(pred_SS_mean.shape[0], pred_SS_mean.shape[1]) pred_DD_std = np.zeros((pred_DD_mean.shape)) pred_PP_std = np.zeros((pred_PP_mean.shape)) pred_SS_std = np.zeros((pred_SS_mean.shape)) test_set={} fl = h5py.File(args['input_hdf5'], 'r') for ID in new_list: if ID.split('_')[-1] == 'EV': dataset = fl.get('data/'+str(ID)) elif ID.split('_')[-1] == 'NO': dataset = fl.get('data/'+str(ID)) test_set.update( {str(ID) : dataset}) for ts in range(pred_DD_mean.shape[0]): evi = new_list[ts] dataset = test_set[evi] try: spt = int(dataset.attrs['p_arrival_sample']); except Exception: spt = None try: sst = int(dataset.attrs['s_arrival_sample']); except Exception: sst = None matches, pick_errors, yh3=picker(args, pred_DD_mean[ts], pred_PP_mean[ts], pred_SS_mean[ts], pred_DD_std[ts], pred_PP_std[ts], pred_SS_std[ts], spt, sst) _output_writter_test(args,dataset, evi, test_writer, csvTst, matches, pick_errors) if plt_n < args['number_of_plots']: _plotter(dataset, evi, args, save_figs, pred_DD_mean[ts], pred_PP_mean[ts], pred_SS_mean[ts], pred_DD_std[ts], pred_PP_std[ts], pred_SS_std[ts], matches) plt_n += 1 end_training = time.time() delta = end_training - start_training hour = int(delta / 3600) delta -= hour * 3600 minute = int(delta / 60) delta -= minute * 60 seconds = delta with open(os.path.join(save_dir,'X_report.txt'), 'a') as the_file: the_file.write('================== Overal Info =============================='+'\n') the_file.write('date of report: '+str(datetime.datetime.now())+'\n') the_file.write('input_hdf5: '+str(args['input_hdf5'])+'\n') the_file.write('input_testset: '+str(args['input_testset'])+'\n') the_file.write('input_model: '+str(args['input_model'])+'\n') the_file.write('output_name: '+str(args['output_name']+'_outputs')+'\n') the_file.write('================== Testing Parameters ======================='+'\n') the_file.write('mode: '+str(args['mode'])+'\n') the_file.write('finished the test in: {} hours and {} minutes and {} seconds \n'.format(hour, minute, round(seconds, 2))) the_file.write('loss_types: '+str(args['loss_types'])+'\n') the_file.write('loss_weights: '+str(args['loss_weights'])+'\n') the_file.write('batch_size: '+str(args['batch_size'])+'\n') the_file.write('total number of tests '+str(len(test))+'\n') the_file.write('gpuid: '+str(args['gpuid'])+'\n') the_file.write('gpu_limit: '+str(args['gpu_limit'])+'\n') the_file.write('================== Other Parameters ========================='+'\n') the_file.write('normalization_mode: '+str(args['normalization_mode'])+'\n') the_file.write('estimate uncertainty: '+str(args['estimate_uncertainty'])+'\n') the_file.write('number of Monte Carlo sampling: '+str(args['number_of_sampling'])+'\n') the_file.write('detection_threshold: '+str(args['detection_threshold'])+'\n') the_file.write('P_threshold: '+str(args['P_threshold'])+'\n') the_file.write('S_threshold: '+str(args['S_threshold'])+'\n') the_file.write('number_of_plots: '+str(args['number_of_plots'])+'\n')
def _output_writter_test(args, dataset, evi, output_writer, csvfile, matches, pick_errors, ): """ Writes the detection & picking results into a CSV file. Parameters ---------- args: dic A dictionary containing all of the input parameters. dataset: hdf5 obj Dataset object of the trace. evi: str Trace name. output_writer: obj For writing out the detection/picking results in the CSV file. csvfile: obj For writing out the detection/picking results in the CSV file. matches: dic Contains the information for the detected and picked event. pick_errors: dic Contains prediction errors for P and S picks. Returns -------- X_test_results.csv """ numberOFdetections = len(matches) if numberOFdetections != 0: D_prob = matches[list(matches)[0]][1] D_unc = matches[list(matches)[0]][2] P_arrival = matches[list(matches)[0]][3] P_prob = matches[list(matches)[0]][4] P_unc = matches[list(matches)[0]][5] P_error = pick_errors[list(matches)[0]][0] S_arrival = matches[list(matches)[0]][6] S_prob = matches[list(matches)[0]][7] S_unc = matches[list(matches)[0]][8] S_error = pick_errors[list(matches)[0]][1] else: D_prob = None D_unc = None P_arrival = None P_prob = None P_unc = None P_error = None S_arrival = None S_prob = None S_unc = None S_error = None if evi.split('_')[-1] == 'EV': network_code = dataset.attrs['network_code'] source_id = dataset.attrs['source_id'] source_distance_km = dataset.attrs['source_distance_km'] snr_db = np.mean(dataset.attrs['snr_db']) trace_name = dataset.attrs['trace_name'] trace_category = dataset.attrs['trace_category'] trace_start_time = dataset.attrs['trace_start_time'] source_magnitude = dataset.attrs['source_magnitude'] p_arrival_sample = dataset.attrs['p_arrival_sample'] p_status = dataset.attrs['p_status'] p_weight = dataset.attrs['p_weight'] s_arrival_sample = dataset.attrs['s_arrival_sample'] s_status = dataset.attrs['s_status'] s_weight = dataset.attrs['s_weight'] receiver_type = dataset.attrs['receiver_type'] elif evi.split('_')[-1] == 'NO': network_code = dataset.attrs['network_code'] source_id = None source_distance_km = None snr_db = None trace_name = dataset.attrs['trace_name'] trace_category = dataset.attrs['trace_category'] trace_start_time = None source_magnitude = None p_arrival_sample = None p_status = None p_weight = None s_arrival_sample = None s_status = None s_weight = None receiver_type = dataset.attrs['receiver_type'] if P_unc: P_unc = round(P_unc, 3) output_writer.writerow([network_code, source_id, source_distance_km, snr_db, trace_name, trace_category, trace_start_time, source_magnitude, p_arrival_sample, p_status, p_weight, s_arrival_sample, s_status, s_weight, receiver_type, numberOFdetections, D_prob, D_unc, P_arrival, P_prob, P_unc, P_error, S_arrival, S_prob, S_unc, S_error, ]) csvfile.flush() def _plotter(dataset, evi, args, save_figs, yh1, yh2, yh3, yh1_std, yh2_std, yh3_std, matches): """ Generates plots. Parameters ---------- dataset: obj The hdf5 obj containing a NumPy array of 3 component data and associated attributes. evi: str Trace name. args: dic A dictionary containing all of the input parameters. save_figs: str Path to the folder for saving the plots. yh1: 1D array Detection probabilities. yh2: 1D array P arrival probabilities. yh3: 1D array S arrival probabilities. yh1_std: 1D array Detection standard deviations. yh2_std: 1D array P arrival standard deviations. yh3_std: 1D array S arrival standard deviations. matches: dic Contains the information for the detected and picked event. """ try: spt = int(dataset.attrs['p_arrival_sample']); except Exception: spt = None try: sst = int(dataset.attrs['s_arrival_sample']); except Exception: sst = None predicted_P = [] predicted_S = [] if len(matches) >=1: for match, match_value in matches.items(): if match_value[3]: predicted_P.append(match_value[3]) else: predicted_P.append(None) if match_value[6]: predicted_S.append(match_value[6]) else: predicted_S.append(None) data = np.array(dataset) fig = plt.figure() ax = fig.add_subplot(411) plt.plot(data[:, 0], 'k') plt.rcParams["figure.figsize"] = (8,5) legend_properties = {'weight':'bold'} plt.title(str(evi)) plt.tight_layout() ymin, ymax = ax.get_ylim() pl = None sl = None ppl = None ssl = None if dataset.attrs['trace_category'] == 'earthquake_local': if dataset.attrs['p_status'] == 'manual': pl = plt.vlines(int(spt), ymin, ymax, color='b', linewidth=2, label='Manual_P_Arrival') else: pl = plt.vlines(int(spt), ymin, ymax, color='b', linewidth=2, label='Auto_P_Arrival') if dataset.attrs['s_status'] == 'manual': sl = plt.vlines(int(sst), ymin, ymax, color='r', linewidth=2, label='Manual_S_Arrival') else: sl = plt.vlines(int(sst), ymin, ymax, color='r', linewidth=2, label='Auto_S_Arrival') if pl or sl: plt.legend(loc = 'upper right', borderaxespad=0., prop=legend_properties) ax = fig.add_subplot(412) plt.plot(data[:, 1] , 'k') plt.tight_layout() if dataset.attrs['trace_category'] == 'earthquake_local': if dataset.attrs['p_status'] == 'manual': pl = plt.vlines(int(spt), ymin, ymax, color='b', linewidth=2, label='Manual_P_Arrival') else: pl = plt.vlines(int(spt), ymin, ymax, color='b', linewidth=2, label='Auto_P_Arrival') if dataset.attrs['s_status'] == 'manual': sl = plt.vlines(int(sst), ymin, ymax, color='r', linewidth=2, label='Manual_S_Arrival') else: sl = plt.vlines(int(sst), ymin, ymax, color='r', linewidth=2, label='Auto_S_Arrival') if pl or sl: plt.legend(loc = 'upper right', borderaxespad=0., prop=legend_properties) ax = fig.add_subplot(413) plt.plot(data[:, 2], 'k') plt.tight_layout() if len(predicted_P) > 0: ymin, ymax = ax.get_ylim() for pt in predicted_P: if pt: ppl = plt.vlines(int(pt), ymin, ymax, color='c', linewidth=2, label='Predicted_P_Arrival') if len(predicted_S) > 0: for st in predicted_S: if st: ssl = plt.vlines(int(st), ymin, ymax, color='m', linewidth=2, label='Predicted_S_Arrival') if ppl or ssl: plt.legend(loc = 'upper right', borderaxespad=0., prop=legend_properties) ax = fig.add_subplot(414) x = np.linspace(0, data.shape[0], data.shape[0], endpoint=True) if args['estimate_uncertainty']: plt.plot(x, yh1, 'g--', alpha = 0.5, linewidth=1.5, label='Detection') lowerD = yh1-yh1_std upperD = yh1+yh1_std plt.fill_between(x, lowerD, upperD, alpha=0.5, edgecolor='#3F7F4C', facecolor='#7EFF99') plt.plot(x, yh2, 'b--', alpha = 0.5, linewidth=1.5, label='P_probability') lowerP = yh2-yh2_std upperP = yh2+yh2_std plt.fill_between(x, lowerP, upperP, alpha=0.5, edgecolor='#1B2ACC', facecolor='#089FFF') plt.plot(x, yh3, 'r--', alpha = 0.5, linewidth=1.5, label='S_probability') lowerS = yh3-yh3_std upperS = yh3+yh3_std plt.fill_between(x, lowerS, upperS, edgecolor='#CC4F1B', facecolor='#FF9848') plt.ylim((-0.1, 1.1)) plt.tight_layout() plt.legend(loc = 'upper right', borderaxespad=0., prop=legend_properties) else: plt.plot(x, yh1, 'g--', alpha = 0.5, linewidth=1.5, label='Detection') plt.plot(x, yh2, 'b--', alpha = 0.5, linewidth=1.5, label='P_probability') plt.plot(x, yh3, 'r--', alpha = 0.5, linewidth=1.5, label='S_probability') plt.tight_layout() plt.ylim((-0.1, 1.1)) plt.legend(loc = 'upper right', borderaxespad=0., prop=legend_properties) fig.savefig(os.path.join(save_figs, str(evi.split('/')[-1])+'.png'))