# Copyright (c) 2019-2023 - for information on the respective copyright owner
# see the NOTICE file and/or the repository
# https://github.com/boschresearch/pylife
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = "Johannes Mueller"
__maintainer__ = __author__
from abc import ABCMeta, abstractmethod
import warnings
import numpy as np
import pandas as pd
def find_turns(samples):
    """Find the turning points in a sample chunk.

    Parameters
    ----------
    samples : 1D numpy.ndarray
        the sample chunk

    Returns
    -------
    index : 1D numpy.ndarray
        the indices where sample has a turning point
    turns : 1D numpy.ndarray
        the values of the turning points

    Notes
    -----
    In case of plateaus i.e. multiple directly neighbored samples with exactly
    the same values, building a turning point together, the first sample of the
    plateau is indexed.

    Warnings
    --------
    Any ``NaN`` values are dropped from the input signal before processing it
    and will thus also not appear in the turns. In those cases a warning is
    issued. The reason for this is, that if the ``NaN`` appears next to an
    actual turning point the turning point is no longer detected which will
    lead to an underestimation of the damage sum later in the damage
    calculation. Generally you should not have ``NaN`` values in your signal.
    If you do, it would be a good idea to clean them out before the rainflow
    detection.
    """
    def _drop_nans(samples):
        # Return the NaN-cleaned samples and the boolean NaN mask, or
        # ``None`` as mask when nothing was dropped so the index mapping
        # below can be skipped entirely.
        nans = pd.isna(samples)
        if nans.any():
            warnings.warn(UserWarning("At least one NaN like value has been dropped from the input signal."))
            return samples[~nans], nans
        return samples, None

    def _map_index_to_original(index, nans):
        # Translate turn indices found in the NaN-cleaned signal back into
        # positions of the original signal by looking them up in the array
        # of surviving original positions.  This replaces the former
        # per-NaN increment loop (O(#NaN * #turns)) with one vectorized
        # O(len(samples)) lookup; the result is identical.
        if nans is None:
            return index
        original_positions = np.where(~nans)[0]
        return original_positions[index]

    def _plateau_turns(diffs):
        # Detect turns formed by plateaus, i.e. runs of zero diffs.  A
        # plateau is a turn iff the slope entering it and the slope leaving
        # it have opposite signs; the flag is set on the plateau's first
        # sample.
        plateau_turns = np.zeros_like(diffs, dtype=np.bool_)[1:]
        duplicates = np.array(diffs == 0, dtype=np.int8)
        if duplicates.any():
            edges = np.diff(duplicates)
            dups_starts = np.where(edges > 0)[0]  # index of the diff entering each plateau
            dups_ends = np.where(edges < 0)[0]    # index of the last zero diff of each plateau
            if len(dups_starts) and len(dups_ends):
                # Plateaus touching the chunk boundary have no entering or
                # leaving slope inside this chunk and cannot be decided
                # here.  Compute both predicates before trimming so the
                # original pairing of starts and ends is used.
                cut_leading_end = dups_ends[0] < dups_starts[0]
                cut_trailing_start = dups_starts[-1] > dups_ends[-1]
                if cut_leading_end:
                    dups_ends = dups_ends[1:]
                if cut_trailing_start:
                    dups_starts = dups_starts[:-1]
                plateau_turns[dups_starts[np.where(diffs[dups_starts] * diffs[dups_ends+1] < 0)]] = True
        return plateau_turns

    samples, nans = _drop_nans(samples)

    diffs = np.diff(samples)

    # find indices where the signal does /\ or \/ (strict sign change)
    peak_turns = diffs[:-1] * diffs[1:] < 0.0

    index = np.where(np.logical_or(peak_turns, _plateau_turns(diffs)))[0] + 1
    # values are taken from the cleaned signal *before* mapping the index
    # back to the original (possibly NaN-containing) coordinates
    turns_values = samples[index]
    index = _map_index_to_original(index, nans)

    return index, turns_values
class AbstractDetector(metaclass=ABCMeta):
    """The common base class for rainflow detectors.

    Subclasses implementing a specific rainflow counting algorithm are supposed
    to implement a method ``process()`` that takes the signal samples as a
    parameter, and reports all the hysteresis loop limits to ``self._recorder``
    using its ``record_values()`` method.

    Some detectors also report the index of the loop limiting samples to the
    recorder using its ``record_index()`` method. Those detectors should also
    report the size of each processed sample chunk to the recorder using
    ``report_chunk()``.

    The ``process()`` method is supposed to return ``self`` and to be
    implemented in a way, that the result is independent of the sample
    chunksize, so ``dtor.process(signal)`` should be equivalent to
    ``dtor.process(signal[:n]).process(signal[n:])`` for any 0 < n < signal
    length.

    Should usually only be instantiated by a subclass's ``__init__()`` using
    ``super().__init__()``.
    """

    def __init__(self, recorder):
        """Instantiate an AbstractDetector.

        Parameters
        ----------
        recorder : subclass of :class:`.AbstractRecorder`
            The recorder that the detector will report to.
        """
        # Samples after the last detected turning point of the previous
        # chunk; they are prepended to the next chunk because deciding
        # whether the last sample is a turn needs a following sample.
        self._sample_tail = np.array([])
        self._recorder = recorder
        # Global index (across all chunks processed so far) of the next
        # sample to be processed.
        self._head_index = 0
        # Indices and values of the turning points not (yet) closing a
        # hysteresis loop.
        self._residual_index = np.array([0], dtype=np.int64)
        self._residuals = np.array([])
        self._is_flushing_enabled = False

    @property
    def residuals(self):
        """The residual turning points of the time signal so far.

        The residuals are the loops not (yet) closed.
        """
        return self._residuals

    @property
    def residual_index(self):
        """The index of the residual turning points of the time signal so far."""
        # The last processed sample (head index - 1) is always appended as
        # the final residual index.
        return np.append(self._residual_index, self._head_index - 1)

    @property
    def recorder(self):
        """The recorder instance the detector is reporting to."""
        return self._recorder

    @abstractmethod
    def process(self, samples, flush=False):
        """Process a sample chunk.

        Parameters
        ----------
        samples : array_like, shape (N, )
            The samples to be processed

        flush : bool
            Whether to flush the cached values at the end.

            If ``flush=False``, the last value of a load sequence is
            cached for a subsequent call to ``process``, because it may or may
            not be a turning point of the sequence, which is only decided
            when the next data point arrives.

            Setting ``flush=True`` forces processing of the last value.
            When ``process`` is called again afterwards with new data,
            two increasing or decreasing values in a row might have been
            processed, as opposed to only turning points of the sequence.

            Example:
            a)
                process([1, 2], flush=False)  # processes 1
                process([3, 1], flush=True)   # processes 3, 1
                -> processed sequence is [1,3,1], only turning points

            b)
                process([1, 2], flush=True)   # processes 1, 2
                process([3, 1], flush=True)   # processes 3, 1
                -> processed sequence is [1,2,3,1], "2" is not a turning point

            c)
                process([1, 2])               # processes 1
                process([3, 1])               # processes 3
                -> processed sequence is [1,3], end ("1") is missing

            d)
                process([1, 2])               # processes 1
                process([3, 1])               # processes 3
                flush()                       # process 1
                -> processed sequence is [1,3,1]

        Returns
        -------
        self : instance of the subclass
            The ``self`` object so that processing can be chained

        Notes
        -----
        Must be implemented by subclasses.

        See also
        --------
        :func:`flush()`
        """
        return self

    def flush(self, samples=[]):
        """
        Flush all remaining cached values from previous calls of ``process``.

        Process all the given values until the end, leaving no cached values.

        If ``process`` is called instead of ``flush``, the last value of a
        load sequence is cached for a subsequent call to ``process``,
        because it may or may not be a turning point of the sequence.

        Using ``flush`` forces processing of the last value. This may not be
        the desired effect as multiple increasing or decreasing values in a
        row could occur, instead of processing only turning points.

        Examples:
        a)
            process([1, 2])  # processes 1
            flush([3, 1])    # processes 3, 1
            -> processed sequence is [1,3,1]: only turning points

        b)
            flush([1, 2])    # processes 1, 2
            flush([3, 1])    # processes 3, 1
            -> processed sequence is [1,2,3,1]: the "2" is not a turning point

        c)
            process([1, 2])  # processes 1
            process([3, 1])  # processes 3
            -> processed sequence is [1,3]: the last value is missing

        d)
            process([1, 2])  # processes 1
            process([3, 1])  # processes 3
            flush()          # process 1
            -> processed sequence is [1,3,1]: last value processed as a result
               of ``flush``

        Parameters
        ----------
        samples : array_like, shape (N, )
            The samples to be processed

        Returns
        -------
        self : AbstractDetector
            The ``self`` object so that processing can be chained

        Notes
        -----
        This method is equivalent to ``process(samples, flush=True)``.
        The default ``samples=[]`` is never mutated here, only passed on,
        so the mutable default is harmless.
        """
        return self.process(samples, flush=True)

    def _new_turns(self, samples, flush=False, preserve_start=False):
        """Provide new turning points for the next chunk.

        This method can handle samples as both 1-D arrays and multi-dimensional
        DataFrames.

        Parameters
        ----------
        samples : 1-D array of float or pandas DataFrame
            The samples of the chunk to be processed

        flush : bool
            Whether to flush the values at the end, i.e., not keep a tail.

        preserve_start : bool
            If the beginning of the sequence should be preserved. If this is
            False, only turning points are extracted, for example:

                _new_turns([1, 2, 1])  # -> 2
                _new_turns([0, 1])     # -> 1

            If ``preserve_start`` is True, the first point is also added, even
            though it is not a turn point:

                _new_turns([1, 2, 1], preserve_start=True)  # -> 1, 2
                _new_turns([0, 1], preserve_start=True)     # -> 0, 1

            This option has no effect if there are samples left over
            from a previous call with flush=False.

        Returns
        -------
        turn_index : 1-D array of int
            The global index of the turning points of the chunk to be processed

        turn_values : 1-D array of float
            The values of the turning points

        Notes
        -----
        This method can be called by the ``process()`` implementation of
        subclasses. The sample tail i.e. the samples after the last turning
        point of the chunk are stored and prepended to the samples of the next
        call.
        """
        if isinstance(samples, pd.Series):
            samples = samples.to_numpy()

        assert isinstance(samples, np.ndarray) or isinstance(samples, pd.DataFrame)

        # if we have samples left from the last call, the chunk start is not
        # the sequence start, so disable option preserve_start
        if len(self._sample_tail) > 0:
            preserve_start = False

        _is_multiple_assessment_points = False
        if isinstance(samples, pd.DataFrame):
            _is_multiple_assessment_points = True
            # convert to a list of one DataFrame per load step
            samples = [df.reset_index(drop=True) for _, df in samples.groupby("load_step")]

        sample_len = len(samples)

        # prepend samples from previous call
        if isinstance(samples, np.ndarray):
            samples = np.concatenate((self._sample_tail, samples))
        else:
            # list-of-DataFrames case: concatenate as Python lists
            if len(samples) == 0:
                samples = list(self._sample_tail)
            elif len(self._sample_tail) == 0:
                pass
            else:
                samples = list(self._sample_tail) + samples

        # get indices and values of new turns in the current samples
        if _is_multiple_assessment_points:
            turn_index, turn_values = self._find_turns_multiple_assessment_points(samples)
        else:
            turn_index, turn_values = find_turns(samples)

        # if turns were found
        if turn_index.size > 0:
            old_sample_tail_length = len(self._sample_tail)
            # store new tail of samples that were not considered in this call
            self._sample_tail = samples[turn_index[-1]:]
            # translate chunk-local indices into global signal indices,
            # compensating for the prepended tail of the previous call
            turn_index += self._head_index - old_sample_tail_length
        else:
            # if no turns were found, store all samples as new tail
            self._sample_tail = samples

        self._head_index += sample_len

        # handle flush parameter: force the last cached sample out as a turn
        if flush and len(self._sample_tail) > 0:
            turn_index = np.concatenate((turn_index, [self._head_index-1]))
            if isinstance(turn_values, np.ndarray):
                turn_values = np.concatenate((turn_values, [self._sample_tail[-1]]))
            else:
                turn_values.append(self._sample_tail[-1])
            self._sample_tail = [self._sample_tail[-1]]

        # handle preserve_start parameter
        if preserve_start:
            if turn_index.size > 0:
                if turn_index[0] > 0:
                    # prepend first sample to results
                    turn_index = np.insert(turn_index, 0, 0)
                    if isinstance(turn_values, np.ndarray):
                        turn_values = np.insert(turn_values, 0, samples[0])
                    else:
                        turn_values.insert(0, samples[0])

        return turn_index, turn_values

    def _find_turns_multiple_assessment_points(self, samples):
        """Find turns in a list of per-load-step DataFrames.

        Uses only the first node's values as the representative signal.
        NOTE(review): this assumes the turning points coincide for all
        nodes (e.g. proportional loading) — confirm against callers.
        """
        # extract the representative samples for the first node
        samples_of_first_node = np.array([sample.iloc[0].values for sample in samples]).flatten()
        turn_index, _ = find_turns(samples_of_first_node)

        # the selected samples are a list of DataFrames. Each DataFrame
        # contains the values for all nodes
        selected_samples = [samples[index] for index in turn_index]
        return turn_index, selected_samples

    def _new_turns_multiple_assessment_points(self, samples, flush=False, preserve_start=False):
        """Provide new turning points for the next chunk. This function
        is used when the assessment considers multiple points at once.

        The function is called from `_new_turns`.

        NOTE(review): the first assert indexes ``samples[0]``, which for a
        plain DataFrame is column access, while the docstring says
        ``samples`` is a DataFrame — verify the expected input shape.

        Parameters
        ----------
        samples : pandas DataFrame
            The samples of the chunk to be processed, has to be a DataFrame
            with a MultiIndex of "load_step" and "node_id".

        flush : bool
            Whether to flush the values at the end, i.e., not keep a tail.

        preserve_start : bool
            If the beginning of the sequence should be preserved. If this is
            False, only turning points are extracted, for example:

                _new_turns([1, 2, 1])  # -> 2
                _new_turns([0, 1])     # -> 1

            If ``preserve_start`` is True, the first point is also added, even
            though it is not a turn point:

                _new_turns([1, 2, 1], preserve_start=True)  # -> 1, 2
                _new_turns([0, 1], preserve_start=True)     # -> 0, 1

        Returns
        -------
        turn_index : 1-D array of int
            The global index of the turning points of the chunk to be processed

        turn_values : list of pandas DataFrame's
            The values of the turning points as data frames.
        """
        assert isinstance(samples[0], pd.DataFrame)
        assert samples[0].index.names == ["load_step", "node_id"]

        # extract the representative samples for the first node
        first_node_id = samples.index.get_level_values("node_id")[0]
        samples_of_first_node = samples[samples.index.get_level_values("node_id") == first_node_id].to_numpy().flatten()

        # remember the head index before _new_turns advances it, to map
        # global turn indices back to load steps of this chunk
        previous_head_index = self._head_index
        turn_index, _ = self._new_turns(samples_of_first_node, flush, preserve_start)

        # the selected samples are a list of DataFrames. Each DataFrame
        # contains the values for all nodes
        selected_samples = [samples[samples.index.get_level_values("load_step") == index-previous_head_index].reset_index(drop=True) \
                            for index in turn_index]
        return turn_index, selected_samples
class AbstractRecorder:
    """A common base class for rainflow recorders.

    Subclasses implementing a rainflow recorder are supposed to implement the
    following methods:

    * ``record_values()``
    * ``record_index()``
    """

    def __init__(self):
        """Instantiate an AbstractRecorder."""
        self._chunks = np.array([], dtype=np.int64)

    @property
    def chunks(self):
        """The limits index of the chunks processed so far.

        Note
        ----
        The first chunk limit is the length of the first chunk, so identical to
        the index to the first sample of the second chunk, if a second chunk
        exists.
        """
        return self._chunks

    def report_chunk(self, chunk_size):
        """Report a chunk.

        Parameters
        ----------
        chunk_size : int
            The length of the chunk previously processed by the detector.

        Note
        ----
        Should be called by the detector after the end of ``process()``.
        """
        self._chunks = np.append(self._chunks, chunk_size)

    def chunk_local_index(self, global_index):
        """Transform the global index to an index valid in a certain chunk.

        Parameters
        ----------
        global_index : array-like int
            The global index to be transformed.

        Returns
        -------
        chunk_number : array of ints
            The number of the chunk the indexed sample is in.
        chunk_local_index : array of ints
            The index of the sample in its chunk.
        """
        # Boundaries of the chunks in global coordinates: [0, c0, c0+c1, ...]
        boundaries = np.concatenate(([0], np.cumsum(self._chunks)))
        # side='right' maps a global index lying exactly on a boundary into
        # the chunk that starts there.
        chunk_number = np.searchsorted(boundaries, global_index, side='right') - 1
        local_index = global_index - boundaries[chunk_number]
        return chunk_number, local_index

    def record_values(self, values_from, values_to):  # pragma: no cover
        """Report hysteresis loop values to the recorder.

        Parameters
        ----------
        values_from : list of floats
            The sample values where the hysteresis loop starts from.
        values_to : list of floats
            The sample values where the hysteresis loop goes to and turns back from.

        Note
        ----
        Default implementation does nothing. Can be implemented by recorders
        interested in the hysteresis loop values.
        """
        pass

    def record_index(self, indeces_from, indeces_to):  # pragma: no cover
        """Record hysteresis loop index to the recorder.

        Parameters
        ----------
        indeces_from : list of ints
            The sample indeces where the hysteresis loop starts from.
        indeces_to : list of ints
            The sample indeces where the hysteresis loop goes to and turns back from.

        Note
        ----
        Default implementation does nothing. Can be implemented by recorders
        interested in the hysteresis loop values.
        """
        pass