The FourPointDetector class
- class pylife.stress.rainflow.FourPointDetector(recorder)[source]
Implements four point rainflow counting algorithm.
from pylife.stress.timesignal import TimeSignalGenerator import pylife.stress.rainflow as RF ts = TimeSignalGenerator(10, { 'number': 50, 'amplitude_median': 1.0, 'amplitude_std_dev': 0.5, 'frequency_median': 4, 'frequency_std_dev': 3, 'offset_median': 0, 'offset_std_dev': 0.4}, None, None).query(10000) rfc = RF.FourPointDetector(recorder=RF.LoopValueRecorder()) rfc.process(ts) rfc.recorder.collective
from to 0 -0.174861 -2.862351 1 0.007652 -8.895089 2 -1.907509 -1.851569 3 2.140251 -4.295466 4 3.859106 -7.682095 ... ... ... 1135 1.880263 -9.494366 1136 -5.638555 -4.235082 1137 -6.416119 0.191803 1138 1.950045 -2.085017 1139 6.902497 -6.556734 1140 rows × 2 columns
Alternatively you can ask the recorder for a histogram matrix:
rfc.recorder.histogram(bins=16)
from to (-19.52219965442052, -17.518485244841592] (-18.50157185199033, -16.399351859876543] 0.0 (-16.399351859876543, -14.297131867762754] 0.0 (-14.297131867762754, -12.194911875648966] 0.0 (-12.194911875648966, -10.092691883535178] 0.0 (-10.092691883535178, -7.9904718914213895] 0.0 ... (10.533516489263377, 12.537230898842301] (4.62284806126134, 6.725068053375129] 0.0 (6.725068053375129, 8.827288045488917] 0.0 (8.827288045488917, 10.929508037602705] 0.0 (10.929508037602705, 13.031728029716493] 0.0 (13.031728029716493, 15.133948021830284] 0.0 Length: 256, dtype: float64We take four turning points into account to detect closed hysteresis loops.
Consider four consecutive peak/valley points say, A, B, C, and D If B and C are contained within A and B, then a cycle is counted from B to C; otherwise no cycle is counted.
i.e, If
X ≥ Y AND Z ≥ Ythen a cycle existFROM = BandTO = Cwhere, rangesX = |D–C|,Y = |C–B|, andZ = |B–A|Load ----------------------------- | x B F x --------/-\-----------------/----- | / \ x D / ------/-----\-/-\---------/------- | / C x \ / --\-/-------------\-----/--------- | x A \ / --------------------\-/----------- | x E ---------------------------------- | Time
So, if a cycle exsist from B to C then delete these peaks from the turns array and perform next iteration by joining A&D else if no cylce exsists, then B would be the next strarting point.
- __init__(recorder)[source]
Instantiate a FourPointDetector.
- Parameters:
recorder (subclass of
AbstractRecorder) – The recorder that the detector will report to.
- process(samples, flush=False)[source]
Process a sample chunk.
- Parameters:
samples (array_like, shape (N, )) – The samples to be processed
flush (bool) –
Whether to flush the cached values at the end.
If
flush=False, the last value of a load sequence is cached for a subsequent call toprocess, because it may or may not be a turning point of the sequence, which is only decided when the next data point arrives.Setting
flush=Trueforces processing of the last value. Whenprocessis called again afterwards with new data, two increasing or decreasing values in a row might have been processed, as opposed to only turning points of the sequence.
Examples
>>> from pylife.stress.rainflow.recorders import FullRecorder
>>> detector = FourPointDetector(recorder=FullRecorder()) >>> ( ... detector ... .process([1, 2], flush=False) # flush=False → 2 not a turning point ... .process([3, 1]) ... .recorder.collective ... ) Empty DataFrame Columns: [from, to, index_from, index_to] Index: []
>>> detector = FourPointDetector(recorder=FullRecorder()) >>> ( ... detector ... .process([1, 2], flush=True) # flush=True → 2 is considered a turning point ... .process([3, 1]) ... .recorder.collective ... ) from to index_from index_to 0 2.0 3.0 1 2
- Returns:
self – The
selfobject so that processing can be chained- Return type: