Source code for pyOMA.core.StabilDiagram

# -*- coding: utf-8 -*-
'''
pyOMA - A toolbox for Operational Modal Analysis
Copyright (C) 2015 - 2025  Simon Marwitz, Volkmar Zabel, Andrei Udrea et al.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

Based on previous works by Andrei Udrea 2014 and Volkmar Zabel 2015
Modified and Extended by Simon Marwitz 2015 ff.

..TODO::
 * scale markers right on every platform
 * frequency range as argument or from ssi params, sampling freq
 * add switch to choose between "unstable only in ..." or "stable in ..."
 * (select and merge several poles with a rectangular mouse selection)
 * distinguish beetween stabilization criteria and filtering criteria
 * add zoom and sliders (horizontal/vertical) for the main figure
 * distinguish between  "export results" and "save state"
 * rework mask logic (currently it is very difficult to understand)
 * Merge DataCursor and JupyterGUI.SnappingCursor 

'''

from .SSICovRef import PogerSSICovRef
from .ModalBase import ModalBase
from .Helpers import simplePbar, calculateMAC, calculateMPC, calculateMPD
import numpy as np

import scipy.cluster
import scipy.spatial
import scipy.stats

import os

import collections
from operator import itemgetter
from random import shuffle

# check if python is running in headless mode i.e. as a server script
# if 'DISPLAY' in os.environ:
#     matplotlib.use("Qt5Agg", force=True)
from matplotlib import rcParams
from matplotlib.backend_bases import FigureCanvasBase
from matplotlib.figure import Figure
from matplotlib.text import TextPath, FontProperties
from matplotlib.path import Path
from matplotlib.markers import MarkerStyle
from matplotlib.widgets import Cursor
import matplotlib.cm
import matplotlib.pyplot as plot

plot.rc('figure', figsize=[8.5039399474194, 5.255723925793184], dpi=100,)
plot.rc('font', size=10)
plot.rc('legend', fontsize=10, labelspacing=0.1)
plot.rc('axes', linewidth=0.2)
plot.rc('xtick.major', width=0.2)
plot.rc('ytick.major', width=0.2)
# plot.ioff()

NoneType = type(None)

import logging
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)


[docs] class StabilCalc(object):
[docs] def __init__(self, modal_data, prep_signals=None, **kwargs): super().__init__() # print(type(modal_data), file=sys.stderr) assert isinstance(modal_data, ModalBase) self.modal_data = modal_data self.extra_func = None self.setup_name = modal_data.setup_name self.start_time = modal_data.start_time if prep_signals is not None: logger.warning('Providing prep_signals is not required anymore. Ignoring argument!') # assert isinstance(prep_signals, PreProcessSignals) self.prep_signals = modal_data.prep_signals has_mode_shapes = self.modal_data.__dict__.get( 'mode_shapes', None) is not None has_variances = self.modal_data.__dict__.get( 'std_frequencies', None) is not None has_data = self.modal_data.prep_signals is not None has_MC = self.modal_data.__dict__.get( 'modal_contributions', None) is not None has_ev = self.modal_data.__dict__.get( 'eigenvalues', None) is not None self.capabilities = {'f': 1, 'd': 1, 'msh': has_mode_shapes, 'std': has_variances, 'ev': has_ev, 'mtn': 0, 'MC': has_MC, 'auto': isinstance(self, StabilCluster), 'data': has_data} if self.capabilities['ev']: self.masked_lambda = np.ma.array( self.modal_data.eigenvalues, fill_value=0) self.masked_frequencies = np.ma.array( self.modal_data.modal_frequencies, copy=True, fill_value=0) self.masked_frequencies[np.isnan(self.masked_frequencies)] = 0 self.masked_damping = np.ma.array( self.modal_data.modal_damping, fill_value=0) max_model_order = self.modal_data.max_model_order self.num_solutions = self.modal_data.modal_frequencies.shape[1] self.order_dummy = np.ma.array( [[order] * self.num_solutions for order in range(max_model_order)], fill_value=0) # stable-in-... masks self.masks = {'mask_pre': None, # some constraints (f>0.0, order_range, etc) 'mask_ad': None, # absolute damping 'mask_stdf': None, # uncertainty frequency 'mask_stdd': None, # uncertainty damping 'mask_mpc': None, # absolute modal phase collinearity 'mask_mpd': None, # absolute mean phase deviation 'mask_mtn': None, # absolute modal transfer norm 'mask_df': None, # difference frequency 'mask_dd': None, # difference damping 'mask_dmac': None, # difference mac 'mask_dev': None, # difference eigenvalue 'mask_dmtn': None, # difference modal transfer norm 'mask_stable': None # stable in all criteria } # only-unstable-in-... masks self.nmasks = {'mask_ad': None, # absolute damping 'mask_stdf': None, # uncertainty frequency 'mask_stdd': None, # uncertainty damping # absolute modal phase collineratity 'mask_ampc': None, 'mask_ampd': None, # absolute mean phase deviation 'mask_amtn': None, # absolute modal transfer norm 'mask_df': None, # difference frequency 'mask_dd': None, # difference damping 'mask_dmac': None, # difference mac 'mask_dev': None, # difference eigenvalue 'mask_dmtn': None, # difference modal transfer norm } self.select_modes = [] self.select_callback = None self.state = 0 self.order_range = (0, 1, self.modal_data.max_model_order) self.d_range = (0, 100) self.stdf_max = 100 self.stdd_max = 100 self.mpc_min = 0 self.mpd_max = 90 self.mtn_min = 0 self.df_max = 0.01 self.dd_max = 0.05 self.dmac_max = 0.02 self.dev_min = 0.02 self.dmtn_min = 0.02 self.MC_min = 0 # print(self.capabilities) # self.calculate_soft_critera_matrices() self.callbacks = {'add_mode':[], 'remove_mode':[], }
def add_callback(self, name, func): assert name in ['add_mode', 'remove_mode'] self.callbacks[name].append(func) def calculate_soft_critera_matrices(self): logger.info('Checking stabilisation criteria...') # Direction 1: model order, Direction 2: current pole, Direction 3: # previous pole: max_model_order = self.modal_data.max_model_order num_solutions = self.num_solutions capabilities = self.capabilities lambda_diffs = np.ma.zeros( (max_model_order, num_solutions, num_solutions), fill_value=0) freq_diffs = np.ma.zeros( (max_model_order, num_solutions, num_solutions), fill_value=0) damp_diffs = np.ma.zeros( (max_model_order, num_solutions, num_solutions), fill_value=0) if capabilities['msh']: MAC_diffs = np.ma.zeros( (max_model_order, num_solutions, num_solutions), fill_value=0) MPD_diffs = np.ma.zeros( (max_model_order, num_solutions, num_solutions), fill_value=0) MP_diffs = np.ma.zeros( (max_model_order, num_solutions, num_solutions), fill_value=0) MPC_matrix = np.ma.zeros( (max_model_order, num_solutions), fill_value=0) MP_matrix = np.ma.zeros( (max_model_order, num_solutions), fill_value=0) MPD_matrix = np.ma.zeros( (max_model_order, num_solutions), fill_value=0) if capabilities['ev']: prev_lambda_row = self.masked_lambda.data[0,:] prev_freq_row = self.masked_frequencies[0,:] prev_damp_row = self.modal_data.modal_damping[0,:] if capabilities['msh']: prev_mode_shapes_row = self.modal_data.mode_shapes[:,:, 0] # tuple with array of indizes of non-zero frequencies if capabilities['ev']: # prev_non_zero_entries = np.nonzero(prev_lambda_row.imag) prev_non_zero_entries = np.where((~np.isnan(prev_lambda_row.imag)) & (prev_lambda_row.imag != 0)) else: # prev_non_zero_entries = np.nonzero(prev_freq_row) prev_non_zero_entries = np.where((~np.isnan(prev_freq_row)) & (prev_freq_row != 0)) prev_length = len(prev_non_zero_entries[0]) if capabilities['ev']: prev_lambda = prev_lambda_row[prev_non_zero_entries] prev_freq = prev_freq_row[prev_non_zero_entries] prev_damp = prev_damp_row[prev_non_zero_entries] if capabilities['msh']: prev_mode_shapes = \ prev_mode_shapes_row[:, prev_non_zero_entries[0]] prev_MPD, prev_MP_new = calculateMPD(prev_mode_shapes) prev_MP_new[prev_MP_new > 90] -= 180 # in range [-90,90] pbar = simplePbar(max_model_order - 1) for curr_order in range(1, max_model_order): next(pbar) if capabilities['ev']: curr_lambda_row = self.masked_lambda.data[(curr_order),:] curr_freq_row = self.masked_frequencies[(curr_order),:] curr_damp_row = self.modal_data.modal_damping[ (curr_order),:] if capabilities['msh']: curr_mode_shapes_row = \ self.modal_data.mode_shapes[:,:, curr_order] # catches zeros and also real poles, which should have been removed # in remove conjugates already if capabilities['ev']: # curr_non_zero_entries = np.nonzero(curr_lambda_row.imag) curr_non_zero_entries = np.where((~np.isnan(curr_lambda_row.imag)) & (curr_lambda_row.imag != 0)) else: # curr_non_zero_entries = np.nonzero(curr_freq_row) curr_non_zero_entries = np.where((~np.isnan(curr_freq_row)) & (curr_freq_row != 0)) curr_length = len(curr_non_zero_entries[0]) # print(curr_length) if not curr_length: continue if capabilities['ev']: curr_lambda = curr_lambda_row[curr_non_zero_entries] curr_freq = curr_freq_row[curr_non_zero_entries] curr_damp = curr_damp_row[curr_non_zero_entries] if capabilities['msh']: curr_mode_shapes = \ curr_mode_shapes_row[:, curr_non_zero_entries[0]] if capabilities['ev']: div_lambda = np.maximum( np.repeat(np.expand_dims(np.ma.abs(prev_lambda), axis=1), curr_lambda.shape[0], axis=1), np.repeat(np.expand_dims(np.ma.abs(curr_lambda), axis=0), prev_lambda.shape[0], axis=0)) div_freq = np.maximum( np.repeat(np.expand_dims(np.abs(prev_freq), axis=1), curr_freq.shape[0], axis=1), np.repeat(np.expand_dims(np.abs(curr_freq), axis=0), prev_freq.shape[0], axis=0)) div_damp = np.maximum( np.repeat(np.expand_dims(np.abs(prev_damp), axis=1), curr_damp.shape[0], axis=1), np.repeat(np.expand_dims(np.abs(curr_damp), axis=0), prev_damp.shape[0], axis=0)) if capabilities['msh']: mac_diffs = np.transpose(1 - calculateMAC( prev_mode_shapes[:,:prev_length], curr_mode_shapes[:,:curr_length])) # print(mac_diffs) MAC_diffs[curr_order, curr_non_zero_entries[0],:prev_length] = mac_diffs MPC_matrix[curr_order, curr_non_zero_entries[0]] = calculateMPC( curr_mode_shapes[:,:curr_length]) curr_MPD, curr_MP = calculateMPD( curr_mode_shapes[:,:curr_length]) MPD_matrix[curr_order, curr_non_zero_entries[0]], MP_matrix[ curr_order, curr_non_zero_entries[0]] = curr_MPD, curr_MP if capabilities['ev']: lambda_diffs[curr_order, curr_non_zero_entries[0],:len(prev_lambda)] = np.abs((np.repeat( np.expand_dims(prev_lambda, axis=1), curr_lambda.shape[0], axis=1) - curr_lambda) / div_lambda).T freq_diffs[curr_order, curr_non_zero_entries[0],:len(prev_freq)] = np.abs((np.repeat( np.expand_dims(prev_freq, axis=1), curr_freq.shape[0], axis=1) - curr_freq) / div_freq).T damp_diffs[curr_order, curr_non_zero_entries[0],:len(prev_damp)] = np.abs((np.repeat( np.expand_dims(prev_damp, axis=1), curr_damp.shape[0], axis=1) - curr_damp) / div_damp).T if capabilities['msh']: div_MPD = np.maximum( np.repeat( np.expand_dims( np.abs(prev_MPD), axis=1), curr_MPD.shape[0], axis=1), np.repeat( np.expand_dims( np.abs(curr_MPD), axis=0), prev_MPD.shape[0], axis=0)) MPD_diffs[curr_order, curr_non_zero_entries[0],:len(prev_MPD)] = np.abs((np.repeat( np.expand_dims(prev_MPD, axis=1), curr_MPD.shape[0], axis=1) - curr_MPD) / div_MPD).T curr_MP_new = np.copy(curr_MP) # in range [0,180] curr_MP_new[curr_MP_new > 90] -= 180 # in range [-90,90] div_MP = np.maximum( np.repeat( np.expand_dims( np.abs(prev_MP_new), axis=1), curr_MP_new.shape[0], axis=1), np.repeat( np.expand_dims( np.abs(curr_MP_new), axis=0), prev_MP_new.shape[0], axis=0)) MP_diffs[curr_order, curr_non_zero_entries[0],:len(prev_MP_new)] = np.abs((np.repeat( np.expand_dims(prev_MP_new, axis=1), curr_MP_new.shape[0], axis=1) - curr_MP_new) / div_MP).T if capabilities['ev']: prev_lambda = curr_lambda prev_freq = curr_freq prev_damp = curr_damp if capabilities['msh']: prev_mode_shapes = curr_mode_shapes prev_MPD = curr_MPD prev_MP_new = curr_MP_new prev_length = curr_length prev_non_zero_entries = curr_non_zero_entries if capabilities['ev']: self.lambda_diffs = lambda_diffs self.freq_diffs = freq_diffs self.damp_diffs = damp_diffs self.MAC_diffs = MAC_diffs self.MPD_diffs = MPD_diffs self.MP_diffs = MP_diffs self.MPD_matrix = MPD_matrix self.MP_matrix = MP_matrix self.MPC_matrix = MPC_matrix self.state = 1 def export_results(self, fname, binary=False): selected_freq, selected_damp, selected_modes, _, \ selected_order, selected_MC, \ selected_MPC, selected_MP, selected_MPD, \ selected_stdf, selected_stdd, selected_stdmsh = self.get_selected_modal_values() freq_str = '' damp_str = '' ord_str = '' if self.capabilities['msh']: msh_str = '' mpc_str = '' mp_str = '' mpd_str = '' if self.capabilities['std']: std_freq_str = '' std_damp_str = '' std_msh_str = '' if self.capabilities['MC']: MC_str = '' for col in range(len(selected_freq)): freq_str += '{:<3.3f}\t\t'.format(selected_freq[col]) damp_str += '{:<3.3f}\t\t'.format(selected_damp[col]) ord_str += '{:<6d}\t\t'.format(selected_order[col]) if self.capabilities['msh']: mpc_str += '{:<3.3f}\t \t'.format(selected_MPC[col]) mp_str += '{:<3.2f}\t\t'.format(selected_MP[col]) mpd_str += '{:<3.2f}\t\t'.format(selected_MPD[col]) if self.capabilities['std']: std_damp_str += '{:<3.3e}\t\t'.format(selected_stdd[col]) std_freq_str += '{:<3.3e}\t\t'.format(selected_stdf[col]) if self.capabilities['MC']: MC_str += '{:<3.3f}\t\t'.format(selected_MC[col]) if self.capabilities['msh']: for row in range(selected_modes.shape[0]): if isinstance(self.modal_data, PogerSSICovRef): chan_dofs = self.modal_data.merged_chan_dofs elif self.capabilities['data']: chan_dofs = self.prep_signals.chan_dofs else: chan_dofs = [] for chan_dof in chan_dofs: chan, node, az, elev = chan_dof[:4] # print(chan, row, chan==row) if chan == row: msh_str += f'\n{node.ljust(10)} ({az: <+3.2f}, {elev: >+3.2f}) \t' break else: msh_str += '\n ' if self.capabilities['std']: std_msh_str += '\n \t\t' for col in range(selected_modes.shape[1]): msh_str += '{:+<3.4f}\t'.format(selected_modes[row, col]) if self.capabilities['std']: std_msh_str += '{:+<3.3e} \t'.format( selected_stdmsh[row, col]) export_modes = 'MANUAL MODAL ANALYSIS\n' export_modes += '=======================\n' export_modes += 'Frequencies [Hz]: \t' + freq_str + '\n' if self.capabilities['std']: export_modes += 'Standard deviations of the Frequencies [Hz]:\t' + \ std_freq_str + '\n' export_modes += 'Damping [%]: \t' + damp_str + '\n' if self.capabilities['std']: export_modes += 'Standard deviations of the Damping [%]: \t' + \ std_damp_str + '\n' if self.capabilities['MC']: export_modes += 'Modal Contributions of the mode [-]: \t' + \ MC_str + '\n' if self.capabilities['msh']: export_modes += 'Node (Azimuth, Elevation) \tMode shapes:' + msh_str + '\n' if self.capabilities['std']: export_modes += 'Standard Deviations of the Mode shapes: \t' + \ std_msh_str + '\n' export_modes += 'Model order: \t' + ord_str + '\n' if self.capabilities['msh']: export_modes += 'MPC [-]: \t' + mpc_str + '\n' export_modes += 'MP [\u00b0]: \t' + mp_str + '\n' export_modes += 'MPD [-]: \t' + mpd_str + '\n\n' # + 'SSI parameters\n' # + '=======================\n'\ # + 'Maximum order :\t\t' + str(self.modal_data.max_model_order) + '\n'\ # + 'Block rows :\t\t' + str(self.num_block_rows) + '\n'\ # + 'Block columns :\t\t' + str(self.num_block_columns) + '\n' # + 'Decimation :\t\t' + str(dec_fact) + '\n'\ # + 'Filtering :\t\t' + str(filt_w) dirname, _ = os.path.split(fname) if not os.path.isdir(dirname): os.makedirs(dirname) if binary: out_dict = {'selected_freq': selected_freq, 'selected_damp': selected_damp, 'selected_order': selected_order} if self.capabilities['msh']: out_dict['selected_MPC'] = selected_MPC out_dict['selected_MP'] = selected_MP out_dict['selected_MPD'] = selected_MPD out_dict['selected_modes'] = selected_modes if self.capabilities['std']: out_dict['selected_stdf'] = selected_stdf out_dict['selected_stdd'] = selected_stdd out_dict['selected_stdmsh'] = selected_stdmsh np.savez_compressed(fname, **out_dict) else: f = open(fname, 'w') f.write(export_modes) f.close() def calculate_stabilization_masks( self, order_range=None, d_range=None, stdf_max=None, stdd_max=None, mpc_min=None, mpd_max=None, mtn_min=None, df_max=None, dd_max=None, dmac_max=None, dev_min=None, dmtn_min=None, MC_min=None): if self.state < 1: self.calculate_soft_critera_matrices() if order_range is None: order_range = (0, 1, self.modal_data.max_model_order) if d_range is None: d_range = (0, 100) if stdf_max is None: stdf_max = 100 if stdd_max is None: stdd_max = 100 if mpc_min is None: mpc_min = 0 if mpd_max is None: mpd_max = 90 if mtn_min is None: mtn_min = 0 if df_max is None: df_max = 0.01 if dd_max is None: dd_max = 0.05 if dmac_max is None: dmac_max = 0.02 if dev_min is None: dev_min = 0.02 if dmtn_min is None: dmtn_min = 0.02 if MC_min is None: MC_min = 0 self.state = 2 self.update_stabilization_masks( order_range, d_range, stdf_max, stdd_max, mpc_min, mpd_max, mtn_min, df_max, dd_max, dmac_max, dev_min, dmtn_min, MC_min) def update_stabilization_masks(self, order_range=None, d_range=None, stdf_max=None, stdd_max=None, mpc_min=None, mpd_max=None, mtn_min=None, df_max=None, dd_max=None, dmac_max=None, dev_min=None, dmtn_min=None, MC_min=None): if self.state < 2: self.calculate_stabilization_masks() # update # print(order_range , d_range , stdf_max , stdd_max, mpc_min, mpd_max, mtn_min,df_max, dd_max, dmac_max, dev_min, dmtn_min) if order_range is not None: self.order_range = order_range if d_range is not None: self.d_range = d_range if stdf_max is not None: self.stdf_max = stdf_max if stdd_max is not None: self.stdd_max = stdd_max if mpc_min is not None: self.mpc_min = mpc_min if mpd_max is not None: self.mpd_max = mpd_max if mtn_min is not None: self.mtn_min = mtn_min if df_max is not None: self.df_max = df_max if dd_max is not None: self.dd_max = dd_max if dmac_max is not None: self.dmac_max = dmac_max if dev_min is not None: self.dev_min = dev_min if dmtn_min is not None: self.dmtn_min = dmtn_min if MC_min is not None: self.MC_min = MC_min self.masked_frequencies.mask = np.ma.nomask self.order_dummy.mask = np.ma.nomask mask_pre = (~np.isnan(self.masked_frequencies)) & (self.masked_frequencies != 0) if order_range is not None: start, step, stop = order_range start = max(0, start) stop = min(stop, self.modal_data.max_model_order) mask_order = np.zeros_like(mask_pre) for order in range(start, stop, step): mask_order = np.logical_or( mask_order, self.order_dummy == order) mask_pre = np.logical_and(mask_pre, mask_order) self.masks['mask_pre'] = mask_pre if d_range is not None: assert isinstance(d_range, (tuple, list)) assert len(d_range) == 2 mask = np.logical_and( mask_pre, self.modal_data.modal_damping >= d_range[0]) mask = np.logical_and( mask, self.modal_data.modal_damping <= d_range[1]) self.masks['mask_ad'] = mask if stdf_max is not None and self.capabilities['std']: num_blocks = self.modal_data.num_blocks # mask = self.modal_data.std_frequencies <= stdf_max * \ # self.modal_data.modal_frequencies mask = scipy.stats.t.ppf(0.95, num_blocks) * self.modal_data.std_frequencies / np.sqrt(num_blocks) <= stdf_max mask = np.logical_and(mask_pre, mask) self.masks['mask_stdf'] = mask if stdd_max is not None and self.capabilities['std']: num_blocks = self.modal_data.num_blocks mask = scipy.stats.t.ppf(0.95, num_blocks) * self.modal_data.std_damping / np.sqrt(num_blocks) <= stdd_max # mask = self.modal_data.std_damping <= stdd_max * \ # self.modal_data.modal_damping mask = np.logical_and(mask_pre, mask) self.masks['mask_stdd'] = mask if mpc_min is not None: mask = np.logical_and(mask_pre, self.MPC_matrix >= mpc_min) self.masks['mask_mpc'] = mask if mpd_max is not None: mask = np.logical_and(mask_pre, self.MPD_matrix <= mpd_max) self.masks['mask_mpd'] = mask # if mtn_min is not None: # logger.warning('Modal Transfer Norm is not yet implemented! Ignoring') if MC_min is not None and self.capabilities['MC']: if np.issubdtype(self.modal_data.modal_contributions.dtype, complex): # account for complex modal contributions from spectra, e.g. pLSCF mask = np.logical_and( mask_pre, np.abs(self.modal_data.modal_contributions) >= MC_min) else: mask = np.logical_and( mask_pre, self.modal_data.modal_contributions >= MC_min) self.masks['mask_MC'] = mask full_masks = [] if df_max is not None: # rel freq diffs for each pole with all previous poles, # for all poles and orders results in 3d array # compare those rel freq diffs with df_max # and reduce 3d array to 2d array, by applying logical_or # along each poles axis (diff with all previous) mask_sf_all = np.logical_and( self.freq_diffs != 0, self.freq_diffs <= df_max) mask_sf_red = np.any(mask_sf_all, axis=2) # print(np.any(mask_sf_all)) self.masks['mask_df'] = np.logical_and(mask_pre, mask_sf_red) full_masks.append(mask_sf_all) if dd_max is not None: mask_sd_all = np.logical_and( self.damp_diffs != 0, self.damp_diffs <= dd_max) mask_sd_red = np.any(mask_sd_all, axis=2) # print(np.any(mask_sd_all)) self.masks['mask_dd'] = np.logical_and(mask_pre, mask_sd_red) full_masks.append(mask_sd_all) if dmac_max is not None: mask_sv_all = np.logical_and( self.MAC_diffs != 0, self.MAC_diffs <= dmac_max) mask_sv_red = np.any(mask_sv_all, axis=2) # print(np.any(mask_sv_all)) self.masks['mask_dmac'] = np.logical_and(mask_pre, mask_sv_red) full_masks.append(mask_sv_all) # if dev_min is not None: # logger.warning( # 'Eigenvalues are not available/implemented! Ignoring') # # if dmtn_min is not None: # logger.warning( # 'Modal Transfer Norm is not yet implemented! Ignoring') # check if all stability criteria are satisfied for all current poles if full_masks: stable_mask_full = np.ones_like(full_masks[0]) for mask in full_masks: stable_mask_full = np.logical_and(stable_mask_full, mask) stable_mask = np.any(stable_mask_full, axis=2) else: stable_mask = mask_pre # print(np.any(stable_mask)) self.masks['mask_stable'] = None for mask_name, mask in self.masks.items(): if mask_name == 'mask_autosel': continue if mask_name == 'mask_autoclear': continue if mask is not None: stable_mask = np.logical_and(stable_mask, mask) self.masks['mask_stable'] = stable_mask # compute the only-unstable-in-... masks self.nmasks = { name: np.logical_not(stable_mask) for name, mask in self.masks.items() if mask is not None} for nname, nmask in self.nmasks.items(): if nname in [ 'mask_pre', 'mask_stable', 'mask_autosel', 'mask_autoclear']: continue for name, mask in self.masks.items(): if mask is None or name in [ 'mask_pre', 'mask_stable', 'mask_autosel', 'mask_autoclear']: continue if name == nname: nmask = np.logical_and(nmask, np.logical_not(mask)) else: nmask = np.logical_and(nmask, mask) self.nmasks[nname] = nmask self.nmasks['mask_stable'] = stable_mask self.nmasks['mask_pre'] = self.masks['mask_pre'] self.nmasks['mask_autoclear'] = np.logical_not( self.masks.get('mask_autoclear', None)) self.nmasks['mask_autosel'] = np.logical_not( self.masks.get('mask_autosel', None)) def get_stabilization_mask(self, name): # print(name) mask = self.nmasks.get(name) if mask is None: mask = self.nmasks['mask_pre'] logger.debug('Pre Mask is empty') return np.logical_not(mask) def get_max_f(self): if self.prep_signals is not None: return self.prep_signals.sampling_rate / 2 elif isinstance(self.modal_data, PogerSSICovRef): return self.modal_data.sampling_rate / 2 else: return float(np.amax(self.masked_frequencies))
[docs] def get_frequencies(self): ''' Returns ------- frequencies: list Identified frequencies of all currently selected modes. ''' selected_indices = self.select_modes frequencies = sorted([self.masked_frequencies[index[0], index[1]] for index in selected_indices]) return frequencies
[docs] def get_selected_modal_values(self): ''' Returns ------- frequencies: list Identified frequencies of all currently selected modes. ''' if not self.select_modes: return [np.array([]) for _ in range(12)] self.masked_frequencies.mask = np.ma.nomask self.order_dummy.mask = np.ma.nomask select_modes = self.select_modes selected_freq = [self.masked_frequencies[index] for index in self.select_modes] select_modes = [x for (_, x) in sorted( zip(selected_freq, select_modes), key=lambda pair: pair[0])] selected_freq = [self.masked_frequencies[index] for index in select_modes] selected_damp = [self.modal_data.modal_damping[index] for index in select_modes] selected_order = [self.order_dummy[index] for index in select_modes] if self.capabilities['ev']: selected_lambda = [self.modal_data.eigenvalues[index] for index in select_modes] else: selected_lambda = None if self.capabilities['msh']: selected_MPC = [self.MPC_matrix[index] for index in select_modes] selected_MP = [self.MP_matrix[index] for index in select_modes] selected_MPD = [self.MPD_matrix[index] for index in select_modes] else: selected_MPC = None selected_MP = None selected_MPD = None if self.capabilities['std']: selected_stdf = [self.modal_data.std_frequencies[index] for index in select_modes] selected_stdd = [self.modal_data.std_damping[index] for index in select_modes] selected_stdmsh = np.zeros( (self.modal_data.mode_shapes.shape[0], len(select_modes)), dtype=complex) else: selected_stdf = None selected_stdd = None selected_stdmsh = None if self.capabilities['MC']: selected_MC = [self.modal_data.modal_contributions[index] for index in select_modes] else: selected_MC = None if self.capabilities['msh']: selected_modes = np.zeros( (self.modal_data.mode_shapes.shape[0], len(select_modes)), dtype=complex) for num, ind in enumerate(select_modes): row_index = ind[0] col_index = ind[1] mode_tmp = self.modal_data.mode_shapes[:, col_index, row_index] if self.capabilities['std']: std_mode = self.modal_data.std_mode_shapes[:, col_index, row_index] # scaling of mode shape abs_mode_tmp = np.abs(mode_tmp) index_max = np.argmax(abs_mode_tmp) this_max = mode_tmp[index_max] if not self.capabilities['std']: mode_tmp = mode_tmp / this_max selected_modes[:, num] = mode_tmp if self.capabilities['std']: selected_stdmsh[:, num] = std_mode else: selected_modes = None return selected_freq, selected_damp, selected_modes, selected_lambda, \ selected_order, selected_MC, \ selected_MPC, selected_MP, selected_MPD, \ selected_stdf, selected_stdd, selected_stdmsh
def get_modal_values(self, i): # needed for gui assert isinstance(i, (list, tuple)) assert len(i) == 2 assert i[0] <= self.modal_data.max_model_order assert i[1] <= self.num_solutions n = self.order_dummy[i] f = self.masked_frequencies[i] d = self.modal_data.modal_damping[i] if self.capabilities['msh']: mpc = self.MPC_matrix[i] mp = self.MP_matrix[i] mpd = self.MPD_matrix[i] MP_diffs = self.MP_diffs[i] # print(np.nonzero(MP_diffs)[0]) if len(np.nonzero(MP_diffs)[0]) >= 1: dmp = np.min(MP_diffs[np.nonzero(MP_diffs)]) else: dmp = 0 MPD_diffs = self.MPD_diffs[i] if len(np.nonzero(MPD_diffs)[0]) >= 1: dmpd = np.min(MPD_diffs[np.nonzero(MPD_diffs)]) else: dmpd = 0 else: mpc = np.nan mp = np.nan mpd = np.nan dmp = np.nan dmpd = np.nan if self.capabilities['std']: stdf = self.modal_data.std_frequencies[i] else: stdf = np.nan if self.capabilities['std']: stdd = self.modal_data.std_damping[i] else: stdd = np.nan if self.capabilities['mtn']: mtn = np.nan else: mtn = np.nan if self.capabilities['MC']: MC = self.modal_data.modal_contributions[i] else: MC = np.nan if self.extra_func is not None: ex_1, ex_2 = self.extra_func(self.modal_data, i, True) else: ex_1, ex_2 = np.nan, np.nan return n, f, stdf, d, stdd, mpc, mp, mpd, dmp, dmpd, mtn, MC, ex_1, ex_2 def get_mode_shape(self, i): assert isinstance(i, (list, tuple)) assert len(i) == 2 assert i[0] <= self.modal_data.max_model_order assert i[1] <= self.num_solutions return self.modal_data.mode_shapes[:, i[1], i[0]] def add_mode(self, mode_ind): if mode_ind not in self.select_modes: self.select_modes.append(mode_ind) for callback in self.callbacks['add_mode']: callback(mode_ind, len(self.select_modes) - 1) return self.select_modes.index(mode_ind) def remove_mode(self, mode_ind): if mode_ind in self.select_modes: list_ind = self.select_modes.index(mode_ind) del self.select_modes[list_ind] for callback in self.callbacks['remove_mode']: callback(mode_ind, list_ind) return list_ind else: logger.warning(f'{mode_ind} not in self.select_modes') return None def save_state(self, fname): logger.info('Saving results to {}...'.format(fname)) dirname, _ = os.path.split(fname) if not os.path.isdir(dirname): os.makedirs(dirname) out_dict = {'self.state': self.state} out_dict['self.setup_name'] = self.setup_name out_dict['self.start_time'] = self.start_time if self.state >= 1: if self.capabilities['ev']: out_dict['self.lambda_diffs'] = np.array(self.lambda_diffs) out_dict['self.freq_diffs'] = np.array(self.freq_diffs) out_dict['self.damp_diffs'] = np.array(self.damp_diffs) if self.capabilities['msh']: out_dict['self.MAC_diffs'] = np.array(self.MAC_diffs) out_dict['self.MPD_diffs'] = np.array(self.MPD_diffs) out_dict['self.MP_diffs'] = np.array(self.MP_diffs) out_dict['self.MPC_matrix'] = np.array(self.MPC_matrix) out_dict['self.MP_matrix'] = np.array(self.MP_matrix) out_dict['self.MPD_matrix'] = np.array(self.MPD_matrix) if self.state >= 2: out_dict['self.order_range'] = self.order_range out_dict['self.d_range'] = self.d_range if self.capabilities['std']: out_dict['self.stdf_max'] = self.stdf_max out_dict['self.stdd_max'] = self.stdd_max if self.capabilities['msh']: out_dict['self.mpc_min'] = self.mpc_min out_dict['self.mpd_max'] = self.mpd_max out_dict['self.mtn_min'] = self.mtn_min out_dict['self.df_max'] = self.df_max out_dict['self.dd_max'] = self.dd_max if self.capabilities['msh']: out_dict['self.dmac_max'] = self.dmac_max out_dict['self.dev_min'] = self.dev_min if self.capabilities['mtn']: out_dict['self.dmtn_min'] = self.dmtn_min if self.capabilities['MC']: out_dict['self.MC_min'] = self.MC_min out_dict['self.masks'] = self.masks out_dict['self.nmasks'] = self.nmasks if self.capabilities['auto']: if self.state >= 3: out_dict['self.num_iter'] = self.num_iter out_dict['self.threshold'] = self.threshold out_dict['self.clear_ctr'] = self.clear_ctr if self.state >= 4: out_dict['self.use_stabil'] = self.use_stabil out_dict['self.proximity_matrix_sq'] = self.proximity_matrix_sq out_dict['self.cluster_assignments'] = self.cluster_assignments if self.state >= 5: out_dict['self.select_clusters'] = self.select_clusters out_dict['self.nr_poles'] = self.nr_poles out_dict['self.selection_cut_off'] = self.selection_cut_off out_dict['self.select_modes'] = self.select_modes np.savez_compressed(fname, **out_dict) @classmethod def load_state(cls, fname, modal_data): logger.info('Now loading previous results from {}'.format(fname)) in_dict = np.load(fname, allow_pickle=True) if 'self.state' in in_dict: state = float(in_dict['self.state']) else: return setup_name = str(in_dict['self.setup_name'].item()) # start_time = in_dict['self.start_time'].item() assert setup_name == modal_data.setup_name # assert start_time == modal_data.start_time stabil_data = cls(modal_data) if state >= 1: if stabil_data.capabilities['ev']: stabil_data.lambda_diffs = np.ma.array( in_dict['self.lambda_diffs']) stabil_data.freq_diffs = np.ma.array(in_dict['self.freq_diffs']) stabil_data.damp_diffs = np.ma.array(in_dict['self.damp_diffs']) if stabil_data.capabilities['msh']: stabil_data.MAC_diffs = np.ma.array(in_dict['self.MAC_diffs']) stabil_data.MPD_diffs = np.ma.array(in_dict['self.MPD_diffs']) stabil_data.MP_diffs = np.ma.array(in_dict['self.MP_diffs']) stabil_data.MPC_matrix = np.ma.array( in_dict['self.MPC_matrix']) stabil_data.MP_matrix = np.ma.array(in_dict['self.MP_matrix']) stabil_data.MPD_matrix = np.ma.array( in_dict['self.MPD_matrix']) if state >= 2: stabil_data.order_range = tuple(in_dict['self.order_range']) stabil_data.d_range = tuple(in_dict['self.d_range']) if stabil_data.capabilities['std']: stabil_data.stdf_max = float(in_dict['self.df_max']) stabil_data.stdd_max = float(in_dict['self.stdd_max']) if stabil_data.capabilities['msh']: stabil_data.mpc_min = float(in_dict['self.mpc_min']) stabil_data.mpd_max = float(in_dict['self.mpd_max']) stabil_data.mtn_min = float(in_dict['self.mtn_min']) stabil_data.df_max = float(in_dict['self.df_max']) stabil_data.dd_max = float(in_dict['self.dd_max']) if stabil_data.capabilities['msh']: stabil_data.dmac_max = float(in_dict['self.dmac_max']) stabil_data.dev_min = float(in_dict['self.dev_min']) if stabil_data.capabilities['mtn']: stabil_data.dmtn_min = float(in_dict['self.dmtn_min']) if stabil_data.capabilities['MC']: stabil_data.MC_min = float(in_dict['self.MC_min']) stabil_data.masks = in_dict['self.masks'].item() stabil_data.nmasks = in_dict['self.nmasks'].item() if stabil_data.capabilities['auto']: if state >= 3: stabil_data.num_iter = int(in_dict['self.num_iter']) stabil_data.threshold = float(in_dict['self.threshold']) stabil_data.clear_ctr = in_dict['self.clear_ctr'] if state >= 4: stabil_data.use_stabil = bool(in_dict['self.use_stabil']) stabil_data.proximity_matrix_sq = in_dict['self.proximity_matrix_sq'] stabil_data.cluster_assignments = in_dict['self.cluster_assignments'] if state >= 5: stabil_data.select_clusters = list( in_dict['self.select_clusters']) stabil_data.nr_poles = list(in_dict['self.nr_poles']) stabil_data.selection_cut_off = float( in_dict['self.selection_cut_off']) select_modes = [tuple(a) for a in in_dict['self.select_modes']] frequencies = [stabil_data.masked_frequencies[index[0], index[1]] for index in select_modes] stabil_data.select_modes = [ x for _, x in sorted(zip(frequencies, select_modes))] stabil_data.state = state return stabil_data
[docs] class StabilCluster(StabilCalc): """ The automatic modal analysis done in three stages clustering. 1st stage: values sorted according to their soft and hard criteria by a 2-means partitioning algorithm 2nd stage: hierarchical clustering with automatic or user defined intercluster distance the automatic distance is based on the 'df', 'dd' and 'MAC' values from the centroids obtained in the first stage :math:`d = weight*df + 1 - weight*MAC + weight*dd` 3rd stage: 2-means partitioning of the physical and spurious poles. E. Neu et al. 1. Identify mode candidates from a large number of system orders. -> OMA Algorithm with n_max sufficiently high, i.e. number of mathematical modes should exceed the number pf physical modes at n <= n_max 2. Remove as many mathematical modes as possible. (a) Remove certainly mathematical modes using hard validation criteria. Re(\\lambda_n)>= 0 or Im(\\lambda_n)==0-> remove conjugates in OMA algorithm (b) Split modes into consistent and non-consistent sets using k-means clustering. p_i = [d_lambda, d_f, d_zeta, 1-MAC, dMPD] power transformation eq 11 h_Ti = ln(p_i) normalize: h_Ni = (h_Ti - mean(h_Ti)) / std(h_Ti) initialize centroids with (+std(h_Ni), -std(h_Ni)) 3. Divide the remaining modes into homogeneous sets using hierarchical clustering. (a) Derive cutoff distance from the probability distribution of the consistent modes. np.percentile(a,95) (b) Cluster the mode candidates based on a complex distance measure. average linkage / single linkage (c) Remove all but one mode from a single system order in one cluster. walk over each cluster and ensure each model order exists only once in the cluster, else remove the mode with a higher distance to the cluster center 4. Remove the small sets, which typically consist of mathematical modes. (a) Reject sets that are smaller than a threshold derived from the largest set size. no recommendations given in paper (threshold 50 %) (b) Use outlier rejection to remove natural frequency and damping outliers. skip (c) Select a single mode representative from the remaining modes in each cluster. "multivariate" median """
[docs] def __init__(self, modal_data, prep_signals=None): ''' stab_* in % ''' super().__init__(modal_data, prep_signals) assert self.capabilities['ev'] self.num_iter = 20000 self.weight_f = 1 self.weight_MAC = 1 self.weight_d = 1 self.weight_lambda = 1 self.threshold = None self.use_stabil = False
@staticmethod def decompress_flat_mask(compress_mask, flat_mask): # takes a flat mask generated on compressed data and restore it to its # decompressed form decompressed_mask = np.ma.copy(compress_mask.ravel()) flat_index = 0 for mask_index in range(decompressed_mask.shape[0]): if decompressed_mask[mask_index]: continue if flat_index >= len(flat_mask): decompressed_mask[mask_index] = True else: decompressed_mask[mask_index] = flat_mask[flat_index] flat_index += 1 return decompressed_mask.reshape(compress_mask.shape) def plot_mask(self, mask, save_path=None): plot.figure(tight_layout=1) od_mask = np.copy(self.order_dummy.mask) mf_mask = np.copy(self.masked_frequencies.mask) self.order_dummy.mask = self.get_stabilization_mask('mask_pre') self.masked_frequencies.mask = self.get_stabilization_mask('mask_pre') plot.scatter( self.masked_frequencies.compressed(), self.order_dummy.compressed(), marker='o', facecolors='none', edgecolors='grey', s=10) self.order_dummy.mask = mask self.masked_frequencies.mask = mask plot.scatter( self.masked_frequencies.compressed(), self.order_dummy.compressed(), marker='o', facecolors='none', edgecolors='black', s=10) self.order_dummy.mask = od_mask self.masked_frequencies.mask = mf_mask plot.ylim((0, 200)) plot.xlim((0, self.prep_signals.sampling_rate / 2)) plot.xlabel('Frequency [Hz]') plot.ylabel('Model Order ') plot.tight_layout() if save_path: plot.savefig(save_path + 'mask.pdf') else: plot.show() plot.pause(0.001) def automatic_clearing(self, num_iter=None): if self.state < 2: self.calculate_soft_critera_matrices() logger.info('Clearing physical modes automatically...') # 2-means clustering of all poles by all available criteria # algorithm minimizes euclidian distances if num_iter is not None: assert isinstance(num_iter, int) assert num_iter > 0 self.num_iter = num_iter # represent all the vectors by their soft criteria : # [index, i, d_lambda, d_f, d_xi, dMAC, dMPD] mask_pre = np.ma.array(self.get_stabilization_mask('mask_pre')) # in a second run mask_pre is itself masked self.freq_diffs.mask = np.ma.nomask self.damp_diffs.mask = np.ma.nomask self.lambda_diffs.mask = np.ma.nomask if self.capabilities['msh']: self.MAC_diffs.mask = np.ma.nomask # self.MPD_diffs.mask = np.ma.nomask self.MP_diffs.mask = np.ma.nomask # assuming there are no frequencies equal within given precision mask_pre_3d = self.freq_diffs == 0 soft_criteria_matrices = [] for matrix in [self.lambda_diffs, self.freq_diffs, self.damp_diffs]: matrix.mask = mask_pre_3d soft_criteria_matrices.append(matrix.min(axis=2)) if self.capabilities['msh']: self.MAC_diffs.mask = mask_pre_3d soft_criteria_matrices.append(self.MAC_diffs.min(axis=2)) # self.MPD_diffs.mask = mask_pre_3d # soft_criteria_matrices.append(self.MPD_diffs.min(axis=2)) self.MP_diffs.mask = mask_pre_3d soft_criteria_matrices.append(self.MP_diffs.min(axis=2)) for matrix in soft_criteria_matrices: matrix.mask = mask_pre # flatten unmasked values and remove first two model orders compressed_matrices = [matrix[2:,:].compressed() for matrix in soft_criteria_matrices] # dlambda, df, dd, dMAC, dMPD, stacked as list of size (order, # num_modes) all_poles = np.vstack(compressed_matrices).T # transform distribution (weibull like) to logarithmic scale (resembles # normal distribution) all_poles = np.log(all_poles) # whitening (scale to unit variance) significantly improves # convergence rates of the kmeans algorithm mean_all_poles = np.mean(all_poles, axis=0) std_all_poles = np.std(all_poles, axis=0) all_poles -= mean_all_poles all_poles /= std_all_poles std_dev = np.std(all_poles, axis=0) ideal_physical_values = -std_dev ideal_spurious_values = std_dev # the k-means algorithm is sensitive to the initial starting # values in order to converge to a solution # therefore two starting attempts are introduced ctr_init = np.array([ideal_physical_values, ideal_spurious_values]) # masked arrays are not supported by scipy's kmeans algorithm # all_poles an M by N array where the rows are observation vectors self.clear_ctr, idx = scipy.cluster.vq.kmeans2( all_poles, ctr_init, self.num_iter) logger.info( 'Possibly physical poles 1st stage: {0} Spurious poles 1st stage: {1}'.format( collections.Counter(idx)[0], collections.Counter(idx)[1])) # add unmasked values of the first two model orders that were # previously not considered mask_pre.mask = np.ma.nomask new_idx = np.hstack( (np.ones(np.sum(np.logical_not(mask_pre[:2,:]))), idx)) mask_autoclear = self.decompress_flat_mask(mask_pre, new_idx) # re-apply mask_pre, should not be necessary mask_autoclear = np.logical_or(mask_autoclear, mask_pre) # apply the hard validation criteria # mask_autoclear = np.logical_or( # mask_autoclear, self.modal_data.modal_damping < 0.001) # mask_autoclear = np.logical_or( # mask_autoclear, self.modal_data.modal_damping > 20) # compute the threshold as the 95th percentile of # P(threshold > d_lambda + d_MAC) = 0.95 soft_criteria_matrices[0].mask = np.ma.nomask soft_criteria_matrices[3].mask = np.ma.nomask distance_mat = soft_criteria_matrices[0] + soft_criteria_matrices[3] distance_mat.mask = mask_autoclear self.threshold = np.percentile(distance_mat.compressed(), q=95) self.masks['mask_autoclear'] = mask_autoclear self.update_stabilization_masks() self.state = 3 def automatic_classification(self, threshold=None, use_stabil=False): if self.state < 3 and not use_stabil: self.automatic_clearing() logger.info('Classifying physical modes automatically...') if use_stabil: mask_autoclear = self.get_stabilization_mask('mask_stable') # self.masks['mask_autoclear'] = mask_autoclear # self.update_stabilization_masks() # print(123) else: mask_autoclear = self.get_stabilization_mask('mask_autoclear') self.use_stabil = use_stabil if threshold is not None: assert isinstance(threshold, int) self.threshold = threshold if self.threshold is None: self.freq_diffs.mask = np.ma.nomask mask_pre_3d = self.freq_diffs == 0 self.lambda_diffs.mask = mask_pre_3d self.MAC_diffs.mask = mask_pre_3d distance_mat = self.lambda_diffs.min(axis=2)\ +self.MAC_diffs.min(axis=2) distance_mat.mask = mask_autoclear self.threshold = np.percentile(distance_mat.compressed(), q=95) # print(self.threshold) length_mat = np.product(mask_autoclear.shape) \ -np.sum(mask_autoclear) self.masked_lambda.mask = mask_autoclear lambda_compressed = self.masked_lambda.compressed() self.masked_lambda.mask = np.ma.nomask dim0, dim1 = mask_autoclear.shape mode_shapes_compressed = np.zeros( (self.modal_data.mode_shapes.shape[0], length_mat), dtype=np.complex128 ) n = 0 for i in range(dim0): for j in range(dim1): if mask_autoclear[i, j]: continue this_msh = self.modal_data.mode_shapes[:, j, i] mode_shapes_compressed[:, n] = this_msh n += 1 l = len(lambda_compressed) # print(l) div_lambda = np.maximum( np.repeat(np.expand_dims(np.abs(lambda_compressed), axis=1), lambda_compressed.shape[0], axis=1), np.repeat(np.expand_dims(np.abs(lambda_compressed), axis=0), lambda_compressed.shape[0], axis=0) ) lambda_proximity_matrix = np.abs( lambda_compressed - lambda_compressed.reshape((l, 1))) / div_lambda mac_proximity_matrix = 1 - \ calculateMAC(mode_shapes_compressed, mode_shapes_compressed) print(lambda_proximity_matrix.shape, mac_proximity_matrix.shape, np.sum(mask_autoclear), n) proximity_matrix = self.weight_lambda * lambda_proximity_matrix \ +self.weight_MAC * mac_proximity_matrix # correct round off errors proximity_matrix[ proximity_matrix < np.finfo(proximity_matrix.dtype).eps] = 0 self.proximity_matrix_sq = scipy.spatial.distance.squareform( proximity_matrix, checks=False) linkage_matrix = scipy.cluster.hierarchy.linkage( self.proximity_matrix_sq, method='average') self.cluster_assignments = scipy.cluster.hierarchy.fcluster( linkage_matrix, self.threshold, criterion='distance') for clusternr in range(1, max(self.cluster_assignments) + 1): flat_poles_ind = self.cluster_assignments != clusternr + 1 mask = self.decompress_flat_mask(mask_autoclear, flat_poles_ind) self.order_dummy.mask = mask for order in range(self.modal_data.max_model_order): if np.sum(self.order_dummy == order) > 1: logger.debug(f'Double Model Order: {self.order_dummy[order,:]}') self.order_dummy.mask = np.ma.nomask logger.info('Number of classified clusters: {}'.format( max(self.cluster_assignments))) self.state = 4 def automatic_selection(self, number=0): if self.state < 4: self.automatic_classification() # count clusters with more than a fraction of the number of elements of # the largest cluster and add that many zero size clusters poles = [] for cluster_ in range(1, 1 + max(self.cluster_assignments)): poles.append(np.where(self.cluster_assignments == cluster_)) nr_poles = [len(a[0]) for a in poles] max_nr = max(nr_poles) # enlarge list of poles to improve clustering # for nr in nr_poles: # if nr > max_nr * fraction: # nr_poles.append(0) # elif nr < max_nr * fraction: # continue nr_poles = np.array(nr_poles, dtype=np.float64) if number == 0: # 2-means clustering of the number-of-poles, return indices; # split into two clusters _, select_clusters = scipy.cluster.vq.kmeans2( np.array(nr_poles, dtype=np.float64), np.array([max_nr, 1e-12]), self.num_iter) else: meta_list = list(enumerate(nr_poles)) sorted_meta_list = sorted(meta_list, key=itemgetter(1), reverse=True) select_clusters = [1 for _ in nr_poles] for i in range(number): ind = sorted_meta_list[i][0] select_clusters[ind] = 0 logger.info('Number of physical modes: {0}'.format( collections.Counter(select_clusters)[0])) self.select_clusters = select_clusters self.nr_poles = nr_poles self.selection_cut_off = np.Infinity for i, b in zip(self.nr_poles, self.select_clusters): if not b: self.selection_cut_off = min(i - 1, self.selection_cut_off) logger.info('Minimum number of elements in retained clusters: {}'.format( self.selection_cut_off)) if self.use_stabil: mask_autoclear = self.get_stabilization_mask('mask_stable') # mask_autoclear = self.masks['mask_autoclear'] else: mask_autoclear = self.masks['mask_autoclear'] self.MAC_diffs.mask = self.MAC_diffs == 0 MAC_diffs = self.MAC_diffs.min(axis=2) self.MAC_diffs.mask = np.ma.nomask if 'mask_autosel' not in self.masks: self.masks['mask_autosel'] = [] for clusternr, inout in enumerate(select_clusters): if inout: continue flat_poles_ind = self.cluster_assignments != clusternr + 1 mask = self.decompress_flat_mask(mask_autoclear, flat_poles_ind) self.masks['mask_autosel'].append(np.ma.copy(mask)) # remove outermost values in all criteria, until only the # "multi-variate median" is left this pole is selected as the # representative solution for this cluster num_poles_left = np.product(mask.shape) - np.sum(mask) # print(clusternr, num_poles_left) while num_poles_left > 1: ind = [] for matrix, target in zip( [self.masked_frequencies, self.masked_damping, ], [np.ma.median, np.ma.median, ]): matrix.mask = mask val = target(matrix) min_ = np.min(matrix) max_ = np.max(matrix) if val - min_ <= max_ - val: ind.append(np.where(matrix == max_)) else: ind.append(np.where(matrix == min_)) for i in range(min(len(ind), num_poles_left - 1)): this_ind = ind[i] mask[this_ind] = True num_poles_left = np.product(mask.shape) - np.sum(mask) # print(clusternr, num_poles_left, len(ind)) select_mode = np.where(np.logical_not(mask)) # self.select_modes.append((select_mode[0][0], select_mode[1][0])) self.add_mode((select_mode[0][0], select_mode[1][0])) if self.select_callback is not None: # for matrix in [self.masked_frequencies, self.masked_damping, # MAC_diffs, self.MPC_matrix, self.MPD_matrix]: # # matrix.mask = np.ma.nomask self.select_callback(self.select_modes[-1]) for matrix in [self.masked_frequencies, self.masked_damping, MAC_diffs, self.MPC_matrix, self.MPD_matrix]: matrix.mask = np.ma.nomask self.state = 5 def plot_clearing(self, save_path=None): mask_autoclear = self.masks['mask_autoclear'] mask_pre = self.get_stabilization_mask('mask_pre') self.plot_mask(mask_autoclear, save_path) self.freq_diffs.mask = self.freq_diffs == 0 freq_diffs = self.freq_diffs.min(axis=2) self.freq_diffs.mask = np.ma.nomask self.MAC_diffs.mask = self.MAC_diffs == 0 MAC_diffs = self.MAC_diffs.min(axis=2) self.MAC_diffs.mask = np.ma.nomask self.damp_diffs.mask = self.damp_diffs == 0 damp_diffs = self.damp_diffs.min(axis=2) self.damp_diffs.mask = np.ma.nomask crits = [freq_diffs, damp_diffs, MAC_diffs, self.MPC_matrix, self.MPD_matrix] labels = ['df', 'dd', 'MAC', 'MPC', 'MPD'] new_crits = [] for j, b in enumerate(crits): new_crits.append(b) for i, a in enumerate(new_crits): if a is b: continue plot.figure(tight_layout=1) a.mask = mask_autoclear b.mask = mask_autoclear plot.plot(a.compressed(), b.compressed(), ls='', marker=',') plot.plot( np.mean(a), np.mean(b), ls='', marker='d', color='black') a.mask = mask_pre b.mask = mask_pre plot.plot( a.compressed(), b.compressed(), ls='', marker=',', color='grey') plot.plot( np.mean(a), np.mean(b), ls='', marker='d', color='grey') labela = labels[i] labelb = labels[j] plot.xlabel(labela) plot.ylabel(labelb) plot.xlim((0, 1)) plot.ylim((0, 1)) if save_path is not None: plot.savefig( save_path + 'clear_{}_{}.pdf'.format(labela, labelb)) else: # print('show') plot.show() plot.pause(0.01) for crit in crits: crit.mask = np.ma.nomask def plot_classification(self, save_path=None): rel_matrix = scipy.cluster.hierarchy.linkage( self.proximity_matrix_sq, method='average') lvs = scipy.cluster.hierarchy.leaves_list(rel_matrix) def _llf(_id): if len(lvs) > 500: if (np.where(_id == lvs)[0][0] % 100 == 0): return str(np.where(_id == lvs)[0][0]) else: return str('') else: if (np.where(_id == lvs)[0][0] % 10 == 0): return str(np.where(_id == lvs)[0][0]) else: return str('') fig = plot.figure(tight_layout=1) ax = fig.add_subplot(111) scipy.cluster.hierarchy.dendrogram( rel_matrix, leaf_label_func=_llf, color_threshold=self.threshold, leaf_font_size=16, leaf_rotation=40) # ax = plot.gca() ax.set_xlabel('Mode number [-]') ax.set_ylabel('Distance [-]') ax.axhline(self.threshold, c='r', ls='--', linewidth=3) plot.tight_layout() if save_path is not None: plot.savefig(save_path + 'dendrogram.pdf') else: # print('show') plot.show() plot.pause(0.001)
[docs] def plot_selection(self, save_path=None): """ Plot relevant results of the clustering.""" plot.figure(tight_layout=1) in_poles = list(self.nr_poles[self.nr_poles >= self.selection_cut_off]) in_poles.sort(reverse=True) out_poles = self.nr_poles[self.nr_poles < self.selection_cut_off] out_poles = list(out_poles[out_poles > 0]) out_poles.sort(reverse=True) plot.bar(range(len(in_poles)), in_poles, facecolor='red', edgecolor='none', align='center',) # print(list(range(len(in_poles),len(self.nr_poles))),out_poles) plot.bar( range( len(in_poles), len(in_poles) + len(out_poles)), out_poles, facecolor='blue', edgecolor='none', align='center', ) plot.xlim((0, len(self.nr_poles))) plot.tight_layout() if save_path is not None: plot.savefig(save_path + 'cluster_sizes.pdf') else: plot.show() plot.pause(0.001) fig = plot.figure(tight_layout=1) ax1 = fig.add_subplot(211) mask_autoclear = self.masks['mask_autoclear'] mask_pre = self.get_stabilization_mask('mask_pre') mask_pre_ = np.logical_not( np.logical_and(np.logical_not(mask_pre), mask_autoclear)) self.order_dummy.mask = mask_pre_ self.masked_frequencies.mask = mask_pre_ ax1.scatter( self.masked_frequencies.compressed(), self.order_dummy.compressed(), marker='o', facecolors='none', edgecolors='grey', s=10, label='pole') self.order_dummy.mask = mask_autoclear self.masked_frequencies.mask = mask_autoclear ax1.scatter( self.masked_frequencies.compressed(), self.order_dummy.compressed(), marker='o', facecolors='none', edgecolors='black', s=10, label='stable pole') self.order_dummy.mask = np.ma.nomask self.masked_frequencies.mask = np.ma.nomask ax1.autoscale_view(tight=True) # ax1.set_xlabel('Frequency [Hz]') ax1.set_ylabel('Model order [-]') ax1.set_title('Stabilization Diagram') ax1.set_ylim((0, 200)) for mask in self.masks['mask_autosel']: self.masked_frequencies.mask = mask plot.axvspan( self.masked_frequencies.min(), self.masked_frequencies.max(), facecolor='blue', alpha=.3, edgecolor='none') # print(self.masked_frequencies.min(), self.masked_frequencies.max(),np.ma.mean(self.masked_frequencies)) self.masked_frequencies.mask = np.ma.nomask self.order_dummy.mask = np.ma.nomask for mode in self.select_modes: f = self.modal_data.modal_frequencies[mode] n = self.order_dummy[mode] ax1.scatter( f, n, facecolors='none', marker='o', edgecolors='red', s=10) num_poles = [] fpoles = [] for clusternr in range(1, 1 + max(self.cluster_assignments)): flat_poles_ind = self.cluster_assignments != clusternr mask = self.decompress_flat_mask(mask_autoclear, flat_poles_ind) self.masked_frequencies.mask = mask num_poles.append(np.product(mask.shape) - np.sum(mask)) # print(np.ma.mean(self.masked_frequencies)) fpoles.append(np.ma.mean(self.masked_frequencies)) ax2 = fig.add_subplot(212, sharex=ax1) ax2.bar(fpoles, num_poles, width=0.01, align='center', edgecolor='none') ax2.axhline(self.selection_cut_off, c='r', ls='--', linewidth=2) ax2.set_xlabel('Frequency [Hz]') ax2.set_ylabel('Nr. of elements') ax2.set_title('Clusters') plot.tight_layout() plot.xlim((0, self.prep_signals.sampling_rate / 2)) # plot.savefig('Main_plot_clusters_' + self.timestamp + '.' + self.format_plot, format=self.format_plot) if save_path is not None: plot.savefig(save_path + 'select_clusters.pdf') else: # print('show') plot.show() plot.pause(0.001)
# plot.show(block=False) def return_results(self): all_f = [] all_d = [] all_n = [] all_std_f = [] all_std_d = [] all_MPC = [] all_MPD = [] all_MP = [] all_msh = [] all_MC = [] # for select_mode, mask in zip(self.select_modes, # self.masks['mask_autosel']): for select_mode in self.select_modes: n, f, stdf, d, stdd, mpc, mp, mpd, dmp, dmpd, mtn, MC, ex_1, ex_2 = self.get_modal_values( select_mode) msh = self.get_mode_shape(select_mode) all_n.append(n) all_f.append(f) all_std_f.append(stdf) all_d.append(d) all_std_d.append(stdd) all_MPC.append(mpc) all_MP.append(mp) all_MPD.append(mpd) all_MC.append(MC) all_msh.append(msh) continue # self.masked_frequencies.mask = np.ma.nomask # select_f = self.masked_frequencies[select_mode] # # self.masked_damping.mask = np.ma.nomask # select_d = self.masked_damping[select_mode] # # select_order = self.order_dummy[select_mode] # # if self.capabilities['msh']: # self.MPC_matrix.mask = np.ma.nomask # select_MPC = self.MPC_matrix[select_mode] # # # self.MP_matrix.mask = np.ma.nomask # # select_MP = self.MP_matrix[select_mode] # # self.MPD_matrix.mask = np.ma.nomask # select_MPD = self.MPD_matrix[select_mode] # # select_msh = self.modal_data.mode_shapes[ # :, select_mode[1], select_mode[0]] # # # scaling of mode shape # #select_msh /= select_msh[np.argmax(np.abs(select_msh))][0] # if self.capabilities['MC']: # self.modal_data.modal_contributions[select_mode] # # all_f.append(float(select_f)) # all_d.append(float(select_d)) # all_n.append(float(select_order)) # all_MPC.append(float(select_MPC)) # # all_MP.append(float(select_MP)) # all_MPD.append(float(select_MPD)) # all_msh.append(select_msh) return np.array(all_n), np.array(all_f), np.array(all_std_f), np.array(all_d), np.array( all_std_d), np.array(all_MPC), np.array(all_MP), np.array(all_MPD), np.array(all_MC), np.array(all_msh),
[docs] class StabilPlot(object):
[docs] def __init__(self, stabil_calc, fig=None): ''' stab_* in % ''' super().__init__() if not isinstance(stabil_calc, StabilCalc): logger.warning(f'Argument stabil_calc is wrong object type {type(stabil_calc)}') self.stabil_calc = stabil_calc if fig is None: self.fig = Figure(facecolor='white') # , dpi=100, figsize=(16, 12)) self.fig.set_tight_layout(True) # canvas = FigureCanvasBase(self.fig) else: self.fig = fig self.ax = self.fig.add_subplot(111) # self.ax2 = self.ax.twinx() # self.ax2.set_navigate(False) # if self.fig.canvas: if False: self.init_cursor() else: self.cursor = None marker_obj_1 = MarkerStyle('o') path_1 = marker_obj_1.get_path().transformed( marker_obj_1.get_transform()) marker_obj_2 = MarkerStyle('+') path_2 = marker_obj_2.get_path().transformed( marker_obj_2.get_transform()) path_stab = Path.make_compound_path(path_1, path_2) marker_obj_2 = MarkerStyle('x') path_2 = marker_obj_2.get_path().transformed( marker_obj_2.get_transform()) path_auto = Path.make_compound_path(path_1, path_2) fp = FontProperties(family='monospace', weight=0, size='large') self.psd_plot = [] self.stable_plot = { 'plot_pre': None, # 'plot_ad': None, # 'plot_df': None, # 'plot_dd': None, 'plot_stable': None, } self.colors = { 'plot_pre': 'grey', # 'plot_ad': 'grey', # 'plot_df': 'black', # 'plot_dd': 'black', 'plot_stable': 'black', } self.markers = { 'plot_pre': 'o', # 'plot_ad': TextPath((-2, -4), '\u00b7 d', prop=fp, size=10), # 'plot_df': TextPath((-2, -4), '\u00b7 f', prop=fp, size=10), # 'plot_dd': TextPath((-2, -4), '\u00b7 d', prop=fp, size=10), 'plot_stable': path_stab, # } self.labels = { 'plot_pre': 'all poles', # 'plot_ad': 'damping criterion', # 'plot_df': 'unstable in frequency', # 'plot_dd': 'unstable in damping', 'plot_stable': 'stable poles', } if self.stabil_calc.capabilities['std']: self.stable_plot['plot_stdf'] = None # uncertainty frequency self.stable_plot['plot_stdd'] = None # uncertainty damping self.colors['plot_stdf'] = 'grey' self.colors['plot_stdd'] = 'grey' self.labels['plot_stdf'] = 'uncertainty bounds frequency criterion' self.labels['plot_stdd'] = 'uncertainty bounds damping criterion' self.markers['plot_stdf'] = 'd' self.markers['plot_stdd'] = 'd' if self.stabil_calc.capabilities['msh']: # absolute modal phase collineratity self.stable_plot['plot_mpc'] = None # absolute mean phase deviation self.stable_plot['plot_mpd'] = None self.stable_plot['plot_dmac'] = None # difference mac # self.colors['plot_mpc'] = 'grey' # self.colors['plot_mpd']= 'grey' # self.colors['plot_dmac']= 'black' # self.labels['plot_mpc']= 'modal phase collinearity criterion' # self.labels['plot_mpd']= 'mean phase deviation criterion' # self.labels['plot_dmac']= 'unstable in mac' # self.markers['plot_mpc']= TextPath((-2, -4), '\u00b7 v', prop=fp, size=10) # self.markers['plot_mpd']= TextPath((-2, -4), '\u00b7 v', prop=fp, size=10) # self.markers['plot_dmac']= TextPath((-2, -4), '\u00b7 v', prop=fp, size=10) if self.stabil_calc.capabilities['auto']: # auto clearing by 2Means Algorithm self.stable_plot['plot_autoclear'] = None # autoselection by 2 stage hierarchical clustering self.stable_plot['plot_autosel'] = None self.colors['plot_autoclear'] = 'black' self.colors['plot_autosel'] = 'rainbow' self.labels['plot_autoclear'] = 'autoclear poles' self.labels['plot_autosel'] = 'autoselect poles' self.markers['plot_autoclear'] = path_auto self.markers['plot_autosel'] = 'o' if self.stabil_calc.capabilities['MC']: # absolute modal error contribution self.stable_plot['plot_MC'] = None self.colors['plot_MC'] = 'grey' self.labels['plot_MC'] = 'modal error contribution criterion' self.markers['plot_MC'] = 'x' if self.stabil_calc.capabilities['mtn']: # difference modal transfer norm self.stable_plot['plot_dmtn'] = None self.stable_plot['plot_mtn'] = None # absolute modal transfer norm self.colors['plot_dmtn'] = 'black' self.colors['plot_mtn'] = 'grey' self.labels['plot_mtn'] = 'modal transfer norm criterion' self.labels['plot_dmtn'] = 'unstable in modal transfer norm' self.markers['plot_mtn'] = '>' self.markers['plot_dmtn'] = '>' if False: self.stable_plot['plot_dev'] = None # difference eigenvalue self.colors['plot_dev'] = 'grey' self.labels['plot_dev'] = 'unstable in eigenvalue' self.markers['plot_dev'] = TextPath( (-2, -4), '\u00b7 \u03bb', prop=fp, size=10), self.zorders = {key: key != 'plot_pre' for key in self.labels.keys()} self.zorders['plot_autosel'] = 2 self.sizes = {key: 30 for key in self.labels.keys()} self.prepare_diagram() # that list should eventually be replaced by a matplotlib.collections # collection self.scatter_objs = [None for _ in self.stabil_calc.select_modes] self.stabil_calc.add_callback('add_mode', self.add_mode) self.stabil_calc.add_callback('remove_mode', self.remove_mode) if stabil_calc.select_modes: for mode in stabil_calc.select_modes: list_ind = self.stabil_calc.select_modes.index(mode) self.add_mode(mode, list_ind)
def init_cursor(self, visible=True): self.cursor = DataCursor( ax=self.ax, horizOn=visible, vertOn=visible, order_data=self.stabil_calc.order_dummy, f_data=self.stabil_calc.masked_frequencies, datalist=self.stabil_calc.select_modes, color='black', useblit=True) self.fig.canvas.mpl_connect( 'button_press_event', self.mode_selected) self.fig.canvas.mpl_connect( 'resize_event', self.cursor.fig_resized) return self.cursor def prepare_diagram(self): self.ax.set_ylim((0, self.stabil_calc.modal_data.max_model_order)) self.ax.locator_params( 'y', tight=True, nbins=self.stabil_calc.modal_data.max_model_order // 5) x_lims = (0, self.stabil_calc.get_max_f()) self.ax.set_xlim(x_lims) self.ax.autoscale_view(tight=True) self.ax.set_xlabel('Frequency [Hz]') self.ax.set_ylabel('Model Order') def update_stabilization(self, **criteria): # print(criteria) # recalculate stabilization masks self.stabil_calc.update_stabilization_masks(**criteria) # update stabil plots if values have changed # if 'd_range' in criteria: # self.plot_stabil('plot_ad') if 'stdf_max' in criteria and self.stabil_calc.capabilities['std']: self.plot_stabil('plot_stdf') if 'stdd_max' in criteria and self.stabil_calc.capabilities['std']: self.plot_stabil('plot_stdd') # if 'mpc_min' in criteria and self.stabil_calc.capabilities['msh']: # self.plot_stabil('plot_mpc') # if 'mpd_max' in criteria and self.stabil_calc.capabilities['msh']: # self.plot_stabil('plot_mpd') if 'mtn_min' in criteria and self.stabil_calc.capabilities['mtn']: self.plot_stabil('plot_mtn') # if 'df_max' in criteria: # self.plot_stabil('plot_df') # if 'dd_max' in criteria: # self.plot_stabil('plot_dd') # if 'dmac_max' in criteria and self.stabil_calc.capabilities['msh']: # self.plot_stabil('plot_dmac') if 'dev_min' in criteria and False: self.plot_stabil('plot_dev') if 'dmtn_min' in criteria and self.stabil_calc.capabilities['mtn']: self.plot_stabil('plot_dmtn') if 'MC_min' in criteria and self.stabil_calc.capabilities['MC']: self.plot_stabil('plot_MC') self.plot_stabil('plot_pre') self.plot_stabil('plot_stable') if self.stabil_calc.capabilities['auto']: if self.stabil_calc.state >= 3 and not self.stabil_calc.use_stabil: self.plot_stabil('plot_autoclear') if self.stabil_calc.state >= 5: self.plot_stabil('plot_autosel') if self.stabil_calc.capabilities['std']: self.plot_stabil('plot_stdf') # update the cursors snap mask if self.cursor: cursor_name_mask = self.cursor.name_mask cursor_mask = self.stabil_calc.get_stabilization_mask( cursor_name_mask) self.cursor.set_mask(cursor_mask, cursor_name_mask) def plot_stabil_autosel(self, color, marker, zorder, size, label): name = 'plot_autosel' if self.stable_plot[name] is not None: for plot in self.stable_plot[name]: plot.remove() visibility = True masks = self.stabil_calc.masks['mask_autosel'] colors = list(matplotlib.cm.gist_rainbow( np.linspace( 0, 1, len(masks)))) # @UndefinedVariable shuffle(colors) self.stable_plot[name] = [] for color, mask in zip(colors, masks): self.stabil_calc.masked_frequencies.mask = mask self.stabil_calc.order_dummy.mask = mask self.stable_plot[name].append(self.ax.scatter( self.stabil_calc.masked_frequencies.compressed(), self.stabil_calc.order_dummy.compressed(), zorder=zorder, facecolors=color, edgecolors='none', marker=marker, alpha=0.4, s=size, label=label, visible=visibility)) return def plot_stabil_stdf(self, name, color, zorder, label): if self.stable_plot[name] is not None: try: children = self.stable_plot[name].get_children() if children: visibility = children[0].get_visible() self.stable_plot[name].remove() except IndexError as e: logger.debug(f'Failed to remove stabil_stdf objects {e}') visibility = True else: visibility = True mask = self.stabil_calc.get_stabilization_mask('mask_stable') self.stabil_calc.masked_frequencies.mask = mask self.stabil_calc.order_dummy.mask = mask if self.stabil_calc.capabilities['std']: std_frequencies = np.ma.array( self.stabil_calc.modal_data.std_frequencies) std_frequencies.mask = mask # standard error num_blocks = self.stabil_calc.modal_data.num_blocks std_error = std_frequencies.compressed() / np.sqrt(num_blocks) # 95 % confidence interval -> student t (tabulated percentage # points) * std_error (approx 2* std_error) self.stable_plot[name] = self.ax.errorbar(self.stabil_calc.masked_frequencies.compressed(), self.stabil_calc.order_dummy.compressed(), xerr=scipy.stats.t.ppf(0.95, num_blocks) * std_error, zorder=zorder, fmt='none', ecolor=color, label=label, visible=visibility) return def plot_stabil(self, name): # print(name) color = self.colors[name] marker = self.markers[name] # print(marker, name) zorder = self.zorders[name] size = self.sizes[name] label = self.labels[name] if name == 'plot_autosel': self.plot_stabil_autosel(color, marker, zorder, size, label) elif name == 'plot_stdf': self.plot_stabil_stdf(name, color, zorder, label) else: if self.stable_plot[name] is not None: visibility = self.stable_plot[name].get_visible() self.stable_plot[name].remove() else: visibility = True mask = self.stabil_calc.get_stabilization_mask( name.replace('plot', 'mask')) self.stabil_calc.masked_frequencies.mask = mask self.stabil_calc.order_dummy.mask = mask self.stable_plot[name] = self.ax.scatter( self.stabil_calc.masked_frequencies.compressed(), self.stabil_calc.order_dummy.compressed(), zorder=zorder, facecolors='none', edgecolors=color, marker=marker, s=size, label=label, visible=visibility) mask_stable = self.stabil_calc.get_stabilization_mask('mask_pre') self.stabil_calc.masked_frequencies.mask = mask_stable self.stabil_calc.order_dummy.mask = mask_stable self.fig.canvas.draw_idle() def show_MC(self, b=False): if b: if not self.stabil_calc.capabilities['MC']: logger.warning('Modal contributions are not computed and cannot be displayed.') return ylim = self.fig.axes[0].get_ylim() if len(self.fig.axes) < 2: self.fig.add_subplot(1, 2, 2, sharey=self.fig.axes[0]) gs = matplotlib.gridspec.GridSpec( 1, 2, width_ratios=(6, 1), wspace=0.01, hspace=0) self.fig.axes[0].set_subplotspec(gs[0]) self.fig.axes[1].set_subplotspec(gs[1]) ax = self.fig.axes[1] MCs = np.zeros((self.stabil_calc.modal_data.max_model_order)) for order in range(self.stabil_calc.modal_data.max_model_order): sum_mc = np.sum( self.stabil_calc.modal_data.modal_contributions[order,:]) if np.iscomplex(sum_mc): # abs used for complex modal contributions (pLSCF) sum_mc = np.abs(sum_mc) MCs[order] = sum_mc ax.plot( MCs, list( range( self.stabil_calc.modal_data.max_model_order)), marker='o', fillstyle='full', markerfacecolor='white', markeredgecolor='grey', color='darkgrey', markersize=4) ax.grid(True, axis='x') ax.set_ylim(ylim) ax.yaxis.tick_right() ax.set_xlim([0, 1]) ax.set_xticks([ 0.25, 0.5, 0.75, 1]) ax.set_xticklabels([ '0.25', '0.5', '0.75', '1']) else: if len(self.fig.axes) < 2: return ax = self.fig.axes[1] self.fig.delaxes(ax) gs = matplotlib.gridspec.GridSpec(1, 1, wspace=0, hspace=0) self.fig.axes[0].set_subplotspec(gs[0]) self.fig.canvas.draw_idle()
[docs] def plot_sv_psd(self, b, NFFT=None): ''' .. TODO:: * add GUI for choosing PSD parameters ''' # cases: # create new plot with defaults (True, None) # hide plot (False, ...) # show last plot (True, NFFT==n_lines) # recreate plot with other NFFT (True, NFFT!=n_lines) # # check if something was drawn already # check if it should be hidden # hide it # check if parameters match if self.psd_plot and not b: for channel in self.psd_plot: for line in channel: line._visible = b self.fig.canvas.draw_idle() return elif self.psd_plot and NFFT == self.stabil_calc.prep_signals.n_lines: for channel in self.psd_plot: for line in channel: line._visible = b self.fig.canvas.draw_idle() return elif self.psd_plot: for channel in self.psd_plot: for line in channel: line.remove() self.psd_plot = [] if self.stabil_calc.prep_signals is None: raise RuntimeError('Measurement Data was not provided!') if not b: return # if no PSD has been computed before, set a default value if NFFT is None and self.stabil_calc.prep_signals.n_lines is None: NFFT = 2048 sv_psd = self.stabil_calc.prep_signals.sv_psd(NFFT) freq_psd = self.stabil_calc.prep_signals.freqs # sv_psd -= np.min(sv_psd) # sv_psd /= (np.max(sv_psd)) * 0.5 * \ # self.stabil_calc.modal_data.max_model_order sv_psd_db_scaled = 10 * np.log10(sv_psd) sv_psd_db_scaled -= np.min(sv_psd_db_scaled) sv_psd_db_scaled /= 2 * np.max(sv_psd_db_scaled) n_channels = sv_psd.shape[0] for channel in range(n_channels): self.psd_plot.append(self.ax.plot(freq_psd, sv_psd_db_scaled[channel,:], color='grey', alpha=(n_channels - channel) / n_channels, linestyle='solid', visible=b, zorder=-1, transform=self.ax.get_xaxis_transform())) self.fig.canvas.draw_idle()
def update_xlim(self, xlim): self.ax.set_xlim(xlim) self.fig.canvas.draw_idle() def update_ylim(self, ylim): self.ax.set_ylim(ylim) self.fig.canvas.draw_idle() # #@pyqtSlot(bool) # def snap_frequency(self, b=True): # if b: # mask = self.stabil_calc.get_stabilization_mask('mask_df') # self.cursor.set_mask(mask, 'mask_df') # # #@pyqtSlot(bool) # def snap_damping(self, b=True): # if b: # mask = self.stabil_calc.get_stabilization_mask('mask_dd') # self.cursor.set_mask(mask, 'mask_dd') # # #@pyqtSlot(bool) # def snap_vector(self, b=True): # if b: # mask = self.stabil_calc.get_stabilization_mask('mask_dmac') # self.cursor.set_mask(mask, 'mask_dmac') # # #@pyqtSlot(bool) # def snap_stable(self, b=True): # if b: # mask = self.stabil_calc.get_stabilization_mask('mask_stable') # self.cursor.set_mask(mask, 'mask_stable') # # #@pyqtSlot(bool) # def snap_all(self, b=True): # if b: # mask = self.stabil_calc.get_stabilization_mask('mask_pre') # self.cursor.set_mask(mask, 'mask_pre') # # #@pyqtSlot(bool) # def snap_clear(self, b=True): # if b: # mask = self.stabil_calc.get_stabilization_mask('mask_autoclear') # self.cursor.set_mask(mask, 'mask_autoclear') # # #@pyqtSlot(bool) # def snap_select(self, b=True): # if b: # mask = self.stabil_calc.get_stabilization_mask('mask_autoselect') # self.cursor.set_mask(mask, 'mask_autoselect') # @pyqtSlot(int) def toggle_df(self, b): plot_obj = self.stable_plot['plot_df'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_stdf(self, b): plot_obj = self.stable_plot['plot_stdf'] if plot_obj is None: return for obj in plot_obj.get_children(): if obj is None: continue obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_stdd(self, b): plot_obj = self.stable_plot['plot_stdd'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_ad(self, b): plot_obj = self.stable_plot['plot_ad'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_dd(self, b): plot_obj = self.stable_plot['plot_dd'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_dmac(self, b): plot_obj = self.stable_plot['plot_dmac'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_mpc(self, b): plot_obj = self.stable_plot['plot_mpc'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_mpd(self, b): plot_obj = self.stable_plot['plot_dmac'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_mtn(self, b): plot_obj = self.stable_plot['plot_mtn'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_dev(self, b): plot_obj = self.stable_plot['plot_dev'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_dmtn(self, b): plot_obj = self.stable_plot['plot_dmtn'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_stable(self, b): # print('plot_stable',b) plot_obj = self.stable_plot['plot_stable'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_clear(self, b): # print('plot_autoclear',b) plot_obj = self.stable_plot['plot_autoclear'] if plot_obj is None: return plot_obj.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_select(self, b): plot_obj = self.stable_plot['plot_autosel'] if plot_obj is None: return for plot_obj_ in plot_obj: plot_obj_.set_visible(b) self.fig.canvas.draw_idle() # @pyqtSlot(bool) # @pyqtSlot(int) def toggle_all(self, b): plot_obj = self.stable_plot['plot_pre'] if plot_obj is None: # print('plot_pre not found') return plot_obj.set_visible(b) self.fig.canvas.draw_idle() def save_figure(self, fname=None): startpath = rcParams.get('savefig.directory', '') startpath = os.path.expanduser(startpath) # start = os.path.join(startpath, self.fig.canvas.get_default_filename()) if fname: if startpath == '': # explicitly missing key or empty str signals to use cwd rcParams['savefig.directory'] = startpath else: # save dir for next time rcParams['savefig.directory'] = os.path.dirname(str(fname)) try: # scatter_objs = [] # ord_mask = self.stabil_calc.order_dummy.mask # self.stabil_calc.order_dummy.mask = np.ma.nomask # f_mask = self.stabil_calc.masked_frequencies.mask # self.stabil_calc.masked_frequencies.mask = np.ma.nomask # # for mode in self.stabil_calc.select_modes: # mode = tuple(mode) # y, x = self.stabil_calc.order_dummy[ # mode], self.stabil_calc.masked_frequencies[mode] # # print(x,y) # scatter_objs.append( # self.ax.scatter( # x, # y, # facecolors='none', # edgecolors='red', # s=200, # visible=True)) # # self.stabil_calc.order_dummy.mask = ord_mask # self.stabil_calc.masked_frequencies.mask = f_mask # # text = self.ax.annotate(str(self.stabil_calc.start_time), xy=( # 0.85, 0.99), xycoords='figure fraction') self.fig.canvas.print_figure(str(fname)) # text.remove() # # for scatter_obj in scatter_objs: # scatter_obj.remove() # del scatter_objs except Exception as e: import traceback traceback.print_exc()
[docs] def mode_selected(self, event): ''' connect this function to the button press event of the canvas ''' if event.name == "button_press_event" and event.inaxes == self.ax: # Check if in zooming or panning mode; credit: https://stackoverflow.com/questions/48446351/ zooming_panning = False try: # Qt Backend zooming_panning = (self.fig.canvas.cursor().shape() != 0) # 0 is the arrow, which means we are not zooming or panning. except Exception: pass try: # nbAgg Backend zooming_panning = str(self.fig.canvas.toolbar.cursor) != 'Cursors.POINTER' except Exception: pass if zooming_panning: logger.debug('In zooming or panning mode') return ind = self.cursor.i if ind is None: logger.warning('Empty mode index for the button_press_event. Ensure cursor is working.') return if ind not in self.stabil_calc.select_modes: self.stabil_calc.add_mode(ind) else: self.stabil_calc.remove_mode(ind)
def toggle_mode(self, datapoint): datapoint = tuple(datapoint) if datapoint in self.stabil_calc.select_modes: self.stabil_calc.remove_mode(datapoint) else: self.stabil_calc.add_mode(datapoint) def add_mode(self, datapoint, list_ind): # datapoint = tuple(datapoint) # list_ind = self.stabil_calc.add_mode(datapoint) if len(self.scatter_objs) <= list_ind: self.scatter_objs.append(None) if self.scatter_objs[list_ind] is not None: self.scatter_objs[list_ind].remove() x = self.stabil_calc.masked_frequencies[datapoint] y = self.stabil_calc.order_dummy[datapoint] # x, y = self.x[datapoint], self.y[datapoint] self.scatter_objs[list_ind] = self.ax.scatter( x, y, facecolors='none', edgecolors='red', s=200, visible=True, zorder=3) # TODO:: improve Performance by blitting the scatter_objs if False: # if self.useblit: if self.background is not None: self.fig.canvas.restore_region(self.background) for scatter in self.scatter_objs: scatter.set_visible(True) self.ax.draw_artist(scatter) scatter.set_visible(False) self.ax.draw_artist(self.linev) self.ax.draw_artist(self.lineh) self.fig.canvas.blit(self.ax.bbox) else: # for scatter in self.scatter_objs: # scatter.set_visible(True) self.fig.canvas.draw() # def add_modes(self, datalist): # # convenience function for add_datapoint # for datapoint in datalist: # self.add_mode(datapoint) def remove_mode(self, datapoint, list_ind): # datapoint = tuple(datapoint) # list_ind = self.stabil_calc.remove_mode(datapoint) if list_ind is not None: self.scatter_objs[list_ind].remove() del self.scatter_objs[list_ind] self.fig.canvas.draw()
# def remove_modes(self, datalist): # # convenience function for remove_datapoint # for datapoint in datalist: # self.remove_mode(datapoint)
[docs] class DataCursor(Cursor): # create and edit an instance of the matplotlib default Cursor widget # show_current_info = pyqtSignal(tuple) # mode_selected = pyqtSignal(tuple) # mode_deselected = pyqtSignal(tuple)
[docs] def __init__( self, ax, order_data, f_data, mask=None, useblit=True, datalist=[], **lineprops): Cursor.__init__(self, ax, useblit=useblit, **lineprops) # QObject.__init__(self) self.callbacks = {'show_current_info':lambda *args, **kwargs: None, 'mode_selected':lambda *args, **kwargs: None, 'mode_deselected':lambda *args, **kwargs: None, } self.ax = ax self.y = order_data self.y.mask = np.ma.nomask self.x = f_data self.x.mask = np.ma.nomask if mask is not None: self.mask = mask else: self.mask = np.ma.nomask self.name_mask = 'mask_stable' self.i = None # that list should eventually be replaced by a matplotlib.collections # collection # self.scatter_objs = [] # # self.datalist = datalist # if datalist: # self.add_datapoints(datalist) self.fig_resized()
def add_callback(self, name, func): assert name in ['show_current_info', 'mode_selected', 'mode_deselected'] self.callbacks[name] = func # def add_datapoint(self, datapoint): # datapoint = tuple(datapoint) # if datapoint not in self.datalist: # self.datalist.append(datapoint) # x, y = self.x[datapoint], self.y[datapoint] # # print(x,y) # self.scatter_objs.append(self.ax.scatter( # x, y, facecolors='none', edgecolors='red', s=200, visible=False)) # self.callbacks['mode_selected'](datapoint) # # def add_datapoints(self, datalist): # # convenience function for add_datapoint # for datapoint in datalist: # self.add_datapoint(datapoint) # # def remove_datapoint(self, datapoint): # datapoint = tuple(datapoint) # if datapoint in self.datalist: # ind = self.datalist.index(datapoint) # self.scatter_objs[ind].remove() # del self.scatter_objs[ind] # self.datalist.remove(datapoint) # self.callbacks['mode_deselected'](datapoint) # else: # print(datapoint, 'not in self.datalist') # # def remove_datapoints(self, datalist): # # convenience function for remove_datapoint # for datapoint in datalist: # self.remove_datapoint(datapoint) def set_mask(self, mask, name): self.mask = mask self.fig_resized() self.name_mask = name def fig_resized(self, event=None): # self.background = self.ax.figure.canvas.copy_from_bbox(self.ax.figure.bbox) # if event is not None: # self.width, self.height = event.width, event.height # else: # self.width, self.height = self.ax.get_figure( # ).canvas.get_width_height() self.xpix, self.ypix = self.ax.transData.transform( np.vstack([self.x.flatten(), self.y.flatten()]).T).T self.xpix.shape = self.x.shape self.xpix.mask = self.mask self.ypix.shape = self.y.shape self.ypix.mask = self.mask
[docs] def onmove(self, event): if self.ignore(event): return ''' 1. Override event.data to force it to snap-to nearest data item 2. On a mouse-click, select the data item and append it to a list of selected items 3. The second mouse-click on a previously selected item, removes it from the list ''' if (self.xpix.mask).all(): # i.e. no stable poles return if event.name == "motion_notify_event": # get cursor coordinates xdata = event.xdata ydata = event.ydata if xdata is None or ydata is None: return xData_yData_pixels = self.ax.transData.transform( np.vstack([xdata, ydata]).T) xdata_pix, ydata_pix = xData_yData_pixels.T self.fig_resized() self.i = self.findIndexNearestXY(xdata_pix[0], ydata_pix[0]) xnew, ynew = self.x[self.i], self.y[self.i] if xdata == xnew and ydata == ynew: return # set the cursor and draw event.xdata = xnew event.ydata = ynew self.callbacks['show_current_info'](self.i) # select item by mouse-click only if the cursor is active and in the # main plot # if event.name == "button_press_event" and event.inaxes == self.ax and self.i is not None: # # ''' # we have the index already from the last motion notify event # stabil_plot hold the scatter plot objects # stabil_calc holds the selected modes indices # both lists must be inline # cursor decides if a modes is selected/deselected? # ''' # # # if self.i not in self.stabil_plot.stabil_calc.select_modes: # self.stabil_plot.add_mode(self.i) # # self.linev.set_visible(False) # # self.lineh.set_visible(False) # #self.background = self.ax.figure.canvas.copy_from_bbox(self.ax.figure.bbox) # #self.datalist.append(self.i) # # self.ax.hold(True) # overlay plots # # plot a circle where clicked # #self.scatter_objs.append(self.ax.scatter(self.x[self.i], self.y[ # # self.i], facecolors='none', edgecolors='red', s=200, visible=False)) # self.callbacks['mode_selected'](self.i) # # self.ax.draw_artist(self.scatter_objs[-1]) # # else: # self.stabil_plot.remove_mode(self.i) # # ind = self.datalist.index(self.i) # # self.scatter_objs[ind].remove() # # del self.scatter_objs[ind] # # self.datalist.remove(self.i) # self.callbacks['mode_deselected'](self.i) # # # self.ax.figure.canvas.restore_region(self.background) # # self.ax.figure.canvas.blit(self.ax.figure.bbox) # # self.i = None Cursor.onmove(self, event)
# for scatter in self.scatter_objs: scatter.set_visible(False) def _update(self): if self.useblit: if self.background is not None: self.canvas.restore_region(self.background) # for scatter in self.scatter_objs: # scatter.set_visible(True) # self.ax.draw_artist(scatter) # scatter.set_visible(False) self.ax.draw_artist(self.linev) self.ax.draw_artist(self.lineh) self.canvas.blit(self.ax.bbox) else: if self.horizOn or self.vertOn: # for scatter in self.scatter_objs: # scatter.set_visible(True) self.canvas.draw_idle() return False
[docs] def findIndexNearestXY(self, x_point, y_point): ''' Finds the nearest neighbour .. TODO:: currently a very inefficient brute force implementation should be replaced by e.g. a k-d-tree nearest neighbour search `https://en.wikipedia.org/wiki/K-d_tree` ''' distance = np.square( self.ypix - y_point) + np.square(self.xpix - x_point) index = np.argmin(distance) index = np.unravel_index(index, distance.shape) return index
[docs] def nearly_equal(a, b, sig_fig=5): return (a == b or int(a * 10 ** sig_fig) == int(b * 10 ** sig_fig) )
if __name__ == '__main__': pass