The FourPointDetector class
- class pylife.stress.rainflow.FourPointDetector(recorder)
Implements the four-point rainflow counting algorithm.
from pylife.stress.timesignal import TimeSignalGenerator
import pylife.stress.rainflow as RF

ts = TimeSignalGenerator(10, {
    'number': 50,
    'amplitude_median': 1.0,
    'amplitude_std_dev': 0.5,
    'frequency_median': 4,
    'frequency_std_dev': 3,
    'offset_median': 0,
    'offset_std_dev': 0.4}, None, None).query(10000)

rfc = RF.FourPointDetector(recorder=RF.LoopValueRecorder())
rfc.process(ts)

rfc.recorder.collective
           from         to
0      1.348875  -4.240997
1     -1.800674  -1.690273
2     -6.398929   0.084558
3     -9.177755  -9.124595
4     -5.524818 -10.209376
...         ...        ...
1122  -3.564793   1.172274
1123  -3.567126   4.109150
1124  -2.205799  -5.133649
1125  -5.700325   4.453438
1126 -12.557734   7.664959

1127 rows × 2 columns
Alternatively you can ask the recorder for a histogram matrix:
rfc.recorder.histogram(bins=16)
from                                        to
(-19.164768926042104, -17.013130544708794]  (-20.835938807097328, -18.483409037037532]    0.0
                                            (-18.483409037037532, -16.130879266977733]    0.0
                                            (-16.130879266977733, -13.778349496917935]    0.0
                                            (-13.778349496917935, -11.425819726858137]    0.0
                                            (-11.425819726858137, -9.07328995679834]      0.0
                                                                                          ...
(13.109806793957542, 15.261445175290852]    (5.041888663560449, 7.394418433620245]        0.0
                                            (7.394418433620245, 9.746948203680041]        0.0
                                            (9.746948203680041, 12.099477973739845]       0.0
                                            (12.099477973739845, 14.45200774379964]       0.0
                                            (14.45200774379964, 16.804537513859433]       0.0
Length: 256, dtype: float64

We take four turning points into account to detect closed hysteresis loops.
Consider four consecutive peak/valley points, say A, B, C, and D. If B and C are contained within A and D, then a cycle is counted from B to C; otherwise no cycle is counted.
That is, if X ≥ Y AND Z ≥ Y, then a cycle exists with FROM = B and TO = C, where the ranges are X = |D–C|, Y = |C–B|, and Z = |B–A|.

Load -----------------------------
|        x B               F x
--------/-\-----------------/-----
|      /   \   x D         /
------/-----\-/-\---------/-------
|    /     C x   \       /
--\-/-------------\-----/---------
|  x A             \   /
--------------------\-/-----------
|                    x E
----------------------------------
|              Time
So, if a cycle exists from B to C, delete these two points from the turns array and perform the next iteration by joining A and D. If no cycle exists, B becomes the next starting point.
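The rule above can be sketched in plain Python. This is only an illustration, not pylife's implementation: the function name `four_point_loops` is hypothetical, the input is assumed to already consist of turning points only, and the check uses a stack of residual points, which is one common way to express "join A and D and try again".

```python
def four_point_loops(turns):
    """Return (loops, residuals) for a sequence of turning points.

    Hypothetical helper for illustration; `turns` must already be
    reduced to peaks and valleys.
    """
    residuals = []  # turning points not (yet) part of a closed loop
    loops = []      # closed loops as (from, to) pairs
    for point in turns:
        residuals.append(point)
        # After a loop is removed, A and D become neighbours, so the
        # last four residual points have to be checked again.
        while len(residuals) >= 4:
            a, b, c, d = residuals[-4:]
            x = abs(d - c)
            y = abs(c - b)
            z = abs(b - a)
            if x >= y and z >= y:
                loops.append((b, c))   # cycle FROM = B, TO = C
                del residuals[-3:-1]   # drop B and C, keep A and D
            else:
                break
    return loops, residuals


# Load levels read off the diagram: A=2, B=5, C=3, D=4, E=1, F=5
loops, residuals = four_point_loops([2, 5, 3, 4, 1, 5])
print(loops)      # [(3, 4)]
print(residuals)  # [2, 5, 1, 5]
```

With these levels, A, B, C, D close no loop because X = |D–C| = 1 is smaller than Y = |C–B| = 2. Once E arrives, the points B, C, D, E satisfy both conditions, so the loop from C to D is recorded and B and E become neighbours.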
- __init__(recorder)
Instantiate a FourPointDetector.
- Parameters:
recorder (subclass of AbstractRecorder) – The recorder that the detector will report to.
- process(samples, flush=False)
Process a sample chunk.
- Parameters:
samples (array_like, shape (N, )) – The samples to be processed
flush (bool) – Whether to flush the cached values at the end.
If flush=False, the last value of a load sequence is cached for a subsequent call to process, because it may or may not be a turning point of the sequence, which is only decided when the next data point arrives.
Setting flush=True forces processing of the last value. When process is called again afterwards with new data, two increasing or decreasing values in a row might have been processed, as opposed to only turning points of the sequence.
Examples
>>> from pylife.stress.rainflow.recorders import FullRecorder
>>> detector = FourPointDetector(recorder=FullRecorder())
>>> (
...     detector
...     .process([1, 2], flush=False)  # flush=False → 2 not a turning point
...     .process([3, 1])
...     .recorder.collective
... )
Empty DataFrame
Columns: [from, to, index_from, index_to]
Index: []

>>> detector = FourPointDetector(recorder=FullRecorder())
>>> (
...     detector
...     .process([1, 2], flush=True)  # flush=True → 2 is considered a turning point
...     .process([3, 1])
...     .recorder.collective
... )
   from   to  index_from  index_to
0   2.0  3.0           1         2
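The caching behind `flush` can also be illustrated without pylife. The class below is a toy, hypothetical chunk-wise turning point finder, not the library's code: the name `ChunkedTurnFinder` and its attributes are assumptions made for this sketch, and it treats the very first sample as a turning point, which pylife may handle differently. It only shows why the last value of a chunk stays undecided until the next sample arrives, and how flushing can report two rising values in a row.

```python
class ChunkedTurnFinder:
    """Toy chunk-wise turning point finder (hypothetical, not part of pylife)."""

    def __init__(self):
        self.turns = []          # points reported so far
        self._candidate = None   # cached last value, still undecided
        self._direction = 0      # +1 rising, -1 falling, 0 unknown
        self._reported = False   # True if the candidate was already flushed

    def process(self, samples, flush=False):
        for s in samples:
            if self._candidate is None:
                self._candidate = s
                continue
            if s == self._candidate:
                continue  # plateau: nothing new to decide
            step = 1 if s > self._candidate else -1
            if step != self._direction:
                # Reversal (or very first move): the candidate is confirmed,
                # unless an earlier flush has already reported it.
                if not self._reported:
                    self.turns.append(self._candidate)
                self._direction = step
            self._candidate = s
            self._reported = False
        if flush and self._candidate is not None and not self._reported:
            # Forced processing of the last value, turning point or not.
            self.turns.append(self._candidate)
            self._reported = True
        return self  # allow chaining, as process() does in the text above


no_flush = ChunkedTurnFinder().process([1, 2]).process([3, 1]).turns
with_flush = ChunkedTurnFinder().process([1, 2], flush=True).process([3, 1]).turns
print(no_flush)    # [1, 3]
print(with_flush)  # [1, 2, 3]
```

Without flushing, the value 2 is replaced by 3 before any decision is made, so only real reversals are reported. With `flush=True`, 2 is reported at the end of the first chunk, and the rising values 2 and 3 both end up in the output.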
- Returns:
self – The self object so that processing can be chained
- Return type: