Skip to content

Commit

Permalink
GEN-1166 - Improve Ingestion Workflow Error Summary (#18280)
Browse files Browse the repository at this point in the history
* GEN-1166 - Improve Ingestion Workflow Error Summary

* fix test

* docs

* comments
  • Loading branch information
pmbrull authored Oct 16, 2024
1 parent 89b6c1c commit 7012e73
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 169 deletions.
34 changes: 2 additions & 32 deletions ingestion/src/metadata/workflow/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,18 @@
from abc import ABC, abstractmethod
from typing import List, Optional

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.application import (
OpenMetadataApplicationConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.api.step import Step
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.importer import import_from_module
from metadata.utils.logger import ingestion_logger
from metadata.workflow.base import BaseWorkflow
from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE

logger = ingestion_logger()

Expand Down Expand Up @@ -92,15 +86,9 @@ def __init__(self, config_dict: dict):
# Applications are associated to the OpenMetadata Service
self.service_type: ServiceType = ServiceType.Metadata

metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)
log_level: LogLevels = self.config.workflowConfig.loggerLevel

super().__init__(
config=self.config,
log_level=log_level,
metadata_config=metadata_config,
workflow_config=self.workflow_config,
service_type=self.service_type,
)

Expand Down Expand Up @@ -134,26 +122,8 @@ def execute_internal(self) -> None:
"""Workflow-specific logic to execute safely"""
self.runner.run()

def calculate_success(self) -> float:
    """Return the success percentage reported by the runner step's status."""
    return self.runner.get_status().calculate_success()

def get_failures(self) -> List[StackTraceError]:
    """Return the failures recorded by the first workflow step's status."""
    return self.workflow_steps()[0].get_status().failures

def workflow_steps(self) -> List[Step]:
    """Return the steps to report status from; only the runner here."""
    return [self.runner]

def raise_from_status_internal(self, raise_warnings=False):
    """Check failed status in the runner"""
    # Fail only when there are recorded failures AND the success rate
    # falls below the threshold; failures above the threshold do not
    # abort the workflow.
    if (
        self.runner.get_status().failures
        and self.calculate_success() < SUCCESS_THRESHOLD_VALUE
    ):
        raise WorkflowExecutionError(
            f"{self.runner.name} reported errors: {Summary.from_step(self.runner)}"
        )

    # Warnings only raise when the caller explicitly opts in.
    if raise_warnings and self.runner.get_status().warnings:
        raise WorkflowExecutionError(
            f"{self.runner.name} reported warning: {Summary.from_step(self.runner)}"
        )
64 changes: 47 additions & 17 deletions ingestion/src/metadata/workflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from statistics import mean
from typing import Any, Dict, List, Optional, TypeVar, Union

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
CreateIngestionPipelineRequest,
)
Expand All @@ -32,10 +34,13 @@
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
WorkflowConfig,
)
from metadata.generated.schema.tests.testSuite import ServiceType
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.timer.repeated_timer import RepeatedTimer
Expand All @@ -49,10 +54,7 @@
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.workflow.workflow_output_handler import WorkflowOutputHandler
from metadata.workflow.workflow_status_mixin import (
SUCCESS_THRESHOLD_VALUE,
WorkflowStatusMixin,
)
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin

logger = ingestion_logger()

Expand Down Expand Up @@ -82,8 +84,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
def __init__(
self,
config: Union[Any, Dict],
log_level: LogLevels,
metadata_config: OpenMetadataConnection,
workflow_config: WorkflowConfig,
service_type: ServiceType,
output_handler: WorkflowOutputHandler = WorkflowOutputHandler(),
):
Expand All @@ -92,19 +93,22 @@ def __init__(
"""
self.output_handler = output_handler
self.config = config
self.workflow_config = workflow_config
self.service_type = service_type
self._timer: Optional[RepeatedTimer] = None
self._ingestion_pipeline: Optional[IngestionPipeline] = None
self._start_ts = datetime_to_ts(datetime.now())

self._execution_time_tracker = ExecutionTimeTracker(
log_level == LogLevels.DEBUG
self.workflow_config.loggerLevel == LogLevels.DEBUG
)

set_loggers_level(log_level.value)
set_loggers_level(self.workflow_config.loggerLevel.value)

# We create the ometa client at the workflow level and pass it to the steps
self.metadata_config = metadata_config
self.metadata = create_ometa_client(metadata_config)
self.metadata = create_ometa_client(
self.workflow_config.openMetadataServerConfig
)
self.set_ingestion_pipeline_status(state=PipelineState.running)

self.post_init()
Expand Down Expand Up @@ -157,9 +161,22 @@ def post_init(self) -> None:
def execute_internal(self) -> None:
"""Workflow-specific logic to execute safely"""

@abstractmethod
def calculate_success(self) -> float:
"""Get the success % of the internal execution"""
def calculate_success(self) -> Optional[float]:
    """
    Get the success % of the internal execution.
    Since we'll use this to get a single success % from multiple steps, we'll take
    the minimum success % from all the steps. This way, we can have a proper
    workflow status.
    E.g., if we have no errors on the source but a bunch of them on the sink,
    we still want the flow to be marked as a failure or partial success.
    """
    # No registered steps: nothing to measure, so report no score at all.
    if not self.workflow_steps():
        logger.warning("No steps to calculate success")
        return None

    # NOTE(review): the docstring above says the "minimum success %" is
    # taken, but the code averages the per-step percentages with
    # statistics.mean — confirm which behavior is actually intended.
    return mean(
        [step.get_status().calculate_success() for step in self.workflow_steps()]
    )

@abstractmethod
def get_failures(self) -> List[StackTraceError]:
Expand All @@ -169,9 +186,22 @@ def get_failures(self) -> List[StackTraceError]:
def workflow_steps(self) -> List[Step]:
"""Steps to report status from"""

@abstractmethod
def raise_from_status_internal(self, raise_warnings=False) -> None:
"""Based on the internal workflow status, raise a WorkflowExecutionError"""
for step in self.workflow_steps():
if (
step.get_status().failures
and step.get_status().calculate_success()
< self.workflow_config.successThreshold
):
raise WorkflowExecutionError(
f"{step.name} reported errors: {Summary.from_step(step)}"
)

if raise_warnings and step.status.warnings:
raise WorkflowExecutionError(
f"{step.name} reported warning: {Summary.from_step(step)}"
)

def execute(self) -> None:
"""
Expand All @@ -186,7 +216,7 @@ def execute(self) -> None:
try:
self.execute_internal()

if SUCCESS_THRESHOLD_VALUE <= self.calculate_success() < 100:
if self.workflow_config.successThreshold <= self.calculate_success() < 100:
pipeline_state = PipelineState.partialSuccess

# Any unhandled exception breaking the workflow should update the status
Expand Down
42 changes: 5 additions & 37 deletions ingestion/src/metadata/workflow/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
from typing import List, Tuple, Type, cast

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
Expand All @@ -38,7 +35,7 @@
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
from metadata.ingestion.models.custom_types import ServiceWithConnectionType
from metadata.profiler.api.models import ProfilerProcessorConfig
Expand All @@ -55,14 +52,15 @@
)
from metadata.utils.logger import ingestion_logger
from metadata.workflow.base import BaseWorkflow, InvalidWorkflowJSONException
from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE

logger = ingestion_logger()


class IngestionWorkflow(BaseWorkflow, ABC):
"""
Base Ingestion Workflow implementation
Base Ingestion Workflow implementation. This is used for all
workflows minus the application one, which directly inherits the
BaseWorkflow.
"""

config: OpenMetadataWorkflowConfig
Expand All @@ -79,14 +77,9 @@ def __init__(self, config: OpenMetadataWorkflowConfig):
self.config.source.type
)

metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)

super().__init__(
config=config,
log_level=config.workflowConfig.loggerLevel,
metadata_config=metadata_config,
workflow_config=config.workflowConfig,
service_type=self.service_type,
)

Expand Down Expand Up @@ -137,37 +130,12 @@ def execute_internal(self):
if bulk_sink:
bulk_sink.run()

def calculate_success(self) -> float:
    """Return the success percentage reported by the source step's status."""
    return self.source.get_status().calculate_success()

def get_failures(self) -> List[StackTraceError]:
    """Return the failures recorded by the source step's status."""
    return self.source.get_status().failures

def workflow_steps(self) -> List[Step]:
    """Return the source step followed by the remaining configured steps."""
    return [self.source] + list(self.steps)

def raise_from_status_internal(self, raise_warnings=False):
    """
    Check the status of all steps
    """
    # The source only aborts the run when its failures drag the overall
    # success rate below the threshold.
    if (
        self.source.get_status().failures
        and self.calculate_success() < SUCCESS_THRESHOLD_VALUE
    ):
        raise WorkflowExecutionError(
            f"{self.source.name} reported errors: {Summary.from_step(self.source)}"
        )

    # Unlike the source, ANY failure in a downstream step is fatal,
    # regardless of the success threshold.
    for step in self.steps:
        if step.status.failures:
            raise WorkflowExecutionError(
                f"{step.name} reported errors: {Summary.from_step(step)}"
            )
        # Warnings only raise when the caller explicitly opts in.
        if raise_warnings and step.status.warnings:
            raise WorkflowExecutionError(
                f"{step.name} reported warnings: {Summary.from_step(step)}"
            )

def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None:
"""
We override the current `serviceConnection` source config object if source workflow service already exists
Expand Down
19 changes: 11 additions & 8 deletions ingestion/src/metadata/workflow/workflow_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""

import time
from statistics import mean
from typing import Any, Dict, List, Optional, Type, Union

from pydantic import BaseModel
Expand Down Expand Up @@ -114,16 +115,15 @@ def print_summary(self, steps: List[Step], debug: bool = False):

self._print_summary(steps)

def _print_summary(self, steps: List[Step]):
def _print_summary(self, steps: List[Step]) -> None:
failures: List[Failure] = []
total_records: int = 0
total_errors: int = 0
if not steps:
log_ansi_encoded_string(message="No steps to process.")
return

for step in steps:
step_summary = Summary.from_step(step)

total_records += step_summary.records or 0
total_errors += step_summary.errors or 0
failures.append(
Failure(name=step.name, failures=step.get_status().failures)
)
Expand All @@ -141,15 +141,18 @@ def _print_summary(self, steps: List[Step]):
log_ansi_encoded_string(message=f"Filtered: {step_summary.filtered}")

log_ansi_encoded_string(message=f"Errors: {step_summary.errors}")
log_ansi_encoded_string(
message=f"Success %: {step.get_status().calculate_success()}"
)

self._print_failures_if_apply(failures)

total_success = max(total_records, 1)
# If nothing is processed, we'll have a success of 100%
success_pct = mean([step.get_status().calculate_success() for step in steps])
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Success %: "
+ f"{round(total_success * 100 / (total_success + total_errors), 2)}",
message="Workflow Success %: " + f"{round(success_pct, 2)}",
)

def _print_debug_summary(self, steps: List[Step]):
Expand Down
2 changes: 0 additions & 2 deletions ingestion/src/metadata/workflow/workflow_status_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

logger = ometa_logger()

SUCCESS_THRESHOLD_VALUE = 90


class WorkflowResultStatus(Enum):
SUCCESS = 0
Expand Down
6 changes: 2 additions & 4 deletions ingestion/tests/unit/profiler/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
Table,
TableProfilerConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.models import ProfilerProcessorConfig
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
Expand Down Expand Up @@ -122,7 +120,7 @@ def test_init_workflow(mocked_method, mocked_orm): # pylint: disable=unused-arg
mocked_method.assert_called()

assert isinstance(workflow.source.source_config, DatabaseServiceProfilerPipeline)
assert isinstance(workflow.metadata_config, OpenMetadataConnection)
assert isinstance(workflow.workflow_config, WorkflowConfig)

profiler_processor_step = workflow.steps[0]
assert isinstance(profiler_processor_step.profiler_config, ProfilerProcessorConfig)
Expand Down
Loading

0 comments on commit 7012e73

Please sign in to comment.