'''
:copyright:
:license:
EUROPEAN UNION PUBLIC LICENCE v. 1.2
(https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12)
:author:
Peter Makus (makus@gfz-potsdam.de)
Created: Tuesday, 15th June 2021 03:42:14 pm
Last Modified: Friday, 29th September 2023 10:59:17 am
'''
from copy import deepcopy
from typing import List, Optional, Tuple, Union

import numpy as np
from obspy.signal.invsim import cosine_taper
from scipy.interpolate import UnivariateSpline, interp1d

from seismic.correlate.stats import CorrStats
from seismic.monitor.trim import corr_mat_trim
def time_windows_creation(
        starting_list: list,
        t_width: Union[List[int], int]) -> List[np.ndarray]:
    """ Time windows creation.

    One time window (a 1D array of sample indices) is created per element of
    ``starting_list``. The starting sample of each window is taken from
    ``starting_list``. The window length ``t_width`` can be a scalar (the
    same length for all windows) or a list of values. In the latter case both
    lists ``starting_list`` and ``t_width`` must have the same length.

    :type starting_list: list
    :param starting_list: List of starting points (in samples)
    :type t_width: int or list of int
    :param t_width: Window length(s) in samples. Must be greater than 0.
    :raises ValueError: If ``t_width`` is a list of a different length than
        ``starting_list`` or if it contains non-positive values.
    :rtype: list of :class:`~numpy.ndarray`
    :return: **tw_list**: list containing one 1D array of sample indices per
        time window
    """
    if not np.isscalar(starting_list):
        if not np.isscalar(t_width) and len(t_width) != len(starting_list):
            raise ValueError(
                't_width must be a scalar or list of scalars of the same '
                'length as starting_list')
    if np.any(np.array(t_width) <= 0):
        raise ValueError('t_width must only contain values greater than 0.')
    tw_list = []
    if np.isscalar(starting_list):
        # Single window; if t_width is a list only its first value is used.
        wlen = t_width if np.isscalar(t_width) else t_width[0]
        tw_list.append(np.arange(starting_list, starting_list + wlen, 1))
    else:
        for ii, cstart in enumerate(starting_list):
            wlen = t_width if np.isscalar(t_width) else t_width[ii]
            tw_list.append(np.arange(cstart, cstart + wlen, 1))
    return tw_list
def compute_wfc(
        mat: np.ndarray, tw: np.ndarray, refcorr: np.ndarray, sides: str,
        remove_nans: bool = True) -> np.ndarray:
    """
    Computes the waveform coherency (**WFC**) between the given reference
    correlation and correlation matrix.

    See Steinmann, et. al. (2021) for details.

    :param mat: 2D Correlation Matrix (function of corrstart and time lag)
    :type mat: np.ndarray
    :param tw: Lag time window to use for the computation
    :type tw: np.ndarray
    :param refcorr: 1D reference Correlation Trace extracted from `mat`.
    :type refcorr: np.ndarray
    :param sides: Which sides to use. Can be `both`, `right`, `left`,
        or `single`.
    :type sides: str
    :param remove_nans: Remove nans from CorrMatrix, defaults to True
    :type remove_nans: bool, optional
    :raises ValueError: unknown option in sides
    :return: A 1D array with len=N(time windows)
    :rtype: np.ndarray
    """
    # Ensure a 2D matrix even if a single trace was passed
    mat = np.atleast_2d(mat)
    assert len(refcorr) == mat.shape[1]
    if remove_nans:
        mat = np.nan_to_num(mat)
    # index of the zero-lag sample (assumed in the centre of the trace
    # for all options except 'single')
    zero_lag = np.floor((mat.shape[1] - 1.) / 2.)
    wfc = np.zeros((len(tw), mat.shape[0]))
    for win_no, win in enumerate(tw):
        # translate lag-window indices into absolute sample indices
        if sides == 'both':
            # do not use the zero-lag sample twice
            causal = win[1:] if win[0] == 0 else win
            idx = np.hstack(
                (zero_lag - win[::-1], zero_lag + causal)).astype(np.int32)
        elif sides == 'left':
            idx = (zero_lag - win[::-1]).astype(np.int32)
        elif sides == 'right':
            idx = (zero_lag + win).astype(np.int32)
        elif sides == 'single':
            idx = win.astype(np.int32)
        else:
            raise ValueError(
                'sides = %s not a valid option.' % sides)
        # zero out everything outside the current lag window
        window_mask = np.zeros((mat.shape[1],))
        window_mask[idx] = 1
        masked_mat = mat * np.tile(window_mask, (mat.shape[0], 1))
        masked_ref = refcorr * window_mask
        numerator = np.dot(masked_mat, masked_ref.T)
        # normalise by the energy inside the window
        mat_energy = np.sum(masked_mat ** 2, axis=1)
        mat_energy = mat_energy.reshape(1, len(mat_energy))
        ref_energy = np.sum(masked_ref ** 2)
        wfc[win_no] = numerator / np.sqrt(np.dot(mat_energy, ref_energy))
    return wfc
def wfc_multi_reftr(
        corr_data: np.ndarray, ref_trs: np.ndarray, tw: np.ndarray, sides: str,
        remove_nans: bool = True) -> dict:
    """
    Computes the waveform coherency (**WFC**) between the given reference
    correlation(s) and correlation matrix.

    If several references are given, it will loop over these.

    See Steinmann, et. al. (2021) for details.

    :param corr_data: 2D Correlation Matrix
        (function of corrstart and time lag)
    :type corr_data: np.ndarray
    :param ref_trs: 1 or 2D reference Correlation Trace extracted from
        `corr_data`.
        If 2D, it will be interpreted as one reference trace per row.
    :type ref_trs: np.ndarray
    :param tw: Lag time window to use for the computation
    :type tw: np.ndarray
    :param sides: Which sides to use. Can be `both`, `right`, `left`,
        or `single`.
    :type sides: str
    :param remove_nans: Remove nans from CorrMatrix, defaults to True
    :type remove_nans: bool, optional
    :return: A dictionary with one correlation array for each reference
        trace, keyed as ``"reftr_%d" % i``.
    :rtype: dict
    """
    if remove_nans:
        corr_data = np.nan_to_num(corr_data)
        ref_trs = np.nan_to_num(ref_trs)
    # remove 1-dimensions (a single-row 2D reference becomes a 1D trace)
    ref_trs = np.squeeze(ref_trs)
    multi_ref_panel = {}
    # check how many reference traces have been passed
    try:
        reftr_count, _ = ref_trs.shape
    except ValueError:  # a 1D array is passed
        reftr_count = 1
    if reftr_count == 1:
        key = "reftr_0"
        value = compute_wfc(
            corr_data, tw, ref_trs, sides, remove_nans=remove_nans)
        multi_ref_panel.update({key: value})
    else:  # for multiple traces loop over each row
        for i, rftr in enumerate(ref_trs):
            key = "reftr_%d" % i
            value = compute_wfc(
                corr_data, tw, rftr, sides, remove_nans=remove_nans)
            multi_ref_panel.update({key: value})
    return multi_ref_panel
def velocity_change_estimate(
    mat: np.ndarray, tw: np.ndarray, strrefmat: np.ndarray, strvec: np.ndarray,
        sides: str = 'both', return_sim_mat: bool = False,
        remove_nans: bool = True) -> dict:
    """ Velocity change estimate through stretching and comparison.

    Velocity changes are estimated comparing each correlation function stored
    in the ``mat`` matrix (one for each row) with ``strrefmat.shape[0]``
    stretched versions of a reference trace stored in ``strrefmat``.
    The stretch amount for each row of ``strrefmat`` must be passed in the
    ``strvec`` vector.
    The best match (stretch amount and corresponding correlation value) is
    calculated on different time windows (each row of ``tw`` is a different
    one) instead of on the whole trace.

    :type mat: :class:`~numpy.ndarray`
    :param mat: 2d ndarray containing the correlation functions.
        One for each row.
    :type tw: :class:`~numpy.ndarray` of int
    :param tw: 2d ndarray of time windows to be used in the velocity change
        estimate.
    :type strrefmat: :class:`~numpy.ndarray`
    :param strrefmat: 2D array containing stretched versions of the reference
        trace (one per row)
    :type strvec: :class:`~numpy.ndarray` or list
    :param strvec: Stretch amount for each row of ``strrefmat``
    :type sides: string
    :param sides: Side of the reference matrix to be used for the velocity
        change estimate ('both' | 'left' | 'right' | 'single')
    :type return_sim_mat: bool
    :param return_sim_mat: If `True` the returned dictionary also contains
        the similarity matrix `sim_mat`.
    :type remove_nans: bool
    :param remove_nans: If `True` apply :func:`~numpy.nan_to_num` to the
        given correlation matrix before any other operation.
    :raises ValueError: If ``strrefmat`` and ``mat`` have different numbers
        of samples or if ``sides`` is not a valid option.
    :rtype: Dictionary
    :return: **dv**: Dictionary with the following keys:

        *corr*: 2d ndarray containing the correlation value for the best
        match for each row of ``mat`` and for each time window.
        Its shape is (len(tw), mat.shape[0]), squeezed.

        *value*: 2d ndarray containing the stretch amount corresponding to
        the best match for each row of ``mat`` and for each time window.
        Its shape is (len(tw), mat.shape[0]), squeezed.

        *sim_mat*: 3d ndarray containing the similarity matrices that
        indicate the correlation coefficient with the reference for the
        different time windows, different times and different amounts of
        stretching. Only present if ``return_sim_mat`` is `True`.
        Its shape is (mat.shape[0], len(strvec), len(tw)), squeezed.

        *second_axis*: It contains the stretch vector used for the velocity
        change estimate.

        *value_type*: It is equal to 'stretch' and specifies the content of
        the returned 'value'.

        *method*: It is equal to 'single_ref' and specifies in which "way"
        the values have been obtained.
    """
    # Mat must be a 2d vector in every case so
    mat = np.atleast_2d(mat)
    if strrefmat.shape[1] != mat.shape[1]:
        raise ValueError(
            'STRREFMAT and MAT must have the same number of '
            f"samples corrmat has shape {mat.shape} "
            f'and stretched reference matrix has {strrefmat.shape}')
    if remove_nans:
        mat = np.nan_to_num(mat)
    # index of the zero-lag sample (assumed in the centre of the trace for
    # all options except 'single')
    center_p = np.floor((mat.shape[1] - 1.) / 2.)
    nstr = strrefmat.shape[0]
    corr = np.zeros((len(tw), mat.shape[0]))
    dt = np.zeros((len(tw), mat.shape[0]))
    sim_mat = np.zeros([mat.shape[0], len(strvec), len(tw)])
    for (ii, ctw) in enumerate(tw):
        # translate lag-window indices into absolute sample indices
        if sides == 'both':
            # do not use the zero-lag sample twice
            if ctw[0] == 0:
                ctw = np.hstack((
                    center_p - ctw[::-1],
                    center_p + ctw[1:])).astype(np.int32)
            else:
                ctw = np.hstack((
                    center_p - ctw[::-1],
                    center_p + ctw)).astype(np.int32)
        elif sides == 'left':
            ctw = (center_p - ctw[::-1]).astype(np.int32)
        elif sides == 'right':
            ctw = (center_p + ctw).astype(np.int32)
        elif sides == 'single':
            ctw = ctw.astype(np.int32)
        else:
            raise ValueError(
                'sides = %s not a valid option.' % sides)
        # zero out everything outside the current lag window
        mask = np.zeros((mat.shape[1],))
        mask[ctw] = 1
        ref_mask_mat = np.tile(mask, (nstr, 1))
        mat_mask_mat = np.tile(mask, (mat.shape[0], 1))
        first = mat * mat_mask_mat
        second = strrefmat * ref_mask_mat
        dprod = np.dot(first, second.T)
        # Normalization by the energy inside the window
        f_sq = np.sum(first ** 2, axis=1)
        s_sq = np.sum(second ** 2, axis=1)
        f_sq = f_sq.reshape(1, len(f_sq))
        s_sq = s_sq.reshape(1, len(s_sq))
        den = np.sqrt(np.dot(f_sq.T, s_sq))
        tmp = dprod / den
        sim_mat[:, :, ii] = tmp
        # best match over all tested stretch amounts
        tmp_corr_vect = tmp.max(axis=1)
        corr[ii, :] = tmp_corr_vect
        dt[ii, :] = strvec[tmp.argmax(axis=1)]
        # Set dt to NaN where the correlation is NaN instead of having it
        # equal to one of the two stretch_range limits
        dt[ii, np.isnan(tmp_corr_vect)] = np.nan
    dv = {'corr': np.squeeze(corr),
          'value': np.squeeze(dt),
          'second_axis': strvec,
          'value_type': 'stretch',
          'method': 'single_ref'}
    if return_sim_mat:
        dv.update({'sim_mat': np.squeeze(sim_mat)})
    return dv
def time_stretch_estimate(
        corr_data: np.ndarray, ref_trc: np.ndarray = None, tw: np.ndarray = None,
        stretch_range: float = 0.1, stretch_steps: int = 100, sides: str = 'both',
        remove_nans: bool = True,
        ref_tr_trim: Optional[Tuple[float, float]] = None,
        ref_tr_stats=None) -> dict:
    """ Time stretch estimate through stretch and comparison.

    This function estimates stretching of the time axis of traces as it can
    occur if the propagation velocity changes.

    Time stretching is estimated comparing each correlation function stored
    in the ``corr_data`` matrix (one for each row) with ``stretch_steps``
    stretched versions of the reference trace stored in ``ref_trc``.
    The maximum amount of stretching may be passed in ``stretch_range``. The
    time axis is multiplied by ``exp(-stretch)``.
    The best match (stretching amount and corresponding correlation value) is
    calculated on different time windows. If ``tw = None`` the stretching is
    estimated on the whole trace.

    :type corr_data: :class:`~numpy.ndarray`
    :param corr_data: 2d ndarray containing the correlation functions.
        One for each row.
    :type ref_trc: :class:`~numpy.ndarray`
    :param ref_trc: 1D array containing the reference trace to be stretched
        and compared to the individual traces in ``corr_data``. If `None`,
        the mean of all rows of ``corr_data`` is used.
    :type tw: list of :class:`~numpy.ndarray` of int
    :param tw: list of 1D ndarrays holding the indices of samples in the time
        windows to be used in the time shift estimate. The samples are
        counted from the zero lag time with the index of the first sample
        being 0. If ``tw = None`` the full time range is used.
    :type stretch_range: scalar
    :param stretch_range: Maximum amount of relative stretching.
        Stretching and compression is tested from ``-stretch_range`` to
        ``stretch_range``.
    :type stretch_steps: scalar
    :param stretch_steps: Number of stretched versions to be tested. The
        increment will be ``(2 * stretch_range) / stretch_steps``
    :type sides: str
    :param sides: Side of the reference matrix to be used for the stretching
        estimate ('both' | 'left' | 'right' | 'single') ``single`` is used
        for one-sided signals from active sources with zero lag time on the
        first sample. Other options assume that the zero lag time is in the
        center of the traces.
    :type remove_nans: bool
    :param remove_nans: If `True` apply :func:`~numpy.nan_to_num` to the
        given correlation matrix before any other operation.
    :type ref_tr_trim: tuple of float, optional
    :param ref_tr_trim: If given, trim the stretched reference matrix to
        these start and end lag times (passed to
        :func:`~seismic.monitor.trim.corr_mat_trim`).
    :param ref_tr_stats: Stats of the reference trace; only used when
        ``ref_tr_trim`` is given.
    :rtype: Dictionary
    :return: **dv**: Dictionary with the following keys:

        *corr*: 2d ndarray containing the correlation value for the best
        match for each row of ``corr_data`` and for each time window.

        *value*: 2d ndarray containing the stretch amount corresponding to
        the best match for each row of ``corr_data`` and for each time
        window. Stretch is a relative value corresponding to the negative
        relative velocity change -dv/v.

        *sim_mat*: 3d ndarray containing the similarity matrices that
        indicate the correlation coefficient with the reference for the
        different time windows, different times and different amounts of
        stretching.

        *second_axis*: It contains the stretch vector used for the velocity
        change estimate.

        *value_type*: It is equal to 'stretch' and specifies the content of
        the returned 'value'.

        *method*: It is equal to 'single_ref' and specifies in which "way"
        the values have been obtained.
    """
    # Mat must be a 2d vector in every case so
    mat = np.atleast_2d(corr_data)
    # FIX: Trace back this problem to the original source and remove
    # this statement
    if remove_nans:
        mat = np.nan_to_num(mat)
    # generate the reference trace if not given (use the whole time span)
    if ref_trc is None:
        ref_trc = np.nanmean(mat, axis=0)
    # generate time window if not given (use the full length of the
    # correlation trace)
    if tw is None:
        if sides == 'single':
            tw = time_windows_creation([0], [mat.shape[1]])
        else:
            tw = time_windows_creation([0], [int(np.floor(mat.shape[1] / 2.))])
    # taper and extend the reference trace to avoid interpolation
    # artefacts at the ends of the trace
    # 01.07.21
    # This taper decreases the correlation between the traces,
    # just don't permit extrapolation
    # taper = cosine_taper(len(ref_trc), 0.05)
    # ref_trc *= taper
    # different values of stretching to be tested
    stretches = np.linspace(-stretch_range, stretch_range, stretch_steps)
    # dv defined as difference
    # time_facs = 1/(1 + stretches)
    # dv defined as logarithmic stretch
    time_facs = np.exp(-stretches)
    # time axis (zero lag in the centre unless sides == 'single')
    if sides != 'single':
        time_idx = np.arange(len(ref_trc)) - (len(ref_trc) - 1.) / 2.
    else:
        time_idx = np.arange(len(ref_trc))
    # create the array to hold the stretched traces
    ref_stretch = np.zeros((len(stretches), len(ref_trc)))
    # create a spline object for the reference trace
    # :NOTE: This change can give very different results and should
    # **definitely** be discussed!
    # 01.07.21 No extrapolation
    # 15.11.22 set extrapolation to 0 / ext=1
    ref_tr_spline = UnivariateSpline(time_idx, ref_trc, s=0, ext='zeros')
    # evaluate the spline object at different points and put in the prepared
    # array
    for (k, this_fac) in enumerate(time_facs):
        ref_stretch[k, :] = ref_tr_spline(time_idx * this_fac)
    if ref_tr_trim is not None:
        ref_stretch, _ = corr_mat_trim(
            ref_stretch, deepcopy(ref_tr_stats), ref_tr_trim[0],
            ref_tr_trim[1])
    # search best fit of the crosscorrs to one of the stretched ref_traces
    dv = velocity_change_estimate(
        mat, tw, ref_stretch, stretches, sides=sides, return_sim_mat=True,
        remove_nans=remove_nans)
    # TODO: It is not really clear why it is necessary to transpose here so
    # this is the first point where to look in case of errors.
    dv['corr'] = dv['corr'].T
    dv['value'] = dv['value'].T
    return dv
def multi_ref_vchange(
        corr_data: np.ndarray, ref_trs: np.ndarray, tw: np.ndarray = None,
        stretch_range: float = 0.1, stretch_steps: int = 100, sides: str = 'both',
        remove_nans: bool = True,
        ref_tr_trim: Optional[Tuple[float, float]] = None,
        ref_tr_stats=None) -> dict:
    """ Velocity change estimate with single or multiple reference traces.

    This function estimates the velocity change corresponding to each row of
    the ``corr_data`` matrix with respect to the reference trace/s passed in
    ``ref_trs``.

    The velocity change is estimated comparing each correlation function
    stored in the ``corr_data`` matrix (one for each row) with
    ``stretch_steps`` stretched versions of the reference trace/s stored in
    ``ref_trs``. The maximum amount of stretching may be passed in
    ``stretch_range``.
    The best match (stretching amount and corresponding correlation value) is
    calculated on different time windows. If ``tw = None`` the stretching is
    estimated on the whole trace.

    The output is a dictionary with keys of the form ``"reftr_%d" % i``: one
    for each reference trace. The corresponding value is also a dictionary
    that has a structure conforming with the
    :func:`~seismic.monitor.stretch_mod.time_stretch_estimate` output.

    :type corr_data: :class:`~numpy.ndarray`
    :param corr_data: 2d ndarray containing the correlation functions.
        One for each row.
    :type ref_trs: :class:`~numpy.ndarray`
    :param ref_trs: 1D array containing the reference trace, or 2D array
        with one reference trace per row, to be stretched and compared to
        the individual traces in ``corr_data``
    :type tw: list of :class:`~numpy.ndarray` of int
    :param tw: list of 1D ndarrays holding the indices of samples in the time
        windows to be used in the time shift estimate. The samples are
        counted from the zero lag time with the index of the first sample
        being 0. If ``tw = None`` the full time range is used.
    :type stretch_range: scalar
    :param stretch_range: Maximum amount of relative stretching.
        Stretching and compression is tested from ``-stretch_range`` to
        ``stretch_range``.
    :type stretch_steps: scalar
    :param stretch_steps: Number of stretched versions to be tested. The
        increment will be ``(2 * stretch_range) / stretch_steps``
    :type sides: str
    :param sides: Side of the reference matrix to be used for the stretching
        estimate ('both' | 'left' | 'right' | 'single') ``single`` is used
        for one-sided signals from active sources with zero lag time on the
        first sample. Other options assume that the zero lag time is in the
        center of the traces.
    :type remove_nans: bool
    :param remove_nans: If `True` apply :func:`~numpy.nan_to_num` to the
        given correlation matrix before any other operation.
    :type ref_tr_trim: tuple of float, optional
    :param ref_tr_trim: Passed on to
        :func:`~seismic.monitor.stretch_mod.time_stretch_estimate`.
    :param ref_tr_stats: Passed on to
        :func:`~seismic.monitor.stretch_mod.time_stretch_estimate`.
    :rtype: dictionary
    :return: **multi_ref_panel**: It is a dictionary that contains as many
        dictionaries as reference traces have been used. The key format is
        ``"reftr_%d" % i`` and the corresponding value is also a dictionary
        with the structure described in
        :func:`~seismic.monitor.stretch_mod.time_stretch_estimate`
    """
    # FIX: Trace back this problem to the original source and remove
    # this statement
    if remove_nans:
        corr_data = np.nan_to_num(corr_data)
        ref_trs = np.nan_to_num(ref_trs)
    # remove 1-dimensions (a single-row 2D reference becomes a 1D trace)
    ref_trs = np.squeeze(ref_trs)
    # check how many reference traces have been passed
    try:
        reftr_count, _ = ref_trs.shape
    except ValueError:  # a 1D array is passed
        reftr_count = 1
    # Dictionary that will hold all the results
    multi_ref_panel = {}
    # When there is just 1 reference trace no loop is necessary
    if reftr_count == 1:
        key = "reftr_0"
        value = time_stretch_estimate(
            corr_data, ref_trc=ref_trs, tw=tw, stretch_range=stretch_range,
            stretch_steps=stretch_steps, sides=sides, remove_nans=remove_nans,
            ref_tr_trim=ref_tr_trim, ref_tr_stats=ref_tr_stats)
        multi_ref_panel.update({key: value})
    else:  # For multiple traces loop over each row
        for i in range(reftr_count):
            ref_trc = ref_trs[i]
            key = "reftr_%d" % i
            value = time_stretch_estimate(
                corr_data, ref_trc=ref_trc, tw=tw, stretch_range=stretch_range,
                stretch_steps=stretch_steps, sides=sides,
                remove_nans=remove_nans,
                ref_tr_trim=ref_tr_trim, ref_tr_stats=ref_tr_stats)
            multi_ref_panel.update({key: value})
    return multi_ref_panel
def est_shift_from_dt_corr(
        dt1: np.ndarray, dt2: np.ndarray, corr1: np.ndarray,
        corr2: np.ndarray) -> Tuple[float, float]:
    """ Estimation of a baseline shift between velocity-change measurements
    performed with different references.

    The use of different reference traces obtained from different reference
    periods will result in a shift of the velocity-change curves that ideally
    characterizes the velocity variation between the two reference periods.
    Instead of directly measuring this velocity change from the two reference
    traces it is calculated here as the weighted average of the point
    differences between the two velocity-change curves weighted by their
    inverse variance according to Weaver et al. GJI 2011 (On the precision of
    noise correlation interferometry).

    Input vectors must all be of the same length.

    :type dt1: :class:`~numpy.ndarray`
    :param dt1: Velocity variation measured for reference A
    :type dt2: :class:`~numpy.ndarray`
    :param dt2: Velocity variation measured for reference B
    :type corr1: :class:`~numpy.ndarray`
    :param corr1: Correlation between velocity corrected trace and reference A
    :type corr2: :class:`~numpy.ndarray`
    :param corr2: Correlation between velocity corrected trace and reference B
    :rtype: tuple of float
    :return: ``(comb_corr, shift)``: the weighted combined correlation and
        the weighted average baseline shift between the two curves. NaN if
        all weights are masked.
    """
    # Create masked arrays so that NaNs are handled properly
    dt1 = np.ma.masked_array(
        dt1, mask=(np.isnan(dt1) | np.isinf(dt1)), fill_value=0)
    dt2 = np.ma.masked_array(
        dt2, mask=(np.isnan(dt2) | np.isinf(dt2)), fill_value=0)
    corr1 = np.ma.masked_array(
        corr1, mask=(np.isnan(corr1) | np.isinf(corr1)), fill_value=0)
    corr2 = np.ma.masked_array(
        corr2, mask=(np.isnan(corr2) | np.isinf(corr2)), fill_value=0)
    # October 21 - PM - Not entirely sure, why this is done?
    # However, it leads to problems when creating the mask
    # (because shape wrong). Two options: 1. Don't do it or
    # 2. Also remove the corresponding points in the dt objects
    # Remove the points where the correlation is 0
    no_zero = (corr1 > 0) & (corr2 > 0)
    # corr1 = corr1[no_zero]
    # corr2 = corr2[no_zero]
    # Estimate the point-variance for the two curves
    # var1 = (1 - corr1[no_zero] ** 2) / (4 * corr1[no_zero] ** 2)
    # var2 = (1 - corr2[no_zero] ** 2) / (4 * corr2[no_zero] ** 2)
    var1 = (1 - corr1 ** 2) / (4 * corr1 ** 2)
    var2 = (1 - corr2 ** 2) / (4 * corr2 ** 2)
    # Calculate the point-weight
    wgt = 1 / (var1 + var2)
    # Insert the no_zero mask here instead.
    # NOTE(review): the conditions (corr > 0.999) and ~no_zero are mutually
    # exclusive, so this mask appears to always be all-False — confirm the
    # intended combination of conditions (possibly `|` instead of `&`).
    mask = (corr1 > 0.999) & (corr2 > 0.999) & (~no_zero)
    wgt = wgt[~mask]
    # This saves from returning a masked value
    try:
        wgt_s = np.sum(wgt).filled(np.nan)
    except Exception:
        wgt_s = np.sum(wgt)
    # Calculate the shift and the total weight as a cumulative sum of the
    # weighted average of the two curves
    shift = np.sum((dt1[~mask] - dt2[~mask]) * wgt) / wgt_s
    comb_corr = np.sum((corr1[~mask] + corr2[~mask]) * wgt) / wgt_s
    # This saves from returning masked values
    try:
        shift = shift.filled(np.nan)
    except Exception:
        pass
    try:
        comb_corr = comb_corr.filled(np.nan)
    except Exception:
        pass
    return comb_corr, shift
def _create_G(n_ref: int) -> np.ndarray:
""" Create the G matrix for the multi-trace alignment
"""
G = None
for jj in range(n_ref):
line = list(range(n_ref))
tline = [i for i in line if i != jj]
tG = np.zeros((len(tline), len(tline)))
if jj > 0:
tG[:, jj - 1] = -1
for ii in range(len(tline)):
if tline[ii] > 0:
tG[ii, tline[ii] - 1] = 1
if G is None:
G = tG
else:
G = np.vstack((G, tG))
return G
def estimate_reftr_shifts_from_dt_corr(
        multi_ref_panel: dict, return_sim_mat: bool = False) -> dict:
    """ Combine velocity-change measurements of the same data performed with
    different references to a single curve.

    For a set of velocity-change measurements performed with different
    references this function estimates the relative offsets between all pairs
    of the measurements as a weighted average of their difference with the
    function :func:`~seismic.monitor.stretch_mod.est_shift_from_dt_corr`.
    A least squares solution is computed that combines the pairwise
    differences to a consistent set of reference shifts. These shifts should
    be similar to the velocity variations measured between the reference
    traces. The consistent set of reference shifts is used to correct i.e.
    shift the similarity matrices to a common reference. Finally the
    corrected similarity matrices are averaged resulting in a single matrix
    that is interpreted as before. The position of the crest is the combined
    velocity change and the height of the crest is the correlation value.

    :type multi_ref_panel: dictionary
    :param multi_ref_panel: It is a dictionary with one (key,value) pair
        for each reference trace. Its structure is described
        in :func:`~seismic.monitor.stretch_mod.multi_ref_vchange`
    :type return_sim_mat: bool
    :param return_sim_mat: If `True` the returning dictionary contains also
        the similarity matrix `sim_mat`.
    :rtype: dict
    :return: **dv**: Dictionary with the following keys:

        - *corr*: 2d ndarray containing the correlation value for the best
          match for each row of `mat` and for each time window.
        - *value*: 2d ndarray containing the stretch amount corresponding to
          the best match for each row of `mat` and for each time window.
          Stretch is a relative value corresponding to the negative relative
          velocity change -dv/v.
        - *sim_mat*: 3d array containing the similarity matrices that
          indicate the correlation coefficient with the reference for the
          different time windows, different times and different amounts of
          stretching (only if ``return_sim_mat`` is `True`).
        - *second_axis*: It contains the stretch vector used for the
          velocity change estimate.
        - *value_type*: It is equal to 'stretch' and specifies the content
          of the returned value.
        - *method*: It is equal to 'multi_ref' and specifies in which way
          the values have been obtained.

        `None` is returned (and a message printed) when only a single
        reference trace is contained in ``multi_ref_panel``.
    """
    # Vector with the stretching amount
    stretch_vect = multi_ref_panel['reftr_0']['second_axis']
    delta = stretch_vect[1] - stretch_vect[0]
    n_ref = len(list(multi_ref_panel.keys()))
    corr = []
    shift = []
    if n_ref > 1:
        # Loop over all ordered pairs of references and estimate their
        # pairwise baseline shift
        for reftr1 in np.sort(list(multi_ref_panel.keys())):
            ref_idx = [reftr for reftr in np.sort(list(multi_ref_panel.keys()))
                       if reftr != reftr1]
            for reftr2 in ref_idx:
                ccorr, sshift = est_shift_from_dt_corr(
                    np.squeeze(multi_ref_panel[reftr1]['value']),
                    np.squeeze(multi_ref_panel[reftr2]['value']),
                    np.squeeze(multi_ref_panel[reftr1]['corr']),
                    np.squeeze(multi_ref_panel[reftr2]['corr']))
                corr.append(ccorr)
                shift.append(sshift)
        # Weighted least-squares for a consistent set of reference shifts
        G = _create_G(len(list(multi_ref_panel.keys())))
        W = np.diag(np.array(corr))
        D = np.array(shift)
        # This conversion is necessary to achieve the same speed with
        # different .dot implementations on different numpy versions.
        D = np.nan_to_num(D)
        W = np.nan_to_num(W)
        GTW = np.dot(G.T, W)
        to_invert = np.dot(GTW, G)
        # TODO: Get rid of the matrix inversion
        left_op = np.linalg.pinv(to_invert)
        # This conversion is necessary to achieve the same speed with
        # different .dot implementations on different numpy versions.
        left_op = np.nan_to_num(left_op)
        m = np.dot(left_op, np.dot(GTW, D))
        # reference 0 is the implicit baseline; demean so the shifts are
        # relative to the ensemble mean
        m = np.hstack((0, m))
        m = m - np.mean(m)
        # How many samples (int) each sim matrix must be rolled
        # PM October 2021 - require works, the dtype casting in the first
        # step caused errors, should be fine now?
        m = np.around(m / delta)  # , out=np.zeros_like(m, dtype='int32'))
        m = np.require(m, dtype='int32')
        row, col = np.squeeze(multi_ref_panel['reftr_0']['sim_mat']).shape
        stmp = np.zeros((row, col, n_ref))
        # Roll the sim_mat of each reference onto the common baseline
        for (i, reftr) in enumerate(np.sort(list(multi_ref_panel.keys()))):
            stmp[:, :, i] = \
                np.roll(np.squeeze(multi_ref_panel[reftr]['sim_mat']),
                        m[i],
                        axis=1)
        # Create a masked array to evaluate the mean
        stmp_masked = np.ma.masked_array(
            stmp, mask=(np.isnan(stmp) | np.isinf(stmp)), fill_value=0)
        # Evaluate the sim_mat for the multi-ref approach as the mean
        # of the rolled sim_mat corresponding to the individual reference
        # traces
        # When operating with masked arrays this operation creates a new
        # object so the default fill_values returns to be 1e20
        bsimmat = np.mean(stmp_masked, axis=2)
        corr = np.max(bsimmat, axis=1)
        dt = np.argmax(bsimmat, axis=1)
        dt = stretch_vect[dt]
        # Remove the mask
        bsimmat = bsimmat.filled(np.nan)
        try:
            corr = corr.filled(np.nan)
        except Exception:
            pass
        # NOTE(review): dt is a plain ndarray here (stretch_vect indexing),
        # so .filled raises and this branch is normally skipped; the
        # astype('int') looks suspicious for float stretch values — confirm.
        try:
            dt = dt.filled(np.nan).astype('int')
        except Exception:
            pass
        ret_dict = {'corr': corr,
                    'value': dt,
                    'second_axis': stretch_vect,
                    'value_type': 'stretch',
                    'method': 'multi_ref'}
        if return_sim_mat:
            ret_dict.update({'sim_mat': bsimmat})
        return ret_dict
    else:
        print("For a single reference trace use the appropriate function")
        return None
def multi_ref_vchange_and_align(
        corr_data: np.ndarray, ref_trs: np.ndarray, tw: np.ndarray = None,
        stretch_range: float = 0.1, stretch_steps: int = 100,
        sides: str = 'both', return_sim_mat: bool = False,
        remove_nans: bool = True,
        ref_tr_trim: Optional[Tuple[float, float]] = None,
        ref_tr_stats=None) -> dict:
    """ Multi-reference dv estimate and alignment.

    Computes the velocity change for each reference trace with
    :func:`~seismic.monitor.stretch_mod.multi_ref_vchange` and, when more
    than one reference is given, combines the results with
    :func:`~seismic.monitor.stretch_mod.estimate_reftr_shifts_from_dt_corr`.

    :type corr_data: :class:`~numpy.ndarray`
    :param corr_data: 2d ndarray containing the correlation functions.
        One for each row.
    :type ref_trs: :class:`~numpy.ndarray`
    :param ref_trs: 1D array containing the reference trace, or 2D array
        with one reference trace per row, to be stretched and compared to
        the individual traces in ``corr_data``
    :type tw: list of :class:`~numpy.ndarray` of int
    :param tw: list of 1D ndarrays holding the indices of samples in the time
        windows to be used in the time shift estimate. The samples are
        counted from the zero lag time with the index of the first sample
        being 0. If ``tw = None`` the full time range is used. Only the
        first time window is used if several are passed.
    :type stretch_range: scalar
    :param stretch_range: Maximum amount of relative stretching.
        Stretching and compression is tested from ``-stretch_range`` to
        ``stretch_range``.
    :type stretch_steps: scalar
    :param stretch_steps: Number of stretched versions to be tested. The
        increment will be ``(2 * stretch_range) / stretch_steps``
    :type sides: str
    :param sides: Side of the reference matrix to be used for the stretching
        estimate ('both' | 'left' | 'right' | 'single') ``single`` is used
        for one-sided signals from active sources with zero lag time on the
        first sample. Other options assume that the zero lag time is in the
        center of the traces.
    :type return_sim_mat: bool
    :param return_sim_mat: If `True` the returning dictionary contains also
        the similarity matrix `sim_mat`.
    :type remove_nans: bool
    :param remove_nans: If `True` apply :func:`~numpy.nan_to_num` to the
        given correlation matrix before any other operation.
    :type ref_tr_trim: tuple of float, optional
    :param ref_tr_trim: Passed on to
        :func:`~seismic.monitor.stretch_mod.multi_ref_vchange`.
    :param ref_tr_stats: Passed on to
        :func:`~seismic.monitor.stretch_mod.multi_ref_vchange`.
    :rtype: Dictionary
    :return: **dv**: Dictionary with the following keys:

        *corr*: 2d ndarray containing the correlation value for the best
        match for each row of ``corr_data`` and for each time window.

        *value*: 2d ndarray containing the stretch amount corresponding to
        the best match for each row of ``corr_data`` and for each time
        window. Stretch is a relative value corresponding to the negative
        relative velocity change -dv/v.

        *sim_mat*: 3d ndarray containing the similarity matrices that
        indicate the correlation coefficient with the reference for the
        different time windows, different times and different amounts of
        stretching.

        *second_axis*: It contains the stretch vector used for the velocity
        change estimate.

        *value_type*: It is equal to 'stretch' and specifies the content of
        the returned 'value'.

        *method*: Specifies in which "way" the values have been obtained
        ('single_ref' or 'multi_ref').
    """
    # FIX: Trace back this problem to the original source and remove
    # this statement
    if remove_nans:
        corr_data = np.nan_to_num(corr_data)
        ref_trs = np.nan_to_num(ref_trs)
    if tw is not None and len(tw) > 1:
        print(
            "The multi-reference vchange evaluation cannot handle multiple "
            "time windows. Only the first time-window will be used")
        tw = tw[0]
    multi_ref_panel = multi_ref_vchange(
        corr_data, ref_trs, tw=tw, stretch_range=stretch_range,
        stretch_steps=stretch_steps, sides=sides, remove_nans=remove_nans,
        ref_tr_trim=ref_tr_trim, ref_tr_stats=ref_tr_stats)
    n_ref = len(list(multi_ref_panel.keys()))
    if n_ref > 1:
        # combine the measurements from the individual references
        dv = estimate_reftr_shifts_from_dt_corr(
            multi_ref_panel, return_sim_mat=return_sim_mat)
    else:
        # single reference: pass its result through unchanged
        dv = multi_ref_panel['reftr_0']
    return dv
def time_shift_estimate(
        corr_data: np.ndarray, ref_trc: np.ndarray = None,
        tw: List[np.ndarray] = None, shift_range: float = 10,
        shift_steps: int = 100, single_sided: bool = False,
        return_sim_mat: bool = True, remove_nans: bool = True) -> dict:
    """ Time shift estimate through shifting and comparison.

    This function is intended to estimate the shift of traces as it can
    occur in noise cross-correlation in case of drifting clocks. Each
    correlation function stored in a row of ``corr_data`` is compared with
    ``shift_steps`` shifted versions of the reference trace ``ref_trc``.
    The maximum amount of shifting may be passed in ``shift_range``. The
    best match (shift and corresponding correlation value) is calculated
    on different time windows. If ``tw = None`` the shift is estimated on
    the whole trace.

    :type corr_data: :class:`~numpy.ndarray`
    :param corr_data: 2d ndarray containing the correlation functions,
        one per row.
    :type ref_trc: :class:`~numpy.ndarray`
    :param ref_trc: 1D array containing the reference trace to be shifted
        and compared to the individual traces in ``corr_data``. If None,
        the average of all rows of ``corr_data`` is used.
    :type tw: list of :class:`~numpy.ndarray` of int
    :param tw: list of 1D ndarrays holding the indices of samples in the
        time windows to be used in the time shift estimate. The samples
        are counted from the zero lag time with the index of the first
        sample being 0. If ``tw = None`` the full time range is used.
    :type shift_range: scalar
    :param shift_range: Maximum amount of time shift in samples (in one
        direction). Shifting is tested in both directions from
        ``-shift_range`` to ``shift_range``.
    :type shift_steps: scalar
    :param shift_steps: Number of shifted versions to be tested.
    :type single_sided: bool
    :param single_sided: If ``True`` the zero lag time of the traces is in
        the first sample. If ``False`` zero lag is assumed to be in the
        center of the traces and the shifting is evaluated on the causal
        and acausal parts of the traces separately and averaged. This is
        done to avoid bias from velocity changes (stretching) in the case
        of strongly asymmetric traces.
    :type return_sim_mat: bool
    :param return_sim_mat: If ``True`` the similarity matrix is included
        in the returned dictionary.
    :type remove_nans: bool
    :param remove_nans: If `True` apply :func:`~numpy.nan_to_num` to the
        given correlation matrix before any other operation.

    :rtype: Dictionary
    :return: **dt**: Dictionary with the following keys

        *corr*: 2d ndarray containing the correlation value for the best
            match for each row of ``corr_data`` and each time window.
        *value*: 2d ndarray containing the amount of shifting
            corresponding to the best match for each row of ``corr_data``
            and each time window. Shift is measured in units of the
            sampling interval.
        *sim_mat*: 2d ndarray containing the similarity matrix that
            indicates the time shift with respect to the reference for the
            selected time windows, for different times and different
            amounts of shift (only if ``return_sim_mat`` is ``True``).
        *second_axis*: the shift vector used for the estimate.
        *value_type*: equal to 'shift'; specifies the content of 'value'.
        *method*: equal to 'time_shift'; specifies how the values were
            obtained.
    """
    # work on a 2d view in every case
    mat = np.atleast_2d(corr_data)
    if remove_nans:
        mat = np.nan_to_num(mat)
    # default reference: average over all traces
    if ref_trc is None:
        ref_trc = np.nansum(mat, axis=0) / mat.shape[0]
    # default time window: the full trace, or half of it when zero lag
    # sits in the center of a two-sided trace
    if tw is None:
        wlen = mat.shape[1] if single_sided \
            else int(np.floor(mat.shape[1] / 2.))
        tw = time_windows_creation([0], [wlen])
    # shift values to be tested
    shifts = np.linspace(-shift_range, shift_range, shift_steps)
    # time axis used for the spline interpolation of the reference
    if single_sided:
        time_idx = np.arange(len(ref_trc)) - (len(ref_trc) - 1.) / 2.
    else:
        time_idx = np.arange(len(ref_trc))
    # build the matrix of shifted copies of the reference trace by
    # evaluating an interpolating spline at displaced times
    ref_shift = np.zeros((len(shifts), len(ref_trc)))
    spline = UnivariateSpline(time_idx, ref_trc, s=0, ext='const')
    for row, this_shift in enumerate(shifts):
        ref_shift[row, :] = spline(time_idx - this_shift)
    # find the best-fitting shifted reference for every correlation trace
    est = velocity_change_estimate(
        mat, tw, ref_shift, shifts,
        sides='single' if single_sided else 'both',
        return_sim_mat=True, remove_nans=remove_nans)
    # assemble the result dictionary
    dt = {
        'corr': np.squeeze(est['corr']),
        'value': np.squeeze(est['value']),
        'second_axis': shifts,
        'value_type': np.array(['shift']),
        'method': np.array(['time_shift'])}
    if return_sim_mat:
        dt.update({'sim_mat': np.squeeze(est['sim_mat'])})
    return dt
def time_shift_apply(
        corr_data: np.ndarray, shift: np.ndarray,
        single_sided: bool = False) -> np.ndarray:
    """
    Correct for clock drifts that are given in shifts.

    Every row of ``corr_data`` is evaluated (by linear interpolation) on a
    time axis displaced by the corresponding amount in ``shift`` (given in
    samples). Samples shifted in from outside the trace are set to 0.

    :param corr_data: Matrix with one correlation trace per row
    :type corr_data: np.ndarray
    :param shift: 1 or 2D array holding shifts in number of samples compared to
        a reference that is assumed to be accurate. Hence, the input CorrStream
        will be shifted by -shift.
        If 1D: each correlation trace will be shifted by a scalar value.
        If 2D: Shape has to be equal to corr_data. The clock drift varies
        in the CorrTrace (i.e., shift is not constant anymore).
    :type shift: np.ndarray
    :param single_sided: corr_data contains only causal side, defaults to False
    :type single_sided: bool, optional
    :raises ValueError: If the shape of ``shift`` is incompatible with
        ``corr_data``.
    :return: The shifted correlation matrix
    :rtype: np.ndarray
    """
    # Mat must be a 2d vector in every case so
    mat = np.atleast_2d(corr_data)
    # promote a 1d shift vector to one column per trace
    if shift.ndim == 1:
        shift = shift.reshape(-1, 1)
    # one shift row is required per correlation trace. NOTE: this check now
    # also covers 1d input; previously a too-short 1d shift was silently
    # truncated by the zip() in the loop below.
    if shift.shape[0] != mat.shape[0]:
        raise ValueError('shift.shape[0] must be equal corr_data.shape[0]')
    # if there is a significant clock drift during a correlation the shift
    # must be given per sample
    if shift.shape[1] > 1 and shift.shape[1] != mat.shape[1]:
        raise ValueError('shift.shape[1] must be equal corr_data.shape[1]')
    # time axis: zero lag at the first sample or in the center
    if single_sided:
        time_idx = np.arange(mat.shape[1])
    else:
        time_idx = np.arange(mat.shape[1]) - (mat.shape[1] - 1.) / 2.
    # allocate space for the result
    shifted_mat = np.zeros_like(mat)
    # shift every trace by interpolating it at the displaced times
    for ii, (ctr, delta) in enumerate(zip(mat, shift)):
        s = interp1d(
            time_idx, ctr, kind='linear', bounds_error=False, fill_value=0)
        shifted_mat[ii, :] = s(time_idx - delta)
    return shifted_mat
def time_stretch_apply(
        corr_data: np.ndarray, stretch: np.ndarray,
        single_sided: bool = False) -> np.ndarray:
    """ Apply time axis stretch to traces.

    Stretch the time axis of traces e.g. to compensate a velocity shift in the
    propagation medium.
    Such shifts can occur in correlation traces in case of a drifting clock.
    This function applies the stretches. To correct for stretching
    estimated with :func:`~seismic.monitor.stretch_mod.time_stretch_estimate`
    you need to apply negative stretching.

    .. note:: Two fixes compared to the previous version: a dead assignment
        using ``1 + stretch`` (immediately overwritten by the effective
        ``exp(stretch)`` mapping) was removed, and the taper is no longer
        applied in place to the caller's array.

    :type corr_data: :class:`~numpy.ndarray`
    :param corr_data: 2d ndarray containing the correlation functions that are
        to be stretched.
        One for each row.
    :type stretch: :class:`~numpy.ndarray`
    :param stretch: ndarray with stretch.shape[0] = corr_data.shape[0]
        containing the stretches in relative units.
    :type single_sided: bool
    :param single_sided: If ``True`` the zero lag time is in the first
        sample, otherwise in the center of the traces.
    :raises ValueError: If the shape of ``stretch`` is incompatible with
        ``corr_data``.
    :rtype: :class:`~numpy.ndarray`
    :return: **stretched_mat**: stretched version of the input matrix
    """
    # Mat must be a 2d vector in every case so
    mat = np.atleast_2d(corr_data)
    # promote a 1d stretch vector to one column per trace
    if stretch.ndim == 1:
        stretch = stretch.reshape(-1, 1)
    # one stretch value is required per correlation trace (this now also
    # covers 1d input that was previously accepted unchecked)
    if stretch.shape[0] != mat.shape[0]:
        raise ValueError('stretch.shape[0] must be equal corr_data.shape[0]')
    # multiple measurements for the same time: keep only the first column
    if stretch.shape[1] > 1:
        stretch = stretch[:, :1]
    # taper the traces to avoid interpolation artefacts at the ends;
    # multiply into a new array so the caller's data is not modified
    taper = cosine_taper(mat.shape[1], 0.05)
    mat = mat * taper
    # time axis: zero lag at the first sample or in the center
    if single_sided:
        time_idx = np.arange(mat.shape[1])
    else:
        time_idx = np.arange(mat.shape[1]) - (mat.shape[1] - 1.) / 2.
    # allocate space for the result
    stretched_mat = np.zeros_like(mat)
    # resample every trace on an exponentially stretched time axis
    for ii, line in enumerate(mat):
        s = UnivariateSpline(time_idx, line, s=0)
        stretched_mat[ii, :] = s(time_idx * np.exp(stretch[ii]))
    return stretched_mat
def create_shifted_ref_mat(
        ref_trc: np.ndarray, stats: CorrStats,
        shifts: np.ndarray) -> np.array:
    """
    Create a matrix of shifted versions of a reference trace.

    Each value in ``shifts`` yields one row holding the reference trace
    evaluated, via spline interpolation, at the lag times displaced by
    that amount.

    :param ref_trc: the reference trace (1-dimensional or single-row).
    :param stats: header providing ``start_lag``, ``end_lag`` and
        ``npts``, which define the lag-time axis of the trace.
    :param shifts: shift values (in lag-time units), one per output row.
    :return: 2d array of shape ``(len(shifts), len(ref_trc))``.
    """
    assert ref_trc.ndim == 1 or ref_trc.shape[0] == 1, \
        f"'ref_trc' must be a 1-dimensional array, has shape {ref_trc.shape}."
    # make sure we work on a flat 1D trace
    trace = np.squeeze(ref_trc)
    # lag-time axis spanning the whole trace
    lag_times = np.linspace(stats.start_lag, stats.end_lag, stats.npts)
    # interpolating spline (s=0 passes exactly through the samples)
    spline = UnivariateSpline(lag_times, trace, s=0)
    # evaluate the spline at displaced lag times, one row per shift
    shifted = np.zeros((len(shifts), len(trace)))
    for row, lag_shift in enumerate(shifts):
        shifted[row, :] = spline(lag_times + lag_shift)
    return shifted
def compare_with_modified_reference(
        data: np.ndarray, ref_mat: np.ndarray,
        indices: np.ndarray) -> np.ndarray:
    """
    Compare a correlation matrix with modified references.

    Computes the normalised scalar product (i.e., the zero-lag
    cross-correlation coefficient) between every trace in ``data`` and
    every trace in ``ref_mat``, restricted to the samples selected by
    ``indices``.

    .. note:: Unused mask arrays of the previous implementation
        (``mat_mask``/``ref_mask``, dead code) were removed; results are
        unchanged.

    :param data: 2d array with one correlation trace per row.
    :param ref_mat: 2d array with one modified reference trace per row.
    :param indices: indices of the samples (time window) to compare.
    :return: similarity matrix of shape
        ``(data.shape[0], ref_mat.shape[0])`` with values in [-1, 1].
    """
    # restrict both matrices to the time window of interest
    first = data[:, indices]
    second = ref_mat[:, indices]
    # all pairwise zero-lag correlations via a single matrix product
    dprod = np.dot(first, second.T)
    # normalisation: outer product of the segment energies
    f_sq = np.sum(first ** 2, axis=1).reshape(-1, 1)
    s_sq = np.sum(second ** 2, axis=1).reshape(1, -1)
    den = np.sqrt(f_sq * s_sq)
    # apply normalization
    sim_mat = dprod / den
    return sim_mat