'''
Module that contains functions for preprocessing in the time domain
:copyright:
The SeisMIC development team (makus@gfz-potsdam.de).
:license:
EUROPEAN UNION PUBLIC LICENCE v. 1.2
(https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12)
:author:
Peter Makus (makus@gfz-potsdam.de)
Created: Tuesday, 20th July 2021 03:24:01 pm
Last Modified: Thursday, 29th June 2023 11:15:56 am
'''
from copy import deepcopy
import numpy as np
from scipy.fftpack import next_fast_len
from scipy import signal
from scipy.signal import detrend as sp_detrend
import obspy.signal as osignal
from seismic.utils.fetch_func_from_str import func_from_str


def clip(A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
Clip time series data at a multiple of the standard deviation
    Set amplitudes exceeding a certain threshold to this threshold.
The threshold for clipping is estimated as the standard deviation of each
trace times a factor specified in `args`.
:Note: Traces should be demeaned before clipping.
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second \\
        dimension (columns)
:type args: dictionary
:param args: the only keyword allowed is `std_factor` describing the \\
scaling of the standard deviation for the clipping threshold
:type params: dictionary
:param params: not used here
:rtype: numpy.ndarray
:return: clipped time series data
"""
stds = np.nanstd(A, axis=1)
for ind in range(A.shape[0]):
ts = args['std_factor']*stds[ind]
A[ind, A[ind, :] > ts] = ts
A[ind, A[ind, :] < -ts] = -ts
return A
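

# A minimal usage sketch (not part of the original module): clip two synthetic,
# demeaned traces at three times their standard deviation. The random test
# data, the seed, and the factor of 3 are assumptions for illustration only;
# the 'std_factor' key and the unused params dict follow the docstring above.
def _example_clip() -> np.ndarray:
    rng = np.random.default_rng(seed=0)
    A = rng.standard_normal((2, 1000))
    # clip works in place, so pass a copy if the raw data is still needed
    return clip(A.copy(), args={'std_factor': 3.0}, params={})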


def detrend(A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
Detrend wrapper. Can call scipy.signal.detrend or a faster custom function.
:param A: 1 or 2D array
:type A: np.ndarray
:param args: arguments for detrend function
:type args: dict
:param params: params file
:type params: dict
:raises ValueError: For unknown detrend method
:raises ValueError: For unknown detrend type
:return: detrended data
:rtype: np.ndarray
"""
method = args.get('method', 'qr')
if method == 'scipy':
return detrend_scipy(A, args, params)
elif method == 'qr':
detrend_type = args.get('type', 'linear')
if detrend_type == 'linear':
return detrendqr(A)
elif detrend_type == 'constant':
return demean(A)
else:
raise ValueError('Unknown detrend type')
else:
raise ValueError('Unknown detrend method')
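

# Illustrative sketch (an assumption, not part of the original module): remove
# a linear trend with the fast QR-based path, i.e. the default dispatch of
# detrend() for method='qr' and type='linear'. The synthetic ramps are chosen
# so that the detrended traces should be close to zero everywhere.
def _example_detrend() -> np.ndarray:
    t = np.arange(500, dtype=np.float64)
    A = np.vstack((0.3 * t + 1.0, -0.1 * t + 5.0))  # two purely linear traces
    return detrend(A, args={'method': 'qr', 'type': 'linear'}, params={})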


def detrend_scipy(A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
Remove trend from data
.. seealso:: :func:`scipy.signal.detrend` for input arguments
"""
if np.all(np.isnan(A)):
return A
    # 'method' is only a dispatch key used by detrend() and not a valid
    # keyword argument of scipy.signal.detrend, so it is stripped here
    args = {k: v for k, v in args.items() if k != 'method'}
    for ii in range(A.shape[0]):
try:
A[ii, ~np.isnan(A[ii])] = sp_detrend(
A[ii, ~np.isnan(A[ii])], axis=-1, overwrite_data=True,
**args)
except ZeroDivisionError:
# When the line is nothing but nans
pass
return A


def detrendqr(data: np.ndarray) -> np.ndarray:
"""
    Remove a linear trend from the data using a QR decomposition. Faster than
    :func:`scipy.signal.detrend`.

    .. seealso:: Adapted from `NoisePy <https://doi.org/10.1785/0220190364>`_
:param data: 1 or 2D array
:type data: np.ndarray
:raises ValueError: For data with more than 2 dimensions
:return: detrended data
:rtype: np.ndarray
"""
npts = data.shape[-1]
X = np.ones((npts, 2))
X[:, 0] = np.arange(0, npts, dtype=np.float32) / npts
Q, R = np.linalg.qr(X)
rq = np.dot(np.linalg.inv(R), Q.transpose())
if data.ndim == 1:
coeff = np.dot(rq, data)
data = data - np.dot(X, coeff)
elif data.ndim == 2:
for ii in range(data.shape[0]):
coeff = np.dot(rq, data[ii])
data[ii] = data[ii] - np.dot(X, coeff)
else:
raise ValueError('data must be 1D or 2D')
return data
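

# Brief sketch (illustrative assumption): detrendqr also accepts 1D input, as
# documented above; a purely linear trace comes back as (numerically) zero.
def _example_detrendqr() -> np.ndarray:
    trace = 0.5 * np.arange(100, dtype=np.float64) + 2.0
    return detrendqr(trace)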


def demean(data: np.ndarray) -> np.ndarray:
"""
Demean data
:param data: 1 or 2D array
:type data: np.ndarray
:return: demeaned data
:rtype: np.ndarray
"""
return data - np.nanmean(data, axis=-1, keepdims=True)


def mute(A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
Mute parts of data that exceed a threshold
    To completely suppress the effect of data with high amplitudes, e.g. after
    aftershocks, these parts of the data are muted (set to zero). The
    respective parts of the signal are identified as those where the envelope
    in a given frequency band exceeds a threshold given either directly as an
    absolute number or as a multiple of the data's standard deviation. A taper
    of length `taper_len` is applied to smooth the edges of muted segments.
    Setting `extend_gaps` to True ensures that the taper is applied outside
    the muted segments and that the data inside these segments is all zero.
    Edges of the data will be tapered too in this case.
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second dimension
        (columns)
:type args: dictionary
    :param args: the following keywords are allowed:

        * `filter`:
            (dictionary) description of the filter to be applied before
            calculation of the signal envelope. If not given, the envelope
            is calculated from the raw data. The value of the keyword
            `filter` is the same as the `args` for the function `TDfilter`.
        * `threshold`:
            (float) absolute amplitude threshold for muting
        * `std_factor`:
            (float) alternatively to an absolute number, the threshold can
            be estimated as a multiple of the standard deviation, with the
            scaling given as the value of the keyword `std_factor`. If
            neither `threshold` nor `std_factor` is given, `std_factor=1`
            is assumed.
        * `extend_gaps`:
            (boolean) if True, data above the threshold is guaranteed to be
            muted; otherwise tapering will leak into these parts. This step
            involves an additional convolution.
        * `taper_len`:
            (float) length of the taper for muted segments in seconds
:type params: dictionary
:param params: filled automatically by `pxcorr`
:rtype: numpy.ndarray
    :return: muted time series data
:Example:
``args={'filter':{'type':'bandpass', 'freqmin':1., 'freqmax':6.},
'taper_len':1., 'threshold':1000, 'std_factor':1, 'extend_gaps':True}``
"""
if args['taper_len'] == 0:
raise ValueError('Taper Length cannot be zero.')
# return zeros if length of traces is shorter than taper
ntap = int(args['taper_len']*params['sampling_rate'])
if A.shape[1] <= ntap:
return np.zeros_like(A)
# filter if asked to
if 'filter' in list(args.keys()):
C = TDfilter(A, args['filter'], params)
else:
C = A
# calculate envelope
D = np.abs(C)
# calculate threshold
if 'threshold' in list(args.keys()):
thres = np.zeros(A.shape[0]) + args['threshold']
elif 'std_factor' in list(args.keys()):
thres = np.std(C, axis=1) * args['std_factor']
else:
thres = np.std(C, axis=1)
# calculate mask
mask = np.ones_like(D)
mask[D > np.tile(thres, (D.shape[1], 1)).T] = 0
# extend the muted segments to make sure the whole segment is zero after
if args['extend_gaps']:
tap = np.ones(ntap)/ntap
for ind in range(A.shape[0]):
mask[ind, :] = np.convolve(mask[ind, :], tap, mode='same')
nmask = np.ones_like(D)
nmask[mask < 1.] = 0
else:
nmask = mask
# apply taper
tap = 2. - (np.cos(np.arange(ntap, dtype=float)/ntap*2.*np.pi) + 1.)
tap /= ntap
for ind in range(A.shape[0]):
nmask[ind, :] = np.convolve(nmask[ind, :], tap, mode='same')
# mute data with tapered mask
A *= nmask
return A
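

# Hedged usage sketch (illustrative only, not part of the original module):
# mute a strong transient in two synthetic traces sampled at 25 Hz. The args
# follow the :Example: in the docstring above; the 'sampling_rate' key mirrors
# what pxcorr provides, and the synthetic data and seed are assumptions.
def _example_mute() -> np.ndarray:
    rng = np.random.default_rng(seed=0)
    A = rng.standard_normal((2, 2500))
    A[:, 1000:1100] *= 50.0  # a burst that should end up muted
    args = {
        'filter': {'type': 'bandpass', 'freqmin': 1., 'freqmax': 6.},
        'taper_len': 1., 'std_factor': 1, 'extend_gaps': True}
    return mute(A, args, params={'sampling_rate': 25.})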


def normalizeStandardDeviation(
A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
Divide the time series by their standard deviation
Divide the amplitudes of each trace by its standard deviation.
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second \\
        dimension (columns)
:type args: dictionary
:param args: not used here
:type params: dictionary
:param params: not used here
:rtype: numpy.ndarray
:return: normalized time series data
"""
std = np.std(A, axis=-1)
    # avoid creating NaNs or a ZeroDivisionError
std[np.where(std == 0)] = 1
A = (A.T/std).T
return A


def signBitNormalization(
A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
One bit normalization of time series data
Return the sign of the samples (-1, 0, 1).
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second \\
        dimension (columns)
:type args: dictionary
:param args: not used here
:type params: dictionary
:param params: not used here
:rtype: numpy.ndarray
:return: 1-bit normalized time series data
"""
return np.sign(A)


def taper(A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
    Taper the time series data

    Apply a simple taper to the time series data.

    `args` has the following structure:

        ``args = {'type':`type of taper`, taper_args}``

    `type` may be `cosine_taper` with the corresponding taper_arg `p`, the
    decimal percentage of the trace to taper. Further possibilities for
    `type` are the window functions provided by `scipy.signal`.

:Example:
``args = {'type':'cosine_taper','p':0.1}``
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second \\
        dimension (columns)
:type args: dictionary
:param args: arguments dictionary as described above
:type params: dictionary
:param params: not used here
:rtype: numpy.ndarray
:return: tapered time series data
"""
if args['type'] == 'cosine_taper':
func = osignal.invsim.cosine_taper
else:
func = getattr(signal, args['type'])
args = deepcopy(args)
args.pop('type')
tap = func(A.shape[1], **args)
A *= np.tile(np.atleast_2d(tap), (A.shape[0], 1))
return A
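

# A short usage sketch (assumed, not from the original module): apply a 10 %
# cosine taper to three constant traces, matching the :Example: above.
def _example_taper() -> np.ndarray:
    A = np.ones((3, 1000))
    return taper(A, args={'type': 'cosine_taper', 'p': 0.1}, params={})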


def TDnormalization(A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
Amplitude dependent time domain normalization
Calculate the envelope of the filtered trace, smooth it in a window of
    length `windowLength` and normalize the waveform by this smoothed
    envelope. The two keywords used in `args` are `filter` and
    `windowLength`, describing the filter and the length of the envelope
    smoothing window, respectively.

    `args` has the following structure:

        ``args = {'windowLength':`length of the envelope smoothing window
        in [s]`, 'filter':{'type':`filterType`, fargs}}``

    `type` may be `bandpass` with the corresponding fargs `freqmin` and
    `freqmax`, or `highpass`/`lowpass` with the farg `freq`.
:Example:
``args = {'windowLength':5,'filter':{'type':'bandpass',
'freqmin':0.5, 'freqmax':2.}}``
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second \\
        dimension (columns)
:type args: dictionary
:param args: arguments dictionary as described above
:type params: dictionary
    :param params: filled automatically by `pxcorr`; the key
        `sampling_rate` is used here
:rtype: numpy.ndarray
:return: normalized time series data
"""
if args['windowLength'] <= 0:
raise ValueError('Window Length has to be greater than 0.')
# filter if args['filter']
if args['filter']:
B = TDfilter(A, args['filter'], params)
# func = getattr(osignal, args['filter']['type'])
# fargs = deepcopy(args['filter'])
# fargs.pop('type')
# B = func(A.T, df=params['sampling_rate'], **fargs).T
else:
B = deepcopy(A)
# simple calculation of envelope
B = B**2
# smoothing of envelope in both directions to avoid a shift
window = (
np.ones(int(np.ceil(args['windowLength'] * params['sampling_rate'])))
/ np.ceil(args['windowLength']*params['sampling_rate']))
if len(B.shape) == 1:
B = np.convolve(B, window, mode='same')
B = np.convolve(B[::-1], window, mode='same')[::-1]
# damping factor
B += np.max(B)*1e-6
else:
for ind in range(B.shape[0]):
B[ind, :] = np.convolve(B[ind], window, mode='same')
B[ind, :] = np.convolve(B[ind, ::-1], window, mode='same')[::-1]
# damping factor
B[ind, :] += np.max(B[ind, :])*1e-6
# normalization
A /= np.sqrt(B)
return A
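

# Hedged usage sketch (illustrative, not part of the original module):
# amplitude-dependent normalization with a 5 s smoothing window on data
# sampled at 25 Hz, mirroring the :Example: in the docstring above. The
# synthetic data and the sampling rate are assumptions.
def _example_TDnormalization() -> np.ndarray:
    rng = np.random.default_rng(seed=0)
    A = rng.standard_normal((2, 2500))
    args = {
        'windowLength': 5,
        'filter': {'type': 'bandpass', 'freqmin': 0.5, 'freqmax': 2.}}
    return TDnormalization(A, args, params={'sampling_rate': 25.})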


def TDfilter(A: np.ndarray, args: dict, params: dict) -> np.ndarray:
"""
Filter time series data
Filter in time domain. Types of filters are defined by `obspy.signal`.
`args` has the following structure:

        ``args = {'type':`filterType`, fargs}``

    `type` may be `bandpass` with the corresponding fargs `freqmin` and
    `freqmax`, or `highpass`/`lowpass` with the farg `freq`.
:Example:
``args = {'type':'bandpass','freqmin':0.5,'freqmax':2.}``
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second \\
        dimension (columns)
:type args: dictionary
:param args: arguments dictionary as described above
:type params: dictionary
    :param params: filled automatically by `pxcorr`; the key
        `sampling_rate` is used here
:rtype: numpy.ndarray
:return: filtered time series data
"""
func = func_from_str('obspy.signal.filter.%s' % args['type'])
args = deepcopy(args)
args.pop('type')
A = func(A, df=params['sampling_rate'], **args)
return A
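

# A minimal usage sketch (assumed): bandpass filter two synthetic traces
# between 0.5 and 2 Hz at a sampling rate of 25 Hz; the args follow the
# :Example: in the docstring above.
def _example_TDfilter() -> np.ndarray:
    rng = np.random.default_rng(seed=0)
    A = rng.standard_normal((2, 2500))
    args = {'type': 'bandpass', 'freqmin': 0.5, 'freqmax': 2.}
    return TDfilter(A, args, params={'sampling_rate': 25.})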


def zeroPadding(A: np.ndarray, args: dict, params: dict, axis=1) -> np.ndarray:
"""
Append zeros to the traces
Pad traces with zeros to increase the speed of the Fourier transforms and
    to avoid wrap-around effects. Three possibilities for the length of the
    padding can be set in ``args['type']``:

    - ``nextFastLen``:
        Traces are padded to the next fast FFT length.
    - ``avoidWrapAround``:
        Depending on the length of the trace that is to be used, the padded
        part is just long enough to avoid wrap around.
    - ``avoidWrapFastLen``:
        Use the next fast FFT length that avoids wrap around.

    :Example: ``args = {'type':'avoidWrapFastLen'}``
:type A: numpy.ndarray
    :param A: time series data with time oriented along the second \\
        dimension (columns)
:type args: dictionary
:param args: arguments dictionary as described above
:type params: dictionary
    :param params: filled automatically by `pxcorr`; the keys
        `sampling_rate` and `lengthToSave` are used for the
        ``avoidWrapAround`` and ``avoidWrapFastLen`` options
    :param axis: axis to pad along, defaults to 1
    :type axis: int, optional
:rtype: numpy.ndarray
:return: zero padded time series data
"""
if A.ndim > 2 or axis > 1:
raise NotImplementedError('Only two-dimensional arrays are supported.')
if A.ndim == 1 and len(A) == 0:
raise ValueError('Input Array is empty')
npts = A.shape[axis]
if A.ndim == 2:
ntrc = A.shape[axis-1]
elif A.ndim == 1:
ntrc = 1
if args['type'] == 'nextFastLen':
N = next_fast_len(npts)
elif args['type'] == 'avoidWrapAround':
        # cast to int so that N can be used as an array dimension below
        N = int(
            npts + params['sampling_rate'] * params['lengthToSave'])
elif args['type'] == 'avoidWrapFastLen':
N = next_fast_len(int(
npts + params['sampling_rate'] * params['lengthToSave']))
else:
raise ValueError("type '%s' of zero padding not implemented" %
args['type'])
if axis == 0:
A = np.concatenate(
(A, np.zeros((N-npts, ntrc), dtype=np.float32)), axis=axis)
else:
A = np.concatenate(
(A, np.zeros((ntrc, N-npts), dtype=np.float32)), axis=axis)
return A
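

# Hedged usage sketch (illustrative only): pad two traces so that the FFT
# length is fast and long enough to avoid wrap-around. The 'lengthToSave'
# (in seconds) and 'sampling_rate' values are assumptions that mirror the
# keys the function reads from params.
def _example_zeroPadding() -> np.ndarray:
    A = np.ones((2, 1000), dtype=np.float32)
    args = {'type': 'avoidWrapFastLen'}
    params = {'sampling_rate': 25., 'lengthToSave': 20.}
    return zeroPadding(A, args, params)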