Source code for sensai.util.datastruct

from abc import ABC, abstractmethod
from enum import Enum
from typing import Sequence, Optional, TypeVar, Generic, Tuple, Dict, Any, TYPE_CHECKING

from . import sequences as array_util
from .string import ToStringMixin, dict_string

if TYPE_CHECKING:
    import pandas as pd

TKey = TypeVar("TKey")
TValue = TypeVar("TValue")
TSortedKeyValueStructure = TypeVar("TSortedKeyValueStructure", bound="SortedKeyValueStructure")


[docs]class Trivalent(Enum): TRUE = "true" FALSE = "false" UNKNOWN = "unknown"
[docs] @classmethod def from_bool(cls, b: bool): return cls.TRUE if b else cls.FALSE
[docs] def is_true(self): return self == Trivalent.TRUE
[docs] def is_false(self): return self == Trivalent.FALSE
[docs]class Maybe(Generic[TValue]): def __init__(self, value: Optional[TValue]): self.value = value
[docs]class DeferredParams(ToStringMixin): """ Represents a dictionary of parameters that is specifically designed to hold parameters that can only defined late within a process (i.e. not initially at construction time), e.g. because the parameters are data-dependent and therefore can only be determined once the data has been seen. """ UNDEFINED = "__undefined__DeferredParams" def __init__(self): self.params = {} def _tostring_object_info(self) -> str: return dict_string(self.params)
[docs] def set_param(self, name: str, value: Any): self.params[name] = value
[docs] def get_param(self, name, default=UNDEFINED): """ :param name: the parameter name :param default: in case no value is set, return this value, and if UNDEFINED (default), raise KeyError :return: the parameter value """ if default == self.UNDEFINED: return self.params[name] else: return self.params.get(name, default)
[docs] def get_dict(self) -> Dict[str, Any]: return self.params
[docs]class SortedValues(Generic[TValue]): """ Provides convenient binary search (bisection) operations for sorted sequences """ def __init__(self, sorted_values: Sequence[TValue]): self.values = sorted_values def __len__(self): return len(self.values)
[docs] def floor_index(self, value) -> Optional[int]: """ Finds the rightmost index where the value is less than or equal to the given value :param value: the value to search for :return: the index or None if there is no such index """ return array_util.floor_index(self.values, value)
[docs] def ceil_index(self, value) -> Optional[int]: """ Finds the leftmost index where the value is greater than or equal to the given value :param value: the value to search for :return: the index or None if there is no such index """ return array_util.ceil_index(self.values, value)
[docs] def closest_index(self, value) -> Optional[int]: """ Finds the index of the value that is closest to the given value. If two subsequent values have the same distance, the smaller index is returned. :param value: the value to search for :return: the index or None if this object is empty """ return array_util.closest_index(self.values, value)
def _value(self, idx: Optional[int]) -> Optional[TValue]: if idx is None: return None else: return self.values[idx]
[docs] def floor_value(self, value) -> Optional[TValue]: """ Finds the largest value that is less than or equal to the given value :param value: the value to search for :return: the value or None if there is no such value """ return self._value(self.floor_index(value))
[docs] def ceil_value(self, value) -> Optional[TValue]: """ Finds the smallest value that is greater than or equal to the given value :param value: the value to search for :return: the value or None if there is no such value """ return self._value(self.ceil_index(value))
[docs] def closest_value(self, value) -> Optional[TValue]: """ Finds the value that is closest to the given value. If two subsequent values have the same distance, the smaller value is returned. :param value: the value to search for :return: the value or None if this object is empty """ return self._value(self.closest_index(value))
def _value_slice(self, first_index, last_index): if first_index is None or last_index is None: return None return self.values[first_index:last_index + 1]
[docs] def value_slice(self, lowest_key, highest_key) -> Optional[Sequence[TValue]]: return self._value_slice(self.ceil_index(lowest_key), self.floor_index(highest_key))
[docs]class SortedKeyValueStructure(Generic[TKey, TValue], ABC): @abstractmethod def __len__(self): pass
[docs] @abstractmethod def floor_index(self, key: TKey) -> Optional[int]: """ Finds the rightmost index where the key value is less than or equal to the given value :param key: the value to search for :return: the index or None if there is no such index """ pass
[docs] @abstractmethod def ceil_index(self, key: TKey) -> Optional[int]: """ Finds the leftmost index where the key value is greater than or equal to the given value :param key: the value to search for :return: the index or None if there is no such index """ pass
[docs] @abstractmethod def closest_index(self, key: TKey) -> Optional[int]: """ Finds the index where the key is closest to the given value. If two subsequent keys have the same distance, the smaller index is returned. :param key: the value to search for :return: the index or None if this object is empty. """ pass
[docs] @abstractmethod def floor_value(self, key: TKey) -> Optional[TValue]: """ Returns the value for the largest index where the corresponding key is less than or equal to the given value :param key: the key to search for :return: the value or None if there is no such value """ pass
[docs] @abstractmethod def ceil_value(self, key: TKey) -> Optional[TValue]: """ Returns the value for the smallest index where the corresponding key is greater than or equal to the given value :param key: the key to search for :return: the value or None if there is no such value """ pass
[docs] @abstractmethod def closest_value(self, key: TKey) -> Optional[TValue]: """ Finds the value that is closest to the given value. If two subsequent values have the same distance, the smaller value is returned. :param key: the key to search for :return: the value or None if this object is empty """ pass
[docs] @abstractmethod def floor_key_and_value(self, key: TKey) -> Optional[Tuple[TKey, TValue]]: pass
[docs] @abstractmethod def ceil_key_and_value(self, key: TKey) -> Optional[Tuple[TKey, TValue]]: pass
[docs] @abstractmethod def closest_key_and_value(self, key: TKey) -> Optional[Tuple[TKey, TValue]]: pass
[docs] def interpolated_value(self, key: TKey) -> Optional[TValue]: """ Computes a linearly interpolated value for the given key - based on the two closest key-value pairs found in the data structure. If the key is found in the data structure, the corresponding value is directly returned. NOTE: This operation is supported only for value types that support the required arithmetic operations. :param key: the key for which the interpolated value is to be computed. :return: the interpolated value or None if the data structure does not contain floor/ceil entries for the given key """ fkv = self.floor_key_and_value(key) ckv = self.ceil_key_and_value(key) if fkv is None or ckv is None: return None floor_key, floor_value = fkv ceil_key, ceil_value = ckv if ceil_key == floor_key: return floor_value else: frac = (key - floor_key) / (ceil_key - floor_key) return floor_value + (ceil_value - floor_value) * frac
[docs] def slice(self: TSortedKeyValueStructure, lower_bound_key=None, upper_bound_key=None, inner=True) -> TSortedKeyValueStructure: """ :param lower_bound_key: the key defining the start of the slice (depending on inner); if None, the first included entry will be the very first entry :param upper_bound_key: the key defining the end of the slice (depending on inner); if None, the last included entry will be the very last entry :param inner: if True, the returned slice will be within the bounds; if False, the returned slice is extended by one entry in both directions such that it contains the bounds (where possible) :return: """ if lower_bound_key is not None and upper_bound_key is not None: assert upper_bound_key >= lower_bound_key if lower_bound_key is not None: if inner: from_index = self.ceil_index(lower_bound_key) if from_index is None: from_index = len(self) # shall return empty slice else: from_index = self.floor_index(lower_bound_key) if from_index is None: from_index = 0 else: from_index = 0 if upper_bound_key is not None: if inner: to_index = self.floor_index(upper_bound_key) if to_index is None: to_index = -1 # shall return empty slice else: to_index = self.ceil_index(upper_bound_key) if to_index is None: to_index = len(self) - 1 else: to_index = len(self) - 1 return self._create_slice(from_index, to_index)
@abstractmethod def _create_slice(self: TSortedKeyValueStructure, from_index: int, to_index: int) -> TSortedKeyValueStructure: pass
[docs]class SortedKeysAndValues(Generic[TKey, TValue], SortedKeyValueStructure[TKey, TValue]): def __init__(self, keys: Sequence[TKey], values: Sequence[TValue]): """ :param keys: a sorted sequence of keys :param values: a sequence of corresponding values """ if len(keys) != len(values): raise ValueError(f"Lengths of keys ({len(keys)}) and values ({len(values)}) do not match") self.keys = keys self.values = values def __len__(self): return len(self.keys)
[docs] @classmethod def from_series(cls, s: "pd.Series"): """ Creates an instance from a pandas Series, using the series' index as the keys and its values as the values :param s: the series :return: an instance """ # noinspection PyTypeChecker return cls(s.index, s.values)
[docs] def floor_index(self, key) -> Optional[int]: return array_util.floor_index(self.keys, key)
[docs] def ceil_index(self, key) -> Optional[int]: return array_util.ceil_index(self.keys, key)
[docs] def closest_index(self, key) -> Optional[int]: return array_util.closest_index(self.keys, key)
[docs] def floor_value(self, key) -> Optional[TValue]: return array_util.floor_value(self.keys, key, values=self.values)
[docs] def ceil_value(self, key) -> Optional[TValue]: return array_util.ceil_value(self.keys, key, values=self.values)
[docs] def closest_value(self, key) -> Optional[TValue]: return array_util.closest_value(self.keys, key, values=self.values)
[docs] def floor_key_and_value(self, key) -> Optional[Tuple[TKey, TValue]]: idx = self.floor_index(key) return None if idx is None else (self.keys[idx], self.values[idx])
[docs] def ceil_key_and_value(self, key) -> Optional[Tuple[TKey, TValue]]: idx = self.ceil_index(key) return None if idx is None else (self.keys[idx], self.values[idx])
[docs] def closest_key_and_value(self, key) -> Optional[Tuple[TKey, TValue]]: idx = self.closest_index(key) return None if idx is None else (self.keys[idx], self.values[idx])
[docs] def value_slice_inner(self, lower_bound_key, upper_bound_key): return array_util.value_slice_inner(self.keys, lower_bound_key, upper_bound_key, values=self.values)
[docs] def value_slice_outer(self, lower_bound_key, upper_bound_key, fallback=False): return array_util.value_slice_outer(self.keys, lower_bound_key, upper_bound_key, values=self.values, fallback_bounds=fallback)
def _create_slice(self, from_index: int, to_index: int) -> "SortedKeysAndValues": return SortedKeysAndValues(self.keys[from_index:to_index + 1], self.values[from_index:to_index + 1])
[docs]class SortedKeyValuePairs(Generic[TKey, TValue], SortedKeyValueStructure[TKey, TValue]):
[docs] @classmethod def from_unsorted_key_value_pairs(cls, unsorted_key_value_pairs: Sequence[Tuple[TKey, TValue]]): return cls(sorted(unsorted_key_value_pairs, key=lambda x: x[0]))
def __init__(self, sorted_key_value_pairs: Sequence[Tuple[TKey, TValue]]): self.entries = sorted_key_value_pairs self._sortedKeys = SortedValues([t[0] for t in sorted_key_value_pairs]) def __len__(self): return len(self.entries) def _value(self, idx: Optional[int]) -> Optional[TValue]: if idx is None: return None return self.entries[idx][1]
[docs] def value_for_index(self, idx: int) -> TValue: return self.entries[idx][1]
[docs] def key_for_index(self, idx: int) -> TKey: return self.entries[idx][0]
[docs] def floor_index(self, key) -> Optional[int]: """Finds the rightmost index where the key is less than or equal to the given key""" return self._sortedKeys.floor_index(key)
[docs] def floor_value(self, key) -> Optional[TValue]: return self._value(self.floor_index(key))
[docs] def floor_key_and_value(self, key) -> Optional[Tuple[TKey, TValue]]: idx = self.floor_index(key) return None if idx is None else self.entries[idx]
[docs] def ceil_index(self, key) -> Optional[int]: """Find leftmost index where the key is greater than or equal to the given key""" return self._sortedKeys.ceil_index(key)
[docs] def ceil_value(self, key) -> Optional[TValue]: return self._value(self.ceil_index(key))
[docs] def ceil_key_and_value(self, key) -> Optional[Tuple[TKey, TValue]]: idx = self.ceil_index(key) return None if idx is None else self.entries[idx]
[docs] def closest_index(self, key) -> Optional[int]: return self._sortedKeys.closest_index(key)
[docs] def closest_value(self, key) -> Optional[TValue]: return self._value(self.closest_index(key))
[docs] def closest_key_and_value(self, key) -> Optional[Tuple[TKey, TValue]]: idx = self.closest_index(key) return None if idx is None else self.entries[idx]
def _value_slice(self, first_index, last_index): if first_index is None or last_index is None: return None return [e[1] for e in self.entries[first_index:last_index + 1]]
[docs] def value_slice(self, lowest_key, highest_key) -> Optional[Sequence[TValue]]: return self._value_slice(self.ceil_index(lowest_key), self.floor_index(highest_key))
def _create_slice(self, from_index: int, to_index: int) -> "SortedKeyValuePairs": return SortedKeyValuePairs(self.entries[from_index:to_index + 1])