# -*- coding: utf-8 -*-
"""
pyOMA - A toolbox for Operational Modal Analysis
Copyright (C) 2015 - 2025 Simon Marwitz, Volkmar Zabel, Andrei Udrea et al.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Created on Thu Oct 16 14:41:56 2014
@author: volkmar
"""
import numpy as np
import scipy.linalg
import os
from .Helpers import lq_decomp, validate_array, simplePbar
from .PreProcessingTools import PreProcessSignals
from .ModalBase import ModalBase
import logging
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
'''
.. TODO::
* define unit tests to check functionality after changes
* add switch to keep synthesized time-histories
'''
[docs]
class SSIDataMC(ModalBase):
[docs]
def __init__(self, *args, **kwargs):
'''
channel definition: channels start at 0
'''
super().__init__(*args, **kwargs)
self.state = [False, False, False, False]
self.num_block_rows = None
self.num_blocks = 1
self.P_i_ref = None
self.P_i_1 = None
self.Y_i_i = None
self.S = None
self.U = None
self.V_T = None
self.max_model_order = None
self.modal_contributions = None
[docs]
@classmethod
def init_from_config(cls, conf_file, prep_signals):
assert os.path.exists(conf_file)
assert isinstance(prep_signals, PreProcessSignals)
with open(conf_file, 'r') as f:
assert f.__next__().strip('\n').strip(' ') == 'Number of Block-Columns:'
num_block_rows = int(f. __next__().strip('\n'))
assert f.__next__().strip('\n').strip(' ') == 'Maximum Model Order:'
max_model_order = int(f. __next__().strip('\n'))
ssi_object = cls(prep_signals)
ssi_object.build_block_hankel(num_block_rows)
ssi_object.compute_modal_params(max_model_order)
return ssi_object
[docs]
def build_block_hankel(self, num_block_rows=None, reduced_projection=True):
'''
Builds a Block-Hankel Matrix of the measured time series with varying
time lags and estimates the subspace matrix from its LQ decomposition.
::
<- num_time samples - num_block_rows-> _
[ y_0 y_1 ... y_(j-1) ]^
[ y_1 y_2 ... y_j ]num_block_rows (=i)*n_l
[ ... ... ... ... ]v
[ y_(2i-1) y_(2i) ... y_(2i+j-2) ]_
The notation mostly follows Peeters 1999.
Parameters
-------
num_block_rows: integer, required
The number of block rows of the Subspace matrix
'''
if num_block_rows is None:
num_block_rows = self.num_block_rows
assert isinstance(num_block_rows, int)
self.num_block_rows = num_block_rows
signals = self.prep_signals.signals
total_time_steps = self.prep_signals.total_time_steps
ref_channels = sorted(self.prep_signals.ref_channels)
n_l = self.prep_signals.num_analised_channels
n_r = self.prep_signals.num_ref_channels
q = num_block_rows
p = num_block_rows
N = int(total_time_steps - 2 * p)
logger.info(f'Building Block-Hankel matrix with {p} block-columns and {q} block rows')
Y_minus = np.zeros((q * n_r, N))
Y_plus = np.zeros(((p + 1) * n_l, N))
for ii in range(q):
Y_minus[(q - ii - 1) * n_r:(q - ii) * n_r,:] = signals[(ii):(ii + N), ref_channels].T
for ii in range(p + 1):
Y_plus[ii * n_l:(ii + 1) * n_l,:] = signals[(q + ii):(q + ii + N)].T
Hankel_matrix = np.vstack((Y_minus, Y_plus))
Hankel_matrix /= np.sqrt(N)
logger.debug(Hankel_matrix.shape)
# self.Hankel_matrix = Hankel_matrix
l, q = lq_decomp(Hankel_matrix, mode='full')
logger.info('Estimating subspace matrix...')
a = n_r * p
b = n_r
c = n_l - n_r
d = n_l * p
P_i_ref = l[a:a + b + c + d,: a] @ q[:a,:]
if reduced_projection:
[U, S, V_T] = np.linalg.svd(l[a:a + b + c + d,: a], full_matrices=False)
V_T = None
else:
[U, S, V_T] = np.linalg.svd(P_i_ref, full_matrices=False)
P_i_1 = l[a + b + c:a + b + c + d,: a + b] @ q[: a + b,: ]
# Y_i_i = l[a : a + b + c, : a + b + c] @ q[ : a + b + c, : ]
Y_i_i = Hankel_matrix[a: a + b + c,: ]
# print(np.sum(np.abs(Y_i_i-l[a : a + b + c, : a + b + c] @ q[ : a + b + c, : ])))
self.L_red = l[a:,:a + b]
self.Q_red = q[:a + b,:]
self.P_i_1 = P_i_1
self.P_i_ref = P_i_ref
self.Y_i_i = Y_i_i
self.S = S
self.U = U
self.V_T = V_T
# self.max_model_order = self.S.shape[0]
self.state[0] = True
[docs]
def compute_modal_params(self, max_model_order=None,
j=None, validation_blocks=None,
synth_sig=True):
'''
Perform a multi-order computation of modal parameters. Successively
calls
* estimate_state(order,)
* modal_analysis(A,C)
* synthesize_signals(A, C, Q, R, S, j)
at ascending model orders, up to max_model_order.
See the explanations in the the respective methods, for a detailed
explanation of parameters.
Parameters
----------
max_model_order: integer, optional
Maximum model order, where to interrupt the algorithm. If not given,
it is determined from the previously computed subspace matrix.
'''
assert self.state[0]
if max_model_order is not None:
assert isinstance(max_model_order, int)
else:
max_model_order = self.self.S.shape[0]
assert max_model_order <= self.S.shape[0]
# num_block_rows = self.num_block_rows
num_analised_channels = self.prep_signals.num_analised_channels
# num_ref_channels = self.prep_signals.num_ref_channels
# sampling_rate = self.prep_signals.sampling_rate
if j is None and validation_blocks is None:
j = self.prep_signals.total_time_steps
elif validation_blocks is None:
assert j <= self.prep_signals.signals.shape[0]
logger.info('Computing modal parameters...')
eigenvalues = np.zeros(
(max_model_order, max_model_order), dtype=np.complex128)
modal_frequencies = np.zeros((max_model_order, max_model_order))
modal_damping = np.zeros((max_model_order, max_model_order))
mode_shapes = np.zeros(
(num_analised_channels,
max_model_order,
max_model_order),
dtype=complex)
if synth_sig:
modal_contributions = np.zeros((max_model_order, max_model_order))
else:
modal_contributions = None
pbar = simplePbar(max_model_order - 1)
for order in range(1, max_model_order):
next(pbar)
A, C, Q, R, S = self.estimate_state(order)
f, d, phi, lamda, = self.modal_analysis(A, C)
modal_frequencies[order,:order] = f
modal_damping[order,:order] = d
mode_shapes[:phi.shape[0],:order, order] = phi
eigenvalues[order,:order] = lamda
if synth_sig:
_, delta = self.synthesize_signals(A, C, Q, R, S, j=j, validation_blocks=validation_blocks)
modal_contributions[order,:order] = delta
self.max_model_order = max_model_order
self.modal_contributions = modal_contributions
self.modal_frequencies = modal_frequencies
self.modal_damping = modal_damping
self.mode_shapes = mode_shapes
self.eigenvalues = eigenvalues
self.state[2] = True
[docs]
def estimate_state(self, order,):
'''
Estimate the state matrices A, C and noise covariances Q, R and S from
the subspace / projection matrix. Several methods exist, e.g.
* Peeters 1999 Reference Based Stochastic Subspace Identification for OMA
* DeCock 2007 Subspace Identification Methods
* the algorithm used in BRSSICovRef.
Here, the first algorithm, a residual-based computation of Q, R and S,
is implemented.
Parameters
----------
order: integer, required
The model order, at which to truncate the singular values of the
projection Matrix P_i_ref
Returns
-------
A: numpy.ndarray
State matrix: Array of shape (order, order)
C: numpy.ndarray
Output matrix: Array of shape (num_analised_channels, order)
Q: numpy.ndarray
state noise covariance matrix: Symmetric array of shape (order, order)
R: numpy.ndarray
signal noise covariance matrix: Array of shape (num_analised_channels, num_analised_channels)
S: numpy.ndarray
system noise - signal noise covariance matrix: Array of shape (order, num_analised_channels)
'''
num_block_rows = self.num_block_rows
num_analised_channels = self.prep_signals.num_analised_channels
# num_ref_channels = self.prep_signals.num_ref_channels
U = self.U[:,:order]
S = self.S[:order]
P_i_1 = self.P_i_1 # n_l * p x n_r * (p + 1) x N
Y_i_i = self.Y_i_i # n_l x n_r *p + n_l x N
# compute state-space model
S_2 = np.power(S, 0.5)
O = U * S_2[np.newaxis,:]
O_i_1 = O[:num_analised_channels * num_block_rows,:order]
O_i = O[:,:order]
if self.V_T is not None:
V_T = self.V_T[:order,:]
X_i = S_2[:, np.newaxis] * V_T
else:
P_i_ref = self.P_i_ref # (p + 1) * n_l x n_r * p x N
X_i = np.linalg.pinv(O_i) @ P_i_ref
X_i_1 = np.linalg.pinv(O_i_1) @ P_i_1
X_i_1_Y_i = np.vstack((X_i_1, Y_i_i))
AC = X_i_1_Y_i @ np.linalg.pinv(X_i)
A = AC[:order,:]
C = AC[order:,:]
roh_w_v = X_i_1_Y_i - AC @ X_i
QSR = roh_w_v @ roh_w_v.T
Q = QSR[:order,:order]
S = QSR[:order, order:order + num_analised_channels]
R = QSR[order:order + num_analised_channels,
order:order + num_analised_channels]
return A, C, Q, R, S
[docs]
def modal_analysis(self, A, C, rescale_fun=None):
'''
Computes the modal parameters from a given state space model as described
by Peeters 1999 and Döhler 2012. Mode shapes are scaled to unit modal
displacements. Complex conjugate and real modes are removed prior to
further processing. Typically, order // 2 modes are in the returned arrays.
Parameters
----------
A: numpy.ndarray
State matrix: Array of shape (order, order)
C: numpy.ndarray
Output matrix: Array of shape (num_analised_channels, order)
Returns
-------
modal_frequencies: numpy.ndarray, shape (order,)
Array holding the modal frequencies for each mode
modal_damping: numpy.ndarray, shape (order,)
Array holding the modal damping ratios (0,100) for each mode
mode_shapes: numpy.ndarray, shape (n_l, order,)
Complex array holding the mode shapes
eigenvalues: numpy.ndarray, shape (order,)
Complex array holding the eigenvalues for each mode
'''
# collect variables
accel_channels = self.prep_signals.accel_channels
velo_channels = self.prep_signals.velo_channels
sampling_rate = self.prep_signals.sampling_rate
n_l = self.num_analised_channels
order = A.shape[0]
assert order == A.shape[1]
# allocate output arrays
modal_frequencies = np.full((order), np.nan)
modal_damping = np.full((order), np.nan)
mode_shapes = np.full((n_l, order), np.nan, dtype=complex)
eigenvalues = np.full((order), np.nan, dtype=complex)
# compute modal model
eigvals, eigvecs_r = np.linalg.eig(A)
Phi = C.dot(eigvecs_r)
conj_indices = self.remove_conjugates(eigvals, eigvecs_r, inds_only=True)
for i, ind in enumerate(conj_indices):
lambda_i = eigvals[ind]
mode_shape_i = Phi[:, ind]
a_i = np.abs(np.arctan2(np.imag(lambda_i), np.real(lambda_i)))
b_i = np.log(np.abs(lambda_i))
freq_i = np.sqrt(a_i ** 2 + b_i ** 2) * sampling_rate / 2 / np.pi
damping_i = 100 * np.abs(b_i) / np.sqrt(a_i ** 2 + b_i ** 2)
if rescale_fun is not None:
mode_shape_i = rescale_fun(mode_shape_i)
# scale modeshapes to modal displacements
mode_shape_i = self.integrate_quantities(
mode_shape_i, accel_channels, velo_channels, freq_i * 2 * np.pi)
# rotate mode shape in complex plane
mode_shape_i = self.rescale_mode_shape(mode_shape_i)
modal_frequencies[i] = freq_i
modal_damping[i] = damping_i
mode_shapes[:mode_shape_i.shape[0], i] = mode_shape_i
eigenvalues[i] = lambda_i
argsort = np.argsort(modal_frequencies)
return modal_frequencies[argsort], modal_damping[argsort], mode_shapes[:, argsort], eigenvalues[argsort],
[docs]
def synthesize_signals(self, A, C, Q, R, S, j=None, **kwargs):
'''
Computes the modal response signals and the contribution of each mode.
The algorithm follows Peeters 1999 and the Lyapunov equation is solved
as a discrete-time algebraic Riccati equation (DARE). For long signals,
the computation may become time-consuming, thus only time steps up to j
may be used to synthesize the signal.
Parameters
----------
A: numpy.ndarray
State matrix: Array of shape (order, order)
C: numpy.ndarray
Output matrix: Array of shape (num_analised_channels, order)
Q: numpy.ndarray
state noise covariance matrix: Symmetric array of shape (order, order)
R: numpy.ndarray
signal noise covariance matrix: Array of shape (num_analised_channels, num_analised_channels)
S: numpy.ndarray
system noise - signal noise covariance matrix: Array of shape (order, num_analised_channels)
j: integer, optional
length of signal to synthesize (number of timesteps)
Returns
-------
sig_synth: numpy.ndarray, shape (num_analised_channels, j, order // 2)
Array holding the modally decomposed input signals for
each channel n_l and all modes
modal_contributions: numpy.ndarray, shape (order, )
Array holding the contributions of each mode to the input
signals.
'''
order = A.shape[0]
assert order == A.shape[1]
if j is None:
j = self.prep_signals.total_time_steps
signals = self.prep_signals.signals[:j,:]
n_l = self.prep_signals.num_analised_channels
modal_contributions = np.zeros((order))
sig_synth = np.zeros((n_l, j, order // 2))
try:
P = scipy.linalg.solve_discrete_are(
a=A.T, b=C.T, q=Q, r=R, s=S, balanced=True)
except:
logger.warning('Correlations of residuals are not symmetric. Skiping Modal Contributions')
return sig_synth, modal_contributions
APCS = A @ P @ C.T + S
CPCR = C @ P @ C.T + R
K = np.linalg.solve(CPCR.T, APCS.T,).T
eigvals, eigvecs_r = np.linalg.eig(A)
conj_indices = self.remove_conjugates(eigvals, eigvecs_r, inds_only=True)
A_0 = np.diag(eigvals)
C_0 = C @ eigvecs_r
K_0 = np.linalg.solve(eigvecs_r, K)
states = np.zeros((order, j), dtype=complex)
AKC = A_0 - K_0 @ C_0
K_0m = K_0 @ signals.T
for k in range(j - 1):
states[:, k + 1] = K_0m[:, k] + AKC @ states[:, k]
Y = signals.T
# norm = 1 / np.einsum('ji,ji->j', Y, Y)
Sigma_data = np.einsum('ji,ji->j', Y, Y)
Sigma_data_synth = np.zeros((n_l, order))
for i, ind in enumerate(conj_indices):
lambda_i = eigvals[ind]
ident = eigvals == lambda_i.conj()
ident[ind] = 1
C_0I = C_0[:, ident]
this_sig_synth = C_0I @ states[ident,:]
if not np.all(np.isclose(this_sig_synth.imag, 0)):
logger.warning(f'Synthetized signals are complex at mode index {order}:{ind}.')
sig_synth[:,:, i] = this_sig_synth.real
mYT = np.einsum('ji,ji->j', sig_synth[:,:, i], Y)
Sigma_data_synth[:, i] = mYT
# modal_contributions[i] = np.mean(norm * mYT)
total_sig_synth = np.sum(sig_synth, axis=-1) # shape n_l, N
Sigma_synth = np.einsum('ji,ji->j', total_sig_synth, total_sig_synth)
modal_contributions[:] = np.mean(Sigma_data_synth / np.sqrt(Sigma_data * Sigma_synth)[:, np.newaxis], axis=0)
self._sig_synth = sig_synth
self._modal_conributions = modal_contributions
return sig_synth, modal_contributions
[docs]
def save_state(self, fname):
logger.info('Saving results to {}...'.format(fname))
dirname, _ = os.path.split(fname)
if not os.path.isdir(dirname):
os.makedirs(dirname)
out_dict = {'self.state': self.state}
out_dict['self.setup_name'] = self.setup_name
out_dict['self.start_time'] = self.start_time
if self.state[0]: # subspace matrix
out_dict['self.num_block_rows'] = self.num_block_rows
out_dict['self.num_blocks'] = self.num_blocks
# out_dict['self.P_i_1'] = self.P_i_1
# out_dict['self.P_i_ref'] = self.P_i_ref
out_dict['self.L_red'] = self.L_red
out_dict['self.Q_red'] = self.Q_red
out_dict['self.Y_i_i'] = self.Y_i_i
out_dict['self.S'] = self.S
out_dict['self.U'] = self.U
out_dict['self.V_T'] = self.V_T
if self.state[2]: # modal params
out_dict['self.modal_frequencies'] = self.modal_frequencies
out_dict['self.modal_damping'] = self.modal_damping
out_dict['self.eigenvalues'] = self.eigenvalues
out_dict['self.mode_shapes'] = self.mode_shapes
out_dict['self.modal_contributions'] = self.modal_contributions
out_dict['self.max_model_order'] = self.max_model_order
np.savez_compressed(fname, **out_dict)
[docs]
@classmethod
def load_state(cls, fname, prep_signals):
logger.info('Loading results from {}'.format(fname))
in_dict = np.load(fname, allow_pickle=True)
if 'self.state' in in_dict:
state = list(in_dict['self.state'])
else:
return
assert isinstance(prep_signals, PreProcessSignals)
setup_name = str(in_dict['self.setup_name'].item())
assert setup_name == prep_signals.setup_name
start_time = prep_signals.start_time
assert start_time == prep_signals.start_time
# prep_signals = in_dict['self.prep_signals'].item()
ssi_object = cls(prep_signals)
if state[0]: # subspace matrix
ssi_object.num_block_rows = validate_array(in_dict['self.num_block_rows'])
ssi_object.num_blocks = validate_array(in_dict['self.num_blocks'])
ssi_object.L_red = validate_array(in_dict['self.L_red'])
ssi_object.Q_red = validate_array(in_dict['self.Q_red'])
# rebuild projections from reduced L and Q matrices
# saves ~ 95 % storage (Xe2 MB) compared to storing projections directly
a = prep_signals.num_ref_channels * ssi_object.num_block_rows
b = prep_signals.num_ref_channels
c = prep_signals.num_analised_channels - prep_signals.num_ref_channels
d = prep_signals.num_analised_channels * ssi_object.num_block_rows
ssi_object.P_i_ref = ssi_object.L_red[:b + c + d,: a] @ ssi_object.Q_red[:a,:]
ssi_object.P_i_1 = ssi_object.L_red[b + c:b + c + d,: ] @ ssi_object.Q_red[:,: ]
ssi_object.Y_i_i = validate_array(in_dict['self.Y_i_i'])
ssi_object.S = validate_array(in_dict['self.S'])
ssi_object.U = validate_array(in_dict['self.U'])
ssi_object.V_T = validate_array(in_dict['self.V_T'])
ssi_object.S = validate_array(in_dict['self.S'])
ssi_object.U = validate_array(in_dict['self.U'])
ssi_object.V_T = validate_array(in_dict['self.V_T'])
if state[2]: # modal params
ssi_object.modal_frequencies = validate_array(in_dict['self.modal_frequencies'])
ssi_object.modal_damping = validate_array(in_dict['self.modal_damping'])
ssi_object.eigenvalues = validate_array(in_dict['self.eigenvalues'])
ssi_object.mode_shapes = validate_array(in_dict['self.mode_shapes'])
ssi_object.modal_contributions = validate_array(in_dict['self.modal_contributions'])
ssi_object.max_model_order = validate_array(in_dict['self.max_model_order'])
ssi_object.state = state
return ssi_object
[docs]
class SSIData(SSIDataMC):
[docs]
def compute_modal_params(self, max_model_order):
'''
Perform a multi-order computation of modal parameters. Successively
calls
* estimate_state(order,)
* modal_analysis(A,C)
at ascending model orders, up to max_model_order.
See the explanations in the the respective methods, for a detailed
explanation of parameters.
Parameters
----------
max_model_order: integer, optional
Maximum model order, where to interrupt the algorithm. If not given,
it is determined from the previously computed subspace matrix.
'''
super().compute_modal_params(max_model_order, synth_sig=False)
[docs]
def estimate_state(self, order, max_modes=None, algo='svd'):
'''
Compute the state matrix A and output matrix C from the singular values
and vectors of the projection matrix, truncated at the requested order. Estimation of the
state matrix can be performed by QR decomposition or Singular Value decomposition
of the shifted observability matrix. If max_modes is specified, the singular
value decomposition is truncated additionally, also known as Crystal Clear SSI.
Parameters
----------
order: integer, required
Model order, at which the state matrices should be estimated
max_modes: integer, optional
Maximum number of modes, that are known to be present in the signal,
to suppress noise modes
algo: str, optional
Algorithm to use for estimation of A. Either 'svd' or 'qr'.
Returns
-------
A: numpy.ndarray
State matrix: Array of shape (order, order)
C: numpy.ndarray
Output matrix: Array of shape (num_analised_channels, order)
'''
if order > self.S.shape[0]:
raise RuntimeError(f'Order cannot be higher than {self.S.shape[0]}. Consider using more block_rows/block_columns.')
assert algo in ['svd', 'qr']
n_l = self.num_analised_channels
num_block_rows = self.num_block_rows
U = self.U[:,:order]
S = self.S[:order]
# compute state-space model
S_2 = np.power(S, 0.5)
O = U * S_2[np.newaxis,:]
On_up = O[:n_l * num_block_rows,:order]
On_down = O[n_l:n_l * (num_block_rows + 1),:order]
if algo == 'svd':
if max_modes is not None:
[u, s, v_t] = np.linalg.svd(On_up, 0)
s = 1. / s[:max_modes]
# On_up_i = np.dot(np.transpose(v_t[:max_modes, :]), np.multiply(
# s[:, np.newaxis], np.transpose(u[:, :max_modes])))
On_up_i = v_t[:max_modes,:].T @ (s[:, np.newaxis] * u[:,:max_modes].T)
else:
On_up_i = np.linalg.pinv(On_up) # , rcond=1e-12)
A = On_up_i @ On_down
elif algo == 'qr':
Q, R = np.linalg.qr(On_up)
S = Q.T.dot(On_down)
A = np.linalg.solve(R, S)
C = O[:n_l,:order] # output matrix
return A, C, None, None, None
[docs]
class SSIDataCV(SSIDataMC):
[docs]
def build_block_hankel(self, num_block_rows=None, num_blocks=1, training_blocks=None, reduced_projection=True):
'''
Builds serveral Block-Hankel Matrices of the measured time series with varying
time lags and estimates the subspace matrices from their LQ decompositions.
Uniqueness of the subspace estimates is ensured by an intermediate LQ
decomposition, where the diagonals of the L matrices are constrained to
positive values. A subspace matrix estimate is computed by the mean over
the training blocks leaving any remainig blocks for validation.
Note: Blocks are not completely i.i.d. as we borrow p+q timesteps from the
previous block for the projection of a full block (assembly of Hankel matrix)
.. TODO::
* investigate correct scaling of the subspace matrices
[sqrt(N_b), sqrt(N_b * num_blocks), sqrt(N_b*n_training_blocks)] ?
* use sparse SVD (scipy.sparse.svds) to save memory and cpu time
Parameters
-------
num_block_rows: integer, required
The number of block rows of the Subspace matrix
num_blocks: integer, optional
The number of blocks, used for cross-validation
training_blocks: list, optional
The selected blocks to use for system identification (=training)
'''
if num_block_rows is None:
num_block_rows = self.num_block_rows
assert isinstance(num_block_rows, int)
assert isinstance(num_blocks, int)
if training_blocks is None:
training_blocks = np.arange(num_blocks)
elif isinstance(training_blocks, (list, tuple)):
training_blocks = np.array(training_blocks)
elif not isinstance(training_blocks, np.ndarray):
raise RuntimeError(f"Argument 'training_blocks' must be an iterable but is type {type(training_blocks)}")
assert training_blocks.max() < num_blocks
n_training_blocks = training_blocks.shape[0]
self.num_block_rows = num_block_rows
self.num_blocks = num_blocks
signals = self.prep_signals.signals
total_time_steps = self.prep_signals.total_time_steps
ref_channels = sorted(self.prep_signals.ref_channels)
n_l = self.prep_signals.num_analised_channels
n_r = self.prep_signals.num_ref_channels
q = num_block_rows
p = num_block_rows
logger.info(f'Building Block-Hankel matrix from {n_training_blocks} out of {num_blocks} signal blocks with {p} block-columns and {q} block rows.')
N_b = int(np.floor((total_time_steps - q - p) / num_blocks))
if N_b < n_r * q:
raise RuntimeError(f'Block-length ({N_b}) must not be smaller than the number of reference channels * number of block rows (={n_r * q}).')
# might omit some timesteps in favor of equally sized blocks
N = N_b * num_blocks
# shorten signals by omitted samples to have them available for Kalman-Filter startup later
N_offset = total_time_steps - q - p - N
signals = signals[N_offset:,:]
K = min((q * n_r) + (p + 1) * n_l, N_b)
K2 = min(((q * n_r) + (p + 1) * n_l, K * n_training_blocks))
Y_minus = np.zeros((q * n_r, N))
Y_plus = np.zeros(((p + 1) * n_l, N))
for ii in range(q):
Y_minus[(q - ii - 1) * n_r:(q - ii) * n_r,:] = signals[ii:ii + N, ref_channels].T
for ii in range(p + 1):
Y_plus[ii * n_l:(ii + 1) * n_l,:] = signals[q + ii:q + ii + N,:].T
Hankel_matrix = np.vstack((Y_minus, Y_plus))
Hankel_matrix /= np.sqrt(N)
logger.debug(Hankel_matrix.shape)
hankel_matrices = np.hsplit(Hankel_matrix, np.arange(N_b, N_b * num_blocks, N_b))
np.hstack([hankel_matrices[i_block] for i_block in training_blocks])
# R_matrices = []
R_matrices = np.empty((n_r * q + n_l * (p + 1), K * n_training_blocks))
Q_matrices = []
# R_unique_matrices = []
# Q_unique_matrices = []
Q_unique_matrices = np.empty((K2, N))
pbar = simplePbar(n_training_blocks * 3)
for i in range(n_training_blocks):
i_block = training_blocks[i]
next(pbar)
L, Q = lq_decomp(hankel_matrices[i_block], mode='reduced', unique=True)
# R_matrices.append(L)
R_matrices[:, i * K:(i + 1) * K] = L
Q_matrices.append(Q)
logger.debug(f'R shapes: actual: {R_matrices.shape} expected: {(n_r * p + n_l * (p + 1), K* n_training_blocks)}')
# R_full_breve, Q_full_breve = lq_decomp(np.hstack(R_matrices), mode='reduced', unique=True)
R_full_breve, Q_full_breve = lq_decomp(R_matrices, mode='reduced', unique=True)
[next(pbar) for _ in range(n_training_blocks)]
del R_matrices
logger.debug(f'Q_breve shapes: actual: {Q_full_breve.shape} expected: ,{(K2, K * n_training_blocks)}')
Q_breve_matrices = np.hsplit(Q_full_breve, np.arange(K, n_training_blocks * K, K)) # list of views into Q_full_breve
logger.debug(f'Q_breve_j shapes: actual: {Q_breve_matrices[0].shape}, expected: {(K2, K)}')
del Q_full_breve
for i in range(n_training_blocks):
next(pbar)
Q_breve_matrix = Q_breve_matrices[i]
# R_matrix = R_matrices[i]
# R_matrix = R_matrices[:, i * K:(i + 1) * K]
Q_matrix = Q_matrices[i]
# R_unique_matrices.append(R_matrix @ Q_breve_matrix.T)
# Q_unique_matrices.append(Q_breve_matrix @ Q_matrix) # (q * n_r + (p + 1) * n_l, N_b)
Q_unique_matrices[:, i * N_b: (i + 1) * N_b ] = Q_breve_matrix @ Q_matrix
del Q_breve_matrix
del Q_breve_matrices
del Q_matrix
del Q_matrices
logger.info('Estimating subspace matrix...')
# L, Q = R_full_breve, np.concatenate(Q_unique_matrices, axis=1)
L, Q = R_full_breve, Q_unique_matrices
a = n_r * q
b = n_r
c = n_l - n_r
d = n_l * p
P_i_ref = L[a:a + b + c + d,: a] @ Q[:a,:] # (p + 1) * n_l x n_r * q x N
if reduced_projection:
[U, S, V_T] = np.linalg.svd(L[a:a + b + c + d,: a], full_matrices=False)
V_T = None
else:
[U, S, V_T] = np.linalg.svd(P_i_ref, full_matrices=False)
P_i_1 = L[a + b + c:a + b + c + d,: a + b] @ Q[: a + b,: ] # n_l * p x n_r * (q + 1) x N
Y_i_i = L[a: a + b + c,: a + b + c] @ Q[: a + b + c,: ] # n_l x n_r *q + n_l x N
self.L_red = L[a:,:a + b] # (p + 1) n_l x n_r * (q + 1)
self.Q_red = Q[:a + b,:] # n_r * (q + 1) x N
self.P_i_1 = P_i_1
self.P_i_ref = P_i_ref
self.Y_i_i = Y_i_i
self.S = S
self.U = U
self.V_T = V_T
# self.max_model_order = self.S.shape[0]
self.state[0] = True
[docs]
def synthesize_signals(self, A, C, Q, R, S, validation_blocks=None, N_offset=None, **kwargs):
'''
Computes the modal response signals and the contribution of each mode.
The algorithm follows Peeters 1999 and the Lyapunov equation is solved
as a discrete-time algebraic Riccati equation (DARE). For long signals,
the computation may become time-consuming, thus only time steps up to j
may be used to synthesize the signal.
Parameters
----------
A: numpy.ndarray
State matrix: Array of shape (order, order)
C: numpy.ndarray
Output matrix: Array of shape (num_analised_channels, order)
Q: numpy.ndarray
state noise covariance matrix: Symmetric array of shape (order, order)
R: numpy.ndarray
signal noise covariance matrix: Array of shape (num_analised_channels, num_analised_channels)
S: numpy.ndarray
system noise - signal noise covariance matrix: Array of shape (order, num_analised_channels)
validation_blocks: list, optional
The selected blocks to be synthethized and used for system validation.
N_offset: integer, optional
The number of samples to be used from any previous block for
Kalman-Filter startup.
Returns
-------
sig_synth: numpy.ndarray, shape (num_analised_channels, j, order // 2)
Array holding the modally decomposed input signals for
each channel n_l and all modes
modal_contributions: numpy.ndarray, shape (order, )
Array holding the contributions of each mode to the input
signals.
'''
order = A.shape[0]
assert order == A.shape[1]
num_blocks = self.num_blocks
if validation_blocks is None:
validation_blocks = np.arange(num_blocks)
elif isinstance(validation_blocks, (list, tuple)):
validation_blocks = np.array(validation_blocks)
elif not isinstance(validation_blocks, np.ndarray):
raise RuntimeError(f"Argument 'validation_blocks' must be an iterable but is type {type(validation_blocks)}")
assert validation_blocks.max() < num_blocks
n_validation_blocks = validation_blocks.shape[0]
n_l = self.prep_signals.num_analised_channels
signals = self.prep_signals.signals
total_time_steps = self.prep_signals.total_time_steps
q = self.num_block_rows
p = self.num_block_rows
N_b = int(np.floor((total_time_steps - q - p) / num_blocks))
# might omit some timesteps in favor of equally sized blocks
N = N_b * num_blocks
# blocks start at N_0_offset + p + q
# (in training we virtually borrow p + q timesteps from the previous block)
N_0_offset = total_time_steps - N
if N_offset is None:
N_offset = N_b // 5
if 0 in validation_blocks and N_0_offset < N_offset:
logger.info(f"Block '0' is in the validation dataset, but only has {N_0_offset} startup-samples (recommended/chosen: {N_offset}) from any previous block for the Kalman Filter. Expect a degraded performance.")
modal_contributions = np.zeros((order))
all_sig_synth = []
try:
P = scipy.linalg.solve_discrete_are(
a=A.T, b=C.T, q=Q, r=R, s=S, balanced=True)
except Exception:
logger.warning('Correlations of residuals are not symmetric. Skiping Modal Contributions')
return all_sig_synth, modal_contributions
APCS = A @ P @ C.T + S
CPCR = C @ P @ C.T + R
K = np.linalg.solve(CPCR.T, APCS.T,).T
eigvals, eigvecs_r = np.linalg.eig(A)
conj_indices = self.remove_conjugates(eigvals, eigvecs_r, inds_only=True)
A_0 = np.diag(eigvals)
C_0 = C @ eigvecs_r
K_0 = np.linalg.solve(eigvecs_r, K)
AKC = A_0 - K_0 @ C_0
block_starts = validation_blocks * N_b + N_0_offset
start_states = [None for _ in range(num_blocks + 1)]
for i in np.argsort(validation_blocks):
i_block = validation_blocks[i]
sig_synth = np.zeros((n_l, N_b, order // 2))
start_state = start_states[i]
if i_block == 0:
_N_offset = N_0_offset
elif start_state is not None:
_N_offset = 1
else:
_N_offset = N_offset
states = np.zeros((order, N_b + _N_offset), dtype=complex)
block_start = block_starts[i] - _N_offset
block_end = block_starts[i] + N_b
signals_block = signals[block_start:block_end,:]
K_0m = K_0 @ signals_block.T
if start_state is not None:
states[:, 0] = start_state
for k in range(N_b + _N_offset - 1):
states[:, k + 1] = K_0m[:, k] + AKC @ states[:, k]
start_states[i + 1] = states[:, -1]
# remove start-up samples
states = states[:, _N_offset:]
Y = signals_block[_N_offset:,:].T
# norm = 1 / np.einsum('ji,ji->j', Y, Y)
Sigma_data = np.einsum('ji,ji->j', Y, Y)
Sigma_data_synth = np.zeros((n_l, order))
for i, ind in enumerate(conj_indices):
lambda_i = eigvals[ind]
ident = eigvals == lambda_i.conj()
ident[ind] = 1
C_0I = C_0[:, ident]
this_sig_synth = C_0I @ states[ident,:]
if not np.all(np.isclose(this_sig_synth.imag, 0)):
logger.warning(f'Synthetized signals are complex at mode index {order}:{ind}.')
sig_synth[:,:, i] = this_sig_synth.real
mYT = np.einsum('ji,ji->j', sig_synth[:,:, i], Y)
Sigma_data_synth[:, i] = mYT
# modal_contributions[i] += np.mean(norm * mYT)
total_sig_synth = np.sum(sig_synth, axis=-1) # shape n_l, N_b
Sigma_synth = np.einsum('ji,ji->j', total_sig_synth, total_sig_synth)
modal_contributions += np.mean(Sigma_data_synth / np.sqrt(Sigma_data * Sigma_synth)[:, np.newaxis], axis=0)
all_sig_synth.append(sig_synth)
modal_contributions /= n_validation_blocks
self._sig_synth = all_sig_synth
self._modal_contributions = modal_contributions
return all_sig_synth, modal_contributions
[docs]
def plot_sig_synth(modal_data, modelist=None, channel_inds=None, ref_channel_inds=None, axes=None, i_block=None):
import matplotlib.pyplot as plt
prep_signals = modal_data.prep_signals
sig_synth = modal_data._sig_synth
if isinstance(sig_synth, list): # multi-block for cross-validation SSIDataCV
sig_synth = sig_synth[i_block]
num_blocks = modal_data.num_blocks
total_time_steps = prep_signals.total_time_steps
q = modal_data.num_block_rows
p = modal_data.num_block_rows
N_b = int(np.floor((total_time_steps - q - p) / num_blocks))
N = N_b * num_blocks
N_0_offset = total_time_steps - q - p - N
# N_offset = N_b // 15
block_starts = i_block * N_b + N_0_offset + p + q
t = prep_signals.t[block_starts:block_starts + N_b]
signals = prep_signals.signals.T[:, block_starts:block_starts + N_b]
else:
t = prep_signals.t
signals = prep_signals.signals.T
# ref_channels = modal_data.prep_signals.ref_channels
sampling_rate = modal_data.prep_signals.sampling_rate
channel_headers = modal_data.prep_signals.channel_headers
modal_contributions = modal_data._modal_contributions
if channel_inds is None:
channel_inds = np.arange(modal_data.prep_signals.num_analised_channels)
num_channels = len(channel_inds)
if ref_channel_inds is None:
ref_channel_inds = np.arange(modal_data.prep_signals.num_ref_channels)
# num_ref_channels = len(ref_channel_inds)
# Plot signals for each mode and each channel
num_modes = sig_synth.shape[-1]
if modelist is None:
modelist = list(range(num_modes))
num_plots = len(modelist) + 2
fig1, axes = plt.subplots(num_plots, 1, sharex='col', sharey='col', squeeze=False)
for ip, i in enumerate(modelist):
rho = modal_contributions[i]
this_signals = sig_synth[:,:, i]
for j in range(num_channels):
i_l = channel_inds[j]
color = str(np.linspace(0, 1, len(channel_inds) + 2)[j + 1])
ls = 'solid'
axes[ip, 0].plot(t, this_signals[i_l,:], color=color, ls=ls)
axes[ip, 0].set_ylabel(f'$\delta_{{{i + 1}}}$={rho:1.2f}',
rotation=0, labelpad=40, va='center', ha='left')
for j in range(num_channels):
i_l = channel_inds[j]
color = str(np.linspace(0, 1, len(channel_inds) + 2)[j + 1])
ls = 'solid'
this_signals = signals[i_l,:]
this_signals_synth = np.sum(sig_synth, axis=2)[i_l,:]
axes[-1, 0].plot(t, this_signals, color=color, ls=ls,
label=f'{channel_headers[i_l]}')
axes[-2, 0].plot(t, this_signals_synth, color=color, ls=ls,)
axes[-1, 0].set_ylabel('Measured', rotation=0, labelpad=50, va='center', ha='left')
axes[-2, 0].set_ylabel(f'$\sum\delta$={np.sum(modal_contributions):1.2f}',
rotation=0, labelpad=50, va='center', ha='left')
axes[-1, 0].set_xlabel('$t$ [\si{\second}]')
for ax in axes.flat:
ax.set_yticks([])
fig1.legend(title='Channels')
# Plot power spectral density functions for each channel and all modes
fig2, axes = plt.subplots(num_channels, 1, sharex='col', sharey='col', squeeze=False)
ft_freq = np.fft.rfftfreq(len(t), d=(1 / sampling_rate))
for j in range(num_channels):
i_l = channel_inds[j]
this_signals = signals[i_l,:]
this_signals_synth = sig_synth[i_l,:,:]
ft_meas = np.fft.rfft(this_signals * np.hanning(len(t)))
if j == 0: label = f'Inp.'
else: label = None
axes[j, 0].plot(ft_freq, 10 * np.log10(np.abs(ft_meas)), ls='solid', color='k', label=label)
for ip, i in enumerate(modelist):
ft_synth = np.fft.rfft(this_signals_synth[:, i] * np.hanning(len(t)))
color = str(np.linspace(0, 1, len(modelist) + 2)[ip + 1])
ls = ['-', '--', ':', '-.'][i % 4]
if j == 0: label = f'm={i+1}'
else: label = None
axes[j, 0].plot(ft_freq, 10 * np.log10(np.abs(ft_synth)), color=color, ls=ls, label=label)
axes[j, 0].set_ylabel(f'{channel_headers[i_l]}',
rotation=0, labelpad=20, va='center', ha='center')
axes[-1, 0].set_xlabel('$f$ [\si{\hertz}]')
for ax in axes.flat:
ax.set_yticks([])
ax.set_xlim(0, 1 / 2 * sampling_rate)
fig2.legend(title='Mode')
return fig1, fig2
if __name__ == '__main__':
main()