diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py
index 5b48045420..5fba8f6d44 100644
--- a/fedot/core/operations/atomized_model.py
+++ b/fedot/core/operations/atomized_model.py
@@ -1,7 +1,10 @@
-from copy import deepcopy
+from collections import Counter
 from datetime import timedelta
-from typing import Callable, Union, Optional
+from functools import reduce
+from operator import and_, or_
+from typing import Callable, Union, Optional, Set, List, Any, Dict, Tuple
 
+from fedot.core.pipelines.node import PipelineNode
 from golem.core.tuning.simultaneous import SimultaneousTuner
 
 from fedot.core.data.data import InputData, OutputData
@@ -11,7 +14,6 @@
 from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
 from fedot.core.repository.operation_types_repository import OperationMetaInfo, \
     atomized_model_type
-from fedot.core.utils import make_pipeline_generator
 
 
 class AtomizedModel(Operation):
@@ -23,32 +25,35 @@
     def __init__(self, pipeline: 'Pipeline'):
         super().__init__(operation_type=atomized_model_type())
         self.pipeline = pipeline
-        self.unique_id = self.pipeline.root_node.descriptive_id
 
-    def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData):
-
-        copied_input_data = deepcopy(data)
-        predicted_train = self.pipeline.fit(input_data=copied_input_data)
+    def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData) -> Tuple['Pipeline', OutputData]:
+        predicted_train = self.pipeline.fit(input_data=data)
         fitted_atomized_operation = self.pipeline
-
         return fitted_atomized_operation, predicted_train
 
-    def predict(self, fitted_operation, data: InputData,
-                params: Optional[Union[OperationParameters, dict]] = None, output_mode: str = 'default'):
+    def predict(self,
+                fitted_operation: 'Pipeline',
+                data: InputData,
+                params: Optional[Union[OperationParameters, Dict[str, Any]]] = None,
+                output_mode: str = 'default') -> OutputData:
 
         # Preprocessing applied
-        copied_input_data = deepcopy(data)
-        prediction = fitted_operation.predict(input_data=copied_input_data, output_mode=output_mode)
+        prediction = fitted_operation.predict(input_data=data, output_mode=output_mode)
         prediction = self.assign_tabular_column_types(prediction, output_mode)
         return prediction
 
-    def predict_for_fit(self, fitted_operation, data: InputData, params: Optional[OperationParameters] = None,
-                        output_mode: str = 'default'):
+    def predict_for_fit(self,
+                        fitted_operation: 'Pipeline',
+                        data: InputData,
+                        params: Optional[OperationParameters] = None,
+                        output_mode: str = 'default') -> OutputData:
         return self.predict(fitted_operation, data, params, output_mode)
 
-    def fine_tune(self, metric_function: Callable,
-                  input_data: InputData = None, iterations: int = 50,
-                  timeout: int = 5):
+    def fine_tune(self,
+                  metric_function: Callable,
+                  input_data: Optional[InputData] = None,
+                  iterations: int = 50,
+                  timeout: int = 5) -> 'AtomizedModel':
         """ Method for tuning hyperparameters """
         tuner = TunerBuilder(input_data.task)\
             .with_tuner(SimultaneousTuner)\
@@ -62,40 +67,51 @@ def fine_tune(self, metric_function: Callable,
 
     @property
     def metadata(self) -> OperationMetaInfo:
-        generator = make_pipeline_generator(self.pipeline)
-        tags = set()
-
-        for node in generator:
-            tags.update(node.operation_tags)
-
         root_node = self.pipeline.root_node
-        supported_strategies = None
-        allowed_positions = ['any']
-        tags = list(tags)
-
-        operation_info = OperationMetaInfo(root_node.operation.supplementary_data.id,
-                                           root_node.operation.supplementary_data.input_types,
-                                           root_node.operation.supplementary_data.output_types,
-                                           root_node.operation.supplementary_data.task_type,
-                                           supported_strategies, allowed_positions,
-                                           tags)
+
+        def extract_metadata_from_pipeline(attr_name: str,
+                                           node_filter: Optional[Callable[[PipelineNode], bool]] = None,
+                                           reduce_function: Optional[Callable[[Set, Set], Set]] = None) -> List[Any]:
+            """ Extract metadata from the atomized pipeline
+            :param attr_name: name of the metadata property to extract
+            :param node_filter: predicate returning True for nodes whose metadata is extracted
+            :param reduce_function: binary function used to combine the extracted
+                                    metadata via ``reduce``
+            :return: list with the extracted metadata
+            """
+            nodes_to_extract_metadata = self.pipeline.nodes
+            if node_filter is not None:
+                nodes_to_extract_metadata = [node for node in nodes_to_extract_metadata if node_filter(node)]
+            data = [set(getattr(node.operation.metadata, attr_name)) for node in nodes_to_extract_metadata]
+            return list(reduce(reduce_function or or_, data))
+
+        tags = extract_metadata_from_pipeline('tags')
+        input_types = extract_metadata_from_pipeline('input_types',
+                                                     node_filter=lambda node: node.is_primary,
+                                                     reduce_function=and_)
+        output_types = root_node.operation.metadata.output_types
+        presets = extract_metadata_from_pipeline('presets')
+
+        operation_info = OperationMetaInfo(id=root_node.operation.metadata.id,
+                                           input_types=input_types,
+                                           output_types=output_types,
+                                           task_type=root_node.operation.metadata.task_type,
+                                           supported_strategies=None,
+                                           allowed_positions=['any'],
+                                           tags=tags,
+                                           presets=presets)
         return operation_info
 
-    def description(self, operation_params: Optional[dict]):
+    def description(self, operation_params: Optional[dict] = None) -> str:
         operation_type = self.operation_type
         operation_length = self.pipeline.length
         operation_depth = self.pipeline.depth
-        operation_id = self.unique_id
-        operation_types = {}
-
-        for node in self.pipeline.nodes:
-            if node.operation.operation_type in operation_types:
-                operation_types[node.operation.operation_type] += 1
-            else:
-                operation_types[node.operation.operation_type] = 1
-
+        operation_id = self.pipeline.root_node.descriptive_id
+        operation_types = map(lambda node: node.operation.operation_type,
+                              self.pipeline.nodes)
+        operation_types_dict = dict(Counter(operation_types))
         return f'{operation_type}_length:{operation_length}_depth:{operation_depth}' \
-               f'_types:{operation_types}_id:{operation_id}'
+               f'_types:{operation_types_dict}_id:{operation_id}'
 
     @staticmethod
     def assign_tabular_column_types(output_data: OutputData,
diff --git a/fedot/core/utils.py b/fedot/core/utils.py
index 044e5b2446..cb45396295 100644
--- a/fedot/core/utils.py
+++ b/fedot/core/utils.py
@@ -75,15 +75,6 @@
         os.mkdir(dataset_dir)
 
 
-def make_pipeline_generator(pipeline):
-    visited_nodes = []
-
-    for node in pipeline.nodes:
-        if node not in visited_nodes:
-            visited_nodes.append(node)
-            yield node
-
-
 def set_random_seed(seed: Optional[int]):
     """ Sets random seed for evaluation of models"""
     if seed is not None:
diff --git a/test/integration/models/test_atomized_model.py b/test/integration/models/test_atomized_model.py
index d881ddd0e7..d66eb3347a 100644
--- a/test/integration/models/test_atomized_model.py
+++ b/test/integration/models/test_atomized_model.py
@@ -1,5 +1,6 @@
 import json
 import os
+from functools import reduce
 
 import numpy as np
 import pytest
@@ -94,6 +95,11 @@ def create_pipeline_with_several_nested_atomized_model() -> Pipeline:
     return pipeline
 
 
+def get_some_atomized_nodes():
+    pipeline = create_pipeline_with_several_nested_atomized_model()
+    return [node for node in pipeline.nodes if isinstance(node.operation, AtomizedModel)]
+
+
 def create_input_data():
     train_file_path = os.path.join('test', 'data', 'scoring', 'scoring_train.csv')
     test_file_path = os.path.join('test', 'data', 'scoring', 'scoring_test.csv')
@@ -106,6 +112,36 @@
     return train_data, test_data
 
 
+@pytest.mark.parametrize('atomized_node', get_some_atomized_nodes())
+def test_atomized_model_metadata(atomized_node):
+    pipeline = atomized_node.operation.pipeline
+
+    # check input types: they should be the intersection of the input types of the primary nodes
+    input_types = reduce(lambda types, input_types: types & set(input_types),
+                         [set(node.operation.metadata.input_types) for node in pipeline.primary_nodes])
+    assert input_types == set(atomized_node.operation.metadata.input_types)
+
+    # check output types: they should be the output types of the root node
+    output_types = set(pipeline.root_node.operation.metadata.output_types)
+    assert output_types == set(atomized_node.operation.metadata.output_types)
+
+    # check tags: they should be the union of the tags of all pipeline nodes
+    tags = reduce(lambda types, node: types | set(node.operation.metadata.tags),
+                  pipeline.nodes,
+                  set())
+    assert tags == set(atomized_node.operation.metadata.tags)
+
+    # check presets: they should be the union of the presets of all pipeline nodes
+    presets = reduce(lambda types, node: types | set(node.operation.metadata.presets),
+                     pipeline.nodes,
+                     set())
+    assert presets == set(atomized_node.operation.metadata.presets)
+
+    # check task_type: it should match the task type of the root node
+    task_type = set(pipeline.root_node.operation.metadata.task_type)
+    assert task_type == set(atomized_node.operation.metadata.task_type)
+
+
 def test_save_load_atomized_pipeline_correctly():
     pipeline = create_pipeline_with_several_nested_atomized_model()
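Note on the metadata aggregation in this patch: `AtomizedModel.metadata` now derives the composite operation's metadata by set-reduction over the wrapped pipeline's nodes — tags and presets are the union (`or_`) across all nodes, input types are the intersection (`and_`) across primary nodes only (the composite can only accept what every entry node accepts), and output types come from the root node. Below is a minimal standalone sketch of that reduction scheme; the per-node metadata values are invented for illustration, and only the combining logic mirrors the patch:

```python
from functools import reduce
from operator import and_, or_

# Hypothetical stand-ins for node.operation.metadata of three pipeline nodes.
node_tags = [{'sklearn'}, {'sklearn', 'linear'}, {'boosting'}]
# Input types of the two primary (entry) nodes only.
primary_input_types = [{'table', 'ts'}, {'table'}]

# Tags of the atomized model: union over every node in the pipeline.
tags = reduce(or_, node_tags)
assert tags == {'sklearn', 'linear', 'boosting'}

# Input types: only what *every* primary node accepts, hence intersection.
input_types = reduce(and_, primary_input_types)
assert input_types == {'table'}
```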