Skip to content

Commit

Permalink
Merge branch 'main' into GEN-1589
Browse files Browse the repository at this point in the history
  • Loading branch information
Kenil27 authored Oct 17, 2024
2 parents 0932a6e + 1e01cb4 commit 540a763
Show file tree
Hide file tree
Showing 126 changed files with 1,016 additions and 302 deletions.
103 changes: 67 additions & 36 deletions ingestion/src/metadata/ingestion/connections/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import kill_active_connections
from metadata.profiler.orm.functions.conn_test import ConnTestFn
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import cli_logger
from metadata.utils.timeout import timeout

Expand Down Expand Up @@ -92,25 +93,24 @@ def _test_connection_steps(
metadata: OpenMetadata,
steps: List[TestConnectionStep],
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
) -> TestConnectionResult:
"""
Run all the function steps and raise any errors
"""

if automation_workflow:
_test_connection_steps_automation_workflow(
return _test_connection_steps_automation_workflow(
metadata=metadata, steps=steps, automation_workflow=automation_workflow
)

else:
_test_connection_steps_during_ingestion(steps=steps)
return _test_connection_steps_and_raise(steps=steps)


def _test_connection_steps_automation_workflow(
metadata: OpenMetadata,
steps: List[TestConnectionStep],
automation_workflow: AutomationWorkflow,
) -> None:
) -> TestConnectionResult:
"""
Run the test connection as part of the automation workflow
We need to update the automation workflow in each step
Expand Down Expand Up @@ -187,53 +187,79 @@ def _test_connection_steps_automation_workflow(
)
)

return test_connection_result

def _test_connection_steps_during_ingestion(steps: List[TestConnectionStep]) -> None:
"""
Run the test connection as part of the ingestion workflow
Raise an exception if something fails
"""
test_connection_result = TestConnectionIngestionResult()

def _test_connection_steps_during_ingestion(
steps: List[TestConnectionStep],
) -> TestConnectionResult:
"""Run the test connection steps during ingestion"""
test_connection_result = TestConnectionResult(
status=StatusType.Running,
steps=[],
)
for step in steps:
try:
logger.info(f"Running {step.name}...")
step.function()
test_connection_result.success.append(f"'{step.name}': Pass")

except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"{step.name}-{exc}")
if step.mandatory:
test_connection_result.failed.append(
f"'{step.name}': This is a mandatory step and we won't be able to extract"
f" necessary metadata. Failed due to: {exc}"
test_connection_result.steps.append(
TestConnectionStepResult(
name=step.name,
mandatory=step.mandatory,
passed=True,
)

else:
test_connection_result.warning.append(
f"'{step.name}': This is a optional and the ingestion will continue to work as expected."
f"Failed due to: {exc}"
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"{step.name}-{err}")
test_connection_result.steps.append(
TestConnectionStepResult(
name=step.name,
mandatory=step.mandatory,
passed=False,
message=step.error_message,
errorLog=str(err),
)

)
if step.short_circuit:
# break the workflow if the step is a short circuit step
break

logger.info("Test connection results:")
logger.info(test_connection_result)

if test_connection_result.failed:
raise SourceConnectionException(
f"Some steps failed when testing the connection: [{test_connection_result}]"
)
return test_connection_result


def _test_connection_steps_and_raise(
    steps: List[TestConnectionStep],
) -> TestConnectionResult:
    """
    Run the test connection as part of the ingestion workflow
    and raise an exception if a mandatory step fails.

    Args:
        steps: test connection steps to execute, in order.

    Returns:
        The TestConnectionResult holding one step result per executed step,
        when no mandatory step failed.

    Raises:
        SourceConnectionException: if at least one mandatory step did not pass.
            The first mandatory failure is reported together with its error log.
    """
    test_connection_result = _test_connection_steps_during_ingestion(steps)

    failed_steps = [step for step in test_connection_result.steps if not step.passed]

    # Warn about every optional failure before raising, so a mandatory failure
    # earlier in the list does not hide the optional ones recorded after it.
    for step in failed_steps:
        if not step.mandatory:
            logger.warning(
                f"You might be missing metadata in: {step.name} due to {step.message}"
            )

    mandatory_failures = [step for step in failed_steps if step.mandatory]
    if mandatory_failures:
        first_failure = mandatory_failures[0]
        # Keep the original message prefix and append the captured error so the
        # root cause is visible without enabling debug logs.
        raise SourceConnectionException(
            f"Failed to run the test connection step: {first_failure.name}"
            f" due to: {first_failure.errorLog}"
        )

    return test_connection_result


def test_connection_steps(
metadata: OpenMetadata,
service_type: str,
test_fn: dict,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: int = 3 * 60,
) -> None:
timeout_seconds: Optional[int] = THREE_MIN,
) -> TestConnectionResult:
"""
Test the connection steps with a given timeout
Expand Down Expand Up @@ -268,9 +294,12 @@ def test_connection_steps(
for step in test_connection_definition.steps
]

return timeout(timeout_seconds)(_test_connection_steps)(
metadata, steps, automation_workflow
)
if timeout_seconds:
return timeout(timeout_seconds)(_test_connection_steps)(
metadata, steps, automation_workflow
)

return _test_connection_steps(metadata, steps, automation_workflow)


def test_connection_engine_step(connection: Engine) -> None:
Expand All @@ -289,7 +318,7 @@ def test_connection_db_common(
service_connection,
automation_workflow: Optional[AutomationWorkflow] = None,
queries: dict = None,
timeout_seconds: int = 3 * 60,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand Down Expand Up @@ -339,6 +368,7 @@ def test_connection_db_schema_sources(
service_connection,
automation_workflow: Optional[AutomationWorkflow] = None,
queries: dict = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand Down Expand Up @@ -393,6 +423,7 @@ def custom_executor(engine_: Engine, inspector_fn_str: str):
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)

kill_active_connections(engine)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN


class SchemaURLError(Exception):
Expand Down Expand Up @@ -54,6 +55,7 @@ def test_connection(
client: Response,
service_connection: RestConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand Down Expand Up @@ -84,4 +86,5 @@ def custom_schema_exec():
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"""
Source connection handler
"""

from typing import Optional

from pydomo import Domo
Expand All @@ -29,6 +28,7 @@
test_connection_steps,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN


def get_connection(connection: DomoDashboardConnection) -> OMPyDomoClient:
Expand Down Expand Up @@ -57,6 +57,7 @@ def test_connection(
client: OMPyDomoClient,
service_connection: DomoDashboardConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand All @@ -77,4 +78,5 @@ def custom_test_page_list():
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"""
Source connection handler
"""

from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
Expand All @@ -27,6 +26,7 @@
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.lightdash.client import LightdashApiClient
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand All @@ -49,6 +49,7 @@ def test_connection(
client: LightdashApiClient,
service_connection: LightdashConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand All @@ -65,4 +66,5 @@ def custom_executor():
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN


def get_connection(connection: LookerConnection) -> Looker40SDK:
Expand All @@ -49,6 +50,7 @@ def test_connection(
client: Looker40SDK,
service_connection: LookerConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand Down Expand Up @@ -81,4 +83,5 @@ def validate_api_version():
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.metabase.client import MetabaseClient
from metadata.utils.constants import THREE_MIN


def get_connection(connection: MetabaseConnection) -> MetabaseClient:
Expand All @@ -37,6 +38,7 @@ def test_connection(
client: MetabaseClient,
service_connection: MetabaseConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand All @@ -54,4 +56,5 @@ def custom_executor():
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.mode.client import ModeApiClient
from metadata.utils.constants import THREE_MIN


def get_connection(connection: ModeConnection) -> ModeApiClient:
Expand All @@ -38,6 +39,7 @@ def test_connection(
client: ModeApiClient,
service_connection: ModeConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand All @@ -55,4 +57,5 @@ def test_connection(
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.mstr.client import MSTRClient
from metadata.utils.constants import THREE_MIN


def get_connection(connection: MstrConnection) -> MSTRClient:
Expand All @@ -37,6 +38,7 @@ def test_connection(
client: MSTRClient,
service_connection: MstrConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand All @@ -50,4 +52,5 @@ def test_connection(
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
PowerBiClient,
)
from metadata.ingestion.source.dashboard.powerbi.file_client import PowerBiFileClient
from metadata.utils.constants import THREE_MIN


def get_connection(connection: PowerBIConnection) -> PowerBiApiClient:
Expand All @@ -46,6 +47,7 @@ def test_connection(
client: PowerBiClient,
service_connection: PowerBIConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand All @@ -58,4 +60,5 @@ def test_connection(
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.qlikcloud.client import QlikCloudClient
from metadata.utils.constants import THREE_MIN


def get_connection(connection: QlikCloudConnection) -> QlikCloudClient:
Expand All @@ -37,6 +38,7 @@ def test_connection(
client: QlikCloudClient,
service_connection: QlikCloudConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
timeout_seconds: Optional[int] = THREE_MIN,
) -> None:
"""
Test connection. This can be executed either as part
Expand All @@ -50,4 +52,5 @@ def test_connection(
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=timeout_seconds,
)
Loading

0 comments on commit 540a763

Please sign in to comment.