
1211-fix #1213

Merged: 10 commits, Dec 4, 2023
106 changes: 61 additions & 45 deletions fedot/core/operations/atomized_model.py
@@ -1,7 +1,10 @@
-from copy import deepcopy
+from collections import Counter
 from datetime import timedelta
-from typing import Callable, Union, Optional
+from functools import reduce
+from operator import and_, or_
+from typing import Callable, Union, Optional, Set, List, Any, Dict

+from fedot.core.pipelines.node import PipelineNode
 from golem.core.tuning.simultaneous import SimultaneousTuner

 from fedot.core.data.data import InputData, OutputData
@@ -11,7 +14,6 @@
 from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
 from fedot.core.repository.operation_types_repository import OperationMetaInfo, \
     atomized_model_type
-from fedot.core.utils import make_pipeline_generator


 class AtomizedModel(Operation):
@@ -23,32 +25,35 @@ def __init__(self, pipeline: 'Pipeline'):

         super().__init__(operation_type=atomized_model_type())
         self.pipeline = pipeline
-        self.unique_id = self.pipeline.root_node.descriptive_id

-    def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData):
-
-        copied_input_data = deepcopy(data)
-        predicted_train = self.pipeline.fit(input_data=copied_input_data)
+    def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData) -> ('Pipeline', OutputData):
+        predicted_train = self.pipeline.fit(input_data=data)
         fitted_atomized_operation = self.pipeline

         return fitted_atomized_operation, predicted_train

-    def predict(self, fitted_operation, data: InputData,
-                params: Optional[Union[OperationParameters, dict]] = None, output_mode: str = 'default'):
+    def predict(self,
+                fitted_operation: 'Pipeline',
+                data: InputData,
+                params: Optional[Union[OperationParameters, Dict[str, Any]]] = None,
+                output_mode: str = 'default') -> OutputData:

-        # Preprocessing applied
-        copied_input_data = deepcopy(data)
-        prediction = fitted_operation.predict(input_data=copied_input_data, output_mode=output_mode)
+        prediction = fitted_operation.predict(input_data=data, output_mode=output_mode)
         prediction = self.assign_tabular_column_types(prediction, output_mode)
         return prediction

-    def predict_for_fit(self, fitted_operation, data: InputData, params: Optional[OperationParameters] = None,
-                        output_mode: str = 'default'):
+    def predict_for_fit(self,
+                        fitted_operation: 'Pipeline',
+                        data: InputData,
+                        params: Optional[OperationParameters] = None,
+                        output_mode: str = 'default') -> OutputData:
         return self.predict(fitted_operation, data, params, output_mode)

-    def fine_tune(self, metric_function: Callable,
-                  input_data: InputData = None, iterations: int = 50,
-                  timeout: int = 5):
+    def fine_tune(self,
+                  metric_function: Callable,
+                  input_data: Optional[InputData] = None,
+                  iterations: int = 50,
+                  timeout: int = 5) -> 'AtomizedModel':
         """ Method for tuning hyperparameters """
         tuner = TunerBuilder(input_data.task)\
             .with_tuner(SimultaneousTuner)\
@@ -62,40 +67,51 @@ def fine_tune(self, metric_function: Callable,

     @property
     def metadata(self) -> OperationMetaInfo:
-        generator = make_pipeline_generator(self.pipeline)
-        tags = set()
-
-        for node in generator:
-            tags.update(node.operation_tags)
-
         root_node = self.pipeline.root_node
-        supported_strategies = None
-        allowed_positions = ['any']
-        tags = list(tags)
-
-        operation_info = OperationMetaInfo(root_node.operation.supplementary_data.id,
-                                           root_node.operation.supplementary_data.input_types,
-                                           root_node.operation.supplementary_data.output_types,
-                                           root_node.operation.supplementary_data.task_type,
-                                           supported_strategies, allowed_positions,
-                                           tags)
+
+        def extract_metadata_from_pipeline(attr_name: str,
+                                           node_filter: Optional[Callable[[PipelineNode], bool]] = None,
+                                           reduce_function: Optional[Callable[[Set], Set]] = None) -> List[Any]:
+            """ Extract metadata from atomized pipeline
+            :param attr_name: extracting metadata property
+            :param node_filter: return True for nodes with extracting metadata
+            :param reduce_function: function is used for combining extracted
+                                    metadata in ``reduce`` function
+            :return: list with extracted metadata
+            """
+            nodes_to_extract_metadata = self.pipeline.nodes
+            if node_filter is not None:
+                nodes_to_extract_metadata = [node for node in nodes_to_extract_metadata if node_filter(node)]
+            data = [set(getattr(node.operation.metadata, attr_name)) for node in nodes_to_extract_metadata]
+            return list(reduce(reduce_function or or_, data))
+
+        tags = extract_metadata_from_pipeline('tags')
+        input_types = extract_metadata_from_pipeline('input_types',
+                                                     node_filter=lambda node: node.is_primary,
+                                                     reduce_function=and_)
+        output_types = root_node.operation.metadata.output_types
+        presets = extract_metadata_from_pipeline('presets')
+
+        operation_info = OperationMetaInfo(id=root_node.operation.metadata.id,
+                                           input_types=input_types,
+                                           output_types=output_types,
+                                           task_type=root_node.operation.metadata.task_type,
+                                           supported_strategies=None,
+                                           allowed_positions=['any'],
+                                           tags=tags,
+                                           presets=presets)
         return operation_info

-    def description(self, operation_params: Optional[dict]):
+    def description(self, operation_params: Optional[dict] = None) -> str:
         operation_type = self.operation_type
         operation_length = self.pipeline.length
         operation_depth = self.pipeline.depth
-        operation_id = self.unique_id
-        operation_types = {}
-
-        for node in self.pipeline.nodes:
-            if node.operation.operation_type in operation_types:
-                operation_types[node.operation.operation_type] += 1
-            else:
-                operation_types[node.operation.operation_type] = 1
-
+        operation_id = self.pipeline.root_node.descriptive_id
+        operation_types = map(lambda node: node.operation.operation_type,
+                              self.pipeline.nodes)
+        operation_types_dict = dict(Counter(operation_types))
         return f'{operation_type}_length:{operation_length}_depth:{operation_depth}' \
-               f'_types:{operation_types}_id:{operation_id}'
+               f'_types:{operation_types_dict}_id:{operation_id}'

     @staticmethod
     def assign_tabular_column_types(output_data: OutputData,
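Review note: two idioms do the heavy lifting in this file now. The new `metadata` property folds per-node metadata sets with `functools.reduce` over `operator.and_` / `operator.or_`, and `description()` swaps the hand-rolled counting loop for `collections.Counter`. A minimal self-contained sketch of both ideas (the values below are made-up stand-ins, not real FEDOT metadata):

```python
from collections import Counter
from functools import reduce
from operator import and_, or_

# Intersection via and_: an atomized model can only accept the input types
# that every primary node accepts (cf. reduce_function=and_ in the diff).
primary_input_types = [{'table', 'ts'}, {'table'}]
input_types = reduce(and_, primary_input_types)    # {'table'}

# Union via or_ (the default reduce_function): the model carries every tag
# found anywhere in the wrapped pipeline.
node_tags = [{'linear'}, {'non_linear', 'boosting'}]
tags = reduce(or_, node_tags)                      # {'linear', 'non_linear', 'boosting'}

# Counter replaces the removed if/else counting loop in description().
operation_types = ['scaling', 'rf', 'rf', 'logit']
operation_types_dict = dict(Counter(operation_types))   # {'scaling': 1, 'rf': 2, 'logit': 1}
```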
9 changes: 0 additions & 9 deletions fedot/core/utils.py
@@ -75,15 +75,6 @@ def ensure_directory_exists(dir_names: list):
         os.mkdir(dataset_dir)


-def make_pipeline_generator(pipeline):
-    visited_nodes = []
-
-    for node in pipeline.nodes:
-        if node not in visited_nodes:
-            visited_nodes.append(node)
-            yield node
-
-
 def set_random_seed(seed: Optional[int]):
     """ Sets random seed for evaluation of models"""
     if seed is not None:
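The deleted `make_pipeline_generator` simply yielded each pipeline node once, in encounter order, and `AtomizedModel.metadata` now iterates `self.pipeline.nodes` directly. If an order-preserving de-duplication is ever needed again, one possible one-line equivalent (assuming hashable items, which the removed list-based helper did not require) is:

```python
# Order-preserving de-duplication; the strings are stand-ins for pipeline nodes.
nodes = ['scaling', 'rf', 'scaling', 'logit']
unique_nodes = list(dict.fromkeys(nodes))   # ['scaling', 'rf', 'logit']
```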
36 changes: 36 additions & 0 deletions test/integration/models/test_atomized_model.py
@@ -1,5 +1,6 @@
 import json
 import os
+from functools import reduce

 import numpy as np
 import pytest
@@ -94,6 +95,11 @@ def create_pipeline_with_several_nested_atomized_model() -> Pipeline:
     return pipeline


+def get_some_atomized_nodes():
+    pipeline = create_pipeline_with_several_nested_atomized_model()
+    return [node for node in pipeline.nodes if isinstance(node.operation, AtomizedModel)]
+
+
 def create_input_data():
     train_file_path = os.path.join('test', 'data', 'scoring', 'scoring_train.csv')
     test_file_path = os.path.join('test', 'data', 'scoring', 'scoring_test.csv')
@@ -106,6 +112,36 @@ def create_input_data():
     return train_data, test_data


+@pytest.mark.parametrize('atomized_node', get_some_atomized_nodes())
+def test_atomized_model_metadata(atomized_node):
+    pipeline = atomized_node.operation.pipeline
+
+    # check input types: they should be the intersection of the primary nodes' input types
+    input_types = reduce(lambda types, input_types: types & set(input_types),
+                         [set(node.operation.metadata.input_types) for node in pipeline.primary_nodes])
+    assert input_types == set(atomized_node.operation.metadata.input_types)
+
+    # check output types: they should be the output types of the root node
+    output_types = set(pipeline.root_node.operation.metadata.output_types)
+    assert output_types == set(atomized_node.operation.metadata.output_types)
+
+    # check tags: they should be the union of the pipeline nodes' tags
+    tags = reduce(lambda types, node: types | set(node.operation.metadata.tags),
+                  pipeline.nodes,
+                  set())
+    assert tags == set(atomized_node.operation.metadata.tags)
+
+    # check presets: they should be the union of the pipeline nodes' presets
+    presets = reduce(lambda types, node: types | set(node.operation.metadata.presets),
+                     pipeline.nodes,
+                     set())
+    assert presets == set(atomized_node.operation.metadata.presets)
+
+    # check task_type: it should match the root node's task_type
+    task_type = set(pipeline.root_node.operation.metadata.task_type)
+    assert task_type == set(atomized_node.operation.metadata.task_type)
+
+
 def test_save_load_atomized_pipeline_correctly():
     pipeline = create_pipeline_with_several_nested_atomized_model()

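A subtlety in the new test: the tags and presets checks seed `reduce` with an empty `set()`, while the input-types check does not, presumably safe because a pipeline always has at least one primary node. Without an initializer, `reduce` raises `TypeError` on an empty sequence. A toy illustration (the sets are stand-ins, not FEDOT metadata):

```python
from functools import reduce

node_tag_sets = [{'linear'}, {'non_linear', 'boosting'}]

# Seeded fold: total, even for an empty input.
tags = reduce(lambda acc, s: acc | s, node_tag_sets, set())
assert tags == {'linear', 'non_linear', 'boosting'}
assert reduce(lambda acc, s: acc | s, [], set()) == set()

# Unseeded fold: raises on an empty sequence.
try:
    reduce(lambda acc, s: acc & s, [])
except TypeError:
    pass  # expected: no initial value and nothing to fold
```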