Source code for pyOMA.core.SSICovRef

# -*- coding: utf-8 -*-
'''
pyOMA - A toolbox for Operational Modal Analysis
Copyright (C) 2015 - 2025  Simon Marwitz, Volkmar Zabel, Andrei Udrea et al.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

Based on previous works by Andrei Udrea 2014 and Volkmar Zabel 2015
Modified and Extended by Simon Marwitz 2015-2018
'''

import os
import warnings
import copy

import numpy as np
import scipy.linalg

from .PreProcessingTools import PreProcessSignals
from .ModalBase import ModalBase
from .Helpers import validate_array, simplePbar

import logging

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)


[docs] class BRSSICovRef(ModalBase):
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 0 1 2 # self.state= [Toeplitz, State Mat., Modal Par. self.state = [False, False, False] self.num_block_columns = None self.num_block_rows = None self.U = None self.S = None self.V_T = None self.modal_contributions = None
@property def accel_channels(self): return self.prep_signals.accel_channels @property def velo_channels(self): return self.prep_signals.velo_channels
[docs] @classmethod def init_from_config(cls, conf_file, prep_signals): assert os.path.exists(conf_file) with open(conf_file, 'r') as f: assert f.__next__().strip('\n').strip(' ') == 'Number of Block-Columns:' num_block_columns = int(f. __next__().strip('\n')) assert f.__next__().strip('\n').strip(' ') == 'Maximum Model Order:' max_model_order = int(f. __next__().strip('\n')) ssi_object = cls(prep_signals) ssi_object.build_toeplitz_cov(num_block_columns) ssi_object.compute_modal_params(max_model_order) return ssi_object
[docs] def build_toeplitz_cov( self, num_block_columns=None, num_block_rows=None, shift=0): ''' Builds a Block-Toeplitz Matrix of Covariances with varying time lags and decomposes it by a Singular Value decomposition. :: <-num_block_columns * n_r -> _ [ R_m R_m-1 ... R_1 ]^ [ R_m+1 R_m ... R_2 ]num_block_rows * n_l [ ... ... ... ... ] [ R_2m-1 ... ... R_m ]v The total number of block columns and block rows should not exceed the maximum time lag of pre-computed correlation functions: num_block_columns + num_block_rows + shift < prep_signals.m_lags Parameters ---------- num_block_columns: integer, optional Number of block columns. By default, half the number of time lags are used num_block_rows: integer, optional Number of block rows. By default it is set equal to num_block_columns shift: integer, optional Allows the assembly of a shifted Block-Toeplitz matrix, s. t. the correlation function starting at shift is assembled into the block Toeplitz matrix ''' max_lags = self.prep_signals.m_lags if num_block_columns is not None: assert isinstance(num_block_columns, int) else: if max_lags is None: raise RuntimeError('Either num_block_columns, or pre-computed correlation functions must be provided.') if num_block_rows is not None: assert isinstance(num_block_rows, int) num_block_columns = max_lags - num_block_rows - shift else: num_block_columns = (max_lags - shift) // 2 if num_block_rows is None: num_block_rows = num_block_columns logger.info('Assembling Toeplitz matrix using pre-computed correlation functions' ' {} block-columns and {} block rows'.format(num_block_columns, num_block_rows + 1)) m_lags = num_block_rows + 1 + num_block_columns - 1 + shift if max_lags is None: max_lags = self.prep_signals.m_lags if max_lags is not None and max_lags < m_lags: logger.warning('The pre-computed correlation function is too short for the requested matrix dimensions.') n_l = self.num_analised_channels n_r = self.num_ref_channels corr_matrix = self.prep_signals.correlation(m_lags) Toeplitz_matrix = np.zeros((n_l * (num_block_rows + 1), n_r * num_block_columns)) for ii in range(num_block_columns): tau = num_block_columns - ii + shift this_block = corr_matrix[:,:, tau - 1] Toeplitz_matrix[:n_l, ii * n_r:(ii * n_r + n_r)] = this_block for i in range(1, num_block_rows + 1): # shift previous block row down and left previous_Toeplitz_row = (i - 1) * n_l this_block = Toeplitz_matrix[previous_Toeplitz_row:( previous_Toeplitz_row + n_l), 0:n_r * (num_block_columns - 1)] begin_Toeplitz_row = i * n_l Toeplitz_matrix[begin_Toeplitz_row:(begin_Toeplitz_row + n_l), n_r:(n_r * num_block_columns)] = this_block # fill right most block tau = num_block_columns + i + shift this_block = corr_matrix[:,:, tau - 1] Toeplitz_matrix[begin_Toeplitz_row:(begin_Toeplitz_row + n_l), 0:n_r] = this_block logger.info('Decomposing Toeplitz matrix') U, S, V_T = scipy.linalg.svd(Toeplitz_matrix, 1) self.num_block_columns = num_block_columns self.num_block_rows = num_block_rows self.U = U self.S = S self.V_T = V_T self.state[0] = True
[docs] def compute_modal_params(self, max_model_order=None, max_modes=None, algo='svd', modal_contrib=True): ''' Perform a multi-order computation of modal parameters. Successively calls * estimate_state(order, max_modes, algo) * modal_analysis(A,C) * synthesize_correlation(A,C, G), if modal_contrib == True At ascending model orders, up to max_model_order. See the explanations in the the respective methods, for a detailed explanation of parameters. Parameters ---------- max_model_order: integer, optional Maximum model order, where to interrupt the algorithm. If not given, it is min(num_channels * (num_block_rows + 1), num_reference_channels * num_block_columns) ''' if max_model_order is not None: assert isinstance(max_model_order, int) else: max_model_order = self.S.shape[0] assert max_model_order <= self.S.shape[0] num_analised_channels = self.num_analised_channels logger.info('Computing modal parameters...') modal_frequencies = np.zeros((max_model_order, max_model_order)) modal_damping = np.zeros((max_model_order, max_model_order)) mode_shapes = np.zeros((num_analised_channels, max_model_order, max_model_order), dtype=complex) eigenvalues = np.zeros((max_model_order, max_model_order), dtype=complex) if modal_contrib: modal_contributions = np.zeros((max_model_order, max_model_order)) else: modal_contributions = None pbar = simplePbar(max_model_order - 1) for order in range(1, max_model_order): next(pbar) A, C, G = self.estimate_state(order, max_modes, algo) f, d, phi, lamda, = self.modal_analysis(A, C) modal_frequencies[order,:order] = f modal_damping[order,:order] = d mode_shapes[:phi.shape[0],:order, order] = phi eigenvalues[order,:order] = lamda if modal_contrib: _, delta = self.synthesize_correlation(A, C, G) modal_contributions[order,:order] = delta self.max_model_order = max_model_order self.modal_frequencies = modal_frequencies self.modal_damping = modal_damping self.mode_shapes = mode_shapes self.eigenvalues = eigenvalues self.modal_contributions = modal_contributions self.state[2] = True
[docs] def estimate_state(self, order, max_modes=None, algo='svd'): ''' Compute the state matrix A, output matrix C and next-state-output covariance matrix G from the singular values and vectors of the block Toeplitz matrix, truncated at the requested order. Estimation of the state matrix can be performed by QR decomposition or Singular Value decomposition of the shifted observability matrix. If max_modes is specified, the singular value decomposition is truncated additionally, also known as Crystal Clear SSI. Parameters ---------- order: integer, required Model order, at which the state matrices should be estimated max_modes: integer, optional Maximum number of modes, that are known to be present in the signal, to suppress noise modes algo: str, optional Algorithm to use for estimation of A. Either 'svd' or 'qr'. Returns ------- A: numpy.ndarray State matrix: Array of shape (order, order) C: numpy.ndarray Output matrix: Array of shape (num_analised_channels, order) G: numpy.ndarray next-state-output covariance matrix : Array of shape (order, num_ref_channels) ''' if order > self.S.shape[0]: raise RuntimeError(f'Order cannot be higher than {self.S.shape[0]}. Consider using more block_rows/block_columns.') assert algo in ['svd', 'qr'] n_l = self.num_analised_channels n_r = self.num_ref_channels num_block_rows = self.num_block_rows U = self.U[:,:order] S = self.S[:order] V_T = self.V_T[:order,:] # compute state-space model S_2 = np.power(S, 0.5) O = U * S_2[np.newaxis,:] Z = S_2[:, np.newaxis] * V_T On_up = O[:n_l * num_block_rows,:order] On_down = O[n_l:n_l * (num_block_rows + 1),:order] if algo == 'svd': if max_modes is not None: [u, s, v_t] = np.linalg.svd(On_up, 0) s = 1. / s[:max_modes] # On_up_i = np.dot(np.transpose(v_t[:max_modes, :]), np.multiply( # s[:, np.newaxis], np.transpose(u[:, :max_modes]))) On_up_i = v_t[:max_modes,:].T @ (s[:, np.newaxis] * u[:,:max_modes].T) else: On_up_i = np.linalg.pinv(On_up) # , rcond=1e-12) A = On_up_i @ On_down elif algo == 'qr': Q, R = np.linalg.qr(On_up) S = Q.T.dot(On_down) A = np.linalg.solve(R, S) C = O[:n_l,:order] # output matrix G = Z[:order, -n_r:] # next-state-output covariance matrix return A, C, G
[docs] def modal_analysis(self, A, C, rescale_fun=None): ''' Computes the modal parameters from a given state space model as described by Peeters 1999 and Döhler 2012. Mode shapes are scaled to unit modal displacements. Complex conjugate and real modes are removed prior to further processing. Typically, order // 2 modes are in the returned arrays. Parameters ---------- A: numpy.ndarray State matrix: Array of shape (order, order) C: numpy.ndarray Output matrix: Array of shape (num_analised_channels, order) Returns ------- modal_frequencies: (order,) numpy.ndarray Array holding the modal frequencies for each mode modal_damping: (order,) numpy.ndarray Array holding the modal damping ratios (0,100) for each mode mode_shapes: (n_l, order,) numpy.ndarray Complex array holding the mode shapes eigenvalues: (order,) numpy.ndarray Complex array holding the eigenvalues for each mode ''' # collect variables accel_channels = self.accel_channels velo_channels = self.velo_channels sampling_rate = self.prep_signals.sampling_rate n_l = self.num_analised_channels order = A.shape[0] assert order == A.shape[1] # allocate output arrays modal_frequencies = np.full((order), np.nan) modal_damping = np.full((order), np.nan) mode_shapes = np.full((n_l, order), np.nan, dtype=complex) eigenvalues = np.full((order), np.nan, dtype=complex) # compute modal model eigvals, eigvecs_r = np.linalg.eig(A) Phi = C.dot(eigvecs_r) conj_indices = self.remove_conjugates(eigvals, eigvecs_r, inds_only=True) for i, ind in enumerate(conj_indices): lambda_i = eigvals[ind] mode_shape_i = Phi[:, ind] a_i = np.abs(np.arctan2(np.imag(lambda_i), np.real(lambda_i))) b_i = np.log(np.abs(lambda_i)) freq_i = np.sqrt(a_i ** 2 + b_i ** 2) * sampling_rate / 2 / np.pi damping_i = 100 * np.abs(b_i) / np.sqrt(a_i ** 2 + b_i ** 2) if rescale_fun is not None: mode_shape_i = rescale_fun(mode_shape_i) # scale modeshapes to modal displacements mode_shape_i = self.integrate_quantities( mode_shape_i, accel_channels, velo_channels, freq_i * 2 * np.pi) # rotate mode shape in complex plane mode_shape_i = self.rescale_mode_shape(mode_shape_i) modal_frequencies[i] = freq_i modal_damping[i] = damping_i mode_shapes[:mode_shape_i.shape[0], i] = mode_shape_i eigenvalues[i] = lambda_i argsort = np.argsort(modal_frequencies) return modal_frequencies[argsort], modal_damping[argsort], mode_shapes[:, argsort], eigenvalues[argsort],
[docs] def synthesize_correlation(self, A, C, G): ''' Correlation function synthetization in a modal decoupled form follows Reynders-2012-SystemIdentificationMethodsFor(Operational)ModalAnalysisReviewAndComparison Eq. 161 p. 74 (24) where \Lambda are the correlation functions of the identified system Parameters ---------- A: numpy.ndarray State matrix: Array of shape (order, order) C: numpy.ndarray Output matrix: Array of shape (num_analised_channels, order) G: numpy.ndarray next-state-output covariance matrix : Array of shape (order, num_ref_channels) Returns ------- corr_matrix_synth: (n_l, n_r, m_lags, n_modes) numpy.ndarray Array holding the modally decomposed correlation functions for each channel n_l and reference channel n_r and all modes modal_contributions: (order,) numpy.ndarray Array holding the contributions of each mode to the input correlation function. To use cross-validation for the modal contributions, manually replace the correlation matrix, between steps system identification and modal analysis: .. code-block:: python order=40 m_lags= 150 n_blocks = 40 k = 10 cardinality = n_blocks // k block_indices = np.arange(cardinality*k) np.random.shuffle(block_indices) prep_signals.corr_blackman_tukey(max_m_lags, num_blocks=n_blocks, refs_only=True) i = 1 # subset i out of k test_set = block_indices[i * cardinality:(i + 1) * cardinality] training_set = np.take(block_indices, np.arange((i + 1) * cardinality, (i + k) * cardinality), mode='wrap') # use the training set for building the Toeplitz matrix prep_signals.corr_matrix_bt = np.mean(prep_signals.corr_matrices_bt[training_set,...,:this_m_lags], axis=0) modal_data.build_toeplitz_cov(int(this_m_lags // 2)) # use the test set for modal analysis prep_signals.corr_matrix_bt = np.mean(prep_signals.corr_matrices_bt[test_set,...,:this_m_lags], axis=0) A, C, G = modal_data.estimate_state(order) this_modal_frequencies, this_modal_damping, this_mode_shapes, this_eigenvalues, = modal_data.modal_analysis(A, C) _, this_modal_contributions = modal_data.synthesize_correlation(A, C, G) . . . ''' num_block_rows = self.num_block_rows num_block_columns = self.num_block_columns n_l = self.num_analised_channels n_r = self.num_ref_channels order = A.shape[0] assert order == A.shape[1] m_lags = num_block_rows + 1 + num_block_columns - 1 # m_lags = self.prep_signals.m_lags corr_matrix_data = self.prep_signals.corr_matrix[:,:,:m_lags] corr_mats_shape = (n_l, n_r, m_lags, order // 2) corr_matrix_synth = np.zeros(corr_mats_shape, dtype=np.float64) Sigma_data = np.zeros((n_l * n_r)) Sigma_synth = np.zeros((n_l * n_r)) Sigma_data_synth = np.zeros((n_l * n_r, order)) modal_contributions = np.zeros((order)) # redundant: eigendecomposition is recomputed here for better readability of code eigvals, eigvecs_r = np.linalg.eig(A) Phi = C.dot(eigvecs_r) conj_indices = self.remove_conjugates(eigvals, eigvecs_r, inds_only=True) # Peeters-2000-SystemIdentificationAndDamageDetectionInCivilEngineering Eq. 2.57 G_m = np.linalg.solve(eigvecs_r, G) if logger.isEnabledFor(logging.DEBUG): tau = 21 logger.debug(C @ eigvecs_r @ np.diag(eigvals) ** tau @ np.linalg.inv(eigvecs_r) @ G) logger.debug(Phi @ np.diag(eigvals) ** tau @ G_m) for i, ind in enumerate(conj_indices): lambda_i = eigvals[ind] conjs_ind = eigvals == lambda_i.conj() conjs_ind[ind] = 1 conj_eigvals = eigvals[conjs_ind][np.newaxis] conj_Phis = Phi[:, conjs_ind] conj_Gms = G_m[conjs_ind,:] eigspowtau = conj_eigvals[np.newaxis, ...] ** np.arange(m_lags)[:, np.newaxis, np.newaxis] this_corr_synth = (eigspowtau * conj_Phis[np.newaxis, ...]).dot(conj_Gms) if not np.all(np.isclose(this_corr_synth.imag, 0)): logger.warning(f'Synthetized correlation functions are complex for mode index {ind}. Something is wrong!') this_corr_synth = np.transpose(this_corr_synth.real, (1, 2, 0)) corr_matrix_synth[:,:,:, i] = this_corr_synth if logger.isEnabledFor(logging.DEBUG): Sigma_data_synthtot = np.zeros((n_l * n_r)) for i_r in range(n_r): for i_l in range(n_l): corr_data = corr_matrix_data[i_l, i_r,:] corr_synth = np.sum(corr_matrix_synth, axis=3)[i_l, i_r,:] Sigma_data[i_r * n_l + i_l] = corr_data.dot(corr_data.T) Sigma_synth[i_r * n_l + i_l] = corr_synth.dot(corr_synth.T) if logger.isEnabledFor(logging.DEBUG): Sigma_data_synthtot[i_r * n_l + i_l] = corr_data.dot(corr_synth.T) for i, ind in enumerate(conj_indices): Sigma_data_synth[i_r * n_l + i_l, i] = corr_data @ corr_matrix_synth[i_l, i_r,:, i] for i, ind in enumerate(conj_indices): rho = (Sigma_data_synth[:, i] / np.sqrt(Sigma_data * Sigma_synth)) modal_contributions[i] = rho.mean() self._corr_matrix_synth = corr_matrix_synth self._modal_contributions = modal_contributions return corr_matrix_synth, modal_contributions
[docs] def synthesize_spectrum(self, A, C, G): ''' L = N*dt (duration = number_of_samples*sampling_period) P = N*df (maximal frequency = number of samples * frequency inverval) dt * df = 1/N L * P = N ''' logger.warning('Implementation: Spectrum estimation is not tested.') # f_max = self.prep_signals.sampling_rate / 2 m_lags = self.prep_signals.m_lags delta_t = 1 / self.prep_signals.sampling_rate num_analised_channels = self.num_analised_channels num_ref_channels = self.num_ref_channels order = A.shape[0] assert order == A.shape[1] psd_mats_shape = (num_analised_channels, num_ref_channels, m_lags) psd_matrix = np.zeros(psd_mats_shape, dtype=np.float64) I = np.identity(order) Lambda_0 = self.prep_signals.signal_power() for n in range(m_lags): z = np.exp(0 + 1j * n * delta_t) psd_matrix[:,:, n] = C.dot(np.linalg.solve( z * I - A, G)) + Lambda_0 + G.T.dot(np.linalg.solve(1 / z * I - A.T, C.T)) self._psd_matrix = psd_matrix
[docs] def save_state(self, fname): logger.info('Saving results to {}...'.format(fname)) dirname, _ = os.path.split(fname) if not os.path.isdir(dirname): os.makedirs(dirname) # 0 1 2 # self.state= [Toeplitz, State Mat., Modal Par.] out_dict = {'self.state': self.state} out_dict['self.setup_name'] = self.setup_name # out_dict['self.prep_signals']=self.prep_signals if self.state[0]: # covariances # out_dict['self.toeplitz_matrix'] = self.toeplitz_matrix out_dict['self.num_block_columns'] = self.num_block_columns out_dict['self.num_block_rows'] = self.num_block_rows out_dict['self.U'] = self.U out_dict['self.S'] = self.S out_dict['self.V_T'] = self.V_T if self.state[2]: # modal params out_dict['self.modal_frequencies'] = self.modal_frequencies out_dict['self.modal_damping'] = self.modal_damping out_dict['self.mode_shapes'] = self.mode_shapes out_dict['self.eigenvalues'] = self.eigenvalues out_dict['self.modal_contributions'] = self.modal_contributions out_dict['self.max_model_order'] = self.max_model_order np.savez_compressed(fname, **out_dict)
[docs] @classmethod def load_state(cls, fname, prep_signals): logger.info('Loading results from {}'.format(fname)) in_dict = np.load(fname, allow_pickle=True) # 0 1 2 # self.state= [Toeplitz, State Mat., Modal Par.] if 'self.state' in in_dict: state = validate_array(in_dict['self.state']) else: return for this_state, state_string in zip(state, ['Covariance Matrices Built', 'State Matrices Computed', 'Modal Parameters Computed', ]): if this_state: logger.info(state_string) assert isinstance(prep_signals, PreProcessSignals) setup_name = validate_array(in_dict['self.setup_name']) assert setup_name == prep_signals.setup_name start_time = prep_signals.start_time assert start_time == prep_signals.start_time ssi_object = cls(prep_signals) ssi_object.state = state if state[0]: # covariances # ssi_object.toeplitz_matrix = in_dict['self.toeplitz_matrix'] ssi_object.num_block_columns = validate_array(in_dict['self.num_block_columns']) ssi_object.num_block_rows = validate_array(in_dict['self.num_block_rows']) ssi_object.U = validate_array(in_dict['self.U']) ssi_object.S = validate_array(in_dict['self.S']) ssi_object.V_T = validate_array(in_dict['self.V_T']) if state[2]: # modal params ssi_object.modal_frequencies = validate_array(in_dict['self.modal_frequencies']) ssi_object.modal_damping = validate_array(in_dict['self.modal_damping']) ssi_object.mode_shapes = validate_array(in_dict['self.mode_shapes']) ssi_object.eigenvalues = validate_array(in_dict['self.eigenvalues']) ssi_object.modal_contributions = validate_array(in_dict.get( 'self.modal_contributions', None)) ssi_object.max_model_order = validate_array(in_dict['self.max_model_order']) return ssi_object
[docs] def show_channel_reconstruction(modal_data, modelist=None, channel_list=None, ref_channel_list=None, axes=None): import matplotlib.pyplot as plt corr_matrix_synth = modal_data._corr_matrix_synth corr_matrix_data = modal_data.prep_signals.corr_matrix if channel_list is None: channel_list = np.arange(modal_data.prep_signals.num_analised_channels) if ref_channel_list is None: ref_channel_list = np.arange(modal_data.prep_signals.num_ref_channels) num_modes = corr_matrix_synth.shape[-1] if modelist is None: modelist = list(range(num_modes)) ratio = len(channel_list) / len(ref_channel_list) num_plots = len(modelist) n_rows = int(num_plots / ratio) n_cols = int(np.ceil(num_plots / n_rows)) fig, axes = plt.subplots(n_rows, n_cols, sharex=True, sharey=True) RMS_err = np.zeros((len(channel_list), len(ref_channel_list), num_modes)) for mode in modelist: # Error is not normalized and tends to be larger for channels with stronger signals RMS_err[..., mode] = np.sqrt(np.mean(np.power(corr_matrix_synth[..., mode] - corr_matrix_data, 2), axis=-1)) vmin, vmax = np.min(RMS_err), np.max(RMS_err) for mode in modelist: mappable = axes.flat[mode].imshow(RMS_err[..., mode], vmin=vmin, vmax=vmax) if axes.flat[mode].get_subplotspec().is_first_col(): axes.flat[mode].set_yticks(np.arange(len(channel_list)), modal_data.prep_signals.channel_headers) if axes.flat[mode].get_subplotspec().is_last_row(): axes.flat[mode].set_xticks(np.arange(len(ref_channel_list)), np.array(modal_data.prep_signals.channel_headers)[modal_data.prep_signals.ref_channels]) # axes.flat[mode].set_title(f'{modal_data._modal_frequencies[mode]:1.3f} Hz') cbar = fig.colorbar(mappable) cbar.set_label('RMS error')
[docs] def plot_corr_synth(modal_data, modelist=None, channel_inds=None, ref_channel_inds=None, axes=None): import matplotlib.pyplot as plt corr_matrix_synth = modal_data._corr_matrix_synth m_lags = corr_matrix_synth.shape[2] corr_matrix_data = modal_data.prep_signals.corr_matrix[:,:,:m_lags] ref_channels = modal_data.prep_signals.ref_channels sampling_rate = modal_data.prep_signals.sampling_rate channel_headers = modal_data.prep_signals.channel_headers modal_contributions = modal_data._modal_contributions if channel_inds is None: channel_inds = np.arange(modal_data.prep_signals.num_analised_channels) num_channels = len(channel_inds) if ref_channel_inds is None: ref_channel_inds = np.arange(modal_data.prep_signals.num_ref_channels) num_ref_channels = len(ref_channel_inds) # build non-repeating channel combinations # contains indices for all_channels (=channel numbers) and ref_channels # channel numbers for ref_channels can be obtained from prep_signals.ref_channels i_l_i_r = np.full((num_channels * num_ref_channels, 2), np.nan) j = 0 for index_l in channel_inds: if index_l in ref_channels: index_l_in_ref_channels = ref_channels.index(index_l) else: index_l_in_ref_channels = None for index_r in ref_channel_inds: if index_l_in_ref_channels is None: i_l_i_r[j, 0] = index_l i_l_i_r[j, 1] = index_r j += 1 else: index_r_in_all_channels = ref_channels[index_r] inds_inverted = np.array([[index_r_in_all_channels, index_l_in_ref_channels]]) if not (np.any(np.all(i_l_i_r == inds_inverted , axis=1))): i_l_i_r[j, 0] = index_l i_l_i_r[j, 1] = index_r j += 1 i_l_i_r = i_l_i_r[~np.all(np.isnan(i_l_i_r), axis=1),:].astype(int) # Plot correlation functions for each mode and all channel combinations num_modes = corr_matrix_synth.shape[-1] if modelist is None: modelist = list(range(num_modes)) num_plots = len(modelist) + 2 fig1, axes = plt.subplots(num_plots, 1, sharex='col', sharey='col', squeeze=False) taus = np.linspace(0, m_lags / sampling_rate, m_lags) for ip, i in enumerate(modelist): rho = modal_contributions[i] this_corr_synth = corr_matrix_synth[:,:,:, i] for j in range(len(i_l_i_r)): i_l, i_r = i_l_i_r[j,:] color = str(np.linspace(0, 1, len(i_l_i_r) + 2)[j + 1]) ls = 'solid' axes[ip, 0].plot(taus, this_corr_synth[i_l, i_r,:], color=color, ls=ls) axes[ip, 0].set_ylabel(f'$\delta_{{{i + 1}}}$={rho:1.2f}', rotation=0, labelpad=40, va='center', ha='left') for j in range(len(i_l_i_r)): i_l, i_r = i_l_i_r[j,:] color = str(np.linspace(0, 1, len(i_l_i_r) + 2)[j + 1]) ls = 'solid' this_corr_data = corr_matrix_data[i_l, i_r,:] this_corr_synth = np.sum(corr_matrix_synth, axis=3)[i_l, i_r,:] axes[-1, 0].plot(taus, this_corr_data, color=color, ls=ls, label=f'{channel_headers[i_l]} $\leftrightarrow$ {channel_headers[ref_channels[i_r]]}') axes[-2, 0].plot(taus, this_corr_synth, color=color, ls=ls,) axes[-1, 0].set_ylabel('Measured', rotation=0, labelpad=50, va='center', ha='left') axes[-2, 0].set_ylabel(f'$\sum\delta$={np.sum(modal_contributions):1.2f}', rotation=0, labelpad=50, va='center', ha='left') axes[-1, 0].set_xlabel('$\\tau$ [\si{\second}]') for ax in axes.flat: ax.set_yticks([]) ax.set_xlim(0, taus.max() / 2) fig1.legend(title='Channels') fig1.subplots_adjust(left=None, bottom=None, right=0.97, top=0.97, wspace=None, hspace=0.1,) # Plot power spectral density functions for each channel combination and all modes num_plots = len(i_l_i_r) fig2, axes = plt.subplots(num_plots, 1, sharex='col', sharey='col', squeeze=False) ft_freq = np.fft.rfftfreq(m_lags, d=(1 / sampling_rate)) for j in range(num_plots): i_l, i_r = i_l_i_r[j,:] this_corr_data = corr_matrix_data[i_l, i_r,:] ft_meas = np.fft.rfft(this_corr_data * np.hanning(m_lags)) if j == 0: label = f'Inp.' else: label = None axes[j, 0].plot(ft_freq, 10 * np.log10(np.abs(ft_meas)), ls='solid', color='k', label=label) for ip, i in enumerate(modelist): ft_synth = np.fft.rfft(corr_matrix_synth[i_l, i_r,:, i] * np.hanning(m_lags)) color = str(np.linspace(0, 1, len(modelist) + 2)[ip + 1]) ls = ['-', '--', ':', '-.'][i % 4] if j == 0: label = f'm={i+1}' else: label = None axes[j, 0].plot(ft_freq, 10 * np.log10(np.abs(ft_synth)), color=color, ls=ls, label=label) axes[j, 0].set_ylabel(f'{channel_headers[i_l]}\n $\leftrightarrow$ \n{channel_headers[ref_channels[i_r]]}', rotation=0, labelpad=20, va='center', ha='center') axes[-1, 0].set_xlabel('$f$ [\si{\hertz}]') for ax in axes.flat: ax.set_yticks([]) ax.set_xlim(0, 1 / 2 * sampling_rate) ax.set_ylim(ymin=-90) fig2.legend(title='Mode') fig2.subplots_adjust(left=None, bottom=None, right=0.97, top=0.97, wspace=None, hspace=0.1,) return fig1, fig2
[docs] class PogerSSICovRef(BRSSICovRef): ''' "In the PoGER approach, first a nonparametric system model is identified for each setup separately. In the time domain, this nonparametric model consists of the correlations between all measured outputs. In a second step, the output correlations obtained from the different setups are stacked on top of each other. Extracting the modal parameters from the resulting correlation function yields global values for the eigenfrequencies and damping ratios. The identified partial mode shapes are stacked on top of each other in a global mode shape. However, due to the non-stationary ambient excitation level and the non-stationary ambient excitation color, it is necessary to re-scale the partial mode shapes in a least-squares sense, for instance to the reference DOFs of the first partial mode shape, just as in the PoSER approach" from: Döhler, M.; Reynders, E.; Magalhaes, F.; Mevel, L.; Roeck, G. D. & Cunha, A. Pre-and post-identification merging for multi-setup OMA with covariance-driven SSI 28th International Modal Analysis Conference, 2010 , 57-70 Analysis steps: * Create your geometry definitions * Create configuration files and channel-dof-assignments for each setup * Pre-process each setup using PreProcessData * Pre-compute correlations functions using PreProcessData.compute_correlation_functions (note: m_lags >= num_block_columns + num_block_rows >= 2 * num_block_columns + 1) * add the PreProcessData objects of each setup using add_setup * call pair_channels(), build_merged_subspace_matrix(), estimate_state(), compute_modal_params() Notes on the reference channels: There are two different uses of reference channels: 1. Reference channels for reducing the computational effort / improving results if noisy channels are present 2. Reference channels for mode shape rescaling when multiple setups should be merged In PoGER merging the first group of reference channels are required for joint identification. In this case, reference-based correlation functions are "stacked on top of each other" and then assembled into a joint Hankel matrix. Here, only the reference channels, that are present in all setups can be used. Based on each setups' channel-dof-assignments and selected reference channels, the PogerSSICovRef class automatically determines the reference channels for: * joint identification and * mode shape rescaling / merging. Thus, by changing the reference channel definition in each setup, the used reference channels in joint identification can be influenced. The reference channels for modeshape rescaling are automatically generated, regardless of the the definition in the setup. Rescaling is always done with respect to the first setup, so a "good" setup should always be added first. .. TODO:: * Add modal contributions * Implement PreGER merging with variance computation in a new class '''
[docs] def __init__(self,): ''' Initializes class and all class variables channel definition: channels start at 0 ''' super().__init__() self.state = [False, False, False, False, False] # __init__ self.setup_name = 'merged_' # self.start_times = [] # add_setup self.setups = [] self.sampling_rate = None self.num_ref_channels = None self.m_lags = None # pair_channels self.ssi_ref_channels = None self.merged_chan_dofs = None self.merged_accel_channels = None self.merged_velo_channels = None self.merged_disp_channels = None self.merged_num_channels = None self.num_analised_channels = None # self.start_time = None # build_merged_subspace_matrix self.subspace_matrix = None self.num_block_columns = None self.num_block_rows = None self.U = None self.S = None self.V_T = None
[docs] def add_setup(self, prep_signals): ''' todo: check that ref_channels are equal in each setup (by number and by DOF) ''' assert isinstance(prep_signals, PreProcessSignals) # assure chan_dofs were assigned assert prep_signals.chan_dofs if self.sampling_rate is not None: assert prep_signals.sampling_rate == self.sampling_rate else: self.sampling_rate = prep_signals.sampling_rate if self.num_ref_channels is not None: if self.num_ref_channels != prep_signals.num_ref_channels: warnings.warn( 'This setup contains a different number of reference channels ({}), than the previous setups ({})!'.format( prep_signals.num_ref_channels, self.num_ref_channels)) self.num_ref_channels = min( self.num_ref_channels, prep_signals.num_ref_channels) else: self.num_ref_channels = prep_signals.num_ref_channels if self.m_lags is not None: self.m_lags = min(self.m_lags, prep_signals.m_lags) else: self.m_lags = prep_signals.m_lags self.setup_name += prep_signals.setup_name + '_' # self.start_times.append(prep_signals.start_time) # extract needed information and store them in a dictionary self.setups.append({'setup_name': prep_signals.setup_name, 'num_analised_channels': prep_signals.num_analised_channels, 'chan_dofs': prep_signals.chan_dofs, 'ref_channels': prep_signals.ref_channels, # 'roving_channels': prep_signals.roving_channels, 'accel_channels': prep_signals.accel_channels, 'velo_channels': prep_signals.velo_channels, 'disp_channels': prep_signals.disp_channels, 'corr_matrix': prep_signals.corr_matrix, 'start_time': prep_signals.start_time, }) logger.info( 'Added setup "{}" with {} channels'.format( prep_signals.setup_name, prep_signals.num_analised_channels)) # assign last setup, to be able to display spectra in stabil_plot # this actually created a bug in modal_analysis, where accel_channels and velo_channels # were passed from prep_signals, instead of the merged version # this bug would have been caught if prep_signals wouldn't exist in the object self.prep_signals = prep_signals self.state[3] = True
[docs] def pair_channels(self,): ''' pairs channels from all given setups for the poger merging methods ssi_reference channels are common to all setups rescale reference channels are common to at least two setups finds common dofs from all setups and their respective channels generates new channel_dof_assignments with ascending channel numbers rescale reference channels are assumed to be equal to ssi_reference channels ''' logger.info('Pairing channels and dofs...') setups = self.setups merged_chan_dofs = [] merged_accel_channels = [] merged_velo_channels = [] merged_disp_channels = [] # extract dofs from each setup and exclude channel numbers # merged_chan_dofs will be a list of chan_dof lists ''' merged_chan_dofs = [[dof of setup 0 channel 0, dof of setup 0 channel 1, ... dof of setup 0 channel num_analised_channels] [dof of setup 1 channel 0, dof of setup 1 channel 1, ... dof of setup 1 channel num_analised_channels] ... [dof of setup num_setups channel 0, dof of setup num_setups channel 1, ... dof of setup num_setups channel num_analised_channels] ] ''' for setup in setups: chan_dofs = [] accel_channels = [] velo_channels = [] disp_channels = [] this_chan_dofs = setup['chan_dofs'] this_num_analised_channels = setup['num_analised_channels'] # this_ref_channels = setup['ref_channels'] # this_rov_channels = setup['roving_channels'] this_accel_channels = setup['accel_channels'] this_velo_channels = setup['velo_channels'] this_disp_channels = setup['disp_channels'] # chan dofs are now sorted by channel number this_chan_dofs.sort(key=lambda x: x[0]) for channel in range(this_num_analised_channels): for chan_dof in this_chan_dofs: if channel == chan_dof[0]: node, az, elev = chan_dof[1:4] if len(chan_dof) == 5: name = chan_dof[4] else: name = '' chan_dofs.append([node, az, elev, name]) break # if channel has not been assigned to a DOF else: chan_dofs.append([None, 0, 0, '']) accel_channels.append(channel in this_accel_channels) velo_channels.append(channel in this_velo_channels) disp_channels.append(channel in this_disp_channels) merged_chan_dofs.append(chan_dofs) merged_accel_channels.append(accel_channels) merged_velo_channels.append(velo_channels) merged_disp_channels.append(disp_channels) # find dofs common to all setups # takes the dofs of the first setup as ssi_ref_dofs # loops over all setups and only keeps channels that are present in # the previous ssi_ref_dofs and the current setup # only ssi_ref_dofs can be used in the assembly of the hankel matrix # for mode shape rescaling the ref_dofs between the respective combination of two setups could be used # but the logic for this is not included here, therefore # only ssi_ref_dofs will be used for mode shape rescaling ssi_ref_dofs = copy.deepcopy(merged_chan_dofs[0]) for chan_dofs in merged_chan_dofs[1:]: new_ref_dofs = [] for node, az, elev, name in chan_dofs: if node is None: continue for rnode, raz, relev, rname in ssi_ref_dofs: if node == rnode and az == raz and elev == relev and name == rname: new_ref_dofs.append((rnode, raz, relev, rname)) break ssi_ref_dofs = new_ref_dofs if len(ssi_ref_dofs) == 0: raise RuntimeError( 'Could not find any DOF that is common to all setups.') # find channels for each common dof # add the channel number to rescale_ref_channels; these will # be used to get the reference modal coordinates in mode shape rescaling # add the index of the channel in setups' ref_channels to # ssi_ref_channels; these will be used to assemble the Hankel # matrix were each column is assembled from a single reference DOF (!) # if reference channel orders have changed between setups, reordering # is needed to achieve constistent columns of the Hankel matrix ssi_ref_channels = [] # base_ssi_ref_channels = ssi_ref_channels[0] rescale_ref_channels = [] for setup, chan_dofs in zip(setups, merged_chan_dofs): # prep_signals = setup['prep_signals'] this_ssi_ref_channels = [] this_rescale_ref_channels = [] for rnode, raz, relev, rname in ssi_ref_dofs: index = None for channel, node, az, elev, name in setup['chan_dofs']: if node == rnode and az == raz and elev == relev and name == rname: # index = i # channel = setup['chan_dofs'][index][0] this_rescale_ref_channels.append(int(channel)) if channel not in setup['ref_channels']: warnings.warn( 'Channel {} ({}) is common to multiple setups but not chosen as a reference channel.'.format( channel, name)) else: this_ref_index = setup['ref_channels'].index( channel) this_ssi_ref_channels.append(this_ref_index) break else: raise RuntimeError( 'Oops! Something went wrong. This should not happen!') rescale_ref_channels.append(this_rescale_ref_channels) ssi_ref_channels.append(this_ssi_ref_channels) # print(this_ssi_ref_channels) # reorder chan_dofs, accel_channels, etc. of the first setup # s.t. references come first, followed by rovings # refs are ordered by ssi_ref_dofs order # rovs are ordered by ascending channel number of the underlying setup new_chan_dofs, new_accel_channels, new_velo_channels, new_disp_channels = [], [], [], [] chan_dofs, accel_channels, velo_channels, disp_channels = merged_chan_dofs[ 0], merged_accel_channels[0], merged_velo_channels[0], merged_disp_channels[0] # print(chan_dofs,accel_channels, velo_channels, disp_channels ) for rnode, raz, relev, rname in ssi_ref_dofs: for i, (node, az, elev, name) in enumerate(chan_dofs): if node == rnode and az == raz and elev == relev and name == rname: new_chan_dofs.append(chan_dofs[i]) new_accel_channels.append(accel_channels[i]) new_velo_channels.append(velo_channels[i]) new_disp_channels.append(disp_channels[i]) break else: raise RuntimeError( 'This should not happen, as all ref_dofs were previously checked to be present in each setup.') del chan_dofs[i] del accel_channels[i] del velo_channels[i] del disp_channels[i] new_chan_dofs += chan_dofs new_accel_channels += accel_channels new_velo_channels += velo_channels new_disp_channels += disp_channels merged_chan_dofs[0], merged_accel_channels[0], merged_velo_channels[0], merged_disp_channels[ 0] = new_chan_dofs, new_accel_channels, new_velo_channels, new_disp_channels # delete channels of the reference dofs for chan_dofs, accel_channels, velo_channels, disp_channels in zip( merged_chan_dofs[1:], merged_accel_channels[1:], merged_velo_channels[1:], merged_disp_channels[1:]): # prep_signals = setup['prep_signals'] for rnode, raz, relev, rname in ssi_ref_dofs: index = None for i, (node, az, elev, name) in enumerate(chan_dofs): if node == rnode and az == raz and elev == relev and name == rname: index = i break else: raise RuntimeError( 'This should not happen, as all ref_dofs were previously checked to be present in each setup.') # remove the channel_dof_assignment of the reference channels # for all setups del chan_dofs[index] del accel_channels[index] del velo_channels[index] del disp_channels[index] # flatten chan_dofs and add ascending channel numbers flattened = [] channel = 0 for sublist in merged_chan_dofs: for val in sublist: val.insert(0, channel) flattened.append(val) channel += 1 merged_chan_dofs = flattened flattened = [] channel = 0 for sublist in merged_accel_channels: for val in sublist: if val: flattened.append(channel) channel += 1 merged_accel_channels = flattened flattened = [] channel = 0 for sublist in merged_velo_channels: for val in sublist: if val: flattened.append(channel) channel += 1 merged_velo_channels = flattened flattened = [] channel = 0 for sublist in merged_disp_channels: for val in sublist: if val: flattened.append(channel) channel += 1 merged_disp_channels = flattened num_analised_channels = sum( [setup['num_analised_channels'] for setup in setups]) self.merged_accel_channels = merged_accel_channels self.merged_velo_channels = merged_velo_channels self.merged_disp_channels = merged_disp_channels self.ssi_ref_channels = ssi_ref_channels self.rescale_ref_channels = rescale_ref_channels self.merged_chan_dofs = merged_chan_dofs self.merged_num_channels = len(merged_chan_dofs) self.num_analised_channels = num_analised_channels self.start_time = min([stp['start_time'] for stp in setups]) self.state[1] = True return ssi_ref_channels, merged_chan_dofs
@property def accel_channels(self): return self.merged_accel_channels @property def velo_channels(self): return self.merged_velo_channels
[docs] def build_merged_subspace_matrix( self, num_block_columns, num_block_rows=None): ''' Builds a Block-Hankel Matrix of Covariances with varying time lags :: <- num_block_columns*num_ref_channels-> _ [ R_1 R_2 ... R_i ]^ [ R_2 R_3 ... R_2 ]num_block_rows*(num_num_ref_channels*num_setups) [ ... ... ... ... ]v [ R_i ... ... R_2i-1 ]_ R_1 = [ R_1^1 ] [ R_1^2 ] [ ... ] [ R_1^num_setups ] ''' assert isinstance(num_block_columns, int) if num_block_rows is None: num_block_rows = num_block_columns # -10 assert isinstance(num_block_rows, int) if not num_block_columns + num_block_columns + 1 <= self.m_lags: raise RuntimeError( 'Correlation functions were pre-computed ' 'up to {} time lags, which is sufficient for assembling ' 'a Hankel-Matrix with up to {} x {} blocks. You requested ' '{} x {} blocks'.format( self.m_lags, self.m_lags // 2 + 1, self.m_lags // 2, num_block_rows + 1, num_block_columns)) setups = self.setups logger.info( 'Assembling subspace matrix using pre-computed correlation' ' functions from {} setups with {} block-columns and {} ' 'block rows'.format( len(setups), num_block_columns, num_block_rows + 1)) ssi_ref_channels = self.ssi_ref_channels num_analised_channels = self.num_analised_channels num_ref_channels = len(ssi_ref_channels[0]) m_lags = self.m_lags subspace_matrix = np.zeros( ((num_block_rows + 1) * num_analised_channels, num_block_columns * num_ref_channels)) end_row = None for block_row in range(num_block_rows + 1): sum_analised_channels = 0 for this_ssi_ref_channels, setup in zip(ssi_ref_channels, setups): this_analised_channels = setup['num_analised_channels'] this_corr_matrix = setup['corr_matrix'] this_corr_matrix = this_corr_matrix[:, this_ssi_ref_channels,:] # for each setup the order of the reference channels must be equal with respect to their DOFs # we need a list of (local) ref_channels that corresponds to the ref_channels' DOFs of the first setup # ssi_ref_channels = [[ref_DOF_1 -> index of ref_channel setup A, ref_DFO_2 -> index of ref_channel setup A, ...], # [ref_DOF_1 -> index of ref_channel setup B, ref_DFO_2 -> index of ref_channel setup B, ...], # ...] # this_corr_matrix = this_corr_matrix[:,this_ssi_ref_channels] # # just reorders the columns corresponding to the ref_channels # of current setup this_corr_matrix = this_corr_matrix.reshape( (this_analised_channels, num_ref_channels * m_lags), order='F') this_block_column = this_corr_matrix[:, block_row * num_ref_channels:( num_block_columns + block_row) * num_ref_channels] begin_row = block_row * num_analised_channels + sum_analised_channels if end_row is not None: assert begin_row >= end_row end_row = begin_row + this_analised_channels subspace_matrix[begin_row:end_row,:] = this_block_column sum_analised_channels += this_analised_channels # block_row 0 1 # setup 0: row 0*this_analised_channels ... 1*this_analised_channels, 3*this_analised_channels ... 4*this_analised_channels # setup 1: row 1*this_analised_channels ... 2*this_analised_channels, 4*this_analised_channels ... 5*this_analised_channels # setup 2: row 2*this_analised_channels ... 3*this_analised_channels, 5*this_analised_channels ... 6*this_analised_channels # (bc*num_setups+setup)*num_ref_channels assert (subspace_matrix != 0).all() U, S, V_T = scipy.linalg.svd(subspace_matrix, 1) self.U = U self.S = S self.V_T = V_T self.max_model_order = S.shape[0] # self.subspace_matrix = subspace_matrix self.num_block_rows = num_block_rows self.num_block_columns = num_block_columns self.state[0] = True
[docs] def compute_modal_params(self, max_model_order=None, max_modes=None, algo='svd'): super().compute_modal_params(max_model_order, max_modes, algo, modal_contrib=False) self.mode_shapes = self.mode_shapes[:self.merged_num_channels,:,:]
[docs] def modal_analysis(self, A, C,): return super().modal_analysis(A, C, rescale_fun=self.rescale_by_references)
[docs] def rescale_by_references(self, mode_shape): ''' This is PoGer Rescaling * extracts each setup's reference and roving parts of the modeshape * compute rescaling factor from all setup's reference channels using a least-squares approach * rescales each setup's roving channels and assembles final modeshape vector reference channel_pairs and final channel-dof-assignments have been determined by function pair_channels note: reference channels for SSI need not necessarily be reference channels for rescaling and vice versa :math:`S_\\phi \\times \\alpha = [n \\times 1, 0 .. 0]` :math:`\\phi^{ref}_i` : Reference-sensor part of modeshape estimated from setup :math:`i = 0 .. n` :matH:`j_{max} = \\operatorname{argmax}(\\Pi_i |\\phi^{ref}_i|)` : maximal modal component in all setups → will be approximately scaled to 1, must belong to the same sensor in each setup .. math:: S_\\phi = \\begin{bmatrix} \\phi^{ref}_{0,j_{max}}& \\phi^{ref}_{1,j_{max}}& ..& ..& \\phi^{ref}_{n,j_{max}} \\\\ \\phi^{ref}_0& -\\phi^{ref}_1& 0& ..& 0 \\\\ \\phi^{ref}_0& 0& -\\phi^{ref}_2& ..& 0 \\\\ . &. &. & . & . \\\\ . &. &. & . & . \\\\ \\phi^{ref}_0& 0& 0& ..& -\\phi^{ref}_n \\\\ 0& \\phi^{ref}_1& -\\phi^{ref}_2& ..& 0 \\\\ . &. & . & . & . \\\\ . &. & . & . & . \\\\ 0& \\phi^{ref}_1& 0& ..& -\\phi^{ref}_n \\\\ . &. & . & . & . \\\\ . &. & . & . & . \\\\ 0& 0& \\phi^{ref}_2& ..& -\\phi^{ref}_n \\\\ . &. & . & . & . \\\\ . &. & . & . & . \\\\ 0& 0& 0& \\phi^{ref}_{n-1}& -\\phi^{ref}_n \\end{bmatrix} if references are the same in all setups dimensions :math:`= 1 + (n_{setups} ! )* n_{ref_{channels}} \\times n_{setups}` not quite exact, since different setups may share different references → list based assembly of the :math:`S_\\phi` matrix ''' new_mode_shape = np.zeros((self.merged_num_channels), dtype=complex) start_row_scaled = 0 end_row_scaled = self.setups[0]['num_analised_channels'] new_mode_shape[start_row_scaled:end_row_scaled] = mode_shape[start_row_scaled:end_row_scaled] num_setups = len(self.setups) S_phi = [] # assemble the first line all_ref_modes = [] for setup_num, setup in enumerate(self.setups): this_refs = self.rescale_ref_channels[setup_num] mode_refs_this = mode_shape[this_refs] all_ref_modes.append(mode_refs_this) all_ref_modes = np.array(all_ref_modes).T max_ind = np.argmax(np.product(np.abs(all_ref_modes), axis=1)) S_phi.append(all_ref_modes[max_ind:max_ind + 1,:]) # assemble S_phi row_unscaled_1 = 0 for setup_num_1, setup_1 in enumerate(self.setups): row_unscaled_2 = 0 for setup_num_2, setup_2 in enumerate(self.setups): if setup_num_2 <= setup_num_1: row_unscaled_2 += setup_2['num_analised_channels'] continue # ssi_ref_channels is ref_channels with respect to setup not to # merged mode shape base_refs = self.rescale_ref_channels[setup_num_1] this_refs = self.rescale_ref_channels[setup_num_2] base_refs = [int(ref + row_unscaled_1) for ref in base_refs] this_refs = [int(ref + row_unscaled_2) for ref in this_refs] mode_refs_base = mode_shape[base_refs] mode_refs_this = mode_shape[this_refs] this_S_phi = np.zeros( (len(base_refs), num_setups), dtype=complex) this_S_phi[:, setup_num_1] = mode_refs_base this_S_phi[:, setup_num_2] = -1 * mode_refs_this S_phi.append(this_S_phi) row_unscaled_2 += setup_2['num_analised_channels'] row_unscaled_1 += setup_1['num_analised_channels'] S_phi = np.vstack(S_phi) # compute scaling factors rhs = np.zeros(S_phi.shape[0], dtype=complex) rhs[0] = num_setups + 0j alpha = np.linalg.pinv(S_phi).dot(rhs) end_row_scaled = 0 # self.setups[0]['num_analised_channels'] row_unscaled = 0 # self.setups[0]['num_analised_channels'] for setup_num, setup in enumerate(self.setups): this_refs = self.rescale_ref_channels[setup_num] # setup['roving_channels']+setup['ref_channels'] this_all = range(setup['num_analised_channels']) this_rovs = list(set(this_all).difference(this_refs)) this_rovs = [rov + row_unscaled for rov in this_rovs] mode_rovs_this = mode_shape[this_rovs] scale_fact = alpha[setup_num] if 0: base_refs = self.rescale_ref_channels[0] this_refs = [int(ref + row_unscaled) for ref in this_refs] mode_refs_base = mode_shape[base_refs] mode_refs_this = mode_shape[this_refs] mode_refs_this_conj = mode_refs_this.conj() numer = np.inner(mode_refs_this_conj, mode_refs_base) denom = np.inner(mode_refs_this_conj, mode_refs_this) scale_fact = numer / denom if setup_num == 0: mode_refs_this = mode_shape[this_refs] start_row_scaled = end_row_scaled end_row_scaled += len(this_refs) new_mode_shape[start_row_scaled:end_row_scaled] = scale_fact * \ mode_refs_this start_row_scaled = end_row_scaled end_row_scaled += len(this_rovs) new_mode_shape[start_row_scaled:end_row_scaled] = scale_fact * \ mode_rovs_this row_unscaled += setup['num_analised_channels'] return new_mode_shape
[docs] def save_state(self, fname): logger.info('Saving results to {}...'.format(fname)) dirname, _ = os.path.split(fname) if not os.path.isdir(dirname): os.makedirs(dirname) out_dict = {'self.state': self.state} out_dict['self.setup_name'] = self.setup_name if self.state[3]: # add_setup out_dict['self.setups'] = self.setups out_dict['self.sampling_rate'] = self.sampling_rate out_dict['self.num_ref_channels'] = self.num_ref_channels out_dict['self.m_lags'] = self.m_lags if self.state[1]: # pair_channels out_dict['self.ssi_ref_channels'] = self.ssi_ref_channels out_dict['self.rescale_ref_channels'] = self.rescale_ref_channels out_dict['self.merged_chan_dofs'] = self.merged_chan_dofs out_dict['self.merged_accel_channels'] = self.merged_accel_channels out_dict['self.merged_velo_channels'] = self.merged_velo_channels out_dict['self.merged_disp_channels'] = self.merged_disp_channels out_dict['self.merged_num_channels'] = self.merged_num_channels out_dict['self.num_analised_channels'] = self.num_analised_channels out_dict['self.start_time'] = self.start_time if self.state[0]: # build_merged_subspace_matrix # out_dict['self.subspace_matrix'] = self.subspace_matrix out_dict['self.num_block_columns'] = self.num_block_columns out_dict['self.num_block_rows'] = self.num_block_rows out_dict['self.U'] = self.U out_dict['self.S'] = self.S out_dict['self.V_T'] = self.V_T out_dict['self.max_model_order'] = self.max_model_order if self.state[2]: # compute_modal_params out_dict['self.eigenvalues'] = self.eigenvalues out_dict['self.modal_damping'] = self.modal_damping out_dict['self.modal_frequencies'] = self.modal_frequencies out_dict['self.mode_shapes'] = self.mode_shapes np.savez_compressed(fname, **out_dict)
[docs] @classmethod def load_state(cls, fname,): logger.info('Loading results from {}'.format(fname)) in_dict = np.load(fname, allow_pickle=True) if 'self.state' in in_dict: state = list(in_dict['self.state']) else: raise RuntimeError('The result file is missing required components (self.state)') for this_state, state_string in zip(state, ['Setups added', 'Channels paired, channel-DOF assignments generated', 'Subspace matrix built', 'State matrices computed', 'Modal parameters computed', ]): if this_state: logger.info(state_string) setup_name = str(in_dict['self.setup_name'].item()) ssi_object = cls() ssi_object.setup_name = setup_name ssi_object.state = state # debug_here if state[3]: # add_setup ssi_object.setups = validate_array(in_dict['self.setups']) ssi_object.sampling_rate = validate_array(in_dict['self.sampling_rate']) ssi_object.num_ref_channels = validate_array(in_dict['self.num_ref_channels']) ssi_object.m_lags = validate_array(in_dict['self.m_lags']) if state[1]: # pair_channels ssi_object.ssi_ref_channels = validate_array(in_dict['self.ssi_ref_channels']) ssi_object.rescale_ref_channels = validate_array(in_dict['self.rescale_ref_channels']) ssi_object.merged_chan_dofs = [[int(float(cd[0])), str(cd[1]), float(cd[2]), float( cd[3]), str(cd[4] if len(cd) == 5 else '')] for cd in in_dict['self.merged_chan_dofs']] ssi_object.merged_accel_channels = validate_array(in_dict['self.merged_accel_channels']) ssi_object.merged_velo_channels = validate_array(in_dict['self.merged_velo_channels']) ssi_object.merged_disp_channels = validate_array(in_dict['self.merged_disp_channels']) ssi_object.merged_num_channels = validate_array(in_dict['self.merged_num_channels']) ssi_object.num_analised_channels = validate_array(in_dict['self.num_analised_channels']) ssi_object.start_time = validate_array(in_dict['self.start_time']) if state[0]: # build_merged_subspace_matrix # ssi_object.subspace_matrix = validate_array(in_dict['self.subspace_matrix']) ssi_object.num_block_columns = validate_array(in_dict['self.num_block_columns']) ssi_object.num_block_rows = validate_array(in_dict['self.num_block_rows']) ssi_object.U = validate_array(in_dict['self.U']) ssi_object.S = validate_array(in_dict['self.S']) ssi_object.V_T = validate_array(in_dict.get('self.V_T', None)) ssi_object.max_model_order = validate_array(in_dict['self.max_model_order']) if state[2]: # compute_modal_params ssi_object.eigenvalues = validate_array(in_dict['self.eigenvalues']) ssi_object.modal_damping = validate_array(in_dict['self.modal_damping']) ssi_object.modal_frequencies = validate_array(in_dict['self.modal_frequencies']) ssi_object.mode_shapes = validate_array(in_dict['self.mode_shapes']) return ssi_object
if __name__ == '__main__': pass