# -*- coding: utf-8 -*-
'''
pyOMA - A toolbox for Operational Modal Analysis
Copyright (C) 2015 - 2025 Simon Marwitz, Volkmar Zabel, Andrei Udrea et al.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import numpy as np
import scipy.linalg
from collections import deque
import os
from .PreProcessingTools import PreProcessSignals
import logging
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
[docs]
class ERA(object):
[docs]
def __init__(self, prep_signals):
'''
channel definition: channels start at 0
'''
super().__init__()
assert isinstance(prep_signals, PreProcessSignals)
self.prep_signals = prep_signals
self.setup_name = prep_signals.setup_name
self.start_time = prep_signals.start_time
# 0 1 2
# self.state= [SHankelMatrix, State Mat., Modal Par.
self.state = [False, False, False]
self.num_block_columns = None
self.num_block_rows = None
self.toeplitz_matrix = None
self.hankel_matrix = None # anil
self.max_model_order = None
self.state_matrix = None
self.output_matrix = None
self.modal_damping = None
self.modal_frequencies = None
self.mode_shapes = None
[docs]
def CalculateFRF(self):
'''
function by anil
FRF(Frequency response function) is convertion of signal from time to frequency domain.
The following function performs this conversion.
'''
measurement = self.prep_signals.signals
num_channels = measurement.shape[1]
num_time_steps = self.prep_signals.F.shape[0]
acceleration_fft = np.zeros(
(num_time_steps // 2 + 1, num_channels), dtype=complex)
F_fft = np.fft.rfft(np.hamming(num_time_steps) * self.prep_signals.F)
for channel in range(num_channels): # loop over channels
fft_this_channel = np.fft.rfft(np.hamming(
num_time_steps) * measurement[:, channel])
acceleration_fft[:, channel] = fft_this_channel
FRF = np.zeros_like(acceleration_fft)
for channel in range(num_channels):
FRF[:, channel] = acceleration_fft[:, channel] / F_fft
IRF = np.zeros((num_time_steps, num_channels))
for channel in range(num_channels): # loop over channels
ifft_this_channel = np.fft.irfft(FRF[:, channel])
IRF[:, channel] = ifft_this_channel
self.IFRF = IRF.T
[docs]
def build_hankel_matrix(self, num_block_columns):
'''
author: Anil
Constructs a shifted hankel matrix.
'''
IRFT = self.IFRF
num_channels = self.prep_signals.num_analised_channels
num_block_rows = num_block_columns + 1
self.num_block_columns = num_block_columns
self.num_block_rows = num_block_rows
Hankel_matrix = np.zeros(
(num_channels *
num_block_rows,
num_block_columns),
dtype=complex)
for i in range(0, num_block_rows):
j = i + 1
this_block = IRFT[0:num_channels, j:(num_block_columns + j)]
begin_row = i * num_channels
Hankel_matrix[begin_row:(
begin_row + num_channels), 0:num_block_columns] = this_block
self.hankel_matrix = Hankel_matrix
self.state[0] = True
[docs]
def compute_state_matrices(self, max_model_order=None):
'''
'''
if max_model_order is not None:
assert isinstance(max_model_order, int)
assert self.state[0]
hankel_matrix = self.hankel_matrix # anil
num_channels = self.prep_signals.num_analised_channels
# num_block_columns = self.num_block_columns
# num_block_rows = self.num_block_rows
print('Computing state matrices...')
[U, S, _] = np.linalg.svd(hankel_matrix, 0) # anil
# anil
S1 = np.diag(S)
S_sqrt = np.sqrt(S1)
p1 = np.dot(U, S_sqrt)
# p2=np.dot(S_sqrt,V_T)
# A=np.dot(np.linalg.pinv(p1), hankel_matrix, np.linalg.pinv(p2))
# A=A.real
C = p1[:num_channels,:]
# C=C.real
# p1=p1.real
self.Oi = p1
# self.state_matrix = A
self.output_matrix = C
self.max_model_order = max_model_order
self.state[1] = True
self.state[2] = False # previous modal params are invalid now
def compute_modal_params(self, max_model_order=None):
if max_model_order is not None:
assert max_model_order <= self.max_model_order
self.max_model_order = max_model_order
assert self.state[1]
print('Computing modal parameters...')
max_model_order = self.max_model_order
num_analised_channels = self.prep_signals.num_analised_channels
num_block_rows = self.num_block_rows
# state_matrix = self.state_matrix
Oi = self.Oi
output_matrix = self.output_matrix
sampling_rate = self.prep_signals.sampling_rate
modal_frequencies = np.zeros((max_model_order, max_model_order))
modal_damping = np.zeros((max_model_order, max_model_order))
eigenvalues = np.zeros(
(max_model_order, max_model_order), dtype=complex)
mode_shapes = np.zeros(
(num_analised_channels,
max_model_order,
max_model_order),
dtype=complex)
for order in range(1, max_model_order, 1):
Oi0 = Oi[:(num_analised_channels * (num_block_rows - 1)),:order]
Oi1 = Oi[num_analised_channels:(
num_analised_channels * num_block_rows),:order]
a = np.dot(np.linalg.pinv(Oi0), Oi1)
eigenvalues_paired, _, eigenvectors_paired = scipy.linalg.eig(
a=a[0:order, 0:order], b=None, left=True, right=True)
eigenvalues_single, eigenvectors_single = self.remove_conjugates_new(
eigenvalues_paired, eigenvectors_paired)
for index, k in enumerate(eigenvalues_single):
lambda_k = np.log(complex(k)) * sampling_rate
freq_j = np.abs(lambda_k) / (2 * np.pi)
damping_j = np.real(lambda_k) / np.abs(lambda_k) * (-100)
mode_shapes_j = np.dot(
output_matrix[:, 0:order], eigenvectors_single[:, index])
modal_frequencies[order, index] = freq_j
modal_damping[order, index] = damping_j
eigenvalues[order, index] = k
mode_shapes[:, index, order] = mode_shapes_j
self.modal_frequencies = modal_frequencies
self.modal_damping = modal_damping
self.mode_shapes = mode_shapes
self.eigenvalues = eigenvalues
self.state[2] = True
[docs]
@staticmethod
def remove_conjugates_new(eigval, eigvec_r, eigvec_l=None):
'''
removes conjugates
eigvec_l.shape = [order+1, order+1]
eigval.shape = [order+1,1]
'''
# return vectors, eigval
num_val = len(eigval)
conj_indices = deque()
for i in range(num_val):
this_val = eigval[i]
this_conj_val = np.conj(this_val)
if this_val == this_conj_val: # remove real eigvals
conj_indices.append(i)
for j in range(
i + 1, num_val): # catches unordered conjugates but takes slightly longer
if eigval[j] == this_conj_val:
# if not np.allclose(eigvec_l[j],eigvec_l[i].conj()):
# print('eigval is complex conjugate but eigvec_l is not')
# continue
# if not np.allclose(eigvec_r[j],eigvec_r[i].conj()):
# print('eigval is complex conjugate but eigvec_r is not')
# continue
conj_indices.append(j)
break
# print('indices of complex conjugate: {}'.format(conj_indices))
conj_indices = list(set(range(num_val)).difference(conj_indices))
# print('indices to keep and return: {}'.format(conj_indices))
if eigvec_l is None:
eigvec_r = eigvec_r[:, conj_indices]
eigval = eigval[conj_indices]
return eigval, eigvec_r
else:
eigvec_l = eigvec_l[:, conj_indices]
eigvec_r = eigvec_r[:, conj_indices]
eigval = eigval[conj_indices]
return eigval, eigvec_l, eigvec_r
def save_state(self, fname):
dirname, _ = os.path.split(fname)
if not os.path.isdir(dirname):
os.makedirs(dirname)
# 0 1 2
# self.state= [SHankelMatrix, State Mat., Modal Par.]
out_dict = {'self.state': self.state}
out_dict['self.setup_name'] = self.setup_name
out_dict['self.start_time'] = self.start_time
# out_dict['self.prep_signals']=self.prep_signals
if self.state[0]: # SHankelMatrix
# out_dict['self.toeplitz_matrix'] = self.toeplitz_matrix
out_dict['self.hankel_matrix'] = self.hankel_matrix
out_dict['self.num_block_columns'] = self.num_block_columns
out_dict['self.num_block_rows'] = self.num_block_rows
if self.state[1]: # state models
out_dict['self.max_model_order'] = self.max_model_order
out_dict['self.state_matrix'] = self.state_matrix
out_dict['self.output_matrix'] = self.output_matrix
if self.state[2]: # modal params
out_dict['self.modal_frequencies'] = self.modal_frequencies
out_dict['self.modal_damping'] = self.modal_damping
out_dict['self.mode_shapes'] = self.mode_shapes
out_dict['self.eigenvalues'] = self.eigenvalues
np.savez_compressed(fname, **out_dict)
@classmethod
def load_state(cls, fname, prep_signals):
print('Now loading previous results from {}'.format(fname))
in_dict = np.load(fname)
# 0 1 2
# self.state= [SHankelMatrix, State Mat., Modal Par.]
if 'self.state' in in_dict:
state = list(in_dict['self.state'])
else:
return
for this_state, state_string in zip(state, ['Shifted Hankel Matrices Built',
'State Matrices Computed',
'Modal Parameters Computed',
]):
if this_state:
print(state_string)
assert isinstance(prep_signals, PreProcessSignals)
setup_name = str(in_dict['self.setup_name'].item())
# start_time = in_dict['self.start_time'].item()
assert setup_name == prep_signals.setup_name
start_time = prep_signals.start_time
assert start_time == prep_signals.start_time
# prep_signals = in_dict['self.prep_signals'].item()
ssi_object = cls(prep_signals)
ssi_object.state = state
if state[0]: # SHankelMatrix
ssi_object.hankel_matrix = in_dict['self.hankel_matrix']
ssi_object.num_block_columns = int(
in_dict['self.num_block_columns'])
ssi_object.num_block_rows = int(in_dict['self.num_block_rows'])
if state[1]: # state models
ssi_object.max_model_order = int(in_dict['self.max_model_order'])
ssi_object.state_matrix = in_dict['self.state_matrix']
ssi_object.output_matrix = in_dict['self.output_matrix']
if state[2]: # modal params
ssi_object.modal_frequencies = in_dict['self.modal_frequencies']
ssi_object.modal_damping = in_dict['self.modal_damping']
ssi_object.mode_shapes = in_dict['self.mode_shapes']
ssi_object.eigenvalues = in_dict['self.eigenvalues']
return ssi_object
@staticmethod
def rescale_mode_shape(modeshape):
# scaling of mode shape
modeshape = modeshape / modeshape[np.argmax(np.abs(modeshape))]
return modeshape
if __name__ == '__main__':
pass