Coverage for src/sensai/data_transformation/dft.py: 61%

569 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

1import copy 

2import logging 

3import re 

4from abc import ABC, abstractmethod 

5from typing import List, Sequence, Union, Dict, Callable, Any, Optional, Set 

6 

7import numpy as np 

8import pandas as pd 

9import sklearn 

10from sklearn.preprocessing import OneHotEncoder 

11 

12from .sklearn_transformer import SkLearnTransformerProtocol 

13from ..util import flatten_arguments, count_not_none 

14from ..util.pandas import DataFrameColumnChangeTracker 

15from ..util.pickle import setstate 

16from ..util.string import or_regex_group, ToStringMixin 

17 

18from typing import TYPE_CHECKING 

19 

20from ..util.version import Version 

21 

22if TYPE_CHECKING: 

23 from ..featuregen import FeatureGenerator 

24 from ..columngen import ColumnGenerator 

25 

26 

27log = logging.getLogger(__name__) 

28 

29 

class DataFrameTransformer(ABC, ToStringMixin):
    """
    Base class for data frame transformers, i.e. objects which can transform one data frame into another
    (possibly applying the transformation to the original data frame - in-place transformation).
    A data frame transformer may require being fitted using training data.

    Subclasses implement :meth:`_fit` and :meth:`_apply`; users call :meth:`fit`, :meth:`apply` or :meth:`fit_apply`.
    """
    def __init__(self):
        self._name = f"{self.__class__.__name__}-{id(self)}"
        self._isFitted = False
        self._columnChangeTracker: Optional[DataFrameColumnChangeTracker] = None
        self._paramInfo = {}  # arguments passed to init that are not saved otherwise can be persisted here

    def __setstate__(self, d):
        # For backwards compatibility with persisted DFTs based on code prior to commit 7088cbbe:
        # they lack the _isFitted attribute and we assume that each such DFT was fitted.
        d.setdefault("_name", f"{self.__class__.__name__}-{id(self)}")
        d.setdefault("_isFitted", True)
        d.setdefault("_columnChangeTracker", None)
        d.setdefault("_paramInfo", {})
        self.__dict__ = d

    def _tostring_exclude_private(self) -> bool:
        return True

    def get_name(self) -> str:
        """
        :return: the name of this dft transformer, which may be a default name if the name has not been set.
        """
        return self._name

    def set_name(self, name: str):
        """
        :param name: the new name of this transformer
        """
        self._name = name

    def with_name(self, name: str):
        """
        Sets the name and returns this instance (fluent variant of :meth:`set_name`).

        :param name: the new name of this transformer
        :return: self
        """
        self.set_name(name)
        return self

    @abstractmethod
    def _fit(self, df: pd.DataFrame):
        """
        Fits the transformer to the given data frame (called by :meth:`fit`).

        :param df: the data frame to fit on
        """

    @abstractmethod
    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Performs the actual transformation (called by :meth:`apply` once the fitted state has been verified).

        :param df: the data frame to transform
        :return: the transformed data frame
        """

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies the transformation to the given data frame and records the resulting change in columns.

        :param df: the data frame to transform
        :return: the transformed data frame
        :raises Exception: if the transformer has not been fitted
        """
        # Check the fitted state first, such that a rejected call does not replace the tracker of a
        # previous application with one for which no change was ever tracked (info() queries the tracker).
        if not self.is_fitted():
            raise Exception(f"Cannot apply a DataFrameTransformer which is not fitted: "
                            f"the df transformer {self.get_name()} requires fitting")
        self._columnChangeTracker = DataFrameColumnChangeTracker(df)
        df = self._apply(df)
        self._columnChangeTracker.track_change(df)
        return df

    def info(self):
        """
        :return: a dictionary with the transformer's name, a description of the column changes observed in the
            most recent call to :meth:`apply` (None if not yet applied) and the fitted state
        """
        tracker = self._columnChangeTracker
        return {
            "name": self.get_name(),
            "changeInColumnNames": tracker.column_change_string() if tracker is not None else None,
            "isFitted": self.is_fitted(),
        }

    def fit(self, df: pd.DataFrame):
        """
        Fits the transformer and marks it as fitted.

        :param df: the data frame to fit on
        """
        self._fit(df)
        self._isFitted = True

    def is_fitted(self):
        """
        :return: whether the transformer has been fitted
        """
        return self._isFitted

    def fit_apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Fits the transformer on the given data frame and then applies it to the same data frame.

        :param df: the data frame to fit on and transform
        :return: the transformed data frame
        """
        self.fit(df)
        return self.apply(df)

    def to_feature_generator(self, categorical_feature_names: Optional[Union[Sequence[str], str]] = None,
            normalisation_rules: Sequence['DFTNormalisation.Rule'] = (),
            normalisation_rule_template: 'DFTNormalisation.RuleTemplate' = None,
            add_categorical_default_rules=True):
        """
        Wraps this transformer in a ``FeatureGeneratorFromDFT``; all arguments are passed on to its constructor.

        :return: the feature generator wrapping this transformer
        """
        # need to import here to prevent circular imports
        from ..featuregen import FeatureGeneratorFromDFT
        return FeatureGeneratorFromDFT(
            self, categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules,
            normalisation_rule_template=normalisation_rule_template, add_categorical_default_rules=add_categorical_default_rules
        )

112 

113 

class DFTFromFeatureGenerator(DataFrameTransformer):
    """
    Data frame transformer which delegates fitting and application to a feature generator.
    """
    def __init__(self, fgen: "FeatureGenerator"):
        """
        :param fgen: the feature generator to delegate to
        """
        super().__init__()
        self.fgen = fgen
        # the name is derived from the wrapped generator's name
        self.set_name(f"{self.__class__.__name__}[{self.fgen.get_name()}]")

    def _fit(self, df: pd.DataFrame):
        self.fgen.fit(df, ctx=None)

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        generated_df = self.fgen.generate(df)
        return generated_df

125 

126 

class InvertibleDataFrameTransformer(DataFrameTransformer, ABC):
    """
    A data frame transformer which additionally provides an inverse transformation.
    """
    @abstractmethod
    def apply_inverse(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies the inverse transformation.

        :param df: the data frame to which the inverse transformation shall be applied
        :return: the resulting data frame
        """

    def get_inverse(self) -> "InverseDataFrameTransformer":
        """
        :return: a transformer whose (forward) transformation is the inverse transformation of this DFT
        """
        inverse_dft = InverseDataFrameTransformer(self)
        return inverse_dft

137 

138 

class RuleBasedDataFrameTransformer(DataFrameTransformer, ABC):
    """Base class for transformers whose logic is entirely based on rules and does not need to be fitted to data"""

    def _fit(self, df: pd.DataFrame):
        # nothing to learn from data
        return None

    def fit(self, df: pd.DataFrame):
        # fitting is a no-op; the fitted flag is not touched because is_fitted always reports True
        return None

    def is_fitted(self):
        """
        :return: True (rule-based transformers are always considered fitted)
        """
        return True

150 

151 

class InverseDataFrameTransformer(RuleBasedDataFrameTransformer):
    """
    Transformer whose forward transformation is the inverse transformation of a given invertible transformer.
    """
    def __init__(self, invertible_dft: InvertibleDataFrameTransformer):
        """
        :param invertible_dft: the transformer whose ``apply_inverse`` is used as this transformer's transformation
        """
        super().__init__()
        self.invertibleDFT = invertible_dft

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        inverted_df = self.invertibleDFT.apply_inverse(df)
        return inverted_df

159 

160 

class DataFrameTransformerChain(DataFrameTransformer):
    """
    Supports the application of a chain of data frame transformers.
    During fit and apply each transformer in the chain receives the transformed output of its predecessor.
    """

    def __init__(self, *data_frame_transformers: Union[DataFrameTransformer, List[DataFrameTransformer]]):
        """
        :param data_frame_transformers: the transformers (or lists of transformers) making up the chain;
            the arguments are flattened via ``flatten_arguments``
        """
        super().__init__()
        self.dataFrameTransformers = flatten_arguments(data_frame_transformers)

    def __len__(self):
        return len(self.dataFrameTransformers)

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        result = df
        for dft in self.dataFrameTransformers:
            result = dft.apply(result)
        return result

    def _fit(self, df: pd.DataFrame):
        chain = self.dataFrameTransformers
        if len(chain) == 0:
            return
        # all but the last transformer must also be applied in order to produce the input of their successor
        *preceding, final = chain
        for dft in preceding:
            df = dft.fit_apply(df)
        final.fit(df)

    def is_fitted(self):
        """
        :return: whether all transformers in the chain are fitted (True for an empty chain)
        """
        fitted_flags = [dft.is_fitted() for dft in self.dataFrameTransformers]
        return all(fitted_flags)

    def get_names(self) -> List[str]:
        """
        :return: the list of names of all contained transformers
        """
        names = []
        for dft in self.dataFrameTransformers:
            names.append(dft.get_name())
        return names

    def info(self):
        """
        :return: the base class info extended by the names of the chained transformers and the chain's length
        """
        info = super().info()
        info.update(chainedDFTTransformerNames=self.get_names(), length=len(self))
        return info

    def find_first_transformer_by_type(self, cls) -> Optional[DataFrameTransformer]:
        """
        :param cls: the type to search for
        :return: the first transformer in the chain that is an instance of ``cls``, or None if there is none
        """
        return next((dft for dft in self.dataFrameTransformers if isinstance(dft, cls)), None)

    def append(self, t: DataFrameTransformer):
        """
        Appends a transformer to the end of the chain.

        :param t: the transformer to add
        """
        self.dataFrameTransformers.append(t)

209 

210 

class DFTRenameColumns(RuleBasedDataFrameTransformer):
    """
    Renames columns of a data frame according to a fixed mapping.
    """
    def __init__(self, columns_map: Dict[str, str]):
        """
        :param columns_map: dictionary mapping old column names to new names
        """
        super().__init__()
        self.columnsMap = columns_map

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        renamed_df = df.rename(columns=self.columnsMap)
        return renamed_df

221 

222 

class DFTConditionalRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame by applying a boolean function to one of the columns and retaining only the rows
    for which the function returns True
    """
    def __init__(self, column: str, condition: Callable[[Any], bool]):
        """
        :param column: the name of the column whose values are passed to the condition
        :param condition: the function that is applied to each value of the column
        """
        super().__init__()
        self.column = column
        self.condition = condition

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        # evaluate the condition value by value, then use the result as a row mask
        row_mask = df[self.column].apply(self.condition)
        return df[row_mask]

235 

236 

class DFTInSetComparisonRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame on the selected column and retains only the rows for which the value is in the setToKeep
    """
    def __init__(self, column: str, set_to_keep: Set):
        """
        :param column: the name of the column whose values are checked
        :param set_to_keep: the values for which rows are retained
        """
        super().__init__()
        self.setToKeep = set_to_keep
        self.column = column

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        row_mask = df[self.column].isin(self.setToKeep)
        return df[row_mask]

    def info(self):
        """
        :return: the base class info extended by the column and the set of values to keep
        """
        info = super().info()
        info.update(column=self.column, setToKeep=self.setToKeep)
        return info

254 

255 

class DFTNotInSetComparisonRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame on the selected column and retains only the rows for which the value is not in the setToDrop
    """
    def __init__(self, column: str, set_to_drop: Set):
        """
        :param column: the name of the column whose values are checked
        :param set_to_drop: the values for which rows are removed
        """
        super().__init__()
        self.setToDrop = set_to_drop
        self.column = column

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        rows_to_drop = df[self.column].isin(self.setToDrop)
        return df[~rows_to_drop]

    def info(self):
        """
        :return: the base class info extended by the column and the set of values to drop
        """
        info = super().info()
        info.update(column=self.column, setToDrop=self.setToDrop)
        return info

273 

274 

class DFTVectorizedConditionalRowFilterOnColumn(RuleBasedDataFrameTransformer):
    """
    Filters a data frame by applying a vectorized condition on the selected column and retaining only the rows
    for which it returns True
    """
    def __init__(self, column: str, vectorized_condition: Callable[[pd.Series], Sequence[bool]]):
        """
        :param column: the name of the column which is passed to the condition as a whole
        :param vectorized_condition: the function which receives the column (as a series) and returns the row mask
        """
        super().__init__()
        self.column = column
        self.vectorizedCondition = vectorized_condition

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        row_mask = self.vectorizedCondition(df[self.column])
        return df[row_mask]

    def info(self):
        """
        :return: the base class info extended by the column
        """
        info = super().info()
        info.update(column=self.column)
        return info

292 

293 

class DFTRowFilter(RuleBasedDataFrameTransformer):
    """
    Filters a data frame by applying a condition function to each row and retaining only the rows
    for which it returns True
    """
    def __init__(self, condition: Callable[[Any], bool]):
        """
        :param condition: the function that is applied to each row (via ``DataFrame.apply`` with ``axis=1``)
        """
        super().__init__()
        self.condition = condition

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        row_mask = df.apply(self.condition, axis=1)
        return df[row_mask]

305 

306 

class DFTModifyColumn(RuleBasedDataFrameTransformer):
    """
    Modifies a column specified by 'column' using 'columnTransform'
    """
    def __init__(self, column: str, column_transform: Union[Callable, np.ufunc]):
        """
        :param column: the name of the column to be modified
        :param column_transform: a function operating on single cells or a Numpy ufunc that applies to an entire Series
        """
        super().__init__()
        self.column = column
        self.columnTransform = column_transform

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        # the column is replaced in the given data frame itself, which is also returned
        transformed_column = df[self.column].apply(self.columnTransform)
        df[self.column] = transformed_column
        return df

323 

324 

class DFTModifyColumnVectorized(RuleBasedDataFrameTransformer):
    """
    Modifies a column specified by 'column' using 'columnTransform'. This transformer can be used to utilise Numpy vectorisation for
    performance optimisation.
    """
    def __init__(self, column: str, column_transform: Callable[[np.ndarray], Union[Sequence, pd.Series, np.ndarray]]):
        """
        :param column: the name of the column to be modified
        :param column_transform: a function that takes a Numpy array and from which the returned value will be assigned to the column as
            a whole
        """
        super().__init__()
        self.column = column
        self.columnTransform = column_transform

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        # the transform receives the column's underlying array; the result replaces the column in the given data frame
        column_values = df[self.column].values
        df[self.column] = self.columnTransform(column_values)
        return df

343 

344 

class DFTOneHotEncoder(DataFrameTransformer):
    """
    Replaces categorical columns by their one-hot encoding, using one sklearn ``OneHotEncoder`` per column.
    """
    def __init__(self, columns: Optional[Union[str, Sequence[str]]],
            categories: Union[List[np.ndarray], Dict[str, np.ndarray]] = None, inplace=False, ignore_unknown=False,
            array_valued_result=False):
        """
        One hot encode categorical variables

        :param columns: list of names or regex matching names of columns that are to be replaced by a list one-hot encoded columns each
            (or an array-valued column for the case where array_valued_result=True);
            If None, then no columns are actually to be one-hot-encoded
        :param categories: numpy arrays containing the possible values of each of the specified columns (for case where sequence is
            specified in 'columns') or dictionary mapping column name to array of possible categories for the column name.
            If a list of arrays is given while 'columns' is a single string, the string is treated as the name of a single
            column (so exactly one array must be given).
            If None, the possible values will be inferred from the columns
        :param inplace: whether to perform the transformation in-place
        :param ignore_unknown: if True and an unknown category is encountered during transform, the resulting one-hot
            encoded columns for this feature will be all zeros. if False, an unknown category will raise an error.
        :param array_valued_result: whether to replace the input columns by columns of the same name containing arrays as values
            instead of creating a separate column per original value
        :raises ValueError: if categories are given as a list whose length does not match the number of columns
        """
        super().__init__()
        self._paramInfo["columns"] = columns
        self._paramInfo["inferCategories"] = categories is None
        self.oneHotEncoders = None
        if columns is None:
            self._columnsToEncode = []
            self._columnNameRegex = "$"
        elif isinstance(columns, str):
            # a regex was given; the concrete columns are determined when fitting
            self._columnNameRegex = columns
            self._columnsToEncode = None
        else:
            self._columnNameRegex = or_regex_group(columns)
            self._columnsToEncode = columns
        self.inplace = inplace
        self.arrayValuedResult = array_valued_result
        self.handleUnknown = "ignore" if ignore_unknown else "error"
        if categories is not None:
            if isinstance(categories, dict):
                categories_by_column = categories
            else:
                # Normalise 'columns' to a list of names: a plain string must not be iterated
                # character by character, and None corresponds to no columns at all.
                if columns is None:
                    column_names = []
                elif isinstance(columns, str):
                    column_names = [columns]
                else:
                    column_names = columns
                if len(column_names) != len(categories):
                    raise ValueError("Given categories must have the same length as columns to process")
                categories_by_column = dict(zip(column_names, categories))
            sparse_kwargs = self._sparse_kwargs()
            self.oneHotEncoders = {
                col: OneHotEncoder(categories=[np.sort(col_categories)], handle_unknown=self.handleUnknown, **sparse_kwargs)
                for col, col_categories in categories_by_column.items()
            }

    @staticmethod
    def _sparse_kwargs(sparse=False):
        """
        :param sparse: the value to use for the encoder's sparse output option
        :return: the keyword argument controlling sparse output, whose name depends on the installed sklearn version
        """
        if Version(sklearn).is_at_least(1, 2):
            return dict(sparse_output=sparse)
        else:
            return dict(sparse=sparse)

    def __setstate__(self, state):
        # persisted instances which lack the attribute are assumed not to use array-valued results
        if "arrayValuedResult" not in state:
            state["arrayValuedResult"] = False
        super().__setstate__(state)

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        d["columns"] = self._paramInfo.get("columns")
        return d

    def _fit(self, df: pd.DataFrame):
        if self._columnsToEncode is None:
            # resolve the regex against the columns of the data frame (only done once, the result is retained)
            self._columnsToEncode = [c for c in df.columns if re.fullmatch(self._columnNameRegex, c) is not None]
            if len(self._columnsToEncode) == 0:
                log.warning(f"{self} does not apply to any columns, transformer has no effect; regex='{self._columnNameRegex}'")
        if self.oneHotEncoders is None:
            # no categories were given, so infer them from the (non-N/A) values found in the data
            self.oneHotEncoders = {}
            sparse_kwargs = self._sparse_kwargs()
            for column in self._columnsToEncode:
                values = df[column].dropna().unique()
                categories = [np.sort(values)]
                self.oneHotEncoders[column] = OneHotEncoder(categories=categories, handle_unknown=self.handleUnknown,
                    **sparse_kwargs)
        for columnName in self._columnsToEncode:
            self.oneHotEncoders[columnName].fit(df[[columnName]])

    def _apply(self, df: pd.DataFrame):
        if len(self._columnsToEncode) == 0:
            return df

        if not self.inplace:
            df = df.copy()
        for columnName in self._columnsToEncode:
            encoded_array = self.oneHotEncoders[columnName].transform(df[[columnName]])
            if not self.arrayValuedResult:
                # NOTE(review): drop returns a new data frame, so with inplace=True the data frame passed
                # by the caller is not actually modified in this branch - confirm whether this is intended.
                df = df.drop(columns=columnName)
                # one new column per category, named <original column>_<category index>
                for i in range(encoded_array.shape[1]):
                    df["%s_%d" % (columnName, i)] = encoded_array[:, i]
            else:
                df[columnName] = list(encoded_array)
        return df

    def info(self):
        """
        :return: the base class info extended by this transformer's settings and the persisted constructor parameters
        """
        info = super().info()
        info["inplace"] = self.inplace
        info["handleUnknown"] = self.handleUnknown
        info["arrayValuedResult"] = self.arrayValuedResult
        info.update(self._paramInfo)
        return info

446 

447 

class DFTColumnFilter(RuleBasedDataFrameTransformer):
    """
    A DataFrame transformer that filters columns by retaining or dropping specified columns
    """
    def __init__(self, keep: Union[str, Sequence[str]] = None, drop: Union[str, Sequence[str]] = None):
        """
        :param keep: the name(s) of the column(s) to retain; if None, all columns are retained
        :param drop: the name(s) of the column(s) to drop (applied after `keep`); if None, no columns are dropped
        """
        super().__init__()
        if keep is None or isinstance(keep, list):
            self.keep = keep
        elif isinstance(keep, str):
            self.keep = [keep]
        else:
            # Other sequences (e.g. tuples) are converted to a list, because DataFrame indexing
            # would interpret a tuple as a single column key rather than as several column names.
            self.keep = list(keep)
        self.drop = drop

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        if self.keep is not None:
            df = df[self.keep]
        if self.drop is not None:
            df = df.drop(columns=self.drop)
        return df

    def info(self):
        """
        :return: the base class info extended by the columns to keep and to drop
        """
        info = super().info()
        info["keep"] = self.keep
        info["drop"] = self.drop
        return info

470 

471 

class DFTKeepColumns(DFTColumnFilter):
    """
    Column filter which only selects the columns to keep; the data frame is not copied beforehand and
    the `drop` setting is not considered.
    """
    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        columns_to_keep = self.keep
        return df[columns_to_keep]

475 

476 

class DFTDRowFilterOnIndex(RuleBasedDataFrameTransformer):
    """
    Filters the rows of a data frame based on index labels, retaining and/or dropping the specified labels.
    """
    def __init__(self, keep: Set = None, drop: Set = None):
        """
        :param keep: the index labels of the rows to retain; if None, all rows are retained
        :param drop: the index labels of the rows to drop (applied after `keep`); if None, no rows are dropped
        """
        super().__init__()
        self.drop = drop
        self.keep = keep

    @staticmethod
    def _as_indexer(labels):
        # Recent pandas versions do not accept sets as indexers, so sets are converted to lists.
        # Note that the resulting order of labels is the set's iteration order.
        if isinstance(labels, (set, frozenset)):
            return list(labels)
        return labels

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        if self.keep is not None:
            df = df.loc[self._as_indexer(self.keep)]
        if self.drop is not None:
            df = df.drop(self._as_indexer(self.drop))  # type: ignore
        return df

490 

491 

492class DFTNormalisation(DataFrameTransformer): 

493 """ 

494 Applies normalisation/scaling to a data frame by applying a set of transformation rules, where each 

495 rule defines a set of columns to which it applies (learning a single transformer based on the values 

496 of all applicable columns). 

497 DFTNormalisation ignores N/A values during fitting and application. 

498 """ 

499 

class RuleTemplate:
    def __init__(self,
            skip: bool = False,
            unsupported: bool = False,
            transformer: Optional[SkLearnTransformerProtocol] = None,
            transformer_factory: Callable[[], SkLearnTransformerProtocol] = None,
            independent_columns: Optional[bool] = None,
            array_valued: bool = False,
            fit: bool = True):
        """
        A template from which a rule which matches multiple columns can be created.
        This is useful for the generation of rules which shall apply to all the (numerical) columns generated
        by a :class:`FeatureGenerator` without specifically naming them.

        How to choose the parameters:

        * For features that are already normalised, pass ``skip=True``.
        * For features that cannot be normalised (e.g. because they are categorical), pass ``unsupported=True``.
        * For features that shall be normalised, the remaining parameters apply.
          Passing no parameters at all, i.e. ``RuleTemplate()``, is an option if ...

            * the :class:`DFTNormalisation` instance specifies a default transformer factory whose application
              is suitable for the relevant set of features (otherwise, specify either ``transformer_factory``
              or ``transformer``), and
            * the resulting rule will match only a single column (otherwise, ``independent_columns``
              must be specified to True or False).

        :param skip: flag indicating whether no transformation shall be performed on the matched columns (e.g. because
            they are already normalised).
        :param unsupported: flag indicating whether normalisation of matched columns is unsupported (shall trigger an
            exception if attempted). Useful e.g. for preventing intermediate features that need further processing
            (like columns containing strings) from making their way into the final dataframe that will be normalised
            and used for training a model.
        :param transformer: a transformer instance (following the sklearn.preprocessing interface, e.g. StandardScaler)
            to apply to the matching column(s) for the case where a transformation is necessary (skip=False,
            unsupported=False). If None is given, either transformer_factory or the containing ``DFTNormalisation``
            instance's default factory will be used when the normaliser is fitted.
            NOTE: Using a transformer_factory is usually preferred. Use an instance only if you want the
            same transformer instance to be used in multiple places - e.g. sharing it across several feature generators
            or models that use the same type of column with associated rule/rule template (disabling `fit` where
            appropriate).
        :param transformer_factory: a factory for the generation of the transformer instance, which will only be
            applied if `transformer` is not given; if neither `transformer` nor `transformer_factory` are given, the
            containing ``DFTNormalisation`` instance's default factory will be used.
            See :class:`SkLearnTransformerFactoryFactory` for convenient construction options.
        :param independent_columns: whether, for the case where the rule matches multiple columns, the columns are
            independent and a separate transformation is to be learned for each of them (rather than using the same
            transformation for all columns and learning the transformation from the data of all columns).
            This parameter must be specified for rules matching more than one column,
            None is acceptable for rules matching a single column, in which case None, True, and False all have the
            same effect.
        :param array_valued: whether the column values are not scalars but arrays (of some fixed but arbitrary length).
            It is assumed that all entries in such arrays are to be normalised in the same way, i.e. the same
            transformation will be applied to each entry in the array.
            Only a single matching column is supported for array_valued=True, i.e. the rule must apply to at most one
            column.
        :param fit: whether the rule's transformer shall be fitted. One use case for setting this to False is
            if a transformer instance is provided (instead of a factory), which is already fitted.
        """
        # NOTE: keep in sync with Rule!
        no_transformation = skip or unsupported
        if no_transformation and count_not_none(transformer, transformer_factory) > 0:
            raise ValueError("Passed transformer or transformer_factory while skip=True or unsupported=True")
        self.skip = skip
        self.unsupported = unsupported
        self.transformer = transformer
        self.transformerFactory = transformer_factory
        self.independentColumns = independent_columns
        self.arrayValued = array_valued
        self.fit = fit

    def __setstate__(self, state):
        # attributes which may be missing in persisted instances and the defaults to use for them
        new_defaults = dict(arrayValued=False, fit=True)
        setstate(DFTNormalisation.RuleTemplate, self, state, new_default_properties=new_defaults)

    def to_rule(self, regex: Optional[Union[str, re.Pattern]]):
        """
        Convert the template to a rule for all columns matching the regex

        :param regex: a regular expression defining the column the rule applies to
        :return: the resulting Rule
        """
        rule_settings = dict(
            skip=self.skip,
            unsupported=self.unsupported,
            transformer=self.transformer,
            transformer_factory=self.transformerFactory,
            independent_columns=self.independentColumns,
            array_valued=self.arrayValued,
            fit=self.fit,
        )
        return DFTNormalisation.Rule(regex, **rule_settings)

    def to_placeholder_rule(self):
        """
        :return: a rule without a regex (see :meth:`to_rule`, which is called with None)
        """
        return self.to_rule(None)

580 

class Rule(ToStringMixin):
    def __init__(self,
            regex: Optional[Union[str, re.Pattern]],
            skip: bool = False,
            unsupported: bool = False,
            transformer: Optional[SkLearnTransformerProtocol] = None,
            transformer_factory: Optional[Callable[[], SkLearnTransformerProtocol]] = None,
            array_valued: bool = False,
            fit: bool = True,
            independent_columns: Optional[bool] = None):
        """
        Use the parameters as follows:

        * If the relevant features are already normalised, pass ``skip=True``
        * If the relevant features cannot be normalised (e.g. because they are categorical), pass ``unsupported=True``
        * If the relevant features shall be normalised, the other parameters apply.
          No parameters other than regex, i.e. ``Rule(regex)``, are an option if ...

            * a default transformer factory is specified in the :class:`DFTNormalisation` instance and its application
              is suitable for the relevant set of features.
              Otherwise, specify either ``transformer_factory`` or ``transformer``.
            * the resulting rule will match only a single column. Otherwise, ``independent_columns``
              must be specified to True or False.

        :param regex: a regular expression defining the column(s) the rule applies to.
            If it matches multiple columns, these columns will be normalised in the same way (using the same normalisation
            process for each column) unless independent_columns=True.
            If None, the rule is a placeholder rule and the regex must be set later via set_regex or the rule will not be applicable.
        :param skip: flag indicating whether no transformation shall be performed on the matched columns (e.g. because they are already
            normalised).
        :param unsupported: flag indicating whether normalisation of matched columns is unsupported (shall trigger an exception if
            attempted). Useful e.g. for preventing intermediate features that need further processing (like columns containing strings) from making
            their way into the final dataframe that will be normalised and used for training a model.
        :param transformer: a transformer instance (following the sklearn.preprocessing interface, e.g. StandardScaler) to apply to the matching column(s)
            for the case where a transformation is necessary (skip=False, unsupported=False). If None is given, either
            transformer_factory or the containing ``DFTNormalisation`` instance's default factory will be used when the normaliser is
            fitted.
            NOTE: Using a transformer_factory is usually preferred. Use an instance only if you want the
            same transformer instance to be used in multiple places - e.g. sharing it across several feature generators or models that
            use the same type of column with associated rule/rule template (disabling `fit` where appropriate).
        :param transformer_factory: a factory for the generation of the transformer instance, which will only be applied if
            `transformer` is not given; if neither `transformer` nor `transformer_factory` are given, the containing ``DFTNormalisation`` instance's default
            factory will be used. See :class:`SkLearnTransformerFactoryFactory` for convenient construction options.
        :param array_valued: whether the column values are not scalars but arrays (of some fixed but arbitrary length).
            It is assumed that all entries in such arrays are to be normalised in the same way, i.e. the same
            transformation will be applied to each entry in the array.
            Only a single matching column is supported for array_valued=True, i.e. the regex must match at most one column.
        :param fit: whether the rule's transformer shall be fitted. One use case for setting this to False is
            if a transformer instance is provided (instead of a factory), which is already fitted.
        :param independent_columns: whether, for the case where the rule matches multiple columns, the columns are independent and a
            separate transformation is to be learned for each of them (rather than using the same transformation for all columns and
            learning the transformation from the data of all columns).
            This parameter must be specified for rules matching more than one column,
            None is acceptable for rules matching a single column, in which case None, True, and False all have the same effect.
        :raises ValueError: if a transformer or transformer factory is passed together with skip=True or unsupported=True
        """
        if (skip or unsupported) and count_not_none(transformer, transformer_factory) > 0:
            raise ValueError("Passed transformer or transformer_factory while skip=True or unsupported=True")
        if isinstance(regex, str):
            regex = re.compile(regex)
        self.regex = regex
        # NOTE: keep in sync with RuleTemplate!
        self.skip = skip
        self.unsupported = unsupported
        self.transformer = transformer
        self.transformerFactory = transformer_factory
        self.arrayValued = array_valued
        self.fit = fit
        self.independentColumns = independent_columns

    def __setstate__(self, state):
        # attributes which may be missing in persisted instances and the defaults to use for them
        new_defaults = dict(arrayValued=False, fit=True, independentColumns=False, transformerFactory=None)
        setstate(DFTNormalisation.Rule, self, state, new_default_properties=new_defaults)

    def _tostring_excludes(self) -> List[str]:
        # the regex is represented via its pattern string instead (see _tostring_additional_entries)
        return super()._tostring_excludes() + ["regex"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.regex is not None:
            d["regex"] = f"'{self.regex.pattern}'"
        return d

    def set_regex(self, regex: str):
        """
        Sets the regular expression defining the column(s) the rule applies to.

        :param regex: the regular expression
        :raises Exception: if the regular expression cannot be compiled (the original error is chained as the cause)
        """
        try:
            self.regex = re.compile(regex)
        except Exception as e:
            raise Exception(f"Could not compile regex '{regex}': {e}") from e

    def matches(self, column: str):
        """
        :param column: a column name
        :return: whether the rule's regex matches the entire column name
        :raises Exception: if the rule is a placeholder rule, i.e. no regex has been set
        """
        if self.regex is None:
            raise Exception("Attempted to apply a placeholder rule. Perhaps the feature generator from which the rule originated was "
                            "never applied in order to have the rule instantiated.")
        return self.regex.fullmatch(column) is not None

    def matching_columns(self, columns: Sequence[str]) -> List[str]:
        """
        :param columns: the column names to check
        :return: the list of those column names which the rule matches (in the given order)
        """
        return [col for col in columns if self.matches(col)]

677 

def __init__(self, rules: Sequence[Rule], default_transformer_factory: Optional[Callable[[], SkLearnTransformerProtocol]] = None,
        require_all_handled: bool = True, inplace: bool = False):
    """
    :param rules: the set of rules; rules (i.e., their transformers) are always fitted and applied in the given order.
        In the :class:`sensai.vector_model.VectorModel` context, a convenient source for a set of rules is a
        :class:`sensai.featuregen.FeatureCollector` or :class:`sensai.featuregen.MultiFeatureGenerator`.
        Generally, it is often a good idea to associate rules (or a rule template) with a feature generator,
        from which the rules can then be obtained using `get_normalisation_rules`.
    :param default_transformer_factory: a factory for the creation of transformer instances (which implements the
        API used by sklearn.preprocessing, e.g. StandardScaler) that shall be used to create a transformer for all
        rules that do not specify a particular transformer.
        The default transformer will only be applied to columns matched by such rules, unmatched columns will
        not be transformed.
        Use :class:`SkLearnTransformerFactoryFactory` to conveniently create a factory.
    :param require_all_handled: whether to raise an exception if any column is not matched by a rule
    :param inplace: whether to apply data frame transformations in-place
    """
    super().__init__()
    # rules as specified by the user, and the factory for rules which do not define a transformer
    self._userRules = rules
    self._defaultTransformerFactory = default_transformer_factory
    # initially None; assigned when fitting
    self._rules = None
    self.requireAllHandled = require_all_handled
    self.inplace = inplace

701 

def _tostring_additional_entries(self) -> Dict[str, Any]:
    """
    Extends the entries provided by the super-class with the rules of this instance.

    :return: the entries, containing the specialised rules under "rules" if they are present (not None)
        and the rules passed at construction under "userRules" otherwise
    """
    entries = super()._tostring_additional_entries()
    if self._rules is None:
        entries["userRules"] = self._userRules
    else:
        entries["rules"] = self._rules
    return entries

709 

def _fit(self, df: pd.DataFrame):
    """
    Fits the transformers of the user-provided rules on the given data frame and (re-)creates the list of
    specialised rules (``self._rules``), which contains one copy per user rule whose regex is restricted to
    the columns that the rule matched in ``df``.

    :param df: the data frame on which to fit
    :raises Exception: if a column is matched by more than one rule, if a matching rule is marked as unsupported,
        if no transformer can be created for a rule, or if an array-valued rule matches more than one column
    :raises ValueError: if a non-skipped rule matches several columns but its ``independentColumns`` is None
    """
    rule_by_column = {}
    self._rules = []
    # Transformers follow the sklearn interfaces, i.e. passing a 2D array to them results in a per-column
    # transformation being learnt; this is what happens for independentColumns=True.
    # If independentColumns is False, the values of all matched columns are concatenated instead and are
    # treated as a single column when fitting the transformer.
    for rule in self._userRules:
        cols = rule.matching_columns(df.columns)

        # each column may be claimed by at most one rule
        for col in cols:
            if col in rule_by_column:
                raise Exception(f"More than one rule applies to column '{col}': {rule_by_column[col]}, {rule}")
            rule_by_column[col] = rule

        if len(cols) == 0:
            log.log(logging.DEBUG - 1, f"{rule} matched no columns")
        else:
            if rule.unsupported:
                raise Exception(f"Normalisation of columns {cols} is unsupported according to {rule}. "
                    f"If you want to make use of these columns, transform them into a supported column before applying "
                    f"{self.__class__.__name__}.")
            if not rule.skip:
                # make sure the rule has a transformer (rule-specific factory takes precedence over the default factory)
                if rule.transformer is None:
                    factory = rule.transformerFactory
                    if factory is None:
                        factory = self._defaultTransformerFactory
                        if factory is None:
                            raise Exception(f"No transformer to fit: {rule} defines no transformer and instance has no transformer "
                                f"factory")
                    rule.transformer = factory()

                if rule.fit:
                    applicable_df = df[sorted(cols)]
                    if rule.arrayValued:
                        if len(cols) > 1:
                            raise Exception(f"Array-valued case is only supported for a single column, "
                                f"matched {cols} for {rule}")
                        # join the arrays of all rows into a single column of values
                        values = np.concatenate(applicable_df.values.flatten())
                        values = values.reshape((len(values), 1))
                    elif rule.independentColumns:
                        values = applicable_df.values
                    else:
                        # treat the values of all matched columns as a single column
                        values = applicable_df.values.flatten()
                        values = values.reshape((len(values), 1))
                    rule.transformer.fit(values)

        # store a specialised copy of the rule, which matches exactly the columns found above
        specialised_rule = copy.copy(rule)
        if not specialised_rule.skip and specialised_rule.independentColumns is None and len(cols) > 1:
            raise ValueError(f"Normalisation rule matching multiple columns {cols} must set `independentColumns` "
                f"(got None)")
        specialised_rule.set_regex(or_regex_group(cols))
        self._rules.append(specialised_rule)

763 

764 def _check_unhandled_columns(self, df, matched_rules_by_column): 

765 if self.requireAllHandled: 

766 unhandled_columns = set(df.columns) - set(matched_rules_by_column.keys()) 

767 if len(unhandled_columns) > 0: 

768 raise Exception(f"The following columns are not handled by any rules: {unhandled_columns}; " 

769 f"rules: {', '.join(map(str, self._rules))}") 

770 

771 def _apply(self, df: pd.DataFrame) -> pd.DataFrame: 

772 if not self.inplace: 

773 df = df.copy() 

774 matched_rules_by_column = {} 

775 for rule in self._rules: 

776 matching_columns = rule.matching_columns(df.columns) 

777 if len(matching_columns) == 0: 

778 continue 

779 for c in matching_columns: 

780 matched_rules_by_column[c] = rule 

781 if not rule.skip: 

782 if rule.independentColumns and not rule.arrayValued: 

783 matching_columns = sorted(matching_columns) 

784 df[matching_columns] = rule.transformer.transform(df[matching_columns].values) 

785 else: 

786 for c in matching_columns: 

787 if not rule.arrayValued: 

788 df[c] = rule.transformer.transform(df[[c]].values) 

789 else: 

790 df[c] = [rule.transformer.transform(np.array([x]).T)[:, 0] for x in df[c]] 

791 self._check_unhandled_columns(df, matched_rules_by_column) 

792 return df 

793 

def info(self):
    """
    :return: the info dictionary of the super-class, extended by the entries "requireAllHandled" and "inplace"
    """
    info = super().info()
    info.update(requireAllHandled=self.requireAllHandled, inplace=self.inplace)
    return info

799 

def find_rule(self, col_name: str) -> "DFTNormalisation.Rule":
    """
    Retrieves the first of the specialised rules (``self._rules``) that matches the given column name.

    :param col_name: the column name
    :return: the first matching rule or None if no rule matches
    """
    return next((rule for rule in self._rules if rule.matches(col_name)), None)

804 

805 

class DFTFromColumnGenerators(RuleBasedDataFrameTransformer):
    """
    Adds columns to a data frame, each of which is generated by a ColumnGenerator instance
    """
    def __init__(self, column_generators: Sequence['ColumnGenerator'], inplace=False):
        """
        :param column_generators: the generators that produce the columns to add; they are applied in the given order
        :param inplace: whether to add the columns to the input data frame itself rather than to a copy
        """
        super().__init__()
        self.columnGenerators = column_generators
        self.inplace = inplace

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        result = df if self.inplace else df.copy()
        for generator in self.columnGenerators:
            # each generator receives the data frame including the columns added so far
            column = generator.generate_column(result)
            result[column.name] = column
        return result

    def info(self):
        info = super().info()
        info.update(inplace=self.inplace)
        return info

827 

828 

class DFTCountEntries(RuleBasedDataFrameTransformer):
    """
    Transforms a data frame into a new data frame with two columns, which indicate how often each unique value
    occurs in one of the input data frame's columns.
    It is the "DataFrame output version" of pd.Series.value_counts:
    each row of the result holds a unique value of the input column along with the number of times it appears.
    """
    def __init__(self, column_for_entry_count: str, column_name_for_resulting_counts: str = "counts"):
        """
        :param column_for_entry_count: the name of the column whose values are to be counted
        :param column_name_for_resulting_counts: the name of the output column that holds the counts
        """
        super().__init__()
        self.columnNameForResultingCounts = column_name_for_resulting_counts
        self.columnForEntryCount = column_for_entry_count

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        counts = df[self.columnForEntryCount].value_counts()
        data = {}
        data[self.columnForEntryCount] = counts.index
        data[self.columnNameForResultingCounts] = counts.values
        return pd.DataFrame(data)

    def info(self):
        info = super().info()
        info.update(columnNameForResultingCounts=self.columnNameForResultingCounts,
            columnForEntryCount=self.columnForEntryCount)
        return info

849 

850 

class DFTAggregationOnColumn(RuleBasedDataFrameTransformer):
    """
    Groups a data frame by one of its columns and aggregates each group using the given aggregation
    """
    def __init__(self, column_for_aggregation: str, aggregation: Callable):
        """
        :param column_for_aggregation: the name of the column by which to group
        :param aggregation: the aggregation, which is passed on to the ``agg`` method of the grouped data frame
        """
        super().__init__()
        self.columnForAggregation = column_for_aggregation
        self.aggregation = aggregation

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        grouped = df.groupby(self.columnForAggregation)
        return grouped.agg(self.aggregation)

859 

860 

class DFTRoundFloats(RuleBasedDataFrameTransformer):
    """
    Rounds the values of a data frame to a given number of decimals, creating a new data frame
    with the same columns and index
    """
    def __init__(self, decimals=0):
        """
        :param decimals: the number of decimals, which is passed on to np.round
        """
        super().__init__()
        self.decimals = decimals

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        rounded_values = np.round(df.values, self.decimals)
        return pd.DataFrame(rounded_values, columns=df.columns, index=df.index)

    def info(self):
        info = super().info()
        info.update(decimals=self.decimals)
        return info

873 

874 

class DFTSkLearnTransformer(InvertibleDataFrameTransformer):
    """
    Applies a transformer from sklearn.preprocessing to (a subset of) the columns of a data frame.
    If multiple columns are transformed, they are transformed independently (i.e. each column uses a separately trained transformation).
    """
    def __init__(self,
            sklearn_transformer: SkLearnTransformerProtocol,
            columns: Optional[List[str]] = None,
            inplace=False,
            array_valued=False):
        """
        :param sklearn_transformer: the transformer instance (from sklearn.preprocessing) to use (which will be fitted & applied)
        :param columns: the set of column names to which the transformation shall apply; if None, apply it to all columns
        :param inplace: whether to apply the transformation in-place
        :param array_valued: whether to apply transformation not to scalar-valued columns but to one or more array-valued columns,
            where the values of all arrays within a column (which may vary in length) are to be transformed in the same way.
            If multiple columns are transformed, then the arrays belonging to a single row must all have the same length.
        """
        super().__init__()
        self.set_name(f"{self.__class__.__name__}_wrapped_{sklearn_transformer.__class__.__name__}")
        self.sklearnTransformer = sklearn_transformer
        self.columns = columns
        self.inplace = inplace
        self.arrayValued = array_valued

    def __setstate__(self, state):
        # persisted instances that lack the attribute are treated as not array-valued
        state["arrayValued"] = state.get("arrayValued", False)
        setstate(DFTSkLearnTransformer, self, state)

    def _fit(self, df: pd.DataFrame):
        """
        Fits the wrapped transformer on the configured columns (or on all columns if none were configured).

        :param df: the data frame on which to fit
        :raises ValueError: if, in the array-valued case with several columns, the columns do not contain
            the same total number of values
        """
        cols = self.columns
        if cols is None:
            cols = df.columns
        if not self.arrayValued:
            values = df[cols].values
        else:
            if len(cols) == 1:
                # join the arrays of all rows into a single column of values
                values = np.concatenate(df[cols[0]].values.flatten())
                values = values.reshape((len(values), 1))
            else:
                # one flat array of values per column; the flat arrays are then used as the columns of the fit input
                flat_col_arrays = [np.concatenate(df[col].values.flatten()) for col in cols]
                lengths = [len(a) for a in flat_col_arrays]
                if len(set(lengths)) != 1:
                    raise ValueError(f"Columns {cols} do not contain the same number of values: {lengths}")
                values = np.stack(flat_col_arrays, axis=1)
        self.sklearnTransformer.fit(values)

    def _apply_transformer(self, df: pd.DataFrame, inverse: bool) -> pd.DataFrame:
        """
        Applies the wrapped transformer (or its inverse) to the configured columns.

        :param df: the data frame to transform; it is copied first unless ``self.inplace`` is set
        :param inverse: whether to apply ``inverse_transform`` instead of ``transform``
        :return: the transformed data frame
        """
        if not self.inplace:
            df = df.copy()
        cols = self.columns
        if cols is None:
            cols = df.columns
        transform = self.sklearnTransformer.inverse_transform if inverse else self.sklearnTransformer.transform
        if not self.arrayValued:
            df[cols] = transform(df[cols].values)
        else:
            if len(cols) == 1:
                c = cols[0]
                # each array is transformed as a single column of values
                df[c] = [transform(np.array([x]).T)[:, 0] for x in df[c]]
            else:
                # Only the selected columns must be considered here (not all columns of the data frame):
                # otherwise, cells of unrelated columns would be stacked and the positional index used below
                # would not correspond to the position in cols.
                transformed_values = [transform(np.stack(row, axis=1)) for row in df[cols].values]
                for col_idx, col in enumerate(cols):
                    df[col] = [row[:, col_idx] for row in transformed_values]
        return df

    def _apply(self, df):
        return self._apply_transformer(df, False)

    def apply_inverse(self, df):
        return self._apply_transformer(df, True)

    def info(self):
        info = super().info()
        info["columns"] = self.columns
        info["inplace"] = self.inplace
        info["sklearnTransformerClass"] = self.sklearnTransformer.__class__.__name__
        return info

953 

954 

class DFTSortColumns(RuleBasedDataFrameTransformer):
    """
    Reorders the columns of a data frame such that the column names are in ascending order
    """
    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        ordered_columns = sorted(df.columns)
        return df[ordered_columns]

961 

962 

class DFTFillNA(RuleBasedDataFrameTransformer):
    """
    Replaces NA/NaN values with a given value
    """
    def __init__(self, fill_value, inplace: bool = False):
        """
        :param fill_value: the value with which to replace NA/NaN values
        :param inplace: whether to perform the operation in-place on the input data frame
        """
        super().__init__()
        self.fillValue = fill_value
        self.inplace = inplace

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self.inplace:
            return df.fillna(value=self.fillValue)
        df.fillna(value=self.fillValue, inplace=True)
        return df

978 

979 

class DFTCastCategoricalColumns(RuleBasedDataFrameTransformer):
    """
    Casts columns with dtype category to the given type.
    This is useful if categorical columns are not accepted by the model while the column values are actually numeric:
    in that case, casting to a numeric type yields an acceptable label encoding.
    """
    def __init__(self, columns: Optional[List[str]] = None, dtype=float):
        """
        :param columns: the columns to convert; if None, convert all that have dtype category
        :param dtype: the data type to which categorical columns are to be converted
        """
        super().__init__()
        self.columns = columns
        self.dtype = dtype

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        result = df.copy()
        target_columns = result.columns if self.columns is None else self.columns
        for name in target_columns:
            series = result[name]
            # columns that are not categorical are left untouched
            if series.dtype.name != "category":
                continue
            result[name] = series.astype(self.dtype)
        return result

1003 

1004 

class DFTDropNA(RuleBasedDataFrameTransformer):
    """
    Removes the rows or columns that contain NA/NaN values
    """
    def __init__(self, axis=0, inplace=False):
        """
        :param axis: 0 to drop rows, 1 to drop columns containing an N/A value
        :param inplace: whether to perform the operation in-place on the input data frame
        """
        super().__init__()
        self.axis = axis
        self.inplace = inplace

    def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self.inplace:
            return df.dropna(axis=self.axis)
        df.dropna(axis=self.axis, inplace=True)
        return df