From 53a76b8ffaf9e94167f606c83a128edb4cf059d9 Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Wed, 29 Nov 2023 15:14:19 +0300 Subject: [PATCH 01/10] fix --- fedot/core/operations/atomized_model.py | 55 ++++++++++--------- fedot/core/utils.py | 7 +-- .../integration/models/test_atomized_model.py | 36 +++++++++++- 3 files changed, 67 insertions(+), 31 deletions(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index 5b48045420..d6b06c26c0 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -1,5 +1,7 @@ from copy import deepcopy from datetime import timedelta +from functools import reduce +from itertools import chain from typing import Callable, Union, Optional from golem.core.tuning.simultaneous import SimultaneousTuner @@ -23,22 +25,18 @@ def __init__(self, pipeline: 'Pipeline'): super().__init__(operation_type=atomized_model_type()) self.pipeline = pipeline - self.unique_id = self.pipeline.root_node.descriptive_id def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData): - - copied_input_data = deepcopy(data) - predicted_train = self.pipeline.fit(input_data=copied_input_data) + predicted_train = self.pipeline.fit(input_data=data) fitted_atomized_operation = self.pipeline - return fitted_atomized_operation, predicted_train def predict(self, fitted_operation, data: InputData, - params: Optional[Union[OperationParameters, dict]] = None, output_mode: str = 'default'): + params: Optional[Union[OperationParameters, dict]] = None, + output_mode: str = 'default'): # Preprocessing applied - copied_input_data = deepcopy(data) - prediction = fitted_operation.predict(input_data=copied_input_data, output_mode=output_mode) + prediction = fitted_operation.predict(input_data=data, output_mode=output_mode) prediction = self.assign_tabular_column_types(prediction, output_mode) return prediction @@ -62,30 +60,35 @@ def fine_tune(self, metric_function: Callable, @property def metadata(self) -> OperationMetaInfo: - generator = make_pipeline_generator(self.pipeline) - tags = set() - - for node in generator: - tags.update(node.operation_tags) - root_node = self.pipeline.root_node - supported_strategies = None - allowed_positions = ['any'] - tags = list(tags) - - operation_info = OperationMetaInfo(root_node.operation.supplementary_data.id, - root_node.operation.supplementary_data.input_types, - root_node.operation.supplementary_data.output_types, - root_node.operation.supplementary_data.task_type, - supported_strategies, allowed_positions, - tags) + + def extract_metadata_from_pipeline(attr_name: str, node_filter: Optional[Callable] = None): + nodes_to_extract_metadata = make_pipeline_generator(self.pipeline) + if node_filter is not None: + nodes_to_extract_metadata = [node for node in nodes_to_extract_metadata if node_filter(node)] + data = [getattr(node.operation.metadata, attr_name) for node in nodes_to_extract_metadata] + return list(set(chain(*data))) + + tags = extract_metadata_from_pipeline('tags') + input_types = extract_metadata_from_pipeline('input_types', node_filter=lambda node: node.is_primary) + output_types = root_node.operation.metadata.output_types + presets = extract_metadata_from_pipeline('presets') + + operation_info = OperationMetaInfo(id=root_node.operation.metadata.id, + input_types=input_types, + output_types=output_types, + task_type=root_node.operation.metadata.task_type, + supported_strategies=None, + allowed_positions=['any'], + tags=tags, + presets=presets) return operation_info def description(self, operation_params: Optional[dict]): operation_type = self.operation_type operation_length = self.pipeline.length operation_depth = self.pipeline.depth - operation_id = self.unique_id + operation_id = self.pipeline.root_node.descriptive_id operation_types = {} for node in self.pipeline.nodes: @@ -101,4 +104,4 @@ def description(self, operation_params: Optional[dict]): def assign_tabular_column_types(output_data: OutputData, output_mode: str) -> OutputData: """ There is no need to perform any column types determination for nested pipelines """ - return output_data + return output_data \ No newline at end of file diff --git a/fedot/core/utils.py b/fedot/core/utils.py index 044e5b2446..3d5fc07ecf 100644 --- a/fedot/core/utils.py +++ b/fedot/core/utils.py @@ -76,11 +76,10 @@ def ensure_directory_exists(dir_names: list): def make_pipeline_generator(pipeline): - visited_nodes = [] - + visited_nodes = set() for node in pipeline.nodes: - if node not in visited_nodes: - visited_nodes.append(node) + if node.uid not in visited_nodes: + visited_nodes.add(node.uid) yield node diff --git a/test/integration/models/test_atomized_model.py b/test/integration/models/test_atomized_model.py index d881ddd0e7..65f0260983 100644 --- a/test/integration/models/test_atomized_model.py +++ b/test/integration/models/test_atomized_model.py @@ -1,5 +1,6 @@ import json import os +from functools import reduce import numpy as np import pytest @@ -8,9 +9,12 @@ from fedot.core.composer.metrics import RMSE from fedot.core.data.data import InputData from fedot.core.operations.atomized_model import AtomizedModel +from fedot.core.pipelines.adapters import PipelineAdapter from fedot.core.pipelines.node import PipelineNode from fedot.core.pipelines.pipeline import Pipeline -from fedot.core.utils import fedot_project_root +from fedot.core.pipelines.random_pipeline_factory import RandomPipelineFactory +from fedot.core.utils import fedot_project_root, make_pipeline_generator +from golem.core.dag.graph_verifier import GraphVerifier from test.integration.utilities.test_pipeline_import_export import create_correct_path, create_func_delete_files @@ -94,6 +98,11 @@ def create_pipeline_with_several_nested_atomized_model() -> Pipeline: return pipeline +def get_some_atomized_nodes(): + pipeline = create_pipeline_with_several_nested_atomized_model() + return [node for node in make_pipeline_generator(pipeline) if isinstance(node.operation, AtomizedModel)] + + def create_input_data(): train_file_path = os.path.join('test', 'data', 'scoring', 'scoring_train.csv') test_file_path = os.path.join('test', 'data', 'scoring', 'scoring_test.csv') @@ -106,6 +115,31 @@ def create_input_data(): return train_data, test_data +def test_atomized_model_metadata(): + for atomized_node in get_some_atomized_nodes(): + pipeline = atomized_node.operation.pipeline + input_types = reduce(lambda types, node: types | set(node.operation.metadata.input_types), + pipeline.primary_nodes, + set()) + assert input_types == set(atomized_node.operation.metadata.input_types) + + output_types = set(pipeline.root_node.operation.metadata.output_types) + assert output_types == set(atomized_node.operation.metadata.output_types) + + tags = reduce(lambda types, node: types | set(node.operation.metadata.tags), + pipeline.nodes, + set()) + assert tags == set(atomized_node.operation.metadata.tags) + + presets = reduce(lambda types, node: types | set(node.operation.metadata.presets), + pipeline.nodes, + set()) + assert presets == set(atomized_node.operation.metadata.presets) + + task_type = set(pipeline.root_node.operation.metadata.task_type) + assert task_type == set(atomized_node.operation.metadata.task_type) + + def test_save_load_atomized_pipeline_correctly(): pipeline = create_pipeline_with_several_nested_atomized_model() From 195085937d1dc7caf439139d00fdaf6582a26cdd Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Wed, 29 Nov 2023 15:24:00 +0300 Subject: [PATCH 02/10] pep8 and small fix --- fedot/core/operations/atomized_model.py | 16 +++++----------- test/integration/models/test_atomized_model.py | 5 ++--- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index d6b06c26c0..abd845451d 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -1,6 +1,5 @@ -from copy import deepcopy +from collections import Counter from datetime import timedelta -from functools import reduce from itertools import chain from typing import Callable, Union, Optional @@ -89,16 +88,11 @@ def description(self, operation_params: Optional[dict]): operation_length = self.pipeline.length operation_depth = self.pipeline.depth operation_id = self.pipeline.root_node.descriptive_id - operation_types = {} - - for node in self.pipeline.nodes: - if node.operation.operation_type in operation_types: - operation_types[node.operation.operation_type] += 1 - else: - operation_types[node.operation.operation_type] = 1 - + operation_types = map(lambda node: node.operation.operation_type, + make_pipeline_generator(self.pipeline)) + operation_types_dict = dict(Counter(operation_types)) return f'{operation_type}_length:{operation_length}_depth:{operation_depth}' \ - f'_types:{operation_types}_id:{operation_id}' + f'_types:{operation_types_dict}_id:{operation_id}' @staticmethod def assign_tabular_column_types(output_data: OutputData, diff --git a/test/integration/models/test_atomized_model.py b/test/integration/models/test_atomized_model.py index 65f0260983..4bfe848525 100644 --- a/test/integration/models/test_atomized_model.py +++ b/test/integration/models/test_atomized_model.py @@ -9,12 +9,9 @@ from fedot.core.composer.metrics import RMSE from fedot.core.data.data import InputData from fedot.core.operations.atomized_model import AtomizedModel -from fedot.core.pipelines.adapters import PipelineAdapter from fedot.core.pipelines.node import PipelineNode from fedot.core.pipelines.pipeline import Pipeline -from fedot.core.pipelines.random_pipeline_factory import RandomPipelineFactory from fedot.core.utils import fedot_project_root, make_pipeline_generator -from golem.core.dag.graph_verifier import GraphVerifier from test.integration.utilities.test_pipeline_import_export import create_correct_path, create_func_delete_files @@ -136,6 +133,8 @@ def test_atomized_model_metadata(): set()) assert presets == set(atomized_node.operation.metadata.presets) + atomized_node.operation.description(None) + task_type = set(pipeline.root_node.operation.metadata.task_type) assert task_type == set(atomized_node.operation.metadata.task_type) From 824fa692a3fda4b810aeae472309706cc98a9f9c Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Wed, 29 Nov 2023 15:26:11 +0300 Subject: [PATCH 03/10] pep8 --- fedot/core/operations/atomized_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index abd845451d..89e16534ca 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -98,4 +98,4 @@ def description(self, operation_params: Optional[dict]): def assign_tabular_column_types(output_data: OutputData, output_mode: str) -> OutputData: """ There is no need to perform any column types determination for nested pipelines """ - return output_data \ No newline at end of file + return output_data From 4cae937b393ff7cf87c2c292f982e04a6a2943a7 Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Wed, 29 Nov 2023 17:56:37 +0300 Subject: [PATCH 04/10] remove old function --- fedot/core/operations/atomized_model.py | 5 ++--- fedot/core/utils.py | 8 -------- test/integration/models/test_atomized_model.py | 4 ++-- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index 89e16534ca..f39f5062c4 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -12,7 +12,6 @@ from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder from fedot.core.repository.operation_types_repository import OperationMetaInfo, \ atomized_model_type -from fedot.core.utils import make_pipeline_generator class AtomizedModel(Operation): @@ -62,7 +61,7 @@ def metadata(self) -> OperationMetaInfo: root_node = self.pipeline.root_node def extract_metadata_from_pipeline(attr_name: str, node_filter: Optional[Callable] = None): - nodes_to_extract_metadata = make_pipeline_generator(self.pipeline) + nodes_to_extract_metadata = self.pipeline.nodes if node_filter is not None: nodes_to_extract_metadata = [node for node in nodes_to_extract_metadata if node_filter(node)] data = [getattr(node.operation.metadata, attr_name) for node in nodes_to_extract_metadata] @@ -89,7 +88,7 @@ def description(self, operation_params: Optional[dict]): operation_depth = self.pipeline.depth operation_id = self.pipeline.root_node.descriptive_id operation_types = map(lambda node: node.operation.operation_type, - make_pipeline_generator(self.pipeline)) + self.pipeline.nodes) operation_types_dict = dict(Counter(operation_types)) return f'{operation_type}_length:{operation_length}_depth:{operation_depth}' \ f'_types:{operation_types_dict}_id:{operation_id}' diff --git a/fedot/core/utils.py b/fedot/core/utils.py index 3d5fc07ecf..cb45396295 100644 --- a/fedot/core/utils.py +++ b/fedot/core/utils.py @@ -75,14 +75,6 @@ def ensure_directory_exists(dir_names: list): os.mkdir(dataset_dir) -def make_pipeline_generator(pipeline): - visited_nodes = set() - for node in pipeline.nodes: - if node.uid not in visited_nodes: - visited_nodes.add(node.uid) - yield node - - def set_random_seed(seed: Optional[int]): """ Sets random seed for evaluation of models""" if seed is not None: diff --git a/test/integration/models/test_atomized_model.py b/test/integration/models/test_atomized_model.py index 4bfe848525..528703eb0d 100644 --- a/test/integration/models/test_atomized_model.py +++ b/test/integration/models/test_atomized_model.py @@ -11,7 +11,7 @@ from fedot.core.operations.atomized_model import AtomizedModel from fedot.core.pipelines.node import PipelineNode from fedot.core.pipelines.pipeline import Pipeline -from fedot.core.utils import fedot_project_root, make_pipeline_generator +from fedot.core.utils import fedot_project_root from test.integration.utilities.test_pipeline_import_export import create_correct_path, create_func_delete_files @@ -97,7 +97,7 @@ def create_pipeline_with_several_nested_atomized_model() -> Pipeline: def get_some_atomized_nodes(): pipeline = create_pipeline_with_several_nested_atomized_model() - return [node for node in make_pipeline_generator(pipeline) if isinstance(node.operation, AtomizedModel)] + return [node for node in pipeline.nodes if isinstance(node.operation, AtomizedModel)] def create_input_data(): From b25b18fb28521b88794b86420fe578c5cb860e5b Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Fri, 1 Dec 2023 14:21:10 +0300 Subject: [PATCH 05/10] fix typing hints --- fedot/core/operations/atomized_model.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index f39f5062c4..df514d8c2a 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -24,27 +24,34 @@ def __init__(self, pipeline: 'Pipeline'): super().__init__(operation_type=atomized_model_type()) self.pipeline = pipeline - def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData): + def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData) -> ('Pipeline', OutputData): predicted_train = self.pipeline.fit(input_data=data) fitted_atomized_operation = self.pipeline return fitted_atomized_operation, predicted_train - def predict(self, fitted_operation, data: InputData, + def predict(self, + fitted_operation: 'Pipeline', + data: InputData, params: Optional[Union[OperationParameters, dict]] = None, - output_mode: str = 'default'): + output_mode: str = 'default') -> OutputData: # Preprocessing applied prediction = fitted_operation.predict(input_data=data, output_mode=output_mode) prediction = self.assign_tabular_column_types(prediction, output_mode) return prediction - def predict_for_fit(self, fitted_operation, data: InputData, params: Optional[OperationParameters] = None, - output_mode: str = 'default'): + def predict_for_fit(self, + fitted_operation: 'Pipeline', + data: InputData, + params: Optional[OperationParameters] = None, + output_mode: str = 'default') -> OutputData: return self.predict(fitted_operation, data, params, output_mode) - def fine_tune(self, metric_function: Callable, - input_data: InputData = None, iterations: int = 50, - timeout: int = 5): + def fine_tune(self, + metric_function: Callable, + input_data: Optional[InputData] = None, + iterations: int = 50, + timeout: int = 5) -> 'AtomizedModel': """ Method for tuning hyperparameters """ tuner = TunerBuilder(input_data.task)\ .with_tuner(SimultaneousTuner)\ @@ -82,7 +89,7 @@ def extract_metadata_from_pipeline(attr_name: str, node_filter: Optional[Callabl presets=presets) return operation_info - def description(self, operation_params: Optional[dict]): + def description(self, operation_params: Optional[dict] = None) -> str: operation_type = self.operation_type operation_length = self.pipeline.length operation_depth = self.pipeline.depth From 6d0784c79099a3b66da29202fdd551de70c567c4 Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Fri, 1 Dec 2023 14:26:56 +0300 Subject: [PATCH 06/10] fix test --- .../integration/models/test_atomized_model.py | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/test/integration/models/test_atomized_model.py b/test/integration/models/test_atomized_model.py index 528703eb0d..92c19ecf16 100644 --- a/test/integration/models/test_atomized_model.py +++ b/test/integration/models/test_atomized_model.py @@ -112,31 +112,35 @@ def create_input_data(): return train_data, test_data -def test_atomized_model_metadata(): - for atomized_node in get_some_atomized_nodes(): - pipeline = atomized_node.operation.pipeline - input_types = reduce(lambda types, node: types | set(node.operation.metadata.input_types), - pipeline.primary_nodes, - set()) - assert input_types == set(atomized_node.operation.metadata.input_types) - - output_types = set(pipeline.root_node.operation.metadata.output_types) - assert output_types == set(atomized_node.operation.metadata.output_types) - - tags = reduce(lambda types, node: types | set(node.operation.metadata.tags), - pipeline.nodes, - set()) - assert tags == set(atomized_node.operation.metadata.tags) - - presets = reduce(lambda types, node: types | set(node.operation.metadata.presets), - pipeline.nodes, - set()) - assert presets == set(atomized_node.operation.metadata.presets) - - atomized_node.operation.description(None) +@pytest.mark.parametrize('atomized_node', get_some_atomized_nodes()) +def test_atomized_model_metadata(atomized_node): + pipeline = atomized_node.operation.pipeline - task_type = set(pipeline.root_node.operation.metadata.task_type) - assert task_type == set(atomized_node.operation.metadata.task_type) + # check input types, it should be union of input types of primary nodes + input_types = reduce(lambda types, node: types | set(node.operation.metadata.input_types), + pipeline.primary_nodes, + set()) + assert input_types == set(atomized_node.operation.metadata.input_types) + + # check output types, it should be output types of root node + output_types = set(pipeline.root_node.operation.metadata.output_types) + assert output_types == set(atomized_node.operation.metadata.output_types) + + # check tags, it should be union of tags of pipeline nodes + tags = reduce(lambda types, node: types | set(node.operation.metadata.tags), + pipeline.nodes, + set()) + assert tags == set(atomized_node.operation.metadata.tags) + + # check presets, it should be union of presets of pipeline nodes + presets = reduce(lambda types, node: types | set(node.operation.metadata.presets), + pipeline.nodes, + set()) + assert presets == set(atomized_node.operation.metadata.presets) + + # check task_type + task_type = set(pipeline.root_node.operation.metadata.task_type) + assert task_type == set(atomized_node.operation.metadata.task_type) def test_save_load_atomized_pipeline_correctly(): From cbeb7b77fbca4e9730a1e3a10556d169036630d4 Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Mon, 4 Dec 2023 10:47:24 +0300 Subject: [PATCH 07/10] fix extract_metadata_from_pipeline --- fedot/core/operations/atomized_model.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index df514d8c2a..005ef0b732 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -1,8 +1,10 @@ from collections import Counter from datetime import timedelta +from functools import reduce from itertools import chain -from typing import Callable, Union, Optional +from typing import Callable, Union, Optional, Set, List +from fedot.core.pipelines.node import PipelineNode from golem.core.tuning.simultaneous import SimultaneousTuner from fedot.core.data.data import InputData, OutputData @@ -67,15 +69,21 @@ def fine_tune(self, def metadata(self) -> OperationMetaInfo: root_node = self.pipeline.root_node - def extract_metadata_from_pipeline(attr_name: str, node_filter: Optional[Callable] = None): + def extract_metadata_from_pipeline(attr_name: str, + node_filter: Optional[Callable[[PipelineNode], bool]] = None, + reduce_function: Optional[Callable[[List[Set]], Set]] = None): nodes_to_extract_metadata = self.pipeline.nodes if node_filter is not None: nodes_to_extract_metadata = [node for node in nodes_to_extract_metadata if node_filter(node)] - data = [getattr(node.operation.metadata, attr_name) for node in nodes_to_extract_metadata] - return list(set(chain(*data))) + data = [set(getattr(node.operation.metadata, attr_name)) for node in nodes_to_extract_metadata] + if reduce_function is None: + reduce_function = lambda x, y: x | y + return list(reduce(reduce_function, data)) tags = extract_metadata_from_pipeline('tags') - input_types = extract_metadata_from_pipeline('input_types', node_filter=lambda node: node.is_primary) + input_types = extract_metadata_from_pipeline('input_types', + node_filter=lambda node: node.is_primary, + reduce_function=lambda x, y: x & y) output_types = root_node.operation.metadata.output_types presets = extract_metadata_from_pipeline('presets') From 0d757dcb369707bde4bebbce30c04b622dd5b3b4 Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Mon, 4 Dec 2023 10:48:49 +0300 Subject: [PATCH 08/10] fix atomized model type hint --- fedot/core/operations/atomized_model.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index 005ef0b732..06a01e5b46 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -2,7 +2,7 @@ from datetime import timedelta from functools import reduce from itertools import chain -from typing import Callable, Union, Optional, Set, List +from typing import Callable, Union, Optional, Set, List, Any, Dict from fedot.core.pipelines.node import PipelineNode from golem.core.tuning.simultaneous import SimultaneousTuner @@ -34,7 +34,7 @@ def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputDat def predict(self, fitted_operation: 'Pipeline', data: InputData, - params: Optional[Union[OperationParameters, dict]] = None, + params: Optional[Union[OperationParameters, Dict[str, Any]]] = None, output_mode: str = 'default') -> OutputData: # Preprocessing applied From 63c98b7a71d504d4d0e2970e74a7a9ccfc97377a Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Mon, 4 Dec 2023 11:00:32 +0300 Subject: [PATCH 09/10] fix test --- test/integration/models/test_atomized_model.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/integration/models/test_atomized_model.py b/test/integration/models/test_atomized_model.py index 92c19ecf16..d66eb3347a 100644 --- a/test/integration/models/test_atomized_model.py +++ b/test/integration/models/test_atomized_model.py @@ -117,9 +117,8 @@ def test_atomized_model_metadata(atomized_node): pipeline = atomized_node.operation.pipeline # check input types, it should be union of input types of primary nodes - input_types = reduce(lambda types, node: types | set(node.operation.metadata.input_types), - pipeline.primary_nodes, - set()) + input_types = reduce(lambda types, input_types: types & set(input_types), + [set(node.operation.metadata.input_types) for node in pipeline.primary_nodes]) assert input_types == set(atomized_node.operation.metadata.input_types) # check output types, it should be output types of root node From 27a0b52b820eb8c8b44c6949322a81b34c38107e Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Mon, 4 Dec 2023 12:57:36 +0300 Subject: [PATCH 10/10] add docstring and replace lambda with operators --- fedot/core/operations/atomized_model.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/fedot/core/operations/atomized_model.py b/fedot/core/operations/atomized_model.py index 06a01e5b46..5fba8f6d44 100644 --- a/fedot/core/operations/atomized_model.py +++ b/fedot/core/operations/atomized_model.py @@ -1,7 +1,7 @@ from collections import Counter from datetime import timedelta from functools import reduce -from itertools import chain +from operator import and_, or_ from typing import Callable, Union, Optional, Set, List, Any, Dict from fedot.core.pipelines.node import PipelineNode @@ -71,19 +71,24 @@ def metadata(self) -> OperationMetaInfo: def extract_metadata_from_pipeline(attr_name: str, node_filter: Optional[Callable[[PipelineNode], bool]] = None, - reduce_function: Optional[Callable[[List[Set]], Set]] = None): + reduce_function: Optional[Callable[[Set], Set]] = None) -> List[Any]: + """ Extract metadata from atomized pipeline + :param attr_name: extracting metadata property + :param node_filter: return True for nodes with extracting metadata + :param reduce_function: function is used for combining extracted + metadata in ``reduce`` function + :return: list with extracted metadata + """ nodes_to_extract_metadata = self.pipeline.nodes if node_filter is not None: nodes_to_extract_metadata = [node for node in nodes_to_extract_metadata if node_filter(node)] data = [set(getattr(node.operation.metadata, attr_name)) for node in nodes_to_extract_metadata] - if reduce_function is None: - reduce_function = lambda x, y: x | y - return list(reduce(reduce_function, data)) + return list(reduce(reduce_function or or_, data)) tags = extract_metadata_from_pipeline('tags') input_types = extract_metadata_from_pipeline('input_types', node_filter=lambda node: node.is_primary, - reduce_function=lambda x, y: x & y) + reduce_function=and_) output_types = root_node.operation.metadata.output_types presets = extract_metadata_from_pipeline('presets')