Source code for sensai.ensemble.ensemble_base
from abc import ABC, abstractmethod
from concurrent.futures.process import ProcessPoolExecutor
from typing import Sequence, List, Optional
from inspect import currentframe, getframeinfo
import pandas as pd
from ..vector_model import VectorModel
from ..util.multiprocessing import VectorModelWithSeparateFeatureGeneration
from ..util.pickle import PickleFailureDebugger
[docs]class EnsembleVectorModel(VectorModel, ABC):
def __init__(self, models: Sequence[VectorModel], num_processes=1):
"""
:param models:
:param num_processes:
"""
self.num_processes = num_processes
self.models = list(models)
super().__init__(check_input_columns=False)
def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None):
self._warn_sample_weights_unsupported(False, weights)
if self.num_processes == 1 or len(self.models) == 1:
for model in self.models:
model.fit(x, y)
return
fitted_model_futures = []
executor = ProcessPoolExecutor(max_workers=self.num_processes)
fitters = [VectorModelWithSeparateFeatureGeneration(model) for model in self.models]
for fitter in fitters:
intermediate_step = fitter.fit_start(x, y)
frame_info = getframeinfo(currentframe())
PickleFailureDebugger.log_failure_if_enabled(intermediate_step,
context_info=f"Submitting {fitter} in {frame_info.filename}:{frame_info.lineno}")
fitted_model_futures.append(executor.submit(intermediate_step.execute))
for i, fittedModelFuture in enumerate(fitted_model_futures):
self.models[i] = fitters[i].fit_end(fittedModelFuture.result())
[docs] def compute_all_predictions(self, x: pd.DataFrame):
if self.num_processes == 1 or len(self.models) == 1:
return [model.predict(x) for model in self.models]
prediction_futures = []
executor = ProcessPoolExecutor(max_workers=self.num_processes)
predictors = [VectorModelWithSeparateFeatureGeneration(model) for model in self.models]
for predictor in predictors:
predict_finaliser = predictor.predict_start(x)
frame_info = getframeinfo(currentframe())
PickleFailureDebugger.log_failure_if_enabled(predict_finaliser,
context_info=f"Submitting {predict_finaliser} in {frame_info.filename}:{frame_info.lineno}")
prediction_futures.append(executor.submit(predict_finaliser.execute))
return [predictionFuture.result() for predictionFuture in prediction_futures]
def _predict(self, x):
predictions_data_frames = self.compute_all_predictions(x)
return self.aggregate_predictions(predictions_data_frames)
[docs] @abstractmethod
def aggregate_predictions(self, predictions_data_frames: List[pd.DataFrame]) -> pd.DataFrame:
pass
[docs]class EnsembleRegressionVectorModel(EnsembleVectorModel, ABC):
[docs] def is_regression_model(self):
return True
[docs]class EnsembleClassificationVectorModel(EnsembleVectorModel, ABC):
[docs] def is_regression_model(self):
return False