Commit 75d2da8
1211-fix (#1213)
#1211 fix
kasyanovse authored Dec 4, 2023
1 parent 9d9469f commit 75d2da8
Showing 3 changed files with 97 additions and 54 deletions.
106 changes: 61 additions & 45 deletions fedot/core/operations/atomized_model.py
@@ -1,7 +1,10 @@
from copy import deepcopy
from collections import Counter
from datetime import timedelta
from typing import Callable, Union, Optional
from functools import reduce
from operator import and_, or_
from typing import Callable, Union, Optional, Set, List, Any, Dict

from fedot.core.pipelines.node import PipelineNode
from golem.core.tuning.simultaneous import SimultaneousTuner

from fedot.core.data.data import InputData, OutputData
@@ -11,7 +14,6 @@
from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
from fedot.core.repository.operation_types_repository import OperationMetaInfo, \
atomized_model_type
from fedot.core.utils import make_pipeline_generator


class AtomizedModel(Operation):
@@ -23,32 +25,35 @@ def __init__(self, pipeline: 'Pipeline'):

super().__init__(operation_type=atomized_model_type())
self.pipeline = pipeline
self.unique_id = self.pipeline.root_node.descriptive_id

def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData):

copied_input_data = deepcopy(data)
predicted_train = self.pipeline.fit(input_data=copied_input_data)
def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData) -> ('Pipeline', OutputData):
predicted_train = self.pipeline.fit(input_data=data)
fitted_atomized_operation = self.pipeline

return fitted_atomized_operation, predicted_train

def predict(self, fitted_operation, data: InputData,
params: Optional[Union[OperationParameters, dict]] = None, output_mode: str = 'default'):
def predict(self,
fitted_operation: 'Pipeline',
data: InputData,
params: Optional[Union[OperationParameters, Dict[str, Any]]] = None,
output_mode: str = 'default') -> OutputData:

# Preprocessing applied
copied_input_data = deepcopy(data)
prediction = fitted_operation.predict(input_data=copied_input_data, output_mode=output_mode)
prediction = fitted_operation.predict(input_data=data, output_mode=output_mode)
prediction = self.assign_tabular_column_types(prediction, output_mode)
return prediction

def predict_for_fit(self, fitted_operation, data: InputData, params: Optional[OperationParameters] = None,
output_mode: str = 'default'):
def predict_for_fit(self,
fitted_operation: 'Pipeline',
data: InputData,
params: Optional[OperationParameters] = None,
output_mode: str = 'default') -> OutputData:
return self.predict(fitted_operation, data, params, output_mode)

def fine_tune(self, metric_function: Callable,
input_data: InputData = None, iterations: int = 50,
timeout: int = 5):
def fine_tune(self,
metric_function: Callable,
input_data: Optional[InputData] = None,
iterations: int = 50,
timeout: int = 5) -> 'AtomizedModel':
""" Method for tuning hyperparameters """
tuner = TunerBuilder(input_data.task)\
.with_tuner(SimultaneousTuner)\
@@ -62,40 +67,51 @@ def fine_tune(self, metric_function: Callable,

@property
def metadata(self) -> OperationMetaInfo:
generator = make_pipeline_generator(self.pipeline)
tags = set()

for node in generator:
tags.update(node.operation_tags)

root_node = self.pipeline.root_node
supported_strategies = None
allowed_positions = ['any']
tags = list(tags)

operation_info = OperationMetaInfo(root_node.operation.supplementary_data.id,
root_node.operation.supplementary_data.input_types,
root_node.operation.supplementary_data.output_types,
root_node.operation.supplementary_data.task_type,
supported_strategies, allowed_positions,
tags)

def extract_metadata_from_pipeline(attr_name: str,
node_filter: Optional[Callable[[PipelineNode], bool]] = None,
reduce_function: Optional[Callable[[Set], Set]] = None) -> List[Any]:
""" Extract metadata from atomized pipeline
:param attr_name: extracting metadata property
:param node_filter: return True for nodes with extracting metadata
:param reduce_function: function is used for combining extracted
metadata in ``reduce`` function
:return: list with extracted metadata
"""
nodes_to_extract_metadata = self.pipeline.nodes
if node_filter is not None:
nodes_to_extract_metadata = [node for node in nodes_to_extract_metadata if node_filter(node)]
data = [set(getattr(node.operation.metadata, attr_name)) for node in nodes_to_extract_metadata]
return list(reduce(reduce_function or or_, data))

tags = extract_metadata_from_pipeline('tags')
input_types = extract_metadata_from_pipeline('input_types',
node_filter=lambda node: node.is_primary,
reduce_function=and_)
output_types = root_node.operation.metadata.output_types
presets = extract_metadata_from_pipeline('presets')

operation_info = OperationMetaInfo(id=root_node.operation.metadata.id,
input_types=input_types,
output_types=output_types,
task_type=root_node.operation.metadata.task_type,
supported_strategies=None,
allowed_positions=['any'],
tags=tags,
presets=presets)
return operation_info

def description(self, operation_params: Optional[dict]):
def description(self, operation_params: Optional[dict] = None) -> str:
operation_type = self.operation_type
operation_length = self.pipeline.length
operation_depth = self.pipeline.depth
operation_id = self.unique_id
operation_types = {}

for node in self.pipeline.nodes:
if node.operation.operation_type in operation_types:
operation_types[node.operation.operation_type] += 1
else:
operation_types[node.operation.operation_type] = 1

operation_id = self.pipeline.root_node.descriptive_id
operation_types = map(lambda node: node.operation.operation_type,
self.pipeline.nodes)
operation_types_dict = dict(Counter(operation_types))
return f'{operation_type}_length:{operation_length}_depth:{operation_depth}' \
f'_types:{operation_types}_id:{operation_id}'
f'_types:{operation_types_dict}_id:{operation_id}'

@staticmethod
def assign_tabular_column_types(output_data: OutputData,
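The refactored `metadata` property and `description` above lean on two small standard-library patterns: `reduce` over `or_`/`and_` to union or intersect per-node metadata sets, and `Counter` to tally operation types. A minimal standalone sketch (plain Python with made-up node metadata, not real FEDOT nodes):

```python
from collections import Counter
from functools import reduce
from operator import and_, or_

# hypothetical per-node metadata, for illustration only
node_tags = [{'linear', 'simple'}, {'tree', 'non_linear'}, {'simple'}]
primary_input_types = [{'table', 'ts'}, {'table'}]
node_operation_types = ['scaling', 'rf', 'rf']

tags = list(reduce(or_, node_tags))                    # union across all nodes
input_types = list(reduce(and_, primary_input_types))  # intersection across primary nodes
operation_types = dict(Counter(node_operation_types))  # counts per operation type

print(sorted(tags))     # ['linear', 'non_linear', 'simple', 'tree']
print(input_types)      # ['table']
print(operation_types)  # {'scaling': 1, 'rf': 2}
```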
9 changes: 0 additions & 9 deletions fedot/core/utils.py
@@ -75,15 +75,6 @@ def ensure_directory_exists(dir_names: list):
os.mkdir(dataset_dir)


def make_pipeline_generator(pipeline):
visited_nodes = []

for node in pipeline.nodes:
if node not in visited_nodes:
visited_nodes.append(node)
yield node


def set_random_seed(seed: Optional[int]):
""" Sets random seed for evaluation of models"""
if seed is not None:
36 changes: 36 additions & 0 deletions test/integration/models/test_atomized_model.py
@@ -1,5 +1,6 @@
import json
import os
from functools import reduce

import numpy as np
import pytest
@@ -94,6 +95,11 @@ def create_pipeline_with_several_nested_atomized_model() -> Pipeline:
return pipeline


def get_some_atomized_nodes():
pipeline = create_pipeline_with_several_nested_atomized_model()
return [node for node in pipeline.nodes if isinstance(node.operation, AtomizedModel)]


def create_input_data():
train_file_path = os.path.join('test', 'data', 'scoring', 'scoring_train.csv')
test_file_path = os.path.join('test', 'data', 'scoring', 'scoring_test.csv')
@@ -106,6 +112,36 @@ def create_input_data():
return train_data, test_data


@pytest.mark.parametrize('atomized_node', get_some_atomized_nodes())
def test_atomized_model_metadata(atomized_node):
pipeline = atomized_node.operation.pipeline

# check input types, it should be the intersection of the primary nodes' input types
input_types = reduce(lambda types, input_types: types & set(input_types),
[set(node.operation.metadata.input_types) for node in pipeline.primary_nodes])
assert input_types == set(atomized_node.operation.metadata.input_types)

# check output types, it should be output types of root node
output_types = set(pipeline.root_node.operation.metadata.output_types)
assert output_types == set(atomized_node.operation.metadata.output_types)

# check tags, it should be union of tags of pipeline nodes
tags = reduce(lambda types, node: types | set(node.operation.metadata.tags),
pipeline.nodes,
set())
assert tags == set(atomized_node.operation.metadata.tags)

# check presets, it should be union of presets of pipeline nodes
presets = reduce(lambda types, node: types | set(node.operation.metadata.presets),
pipeline.nodes,
set())
assert presets == set(atomized_node.operation.metadata.presets)

# check task_type
task_type = set(pipeline.root_node.operation.metadata.task_type)
assert task_type == set(atomized_node.operation.metadata.task_type)


def test_save_load_atomized_pipeline_correctly():
pipeline = create_pipeline_with_several_nested_atomized_model()

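The new `test_atomized_model_metadata` feeds `pytest.mark.parametrize` from a helper that runs at collection time, so one test case is generated per atomized node found in the nested pipeline. A standalone toy illustration of that pattern (hypothetical numbers instead of FEDOT nodes):

```python
import pytest


def get_some_cases():
    # stands in for get_some_atomized_nodes(); runs once at test collection time
    return [1, 2, 3]


@pytest.mark.parametrize('case', get_some_cases())
def test_case_is_positive(case):
    # pytest generates one test per element returned by the helper
    assert case > 0
```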
