diff --git a/fedot/api/api_utils/predefined_model.py b/fedot/api/api_utils/predefined_model.py index 1b50bd8d90..1c40b4444e 100644 --- a/fedot/api/api_utils/predefined_model.py +++ b/fedot/api/api_utils/predefined_model.py @@ -31,7 +31,11 @@ def _get_pipeline(self, use_input_preprocessing: bool = True) -> Pipeline: else: raise ValueError(f'{type(self.predefined_model)} is not supported as Fedot model') - verify_pipeline(pipelines, task_type=self.data.task.task_type, raise_on_failure=True) + # TODO: Workaround for AtomizedModel + if "atomized" in pipelines.descriptive_id: + self.log.message("Pipeline verification for AtomizedModel currently unavailable") + else: + verify_pipeline(pipelines, task_type=self.data.task.task_type, raise_on_failure=True) return pipelines diff --git a/fedot/api/main.py b/fedot/api/main.py index f389489acc..1fbcf02ca8 100644 --- a/fedot/api/main.py +++ b/fedot/api/main.py @@ -166,6 +166,13 @@ def fit(self, with fedot_composer_timer.launch_preprocessing(): self.train_data = self.data_processor.fit_transform(self.train_data) + # TODO: Workaround for AtomizedModel + init_asm = self.params.data.get('initial_assumption') + if predefined_model is None: + if isinstance(init_asm, Pipeline) and ("atomized" in init_asm.descriptive_id): + self.log.message('Composition for AtomizedModel currently unavailable') + predefined_model = init_asm + with fedot_composer_timer.launch_fitting(): if predefined_model is not None: # Fit predefined model and return it without composing diff --git a/test/integration/api/test_main_api.py b/test/integration/api/test_main_api.py index 01700842e1..f750b16bc9 100644 --- a/test/integration/api/test_main_api.py +++ b/test/integration/api/test_main_api.py @@ -11,8 +11,10 @@ from examples.simple.time_series_forecasting.ts_pipelines import ts_complex_ridge_smoothing_pipeline from fedot import Fedot +from fedot.core.operations.atomized_model import AtomizedModel from fedot.core.pipelines.node import PipelineNode from fedot.core.pipelines.pipeline import Pipeline +from fedot.core.pipelines.pipeline_builder import PipelineBuilder from fedot.core.repository.tasks import TsForecastingParams from test.data.datasets import get_dataset, get_multimodal_ts_data, load_categorical_unimodal, \ load_categorical_multidata @@ -84,6 +86,42 @@ def test_api_tune_correct(task_type, metric_name, pred_model): assert len(test_data.target) == len(pred_before) == len(pred_after) +@pytest.mark.parametrize( + "task_type, metric_name, pred_model", + [ + ("classification", "f1", "dt"), + ("regression", "rmse", "dtreg"), + ], +) +def test_api_fit_atomized_model(task_type, metric_name, pred_model): + train_data, test_data, _ = get_dataset(task_type, n_samples=100, n_features=5, iris_dataset=False) + + auto_model = Fedot( + problem=task_type, + metric=metric_name, + **TESTS_MAIN_API_DEFAULT_PARAMS, + initial_assumption=PipelineBuilder().add_node("scaling").add_node(pred_model).build() + ) + + auto_model.fit(features=train_data) + pred_auto_model = auto_model.predict(features=test_data) + + prev_model = auto_model.current_pipeline + prev_model.unfit() + + atomized_model = Pipeline( + PipelineNode(operation_type=AtomizedModel(prev_model), nodes_from=[PipelineNode("normalization")]) + ) + + auto_model_from_atomized = Fedot( + problem=task_type, metric=metric_name, **TESTS_MAIN_API_DEFAULT_PARAMS, initial_assumption=atomized_model + ) + auto_model_from_atomized.fit(features=train_data) + pred_auto_model_from_atomized = auto_model_from_atomized.predict(features=test_data) + + assert len(test_data.target) == len(pred_auto_model) == len(pred_auto_model_from_atomized) + + def test_api_simple_ts_predict_correct(task_type: str = 'ts_forecasting'): # The forecast length must be equal to 5 forecast_length = 5