The FourPointDetector class

class pylife.stress.rainflow.FourPointDetector(recorder)[source]

Implements the four-point rainflow counting algorithm.

from pylife.stress.timesignal import TimeSignalGenerator
import pylife.stress.rainflow as RF

ts = TimeSignalGenerator(10, {
    'number': 50,
    'amplitude_median': 1.0, 'amplitude_std_dev': 0.5,
    'frequency_median': 4, 'frequency_std_dev': 3,
    'offset_median': 0, 'offset_std_dev': 0.4}, None, None).query(10000)

rfc = RF.FourPointDetector(recorder=RF.LoopValueRecorder())
rfc.process(ts)

rfc.recorder.collective
           from         to
0      1.348875  -4.240997
1     -1.800674  -1.690273
2     -6.398929   0.084558
3     -9.177755  -9.124595
4     -5.524818 -10.209376
...         ...        ...
1122  -3.564793   1.172274
1123  -3.567126   4.109150
1124  -2.205799  -5.133649
1125  -5.700325   4.453438
1126 -12.557734   7.664959

1127 rows × 2 columns

Alternatively you can ask the recorder for a histogram matrix:

rfc.recorder.histogram(bins=16)
from                                        to                                        
(-19.164768926042104, -17.013130544708794]  (-20.835938807097328, -18.483409037037532]    0.0
                                            (-18.483409037037532, -16.130879266977733]    0.0
                                            (-16.130879266977733, -13.778349496917935]    0.0
                                            (-13.778349496917935, -11.425819726858137]    0.0
                                            (-11.425819726858137, -9.07328995679834]      0.0
                                                                                         ... 
(13.109806793957542, 15.261445175290852]    (5.041888663560449, 7.394418433620245]        0.0
                                            (7.394418433620245, 9.746948203680041]        0.0
                                            (9.746948203680041, 12.099477973739845]       0.0
                                            (12.099477973739845, 14.45200774379964]       0.0
                                            (14.45200774379964, 16.804537513859433]       0.0
Length: 256, dtype: float64

We take four turning points into account to detect closed hysteresis loops.

Consider four consecutive peak/valley points, say A, B, C, and D. If B and C are contained within A and D, then a cycle is counted from B to C; otherwise no cycle is counted.

That is, if X ≥ Y and Z ≥ Y, then a cycle exists with FROM = B and TO = C, where the ranges are X = |D - C|, Y = |C - B|, and Z = |B - A|.

Load -----------------------------
|        x B               F x
--------/-\-----------------/-----
|      /   \   x D         /
------/-----\-/-\---------/-------
|    /     C x   \       /
--\-/-------------\-----/---------
|  x A             \   /
--------------------\-/-----------
|                    x E
----------------------------------
|              Time

So, if a cycle exists from B to C, delete these two points from the turns array and perform the next iteration by joining A and D; if no cycle exists, B becomes the next starting point.
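The procedure described above can be sketched in plain Python. This is a minimal illustration, not pylife's implementation: the function name `four_point_loops` and the stack-based bookkeeping are assumptions made for this example, and the input is assumed to already consist of turning points only. The sample values are read off the diagram by height (A=2, B=5, C=3, D=4, E=1, F=5).

```python
def four_point_loops(turns):
    """Return (loops, residual) for a sequence of turning points.

    Hypothetical helper for illustration only; it is not part of pylife.
    """
    loops = []      # closed hysteresis loops as (from, to) pairs
    residual = []   # turning points that have not closed a loop yet
    for point in turns:
        residual.append(point)
        # Re-check the last four points whenever a loop has been removed.
        while len(residual) >= 4:
            a, b, c, d = residual[-4:]
            z = abs(b - a)
            y = abs(c - b)
            x = abs(d - c)
            if x >= y and z >= y:
                loops.append((b, c))   # cycle FROM = B, TO = C
                del residual[-3:-1]    # drop B and C, which joins A and D
            else:
                break                  # no cycle: wait for the next point
    return loops, residual


# Turning points A..F from the diagram above.
loops, residual = four_point_loops([2, 5, 3, 4, 1, 5])
print(loops)     # [(3, 4)]
print(residual)  # [2, 5, 1, 5]
```

For A, B, C, D the outer range X = |D - C| is smaller than Y = |C - B|, so nothing is counted. Once E arrives, the window B, C, D, E satisfies both conditions and the loop from C to D is closed.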

__init__(recorder)[source]

Instantiate a FourPointDetector.

Parameters:

recorder (subclass of AbstractRecorder) – The recorder that the detector will report to.

process(samples, flush=False)[source]

Process a sample chunk.

Parameters:
  • samples (array_like, shape (N, )) – The samples to be processed

  • flush (bool) –

    Whether to flush the cached values at the end.

    If flush=False, the last value of a load sequence is cached for a subsequent call to process, because it may or may not be a turning point of the sequence, which is only decided when the next data point arrives.

    Setting flush=True forces processing of the last value. When process is called again afterwards with new data, two increasing or decreasing values in a row might have been processed, as opposed to only turning points of the sequence.
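Why the last value has to be held back can be illustrated with a small chunk-wise turning point finder. This is a simplified sketch and not pylife's implementation: the class `ChunkedTurningPoints` and its `feed` method are hypothetical names, and the sketch reports only reversals plus any flushed value, ignoring how the very first sample is treated.

```python
class ChunkedTurningPoints:
    """Hypothetical chunk-wise turning point finder (illustration only)."""

    def __init__(self):
        self._prev = None            # last sample seen, not yet confirmed
        self._direction = 0          # +1 rising, -1 falling, 0 unknown
        self._prev_reported = False  # True if _prev was forced out by a flush

    def feed(self, samples, flush=False):
        turns = []
        for x in samples:
            if self._prev is None:
                self._prev = x
                continue
            if x == self._prev:
                continue
            direction = 1 if x > self._prev else -1
            is_reversal = self._direction != 0 and direction != self._direction
            if is_reversal and not self._prev_reported:
                turns.append(self._prev)  # _prev is confirmed as a turn
            self._direction = direction
            self._prev = x
            self._prev_reported = False
        if flush and self._prev is not None and not self._prev_reported:
            # Forced out although the next sample might continue the trend.
            turns.append(self._prev)
            self._prev_reported = True
        return turns


held_back = ChunkedTurningPoints()
first = held_back.feed([1, 2])    # 2 is cached, nothing reported
second = held_back.feed([3, 1])   # 3 turns out to be the peak

flushed = ChunkedTurningPoints()
first_flushed = flushed.feed([1, 2], flush=True)  # 2 is forced out
second_flushed = flushed.feed([3, 1])             # 3 is reported as well
```

In the flushed case the values 2 and 3 are both reported even though they lie on the same rising flank, which is the situation the description of flush=True warns about.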

Examples

>>> from pylife.stress.rainflow.recorders import FullRecorder
>>> detector = FourPointDetector(recorder=FullRecorder())
>>> (
...     detector
...     .process([1, 2], flush=False) # flush=False → 2 not a turning point
...     .process([3, 1])
...     .recorder.collective
... )
Empty DataFrame
Columns: [from, to, index_from, index_to]
Index: []
>>> detector = FourPointDetector(recorder=FullRecorder())
>>> (
...     detector
...     .process([1, 2], flush=True) # flush=True → 2 is considered a turning point
...     .process([3, 1])
...     .recorder.collective
... )
    from   to  index_from  index_to
0   2.0  3.0           1         2

Returns:

self – The self object so that processing can be chained

Return type:

FourPointDetector