import logging
from copy import copy
from dataclasses import dataclass
from typing import Union, List, Callable
import matplotlib.pyplot as plt
import numpy as np
from sensai import VectorModel, InputOutputData, VectorClassificationModel, VectorRegressionModel
from sensai.data_transformation import DFTColumnFilter
from sensai.evaluation import VectorModelCrossValidatorParams, create_vector_model_cross_validator
from sensai.evaluation.metric_computation import MetricComputation
from sensai.feature_importance import FeatureImportanceProvider, AggregatedFeatureImportance
from sensai.util.plot import ScatterPlot
log = logging.getLogger(__name__)
@dataclass
class RFEStep:
metric_value: float
features: List[str]
class RFEResult:
def __init__(self, steps: List[RFEStep], metric_name: str, minimise: bool):
self.steps = steps
self.metric_name = metric_name
self.minimise = minimise
    def get_sorted_steps(self) -> List[RFEStep]:
"""
:return: the elimination step results, sorted from best to worst
"""
return sorted(self.steps, key=lambda s: s.metric_value, reverse=not self.minimise)
    def get_selected_features(self) -> List[str]:
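        """
        :return: the selected features, i.e. the feature set of the step with the best metric value
        """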
return self.get_sorted_steps()[0].features
    def get_num_features_array(self) -> np.ndarray:
"""
:return: array containing the number of features that was considered in each step
"""
return np.array([len(s.features) for s in self.steps])
    def get_metric_values_array(self) -> np.ndarray:
"""
        :return: array containing the metric value obtained in each step
"""
return np.array([s.metric_value for s in self.steps])
    def plot_metric_values(self) -> plt.Figure:
"""
Plots the metric values vs. the number of features for each step of the elimination
:return: the figure
"""
return ScatterPlot(self.get_num_features_array(), self.get_metric_values_array(), c_opacity=1, x_label="number of features",
y_label=f"cross-validation mean metric value ({self.metric_name})").fig
class RecursiveFeatureEliminationCV:
"""
Recursive feature elimination, using cross-validation to select the best set of features:
In each step, the model is first evaluated using cross-validation.
Then the feature importance values are aggregated across the models that were trained during cross-validation,
and the least important feature is discarded. For the case where the lowest feature importance is 0, all
features with 0 importance are discarded.
    This process is repeated until only `min_features` (or fewer) remain.
The selected set of features is the one from the step where cross-validation yielded the best evaluation metric value.
Feature importance is computed at the level of model input features, i.e. after feature generation and transformation.
    NOTE: This implementation differs markedly from sklearn's RFECV, which performs an independent RFE for each fold.
    RFECV determines the number of features to use by finding the elimination step whose metric value, averaged across
    the folds, is best. Because the eliminations are independent, the actual features used in that step may have been
    completely different in each fold. Using the selected number of features n, RFECV then performs another RFE,
    eliminating features until n features remain, and returns these features as the result.
"""
def __init__(self, cross_validator_params: VectorModelCrossValidatorParams, min_features=1):
"""
:param cross_validator_params: the parameters for cross-validation
:param min_features: the smallest number of features that shall be evaluated during feature elimination
"""
if not cross_validator_params.returnTrainedModels:
            raise ValueError("cross_validator_params: returnTrainedModels is required to be enabled")
self.cross_validator_params = cross_validator_params
self.min_features = min_features
    def run(self, model: Union[VectorModel, FeatureImportanceProvider], io_data: InputOutputData, metric_name: str,
minimise: bool, remove_input_preprocessors=False) -> RFEResult:
"""
Runs the optimisation for the given model and data.
:param model: the model
:param io_data: the data
:param metric_name: the metric to optimise
        :param minimise: whether the metric shall be minimised; if False, it shall be maximised.
        :param remove_input_preprocessors: whether to remove the model's input preprocessors and compute the model inputs
            only once for the entire experiment; this is usually reasonable only if none of the input preprocessors are
            fitted to the input data or if, for any given data split/fold, the fitted preprocessors would be largely
            the same.
:return: a result object, which provides access to the selected features and data on all elimination steps
"""
metric_key = f"mean[{metric_name}]"
if remove_input_preprocessors:
model = copy(model)
model.fit_input_output_data(io_data, fit_preprocessors=True, fit_model=False)
inputs = model.compute_model_inputs(io_data.inputs)
model.remove_input_preprocessors()
io_data = InputOutputData(inputs, io_data.outputs)
features = list(inputs.columns)
else:
features = None # can only be obtained after having fitted the model initially (see below)
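        # the column filter's keep list is narrowed after each elimination step below, restricting the model's features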
dft_column_filter = DFTColumnFilter()
model.with_feature_transformers(dft_column_filter, add=True)
steps = []
while True:
# evaluate model
cross_validator = create_vector_model_cross_validator(io_data, model=model, params=self.cross_validator_params)
cross_val_data = cross_validator.eval_model(model)
agg_metrics_dict = cross_val_data.get_eval_stats_collection().agg_metrics_dict()
metric_value = agg_metrics_dict[metric_key]
if features is None:
features = cross_val_data.trained_models[0].get_model_input_variable_names()
steps.append(RFEStep(metric_value=metric_value, features=features))
# eliminate feature(s)
log.info(f"Model performance with {len(features)} features: {metric_key}={metric_value}")
agg_importance = AggregatedFeatureImportance(*cross_val_data.trained_models)
fi = agg_importance.get_aggregated_feature_importance()
tuples = fi.get_sorted_tuples()
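            # the (feature, importance) pairs are sorted such that the least important feature comes first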
min_importance = tuples[0][1]
if min_importance == 0:
eliminated_features = []
for i, (fname, importance) in enumerate(tuples):
if importance > 0:
break
eliminated_features.append(fname)
log.info(f"Eliminating {len(eliminated_features)} features with 0 importance: {eliminated_features}")
else:
eliminated_features = [tuples[0][0]]
log.info(f"Eliminating feature {eliminated_features[0]}")
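            # drop the eliminated features and narrow the column filter for the next evaluation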
features = [f for f in features if f not in eliminated_features]
dft_column_filter.keep = features
log.info(f"{len(features)} features remain")
if len(features) < self.min_features:
log.info("Minimum number of features reached/exceeded")
break
return RFEResult(steps, metric_name, minimise)
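# Example usage sketch for RecursiveFeatureEliminationCV (illustrative only): `my_model` and `my_io_data` are
# hypothetical placeholders, the model must implement FeatureImportanceProvider, the metric name must be one that
# the cross-validation evaluator computes, and the `folds` argument is an assumption.
#
#   params = VectorModelCrossValidatorParams(folds=5, returnTrainedModels=True)
#   rfe = RecursiveFeatureEliminationCV(params, min_features=1)
#   result = rfe.run(my_model, my_io_data, metric_name="RMSE", minimise=True)
#   selected_features = result.get_selected_features()
#   fig = result.plot_metric_values()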
class RecursiveFeatureElimination:
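    """
    Recursive feature elimination, using the given metric computation method to determine which set of features is best:
    in each step, the metric is computed and feature importance values are aggregated across the resulting trained models;
    the least important feature (or all features with importance 0) is then discarded. This is repeated until only
    `min_features` (or fewer) remain, and the feature set of the step with the best metric value is selected.
    """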
def __init__(self, metric_computation: MetricComputation, min_features=1):
"""
:param metric_computation: the method to apply for metric computation in order to determine which feature set is best
:param min_features: the smallest number of features that shall be evaluated during feature elimination
"""
self.metric_computation = metric_computation
self.min_features = min_features
    def run(self, model_factory: Callable[[], Union[VectorRegressionModel, VectorClassificationModel]], minimise: bool) -> RFEResult:
"""
        Runs the optimisation for models created by the given factory, using the configured metric computation.
:param model_factory: factory for the model to be evaluated
:param minimise: whether the metric shall be minimised; if False, maximise.
:return: a result object, which provides access to the selected features and data on all elimination steps
"""
features = None # can only be obtained after having fitted the model initially (see below)
dft_column_filter = DFTColumnFilter() # kept features will be adapted in the loop below; added to each evaluated model
steps = []
while True:
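            # each newly created model instance receives the shared column filter, restricting it to the current feature set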
def create_model():
return model_factory().with_feature_transformers(dft_column_filter, add=True)
# compute metric
metric_computation_result = self.metric_computation.compute_metric_value(create_model)
metric_value = metric_computation_result.metric_value
if features is None:
# noinspection PyTypeChecker
model: VectorModel = metric_computation_result.models[0]
features = model.get_model_input_variable_names()
steps.append(RFEStep(metric_value=metric_value, features=features))
# eliminate feature(s)
log.info(f"Model performance with {len(features)} features: metric={metric_value}")
agg_importance = AggregatedFeatureImportance(*metric_computation_result.models)
fi = agg_importance.get_aggregated_feature_importance()
tuples = fi.get_sorted_tuples()
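            # pairs are sorted with the least important feature first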
min_importance = tuples[0][1]
if min_importance == 0:
eliminated_features = []
for i, (fname, importance) in enumerate(tuples):
if importance > 0:
break
eliminated_features.append(fname)
log.info(f"Eliminating {len(eliminated_features)} features with 0 importance: {eliminated_features}")
else:
eliminated_features = [tuples[0][0]]
log.info(f"Eliminating feature {eliminated_features[0]}")
features = [f for f in features if f not in eliminated_features]
dft_column_filter.keep = features
log.info(f"{len(features)} features remain")
if len(features) < self.min_features:
log.info("Minimum number of features reached/exceeded")
break
return RFEResult(steps, self.metric_computation.metric.name, minimise)
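# Example usage sketch for RecursiveFeatureElimination (illustrative only): `my_metric_computation` is a hypothetical
# placeholder for a configured MetricComputation instance, and `my_model_factory` for a factory producing models
# that implement FeatureImportanceProvider.
#
#   rfe = RecursiveFeatureElimination(my_metric_computation, min_features=1)
#   result = rfe.run(my_model_factory, minimise=True)
#   selected_features = result.get_selected_features()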