# Source code for pylife.stress.timesignal

# Copyright (c) 2019-2023 - for information on the respective copyright owner
# see the NOTICE file and/or the repository
# https://github.com/boschresearch/pylife
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module for time signal handling

Warning
-------

This module is not considered finalized even though it is part of pylife-2.0.
Breaking changes might occur in upcoming minor releases.
"""

__author__ = "Johannes Mueller"
__maintainer__ = __author__

import numpy as np
import pandas as pd
import scipy.stats as stats
import scipy.signal as signal

from matplotlib.mlab import psd

try:
    import tsfresh as ts
    _HAVE_TSFRESH = True
except ModuleNotFoundError:
    _HAVE_TSFRESH = False


class TimeSignalGenerator:
    r"""Generates mixed time signals

    The generated time signal is a mixture of random sets of sinus signals

    For each set the user supplies a dict describing the set::

        sinus_set = {
            'number': number of signals
            'amplitude_median':
            'amplitude_std_dev':
            'frequency_median':
            'frequency_std_dev':
            'offset_median':
            'offset_std_dev':
        }

    The amplitudes (:math:`A`), frequencies (:math:`\omega`) and
    offsets (:math:`c`) are then norm distributed. Each sinus signal
    looks like

            :math:`s = A \sin(\omega t + \phi) + c`

    where :math:`\phi` is a random value between 0 and :math:`2\pi`.

    So the whole sinus :math:`S` set is given by the following expression:

            :math:`S = \sum^n_i A_i \sin(\omega_i t + \phi_i) + c_i`.
    """

    def __init__(self, sample_rate, sine_set, gauss_set, log_gauss_set):
        """Set up the generator by drawing the random sinus parameters.

        Parameters
        ----------
        sample_rate : float
            sample rate of the generated signal
        sine_set : dict
            description of the sinus component set (see class docstring)
        gauss_set, log_gauss_set :
            NOTE(review): accepted but never used by the current
            implementation.
        """
        sine_amplitudes = stats.norm.rvs(loc=sine_set['amplitude_median'],
                                         scale=sine_set['amplitude_std_dev'],
                                         size=sine_set['number'])
        sine_frequencies = stats.norm.rvs(loc=sine_set['frequency_median'],
                                          scale=sine_set['frequency_std_dev'],
                                          size=sine_set['number'])
        sine_offsets = stats.norm.rvs(loc=sine_set['offset_median'],
                                      scale=sine_set['offset_std_dev'],
                                      size=sine_set['number'])
        sine_phases = 2. * np.pi * np.random.rand(sine_set['number'])

        self.sine_set = list(zip(sine_amplitudes, sine_frequencies,
                                 sine_phases, sine_offsets))

        self.sample_rate = sample_rate
        self.time_position = 0.0

    def query(self, sample_num):
        """Gets a sample chunk of the time signal

        Parameters
        ----------
        sample_num : int
            number of the samples requested

        Returns
        -------
        samples : 1D numpy.ndarray
            the requested samples

        You can query multiple times, the newly delivered samples
        will smoothly attach to the previously queried ones.
        """
        samples = np.zeros(sample_num)
        end_time_position = self.time_position + (sample_num-1) / self.sample_rate

        for ampl, omega, phi, offset in self.sine_set:
            # Rebase the time axis by a whole number of signal periods to
            # keep the sin() argument small for long signals.  The period of
            # sin(omega*t) is 2*pi/omega.  (Bug fix: the previous code used
            # omega itself as the period, which shifted the phase between
            # consecutively queried chunks and broke the documented seamless
            # continuation.)
            period = 2. * np.pi / omega
            periods = np.floor(self.time_position / period)
            start = self.time_position - periods * period
            end = end_time_position - periods * period
            time = np.linspace(start, end, sample_num)
            samples += ampl * np.sin(omega * time + phi) + offset

        # next query continues one sample step after the last sample
        self.time_position = end_time_position + 1. / self.sample_rate

        return samples

    def reset(self):
        """ Resets the generator

        A resetted generator behaves like a new generator.
        """
        self.time_position = 0.0
def fs_calc(df):
    """Determine the sampling frequency of a time-indexed DataFrame.

    Parameters
    ----------
    df : DataFrame
        time series whose index holds the (numeric) time stamps

    Returns
    -------
    fs : int, float
        sampling frequency, the reciprocal of the mean index spacing;
        falls back to 1 when the index is not numeric
    """
    try:
        mean_step = np.mean(np.diff(df.index))
        return 1 / mean_step
    except TypeError:
        # non-numeric index (e.g. strings) cannot be differenced
        print("Index has to be a number not a string. We assume fs = 1")
        return 1
def resample_acc(df, fs=1):
    """Resample a time-indexed DataFrame to a constant sampling rate.

    Each column is linearly interpolated onto a uniform time grid spanning
    the original index range with spacing ``1/fs``.

    Parameters
    ----------
    df : DataFrame
        input time series, indexed by time
    fs : float
        sample rate of the resampled time series

    Returns
    -------
    DataFrame
        the resampled time series, indexed by the new uniform time grid
    """
    step = 1 / fs
    new_index = np.arange(df.index.min(), df.index.max() + step, step)
    resampled = {column: np.interp(new_index, df.index, df[column])
                 for column in df.columns}
    return pd.DataFrame(resampled, index=new_index, columns=df.columns)
def butter_bandpass(df, lowcut, highcut, order=5):
    """Apply a zero-phase Butterworth band-pass filter column-wise.

    The sample rate is derived from the DataFrame index via
    :func:`fs_calc`; the cut-off frequencies are normalized to the
    Nyquist frequency and the filter is applied forward-backward
    (``filtfilt``) so no phase shift is introduced.

    Parameters
    ----------
    df : DataFrame
        input time series, indexed by time
    lowcut : float
        lower cut-off frequency
    highcut : float
        upper cut-off frequency
    order : int, optional
        Butterworth filter order. The default is 5.

    Returns
    -------
    DataFrame
        the band-pass filtered time series
    """
    fs = fs_calc(df)
    nyquist = 0.5 * fs
    normalized_band = [lowcut / nyquist, highcut / nyquist]
    b, a = signal.butter(order, normalized_band, btype='bandpass')
    pad = int(fs / 2)
    return df.apply(lambda column: signal.filtfilt(b, a, column, padlen=pad))
def psd_df(df_ts, NFFT=512):
    """Calculate a power spectral density per column via Welch's method.

    Uses :func:`matplotlib.mlab.psd`; the sample rate is derived from the
    DataFrame index via :func:`fs_calc`.

    Parameters
    ----------
    df_ts : DataFrame
        time series dataframe
    NFFT : int, optional
        buffer size. The default is 512.

    Returns
    -------
    DataFrame
        one PSD column per input column, indexed by frequency
    """
    fs = fs_calc(df_ts)
    spectra = pd.DataFrame()
    for column in df_ts:
        spectra[column], freq = psd(df_ts[column], Fs=fs, NFFT=NFFT)
    # freq is identical for every column, so the last one can serve as index
    spectra.index = pd.Index(freq, name="frequency")
    return spectra
def _prepare_rolling(df):
    """Add ``id`` and ``time`` columns to the dataset for tsfresh.

    A single constant id (0) is used because the data is treated as one
    contiguous time series; independent time series - like time series
    for different robots - would need different ids.

    Parameters
    ----------
    df : pandas DataFrame
        input data

    Returns
    -------
    df : pandas DataFrame
        output DataFrame with added id, time; the time column is shifted
        to start at 0 and also used as the index
    """
    prep_roll = df.copy()
    prep_roll["id"] = 0
    prep_roll["time"] = df.index.values
    # shift the time axis so that it starts at zero
    prep_roll["time"] = prep_roll["time"].subtract(prep_roll["time"].values[0])
    prep_roll.index = prep_roll["time"]
    return prep_roll


def _roll_dataset(prep_roll_df, window_size=1000, overlap=200):
    """Roll the dataset into windows for per-window feature extraction.

    Parameters
    ----------
    prep_roll_df : pandas DataFrame
        output from :func:`_prepare_rolling`
    window_size : int, optional
        window size of the rolled segments - the default is 1000.
    overlap : int, optional
        overlap between 2 adjacent windows - the default is 200.

    Returns
    -------
    df_rolled : pandas DataFrame
        all windows concatenated; each row's ``id`` is the tuple
        ``(0, max_time_of_its_window)`` so tsfresh can group rows by window

    Notes
    -----
    Samples after the last full window (the last half shift) are
    thrown away.
    """
    # Create rolled dataset with parameters rolling_direction & window_size.
    # rolling_direction is the step between consecutive window starts.
    rolling_direction = window_size - overlap
    cycles = int((len(prep_roll_df)-window_size) / rolling_direction)+1
    parts = []
    for i in range(cycles):
        position = (rolling_direction) * i
        shift = prep_roll_df.iloc[position: position + window_size, :].copy()
        # change ids to the format (id, time) expected by tsfresh;
        # shift.iloc[-1, -1] is the window's last "time" value
        df = pd.DataFrame({'id': np.int64(np.zeros(len(shift), dtype=int)),
                           'max_time': shift.iloc[-1, -1]})
        shift['id'] = pd.MultiIndex.from_frame(df).to_numpy()
        parts.append(shift)
    return pd.concat(parts, ignore_index=True)


def _extract_feature_df(df_rolled, feature="maximum"):
    """Extract features like "abs_energy" or "maximum" from the rolled
    dataset with tsfresh.

    Parameters
    ----------
    df_rolled : pandas DataFrame
        rolled DataFrame from :func:`_roll_dataset`
    feature : string, optional
        extracted feature - only supports one at a time - and only
        features from tsfresh that don't need extra parameters.
        The default is "maximum".

    Returns
    -------
    extracted_features : pandas DataFrame
        DataFrame of extracted features, one row per window, re-indexed
        with a plain 0..n-1 range
    """
    # one parameter-less feature; e.g. fc_parameters = {"abs_energy", "maximum"}
    fc_parameters = {
        feature: None,
    }
    # n_jobs=0 keeps the extraction in-process (no multiprocessing pool)
    extracted_features = ts.extract_features(
        df_rolled,
        column_id="id",
        column_sort="time",
        default_fc_parameters=fc_parameters,
        n_jobs=0,
    )
    extracted_features.index = range(len(extracted_features))
    return extracted_features


def _select_relevant_windows(prep_roll, extracted_features, comparison_column_ex,
                             fraction_max=0.25, window_size=1000, overlap=200,
                             n_gridpoints=3, method="keep"):
    """Write ``n_gridpoints`` NaN's into the windows whose extracted
    feature is lower than ``fraction_max`` of the maximum, and drop the
    rest of those windows' rows.

    Parameters
    ----------
    prep_roll : pandas DataFrame
        input data - normally output from :func:`_prepare_rolling`
    extracted_features : pandas DataFrame
        DataFrame of features
    comparison_column_ex : string
        name of the extracted feature column, built as
        ``comparison_column + '__' + feature``
    fraction_max : float
        percentage of the maximum of the extracted feature
    window_size : int
        window size of the rolled segments - the default is 1000.
    overlap : int, optional
        overlap between 2 adjacent windows - the default is 200.
    n_gridpoints : int, optional
        number of NaN gridpoints left at the end of a discarded window
        (to be filled by polynomial regression later)
    method : string, optional
        'keep' drops the collected rows, 'remove' keeps only them

    Returns
    -------
    df : pandas DataFrame
        relevant_windows DataFrame with NaN's in the windows with too low
        extracted features
    """
    # collect row positions of low-feature windows; if too low set None
    rolling_direction = window_size - overlap
    relevant_feature = extracted_features[comparison_column_ex]
    relevant_windows = prep_roll.copy()
    just_added_NaNs = False
    liste = []
    for i in range(len(extracted_features)):
        if relevant_feature[i] <= relevant_feature.max() * fraction_max:
            if just_added_NaNs is True:
                # consecutive low window: discard the whole window range,
                # gridpoints were already written by the previous iteration
                liste.append(list(range(0 + i * rolling_direction,
                                        window_size + i * rolling_direction)))
            else:
                # first low window of a run: keep the last n_gridpoints rows
                # out of the discard range and blank their data columns
                # (all but the trailing id/time columns) as NaN gridpoints
                liste.append(list(range(0 + i * rolling_direction,
                                        window_size + i * rolling_direction - n_gridpoints)))
                relevant_windows.iloc[i * rolling_direction + window_size - n_gridpoints:
                                      i * rolling_direction + window_size,
                                      0:relevant_windows.shape[1]-2] = None
                just_added_NaNs = True
        else:
            just_added_NaNs = False
    index_liste = []
    # NOTE(review): deliberately disabled tail handling, kept for reference
    """
    tail = (len(prep_roll)-window_size) % rolling_direction+1
    for i in range(tail):
        liste.append(len(prep_roll)-i-1)
    """
    # flatten the nested position lists and de-duplicate overlapping ranges
    liste = list(pd.core.common.flatten(liste))
    liste = list(set(liste))
    # translate row positions into index labels
    for i in range(len(liste)):
        index_liste.append(relevant_windows.index[liste[i]])
    if method == "keep":
        relevant_windows = relevant_windows.drop(index_liste, axis=0)
    elif method == "remove":
        relevant_windows = relevant_windows.loc[index_liste]
    return relevant_windows


def _polyfit_gridpoints(grid_points, prep_roll, order=3, verbose=False, n_gridpoints=3):
    """Fill the NaN gridpoints with polynomial regression.

    Parameters
    ----------
    grid_points : pandas DataFrame
        DataFrame with NaN's as gridpoints
    prep_roll : pandas DataFrame
        DataFrame used to create the time axis
    order : int, optional
        order of the polynomial. The default is 3.
    verbose : boolean, optional
        NOTE(review): currently unused by this implementation.
        The default is False.
    n_gridpoints : int, optional
        number of gridpoints. The default is 3.
        NOTE(review): currently unused by this implementation.

    Returns
    -------
    df : pandas DataFrame
        DataFrame with polynomial values at the gridpoints
    """
    # add a null row at the start and reset the time index
    delta_t = prep_roll.index[1]-prep_roll.index[0]
    line = pd.DataFrame(grid_points.iloc[:1], index=[- delta_t])
    grid_points = pd.concat([grid_points, line], ignore_index=False)
    grid_points.index = grid_points.index + delta_t
    poly_gridpoints = grid_points.sort_index()
    poly_gridpoints.iloc[0, :] = 0
    # rebuild the time axis from prep_roll's index
    ts_time = prep_roll.iloc[:len(poly_gridpoints)]
    poly_gridpoints["time"] = ts_time.index.values
    poly_gridpoints.index = poly_gridpoints["time"]
    # smooth the gaps with polynomial values
    poly_gridpoints.interpolate(method='polynomial', order=order, inplace=True)
    return poly_gridpoints
def clean_timeseries(df, comparison_column, window_size=1000, overlap=800,
                     feature="abs_energy", method="keep", n_gridpoints=3,
                     percentage_max=0.05, order=3):
    """Remove low-activity segments from a time series and bridge the gaps.

    Segments whose extracted feature value is lower than
    ``percentage_max`` times the overall maximum are removed and the
    resulting gaps are filled with polynomial regression.

    Parameters
    ----------
    df : pandas DataFrame
        input DataFrame that shall be cleaned
    comparison_column : str
        column that is used for the feature comparison with percentage_max
    window_size : int, optional
        window size of the rolled segments. The default is 1000.
    overlap : int, optional
        overlap between 2 adjacent windows. The default is 800.
    feature : string, optional
        extracted feature - only supports one at a time - and only
        features from tsfresh that don't need extra parameters.
        The default is "abs_energy".
    method : string, optional
        * 'keep': keeps the windows which are extracted,
        * 'remove': removes the windows which are extracted
    n_gridpoints : int, optional
        number of gridpoints. The default is 3.
    percentage_max : float, optional
        min percentage of the maximum to keep the window.
        The default is 0.05.
    order : int, optional
        order of the polynomial. The default is 3.

    Returns
    -------
    pandas DataFrame
        the cleaned DataFrame

    Raises
    ------
    ImportError
        if tsfresh and its dependencies are not installed
    """
    if not _HAVE_TSFRESH:
        raise ImportError("tsfresh and dependencies are not installed. "
                          "Use `pip install pylife[tsfresh]` to install it.")

    prepared = _prepare_rolling(df)

    # Build a padded copy with one extra leading row so the time axis
    # matches what the polynomial fit step expects.
    padded = prepared.copy()
    step = padded.index[1] - padded.index[0]
    extra_row = pd.DataFrame(padded.iloc[:1], index=[- step])
    padded = pd.concat([padded, extra_row], ignore_index=False)
    padded.index = padded.index + step
    padded = padded.sort_index()
    padded['time'] = padded.index.values

    # tsfresh names the extracted column "<column>__<feature>"
    feature_column = comparison_column + '__' + feature

    rolled = _roll_dataset(prepared, window_size=window_size, overlap=overlap)
    features = _extract_feature_df(rolled, feature)
    grid_points = _select_relevant_windows(prepared, features, feature_column,
                                           percentage_max, window_size,
                                           overlap, method=method)
    fitted = _polyfit_gridpoints(grid_points, padded, order=order,
                                 verbose=False, n_gridpoints=n_gridpoints)

    # Drop remaining NaN rows at the end (should be at most 2*n_gridpoints)
    # and the helper id column before returning.
    cleaned = fitted.dropna(axis=0, how='any')
    cleaned.pop("id")
    return cleaned