Source code for pylife.stress.rainflow.fkm_nonlinear

# Copyright (c) 2019-2023 - for information on the respective copyright owner
# see the NOTICE file and/or the repository
# https://github.com/boschresearch/pylife
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__author__ = "Benjamin Maier"
__maintainer__ = __author__

import numpy as np
import pandas as pd

import pylife.stress.rainflow.general as RFG
import pylife.materiallaws.notch_approximation_law as NAL

INDEX = 0
LOAD_TYPE = 1

IS_CLOSED = 0
FROM = 1
TO = 2
CLOSE = 3

LOAD = 0
STRESS = 1
STRAIN = 2
EPS_MIN_LF = 3
EPS_MAX_LF = 4
STRESS_AND_STRAIN = slice(STRESS, STRAIN+1)

PRIMARY = 0
SECONDARY = 1

MEMORY_1_2 = 1
MEMORY_3 = 0

HISTORY_COLUMNS = ["load", "stress", "strain", "secondary_branch"]
HISTORY_INDEX_LEVELS = [
    "load_segment", "load_step", "run_index", "turning_point", "hyst_from", "hyst_to", "hyst_close"
]


class _ResidualsRecord:

    def __init__(self):
        self._index = []
        self._values = []

    def append(self, idx, val):
        self._index.append(idx)
        self._values.append(val)

    def pop(self):
        return self._index.pop(), self._values.pop()

    @property
    def index(self):
        return np.array(self._index, dtype=np.int64)

    @property
    def current_index(self):
        return self._index[-1]

    def reindex(self):
        self._index = list(range(-len(self._values), 0))

    def will_remain_open_by(self, load):
        current_load_extent = np.abs(load - self._values[-1])
        previous_load_extent = np.abs(self._values[-1] - self._values[-2])
        return current_load_extent < previous_load_extent

    def __len__(self):
        return len(self._index)


[docs] class FKMNonlinearDetector(RFG.AbstractDetector): """HCM-Algorithm detector as described in FKM nonlinear. """
[docs] def __init__(self, recorder, notch_approximation_law, binner=NAL.NotchApproxBinner): super().__init__(recorder) if binner is not None: self._binner = binner(notch_approximation_law) self._notch_approximation_law = self._binner else: self._binner = None self._notch_approximation_law = notch_approximation_law if notch_approximation_law is not None: self._ramberg_osgood_relation = self._notch_approximation_law.ramberg_osgood_relation # state of the hcm algorithm self._iz = 0 # indicator how many open hystereses there are self._ir = 1 # indicator how many of the open hystereses start at the primary branch (and, thus, cannot be closed) self._load_max_seen = 0.0 # maximum seen load value self._run_index = 0 # which run through the load sequence is currently performed self._last_deformation_record = None self._residuals_record = _ResidualsRecord() self._residuals = np.array([]) self._record_vals_residuals = pd.DataFrame() self._history_record = [] self._num_turning_points = 0
[docs] def process_hcm_first(self, samples): """Perform the HCM algorithm for the first time. This processes the given samples accordingly, only considering "turning points", neglecting consecutive duplicate values, making sure that the beginning starts with 0. Parameters ---------- samples : list of floats or list of pd.DataFrame`s The samples to be processed by the HCM algorithm. """ assert len(samples) >= 2 samples, flush = self._adjust_samples_and_flush_for_hcm_first_run(samples) return self.process(samples, flush=flush)
[docs] def process_hcm_second(self, samples): """Perform the HCM algorithm for the second time, after it has been executed with ``process_hcm_first``. This processes the given samples accordingly, only considering "turning points", neglecting consecutive duplicate values, making sure that the beginning of the sequence is properly fitted to the samples of the first run, such that no samples are lost and no non-turning points are introducted between the two runs. Parameters ---------- samples : list of floats or list of pd.DataFrame`s The samples to be processed by the HCM algorithm. """ assert len(samples) >= 2 return self.process(samples, flush=True)
[docs] def process(self, samples, flush=False): """Process a sample chunk. This method implements the actual HCM algorithm. Parameters ---------- samples : array_like, shape (N, ) The samples to be processed flush : bool Whether to flush the cached values at the end. For explanations see :meth:`~pylife.stress.rainflow.FourPointDetector.process` Returns ------- self : FKMNonlinearDetector The ``self`` object so that processing can be chained """ # collected values, which will be passed to the recorder at the end of `process()` assert not isinstance(samples, pd.DataFrame) self._run_index += 1 load_turning_points = self._determine_load_turning_points(samples, flush) self._current_load_index = load_turning_points.index li = load_turning_points.index.to_frame()['load_step'] turning_point_idx = pd.Index((li != li.shift()).cumsum() - 1, name="turning_point") load_turning_points_rep = np.asarray( load_turning_points.groupby(turning_point_idx, sort=False).first() ) deform_type_record, hysts = self._perform_hcm_algorithm(load_turning_points_rep) if self._last_deformation_record is None: self._last_deformation_record = np.zeros((5, self._group_size)) num_turning_points = len(load_turning_points_rep) record_vals = self._process_deformation(load_turning_points, num_turning_points, deform_type_record) self._store_recordings_for_history(deform_type_record, record_vals, turning_point_idx, hysts) results = self._process_hysteresis(record_vals, hysts) results_min, results_max = results self._update_residuals(record_vals, turning_point_idx, load_turning_points_rep) self._num_turning_points += num_turning_points # TODO: check if these are really that redundant is_closed_hysteresis = (hysts[:, 0] != MEMORY_3).tolist() is_zero_mean_stress_and_strain = (hysts[:, 0] == MEMORY_3).tolist() self._recorder.record_values_fkm_nonlinear( results_min, results_max, is_closed_hysteresis=is_closed_hysteresis, is_zero_mean_stress_and_strain=is_zero_mean_stress_and_strain, run_index=self._run_index ) return self
def _determine_load_turning_points(self, samples, flush): old_head_index = self._head_index have_multi_index = isinstance(samples, pd.Series) and len(samples.index.names) > 1 if have_multi_index: rep_samples = samples.groupby('load_step', sort=False).first().to_numpy() else: rep_samples = np.asarray(samples) if self._binner is not None: if have_multi_index: load_max_idx = samples.groupby("load_step").first().abs().idxmax() load_max = samples.xs(load_max_idx, level="load_step").abs() else: load_max = np.abs(samples).max() self._binner.initialize(load_max) loads_indices, load_turning_points = self._new_turns(rep_samples, flush) self._group_size = len(samples) // len(rep_samples) if have_multi_index: load_steps = samples.index.get_level_values('load_step').unique() if len(loads_indices) > 0: turns_idx = loads_indices - old_head_index idx = load_steps[turns_idx] load_turning_points = samples.loc[idx] if turns_idx[0] < 0: load_turning_points.iloc[:self._group_size] = self._last_sample else: load_turning_points = pd.Series( [], index=pd.MultiIndex.from_tuples([], names=samples.index.names) ) idx = load_steps[-1] self._last_sample = samples.loc[idx] if isinstance(load_turning_points, pd.Series): return load_turning_points return pd.Series( load_turning_points, index=pd.Index(loads_indices, name="load_step") ) def _store_recordings_for_history(self, record, record_vals, turning_point, hysts): record_repr = ( record_vals.reset_index(["load_step", "turning_point"]) .groupby(turning_point) .first() .drop(["epsilon_min_LF", "epsilon_max_LF"], axis=1) ) record_repr["run_index"] = self._run_index record_repr["secondary_branch"] = record[:, SECONDARY] != 0 rec_hysts = hysts.copy() rec_hysts[:, 1:] += self._num_turning_points self._history_record.append((record_repr, rec_hysts)) def _update_residuals(self, record_vals, turning_point, load_turning_points_rep): residuals_index = self._residuals_record.index old_residuals_index = residuals_index[residuals_index < 0] new_residuals_index = residuals_index[residuals_index >= 0] + self._num_turning_points remaining_vals_residuals = self._record_vals_residuals.loc[ self._record_vals_residuals.index[old_residuals_index] ] new_vals_residuals = record_vals.loc[ record_vals.index.isin(new_residuals_index, level="turning_point") ] self._record_vals_residuals = pd.concat([remaining_vals_residuals, new_vals_residuals]) self._record_vals_residuals.index.names = record_vals.index.names self._residuals = ( load_turning_points_rep[residuals_index] if len(residuals_index) else np.array([]) ) self._residuals_record.reindex() def _process_deformation(self, load_turning_points, num_turning_points, deform_type_record): """Calculate the local stress and strain of all turning points In ._perform_hcm_algorithm we recorded which turning point is in PRIMARY deformation regime and which in SECONDARY Now we use this information to calculate the local stress and strain according to the notch approximation law for each turining point for each point in the mesh. Parameters ---------- load_turning_points : pd.Series (N * self._group_size) float The load distribution of all the turning points. It carries all th index levels of the initial load signal. num_turning_points : int The number of tunring_points deform_type_record : np.ndarray(N, 2) int The inndex and deformation type of each turning point (see _perform_hcm_algorithm) Returns pd.DataFrame columns: ["load", "stress", "strain", "epsilon_min_LF", "epsilon_max_LF"] index: the same like load_turning_points """ def primary(_prev, load): return self._notch_approximation_law.primary(load) def secondary(prev, load): prev_load = prev[LOAD] delta_L = load - prev_load delta_stress_strain = self._notch_approximation_law.secondary(delta_L) return prev[STRESS_AND_STRAIN].T + delta_stress_strain def prev_record_from_residuals(prev_idx): idx = len(self._record_vals_residuals) + prev_idx*self._group_size return self._record_vals_residuals.iloc[idx:idx+self._group_size].to_numpy().T def prev_record_from_this_run(prev_idx): idx = prev_idx * self._group_size return record_vals[:, idx:idx+self._group_size] def determine_prev_record(prev_idx): if prev_idx < 0: return prev_record_from_residuals(prev_idx) if prev_idx == curr_idx: return self._last_deformation_record return prev_record_from_this_run(prev_idx) record_vals = np.empty((5, num_turning_points*self._group_size)) turning_points = load_turning_points.to_numpy() for curr_idx in range(num_turning_points): prev_record = determine_prev_record(deform_type_record[curr_idx, INDEX]) idx = curr_idx * self._group_size load = turning_points[idx:idx+self._group_size] deformation_function = secondary if deform_type_record[curr_idx, SECONDARY] else primary result_buf = record_vals[:, idx:idx+self._group_size] result_buf[LOAD] = load result_buf[STRESS_AND_STRAIN] = deformation_function(prev_record, load).T self._calculate_epsilon_LF(result_buf) self._last_deformation_record = result_buf record_vals = pd.DataFrame( record_vals.T, columns=["load", "stress", "strain", "epsilon_min_LF", "epsilon_max_LF"], index=load_turning_points.index, ) new_sum_tp = self._num_turning_points + num_turning_points tp_index = [np.arange(self._num_turning_points, new_sum_tp)] * self._group_size record_vals["turning_point"] = np.stack(tp_index).T.flatten() return record_vals.set_index("turning_point", drop=True, append=True) def _calculate_epsilon_LF(self, deformation_record): """Calculate epsilon_LF values for the current deformation record Parameters ---------- deformation_record : np.ndarray (5) [:3] load, stress, strain [3:] reserved for epsilon_min_LF and epsilon_max_LF """ old_load = self._last_deformation_record[LOAD, 0] current_load = deformation_record[LOAD, 0] if old_load < current_load: deformation_record[EPS_MAX_LF] = ( self._last_deformation_record[EPS_MAX_LF] if self._last_deformation_record[EPS_MAX_LF, 0] > deformation_record[STRAIN, 0] else deformation_record[STRAIN, :] ) deformation_record[EPS_MIN_LF] = self._last_deformation_record[EPS_MIN_LF] else: deformation_record[EPS_MIN_LF] = ( self._last_deformation_record[EPS_MIN_LF] if self._last_deformation_record[EPS_MIN_LF, 0] < deformation_record[STRAIN, 0] else deformation_record[STRAIN, :] ) deformation_record[EPS_MAX_LF] = self._last_deformation_record[EPS_MAX_LF] def _process_hysteresis(self, record_vals, hysts): """Calcuclate all the recorded hysteresis values For each hysteresis we calculate the two records consisting of load, stress, strain, epsilon_min_LF, epsilon_max_LF Parameters ---------- record_vals: pd.DataFrame colimns: load, stress, strain, epsilon_min_LF, epsilon_max_LF index of load_turning_points hysts: np.ndarray(N, 2) the recorded hysteresis information (see ._perform_hcm_algorithm) Returns ------- result_min, result_max: pd.DataFrame """ def turn_memory_1_2(values, index): if values[0][0, 0] < values[1][0, 0]: return (values[0], values[1], index[0], index[1]) return (values[1], values[0], index[1], index[0]) def turn_memory_3(values, index): abs_point = np.abs(values[0]) point_min = -abs_point point_max = abs_point point_min[:, EPS_MIN_LF:] = values[1][:, EPS_MIN_LF:] point_max[:, EPS_MIN_LF:] = values[1][:, EPS_MIN_LF:] return (point_min, point_max, index[0], index[0]) memory_functions = [turn_memory_3, turn_memory_1_2] start = len(self._residuals) record_vals_with_residuals = pd.concat([self._record_vals_residuals, record_vals]) value_array = record_vals_with_residuals.to_numpy() index_array = np.asarray( record_vals_with_residuals.index.droplevel("turning_point").to_frame() ) signal_index_names = self._current_load_index.names signal_index_num = len(signal_index_names) result_len = len(hysts) * self._group_size results_min = np.zeros((result_len, 4)) results_min_idx = np.zeros((result_len, signal_index_num), dtype=np.int64) results_max = np.zeros((result_len, 4)) results_max_idx = np.zeros((result_len, signal_index_num), dtype=np.int64) for i, hyst in enumerate(hysts): idx = (hyst[FROM:CLOSE] + start) * self._group_size hyst_type = hyst[IS_CLOSED] beg0, beg1 = idx[0], idx[1] vbeg1 = beg1 - self._group_size if hyst_type == MEMORY_3 else beg1 end0, end1 = beg0 + self._group_size, beg1 + self._group_size vend1 = vbeg1 + self._group_size values = value_array[beg0:end0], value_array[vbeg1:vend1] index = index_array[beg0:end0], index_array[beg1:end1] min_val, max_val, min_idx, max_idx = memory_functions[hyst_type](values, index) beg = i * self._group_size end = beg + self._group_size results_min[beg:end, :3] = min_val[:, :3] results_max[beg:end, :3] = max_val[:, :3] results_min_idx[beg:end] = min_idx results_max_idx[beg:end] = max_idx results_min[beg:end, -1] = min_val[:, EPS_MIN_LF] results_max[beg:end, -1] = max_val[:, EPS_MAX_LF] results_min = pd.DataFrame( results_min, columns=["loads_min", "S_min", "epsilon_min", "epsilon_min_LF"], index=pd.MultiIndex.from_arrays(results_min_idx.T, names=signal_index_names) ) results_max = pd.DataFrame( results_max, columns=["loads_max", "S_max", "epsilon_max", "epsilon_max_LF"], index=pd.MultiIndex.from_arrays(results_max_idx.T, names=signal_index_names) ) return results_min, results_max def _adjust_samples_and_flush_for_hcm_first_run(self, samples): is_multi_index = isinstance(samples, pd.Series) and len(samples.index.names) > 1 if not is_multi_index: samples = np.concatenate([[0], np.asarray(samples)]) else: assessment_levels = [name for name in samples.index.names if name != "load_step"] assessment_idx = samples.groupby(assessment_levels).first().index # create a new sample with 0 load for all nodes multi_index = pd.MultiIndex.from_product([[0], assessment_idx], names=samples.index.names) first_sample = pd.Series(0, index=multi_index) # increase the load_step index value by one for all samples samples_without_index = samples.reset_index() samples_without_index.load_step += 1 samples = samples_without_index.set_index(samples.index.names).iloc[:, 0] # prepend the new zero load sample samples = pd.concat([first_sample, samples], axis=0) # determine first, second and last samples scalar_samples = samples if is_multi_index: # convert to list scalar_samples = samples.groupby("load_step", sort=False).first() scalar_samples_twice = np.concatenate([scalar_samples, scalar_samples]) turn_indices, _ = RFG.find_turns(scalar_samples_twice) flush = len(scalar_samples)-1 in turn_indices return samples, flush def _perform_hcm_algorithm(self, load_turning_points): """Perform the entire HCM algorithm for all load samples Parameters ---------- load_turning_points : np.ndarray The representative tunring points of the load signal Returns ------- deform_type_record : np.ndarray (N,2) integer first column: index in the turning point array second column: indicate whether the deformation is in PRIMARY or SECONDARY regime hysts : np.ndarray (N,4) integer first column: Type of hysteresis memort (MEMORY_1_2 or MEMORY_3) second column: index in turning point array of the hysteresis origin third column: index in turning point array of the hysteresis front fourth column: index in turning point array of the hysteresis closing point (-1 if hysteresis not closed) """ hysts = np.zeros((len(load_turning_points), 4), dtype=np.int64) hyst_ptr = 0 deform_type_record = -np.ones((len(load_turning_points), 2), dtype=np.int64) deform_ptr = 0 for index, current_load in enumerate(load_turning_points): hyst_ptr = self._hcm_process_sample( current_load, index, hysts, hyst_ptr, deform_type_record[deform_ptr, :] ) if np.abs(current_load) > self._load_max_seen: self._load_max_seen = np.abs(current_load) self._iz += 1 self._residuals_record.append(deform_ptr, current_load) deform_ptr += 1 hysts = hysts[:hyst_ptr, :] return deform_type_record, hysts def _hcm_process_sample( self, current_load, current_idx, hysts, hyst_ptr, deform_type_record, ): """Process one sample in the HCM algorithm, i.e., one load value Parameters ---------- current_load: float The current representative load current_index: int The index of the current load in the turning point list hysts: np.ndarray (N,4) integer The hysteresis record (see ._perform_hcm_algorithm) hyst_ptr: int The pointer to the next hysteresis record deform_type_record : np.ndarray (N,2) integer The deformation type record (see ._perform_hcm_algorithm) """ record_idx = current_idx while True: if self._iz == self._ir: if np.abs(current_load) > self._load_max_seen: # case a) i, "Memory 3" deform_type_record[:] = [record_idx, PRIMARY] residuals_idx = self._residuals_record.current_index hysts[hyst_ptr, :] = [MEMORY_3, residuals_idx, current_idx, -1] hyst_ptr += 1 self._ir += 1 else: deform_type_record[:] = [record_idx, SECONDARY] break if self._iz < self._ir: deform_type_record[:] = [record_idx, PRIMARY] break # here we have iz > ir: if self._residuals_record.will_remain_open_by(current_load): deform_type_record[:] = [record_idx, SECONDARY] break # no -> we have a new hysteresis prev_idx_1, prev_load_1 = self._residuals_record.pop() prev_idx_0, prev_load_0 = self._residuals_record.pop() if len(self._residuals_record): record_idx = self._residuals_record.current_index self._iz -= 2 # if the points of the hysteresis lie fully inside the seen range of loads, i.e., # the turn points are smaller than the maximum turn point so far # (this happens usually in the second run of the HCM algorithm) if np.abs(prev_load_0) < self._load_max_seen and np.abs(prev_load_1) < self._load_max_seen: # case "Memory 2", "c) ii B" # the primary branch is not yet reached, continue processing residual loads, potentially # closing even more hysteresis hysts[hyst_ptr, :] = [MEMORY_1_2, prev_idx_0, prev_idx_1, current_idx] hyst_ptr += 1 continue # case "Memory 1", "c) ii A" # The last hysteresis saw load values equal to the previous maximum. # (Higher values cannot happen here.) # The load curve follows again the primary path. # No further hystereses can be closed, continue with the next load value. # Previously, iz was decreased by 2 and will be increased by 1 at the end of this loop , # effectively `iz = iz - 1` as described on p.70 # Proceed on primary path for the rest, which was not part of the closed hysteresis deform_type_record[:] = [record_idx, PRIMARY] hysts[hyst_ptr, :] = [MEMORY_1_2, prev_idx_0, prev_idx_1, current_idx] hyst_ptr += 1 return hyst_ptr @property def strain_values(self): """ Get the strain values of the turning points in the stress-strain diagram. They are needed in the FKM nonlinear roughness & surface layer algorithm, which adds residual stresses in another pass of the HCM algorithm. Returns ------- list of float The strain values of the turning points that are visited during the HCM algorithm. """ return self.history().query("load_step >= 0").strain.to_numpy() @property def strain_values_first_run(self): """ Get the strain values of the turning points in the stress-strain diagram, for the first run of the HCM algorithm. They are needed in the FKM nonlinear roughness & surface layer algorithm, which adds residual stresses in another pass of the HCM algorithm. Returns ------- list of float The strain values of the turning points that are visited during the first run of the HCM algorithm. """ return self.history().query("load_step >= 0 and run_index == 1").strain.to_numpy() @property def strain_values_second_run(self): """ Get the strain values of the turning points in the stress-strain diagram, for the second and any further run of the HCM algorithm. They are needed in the FKM nonlinear roughness & surface layer algorithm, which adds residual stresses in another pass of the HCM algorithm. Returns ------- list of float The strain values of the turning points that are visited during the second run of the HCM algorithm. """ return self.history().query("load_step >= 0 and run_index == 2").strain.to_numpy()
[docs] def history(self): """Compile the history of noteworthy points. Returns ------- history : pd.DataFrame The history containing of ``load``, ``stress``, ``strain`` and ``secondary_branch``. The ``secondary_branch`` column is ``bool`` and indicates if the point is on secondary load branch. The index consists of the following levels: * ``load_segment``: the number of the point * ``load_step``: the index of the point in the actual samples * ``run_index``: the index of the run (usually 1 or 2) * ``turning_point``: the number of the turning point (-1 if it is not a turning point) * ``hyst_from``: the number of the hysteresis starting at the point (-1 if there isn't one) * ``hyst_to``: the number of the hysteresis opened at the point (-1 if there isn't one) * ``hyst_close``: the number hof the hysteresis closed at the point (-1 if there isn't one) Notes ----- The history contains all the turning points with two other kinds of points injected: * The primary hysteresis opening (Memory 3 of the guidline) * The closing points of a hysteresis Note that the ``load_step`` index of the injected points is always `-1`, so you can't use it to determine the index of a hysteresis closing in the original signal. """ history = pd.concat([rr for rr, _ in self._history_record]).reset_index( drop=True ) history["load_segment"] = np.arange(1, len(history) + 1) hysts = np.concatenate([hs for _, hs in self._history_record]) hyst_index = np.concatenate( [[np.arange(len(hysts))], hysts[:, FROM:CLOSE].T, [hysts[:, IS_CLOSED].T]] ).T hyst_from_marker = pd.Series(-1, index=history.index) hyst_to_marker = pd.Series(-1, index=history.index) if len(hysts): hyst_from_marker.iloc[hyst_index[:, FROM]] = hyst_index[:, IS_CLOSED] hyst_to_marker.iloc[hyst_index[:, TO]] = hyst_index[:, IS_CLOSED] history["hyst_from"] = hyst_from_marker history["hyst_to"] = hyst_to_marker history["hyst_close"] = pd.Series(-1, index=history.index) to_insert = [] negate = [] turning_point_drop_idx = [] hyst_close_index = [] for hyst_index, hyst in enumerate(hysts): if hyst[IS_CLOSED] == MEMORY_1_2: hyst_close = int(hyst[CLOSE]) + len(to_insert) hyst_from = int(hyst[FROM]) turning_point_drop_idx.append(hyst_close) hyst_close_index.append([hyst_close, hyst_index]) to_insert.append((hyst_close, hyst_from)) else: hyst_from = int(hyst[FROM]) hyst_to = int(hyst[TO]) + len(to_insert) negate.append(hyst_to) turning_point_drop_idx.append(hyst_to) to_insert.append((hyst_to, hyst_from)) hyst_close_index = np.array(hyst_close_index, dtype=np.int64) negate = np.array(negate, dtype=np.int64) index = list(np.arange(len(history))) for target, idx in to_insert: index.insert(target, int(idx)) history = history.iloc[index].reset_index(drop=True) history.loc[ turning_point_drop_idx, ["turning_point", "load_step", "hyst_from", "hyst_to"], ] = -1 history.loc[turning_point_drop_idx, "secondary_branch"] = True history.loc[negate, HISTORY_COLUMNS] = -history.loc[ negate, HISTORY_COLUMNS ] history.loc[negate, "hyst_to"] = history.loc[negate + 1, "hyst_to"].to_numpy() history.loc[negate + 1, "hyst_to"] = -1 history.loc[negate, "secondary_branch"] = True if len(hyst_close_index): history.loc[hyst_close_index[:, 0], "hyst_close"] = hyst_close_index[:, 1] history["load_segment"] = np.arange(len(history), dtype=np.int64) history.set_index(HISTORY_INDEX_LEVELS, inplace=True) return history
[docs] def interpolated_stress_strain_data( self, *, load_segment=None, hysteresis_index=None, n_points_per_branch=100 ): """Caclulate interpolated stress and strain data. Parameters ---------- load_segment : int, Optional The number of the load segment for which the stress strain data is to be interpolated. hysteresis_index : int, Optional The number of the hysteresis for which the stress strain data is to be interpolated. n_points_per_branch : int, Optional The number of points to be interpolated to of each load segment Returns ------- stress_strain_data : pd.DataFrame The resulting ``DataFrame`` will contain the following columns: * ``stress``, ``strain`` – the stress strain data * ``secondary_branch``– a ``bool`` column indicating if the point is on a secondary load branch * ``hyst_index`` – the number of the hysteresis the load segment is part of (-1 if there isn't one) * ``load_segment`` the number of the load segment * ``run_index`` the number of the run """ history = self.history() if hysteresis_index is not None: hyst_to = history.query(f"hyst_to == {hysteresis_index}") if hysteresis_index in history.index.get_level_values("hyst_close"): hyst_close = history.query(f"hyst_close == {hysteresis_index}") load_segment_close = hyst_close.index.get_level_values("load_segment")[0] else: load_segment_close = None load_segment_to = hyst_to.index.get_level_values("load_segment")[0] segments = [ self._interpolate_deformation(load_segment_to, n_points_per_branch) ] if load_segment_close is not None: segments.append( self._interpolate_deformation( load_segment_close, n_points_per_branch ) ) result = pd.concat(segments).reset_index(drop=True) result["hyst_index"] = hysteresis_index return result if load_segment is not None: return self._interpolate_deformation(load_segment, n_points_per_branch) return ( pd.concat( [ self._interpolate_deformation( row.load_segment, n_points_per_branch ) for _, row in history.reset_index().iterrows() ] ) .reset_index(drop=True) )
def _interpolate_deformation(self, load_segment, n_points): history = self.history() idx = history.index.get_level_values("load_segment").get_loc(load_segment) to_value = history.iloc[idx] run_index = history.index.get_level_values("run_index")[idx] hyst_open_idx = history.index.get_level_values("hyst_to")[idx] hyst_close_idx = history.index.get_level_values("hyst_close")[idx] hyst_index = hyst_open_idx if hyst_open_idx >= 0 else hyst_close_idx if idx == 0: from_value = pd.Series({"stress": 0.0}) elif hyst_close_idx >= 0: from_value = history.query(f"hyst_to == {hyst_close_idx}").iloc[0] elif hyst_open_idx >= 0: from_value = history.query(f"hyst_from == {hyst_open_idx}").iloc[0] else: from_value = history.iloc[idx-1] stress = np.linspace(from_value.stress, to_value.stress, n_points) if to_value.secondary_branch: delta_stress = from_value.stress - stress strain = from_value.strain - self._ramberg_osgood_relation.delta_strain(delta_stress) else: strain = self._ramberg_osgood_relation.strain(stress) return pd.DataFrame( { "stress": stress, "strain": strain, "secondary_branch": to_value.secondary_branch, "hyst_index": hyst_index, "load_segment": load_segment, "run_index": run_index, } )