# Copyright (c) 2019-2023 - for information on the respective copyright owner
# see the NOTICE file and/or the repository
# https://github.com/boschresearch/pylife
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = "Vishnu Pradeep"
__maintainer__ = "Johannes Mueller"
import numpy as np
from .general import AbstractDetector
[docs]
class FourPointDetector(AbstractDetector):
r"""Implements four point rainflow counting algorithm.
.. jupyter-execute::
from pylife.stress.timesignal import TimeSignalGenerator
import pylife.stress.rainflow as RF
ts = TimeSignalGenerator(10, {
'number': 50,
'amplitude_median': 1.0, 'amplitude_std_dev': 0.5,
'frequency_median': 4, 'frequency_std_dev': 3,
'offset_median': 0, 'offset_std_dev': 0.4}, None, None).query(10000)
rfc = RF.FourPointDetector(recorder=RF.LoopValueRecorder())
rfc.process(ts)
rfc.recorder.collective
Alternatively you can ask the recorder for a histogram matrix:
.. jupyter-execute::
rfc.recorder.histogram(bins=16)
We take four turning points into account to detect closed hysteresis loops.
Consider four consecutive peak/valley points say, A, B, C, and D If B and C are
contained within A and B, then a cycle is counted from B to C; otherwise no cycle is
counted.
i.e, If ``X ≥ Y AND Z ≥ Y`` then a cycle exist ``FROM = B`` and ``TO = C``
where, ranges ``X = |D–C|``, ``Y = |C–B|``, and ``Z = |B–A|``
::
Load -----------------------------
| x B F x
--------/-\-----------------/-----
| / \ x D /
------/-----\-/-\---------/-------
| / C x \ /
--\-/-------------\-----/---------
| x A \ /
--------------------\-/-----------
| x E
----------------------------------
| Time
So, if a cycle exsist from B to C then delete these peaks from the turns array
and perform next iteration by joining A&D else if no cylce exsists, then B would
be the next strarting point.
"""
[docs]
def __init__(self, recorder):
"""Instantiate a FourPointDetector.
Parameters
----------
recorder : subclass of :class:`.AbstractRecorder`
The recorder that the detector will report to.
"""
super().__init__(recorder)
[docs]
def process(self, samples, flush=False):
"""Process a sample chunk.
Parameters
----------
samples : array_like, shape (N, )
The samples to be processed
flush : bool
Whether to flush the cached values at the end.
If ``flush=False``, the last value of a load sequence is
cached for a subsequent call to ``process``, because it may or may
not be a turning point of the sequence, which is only decided
when the next data point arrives.
Setting ``flush=True`` forces processing of the last value.
When ``process`` is called again afterwards with new data,
two increasing or decreasing values in a row might have been
processed, as opposed to only turning points of the sequence.
Example:
a)
process([1, 2], flush=False) # processes 1
process([3, 1], flush=True) # processes 3, 1
-> processed sequence is [1,3,1], only turning points
b)
process([1, 2], flush=True) # processes 1, 2
process([3, 1], flush=True) # processes 3, 1
-> processed sequence is [1,2,3,1], "," is not a turning point
c)
process([1, 2]) # processes 1
process([3, 1]) # processes 3
-> processed sequence is [1,3], end ("1") is missing
d)
process([1, 2]) # processes 1
process([3, 1]) # processes 3
flush() # process 1
-> processed sequence is [1,3,1]
Returns
-------
self : FourPointDetector
The ``self`` object so that processing can be chained
"""
residuals = samples[:1] if self._residuals.size == 0 else self._residuals[:-1]
turns_index, turns_values = self._new_turns(samples, flush)
turns_np = np.concatenate((residuals, turns_values, samples[-1:]))
turns_index = np.concatenate((self._residual_index, turns_index))
turns = turns_np
from_vals = []
to_vals = []
from_index = []
to_index = []
residual_index = [0, 1]
i = 2
while i < len(turns):
if len(residual_index) < 3:
residual_index.append(i)
i += 1
continue
a = turns_np[residual_index[-3]]
b = turns_np[residual_index[-2]]
c = turns_np[residual_index[-1]]
d = turns_np[i]
ab = np.abs(a - b)
bc = np.abs(b - c)
cd = np.abs(c - d)
if bc <= ab and bc <= cd:
from_vals.append(b)
to_vals.append(c)
idx_2 = turns_index[residual_index.pop()]
idx_1 = turns_index[residual_index.pop()]
from_index.append(idx_1)
to_index.append(idx_2)
continue
residual_index.append(i)
i += 1
self._recorder.record_values(from_vals, to_vals)
self._recorder.record_index(from_index, to_index)
self._residuals = turns_np[residual_index]
self._residual_index = turns_index[residual_index[:-1]]
self._recorder.report_chunk(len(samples))
return self