#
# -*- coding: utf-8 -*-
'''
pyOMA - A toolbox for Operational Modal Analysis
Copyright (C) 2015 - 2025 Simon Marwitz, Volkmar Zabel, Andrei Udrea et al.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
.. TODO::
* correct linear,.... offsets as well
* implement loading of different filetypes ascii, lvm, ...
* currently loading geometry, etc. files will overwrite existing assignments implement "load and append"
* add test to tests package
'''
import os
import csv
import datetime
import numpy as np
import scipy.signal
import matplotlib.pyplot as plt
from .Helpers import nearly_equal, simplePbar, validate_array
import logging
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
[docs]
class GeometryProcessor(object):
'''
conventions:
* chan_dofs=[(chan, node, (x_amplif,y_amplif,z_amplif)),...]
* channels = 0 ... #, starting at channel 0, should be a complete sequence
* nodes = 1 ... #, starting at node 1, can be a sequence with missing entries
* lines = [(node_start, node_end),...], unordered
* parent_childs = [(node_parent, x_parent, y_parent, z_parent,
node_child, x_child, y_child, z_child),...], unordered
.. TODO::
* change parent_child assignment to skewed coordinate
* change parent_childs to az, elev
'''
[docs]
def __init__(self, nodes={}, lines=[], parent_childs=[]):
super().__init__()
self.nodes = {}
assert isinstance(nodes, dict)
self.add_nodes(nodes)
self.lines = []
assert isinstance(lines, (list, tuple, np.ndarray))
self.add_lines(lines)
self.parent_childs = []
assert isinstance(parent_childs, (list, tuple, np.ndarray))
self.add_parent_childs(parent_childs)
[docs]
@staticmethod
def nodes_loader(filename):
'''
nodes file uses one header line
tab-separated file
node is treated as a string
x,y,z are treated as floats (in scientific format)
'''
nodes = {}
with open(filename, 'r') as f:
f.__next__()
for line1 in csv.reader(f, delimiter='\t', skipinitialspace=True):
line = []
for val in line1:
if not val:
continue
line += val.split()
if not line:
continue
if line[0].startswith('#'):
break
node, x, y, z = [float(line[i]) if i >= 1 else line[i].strip(
' ') for i in range(4)] # cut trailing empty columns
nodes[node] = [x, y, z]
return nodes
[docs]
@staticmethod
def lines_loader(filename):
'''
lines file uses one header line
tab-separated file
nodenames are treated as strings
'''
lines = []
with open(filename, 'r') as f:
f.__next__()
for line1 in csv.reader(f, delimiter='\t', skipinitialspace=True):
line = []
for val in line1:
if not val:
continue
line += val.split()
if not line:
continue
if line[0].startswith('#'):
break
node_start, node_end = \
[line[i] for i in range(2)] # cut trailing empty columns
lines.append((node_start, node_end))
return lines
[docs]
@staticmethod
def parent_childs_loader(filename):
'''
lines file uses one header line
tab-separated file
nodenames are treated as strings
amplification factors are treated as floats
'''
parent_childs = []
with open(filename, 'r') as f:
f.__next__()
reader = csv.reader(f, delimiter='\t', skipinitialspace=True)
for line1 in reader:
line = []
for val in line1:
if not val:
continue
line += val.split()
if not line:
continue
if line[0].startswith('#'):
break
i_m, x_m, y_m, z_m, i_sl, x_sl, y_sl, z_sl = [
float(line[i]) if i not in [0, 4] else line[i].strip(' ') for i in range(8)]
parent_childs.append(
(i_m, x_m, y_m, z_m, i_sl, x_sl, y_sl, z_sl))
return parent_childs
[docs]
@classmethod
def load_geometry(
cls,
nodes_file,
lines_file=None,
parent_childs_file=None):
'''
inititalizes a geometry object, to be passed along in the preprocessed data object
'''
geometry_data = cls()
nodes = geometry_data.nodes_loader(nodes_file)
geometry_data.add_nodes(nodes)
if lines_file is not None and os.path.exists(lines_file):
lines = geometry_data.lines_loader(lines_file)
geometry_data.add_lines(lines)
if parent_childs_file is not None and os.path.exists(parent_childs_file):
parent_childs = geometry_data.parent_childs_loader(
parent_childs_file)
geometry_data.add_parent_childs(parent_childs)
return geometry_data
def add_nodes(self, nodes):
for item in nodes.items():
try:
self.add_node(*item)
except BaseException:
logger.warning(
'Something was wrong while adding node {}. Continuing!'.format(item))
continue
def add_node(self, node_name, coordinate_list):
node_name = str(node_name)
if node_name in self.nodes.keys():
logger.warning('Node {} is already defined. Overwriting.'.format(node_name))
if not isinstance(coordinate_list, (list, tuple)):
raise RuntimeError(
'Coordinates must be provided as (x,y,z) tuples/lists.')
if len(coordinate_list) != 3:
raise RuntimeError(
'Coordinates must be provided as (x,y,z) tuples/lists.')
try:
node_name = str(node_name)
coordinate_list = list(coordinate_list)
for i in range(3):
coordinate_list[i] = float(coordinate_list[i])
except ValueError:
raise RuntimeError(
'Coordinate {} at position {} could not be converted to float.'.format(
coordinate_list[i], i))
except BaseException:
raise
self.nodes[node_name] = tuple(coordinate_list)
def take_node(self, node_name):
if node_name not in self.nodes:
logger.warning('Node not defined. Exiting')
return
while True: # check if any line is connected to this node
for j in range(len(self.lines)):
line = self.lines[j]
if node_name in line:
del self.lines[j]
break
else:
break
while True: # check if this node is a parent or child for another node
for j, parent_child in enumerate(self.parent_childs):
if node_name == parent_child[0] or node_name == parent_child[4]:
_ = parent_child
del self.parent_childs[j]
break
else:
break
del self.nodes[node_name]
logger.info('Node {} removed.'.format(node_name))
def add_lines(self, lines):
for line in lines:
try:
self.add_line(line)
except BaseException:
logger.warning(
'Something was wrong while adding line {}. Continuing!'.format(line))
continue
def add_line(self, line):
if not isinstance(line, (list, tuple)):
raise RuntimeError(
'Line has to be provided in format (start_node, end_node).')
if len(line) != 2:
raise RuntimeError(
'Line has to be provided in format (start_node, end_node).')
line = [str(line[0]), str(line[1])]
if line[0] not in self.nodes or line[1] not in self.nodes:
logger.warning('One of the end-nodes of line {} not defined!'.format(line))
else:
for line_ in self.lines:
if line_[0] == line[0] and line_[1] == line[1]:
logger.info('Line {} was defined, already.'.format(line))
self.lines.append(line)
def take_line(self, line=None, line_ind=None):
assert line is None or line_ind is None
if line is not None:
for line_ind in range(len(self.lines)):
line_ = self.lines[line_ind]
if line[0] == line_[0] and line[1] == line_[1]:
break
else:
logger.warning('Line {} was not found.'.format(line))
return
del self.lines[line_ind]
logger.info('Line {} at index {} removed.'.format(line, line_ind))
def add_parent_childs(self, parent_childs):
for ms in parent_childs:
try:
self.add_parent_child(ms)
except BaseException:
logger.warning(
'Something was wrong while adding parent-child-definition {}. Continuing!'.format(ms))
continue
def add_parent_child(self, ms):
if not isinstance(ms, (list, tuple)):
raise RuntimeError(
'parent child definition has to be provided in format (start_node, end_node).')
if len(ms) != 8:
raise RuntimeError(
'parent child definition has to be provided in format (parent_node, x_ampli, y_ampli, z_ampli, child_node, x_ampli, y_ampli, z_ampli).')
ms = (
str(
ms[0]), float(
ms[1]), float(
ms[2]), float(
ms[3]), str(
ms[4]), float(
ms[5]), float(
ms[6]), float(
ms[7]))
if ms[0] not in self.nodes or ms[4] not in self.nodes:
logger.warning(
'One of the nodes of parent child definition {} not defined!'.format(ms))
else:
for ms_ in self.parent_childs:
b = False
for i in range(8):
b = b and ms_[i] == ms[i]
if b:
logger.info(
'parent child definition {} was defined, already.'.format(ms))
else:
self.parent_childs.append(ms)
def take_parent_child(self, ms=None, ms_ind=None):
assert ms is None or ms_ind is None
if ms is not None:
for ms_ind in range(len(self.parent_childs)):
ms_ = self.parent_childs[ms_ind]
b = False
for i in range(8):
b = b and ms_[i] == ms[i]
if b:
break
else:
logger.warning('parent child definition {} was not found.'.format(ms))
return
del self.parent_childs[ms_ind]
logger.info('parent child definition {} at index {} removed.'.format(ms, ms_ind))
def rescale_geometry(self, factor):
pass
[docs]
class PreProcessSignals(object):
'''
A simple pre-processor for signals
* load ascii datafiles
* specify sampling rate, reference channels
* specify geometry, channel-dof-assignments
* specify channel quantities such as acceleration, velocity, etc
* remove (constant) offsets from signals
* decimate signals
* compute correlation functions and power spectral densities
* filter signals
Subsequent modules of pyOMA (SysId, Modal Analysis, Stabilization, Mode shape
visualization) rely on the variables and methods provided by
this class.
.. TODO :
* time-step integration of signals
* Multi-block Blackman-Tukey PSD
'''
[docs]
def __init__(self, signals, sampling_rate,
ref_channels=None,
accel_channels=None, velo_channels=None, disp_channels=None,
setup_name=None, channel_headers=None, start_time=None,
F=None, **kwargs):
super().__init__()
assert isinstance(signals, np.ndarray)
assert signals.shape[0] > signals.shape[1]
self.signals = np.copy(signals)
self.signals_filtered = np.copy(signals)
assert isinstance(sampling_rate, (int, float))
self.sampling_rate = sampling_rate
# added by anil
if F is not None:
assert isinstance(F, np.ndarray)
self.F = F
self._ref_channels = None
if ref_channels is None:
ref_channels = list(range(signals.shape[1]))
self.ref_channels = ref_channels
self._accel_channels = []
self._velo_channels = []
self._disp_channels = []
if disp_channels is None:
disp_channels = []
if velo_channels is None:
velo_channels = []
if accel_channels is None:
accel_channels = [c for c in range(self.num_analised_channels)
if c not in disp_channels and c not in velo_channels]
for chan in range(self.num_analised_channels):
if (chan in accel_channels) + (chan in velo_channels) + \
(chan in disp_channels) != 1:
logger.warning(f'Quantity of channel {chan} is not defined.')
self.accel_channels = accel_channels
self.velo_channels = velo_channels
self.disp_channels = disp_channels
if setup_name is None:
setup_name = ''
assert isinstance(setup_name, str)
self.setup_name = setup_name
if channel_headers is not None:
assert len(channel_headers) == self.num_analised_channels
else:
channel_headers = list(range(self.num_analised_channels))
self.channel_headers = channel_headers
if start_time is not None:
assert isinstance(start_time, datetime.datetime)
else:
start_time = datetime.datetime.now()
self.start_time = start_time
self.chan_dofs = []
self.channel_factors = [1 for _ in range(self.num_analised_channels)]
self.scaling_factors = None
self._last_meth = None
self.corr_matrix_wl = None
self.corr_matrices_wl = None
self.var_corr_wl = None
self.psd_matrix_wl = None
self.psd_matrices_wl = None
self.var_psd_wl = None
self.n_lines_wl = None
self.m_lags_wl = None
self.n_segments_wl = None
self.corr_matrix_bt = None
self.corr_matrices_bt = None
self.var_corr_bt = None
self.psd_matrix_bt = None
self.psd_matrices_bt = None
self.var_psd_bt = None
self.n_lines_bt = None
self.m_lags_bt = None
self.n_segments_bt = None
# self.s_vals_cf = None
self.s_vals_psd = None
[docs]
@classmethod
def init_from_config(
cls,
conf_file,
meas_file,
chan_dofs_file=None,
**kwargs):
'''
initializes the PreProcessor object with a configuration file
to remove channels at loading time use 'usecols' keyword argument
if delete_channels are specified, these will be checked against
all other channel definitions, which will be adjusted accordingly
'''
if not os.path.exists(conf_file):
raise RuntimeError(
'Conf File does not exist: {}'.format(conf_file))
with open(conf_file, 'r') as f:
assert f.__next__().strip('\n').strip(' ') == 'Setup Name:'
name = f. __next__().strip('\n')
assert f.__next__().strip('\n').strip(' ') == 'Sampling Rate [Hz]:'
sampling_rate = float(f. __next__().strip('\n'))
assert f.__next__().strip('\n').strip(' ') == 'Reference Channels:'
ref_channels = f.__next__().strip('\n').split(' ')
if ref_channels:
ref_channels = [int(val)
for val in ref_channels if val.isnumeric()]
assert f.__next__().strip('\n').strip(' ') == 'Delete Channels:'
delete_channels = f.__next__().strip('\n ').split(' ')
if delete_channels:
delete_channels = [
int(val) for val in delete_channels if val.isnumeric()]
assert f.__next__().strip('\n').strip(' ') == 'Accel. Channels:'
accel_channels = f.__next__().strip('\n ').split()
if accel_channels:
accel_channels = [int(val) for val in accel_channels]
assert f.__next__().strip('\n').strip(' ') == 'Velo. Channels:'
velo_channels = f.__next__().strip('\n ').split()
if velo_channels:
velo_channels = [int(val) for val in velo_channels]
assert f.__next__().strip('\n').strip(' ') == 'Disp. Channels:'
disp_channels = f.__next__().strip('\n ').split()
if disp_channels:
disp_channels = [int(val) for val in disp_channels]
loaded_signals = cls.load_measurement_file(meas_file, **kwargs)
if not isinstance(loaded_signals, np.ndarray):
# print(loaded_signals)
headers, _, start_time, sample_rate, signals = loaded_signals
else:
signals = loaded_signals
start_time = datetime.datetime.now()
sample_rate = sampling_rate
headers = ['Channel_{}'.format(i)
for i in range(signals.shape[1])]
if not sample_rate == sampling_rate:
logger.warning(
'Sampling Rate from file: {} does not correspond with specified Sampling Rate from configuration {}'.format(
sample_rate, sampling_rate))
# print(headers)
if chan_dofs_file is not None:
chan_dofs = cls.load_chan_dofs(chan_dofs_file)
else:
chan_dofs = None
if chan_dofs is not None:
for chan_dof in chan_dofs:
if len(chan_dof) == 5:
chan = chan_dof[0]
chan_name = chan_dof[4]
if len(chan_name) == 0:
continue
elif headers[chan] == 'Channel_{}'.format(chan):
headers[chan] = chan_name
elif headers[chan] != chan_name:
logger.info(
'Different headers for channel {} in signals file ({}) and in channel-DOF-assignment ({}).'.format(
chan, headers[chan], chan_name))
else:
continue
# print(delete_channels)
if delete_channels:
# delete_channels.sort(reverse=True)
_ = [
'Reference Channels',
'Accel. Channels',
'Velo. Channels',
'Disp. Channels']
_ = [
ref_channels,
accel_channels,
velo_channels,
disp_channels]
# print(chan_dofs)
num_all_channels = signals.shape[1]
# print(chan_dofs, ref_channels, accel_channels, velo_channels,disp_channels, headers)
new_chan_dofs = []
new_ref_channels = []
new_accel_channels = []
new_velo_channels = []
new_disp_channels = []
new_headers = []
new_channel = 0
for channel in range(num_all_channels):
if channel in delete_channels:
logger.info(
'Now removing Channel {} (no. {})!'.format(
headers[channel], channel))
continue
else:
for chan_dof in chan_dofs:
if chan_dof[0] == channel:
node, az, elev = chan_dof[1:4]
if len(chan_dof) == 5:
cname = chan_dof[4]
else:
cname = ''
break
else:
logger.warning('Could not find channel in chan_dofs')
continue
new_chan_dofs.append([new_channel, node, az, elev, cname])
if channel in ref_channels:
new_ref_channels.append(new_channel)
if channel in accel_channels:
new_accel_channels.append(new_channel)
if channel in velo_channels:
new_velo_channels.append(new_channel)
if channel in disp_channels:
new_disp_channels.append(new_channel)
new_headers.append(headers[channel])
new_channel += 1
signals = np.delete(signals, delete_channels, axis=1)
chan_dofs = new_chan_dofs
ref_channels = new_ref_channels
accel_channels = new_accel_channels
velo_channels = new_velo_channels
disp_channels = new_disp_channels
headers = new_headers
# print(chan_dofs, ref_channels, accel_channels, velo_channels,disp_channels, headers)
# channel = signals.shape[1]
# #num_channels = signals.shape[1]
# while channel >= 0:
#
# if channel in delete_channels:
# # affected lists: ref_channels, accel_channels, velo_channels, disp_channels + chan_dofs
# # remove channel from all lists
# # decrement all channels higher than channel in all lists
# #num_channels -= 1
# for channel_list in channel_lists:
# if channel in channel_list:
# channel_list.remove(channel)
# print('Channel {} removed from {} list'.format(channel, names[channel_lists.index(channel_list)]))
# for channel_ind in range(len(channel_list)):
# if channel_list[channel_ind] > channel:
# channel_list[channel_ind] -= 1
#
# if chan_dofs:
# this_num_channels = len(chan_dofs)
# chan_dof_ind = 0
# while chan_dof_ind < this_num_channels:
# if channel==chan_dofs[chan_dof_ind][0]:
# print('Channel-DOF-Assignment {} removed.'.format(chan_dofs[chan_dof_ind]))
# del chan_dofs[chan_dof_ind]
# this_num_channels -= 1
# elif channel < chan_dofs[chan_dof_ind][0]:
# chan_dofs[chan_dof_ind][0] -= 1
# chan_dof_ind += 1
# print('Now removing Channel {} (no. {})!'.format(headers[channel], channel))
# del headers[channel]
# channel -= 1
# #print(chan_dofs)
#
# signals=np.delete(signals, delete_channels, axis=1)
# total_time_steps = signals.shape[0]
num_channels = signals.shape[1]
# roving_channels = [i for i in range(num_channels) if i not in ref_channels]
if not accel_channels and not velo_channels and not disp_channels:
accel_channels = [i for i in range(num_channels)]
# print(signals.shape, ref_channels)
# print(signals)
prep_signals = cls(signals, sampling_rate,
ref_channels,
accel_channels, velo_channels, disp_channels,
setup_name=name, channel_headers=headers,
start_time=start_time,
**kwargs)
if chan_dofs:
prep_signals.add_chan_dofs(chan_dofs)
return prep_signals
[docs]
@staticmethod
def load_chan_dofs(fname):
'''
chan_dofs[i] = (chan_num, node_name, az, elev, chan_name)
= (int, str, float,float, str)
azimuth angle starting from x axis towards y axis
elevation defined from x-y plane up
x: 0.0, 0.0
y: 90.0, 0.0
z: 0.0, 90.0
channels not present in the file will be removed later
nodes do not have to, but should exist, as this information is
also used for merging multiple setups, which does not rely on
any "real" geometry
'''
chan_dofs = []
with open(fname, 'r') as f:
f.__next__()
for line1 in csv.reader(f, delimiter='\t', skipinitialspace=True):
line = []
for val in line1:
if not val:
continue
line += val.split()
if not line:
continue
if line[0].startswith('#'):
break
while len(line) <= 5:
line.append('')
chan_num, node, az, elev, chan_name = [
line[i].strip(' ') for i in range(5)]
chan_num, az, elev = int(
float(chan_num)), float(az), float(elev)
# print(chan_num, node, az, elev)
if node == 'None':
node = None
# print(None)
chan_dofs.append([chan_num, node, az, elev, chan_name])
return chan_dofs
[docs]
@staticmethod
def load_measurement_file(fname, **kwargs):
'''
A method for loading a signals file
Parameters
----------
fname : str
The full path of the signals file
Returns
-------
headers : list of str
The names of all channels
units : list of str
The units of all channels
start_time : datetime.datetime
The starting time of the measured signal
sample_rate : float
The sample rate, at wich the signal was acquired
signals : ndarray
Array of shape (num_timesteps, num_channels) which contains
the acquired signal
'''
raise NotImplementedError(
'This method must be provided by the user for each specific analysis task and assigned to the class before instantiating the instance.')
headers = None
units = None
start_time = None
sample_rate = None
measurement = None
return headers, units, start_time, sample_rate, measurement
[docs]
def add_chan_dofs(self, chan_dofs):
'''
chan_dofs = [ (chan_num, node_name, az, elev, chan_name) , ... ]
This function is not checking if channels or nodes actually exist
the former should be added
the latter might only be possible, if the geometry object is known to the class
'''
for chan_dof in chan_dofs:
chan_dof[0] = int(chan_dof[0])
if chan_dof[1] is not None:
chan_dof[1] = str(chan_dof[1])
chan_dof[2] = float(chan_dof[2])
chan_dof[3] = float(chan_dof[3])
if len(chan_dof) == 4:
chan_dof.append('')
self.chan_dofs.append(chan_dof)
# self.chan_dofs=chan_dofs
def take_chan_dof(self, chan, node, dof):
for j in range(len(self.chan_dofs)):
if self.chan_dofs[j][0] == chan and \
self.chan_dofs[j][1] == node and \
nearly_equal(self.chan_dofs[j][2][0], dof[0], 3) and \
nearly_equal(self.chan_dofs[j][2][1], dof[1], 3) and \
nearly_equal(self.chan_dofs[j][2][2], dof[2], 3):
del self.chan_dofs[j]
break
else:
if self.chan_dofs:
logger.warning('chandof not found')
def save_state(self, fname):
# print('fname = ', fname)
logger.info('Saving results to {}'.format(fname))
dirname, _ = os.path.split(fname)
if not os.path.isdir(dirname):
os.makedirs(dirname)
out_dict = {}
out_dict['self.signals'] = self.signals
out_dict['self.sampling_rate'] = self.sampling_rate
out_dict['self.ref_channels'] = self._ref_channels
out_dict['self.accel_channels'] = self._accel_channels
out_dict['self.velo_channels'] = self._velo_channels
out_dict['self.disp_channels'] = self._disp_channels
out_dict['self.setup_name'] = self.setup_name
out_dict['self.channel_headers'] = self.channel_headers
out_dict['self.start_time'] = self.start_time
out_dict['self.chan_dofs'] = self.chan_dofs
out_dict['self.scaling_factors'] = self.scaling_factors
out_dict['self.channel_factors'] = self.channel_factors
out_dict['self._last_meth'] = self._last_meth
out_dict['self.corr_matrix_wl'] = self.corr_matrix_wl
out_dict['self.corr_matrices_wl'] = self.corr_matrices_wl
out_dict['self.psd_matrix_wl'] = self.psd_matrix_wl
out_dict['self.psd_matrices_wl'] = self.psd_matrices_wl
out_dict['self.var_corr_wl'] = self.var_corr_wl
out_dict['self.var_psd_wl'] = self.var_psd_wl
out_dict['self.n_lines_wl'] = self.n_lines_wl
out_dict['self.m_lags_wl'] = self.m_lags_wl
out_dict['self.n_segments_wl'] = self.n_segments_wl
out_dict['self.corr_matrix_bt'] = self.corr_matrix_bt
out_dict['self.corr_matrices_bt'] = self.corr_matrices_bt
out_dict['self.psd_matrix_bt'] = self.psd_matrix_bt
out_dict['self.n_lines_bt'] = self.n_lines_bt
out_dict['self.m_lags_bt'] = self.m_lags_bt
out_dict['self.n_segments_bt'] = self.n_segments_bt
out_dict['self.var_corr_bt'] = self.var_corr_bt
np.savez_compressed(fname, **out_dict)
@classmethod
def load_state(cls, fname):
logger.info('Loading results from {}'.format(fname))
in_dict = np.load(fname, allow_pickle=True)
signals = validate_array(in_dict['self.signals'])
sampling_rate = validate_array(in_dict['self.sampling_rate'])
_ref_channels = validate_array(in_dict['self.ref_channels'])
_accel_channels = validate_array(in_dict['self.accel_channels'])
_velo_channels = validate_array(in_dict['self.velo_channels'])
_disp_channels = validate_array(in_dict['self.disp_channels'])
channel_headers = validate_array(in_dict['self.channel_headers'])
start_time = validate_array(in_dict['self.start_time'])
setup_name = validate_array(in_dict['self.setup_name'])
preprocessor = cls(signals, sampling_rate,
_ref_channels,
_accel_channels, _velo_channels, _disp_channels,
setup_name, channel_headers, start_time,
)
chan_dofs = [[int(float(chan_dof[0])), str(chan_dof[1]), float(chan_dof[2]), float(chan_dof[3]), str(
chan_dof[4] if 5 == len(chan_dof) else '')] for chan_dof in in_dict['self.chan_dofs']]
preprocessor.add_chan_dofs(chan_dofs)
try:
preprocessor.scaling_factors = validate_array(in_dict['self.scaling_factors'])
preprocessor.channel_factors = validate_array(in_dict['self.channel_factors'])
preprocessor._last_meth = validate_array(in_dict['self._last_meth'])
preprocessor.corr_matrix_wl = validate_array(in_dict['self.corr_matrix_wl'])
preprocessor.corr_matrices_wl = validate_array(in_dict.get('self.corr_matrices_wl'))
preprocessor.psd_matrix_wl = validate_array(in_dict['self.psd_matrix_wl'])
preprocessor.psd_matrices_wl = validate_array(in_dict.get('self.psd_matrices_wl'))
preprocessor.var_corr_wl = validate_array(in_dict.get('self.var_corr_wl'))
preprocessor.var_psd_wl = validate_array(in_dict['self.var_psd_wl'])
preprocessor.n_lines_wl = validate_array(in_dict['self.n_lines_wl'])
preprocessor.m_lags_wl = validate_array(in_dict.get('self.m_lags_wl'))
preprocessor.n_segments_wl = validate_array(in_dict['self.n_segments_wl'])
preprocessor.corr_matrix_bt = validate_array(in_dict['self.corr_matrix_bt'])
preprocessor.corr_matrices_bt = validate_array(in_dict.get('self.corr_matrices_bt'))
preprocessor.psd_matrix_bt = validate_array(in_dict['self.psd_matrix_bt'])
preprocessor.n_lines_bt = validate_array(in_dict['self.n_lines_bt'])
preprocessor.m_lags_bt = validate_array(in_dict.get('self.m_lags_bt'))
preprocessor.n_segments_bt = validate_array(in_dict['self.n_segments_bt'])
preprocessor.var_corr_bt = validate_array(in_dict['self.var_corr_bt'])
except KeyError as e:
# loading data saved with old version, spectral values must be recomputed
logger.warning(f'Failed to load part of the saved file at Key {e}')
return preprocessor
def validate_channels(self, channels, quant_check=False):
if quant_check:
accel_channels = self.accel_channels
velo_channels = self.velo_channels
disp_channels = self.disp_channels
for channel in channels:
# channel names
if channel < 0:
raise ValueError('A channel number cannot be negative!')
if channel > self.num_analised_channels - 1:
raise ValueError('A channel number cannot be greater'
' than the number of all channels!')
if quant_check:
if channel in accel_channels:
logger.warning(f'Channel {self.channel_headers[channel]} is already defined'
' as an acceleration channel. Removing')
accel_channels.remove(channel)
if channel in velo_channels:
logger.warning(f'Channel {self.channel_headers[channel]} is already defined'
' as a velocity channel. Removing')
velo_channels.remove(channel)
if channel in disp_channels:
logger.warning(f'Channel {self.channel_headers[channel]} is already defined'
' as a displacement channel. Removing')
disp_channels.remove(channel)
def _channel_numbers(self, channels=None, refs=None):
"""
Method to return channel numbers
Interpretation of the argument values:
* None: a list of all channel indices
* list-of-int: a validated list of given channel indices
* list-of-str: a list of channel indices for each channel name in the given order
* int: a single-item list of the given channel index
* str: a single-item list of the channel index for the given name
* 'auto' (only refs): a single item list corresponding to the respective channel
Parameters
----------
channels: None, list-of-int, list-of-str, int, str
The selected channels.
refs: 'auto', list-of-indices, optional
The reference channel indices to be contained in the reference channel list
Returns
-------
channel_numbers: list
The generated channel indices
ref_numbers: list-of-lists
The corresponding reference channels for each channel in
channel_numbers, such that it can be looped over in an inner loop.
"""
if channels is None:
channel_numbers = list(range(self.num_analised_channels))
elif isinstance(channels, int):
channel_numbers = [channels]
elif isinstance(channels, str):
try:
channel_number = int(channels)
except ValueError:
channel_number = self.channel_headers.index(channels)
channel_numbers = [channel_number]
elif isinstance(channels, (list, tuple, np.ndarray)):
channel_numbers = []
for channel in channels:
if isinstance(channel, (int, np.int32, np.int64)):
channel_numbers.append(int(channel))
elif isinstance(channel, str):
try:
channel_number = int(channel)
except ValueError:
channel_number = self.channel_headers.index(channel)
channel_numbers.append(channel_number)
else:
raise ValueError(f'Channel {channel} in channels is an invalid channel definition.')
if refs is None:
ref_channels = self.ref_channels
ref_numbers = [ref_channels for _ in channel_numbers]
elif refs == 'auto':
ref_numbers = [[ind] for ind in channel_numbers]
elif isinstance(refs, int):
ref_numbers = [[refs] for _ in channel_numbers]
elif isinstance(refs, str):
ind = self.channel_headers.index(channels)
ref_numbers = [[ind] for _ in channel_numbers]
elif isinstance(refs, (list, tuple, np.ndarray)):
custom_ref_numbers = []
for channel in refs:
if isinstance(channel, int):
custom_ref_numbers.append(channel)
elif isinstance(channel, str):
custom_ref_numbers.append(self.channel_headers.index(channel))
else:
raise ValueError(f'Channel {channel} in refs is an invalid channel definition.')
ref_numbers = [custom_ref_numbers for _ in channel_numbers]
else:
raise ValueError(f'{refs} not a valid reference channel specification.')
return channel_numbers, ref_numbers
@property
def ref_channels(self):
return self._ref_channels
@ref_channels.setter
def ref_channels(self, ref_channels):
ref_channels, _ = self._channel_numbers(ref_channels)
self.validate_channels(ref_channels)
self._clear_spectral_values()
self._ref_channels = ref_channels
@property
def accel_channels(self):
return self._accel_channels
@accel_channels.setter
def accel_channels(self, accel_channels):
accel_channels, _ = self._channel_numbers(accel_channels)
self.validate_channels(accel_channels, True)
self._accel_channels = accel_channels
@property
def velo_channels(self):
return self._velo_channels
@velo_channels.setter
def velo_channels(self, velo_channels):
velo_channels, _ = self._channel_numbers(velo_channels)
self.validate_channels(velo_channels, True)
self._velo_channels = velo_channels
@property
def disp_channels(self):
return self._disp_channels
@disp_channels.setter
def disp_channels(self, disp_channels):
disp_channels, _ = self._channel_numbers(disp_channels)
self.validate_channels(disp_channels, True)
self._disp_channels = disp_channels
@property
def num_ref_channels(self):
return len(self.ref_channels)
@property
def num_analised_channels(self):
return self.signals.shape[1]
@property
def total_time_steps(self):
return self.signals.shape[0]
@property
def duration(self):
return self.total_time_steps / self.sampling_rate
@property
def dt(self):
return 1 / self.sampling_rate
@property
def t(self):
N = self.total_time_steps
fs = self.sampling_rate
# t[-1] != self.duration to ensure sample_spacing == self.dt
return np.linspace(0, N / fs, N, False)
@property
def n_lines(self):
if self._last_meth == 'welch':
return self.n_lines_wl
elif self._last_meth == 'blackman-tukey':
return self.n_lines_bt
else:
return None
@property
def freqs(self):
'''
Returns
----------
freqs: np.ndarray (n_lines, )
Array with the frequency lines corresponding to the spectral values
'''
if self.n_lines:
n_lines = self.n_lines
fs = self.sampling_rate
return np.fft.rfftfreq(n_lines, 1 / fs)
@property
def freqs_wl(self):
'''
Returns
----------
freqs: np.ndarray (n_lines, )
Array with the frequency lines corresponding to the spectral values
'''
if self.m_lags_wl:
n_lines = self.n_lines_wl
fs = self.sampling_rate
return np.fft.rfftfreq(n_lines, 1 / fs)
@property
def freqs_bt(self):
'''
Returns
----------
freqs: np.ndarray (n_lines, )
Array with the frequency lines corresponding to the spectral values
'''
if self.n_lines_bt:
n_lines = self.n_lines_bt
fs = self.sampling_rate
return np.fft.rfftfreq(n_lines, 1 / fs)
@property
def lags(self):
if self.m_lags:
m_lags = self.m_lags
fs = self.sampling_rate
return np.linspace(0, m_lags / fs, m_lags, False)
@property
def lags_wl(self):
if self.m_lags_wl:
m_lags = self.m_lags_wl
fs = self.sampling_rate
return np.linspace(0, m_lags / fs, m_lags, False)
@property
def lags_bt(self):
if self.m_lags_bt:
m_lags = self.m_lags_bt
fs = self.sampling_rate
return np.linspace(0, m_lags / fs, m_lags, False)
@property
def m_lags(self):
if self._last_meth == 'welch':
return self.m_lags_wl
elif self._last_meth == 'blackman-tukey':
return self.m_lags_bt
else:
return None
@property
def n_segments(self):
if self._last_meth == 'welch':
return self.n_segments_wl
elif self._last_meth == 'blackman-tukey':
return self.n_segments_bt
else:
return None
# @property
# def m_lags_wl(self):
# if self.n_lines_wl:
# return self.n_lines_wl // 2 + 1
#
# @property
# def m_lags_bt(self):
# if self.n_lines_bt:
# return self.n_lines_bt // 2 + 1
@property
def corr_matrix(self):
if self._last_meth == 'welch':
return self.corr_matrix_wl
elif self._last_meth == 'blackman-tukey':
return self.corr_matrix_bt
else:
return None
@property
def corr_matrices(self):
if self._last_meth == 'welch':
return self.corr_matrices_wl
elif self._last_meth == 'blackman-tukey':
return self.corr_matrices_bt
else:
return None
@property
def psd_matrix(self):
if self._last_meth == 'welch':
return self.psd_matrix_wl
elif self._last_meth == 'blackman-tukey':
return self.psd_matrix_bt
else:
return None
@property
def signal_power(self):
if not np.all(np.isclose(np.mean(self.signals, axis=0), 0)):
logger.warning("Signal has constant offsets. Power values may be errorneous")
return np.mean(np.square(self.signals), axis=0)
@property
def signal_rms(self):
return np.sqrt(self.signal_power)
def add_noise(self, amplitude=0, snr=0):
logger.info(
'Adding Noise with Amplitude {} and {} percent RMS'.format(
amplitude,
snr *
100))
assert amplitude != 0 or snr != 0
if snr != 0 and amplitude == 0:
rms = self.signal_rms
amplitude = rms * snr
else:
amplitude = [
amplitude for channel in range(
self.num_analised_channels)]
for channel in range(self.num_analised_channels):
self.signals[:, channel] += np.random.normal(0, amplitude[channel], self.total_time_steps)
self._clear_spectral_values()
[docs]
def correct_offset(self):
'''
corrects a constant offset from measured signals
..TODO::
* remove linear, ... ofsets as well
'''
logger.info('Correcting offset of measured signals')
self.signals -= self.signals.mean(axis=0)
self._clear_spectral_values()
return
def precondition_signals(self, method='iqr'):
assert method in ['iqr', 'range']
self.correct_offset()
for i in range(self.signals.shape[1]):
tmp = self.signals[:, i]
if method == 'iqr':
factor = np.subtract(*np.percentile(tmp, [95, 5]))
elif method == 'range':
factor = np.max(tmp) - np.min(tmp)
self.signals[:, i] /= factor
self.channel_factors[i] = factor
self._clear_spectral_values()
def filter_signals(self, lowpass=None, highpass=None,
overwrite=True,
order=None, ftype='butter', RpRs=[3, 3],
plot_ax=None):
logger.info('Filtering signals in the band: {} .. {} with a {} order {} filter.'.format(highpass, lowpass, order, ftype))
if (highpass is None) and (lowpass is None):
raise ValueError('Neither a lowpass or a highpass corner frequency was provided.')
ftype_list = ['butter', 'cheby1', 'cheby2', 'ellip', 'bessel', 'moving_average', 'brickwall']
if not (ftype in ftype_list):
raise ValueError(f'Filter type {ftype} is not any of the available types: {ftype_list}')
if order is None:
if ftype_list.index(ftype) < 5:
# default FIR filter order
order = 4
else:
# default IIR filter order
order = 21
if order < 1:
raise ValueError('Order must be greater equal 1')
nyq = self.sampling_rate / 2
freqs = []
if lowpass is not None:
freqs.append(float(lowpass))
btype = 'lowpass'
if highpass is not None:
freqs.append(float(highpass))
btype = 'highpass'
if len(freqs) == 2:
btype = 'bandpass'
freqs.sort()
freqs[:] = [x / nyq for x in freqs]
measurement = self.signals
if ftype in ftype_list[0:5]: # IIR filter
# if order % 2: # odd number
# logger.warning(f'Odd filter order {order} will be rounded up to {order+1}, because of forward-backward filtering.')
# order = int(np.ceil(order / 2)) # reduce by factor 2 because of double filtering
order = int(order)
sos = scipy.signal.iirfilter(
order, freqs, rp=RpRs[0], rs=RpRs[1],
btype=btype, ftype=ftype, output='sos')
signals_filtered = scipy.signal.sosfiltfilt(
sos, measurement, axis=0)
if self.F is not None:
self.F_filt = scipy.signal.sosfiltfilt(sos, self.F, axis=0)
elif ftype in ftype_list[5:7]: # FIR filter
if ftype == 'brickwall':
fir_irf = scipy.signal.firwin(numtaps=order, cutoff=freqs, pass_zero=btype, fs=np.pi)
elif ftype == 'moving_average':
if freqs:
logger.warning('For the moving average filter, no cutoff frequencies can be defined.')
fir_irf = np.ones((order)) / order
signals_filtered = scipy.signal.lfilter(fir_irf, [1.0], measurement, axis=0)
if self.F is not None:
self.F_filt = scipy.signal.lfilter(fir_irf, [1.0], self.F, axis=0)
if np.isnan(signals_filtered).any():
logger.warning('Your filtered signals contain NaNs. Check your filter settings! Continuing...')
if plot_ax is not None:
N = 2048
dt = 1 / self.sampling_rate
if isinstance(plot_ax, (list, np.ndarray)):
freq_ax = plot_ax[1]
tim_ax = plot_ax[0]
else:
freq_ax = plot_ax
tim_ax = None
if ftype in ftype_list[0:5]: # IIR Filter
w, h = scipy.signal.sosfreqz(sos, worN=np.fft.rfftfreq(N) * 2 * np.pi)
# convert to decibels
# the square comes from double filtering and has nothing to do with rms or such
# db factor 20 due to Root-Mean-Square not Mean-Square-Spectrum quantity
frf = 20 * np.log10(abs(h) ** 2)
freq_ax.plot((nyq / np.pi) * w, frf, color='lightgrey', ls='dashed')
if tim_ax is not None:
irf = np.fft.irfft(h, n=10 * N)
logger.debug(f'IRF Integral {np.sum(irf)*dt}')
dur = N * dt
t = np.linspace(0, dur - dt, 10 * N)
# b, a = scipy.signal.sos2tf(sos)
# tout, yout = scipy.signal.dimpulse((b, a, dt), n=N)
# tim_ax.plot(tout, np.squeeze(yout))
tim_ax.plot(t, irf, color='lightgrey')
else: # FIR Filter
dt = 1 / self.sampling_rate
dur = order * dt
# zero-pad the FRF to achieve spectral-interpolated IRF
frf = np.fft.fft(fir_irf)
if order % 2:
# if numtaps is odd, the maximum frequency is present additionally to the minimum,
# which is just a conjugate in the case of real signals
neg = frf[order // 2 + 1:order]
pos = frf[:order // 2 + 1]
else:
# if numtaps is even, only the mimimum frequency is present
pos = frf[:order // 2]
neg = frf[order // 2:order]
# mirror the conjugate of the minimum frequency to the maximum frequency to ensure symmetry of the spectrum
pos = np.hstack([pos, np.conj(neg[0:1])])
frf_pad = np.hstack([pos, np.zeros((N - order // 2 * 2 - 1,), dtype=complex), neg])
irf_fine = np.fft.ifft(frf_pad)
# ensure imaginary part of interpolated IRF is zero
assert np.max(irf_fine.imag) <= np.finfo(np.float64).eps
irf_fine = irf_fine.real
dt_new = dur / N
irf_fine /= dt_new / dt
logger.debug(f'IRF Integral {np.sum(fir_irf) * dt}, {np.sum(irf_fine) * dt_new}')
# zero-pad the IRF to achieve high-resolution FRF
irf_pad = np.zeros((N,))
irf_pad[:order] = fir_irf
frf_fine = np.fft.fft(irf_pad)
# convert to decibels
frf_fine = 20 * np.log10(abs(frf_fine))
# plot FRF and IRF
freq_ax.plot(np.fft.fftshift(np.fft.fftfreq(N, dt)),
np.fft.fftshift(frf_fine), color='lightgrey', ls='dashed')
if tim_ax is not None:
t = np.linspace(-dur / 2, dur / 2 - dt_new, N)
tim_ax.plot(t, irf_fine, color='lightgrey',)
if overwrite:
self.signals = signals_filtered
if self.F is not None:
self.F = self.F_filt
self.signals_filtered = signals_filtered
self._clear_spectral_values()
return signals_filtered
[docs]
def decimate_signals(self, decimate_factor, nyq_rat=2.5,
highpass=None, order=None, filter_type='cheby1'):
'''
decimates signals data
filter type and order are choosable (order 8 and type cheby1 are standard for scipy signal.decimate function)
maximum ripple in the passband (rp) and minimum attenuation in the stop band (rs) are modifiable
'''
if highpass:
logger.info(f'Decimating signals by factor {decimate_factor}'
f' and additional highpass filtering at {highpass}'
f' to a sampling rate of {self.sampling_rate/decimate_factor} Hz')
else:
logger.info(f'Decimating signals by factor {decimate_factor}'
f' to a sampling rate of {self.sampling_rate/decimate_factor} Hz')
# input validation
decimate_factor = abs(decimate_factor)
assert isinstance(decimate_factor, int)
assert decimate_factor >= 1
assert nyq_rat >= 2.0
if order is None:
if filter_type in ['brickwall', 'moving_average']:
order = 21 * decimate_factor - 1 # make it odd to avoid errors when highpass filtering
else:
order = 8
else:
order = abs(order)
assert isinstance(order, int)
assert order > 1
RpRs = [None, None]
if filter_type == 'cheby1' or filter_type == 'cheby2' or filter_type == 'ellip':
RpRs = [0.05, 0.05] # standard for signal.decimate
nyq = self.sampling_rate / decimate_factor
sig_filtered = self.filter_signals(
lowpass=nyq / nyq_rat,
highpass=highpass,
overwrite=False,
order=order,
ftype=filter_type,
RpRs=RpRs,)
self.sampling_rate /= decimate_factor
N_dec = int(np.floor(self.total_time_steps / decimate_factor))
# ceil would also work, but breaks indexing for aliasing noise estimation
# with floor though, care must be taken to shorten the time domain signal to N_dec full blocks before slicing
# decimate signal
sig_decimated = np.copy(sig_filtered[0:N_dec * decimate_factor:decimate_factor,:])
# correct for power loss due to decimation
# https://en.wikipedia.org/wiki/Downsampling_(signal_processing)#Anti-aliasing_filter
sig_decimated *= decimate_factor
if self.F is not None:
F_decimated = self.F_filt[slice(None, None, decimate_factor)]
self.F = F_decimated
# self.total_time_steps = sig_decimated.shape[0]
self.signals = sig_decimated
self._clear_spectral_values()
def _clear_spectral_values(self):
"""
Convenience method to clear all previously computed spectral values.
To be called when any modifications, such as filtering, decimation,
etc. are applied to the signals.
"""
self.scaling_factors = None
self._last_meth = None
self.psd_matrix_bt = None
self.psd_matrix_wl = None
self.n_lines_wl = None
self.n_lines_bt = None
self.n_segments_bt = None
self.n_segments_wl = None
self.corr_matrix_bt = None
self.corr_matrix_wl = None
self.var_corr_bt = None
self.var_psd_wl = None
[docs]
def psd_welch(self, n_lines=None, n_segments=None, refs_only=True, window='hamming', **kwargs):
'''
Estimate the (cross- and auto-) power spectral densities (PSD),
according to Welch's method. No overlapping is allowed (deliberately
to ensure statistical independence of blocks for variance estimation).
Segments are n_lines // 2 long and zero padded to n_lines to allow
estimation of the full correlation sequence, which is twice as long
as the input signal. Normalization is applied w.r.t. conservation of
energy, i.e. magnitudes will change with n_lines but power stays
constant.
Parameters
----------
n_lines: integer, optional
Number of frequency lines (positive + negative)
n_segments: integer, optional
Number of segments to perform averaging over
resulting segment length must be smaller or equal n_lines
refs_only: bool, optional
Compute cross-PDSs only with reference channels
window: str or tuple or array_like, optional
Desired window to use. See scipy.signal.get_window() for more information
Other Parameters
----------------
kwargs :
Additional kwargs are passed to scipy.signals.csd
Returns
-------
psd_matrix: np.ndarray
Array of shape (num_channels, num_ref_channels, n_lines // 2 + 1)
containing the power density values of the respective
channels and frequencies
'''
N = self.total_time_steps
if n_lines is not None:
if not isinstance(n_lines, int):
raise ValueError(f"{n_lines} is not a valid number of n_lines for a spectral densities")
if n_lines % 2:
n_lines += 1
logger.warning(f"Only even number of frequency lines are supported setting n_lines={n_lines}")
if n_lines > 2 * N:
logger.warning(f'Number of frequency lines {n_lines} should not'
f'be larger than twice the number of timesteps {self.total_time_steps}')
if n_segments is not None:
if not isinstance(n_segments, int):
raise ValueError(f"{n_segments} is not a valid number of segments")
self._last_meth = 'welch'
# catch function call cases 1, ..., 4
# 1: no arguments: possibly cached results
if n_lines is None and n_segments is None:
n_lines = self.n_lines_wl
n_segments = self.n_segments_wl
if n_lines is None and n_segments is None:
raise RuntimeError('Either n_lines or n_segments must be provided on first run.')
# 2: no variance of spectra requested
if n_segments is None and n_lines is not None:
# it increases variance and does not improve the result in any other sense
# when using less than the maximally possible number of segments
N_segment = n_lines
_n_segments = N // N_segment
# 3. variance of spectra requested, n_lines not of interest (when called from corr_welch)
elif n_segments is not None and n_lines is None:
_n_segments = n_segments
N_segment = N // n_segments
n_lines = N_segment
# 4. variance of spectra with given n_lines requested
else:
_n_segments = n_segments
N_segment = min(N // _n_segments, n_lines)
if n_lines % 2: # repeat the check from above
n_lines += 1
if N_segment > n_lines:
# make sure scipy.signal.psd does not create additional segments or discard part of the signal by passing exactly one segment
raise ValueError(f"The segment length {N_segment} must not be larger than the number of frequency lines {n_lines}")
if N_segment < n_lines / 2:
logger.warning(f"The segment length {N_segment} is much smaller than the number of frequency lines {n_lines} (zero-padded)")
while True:
# check, if it is possible to simply return previously computed PSD
if kwargs:
logger.debug(f"Not returning because: kwargs provided")
break
if self.psd_matrix_wl is None:
logger.debug(f"Not returning because: self.psd_matrix_wl not available")
break
if self.n_lines_wl != n_lines:
logger.debug(f"Not returning because: n_lines differs from previous")
break
if n_segments is not None and self.psd_matrices_wl.shape[0] != n_segments:
logger.debug(f"Not returning because: n_segments differs from previous")
break
if (self.psd_matrix_wl.shape[1] == self.num_ref_channels) != refs_only:
logger.debug(f"Not returning because: non-/reference-based not matching previous")
break
logger.debug(f"Returning PSD by Welch's method with {n_lines}"
f' frequency lines, {_n_segments} non-overlapping'
f' segments and a {window} window...')
return self.psd_matrix_wl
logger.info(f"Estimating PSD by Welch's method with {n_lines}"
f' frequency lines, {_n_segments} non-overlapping'
f' segments and a {window} window...')
fs = self.sampling_rate
num_analised_channels = self.num_analised_channels
if refs_only:
num_ref_channels = self.num_ref_channels
ref_channels = self.ref_channels
else:
num_ref_channels = num_analised_channels
ref_channels = list(range(num_ref_channels))
signals = self.signals
psd_matrix_shape = (num_analised_channels,
num_ref_channels,
n_lines // 2 + 1)
psd_matrices = []
win = scipy.signal.get_window(window, N_segment, fftbins=True)
pbar = simplePbar(_n_segments * num_analised_channels * num_ref_channels)
if True:
for i_seg in range(_n_segments):
this_psd_matrix = np.empty(psd_matrix_shape, dtype=complex)
this_signals_block = signals[i_seg * N_segment:(i_seg + 1) * N_segment,:]
for channel_1 in range(num_analised_channels):
for channel_2, ref_channel in enumerate(ref_channels):
next(pbar)
# compute spectrum according to welch, with automatic application of a window and scaling
# spectrum scaling compensates windowing by dividing by window(n_lines).sum()**2
# density scaling divides by fs * window(n_lines)**2.sum()
_, Pxy_den = scipy.signal.csd(this_signals_block[:, channel_1],
this_signals_block[:, ref_channel],
fs,
window=win,
nperseg=N_segment,
nfft=n_lines,
# nfft!=N_Segments, as more data might be used for input than for FFT
noverlap=0,
return_onesided=True,
scaling='density',
**kwargs)
if channel_1 == ref_channel:
assert np.isclose(Pxy_den.imag, 0).all()
Pxy_den.imag = 0
# compensate averaging over segments (for power equivalence segments should be summed up)
Pxy_den *= _n_segments
# reverse 1/Hz of scaling="density"
Pxy_den *= fs
# compensate onesided
Pxy_den /= 2
# compensate zero-padding
Pxy_den /= 2
# compensate energy loss through short segments
Pxy_den *= n_lines
this_psd_matrix[channel_1, channel_2,:] = Pxy_den
psd_matrices.append(this_psd_matrix)
psd_matrix = np.mean(psd_matrices, axis=0)
self.psd_matrices_wl = np.stack(psd_matrices, axis=0)
self.var_psd_wl = np.var(psd_matrices, axis=0)
else:
psd_matrix = np.empty(psd_matrix_shape, dtype=complex)
for channel_1 in range(num_analised_channels):
for channel_2, ref_channel in enumerate(ref_channels):
next(pbar)
# compute spectrum according to welch, with automatic application of a window and scaling
# specrum scaling compensates windowing by dividing by window(n_lines).sum()**2
# density scaling divides by fs * window(n_lines)**2.sum()
_, Pxy_den = scipy.signal.csd(signals[:, channel_1],
signals[:, ref_channel],
fs,
window=win,
nperseg=n_lines // 2,
nfft=n_lines,
noverlap=0,
return_onesided=True,
scaling='density',
**kwargs)
if channel_1 == ref_channel:
assert np.isclose(Pxy_den.imag, 0).all()
Pxy_den.imag = 0
# compensate averaging over segments (for power equivalence segments should be summed up)
Pxy_den *= n_segments
# reverse 1/Hz of scaling="density"
Pxy_den *= fs
# compensate onesided
Pxy_den /= 2
# compensate zero-padding
Pxy_den /= 2
# compensate energy loss through short segments
Pxy_den *= n_lines
psd_matrix[channel_1, channel_2,:] = Pxy_den
if self.scaling_factors is None:
# obtain the scaling factors for the PSD which remain,
# even after filtering or any DSP other operation
self.scaling_factors = psd_matrix.max(axis=2)
# logger.debug(f'PSD Auto-/Cross-Powers: {np.mean(np.abs(psd_matrix), axis=2)}')
self.psd_matrix_wl = psd_matrix
self.n_lines_wl = n_lines
self.n_segments_wl = n_segments
self.m_lags_wl = None
self.corr_matrix_wl = None
self.corr_matrices_wl = None
self.var_corr_wl = None
self.s_vals_psd = None
return psd_matrix
[docs]
def corr_welch(self, m_lags=None, n_segments=None, refs_only=True, **kwargs):
'''
Estimate the (cross- and auto-) correlation functions (C/ACF),
by the inverse Fourier Transform of Power Spectral Densities,
estimated according to Welch's method. Bias due to windowing of
the underlying PSD persists. Normalization is done according to
the unbiased estimator, i.e. 0-lag correlation value must be
multiplied by n_lines to get the signals cross-power.
Note that:
m_lags \= n_lines // 2 + 1
n_lines \= (m_lags - 1) * 2
N_segment = N // n_segments
Parameters
----------
m_lags: integer, optional
Total number of lags (positive). Note: this includes the
0-lag, therefore excludes the m_lags-lag.
n_segments: integer, optional
Number of segments to perform averaging over
resulting segment length must be smaller or equal n_lines
refs_only: bool, optional
Compute cross-ACFss only with reference channels
Other Parameters
----------------
kwargs :
Additional kwargs are passed to self.psd_welch and further
Returns
-------
corr_matrix: np.ndarray
Array of shape (num_channels, num_ref_channels, m_lags)
containing the correlation values of the respective
channels and lags
See also
--------
psd_welch:
PSD estimation algorithm used by this method.
.. TODO ::
* deconvolve window (if possible)
'''
self._last_meth = 'welch'
if m_lags is not None:
if not isinstance(m_lags, int):
raise ValueError(f"{m_lags} is not a valid number of lags for a correlation sequence")
if n_segments is not None:
if not isinstance(n_segments, int):
raise ValueError(f"{n_segments} is not a valid number of segments")
N = self.total_time_steps
# catch function call cases 1, ..., 4
# variable _n_segments is derived from all cases and solely passed to psd_welch
# 1: no arguments: possibly cached results
if m_lags is None and n_segments is None:
if self.m_lags_wl is not None:
m_lags = self.m_lags_wl
elif self.n_lines_wl is not None:
m_lags = self.n_lines_wl // 2 + 1
n_segments = self.n_segments_wl
if m_lags is None and n_segments is None:
raise RuntimeError('Either m_lags or n_segments must be provided on first run.')
# 2: no variance of correlations requested
if n_segments is None and m_lags is not None:
N_segment = (m_lags - 1) * 2
_n_segments = N // N_segment
# let psd_welch use the best number of frequency lines
_n_lines = None
# 3. variance of correlations requested, lags not of interest (possibly rare case)
elif n_segments is not None and m_lags is None:
_n_segments = n_segments
m_lags = N // n_segments // 2 + 1
# recalculate N_segment, due to floor operator in m_lags computation
N_segment = min(N // n_segments, (m_lags - 1) * 2)
# let psd_welch use the best number of frequency lines
_n_lines = None
# 4. variance of correlations with given lag requested
else:
_n_segments = n_segments
# Segments might have to be zero-padded in psd_welch to reach the desired lag length
_n_lines = (m_lags - 1) * 2
N_segment = min(N // n_segments, _n_lines)
if N_segment > (m_lags - 1) * 2:
raise ValueError(f"The segment length {N_segment} must not be larger than the number of frequency lines {(m_lags - 1) * 2}")
while True:
# check, if it is possible to simply return previously computed C/ACF
if kwargs:
logger.debug(f"Not returning because: kwargs provided")
break
if self.corr_matrix_wl is None:
logger.debug(f"Not returning because: self.corr_matrix_wl not available")
break
if self.m_lags_wl < m_lags:
# if self.corr_matrix_wl.shape[2] != m_lags:
logger.debug(f"Not returning because: m_lags differs from previous")
break
if n_segments is not None and self.n_segments_wl != n_segments:
logger.debug(f"Not returning because: n_segments differs from previous")
break
if (self.corr_matrix_wl.shape[1] == self.num_ref_channels) != refs_only:
logger.debug(f"Not returning because: non-/reference-based not matching previous")
break
logger.debug("Returning Correlation Function by Welch's method with"
f" {m_lags} time lags and {self.n_segments_wl} non-overlapping"
f" segments.")
return self.corr_matrix_wl[...,:m_lags]
#
# onesided, i.e. RFFT suffices for real inputs f and g
# correlation functions are also real, so IRFFT should suffice
self.psd_welch(n_lines=_n_lines, n_segments=_n_segments, refs_only=refs_only, **kwargs)
logger.info("Estimating Correlation Function by Welch's method with"
f" {m_lags} time lags and {_n_segments} non-overlapping"
f" segments.")
# get computed blocks of psd_matrices
if n_segments is None or n_segments == 1:
psd_matrices = self.psd_matrix_wl[np.newaxis, ...]
n_segments = 1
else:
psd_matrices = self.psd_matrices_wl
num_analised_channels = self.num_analised_channels
if refs_only:
num_ref_channels = self.num_ref_channels
else:
num_ref_channels = num_analised_channels
corr_matrix_shape = (num_analised_channels, num_ref_channels, m_lags)
corr_matrices = []
pbar = simplePbar(n_segments * num_analised_channels * num_ref_channels)
# user requested variances: use n_segments instead of computed _n_segments
for i_segment in range(n_segments):
this_corr_matrix = np.empty(corr_matrix_shape)
this_psd_matrix = psd_matrices[i_segment, ...]
for channel_1 in range(num_analised_channels):
for channel_2 in range(num_ref_channels):
next(pbar)
this_psd = this_psd_matrix[channel_1, channel_2,:]
this_corr = np.fft.irfft(this_psd)
assert np.all(np.isclose(this_corr.imag, 0))
# cut-off at m_lags and use only the real part (should be real)
this_corr = this_corr[:m_lags].real
# divide by n_lines [equivalence of r(0) and Var(y)]
this_corr /= (m_lags - 1) * 2
this_corr_matrix[channel_1, channel_2,:] = this_corr
corr_matrices.append(this_corr_matrix)
corr_matrix = np.mean(corr_matrices, axis=0)
# logger.debug(f'0-lag Auto-/Cross-Correlations: {np.abs(corr_matrix[:, :, 0]) * (m_lags - 1) * 2}')
self.corr_matrix_wl = corr_matrix
self.corr_matrices_wl = np.stack(corr_matrices, axis=0)
self.var_corr_wl = np.var(corr_matrices, axis=0)
self.m_lags_wl = m_lags
return corr_matrix
[docs]
def corr_blackman_tukey(self, m_lags, n_segments=None, refs_only=True, **kwargs):
'''
Estimate the (cross- and auto-) correlation functions (C/ACF),
by direct computation of the standard un-biased estimator:
.. math::
\\hat{R}_{fg}[m] = \\frac{1}{N - m}\\sum_{n=0}^{N - m - 1} f[n] g[n + m]
Computes correlation functions of all channels with selected reference
channels up to, but excluding, a time lag of m_lags. Normalization
is done according to the unbiased estimator, i.e. 0-lag correlation
value must be multiplied by n_lines to get the signals cross-power.
Variance estimation for each time lag is performed by dividing
the signals into n_segments non-overlapping blocks for individual
estimation of correlation functions. With increasing numbers of
non-overlapping blocks the confidence intervals of the correlation
functions increase ("worsen"), especially at higher lags and
short block lengths, due to a larger number of time steps being
discarded.
Note that:
m_lags \= n_lines // 2 + 1
n_lines \= (m_lags - 1) * 2
Parameters
----------
m_lags: integer, optional
Number of lags (positive). Note: this includes the
0-lag, therefore excludes the "m_lags"-lag.
n_segments: integer, optional
Number of blocks to perform averaging over. If blocks
are shorter than m_lags it raises a ValueError.
refs_only: bool, optional
Compute cross-ACFss only with reference channels
Other Parameters
----------------
kwargs :
Additional kwargs are currently not used
Returns
-------
corr_matrix: np.ndarray
Array of shape (num_channels, num_ref_channels, m_lags)
containing the correlation values of the respective
channels and lags
See also
--------
corr_welch:
Correlation function estimation by Welch's method, possibly
faster, but distorted for short segments and biased through
windowing.
'''
self._last_meth = 'blackman-tukey'
if m_lags is not None:
if not isinstance(m_lags, int):
raise ValueError(f"{m_lags} is not a valid number of lags for a correlation sequence")
if n_segments is not None:
if not isinstance(n_segments, int):
raise ValueError(f"{n_segments} is not a valid number of blocks")
N = self.total_time_steps
# catch function call cases 1, ..., 4
# variable _n_segments is derived from all cases and solely passed to psd_welch
# 1: no arguments: possibly cached results
if m_lags is None and n_segments is None:
m_lags = self.m_lags_bt
n_segments = self.n_segments_bt
if m_lags is None and n_segments is None:
raise RuntimeError('Either m_lags or n_segments must be provided on first run.')
# 2: no variance of correlations requested, or using previous n_segments
if n_segments is None and m_lags is not None:
if self.n_segments_bt is None:
N_block = N
n_segments = 1
else:
# m_lags provided programmatically (through e.g. SSICovRef), but n_segments not
# still want to return previous
n_segments = self.n_segments_bt
N_block = N // n_segments
if N_block < m_lags:
n_segments = 1
N_block = N
# 3. variance of correlations requested, lags not of interest (possibly rare case)
elif n_segments is not None and m_lags is None:
# increasing block length decreases variance (for non-overlapping blocks)
# use the maximum possible block length
m_lags = N // n_segments
N_block = m_lags
# 4. variance of correlations with given lag requested
else:
N_block = N // n_segments
if N_block < m_lags:
raise ValueError(f"The segment length {N_block} must not be shorther than the number of lags {m_lags}")
while True:
# check, if it is possible to simply return previously computed C/ACF
if kwargs:
logger.debug(f"Not returning because: kwargs provided")
break
if self.corr_matrix_bt is None:
logger.debug(f"Not returning because: self.corr_matrix_bt not available")
break
if self.m_lags_bt < m_lags:
logger.debug(f"Not returning because: m_lags differs from previous")
break
if n_segments is not None and self.n_segments_bt != n_segments:
logger.debug(f"Not returning because: n_segments differs from previous")
break
if (self.corr_matrix_bt.shape[1] == self.num_ref_channels) != refs_only:
logger.debug(f"Not returning because: non-/reference-based not matching previous")
break
logger.debug("Using previously computed Correlation Functions (BT)...")
return self.corr_matrix_bt[...,:m_lags]
logger.info(f'Estimating Correlation Functions (BT) with m_lags='
f'{m_lags} and n_segments={n_segments}...')
num_analised_channels = self.num_analised_channels
if refs_only:
num_ref_channels = self.num_ref_channels
ref_channels = self.ref_channels
else:
num_ref_channels = num_analised_channels
ref_channels = list(range(num_ref_channels))
signals = self.signals
corr_matrix_shape = (num_analised_channels, num_ref_channels, m_lags)
corr_matrices = []
pbar = simplePbar(m_lags * n_segments)
for block in range(n_segments):
this_corr_matrix = np.empty(corr_matrix_shape)
this_signals_block = signals[block * N_block:(block + 1) * N_block,:]
for lag in range(m_lags):
next(pbar)
# theoretically (unbounded, continuous): conj(R_fg) = R_gf
# for f and g being reference channels, additional
# performance improvements may be implemented
# currently, both are computed individually
y_r = this_signals_block[:N_block - lag, ref_channels]
y_a = this_signals_block[lag:,:]
# standard un-biased estimator (revert rectangular window)
this_block = (y_a.T @ y_r) / (N_block - lag)
this_corr_matrix[:,:, lag] = this_block
corr_matrices.append(this_corr_matrix)
corr_matrix = np.mean(corr_matrices, axis=0)
assert np.all(corr_matrix.shape == corr_matrix_shape)
self.corr_matrix_bt = corr_matrix
self.corr_matrices_bt = np.stack(corr_matrices, axis=0)
self.var_corr_bt = np.var(corr_matrices, axis=0)
self.m_lags_bt = m_lags
self.n_segments_bt = n_segments
self.psd_matrix_bt = None
self.s_vals_psd = None
return corr_matrix
[docs]
def psd_blackman_tukey(self, n_lines=None, refs_only=True, window='hamming', **kwargs):
'''
Estimate the (cross- and auto-) power spectral densities (PSD),
by Fourier Transform of correlation functions estimated
according to Blackman-Tukey's method. Non-negativeness of the
PSD is ensured by using a lag window, i.e. convolving the temporal
window with itself. Normalization is applied w.r.t. conservation of
energy, i.e. magnitudes will change with n_lines but power stays
constant.
Note that:
m_lags = n_lines // 2 + 1
n_lines = (m_lags - 1) * 2
Parameters
----------
n_lines: integer, optional
Number of frequency lines (positive + negative)
refs_only: bool, optional
Compute cross-PDSs only with reference channels
window: str or tuple or array_like, optional
Desired temporal window to be applied to the correlation
sequence after conversion to a lag window by "self-convolution"
See scipy.signal.get_window() for more information
Other Parameters
----------------
kwargs :
Additional kwargs are passed to self.corr_blackman_tukey
Returns
-------
psd_matrix: np.ndarray
Array of shape (num_channels, num_ref_channels, n_lines // 2 + 1)
containing the power density values of the respective
channels and frequencies
'''
logger.debug(f'Arguments psd_blackman_tukey: n_lines={n_lines}, refs_only={refs_only}, window={window}, {kwargs}')
self._last_meth = 'blackman-tukey'
N = self.total_time_steps
if n_lines is not None:
if not isinstance(n_lines, int):
raise ValueError(f"{n_lines} is not a valid number of n_lines for a spectral densities")
if n_lines % 2:
n_lines += 1
logger.warning(f"Only even number of frequency lines are supported setting n_lines={n_lines}")
if n_lines > 2 * N:
logger.warning(f'Number of frequency lines {n_lines} should not'
f'be larger than twice the number of timesteps {self.total_time_steps}')
# .. TODO:: implement multi-block psd
n_segments = None
if n_segments is not None:
if not isinstance(n_segments, int):
raise ValueError(f"{n_segments} is not a valid number of segments")
# catch function call cases 1, ..., 4
# 1: no arguments: possibly cached results
if n_lines is None and n_segments is None:
if self.n_lines_bt is None and self.m_lags_bt is not None:
n_lines = (self.m_lags_bt - 1) * 2
else:
n_lines = self.n_lines_bt
n_segments = self.n_segments_bt
if n_lines is None and n_segments is None:
raise RuntimeError('Either n_lines or n_segments must be provided on first run.')
# 2: no variance of spectra requested
if n_segments is None and n_lines is not None:
# it increases variance and does not improve the result in any other sense
# when using less than the maximally possible number of segments
N_segment = n_lines
_n_segments = N // N_segment
# 3. variance of spectra requested, n_lines not of interest (when called from corr_welch)
elif n_segments is not None and n_lines is None:
_n_segments = n_segments
N_segment = N // n_segments
n_lines = N_segment
# 4. variance of spectra with given n_lines requested
else:
_n_segments = n_segments
N_segment = min(N // _n_segments, n_lines)
if n_lines % 2: # repeat the check from above
n_lines += 1
if N_segment > n_lines:
raise ValueError(f"The segment length {N_segment} must not be larger than the number of frequency lines {n_lines}")
if N_segment < n_lines / 2:
logger.warning(f"The segment length {N_segment} is much smaller than the number of frequency lines {n_lines} (zero-padded)")
while True:
# check, if it is possible to simply return previously computed PSD
if kwargs:
logger.debug(f"Not returning because: kwargs provided")
break
if self.psd_matrix_bt is None:
logger.debug(f"Not returning because: self.psd_matrix_bt not available")
break
if self.psd_matrix_bt.shape[2] != n_lines // 2 + 1:
logger.debug(f"Not returning because: n_lines differs from previous")
break
if (self.psd_matrix_bt.shape[1] == self.num_ref_channels) != refs_only:
logger.debug(f"Not returning because: non-/reference-based not matching previous")
break
logger.debug("Using previously computed Power Spectral Density (BT)...")
return self.psd_matrix_bt
logger.info("Estimating Power Spectral Density by Blackman-Tukey's method...")
corr_matrix = self.corr_blackman_tukey(n_lines // 2 + 1, refs_only=refs_only, **kwargs)
num_analised_channels = self.num_analised_channels
if refs_only:
num_ref_channels = self.num_ref_channels
else:
num_ref_channels = num_analised_channels
psd_matrix_shape = (num_analised_channels,
num_ref_channels,
n_lines // 2 + 1)
psd_matrix = np.empty(psd_matrix_shape, dtype=complex)
# create a symmetrical window, i.e. lacking the last 0 (for an even number of lines)
# win = getattr(np, window)(n_lines // 2 + 1)[:n_lines // 2]
win = scipy.signal.get_window(window, n_lines // 2, fftbins=True)
# Zero-Pad both sides (= zero pad once and circular convolution)
# to allow the window to "slide along" the correct number of lags in np.convolve = 3 * n_lines//2 - 1
# here first (!) zero pad is n_lines//2-1 because it is convolve
win_pad = np.concatenate((np.zeros(n_lines // 2 - 1), win, np.zeros(n_lines // 2)))
# Convolve zero-padded and unpadded window
# resulting shape: M - N + 1 = (3 * n_lines//2 - 1) - (n_lines//2) + 1 = 2 * n_lines//2 = n_lines
corr_win = np.convolve(win_pad, win, 'valid')
corr_win /= n_lines // 2 # -np.abs(k_dir) # unbiased not needed here, because it is "windowed"
# normalization factor for power equivalence
norm_fact = self.total_time_steps
# equivalent noise bandwidth of the window for density scaling
eq_noise_bw = np.sum(win ** 2) / np.sum(win) ** 2 * (n_lines // 2)
pbar = simplePbar(num_analised_channels * num_ref_channels)
for channel_1 in range(num_analised_channels):
for channel_2 in range(num_ref_channels):
next(pbar)
# https://en.wikipedia.org/wiki/Cross-correlation#Properties
# for real f and g: R_fg(tau) = R_gf(tau)
# for all f and g: R_fg(-tau) = R_gf(tau)
corr_seq = corr_matrix[channel_1, channel_2,:]
corr_sequence = np.concatenate((np.flip(corr_seq)[:n_lines // 2], corr_seq[:n_lines // 2]))
# normalize 0-lag correlation to signal power -> spectral power
corr_sequence *= norm_fact
# apply window and compute the spectrum by the FFT
spec_btr = np.fft.fft(corr_sequence * corr_win)
# restrict the spectrum to positive frequencies
spec_btr = spec_btr[:n_lines // 2 + 1]
# compensate one-sided
spec_btr *= 2
# compensate window
spec_btr *= eq_noise_bw
psd_matrix[channel_1, channel_2,:] = spec_btr
# plt.show()
logger.debug(f'PSD Auto-/Cross-Powers: {np.mean(np.abs(psd_matrix), axis=2)}')
if self.scaling_factors is None:
# obtain the scaling factors for the PSD which remain,
# even after filtering or any DSP other operation
self.scaling_factors = psd_matrix.max(axis=2)
self.n_lines_bt = n_lines
self.psd_matrix_bt = psd_matrix
self._last_meth = 'blackman-tukey'
return psd_matrix
def welch(self, n_lines, **kwargs):
logger.warning("DeprecationWarning: method welch() will soon be dropped. Use psd_welch and/or corr_welch instead")
psd_matrix = self.psd_welch(n_lines, **kwargs)
corr_matrix = self.corr_welch()
return corr_matrix, psd_matrix
[docs]
def correlation(self, m_lags=None, method=None, **kwargs):
'''
A convenience method for obtaining the correlation sequence by
the default or any specified estimation method.
Parameters
----------
m_lags: integer, optional
Number of lags (positive). Note: this includes the
0-lag, therefore excludes the m_lags-lag.
method: str, optional
The method to use for spectral estimation
Other Parameters
-----------------
kwargs:
Additional parameters are passed to the spectral estimation method
Returns
-------
corr_matrix: np.ndarray
Array of shape (num_channels, num_ref_channels, m_lags)
containing the correlation values of the respective
channels and lags
'''
logger.debug(f'Arguments correlation: m_lags={m_lags}, method={method}, {kwargs}')
if method is None:
if self._last_meth is None:
method = 'blackman-tukey'
else:
method = self._last_meth
if method == 'welch':
return self.corr_welch(m_lags, **kwargs)
elif method == 'blackman-tukey':
return self.corr_blackman_tukey(m_lags, **kwargs)
else:
raise ValueError(f'Unknown method {method}')
[docs]
def psd(self, n_lines=None, method=None, **kwargs):
'''
A convenience method for obtaining the PSD by the default or any
specified estimation method.
Parameters
----------
n_lines: integer, optional
Number of frequency lines (positive + negative)
method:
The method to use for spectral estimation
Other Parameters
----------------
**kwargs:
Additional parameters are passed to the spectral estimation method
Returns
-------
psd_matrix: np.ndarray
Array of shape (num_channels, num_ref_channels, n_lines // 2 + 1)
containing the power density values of the respective
channels and frequencies
'''
logger.debug(f'Arguments psd: n_lines={n_lines}, method={method}, {kwargs}')
if method is None:
if self._last_meth is None:
method = 'welch'
else:
method = self._last_meth
if method == 'welch':
# if n_lines is None:
# n_lines = self.n_lines_wl
# if not isinstance(n_lines, int):
# raise ValueError(f"{n_lines} is not a valid number of frequency lines for a psd sequence")
return self.psd_welch(n_lines, **kwargs)
elif method == 'blackman-tukey':
# if n_lines is None:
# if self.n_lines_bt is not None:
# n_lines = self.n_lines_bt
# elif self.m_lags_bt is not None:
# n_lines = (self.m_lags_bt - 1) * 2
# if not isinstance(n_lines, int):
# raise ValueError(f"{n_lines} is not a valid number of frequency lines for a psd sequence")
return self.psd_blackman_tukey(n_lines, **kwargs)
else:
raise ValueError(f'Unknown method {method}')
[docs]
def sv_psd(self, n_lines=None, **kwargs):
'''
Compute the singular values of the power spectral density matrices,
for which the complete (all cross spectral densities) matrices are used.
Parameters
----------
n_lines: integer, optional
Number of frequency lines (positive + negative)
Other Parameters
----------------
kwargs:
Additional parameters are passed to the spectral estimation method
'''
if self.s_vals_psd is not None and (n_lines is None or self.s_vals_psd.shape[1] == n_lines // 2 + 1):
return self.s_vals_psd
psd_matrix = self.psd(n_lines,
# refs_only=False,
**kwargs)
n_sigma = np.min(psd_matrix.shape[:2])
# n_sigma = self.num_analised_channels
n_lines = self.n_lines
s_vals_psd = np.empty((n_sigma, n_lines // 2 + 1))
for k in range(n_lines // 2 + 1):
# might use only real part to account for slightly asynchronous data
# see [Au (2017): OMA, Chapter 7.5]
s_vals_psd[:, k] = np.linalg.svd(psd_matrix[:,:, k], True, False)
self.s_vals_psd = s_vals_psd
return s_vals_psd
[docs]
class SignalPlot(object):
[docs]
def __init__(self, prep_signals):
if not isinstance(prep_signals, PreProcessSignals):
logger.warning(f'Argument prep_signals ist not of type PreProcessSignals but {type(prep_signals)}')
self.prep_signals = prep_signals
[docs]
def plot_signals(self,
channels=None,
per_channel_axes=False,
timescale='time',
psd_scale='db',
axest=None,
axesf=None,
plot_kwarg_dict={},
**kwargs):
'''
Plot time domain and/or frequency domain signals in various configurations:
1. time history and spectrum of a single channel in two axes -> set channels = [channel] goto 2
2. time history of multiple channels (all channels or specified)
* if axes arguments are not None:must be (tuples, lists, ndarrays) of size = (num_channels,) regardless of the actual figure layout
* else: generate axes for each channel and arrange them in lists
a. time domain overlay in a single axes -> single axes is repeated in the axes list
i. spectrum overlay in a single axes -> single axes is repeated in the axes list
ii. svd spectrum in a single axes -> needs an additional argument
b. in multiple axes' in a grid figure -> axes are generated as subplots
i. spectrum in multiple axes' -> axes are generated as subplots
ii. svd spectrum in a single axes -> needs an additional argument
Parameters
----------
channels : None, list-of-int, list-of-str, int, str
The selected channels (see self._channel_numbers for explanation)
per_channel_axes: bool
Whether to plot all channels into a single or multiple axes
timescale: str ['time', 'samples', 'lags']
Whether to display time, sample or lag values on the horizontal axis
'lags' implies plotting (auto)-correlations instead of raw time histories
psd_scale: str, ['db', 'power', 'rms', 'svd', 'phase']
Scaling/Output quantity of the ordinate (value axis)
axest: ndarray of size num_channels of matplotlib.axes.Axes objects
User provided axes objects, into which to plot time domain signals
axesf: ndarray of size num_channels of matplotlib.axes.Axes objects
User provided axes objects, into which to plot spectra
Other Parameters
----------------
plot_kwarg_dict:
A dictionary to pass arguments to matplotlib.plot
kwargs:
Additional kwargs are passed to the spectral estimation method
.. TO DO::
* share y-axis scaling on axes' only between channels of the same
measurement quantity (acceleration, velocity, displacement/strains)
'''
prep_signals = self.prep_signals
refs = kwargs.pop('refs', None)
channel_numbers, ref_numbers = prep_signals._channel_numbers(channels, refs)
all_ref_numbers = set(sum(ref_numbers, []))
# if all requested reference channels are in prep_signals.ref_channels,
# a reduced correlation function may be computed
refs_only = all_ref_numbers.issubset(prep_signals.ref_channels)
# if not all are needed, but the user requested so, compute full correlation matrix
if (refs_only and not kwargs.pop('refs_only', True)) or psd_scale == 'svd':
refs_only = False
num_channels = len(channel_numbers)
if axest is None or axesf is None:
if per_channel_axes:
if psd_scale != 'svd':
# creates a subplot with side by side time and frequency domain plots for each channel
_, axes = plt.subplots(nrows=num_channels,
ncols=2,
sharey='col',
sharex='col')
if axest is None:
axest = axes[:, 0]
if axesf is None:
axesf = axes[:, 1]
else:
if axest is None:
# creates a subplot for time domain plots of each channel
nxn = int(np.ceil(np.sqrt(num_channels)))
_, axest = plt.subplots(nrows=int(np.ceil(num_channels / nxn)),
ncols=nxn,
sharey=True,
sharex=True)
axest = axest.flatten()
if axesf is None:
# creates a separate figure for the svd spectrum
_, axesf = plt.subplots(nrows=1,
ncols=1)
axesf = np.repeat(axesf, num_channels)
else:
if axest is None:
# create a single figure for overlaying all time domain plots
_, axest = plt.subplots(nrows=1,
ncols=1)
axest = np.repeat(axest, num_channels)
if axesf is None:
# create a single figure for overlaying all spectra or svd spectrum
_, axesf = plt.subplots(nrows=1,
ncols=1)
axesf = np.repeat(axesf, num_channels)
# Check the provided axes objects
if per_channel_axes:
if len(axest) < num_channels:
raise ValueError(f'The number of provided axes objects '
f'(time domain) = {len(axest)} does not match the '
f'number of channels={num_channels}')
if per_channel_axes and psd_scale != 'svd':
if len(axesf) < num_channels:
raise ValueError(f'The number of provided axes objects '
f'(frequency domain) = {len(axesf)} does not match the '
f'number of channels={num_channels}')
if not per_channel_axes:
if not isinstance(axest, (tuple, list, np.ndarray)):
axest = np.repeat(axest, num_channels)
elif len(axest) == 1:
axest = np.repeat(axest, num_channels)
elif len(axest) < num_channels:
raise ValueError(f'The number of provided axes objects '
f'(time domain) = {len(axest)} does not match the '
f'number of channels={num_channels}')
if not per_channel_axes or psd_scale:
if not isinstance(axesf, (tuple, list, np.ndarray)):
axesf = np.repeat(axesf, num_channels)
elif len(axesf) == 1:
print(axesf, num_channels)
axesf = np.repeat(axesf, num_channels)
elif len(axesf) < num_channels:
raise ValueError(f'The number of provided axes objects '
f'(frequency domain) = {len(axesf)} does not match the '
f'number of channels={num_channels}')
# precompute relevant spectral matrices
n_lines = kwargs.pop('n_lines', None)
method = kwargs.pop('method', None)
prep_signals.psd(n_lines, method, refs_only=refs_only, **kwargs.copy())
if timescale == 'lags':
prep_signals.correlation(prep_signals.m_lags, method, refs_only=refs_only, **kwargs.copy())
for axt, axf, channel in zip(axest, axesf, channel_numbers):
if timescale == 'lags':
# omitting **kwargs here to not trigger recomputation, except refs_only
self.plot_correlation(prep_signals.m_lags, [channel], axt, timescale, refs,
plot_kwarg_dict.copy(),
refs_only=refs_only, method=method)
else:
self.plot_timeseries(channels=[channel], ax=axt, scale='timescale', **plot_kwarg_dict.copy())
axt.grid(True, axis='y', ls='dotted')
# omitting **kwargs here to not trigger recomputation, except refs_only
self.plot_psd(prep_signals.n_lines, [channel], axf, psd_scale, refs,
plot_kwarg_dict.copy(),
refs_only=refs_only, method=method)
if not per_channel_axes:
axest[-1].legend()
axesf[-1].legend()
else:
figt = axest[0].get_figure()
figt.legend()
figf = axesf[0].get_figure()
figf.legend()
return axest, axesf
[docs]
def plot_timeseries(self, channels=None, ax=None, scale='time', **kwargs):
'''
Plots the time histories of the signals
Parameters
----------
channels : int, list, tuple, np.ndarray
The channels to plot, may be names, indices, etc.
ax: matplotlib.axes.Axes, optional
Matplotlib Axes object to plot into
scale: str, ['lags','samples']
Whether to display time or sample values on the horizontal axis
Other Parameters
----------------
kwargs :
Additional kwargs are passed to matplotlib.plot
Returns
-------
ax: matplotlib.axes.Axes, optional
Matplotlib Axes object containing the graphs
.. TODO::
* correct labeling of channels and axis (using accel\_, velo\_, and disp\_channels)
'''
prep_signals = self.prep_signals
signals = prep_signals.signals
t = prep_signals.t
if scale == 'samples':
t *= prep_signals.sampling_rate
xlabel = '$n$ [-]'
ylabel = '$f[n]$ [...]'
else:
xlabel = '$t$ [s]'
ylabel = '$f(t)$ [...]'
channel_numbers, _ = prep_signals._channel_numbers(channels)
if ax is None:
ax = plt.subplot(111)
for channel in channel_numbers:
if channel in prep_signals.accel_channels: f = 'a'
elif channel in prep_signals.velo_channels: f = 'v'
elif channel in prep_signals.disp_channels: f = 'd'
else: f = 'f'
channel_name = prep_signals.channel_headers[channel]
ax.plot(t, signals[:, channel], label=f'${f}_\mathrm{{{channel_name}}}$', **kwargs)
ax.set_xlim((0, prep_signals.duration))
if ax.get_subplotspec().is_last_row():
ax.set_xlabel(xlabel)
if ax.get_subplotspec().is_first_col():
ax.set_ylabel(ylabel)
return ax
[docs]
def plot_correlation(self, m_lags=None, channels=None, ax=None,
scale='lags', refs=None, plot_kwarg_dict={}, **kwargs):
'''
Plots the Cross- and Auto-Correlation sequences of the signals.
If correlations have not been estimated yet and no method
parameter is supplied, Blackman-Tukeys's method is used, else the
most recently used estimation method is employed.
Parameters
----------
m_lags: integer, optional
Number of lags (positive). Note: this includes the
0-lag, therefore excludes the m_lags-lag.
channels : int, list, tuple, np.ndarray
The channels to plot, may be names, numbers/indices, etc.
ax: matplotlib.axes.Axes, optional
Matplotlib Axes object to plot into
scale: str, ['lags','samples']
Whether to display lag or sample values on the horizontal axis
refs: 'auto', list-of-indices, optional
Reference channels to consider for cross-correlations
Other Parameters
----------------
method:
The method to use for spectral estimation
plot_kwarg_dict:
A dictionary to pass arguments to matplotlib.plot
**kwargs :
Additional kwargs are passed to the spectral estimation
method or contain figure/axes formatting options
Returns
-------
ax: matplotlib.axes.Axes, optional
Matplotlib Axes object containing the graphs
.. TODO::
* correct labeling of channels and axis (using accel\_, velo\_, and disp\_channels)
'''
prep_signals = self.prep_signals
method = kwargs.pop('method', prep_signals._last_meth)
# assert method is not None
# inspect, which reference channels are needed; ref_numbers is a list-of-lists
channel_numbers, ref_numbers = prep_signals._channel_numbers(channels, refs)
all_ref_numbers = set(sum(ref_numbers, []))
# if all requested reference channels are in prep_signals.ref_channels,
# a reduced correlation function may be computed
refs_only = all_ref_numbers.issubset(prep_signals.ref_channels)
# if not all are needed, but the user requested so or full correlation matrix has been computed already -> use that
if refs_only:
if method == 'welch' and prep_signals.corr_matrix_wl is not None:
refs_only = prep_signals.num_ref_channels == prep_signals.corr_matrix_wl.shape[1]
logger.debug(f'reverting refs_only: False -> Welch precomputed')
elif method == 'blackman-tukey' and prep_signals.corr_matrix_bt is not None:
refs_only = prep_signals.num_ref_channels == prep_signals.corr_matrix_bt.shape[1]
logger.debug(f'reverting refs_only: False -> Blackman-Tukey precomputed')
if not kwargs.pop('refs_only', True):
refs_only = False
logger.debug(f'reverting refs_only: False -> User input')
corr_matrix = prep_signals.correlation(m_lags, refs_only=refs_only, method=method, **kwargs)
assert refs_only is (prep_signals.num_ref_channels == corr_matrix.shape[1])
lags = prep_signals.lags
if scale == 'samples':
lags *= prep_signals.sampling_rate
xlabel = '$m$ [-]'
ylabel = '$\hat{R}_{i,j}[m]$ [...]'
else:
xlabel = '$\\tau$ [s]'
ylabel = '$\hat{R}_{i,j}(\\tau)$ [...]'
if ax is None:
plt.figure()
ax = plt.subplot(111)
# print(lags.shape, corr_matrix.shape)
for channel_number, current_ref_numbers in zip(channel_numbers, ref_numbers):
channel_name = prep_signals.channel_headers[channel_number]
for ref_index, ref_number in enumerate(current_ref_numbers):
if refs_only:
# reduced-channel correlation matrix is indexed by reference channel indices
corr = corr_matrix[channel_number, ref_index,:]
else:
# full-channel correlation matrix is indexed by reference channel numbers
corr = corr_matrix[channel_number, ref_number,:]
if prep_signals._last_meth == 'welch':
norm_fact = prep_signals.n_lines_wl
elif prep_signals._last_meth == 'blackman-tukey':
norm_fact = prep_signals.total_time_steps
else:
raise RuntimeError('Last used method was not stored in prep_signals object.')
if ref_number == channel_number:
label = f'$\hat{{R}}_\mathrm{{{channel_name}}}$'
else:
ref_name = prep_signals.channel_headers[ref_number]
label = f'$\hat{{R}}_\mathrm{{{ref_name},{channel_name}}}$'
ax.plot(lags, corr * norm_fact, label=label, **plot_kwarg_dict)
ax.set_xlim((0, lags.max()))
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
return ax
[docs]
def plot_psd(self, n_lines=None, channels=None, ax=None,
scale='db', refs=None, plot_kwarg_dict={}, **kwargs):
'''
Plots the Cross- and Auto-Power-Spectral Density of the signals.
PSD estimation is performed by default using Welch's method.
Parameters
----------
n_lines: integer, optional
Number of frequency lines (positive + negative)
channels : int, list, tuple, np.ndarray
The channels to plot, may be names, indices, etc.
ax: matplotlib.axes.Axes, optional
Matplotlib Axes object to plot into
scale: str, ['db', 'power', 'rms', 'svd', 'phase']
Scaling/Output quantity of the ordinate (value axis)
refs: 'auto', list-of-indices, optional
Reference channels to consider for cross-correlations
Other Parameters
----------------
method:
The method to use for spectral estimation
plot_kwarg_dict:
A dictionary to pass arguments to matplotlib.plot
**kwargs :
Additional kwargs are passed to the spectral estimation method
Returns
-------
ax: matplotlib.axes.Axes, optional
Matplotlib Axes object containing the graphs
.. TODO::
* correct labeling of channels and axis (using accel\_, velo\_, and disp\_channels)
* do we need a svd in non-db scale?
* do we need sample scaling on the abscissa
'''
prep_signals = self.prep_signals
assert scale in ['db', 'power', 'rms', 'svd', 'phase']
method = kwargs.pop('method', None)
if scale == 'svd':
if refs is not None or kwargs.pop('refs_only', False):
logger.warning("Reference channels are not used in SVD PSD.")
refs_only = False
channel_numbers, ref_numbers = prep_signals._channel_numbers(channels, [0])
psd_matrix = prep_signals.sv_psd(n_lines, method=method, refs_only=refs_only, **kwargs)
else:
# inspect, which reference channels are needed; ref_numbers is a list-of-lists
channel_numbers, ref_numbers = prep_signals._channel_numbers(channels, refs)
all_ref_numbers = set(sum(ref_numbers, []))
# if all requested reference channels are in prep_signals.ref_channels,
# a reduced correlation function may be computed
refs_only = all_ref_numbers.issubset(prep_signals.ref_channels)
# if not all are needed, but the user requested so or full psd matrix has been computed already -> use that
if refs_only:
if method == 'welch' and prep_signals.psd_matrix_wl is not None:
refs_only = prep_signals.num_ref_channels == prep_signals.psd_matrix_wl.shape[1]
elif method == 'blackman-tukey' and prep_signals.psd_matrix_bt is not None:
refs_only = prep_signals.num_ref_channels == prep_signals.psd_matrix_bt.shape[1]
if not kwargs.pop('refs_only', True):
refs_only = False
psd_matrix = prep_signals.psd(n_lines, refs_only=refs_only, method=method, **kwargs)
assert refs_only is (prep_signals.num_ref_channels == psd_matrix.shape[1])
# prep_signals.freqs refers to the last call of any spectral estimation method
freqs = prep_signals.freqs
if ax is None:
plt.figure()
ax = plt.subplot(111)
for channel_number, current_ref_numbers in zip(channel_numbers, ref_numbers):
channel_name = prep_signals.channel_headers[channel_number]
for ref_index, ref_number in enumerate(current_ref_numbers):
if scale == 'svd':
psd = psd_matrix[channel_number,:]
psd = 10 * np.log10(np.abs(psd))
ref_name = ''
elif refs_only:
# reduced-size psd matrix is indexed by reference channel indices
psd = psd_matrix[channel_number, ref_index,:]
ref_name = prep_signals.channel_headers[ref_number]
else:
# full-size psd matrix is indexed by reference channel numbers
psd = psd_matrix[channel_number, ref_number,:]
ref_name = prep_signals.channel_headers[ref_number]
if scale == 'db':
psd = 10 * np.log10(np.abs(psd))
elif scale == 'power':
psd = np.abs(psd)
elif scale == 'rms':
psd = np.sqrt(np.abs(psd))
elif scale == 'phase':
psd = np.angle(psd) / np.pi * 180
if scale == 'svd':
label = f'$\hat{{\sigma}}_\mathrm{{{channel_number}}}$'
elif ref_number == channel_number:
label = f'$\hat{{S}}_\mathrm{{{channel_name}}}$'
else:
ref_name = prep_signals.channel_headers[ref_number]
label = f'$\hat{{S}}_\mathrm{{{ref_name},{channel_name}}}$'
ax.plot(freqs, psd, label=label, **plot_kwarg_dict)
ax.set_xlim((0, freqs.max()))
ax.set_xlabel('$f$ [Hz]')
if scale == 'svd':
ax.set_ylabel('Singular Value Magnitude [dB]')
elif scale == 'db':
ax.set_ylabel('PSD [dB]')
elif scale == 'power':
ax.set_ylabel('Power Spectral Density [...]')
elif scale == 'rms':
ax.set_ylabel('Magnitude Spectral Density [...]')
elif scale == 'phase':
ax.set_ylabel('Cross Spectrum Phase[°]')
return ax
def plot_svd_spectrum(self, NFFT=512, log_scale=True, ax=None):
prep_signals = self.prep_signals
logger.warning("DeprecationWarning: method plot_svd_spectrum() will soon be dropped. Use plot_psd(scale='svd')")
if not log_scale:
raise NotImplementedError("Log scale for SVD plots cannot be deactivated")
return prep_signals.plot_psd(n_lines=NFFT, ax=ax, scale='svd')
[docs]
def load_measurement_file(fname, **kwargs):
'''
assign this function to the class before instantiating the object
PreProcessSignals.load_measurement_file = load_measurement_file
'''
# define a function to return the following variables
headers = ['channel_name', 'channel_name']
units = ['unit', 'unit', ]
start_time = datetime.datetime()
sample_rate = float()
measurement = np.array([])
# channels im columns
assert measurement.shape[0] > measurement.shape[1]
return headers, units, start_time, sample_rate, measurement
[docs]
def spectral_estimation():
# signal parameters
N = 2 ** 15
fs = 128
dt = 1 / fs
t, y, omegas, psd, corr = SDOF_ambient(N, fs)
# spectral estimation parameters
nperseg_fac = 1
window = np.hamming
n_lines = N // nperseg_fac
tau = np.linspace(0, n_lines / fs, n_lines, False)
omegasr = np.fft.rfftfreq(n_lines, 1 / fs) * 2 * np.pi
do_plot = True
if do_plot:
fig1, axes = plt.subplots(2, 2, sharex='row', sharey='row')
ax1, ax2, ax3, ax4 = axes.flat
for ax in axes.flat:
ax.axhline(0, color='gray', linewidth=0.5)
handles = []
if do_plot:
ax1.plot(np.fft.fftshift(omegas) / 2 / np.pi, np.fft.fftshift(psd), label='analytic', color='black', lw=0.5)
ax3.plot(t, corr, label='analytic', color='black', lw=0.5)
ax2.plot(np.fft.fftshift(omegas) / 2 / np.pi, np.fft.fftshift(psd), label='analytic', color='black', lw=0.5)
handles.append(ax4.plot(t, corr, label='analytic', color='black', lw=0.5)[0])
print(f'Theoretic powers')
print(f'PSD: {np.mean(psd)}')
# print(f'0-lag correlation: {correlation[0]}')
prep_signals = PreProcessSignals(y[:, np.newaxis], fs)
prep_signals.plot_signals(timescale='lags', axest=[ax3], axesf=[ax1], dbscale=False)
plt.show()
[docs]
def SDOF_ambient(N=2 ** 15, fs=128):
dt = 1 / fs
omegas = np.fft.fftfreq(N, 1 / fs) * 2 * np.pi
t = np.linspace(0, N / fs, N, False)
# generate sdof system
zeta = 0.05
omega = 5 * 2 * np.pi * np.sqrt(1 - zeta ** 2) # damped f = 5 Hz
m = 1
k = omega ** 2 * m
# c = zeta*2*sqrt(m*k)
H = -omegas ** 2 / (k * (1 + 2j * zeta * omegas / omega - (omegas / omega) ** 2))
# generate ambient input forces
f_scale = 10
phase = np.random.uniform(-np.pi, np.pi, (N // 2 + 1,))
ampli = np.exp(1j * np.concatenate((phase[:N // 2 + N % 2], -1 * np.flip(phase[1:]))))
Pomega = f_scale * np.ones(N, dtype=complex) * ampli
# make the ifft real-valued
Pomega.imag[0] = 0
Pomega[N // 2 + N % 2] = np.abs(Pomega[N // 2 + N % 2])
H.imag[0] = 0
H[N // 2 + N % 2] = np.abs(H[N // 2 + N % 2])
# generate the ambient response signal
y = np.fft.ifft(H * Pomega)
# add noise
noise = np.random.normal(0, 0.125, N) # noise adds zero energy due to zero mean?
# discard machine-precision zero imaginary parts
if np.all(np.isclose(y.imag, 0)): y = y.real
else: raise RuntimeError()
power = np.sum(y ** 2)
power_noise = np.sum(noise ** 2)
print(f'Power time-domain: {power}')
print(f'SNR={10*np.log10(power/power_noise)} dB')
# compute analytical spectrum and correlation functions with correct scaling
# psd = np.abs(H)**2*f_scale**2
psd = omegas ** 4 / (k ** 2 * (1 + (4 * zeta ** 2 - 2) * (omegas / omega) ** 2 + (omegas / omega) ** 4)) * f_scale ** 2
psd /= np.mean(psd)
# analytical solution for convolution difficult, use numerical inverse of analytical PSD
corr = np.fft.ifft(psd)
# discard machine-precision zero imaginary parts
if np.all(np.isclose(corr.imag, 0)): corr = corr.real
else: raise RuntimeError()
return t, (y + noise) / np.sqrt(power), omegas, psd, corr
[docs]
def system_frf(N=2 ** 16, fmax=130, L=200, E=2.1e11, rho=7850, A=0.0343, zeta=0.01):
df = fmax / (N // 2 + 1)
dt = 1 / df / N
fs = 1 / dt
omegas = np.linspace(0, fmax, N // 2 + 1, False) * 2 * np.pi
assert df * 2 * np.pi == (omegas[-1] - omegas[0]) / (N // 2 + 1 - 1)
num_modes = int(np.floor((fmax * 2 * np.pi * np.sqrt(rho / E) * L / np.pi * 2 + 1) / 2))
omegans = (2 * np.arange(1, num_modes + 1) - 1) / 2 * np.pi / L * np.sqrt(E / rho)
frf = np.zeros((N // 2 + 1,), dtype=complex)
zetas = np.zeros_like(omegans)
zetas[:] = zeta
kappas = omegans ** 2
# kappas[:]=E*A/L
for j, (omegan, zeta) in enumerate(zip(omegans, zetas)):
frf += -omegan ** 2 / (kappas[j] * (1 + 2 * 1j * zeta * omegas / omegan - (omegas / omegan) ** 2)) # Accelerance
return omegas, frf
if __name__ == '__main__':
fname = '/vegas/users/staff/womo1998/git/pyOMA/tests/files/prepsignals.npz'
prep_signals_compat = PreProcessSignals.load_state(fname)
prep_signals_compat.plot_signals(None, True)
# spectral_estimation()
# main()