# src/sensai/data_transformation/dft.py

import copy
import logging
import re
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, List, Sequence, Union, Dict, Callable, Any, Optional, Set

import numpy as np
import pandas as pd
import sklearn
from sklearn.preprocessing import OneHotEncoder

from .sklearn_transformer import SkLearnTransformerProtocol
from ..util import flatten_arguments, count_not_none
from ..util.pandas import DataFrameColumnChangeTracker
from ..util.pickle import setstate
from ..util.string import or_regex_group, ToStringMixin
from ..util.version import Version

if TYPE_CHECKING:
    from ..featuregen import FeatureGenerator
    from ..columngen import ColumnGenerator


log = logging.getLogger(__name__)


class DataFrameTransformer(ABC, ToStringMixin):
    """
    Base class for data frame transformers, i.e. objects which can transform one data frame into another
    (possibly applying the transformation to the original data frame - in-place transformation).
    A data frame transformer may require being fitted using training data.
    """
    def __init__(self):
        self._name = f"{self.__class__.__name__}-{id(self)}"
        self._isFitted = False
        self._columnChangeTracker: Optional[DataFrameColumnChangeTracker] = None
        self._paramInfo = {}  # arguments passed to init that are not saved otherwise can be persisted here

    # for backwards compatibility with persisted DFTs based on code prior to commit 7088cbbe:
    # they lack the _isFitted attribute, and we assume that each such DFT was fitted
    def __setstate__(self, d):
        d["_name"] = d.get("_name", f"{self.__class__.__name__}-{id(self)}")
        d["_isFitted"] = d.get("_isFitted", True)
        d["_columnChangeTracker"] = d.get("_columnChangeTracker", None)
        d["_paramInfo"] = d.get("_paramInfo", {})
        self.__dict__ = d

    def _tostring_exclude_private(self) -> bool:
        return True

    def get_name(self) -> str:
        """
        :return: the name of this dft transformer, which may be a default name if the name has not been set.
        """
        return self._name

    def set_name(self, name: str):
        self._name = name

    def with_name(self, name: str):
        self.set_name(name)
        return self

    @abstractmethod
    def _fit(self, df: pd.DataFrame):
        pass

    @abstractmethod
    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        pass

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        self._columnChangeTracker = DataFrameColumnChangeTracker(df)
        if not self.is_fitted():
            raise Exception(f"Cannot apply a DataFrameTransformer which is not fitted: "
                f"the df transformer {self.get_name()} requires fitting")
        df = self._apply(df)
        self._columnChangeTracker.track_change(df)
        return df

    def info(self):
        return {
            "name": self.get_name(),
            "changeInColumnNames": self._columnChangeTracker.column_change_string() if self._columnChangeTracker is not None else None,
            "isFitted": self.is_fitted(),
        }

    def fit(self, df: pd.DataFrame):
        self._fit(df)
        self._isFitted = True

    def is_fitted(self):
        return self._isFitted

    def fit_apply(self, df: pd.DataFrame) -> pd.DataFrame:
        self.fit(df)
        return self.apply(df)

    def to_feature_generator(self, categorical_feature_names: Optional[Union[Sequence[str], str]] = None,
            normalisation_rules: Sequence['DFTNormalisation.Rule'] = (),
            normalisation_rule_template: 'DFTNormalisation.RuleTemplate' = None,
            add_categorical_default_rules=True):
        # need to import here to prevent circular imports
        from ..featuregen import FeatureGeneratorFromDFT
        return FeatureGeneratorFromDFT(
            self, categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template, add_categorical_default_rules=add_categorical_default_rules
        )
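

# Illustrative sketch (added for exposition, not part of the original module): the typical
# lifecycle of a DataFrameTransformer is fit() on training data followed by apply(), or
# fit_apply() in one step; calling apply() on an unfitted transformer raises an exception.
# The subclass and data below are hypothetical.
def _example_transformer_lifecycle():
    class DFTSubtractMean(DataFrameTransformer):
        """Learns the column means during fitting and subtracts them upon application."""
        def _fit(self, df: pd.DataFrame):
            self._means = df.mean()

        def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
            return df - self._means

    dft = DFTSubtractMean()
    df = pd.DataFrame({"x": [1.0, 2.0, 3.0]})
    return dft.fit_apply(df)  # equivalent to dft.fit(df) followed by dft.apply(df)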


class DFTFromFeatureGenerator(DataFrameTransformer):
    def _fit(self, df: pd.DataFrame):
        self.fgen.fit(df, ctx=None)

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return self.fgen.generate(df)

    def __init__(self, fgen: "FeatureGenerator"):
        super().__init__()
        self.fgen = fgen
        self.set_name(f"{self.__class__.__name__}[{self.fgen.get_name()}]")


class InvertibleDataFrameTransformer(DataFrameTransformer, ABC):
    @abstractmethod
    def apply_inverse(self, df: pd.DataFrame) -> pd.DataFrame:
        pass

    def get_inverse(self) -> "InverseDataFrameTransformer":
        """
        :return: a transformer whose (forward) transformation is the inverse transformation of this DFT
        """
        return InverseDataFrameTransformer(self)


class RuleBasedDataFrameTransformer(DataFrameTransformer, ABC):
    """Base class for transformers whose logic is entirely rule-based, such that they need not be fitted to data"""

    def _fit(self, df: pd.DataFrame):
        pass

    def fit(self, df: pd.DataFrame):
        pass

    def is_fitted(self):
        return True


class InverseDataFrameTransformer(RuleBasedDataFrameTransformer):
    def __init__(self, invertible_dft: InvertibleDataFrameTransformer):
        super().__init__()
        self.invertibleDFT = invertible_dft

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return self.invertibleDFT.apply_inverse(df)


class DataFrameTransformerChain(DataFrameTransformer):
    """
    Supports the application of a chain of data frame transformers.
    During fit and apply, each transformer in the chain receives the transformed output of its predecessor.
    """

    def __init__(self, *data_frame_transformers: Union[DataFrameTransformer, List[DataFrameTransformer]]):
        super().__init__()
        self.dataFrameTransformers = flatten_arguments(data_frame_transformers)

    def __len__(self):
        return len(self.dataFrameTransformers)

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        for transformer in self.dataFrameTransformers:
            df = transformer.apply(df)
        return df

    def _fit(self, df: pd.DataFrame):
        if len(self.dataFrameTransformers) == 0:
            return
        for transformer in self.dataFrameTransformers[:-1]:
            df = transformer.fit_apply(df)
        self.dataFrameTransformers[-1].fit(df)

    def is_fitted(self):
        return all(dft.is_fitted() for dft in self.dataFrameTransformers)

    def get_names(self) -> List[str]:
        """
        :return: the list of names of all contained transformers
        """
        return [transf.get_name() for transf in self.dataFrameTransformers]

    def info(self):
        info = super().info()
        info["chainedDFTTransformerNames"] = self.get_names()
        info["length"] = len(self)
        return info

    def find_first_transformer_by_type(self, cls) -> Optional[DataFrameTransformer]:
        for dft in self.dataFrameTransformers:
            if isinstance(dft, cls):
                return dft
        return None

    def append(self, t: DataFrameTransformer):
        self.dataFrameTransformers.append(t)
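

# Illustrative sketch (added for exposition, not part of the original module): transformers
# can be chained, with each transformer receiving its predecessor's output during both
# fitting and application. The column names and data are hypothetical; the transformer
# classes used here are defined further below in this module (names resolve at call time).
def _example_transformer_chain():
    chain = DataFrameTransformerChain(
        DFTRenameColumns({"a": "x"}),
        DFTConditionalRowFilterOnColumn("x", lambda v: v > 0))
    df = pd.DataFrame({"a": [-1, 2, 3]})
    return chain.fit_apply(df)  # renames 'a' to 'x', then retains only rows where x > 0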


class DFTRenameColumns(RuleBasedDataFrameTransformer):
    def __init__(self, columns_map: Dict[str, str]):
        """
        :param columns_map: dictionary mapping old column names to new names
        """
        super().__init__()
        self.columnsMap = columns_map

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.rename(columns=self.columnsMap)


class DFTConditionalRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame by applying a boolean function to one of the columns and retaining only the rows
    for which the function returns True
    """
    def __init__(self, column: str, condition: Callable[[Any], bool]):
        super().__init__()
        self.column = column
        self.condition = condition

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[df[self.column].apply(self.condition)]


class DFTInSetComparisonRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame on the selected column and retains only the rows for which the value is in the setToKeep
    """
    def __init__(self, column: str, set_to_keep: Set):
        super().__init__()
        self.setToKeep = set_to_keep
        self.column = column

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[df[self.column].isin(self.setToKeep)]

    def info(self):
        info = super().info()
        info["column"] = self.column
        info["setToKeep"] = self.setToKeep
        return info


class DFTNotInSetComparisonRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame on the selected column and retains only the rows for which the value is not in the setToDrop
    """
    def __init__(self, column: str, set_to_drop: Set):
        super().__init__()
        self.setToDrop = set_to_drop
        self.column = column

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[~df[self.column].isin(self.setToDrop)]

    def info(self):
        info = super().info()
        info["column"] = self.column
        info["setToDrop"] = self.setToDrop
        return info
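

# Illustrative sketch (added for exposition, not part of the original module): set-based
# row filtering with hypothetical data; rule-based transformers require no fitting.
def _example_set_based_row_filters():
    df = pd.DataFrame({"status": ["ok", "error", "ok", "skip"]})
    keep_ok = DFTInSetComparisonRowFilterOnColumn("status", {"ok"})
    drop_skip = DFTNotInSetComparisonRowFilterOnColumn("status", {"skip"})
    return keep_ok.apply(df), drop_skip.apply(df)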


class DFTVectorizedConditionalRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame by applying a vectorized condition on the selected column and retaining only the rows
    for which it returns True
    """
    def __init__(self, column: str, vectorized_condition: Callable[[pd.Series], Sequence[bool]]):
        super().__init__()
        self.column = column
        self.vectorizedCondition = vectorized_condition

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[self.vectorizedCondition(df[self.column])]

    def info(self):
        info = super().info()
        info["column"] = self.column
        return info


class DFTRowFilter(RuleBasedDataFrameTransformer):
    """
    Filters a data frame by applying a condition function to each row and retaining only the rows
    for which it returns True
    """
    def __init__(self, condition: Callable[[Any], bool]):
        super().__init__()
        self.condition = condition

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[df.apply(self.condition, axis=1)]


class DFTModifyColumn(RuleBasedDataFrameTransformer):
    """
    Modifies a column specified by 'column' using 'columnTransform'
    """
    def __init__(self, column: str, column_transform: Union[Callable, np.ufunc]):
        """
        :param column: the name of the column to be modified
        :param column_transform: a function operating on single cells or a Numpy ufunc that applies to an entire Series
        """
        super().__init__()
        self.column = column
        self.columnTransform = column_transform

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        df[self.column] = df[self.column].apply(self.columnTransform)
        return df


class DFTModifyColumnVectorized(RuleBasedDataFrameTransformer):
    """
    Modifies a column specified by 'column' using 'columnTransform'. This transformer can be used to utilise Numpy vectorisation for
    performance optimisation.
    """
    def __init__(self, column: str, column_transform: Callable[[np.ndarray], Union[Sequence, pd.Series, np.ndarray]]):
        """
        :param column: the name of the column to be modified
        :param column_transform: a function that takes a Numpy array and from which the returned value will be assigned to the column
            as a whole
        """
        super().__init__()
        self.column = column
        self.columnTransform = column_transform

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        df[self.column] = self.columnTransform(df[self.column].values)
        return df
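

# Illustrative sketch (added for exposition, not part of the original module): the same
# column modification expressed cell by cell and in vectorised form; the latter passes the
# column's underlying numpy array to the function in a single call. Data is hypothetical.
def _example_modify_column():
    df = pd.DataFrame({"v": [1.0, 4.0, 9.0]})
    per_cell = DFTModifyColumn("v", lambda x: x ** 0.5)
    vectorised = DFTModifyColumnVectorized("v", np.sqrt)
    # note: these transformers assign to the column of the frame that is passed in,
    # so we apply them to copies here
    return per_cell.apply(df.copy()), vectorised.apply(df.copy())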


class DFTOneHotEncoder(DataFrameTransformer):
    def __init__(self, columns: Optional[Union[str, Sequence[str]]],
            categories: Union[List[np.ndarray], Dict[str, np.ndarray]] = None, inplace=False, ignore_unknown=False,
            array_valued_result=False):
        """
        One-hot encode categorical variables

        :param columns: list of column names or a regex matching names of columns, each of which is to be replaced by a list of
            one-hot encoded columns (or by a single array-valued column if array_valued_result=True);
            if None, no columns are one-hot encoded
        :param categories: numpy arrays containing the possible values of each of the specified columns (for the case where a
            sequence is specified in 'columns') or dictionary mapping column name to the array of possible categories for that
            column. If None, the possible values will be inferred from the columns.
        :param inplace: whether to perform the transformation in-place
        :param ignore_unknown: if True and an unknown category is encountered during transform, the resulting one-hot
            encoded columns for this feature will be all zeros; if False, an unknown category will raise an error
        :param array_valued_result: whether to replace the input columns by columns of the same name containing arrays as values
            instead of creating a separate column per original value
        """
        super().__init__()
        self._paramInfo["columns"] = columns
        self._paramInfo["inferCategories"] = categories is None
        self.oneHotEncoders = None
        if columns is None:
            self._columnsToEncode = []
            self._columnNameRegex = "$"
        elif type(columns) == str:
            self._columnNameRegex = columns
            self._columnsToEncode = None
        else:
            self._columnNameRegex = or_regex_group(columns)
            self._columnsToEncode = columns
        self.inplace = inplace
        self.arrayValuedResult = array_valued_result
        self.handleUnknown = "ignore" if ignore_unknown else "error"
        if categories is not None:
            if type(categories) == dict:
                self.oneHotEncoders = {col: OneHotEncoder(categories=[np.sort(cats)], handle_unknown=self.handleUnknown,
                    **self._sparse_kwargs()) for col, cats in categories.items()}
            else:
                if len(columns) != len(categories):
                    raise ValueError("Given categories must have the same length as columns to process")
                self.oneHotEncoders = {col: OneHotEncoder(categories=[np.sort(cats)], handle_unknown=self.handleUnknown,
                    **self._sparse_kwargs()) for col, cats in zip(columns, categories)}

    @staticmethod
    def _sparse_kwargs(sparse=False):
        if Version(sklearn).is_at_least(1, 2):
            return dict(sparse_output=sparse)
        else:
            return dict(sparse=sparse)

    def __setstate__(self, state):
        if "arrayValuedResult" not in state:
            state["arrayValuedResult"] = False
        super().__setstate__(state)

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        d["columns"] = self._paramInfo.get("columns")
        return d

    def _fit(self, df: pd.DataFrame):
        if self._columnsToEncode is None:
            self._columnsToEncode = [c for c in df.columns if re.fullmatch(self._columnNameRegex, c) is not None]
            if len(self._columnsToEncode) == 0:
                log.warning(f"{self} does not apply to any columns, transformer has no effect; regex='{self._columnNameRegex}'")
        if self.oneHotEncoders is None:
            self.oneHotEncoders = {}
            sparse_kwargs = self._sparse_kwargs()
            for column in self._columnsToEncode:
                values = df[column].dropna().unique()
                categories = [np.sort(values)]
                self.oneHotEncoders[column] = OneHotEncoder(categories=categories, handle_unknown=self.handleUnknown,
                    **sparse_kwargs)
        for columnName in self._columnsToEncode:
            self.oneHotEncoders[columnName].fit(df[[columnName]])

    def _apply(self, df: pd.DataFrame):
        if len(self._columnsToEncode) == 0:
            return df

        if not self.inplace:
            df = df.copy()
        for columnName in self._columnsToEncode:
            encoded_array = self.oneHotEncoders[columnName].transform(df[[columnName]])
            if not self.arrayValuedResult:
                df = df.drop(columns=columnName)
                for i in range(encoded_array.shape[1]):
                    df["%s_%d" % (columnName, i)] = encoded_array[:, i]
            else:
                df[columnName] = list(encoded_array)
        return df

    def info(self):
        info = super().info()
        info["inplace"] = self.inplace
        info["handleUnknown"] = self.handleUnknown
        info["arrayValuedResult"] = self.arrayValuedResult
        info.update(self._paramInfo)
        return info
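

# Illustrative sketch (added for exposition, not part of the original module): one-hot
# encoding of a single categorical column with hypothetical data; the categories are
# inferred from the data during fitting, and the input column is replaced by one
# indicator column per category ('colour_0', 'colour_1', ...).
def _example_one_hot_encoder():
    df = pd.DataFrame({"colour": ["red", "green", "red"], "size": [1, 2, 3]})
    dft = DFTOneHotEncoder("colour", ignore_unknown=True)
    return dft.fit_apply(df)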


class DFTColumnFilter(RuleBasedDataFrameTransformer):
    """
    A DataFrame transformer that filters columns by retaining or dropping specified columns
    """
    def __init__(self, keep: Union[str, Sequence[str]] = None, drop: Union[str, Sequence[str]] = None):
        super().__init__()
        self.keep = [keep] if type(keep) == str else keep
        self.drop = drop

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        if self.keep is not None:
            df = df[self.keep]
        if self.drop is not None:
            df = df.drop(columns=self.drop)
        return df

    def info(self):
        info = super().info()
        info["keep"] = self.keep
        info["drop"] = self.drop
        return info
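

# Illustrative sketch (added for exposition, not part of the original module): column
# filtering with hypothetical column names; 'keep' is applied before 'drop'.
def _example_column_filter():
    df = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
    return DFTColumnFilter(keep=["a", "b"], drop="b").apply(df)  # retains only column 'a'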


class DFTKeepColumns(DFTColumnFilter):
    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[self.keep]


class DFTDRowFilterOnIndex(RuleBasedDataFrameTransformer):
    def __init__(self, keep: Set = None, drop: Set = None):
        super().__init__()
        self.drop = drop
        self.keep = keep

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        if self.keep is not None:
            df = df.loc[self.keep]
        if self.drop is not None:
            df = df.drop(self.drop)  # type: ignore
        return df


class DFTNormalisation(DataFrameTransformer):
    """
    Applies normalisation/scaling to a data frame by applying a set of transformation rules, where each
    rule defines a set of columns to which it applies (learning a single transformer based on the values
    of all applicable columns).
    DFTNormalisation ignores N/A values during fitting and application.
    """

    class RuleTemplate:
        def __init__(self,
                skip: bool = False,
                unsupported: bool = False,
                transformer: Optional[SkLearnTransformerProtocol] = None,
                transformer_factory: Callable[[], SkLearnTransformerProtocol] = None,
                independent_columns: Optional[bool] = None,
                array_valued: bool = False,
                fit: bool = True):
            """
            A template from which a rule matching multiple columns can be created.
            This is useful for the generation of rules which shall apply to all the (numerical) columns generated
            by a :class:`FeatureGenerator` without specifically naming them.

            Use the parameters as follows:

            * If the relevant features are already normalised, pass ``skip=True``.
            * If the relevant features cannot be normalised (e.g. because they are categorical), pass ``unsupported=True``.
            * If the relevant features shall be normalised, the other parameters apply.
              Passing no parameters at all, i.e. ``RuleTemplate()``, is an option if ...

              * a default transformer factory is specified in the :class:`DFTNormalisation` instance and its application
                is suitable for the relevant set of features.
                Otherwise, specify either ``transformer_factory`` or ``transformer``.
              * the resulting rule will match only a single column. Otherwise, ``independent_columns``
                must be set to either True or False.

            :param skip: flag indicating that no transformation shall be performed on the matched columns (e.g. because they are
                already normalised)
            :param unsupported: flag indicating that normalisation of matched columns is unsupported (shall trigger an exception if
                attempted). Useful e.g. for preventing intermediate features that need further processing (like columns containing
                strings) from making their way into the final data frame that will be normalised and used for training a model.
            :param transformer: a transformer instance (following the sklearn.preprocessing interface, e.g. StandardScaler) to apply
                to the matching column(s) for the case where a transformation is necessary (skip=False, unsupported=False). If None
                is given, either transformer_factory or the containing ``DFTNormalisation`` instance's default factory will be used
                when the normaliser is fitted.
                NOTE: Using a transformer_factory is usually preferred. Use an instance only if you want the
                same transformer instance to be used in multiple places - e.g. sharing it across several feature generators or
                models that use the same type of column with an associated rule/rule template (disabling `fit` where appropriate).
            :param transformer_factory: a factory for the generation of the transformer instance, which will only be applied if
                `transformer` is not given; if neither `transformer` nor `transformer_factory` are given, the containing
                ``DFTNormalisation`` instance's default factory will be used. See :class:`SkLearnTransformerFactoryFactory` for
                convenient construction options.
            :param array_valued: whether the column values are not scalars but arrays (of some fixed but arbitrary length).
                It is assumed that all entries in such arrays are to be normalised in the same way, i.e. the same
                transformation will be applied to each entry in the array.
                Only a single matching column is supported for array_valued=True, i.e. the rule must apply to at most one column.
            :param fit: whether the rule's transformer shall be fitted. One use case for setting this to False is
                a transformer instance (rather than a factory) that is already fitted.
            :param independent_columns: whether, for the case where the rule matches multiple columns, the columns are independent
                and a separate transformation is to be learned for each of them (rather than using the same transformation for all
                columns and learning the transformation from the data of all columns).
                This parameter must be specified for rules matching more than one column;
                None is acceptable for rules matching a single column, in which case None, True, and False all have the same effect.
            """
            # NOTE: keep in sync with Rule!
            if (skip or unsupported) and count_not_none(transformer, transformer_factory) > 0:
                raise ValueError("Passed transformer or transformer_factory while skip=True or unsupported=True")
            self.skip = skip
            self.unsupported = unsupported
            self.transformer = transformer
            self.transformerFactory = transformer_factory
            self.independentColumns = independent_columns
            self.arrayValued = array_valued
            self.fit = fit

        def __setstate__(self, state):
            setstate(DFTNormalisation.RuleTemplate, self, state, new_default_properties=dict(arrayValued=False, fit=True))

        def to_rule(self, regex: Optional[Union[str, re.Pattern]]):
            """
            Convert the template to a rule for all columns matching the regex.

            :param regex: a regular expression defining the column(s) the rule applies to
            :return: the resulting Rule
            """
            return DFTNormalisation.Rule(regex, skip=self.skip, unsupported=self.unsupported, transformer=self.transformer,
                transformer_factory=self.transformerFactory, independent_columns=self.independentColumns,
                array_valued=self.arrayValued, fit=self.fit)

        def to_placeholder_rule(self):
            return self.to_rule(None)

    class Rule(ToStringMixin):
        def __init__(self,
                regex: Optional[Union[str, re.Pattern]],
                skip: bool = False,
                unsupported: bool = False,
                transformer: Optional[SkLearnTransformerProtocol] = None,
                transformer_factory: Optional[Callable[[], SkLearnTransformerProtocol]] = None,
                array_valued: bool = False,
                fit: bool = True,
                independent_columns: Optional[bool] = None):
            """
            Use the parameters as follows:

            * If the relevant features are already normalised, pass ``skip=True``.
            * If the relevant features cannot be normalised (e.g. because they are categorical), pass ``unsupported=True``.
            * If the relevant features shall be normalised, the other parameters apply.
              Passing no parameters other than regex, i.e. ``Rule(regex)``, is an option if ...

              * a default transformer factory is specified in the :class:`DFTNormalisation` instance and its application
                is suitable for the relevant set of features.
                Otherwise, specify either ``transformer_factory`` or ``transformer``.
              * the resulting rule will match only a single column. Otherwise, ``independent_columns``
                must be set to either True or False.

            :param regex: a regular expression defining the column(s) the rule applies to.
                If it matches multiple columns, these columns will be normalised in the same way (using the same normalisation
                process for each column) unless independent_columns=True.
                If None, the rule is a placeholder rule and the regex must be set later via set_regex, or the rule will not be
                applicable.
            :param skip: flag indicating that no transformation shall be performed on the matched columns (e.g. because they are
                already normalised)
            :param unsupported: flag indicating that normalisation of matched columns is unsupported (shall trigger an exception if
                attempted). Useful e.g. for preventing intermediate features that need further processing (like columns containing
                strings) from making their way into the final data frame that will be normalised and used for training a model.
            :param transformer: a transformer instance (following the sklearn.preprocessing interface, e.g. StandardScaler) to apply
                to the matching column(s) for the case where a transformation is necessary (skip=False, unsupported=False). If None
                is given, either transformer_factory or the containing ``DFTNormalisation`` instance's default factory will be used
                when the normaliser is fitted.
                NOTE: Using a transformer_factory is usually preferred. Use an instance only if you want the
                same transformer instance to be used in multiple places - e.g. sharing it across several feature generators or
                models that use the same type of column with an associated rule/rule template (disabling `fit` where appropriate).
            :param transformer_factory: a factory for the generation of the transformer instance, which will only be applied if
                `transformer` is not given; if neither `transformer` nor `transformer_factory` are given, the containing
                ``DFTNormalisation`` instance's default factory will be used. See :class:`SkLearnTransformerFactoryFactory` for
                convenient construction options.
            :param array_valued: whether the column values are not scalars but arrays (of some fixed but arbitrary length).
                It is assumed that all entries in such arrays are to be normalised in the same way, i.e. the same
                transformation will be applied to each entry in the array.
                Only a single matching column is supported for array_valued=True, i.e. the regex must match at most one column.
            :param fit: whether the rule's transformer shall be fitted. One use case for setting this to False is
                a transformer instance (rather than a factory) that is already fitted.
            :param independent_columns: whether, for the case where the rule matches multiple columns, the columns are independent
                and a separate transformation is to be learned for each of them (rather than using the same transformation for all
                columns and learning the transformation from the data of all columns).
                This parameter must be specified for rules matching more than one column;
                None is acceptable for rules matching a single column, in which case None, True, and False all have the same effect.
            """
            if (skip or unsupported) and count_not_none(transformer, transformer_factory) > 0:
                raise ValueError("Passed transformer or transformer_factory while skip=True or unsupported=True")
            if isinstance(regex, str):
                regex = re.compile(regex)
            self.regex = regex
            # NOTE: keep in sync with RuleTemplate!
            self.skip = skip
            self.unsupported = unsupported
            self.transformer = transformer
            self.transformerFactory = transformer_factory
            self.arrayValued = array_valued
            self.fit = fit
            self.independentColumns = independent_columns

        def __setstate__(self, state):
            setstate(DFTNormalisation.Rule, self, state, new_default_properties=dict(arrayValued=False, fit=True,
                independentColumns=False, transformerFactory=None))

        def _tostring_excludes(self) -> List[str]:
            return super()._tostring_excludes() + ["regex"]

        def _tostring_additional_entries(self) -> Dict[str, Any]:
            d = super()._tostring_additional_entries()
            if self.regex is not None:
                d["regex"] = f"'{self.regex.pattern}'"
            return d

        def set_regex(self, regex: str):
            try:
                self.regex = re.compile(regex)
            except Exception as e:
                raise Exception(f"Could not compile regex '{regex}': {e}")

        def matches(self, column: str):
            if self.regex is None:
                raise Exception("Attempted to apply a placeholder rule. Perhaps the feature generator from which the rule "
                    "originated was never applied in order to have the rule instantiated.")
            return self.regex.fullmatch(column) is not None

        def matching_columns(self, columns: Sequence[str]) -> List[str]:
            return [col for col in columns if self.matches(col)]

    def __init__(self, rules: Sequence[Rule], default_transformer_factory: Optional[Callable[[], SkLearnTransformerProtocol]] = None,
            require_all_handled: bool = True, inplace: bool = False):
        """
        :param rules: the set of rules; rules (i.e., their transformers) are always fitted and applied in the given order.
            A convenient way to obtain a set of rules in the :class:`sensai.vector_model.VectorModel` context is from a
            :class:`sensai.featuregen.FeatureCollector` or :class:`sensai.featuregen.MultiFeatureGenerator`.
            Generally, it is often a good idea to associate rules (or a rule template) with a feature generator.
            Then the rules can be obtained from it using `get_normalisation_rules`.
        :param default_transformer_factory: a factory for the creation of transformer instances (which implements the
            API used by sklearn.preprocessing, e.g. StandardScaler) that shall be used to create a transformer for all
            rules that do not specify a particular transformer.
            The default transformer will only be applied to columns matched by such rules; unmatched columns will
            not be transformed.
            Use :class:`SkLearnTransformerFactoryFactory` to conveniently create a factory.
        :param require_all_handled: whether to raise an exception if any column is not matched by a rule
        :param inplace: whether to apply data frame transformations in-place
        """
        super().__init__()
        self.requireAllHandled = require_all_handled
        self.inplace = inplace
        self._userRules = rules
        self._defaultTransformerFactory = default_transformer_factory
        self._rules = None

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self._rules is not None:
            d["rules"] = self._rules
        else:
            d["userRules"] = self._userRules
        return d

    def _fit(self, df: pd.DataFrame):
        matched_rules_by_column = {}
        self._rules = []
        # For rules matching multiple columns, if independent_columns is False, the columns
        # will be concatenated and treated as a single column for fitting the transformer.
        # Note that transformers follow sklearn interfaces, thus just passing an array
        # to them will learn a per-column transformation. This will be the case for independent_columns=True.
        for rule in self._userRules:
            matching_columns = rule.matching_columns(df.columns)
            for c in matching_columns:
                if c in matched_rules_by_column:
                    raise Exception(f"More than one rule applies to column '{c}': {matched_rules_by_column[c]}, {rule}")
                matched_rules_by_column[c] = rule

            if len(matching_columns) > 0:
                if rule.unsupported:
                    raise Exception(f"Normalisation of columns {matching_columns} is unsupported according to {rule}. "
                        f"If you want to make use of these columns, transform them into a supported column before applying "
                        f"{self.__class__.__name__}.")
                if not rule.skip:
                    if rule.transformer is None:
                        if rule.transformerFactory is not None:
                            rule.transformer = rule.transformerFactory()
                        else:
                            if self._defaultTransformerFactory is None:
                                raise Exception(f"No transformer to fit: {rule} defines no transformer and instance has no "
                                    f"transformer factory")
                            rule.transformer = self._defaultTransformerFactory()
                    if rule.fit:
                        # fit transformer
                        applicable_df = df[sorted(matching_columns)]
                        if rule.arrayValued:
                            if len(matching_columns) > 1:
                                raise Exception(f"Array-valued case is only supported for a single column, "
                                    f"matched {matching_columns} for {rule}")
                            values = np.concatenate(applicable_df.values.flatten())
                            values = values.reshape((len(values), 1))
                        elif rule.independentColumns:
                            values = applicable_df.values
                        else:
                            values = applicable_df.values.flatten()
                            values = values.reshape((len(values), 1))
                        rule.transformer.fit(values)
            else:
                log.log(logging.DEBUG - 1, f"{rule} matched no columns")

            # collect specialised rule for application
            specialised_rule = copy.copy(rule)
            if not specialised_rule.skip and specialised_rule.independentColumns is None and len(matching_columns) > 1:
                raise ValueError(f"Normalisation rule matching multiple columns {matching_columns} must set `independentColumns` "
                    f"(got None)")
            specialised_rule.set_regex(or_regex_group(matching_columns))
            self._rules.append(specialised_rule)

    def _check_unhandled_columns(self, df, matched_rules_by_column):
        if self.requireAllHandled:
            unhandled_columns = set(df.columns) - set(matched_rules_by_column.keys())
            if len(unhandled_columns) > 0:
                raise Exception(f"The following columns are not handled by any rules: {unhandled_columns}; "
                    f"rules: {', '.join(map(str, self._rules))}")

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self.inplace:
            df = df.copy()
        matched_rules_by_column = {}
        for rule in self._rules:
            matching_columns = rule.matching_columns(df.columns)
            if len(matching_columns) == 0:
                continue
            for c in matching_columns:
                matched_rules_by_column[c] = rule
            if not rule.skip:
                if rule.independentColumns and not rule.arrayValued:
                    matching_columns = sorted(matching_columns)
                    df[matching_columns] = rule.transformer.transform(df[matching_columns].values)
                else:
                    for c in matching_columns:
                        if not rule.arrayValued:
                            df[c] = rule.transformer.transform(df[[c]].values)
                        else:
                            df[c] = [rule.transformer.transform(np.array([x]).T)[:, 0] for x in df[c]]
        self._check_unhandled_columns(df, matched_rules_by_column)
        return df

    def info(self):
        info = super().info()
        info["requireAllHandled"] = self.requireAllHandled
        info["inplace"] = self.inplace
        return info

    def find_rule(self, col_name: str) -> Optional["DFTNormalisation.Rule"]:
        for rule in self._rules:
            if rule.matches(col_name):
                return rule
        return None
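

# Illustrative sketch (added for exposition, not part of the original module): normalisation
# with two rules and a default transformer factory. The column names and data are hypothetical;
# sklearn's StandardScaler serves as the factory (the class itself is a zero-argument callable).
def _example_normalisation():
    from sklearn.preprocessing import StandardScaler

    df = pd.DataFrame({"price_eur": [1.0, 2.0], "price_usd": [1.1, 2.2], "is_valid": [0.0, 1.0]})
    dft = DFTNormalisation(
        rules=[
            # one shared scaling is learned from the data of both price columns
            DFTNormalisation.Rule(r"price_.*", independent_columns=False),
            # already in [0, 1]: no transformation, but the column counts as handled
            DFTNormalisation.Rule(r"is_valid", skip=True)],
        default_transformer_factory=StandardScaler)
    return dft.fit_apply(df)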


class DFTFromColumnGenerators(RuleBasedDataFrameTransformer):
    """
    Extends a data frame with columns generated from ColumnGenerator instances
    """
    def __init__(self, column_generators: Sequence['ColumnGenerator'], inplace=False):
        super().__init__()
        self.columnGenerators = column_generators
        self.inplace = inplace

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self.inplace:
            df = df.copy()
        for cg in self.columnGenerators:
            series = cg.generate_column(df)
            df[series.name] = series
        return df

    def info(self):
        info = super().info()
        info["inplace"] = self.inplace
        return info


class DFTCountEntries(RuleBasedDataFrameTransformer):
    """
    Transforms a data frame, based on one of its columns, into a new data frame containing two columns that indicate the counts
    of unique values in the input column. It is the "DataFrame output version" of pd.Series.value_counts.
    Each row of the output data frame holds a unique value of the input column and the number of times it appears in the input column.
    """
    def __init__(self, column_for_entry_count: str, column_name_for_resulting_counts: str = "counts"):
        super().__init__()
        self.columnNameForResultingCounts = column_name_for_resulting_counts
        self.columnForEntryCount = column_for_entry_count

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        series = df[self.columnForEntryCount].value_counts()
        return pd.DataFrame({self.columnForEntryCount: series.index, self.columnNameForResultingCounts: series.values})

    def info(self):
        info = super().info()
        info["columnNameForResultingCounts"] = self.columnNameForResultingCounts
        info["columnForEntryCount"] = self.columnForEntryCount
        return info
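

# Illustrative sketch (added for exposition, not part of the original module): counting
# unique values of a column, with hypothetical data.
def _example_count_entries():
    df = pd.DataFrame({"label": ["a", "b", "a"]})
    return DFTCountEntries("label").apply(df)  # columns 'label' and 'counts'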


class DFTAggregationOnColumn(RuleBasedDataFrameTransformer):
    def __init__(self, column_for_aggregation: str, aggregation: Callable):
        super().__init__()
        self.columnForAggregation = column_for_aggregation
        self.aggregation = aggregation

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.groupby(self.columnForAggregation).agg(self.aggregation)


class DFTRoundFloats(RuleBasedDataFrameTransformer):
    def __init__(self, decimals=0):
        super().__init__()
        self.decimals = decimals

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame(np.round(df.values, self.decimals), columns=df.columns, index=df.index)

    def info(self):
        info = super().info()
        info["decimals"] = self.decimals
        return info


class DFTSkLearnTransformer(InvertibleDataFrameTransformer):
    """
    Applies a transformer from sklearn.preprocessing to (a subset of) the columns of a data frame.
    If multiple columns are transformed, they are transformed independently (i.e. each column uses a separately trained transformation).
    """
    def __init__(self,
            sklearn_transformer: SkLearnTransformerProtocol,
            columns: Optional[List[str]] = None,
            inplace=False,
            array_valued=False):
        """
        :param sklearn_transformer: the transformer instance (from sklearn.preprocessing) to use (which will be fitted & applied)
        :param columns: the set of column names to which the transformation shall apply; if None, apply it to all columns
        :param inplace: whether to apply the transformation in-place
        :param array_valued: whether to apply the transformation not to scalar-valued columns but to one or more array-valued columns,
            where the values of all arrays within a column (which may vary in length) are to be transformed in the same way.
            If multiple columns are transformed, then the arrays belonging to a single row must all have the same length.
        """
        super().__init__()
        self.set_name(f"{self.__class__.__name__}_wrapped_{sklearn_transformer.__class__.__name__}")
        self.sklearnTransformer = sklearn_transformer
        self.columns = columns
        self.inplace = inplace
        self.arrayValued = array_valued

    def __setstate__(self, state):
        state["arrayValued"] = state.get("arrayValued", False)
        setstate(DFTSkLearnTransformer, self, state)

    def _fit(self, df: pd.DataFrame):
        cols = self.columns
        if cols is None:
            cols = df.columns
        if not self.arrayValued:
            values = df[cols].values
        else:
            if len(cols) == 1:
                values = np.concatenate(df[cols[0]].values.flatten())
                values = values.reshape((len(values), 1))
            else:
                flat_col_arrays = [np.concatenate(df[col].values.flatten()) for col in cols]
                lengths = [len(a) for a in flat_col_arrays]
                if len(set(lengths)) != 1:
                    raise ValueError(f"Columns {cols} do not contain the same number of values: {lengths}")
                values = np.stack(flat_col_arrays, axis=1)
        self.sklearnTransformer.fit(values)

    def _apply_transformer(self, df: pd.DataFrame, inverse: bool) -> pd.DataFrame:
        if not self.inplace:
            df = df.copy()
        cols = self.columns
        if cols is None:
            cols = df.columns
        transform = (lambda x: self.sklearnTransformer.inverse_transform(x)) if inverse else lambda x: self.sklearnTransformer.transform(x)
        if not self.arrayValued:
            df[cols] = transform(df[cols].values)
        else:
            if len(cols) == 1:
                c = cols[0]
                df[c] = [transform(np.array([x]).T)[:, 0] for x in df[c]]
            else:
                transformed_values = [transform(np.stack(row, axis=1)) for row in df.values]
                for iCol, col in enumerate(cols):
                    df[col] = [row[:, iCol] for row in transformed_values]
        return df

    def _apply(self, df):
        return self._apply_transformer(df, False)

    def apply_inverse(self, df):
        return self._apply_transformer(df, True)

    def info(self):
        info = super().info()
        info["columns"] = self.columns
        info["inplace"] = self.inplace
        info["sklearnTransformerClass"] = self.sklearnTransformer.__class__.__name__
        return info
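

# Illustrative sketch (added for exposition, not part of the original module): wrapping an
# sklearn transformer and using the inverse transformation; data is hypothetical.
def _example_sklearn_transformer_roundtrip():
    from sklearn.preprocessing import StandardScaler

    df = pd.DataFrame({"x": [1.0, 2.0, 3.0], "y": [10.0, 20.0, 30.0]})
    dft = DFTSkLearnTransformer(StandardScaler(), columns=["x"])
    scaled = dft.fit_apply(df)  # only column 'x' is standardised
    return dft.get_inverse().apply(scaled)  # recovers the original values of 'x'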


class DFTSortColumns(RuleBasedDataFrameTransformer):
    """
    Sorts a data frame's columns in ascending order
    """
    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[sorted(df.columns)]


class DFTFillNA(RuleBasedDataFrameTransformer):
    """
    Fills NA/NaN values with the given value
    """
    def __init__(self, fill_value, inplace: bool = False):
        super().__init__()
        self.fillValue = fill_value
        self.inplace = inplace

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        if self.inplace:
            df.fillna(value=self.fillValue, inplace=True)
            return df
        else:
            return df.fillna(value=self.fillValue)


class DFTCastCategoricalColumns(RuleBasedDataFrameTransformer):
    """
    Casts columns with dtype category to the given type.
    This can be useful in cases where categorical columns are not accepted by the model but the column values are actually numeric,
    in which case the cast to a numeric value yields an acceptable label encoding.
    """
    def __init__(self, columns: Optional[List[str]] = None, dtype=float):
        """
        :param columns: the columns to convert; if None, convert all that have dtype category
        :param dtype: the data type to which categorical columns are to be converted
        """
        super().__init__()
        self.columns = columns
        self.dtype = dtype

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        columns = self.columns if self.columns is not None else df.columns
        for col in columns:
            s = df[col]
            if s.dtype.name == "category":
                df[col] = s.astype(self.dtype)
        return df


class DFTDropNA(RuleBasedDataFrameTransformer):
    """
    Drops rows or columns containing NA/NaN values
    """
    def __init__(self, axis=0, inplace=False):
        """
        :param axis: 0 to drop rows, 1 to drop columns containing an N/A value
        :param inplace: whether to perform the operation in-place on the input data frame
        """
        super().__init__()
        self.axis = axis
        self.inplace = inplace

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        if self.inplace:
            df.dropna(axis=self.axis, inplace=True)
            return df
        else:
            return df.dropna(axis=self.axis)
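

# Illustrative sketch (added for exposition, not part of the original module): NA handling
# with hypothetical data.
def _example_na_handling():
    df = pd.DataFrame({"x": [1.0, np.nan], "y": [np.nan, 2.0]})
    filled = DFTFillNA(0.0).apply(df)      # NaNs replaced by the fill value
    dropped = DFTDropNA(axis=0).apply(df)  # drops every row containing a NaN
    return filled, dropped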