# Source code for pylife.core.broadcaster

# Copyright (c) 2019-2023 - for information on the respective copyright owner
# see the NOTICE file and/or the repository
# https://github.com/boschresearch/pylife
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__author__ = "Johannes Mueller"
__maintainer__ = __author__


import numpy as np
import pandas as pd


class Broadcaster:
    """The Broadcaster to align pyLife signals to operands.

    Parameters
    ----------
    pandas_obj : :class:`pandas.Series` or :class:`pandas.DataFrame`
        the object of the ``Broadcaster``

    In most cases the ``Broadcaster`` class is not used directly.  The
    functionality is in most cases used by the derived class
    :class:`~pylife.PylifeSignal`.

    The purpose of the ``Broadcaster`` is to take two numerical objects and
    return two objects of the same numerical data with an aligned index.  That
    means that mathematical operations using the two objects as operands can be
    implemented using numpy's broadcasting functionality.

    See method :meth:`~pylife.Broadcaster.broadcast` documentation for details.

    The broadcasting is done in the following ways:

    ::

         object                 parameter              returned object        returned parameter

         Series                 Scalar                 Series                 Scalar
         |------|-----|                                |------|-----|
         | idx  |     |                                | idx  |     |
         |------|-----|         5.0               ->   |------|-----|         5.0
         | foo  | 1.0 |                                | foo  | 1.0 |
         | bar  | 2.0 |                                | bar  | 2.0 |
         |------|-----|                                |------|-----|

         DataFrame              Scalar                 DataFrame              Series
         |------|-----|-----|                          |------|-----|-----|   |------|-----|
         | idx  | foo | bar |                          | idx  | foo | bar |   | idx  |     |
         |------|-----|-----|                          |------|-----|-----|   |------|-----|
         | 0    | 1.0 | 2.0 |   5.0               ->   | 0    | 1.0 | 2.0 |   | 0    | 5.0 |
         | 1    | 1.0 | 2.0 |                          | 1    | 1.0 | 2.0 |   | 1    | 5.0 |
         | ...  | ... | ... |                          | ...  | ... | ... |   | ...  | ... |
         |------|-----|-----|                          |------|-----|-----|   |------|-----|

         Series                 Series/DataFrame       DataFrame              Series/DataFrame
         |------|-----|         |------|-----|         |------|-----|-----|   |------|-----|
         | None |     |         | idx  |     |         | idx  | foo | bar |   | idx  |     |
         |------|-----|         |------|-----|    ->   |------|-----|-----|   |------|-----|
         | foo  | 1.0 |         | 0    | 5.0 |         | 0    | 1.0 | 2.0 |   | 0    | 5.0 |
         | bar  | 2.0 |         | 1    | 6.0 |         | 1    | 1.0 | 2.0 |   | 1    | 6.0 |
         |------|-----|         | ...  | ... |         | ...  | ... | ... |   | ...  | ... |
                                |------|-----|         |------|-----|-----|   |------|-----|

         Series/DataFrame       Series/DataFrame       Series/DataFrame       Series/DataFrame
         |------|-----|         |------|-----|         |------|-----|         |------|-----|
         | xidx |     |         | xidx |     |         | xidx |     |         | xidx |     |
         |------|-----|         |------|-----|    ->   |------|-----|         |------|-----|
         | foo  | 1.0 |         | tau  | 5.0 |         | foo  | 1.0 |         | foo  | nan |
         | bar  | 2.0 |         | bar  | 6.0 |         | bar  | 2.0 |         | bar  | 6.0 |
         |------|-----|         |------|-----|         | tau  | nan |         | tau  | 5.0 |
                                                       |------|-----|         |------|-----|

         Series/DataFrame       Series/DataFrame       Series/DataFrame       Series/DataFrame
         |------|-----|         |------|-----|         |------|------|-----|  |------|------|-----|
         | xidx |     |         | yidx |     |         | xidx | yidx |     |  | xidx | yidx |     |
         |------|-----|         |------|-----|    ->   |------|------|-----|  |------|------|-----|
         | foo  | 1.0 |         | tau  | 5.0 |         | foo  | tau  | 1.0 |  | foo  | tau  | 5.0 |
         | bar  | 2.0 |         | chi  | 6.0 |         |      | chi  | 1.0 |  |      | chi  | 6.0 |
         |------|-----|         |------|-----|         | bar  | tau  | 2.0 |  | bar  | tau  | 5.0 |
                                                       |      | chi  | 2.0 |  |      | chi  | 6.0 |
                                                       |------|------|-----|  |------|------|-----|

    """

    def __init__(self, pandas_obj):
        self._obj = pandas_obj

    def broadcast(self, parameter, droplevel=None):
        """Broadcast the parameter to the object of ``self``.

        Parameters
        ----------
        parameter : scalar, numpy array or pandas object
            The parameter to broadcast to

        droplevel : list of index level names, optional
            Index levels to remove from the returned parameter.  Only used
            when two pandas objects are aligned to each other; the aligned
            parameter is then grouped by the remaining levels and the first
            entry of each group is kept.

        Returns
        -------
        parameter, object : index aligned numerical objects

        Raises
        ------
        ValueError
            If the object is a :class:`pandas.DataFrame` and an array-like
            ``parameter`` cannot be mapped to its number of rows.

        Examples
        --------

        The behavior of the Broadcaster is best illustrated by examples:

        .. jupyter-execute::
            :hide-code:

            import pandas as pd
            from pylife import Broadcaster

        * Broadcasting :class:`pandas.Series` to a scalar results in a scalar
          and a :class:`pandas.Series`.

          .. jupyter-execute::

              obj = pd.Series([1.0, 2.0], index=pd.Index(['foo', 'bar'], name='idx'))
              obj

          .. jupyter-execute::

              parameter, obj = Broadcaster(obj).broadcast(5.0)

              parameter

          .. jupyter-execute::

              obj

        * Broadcasting :class:`pandas.DataFrame` to a scalar results in a
          :class:`pandas.DataFrame` and a :class:`pandas.Series`.

          .. jupyter-execute::

              obj = pd.DataFrame({
                  'foo': [1.0, 2.0],
                  'bar': [3.0, 4.0]
              }, index=pd.Index([1, 2], name='idx'))
              obj

          .. jupyter-execute::

              parameter, obj = Broadcaster(obj).broadcast(5.0)

              parameter

          .. jupyter-execute::

              obj

        * Broadcasting :class:`pandas.Series` to a :class:`pandas.Series`
          results in a :class:`pandas.DataFrame` and a :class:`pandas.Series`,
          **if and only if** the index name of the object is ``None``.

          .. jupyter-execute::

              obj = pd.Series([1.0, 2.0], index=pd.Index(['tau', 'chi']))
              obj

          .. jupyter-execute::

              parameter = pd.Series([3.0, 4.0], index=pd.Index(['foo', 'bar'], name='idx'))
              parameter

          .. jupyter-execute::

              parameter, obj = Broadcaster(obj).broadcast(parameter)

              parameter

          .. jupyter-execute::

              obj

        """
        droplevel = droplevel or []

        # Anything that is not a pandas object is treated as scalar/array-like.
        if not isinstance(parameter, (pd.Series, pd.DataFrame)):
            if isinstance(self._obj, pd.Series):
                return self._broadcast_series(parameter)
            return self._broadcast_frame(parameter)

        # A Series with an unnamed index is spread over the rows of the
        # parameter: every entry of the Series becomes a constant column.
        if self._obj.index.names == [None] and isinstance(self._obj, pd.Series):
            df = pd.DataFrame(index=parameter.index, columns=self._obj.index)
            for c in self._obj.index:
                df[c] = self._obj[c]
            return parameter, df

        return self._broadcast_frame_to_frame(parameter, droplevel)

    def _broadcast_series(self, parameter):
        """Broadcast a non-pandas ``parameter`` to a Series object.

        A scalar leaves the object untouched.  Anything with a length turns
        the object into a frame with one row per parameter entry.
        """
        prm = np.asarray(parameter)
        if prm.shape == ():
            return prm, self._obj

        df = self._broadcasted_dataframe(parameter)
        if isinstance(parameter, pd.Series):
            # ``set_index(..., inplace=True)`` returns ``None``; return the
            # re-indexed frame itself.
            return parameter, df.set_index(parameter.index)

        return pd.Series(prm), df

    def _broadcast_series_to_frame(self, parameter):
        """Return ``parameter`` and the Series object repeated on the parameter's index."""
        return parameter, self._broadcasted_dataframe(parameter).set_index(parameter.index)

    def _broadcast_frame_to_frame(self, parameter, droplevel):
        """Align two pandas objects on the union of their index levels.

        Both inputs are temporarily modified (unnamed index levels get unique
        placeholder names, index values are replaced by integer codes).  They
        are restored before returning, also when an exception is raised.
        """

        def drop_levels_from(prm, prm_columns):
            # Collapse the dropped levels by keeping the first entry of each
            # group of the remaining ones.
            return prm.groupby(prm_columns).first()

        def align_and_reorder():
            if isinstance(self._obj, pd.DataFrame) and isinstance(parameter, pd.Series):
                obj, prm = self._obj.align(pd.DataFrame({0: parameter}), axis=0)
                prm = prm.iloc[:, 0]
                prm.name = parameter.name
            else:
                obj, prm = self._obj.align(parameter, axis=0)

            if len(droplevel) > 0:
                prm_columns = [level for level in total_columns if level not in droplevel]
                prm = drop_levels_from(prm, prm_columns)
            else:
                prm_columns = total_columns

            if obj.index.nlevels > 2:
                prm = prm.reorder_levels(prm_columns)
                obj = obj.reorder_levels(total_columns)

            return obj, prm

        def cross_join_and_align_obj_and_parameter():
            prm_index = parameter.index.to_frame().reset_index(drop=True)
            obj_index = self._obj.index.to_frame()[obj_index_names]
            new_index = (obj_index
                         .join(prm_index, how='cross')
                         .set_index(total_columns)
                         .reorder_levels(total_columns).index)

            obj = _broadcast_to(self._obj, new_index)
            prm = _broadcast_to(parameter, new_index)
            obj, prm = obj.align(prm, axis=0)
            if len(droplevel) > 0:
                prm_columns = [level for level in total_columns if level not in droplevel]
                prm = drop_levels_from(prm, prm_columns)

            return obj, prm

        uuids = _replace_none_index_names_with_unique_string([parameter, self._obj])
        index_level_cache = None
        try:
            index_level_cache = _IndexLevelCache(self._obj, parameter)

            prm_index_names = list(parameter.index.names)
            obj_index_names = list(self._obj.index.names)

            total_columns = obj_index_names + [
                lv for lv in prm_index_names if lv not in obj_index_names
            ]
            have_commons = len(total_columns) < len(prm_index_names) + len(obj_index_names)

            if have_commons:
                obj, prm = align_and_reorder()
            else:
                obj, prm = cross_join_and_align_obj_and_parameter()

            obj.index = index_level_cache.restore_real_index(obj.index)
            prm.index = index_level_cache.restore_real_index(prm.index)
        finally:
            # Always hand the caller's objects back in their original state.
            if index_level_cache is not None:
                index_level_cache.restore_original_indeces()
            _replace_unique_string_with_none_name([self._obj, parameter], uuids)

        _replace_unique_string_with_none_name([obj, prm], uuids)
        return prm, obj

    def _broadcast_frame(self, parameter):
        """Broadcast a non-pandas ``parameter`` to the rows of a frame object.

        Raises
        ------
        ValueError
            If ``parameter`` cannot be broadcast to the number of rows.
        """
        try:
            parameter = np.broadcast_to(parameter, len(self._obj))
        except ValueError as err:
            raise ValueError(
                "Dimension mismatch. "
                "Cannot map %d value array-like to a %d element DataFrame signal."
                % (len(parameter), len(self._obj))
            ) from err

        return pd.Series(parameter, index=self._obj.index), self._obj

    def _broadcasted_dataframe(self, parameter):
        """Return a frame with ``len(parameter)`` rows repeating the Series object.

        The entries of the object are expanded as keyword arguments, so its
        index labels have to be strings.
        """
        data = np.empty((len(parameter), len(self._obj)))
        df = pd.DataFrame(data, columns=self._obj.index).assign(**self._obj)
        return df
def _broadcast_to(obj, new_index): if isinstance(obj, pd.DataFrame): new = obj else: new = pd.DataFrame(obj) new = pd.DataFrame(index=new_index).join(new, how='left') if isinstance(new_index, pd.MultiIndex): new = new.reorder_levels(new_index.names) if isinstance(obj, pd.Series): new = new.iloc[:, 0] new.name = obj.name return new def _replace_none_index_names_with_unique_string(objs): import uuid def make_uuid(): this_uuid = uuid.uuid4().hex uuids.append(this_uuid) return this_uuid uuids = [] for obj in objs: obj.index.names = [name if name is not None else make_uuid() for name in obj.index.names] return uuids def _replace_unique_string_with_none_name(objs, uuids): for obj in objs: obj.index.names = [None if name in uuids else name for name in obj.index.names] class _IndexLevelCache: def __init__(self, obj, operand): self._obj_index = obj.index self._operand_index = operand.index self._obj = obj self._operand = operand common = set(obj.index.names).intersection(operand.index.names) only_obj = set(obj.index.names).difference(common) only_operand = set(operand.index.names).difference(common) self.index_levels = {} for name in common: obj_level = obj.index.get_level_values(name) operand_level = operand.index.get_level_values(name) self.index_levels[name] = obj_level.append(operand_level).unique() for name in only_obj: self.index_levels[name] = obj.index.get_level_values(name).unique() for name in only_operand: self.index_levels[name] = operand.index.get_level_values(name).unique() self.new_index_obj = self._make_new_index(obj.index) self.new_index_operand = self._make_new_index(operand.index) obj.index = self.new_index_obj operand.index = self.new_index_operand def restore_original_indeces(self): self._obj.index = self._obj_index self._operand.index =self._operand_index def restore_real_index(self, new_index): if len(new_index.names) > 1: real_index = pd.MultiIndex.from_arrays( [ self.index_levels[name][new_index.get_level_values(name)] for name in new_index.names ], 
names=new_index.names ) else: real_index = pd.Index(self.index_levels[new_index.name][new_index], name=new_index.name) return real_index def _make_new_index(self, index): if len(index.names) == 1: return pd.Index(self.index_levels[index.name].get_indexer_for(index), name=index.name) return pd.MultiIndex.from_arrays( [ self.index_levels[name].get_indexer_for(index.get_level_values(name)) for name in index.names ], names=index.names )