Source code for pylife.stress.timesignal

# Copyright (c) 2019-2021 - for information on the respective copyright owner
# see the NOTICE file and/or the repository
# https://github.com/boschresearch/pylife
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__author__ = "Johannes Mueller"
__maintainer__ = __author__

import numpy as np
import pandas as pd
import scipy.stats as stats
import scipy.signal as signal

[docs]class TimeSignalGenerator: '''Generates mixed time signals The generated time signal is a mixture of random sets of * sinus signals * gauss signals (not yet) * log gauss signals (not yet) For each set the user supplys a dict describing the set:: sinus_set = { 'number': number of signals 'amplitude_median': 'amplitude_std_dev': 'frequency_median': 'frequency_std_dev': 'offset_median': 'offset_std_dev': } The amplitudes (:math:`A`), fequencies (:math:`\omega`) and offsets (:math:`c`) are then norm distributed. Each sinus signal looks like :math:`s = A \sin(\omega t + \phi) + c` where :math:`phi` is a random value between 0 and :math:`2\pi`. So the whole sinus :math:`S` set is given by the following expression: :math:`S = \sum^n_i A_i \sin(\omega_i t + \phi_i) + c_i`. ''' def __init__(self, sample_rate, sine_set, gauss_set, log_gauss_set): sine_amplitudes = stats.norm.rvs(loc=sine_set['amplitude_median'], scale=sine_set['amplitude_std_dev'], size=sine_set['number']) sine_frequencies = stats.norm.rvs(loc=sine_set['frequency_median'], scale=sine_set['frequency_std_dev'], size=sine_set['number']) sine_offsets = stats.norm.rvs(loc=sine_set['offset_median'], scale=sine_set['offset_std_dev'], size=sine_set['number']) sine_phases = 2. * np.pi * np.random.rand(sine_set['number']) self.sine_set = list(zip(sine_amplitudes, sine_frequencies, sine_phases, sine_offsets)) self.sample_rate = sample_rate self.time_position = 0.0
[docs] def query(self, sample_num): '''Gets a sample chunk of the time signal Parameters ---------- sample_num : int number of the samples requested Returns ------- samples : 1D numpy.ndarray the requested samples You can query multiple times, the newly delivered samples will smoothly attach to the previously queried ones. ''' samples = np.zeros(sample_num) end_time_position = self.time_position + (sample_num-1) / self.sample_rate for ampl, omega, phi, offset in self.sine_set: periods = np.floor(self.time_position / omega) start = self.time_position - periods * omega end = end_time_position - periods * omega time = np.linspace(start, end, sample_num) samples += ampl * np.sin(omega * time + phi) + offset self.time_position = end_time_position + 1. / self.sample_rate return samples
[docs] def reset(self): ''' Resets the generator A resetted generator behaves like a new generator. ''' self.time_position = 0.0
[docs]class TimeSignalPrep: def __init__(self,df): self.df = df
[docs] def resample_acc(self,sample_rate_new = 1): """ Resampling the time series Parameters ---------- self: DataFrame time_col: str column name of the time column sample_rate_new: float sample rate of the resampled time series Returns ------- DataFrame """ # dfResample.index = np.arange(self.df.index.min(),self.df.index.max(),1/sample_rate_new) index_new = np.linspace(self.df.index.min(), self.df.index.min() + np.floor((self.df.index.max()-self.df.index.min())*sample_rate_new)/sample_rate_new, int(np.floor(self.df.index.max()-self.df.index.min())*sample_rate_new + 1)) dfResample = pd.DataFrame(index = index_new) for colakt in self.df.columns: dfResample[colakt] = np.interp(dfResample.index,self.df.index,self.df[colakt]) return dfResample
[docs] def butter_bandpass(self, lowcut, highcut, fs, order=5): """Use the functonality of scipy""" nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq b, a = signal.butter(order, [low, high], btype='band') TSout = signal.filtfilt(b, a, self.df) return TSout
[docs] def running_stats_filt(self,col,window_length = 2048,buffer_overlap = 0.1,limit = 0.05, method = "rms"): """ Calculates the running statistics of one DataFrame column and drops the rejected data points from the whole DataFrame. **Attention**: Reset_index is used Parameters ----------- self: DataFrame col: str column name of the signal for the runnings stats calculation window_length: int window length of the single time snippet, default is 2048 buffer_overlap: float overlap parameter, 0.1 is equal to 10 % overlap of every buffer, default is 0.1 limit: float limit value of skipping values, 0.05 for example skips all values which buffer method parameter is lower than 5% of the total max value, default is 0.05 method: str method: 'rms', 'min', 'max', 'abs', default is 'rms' Returns ------- DataFrame """ df = self.df.reset_index(drop = True) delta_t = self.df.index.values[1]-self.df.index.values[0] hop = int(window_length*(1-buffer_overlap)) # absolute stepsize df = df.loc[:int(np.floor(len(df)/hop)*hop),:] n_iter = 1+int((len(df)-window_length)/(hop)) ind_act = 0 stats_list = [] for ii in range (n_iter): if method == "rms": stats_list.append( np.sqrt(np.mean(df[col][ind_act:ind_act+window_length]**2))) elif method == "max": stats_list.append(np.max(df[col][ind_act:ind_act+window_length])) elif method == "min": stats_list.append(np.abs(np.min(df[col][ind_act:ind_act+window_length]))) elif method == "abs": stats_list.append(np.max(np.abs(df[col][ind_act:ind_act+window_length]))) ind_act = ind_act+hop try: stats_list = pd.DataFrame({"stats": np.asarray(stats_list)})#, except: print(str(stats_list)) # index = np.arange(0,len(np.asarray(stats_list))-1, # np.asarray(stats_list))) stats_list = stats_list[stats_list["stats"] < limit*stats_list["stats"].max()] for ind_act in stats_list.index: df = df.drop(index = np.arange(ind_act*hop,ind_act*hop+window_length), errors = 'ignore') df.index = np.linspace(0,delta_t*(len(df)-1), len(df)) return df