From 5074f6588f625859c29a6658915b29db4fe39cf9 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 18 Oct 2024 09:40:06 +0200 Subject: [PATCH 1/3] MINOR - Validate app runner init (#18316) --- .../src/metadata/workflow/application.py | 11 ++-- .../workflow/test_application_workflow.py | 51 +++++++++++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) create mode 100644 ingestion/tests/unit/workflow/test_application_workflow.py diff --git a/ingestion/src/metadata/workflow/application.py b/ingestion/src/metadata/workflow/application.py index 17ff06540705..b22d4b1bd789 100644 --- a/ingestion/src/metadata/workflow/application.py +++ b/ingestion/src/metadata/workflow/application.py @@ -78,23 +78,24 @@ class ApplicationWorkflow(BaseWorkflow, ABC): config: OpenMetadataApplicationConfig runner: Optional[AppRunner] - def __init__(self, config_dict: dict): + def __init__(self, config: OpenMetadataApplicationConfig): self.runner = None # Will be passed in post-init - # TODO: Create a parse_gracefully method - self.config = OpenMetadataApplicationConfig.model_validate(config_dict) + self.config = config # Applications are associated to the OpenMetadata Service self.service_type: ServiceType = ServiceType.Metadata super().__init__( config=self.config, - workflow_config=self.workflow_config, + workflow_config=config.workflowConfig, service_type=self.service_type, ) @classmethod def create(cls, config_dict: dict): - return cls(config_dict) + # TODO: Create a parse_gracefully method + config = OpenMetadataApplicationConfig.model_validate(config_dict) + return cls(config) def post_init(self) -> None: """ diff --git a/ingestion/tests/unit/workflow/test_application_workflow.py b/ingestion/tests/unit/workflow/test_application_workflow.py new file mode 100644 index 000000000000..df725f4d493f --- /dev/null +++ b/ingestion/tests/unit/workflow/test_application_workflow.py @@ -0,0 +1,51 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Validate the initialization of the App Workflow +""" +import yaml + +from metadata.workflow.application import ApplicationWorkflow, AppRunner + + +class TestApp(AppRunner): + """Test App class""" + + def close(self) -> None: + """I am a test""" + + def run(self) -> None: + """I am a test""" + + +def test_init_app() -> None: + """We can properly instantiate the app""" + + config = f""" + sourcePythonClass: "{__name__}.TestApp" + appConfig: + type: Automator + resources: + type: [table] + queryFilter: "..." + actions: + - type: LineagePropagationAction + overwriteMetadata: false + workflowConfig: + openMetadataServerConfig: + hostPort: "http://localhost:8585/api" + authProvider: "openmetadata" + securityConfig: + jwtToken: "..." + """ + + workflow = ApplicationWorkflow.create(yaml.safe_load(config)) + assert isinstance(workflow, ApplicationWorkflow) From c2929e67e6b4b78241a92d40e42a6f5604c7c077 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 18 Oct 2024 09:54:07 +0200 Subject: [PATCH 2/3] MINOR - Return TestConnectionResult from test_connection_fn (#18320) * MINOR - Return TestConnectionResult from test_connection fn * MINOR - Return TestConnectionResult from test_connection fn --- .../ingestion/connections/test_connections.py | 29 ++++++++----------- .../ingestion/source/api/api_service.py | 8 ++++- .../ingestion/source/api/rest/connection.py | 7 +++-- .../source/dashboard/dashboard_service.py | 8 ++++- .../dashboard/domodashboard/connection.py | 7 +++-- .../source/dashboard/lightdash/connection.py | 7 +++-- .../source/dashboard/looker/connection.py | 7 +++-- .../source/dashboard/metabase/connection.py | 7 +++-- .../source/dashboard/mode/connection.py | 7 +++-- .../source/dashboard/mstr/connection.py | 7 +++-- .../source/dashboard/powerbi/connection.py | 7 +++-- .../source/dashboard/qlikcloud/connection.py | 7 +++-- .../source/dashboard/qliksense/connection.py | 7 +++-- .../source/dashboard/quicksight/connection.py | 7 +++-- .../source/dashboard/redash/connection.py | 7 +++-- .../source/dashboard/sigma/connection.py | 7 +++-- .../source/dashboard/superset/connection.py | 7 +++-- .../source/dashboard/tableau/connection.py | 7 +++-- .../source/database/athena/connection.py | 7 +++-- .../source/database/azuresql/connection.py | 7 +++-- .../source/database/bigquery/connection.py | 9 ++++-- .../source/database/bigtable/connection.py | 7 +++-- .../source/database/clickhouse/connection.py | 7 +++-- .../source/database/couchbase/connection.py | 7 +++-- .../source/database/database_service.py | 8 ++++- .../source/database/databricks/connection.py | 7 +++-- .../source/database/datalake/connection.py | 7 +++-- .../source/database/db2/connection.py | 7 +++-- .../source/database/deltalake/connection.py | 7 +++-- .../database/domodatabase/connection.py | 7 +++-- .../source/database/doris/connection.py | 7 +++-- .../source/database/druid/connection.py | 7 +++-- .../source/database/dynamodb/connection.py | 7 +++-- .../source/database/glue/connection.py | 7 +++-- .../source/database/greenplum/connection.py | 7 +++-- .../source/database/hive/connection.py | 7 +++-- .../source/database/iceberg/connection.py | 7 +++-- .../source/database/impala/connection.py | 7 +++-- .../source/database/mariadb/connection.py | 7 +++-- .../source/database/mongodb/connection.py | 7 +++-- .../source/database/mssql/connection.py | 7 +++-- .../source/database/mysql/connection.py | 7 +++-- .../source/database/oracle/connection.py | 7 +++-- .../source/database/pinotdb/connection.py | 7 +++-- .../source/database/postgres/connection.py | 7 +++-- .../source/database/presto/connection.py | 7 +++-- .../source/database/query_parser_source.py | 6 +++- .../source/database/redshift/connection.py | 9 ++++-- .../source/database/salesforce/connection.py | 7 +++-- .../source/database/salesforce/metadata.py | 6 +++- .../source/database/saperp/connection.py | 7 +++-- .../source/database/saphana/connection.py | 7 +++-- .../source/database/saphana/lineage.py | 6 +++- .../source/database/sas/connection.py | 7 +++-- .../ingestion/source/database/sas/metadata.py | 8 ++++- .../source/database/singlestore/connection.py | 7 +++-- .../source/database/snowflake/connection.py | 7 +++-- .../source/database/sqlite/connection.py | 7 +++-- .../source/database/teradata/connection.py | 7 +++-- .../source/database/trino/connection.py | 7 +++-- .../database/unitycatalog/connection.py | 7 +++-- .../source/database/unitycatalog/lineage.py | 8 ++++- .../source/database/vertica/connection.py | 7 +++-- .../source/messaging/kafka/connection.py | 7 +++-- .../source/messaging/kinesis/connection.py | 7 +++-- .../source/messaging/messaging_service.py | 8 ++++- .../source/messaging/redpanda/connection.py | 7 +++-- .../source/metadata/alationsink/connection.py | 7 +++-- .../source/metadata/alationsink/metadata.py | 6 +++- .../source/metadata/amundsen/connection.py | 7 +++-- .../source/metadata/amundsen/metadata.py | 8 ++++- .../source/metadata/atlas/connection.py | 7 +++-- .../source/metadata/atlas/metadata.py | 8 ++++- .../source/mlmodel/mlflow/connection.py | 7 +++-- .../source/mlmodel/mlmodel_service.py | 8 ++++- .../source/mlmodel/sagemaker/connection.py | 7 +++-- .../source/pipeline/airbyte/connection.py | 7 +++-- .../source/pipeline/airflow/connection.py | 7 +++-- .../source/pipeline/dagster/connection.py | 7 +++-- .../pipeline/databrickspipeline/connection.py | 7 +++-- .../source/pipeline/dbtcloud/connection.py | 7 +++-- .../pipeline/domopipeline/connection.py | 7 +++-- .../source/pipeline/fivetran/connection.py | 7 +++-- .../source/pipeline/flink/connection.py | 7 +++-- .../pipeline/gluepipeline/connection.py | 7 +++-- .../pipeline/kafkaconnect/connection.py | 7 +++-- .../source/pipeline/nifi/connection.py | 7 +++-- .../source/pipeline/openlineage/connection.py | 7 +++-- .../source/pipeline/pipeline_service.py | 8 ++++- .../source/pipeline/spline/connection.py | 7 +++-- .../source/search/elasticsearch/connection.py | 7 +++-- .../ingestion/source/search/search_service.py | 8 ++++- .../source/storage/gcs/connection.py | 7 +++-- .../ingestion/source/storage/s3/connection.py | 7 +++-- .../source/storage/storage_service.py | 8 ++++- ingestion/src/metadata/workflow/profiler.py | 8 +++-- 96 files changed, 515 insertions(+), 192 deletions(-) diff --git a/ingestion/src/metadata/ingestion/connections/test_connections.py b/ingestion/src/metadata/ingestion/connections/test_connections.py index 3b3f98756a69..c71b04a89d4e 100644 --- a/ingestion/src/metadata/ingestion/connections/test_connections.py +++ b/ingestion/src/metadata/ingestion/connections/test_connections.py @@ -103,7 +103,7 @@ def _test_connection_steps( metadata=metadata, steps=steps, automation_workflow=automation_workflow ) - return _test_connection_steps_and_raise(steps=steps) + return _test_connection_steps_during_ingestion(steps=steps) def _test_connection_steps_automation_workflow( @@ -231,16 +231,9 @@ def _test_connection_steps_during_ingestion( return test_connection_result -def _test_connection_steps_and_raise( - steps: List[TestConnectionStep], -) -> TestConnectionResult: - """ - Run the test connection as part of the ingestion workflow - Raise an exception if something fails - """ - test_connection_result = _test_connection_steps_during_ingestion(steps) - - for step in test_connection_result.steps: +def raise_test_connection_exception(result: TestConnectionResult) -> None: + """Raise if needed an exception for the test connection""" + for step in result.steps: if not step.passed and step.mandatory: raise SourceConnectionException( f"Failed to run the test connection step: {step.name}" @@ -250,8 +243,6 @@ def _test_connection_steps_and_raise( f"You might be missing metadata in: {step.name} due to {step.message}" ) - return test_connection_result - def test_connection_steps( metadata: OpenMetadata, @@ -319,7 +310,7 @@ def test_connection_db_common( automation_workflow: Optional[AutomationWorkflow] = None, queries: dict = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -351,7 +342,7 @@ def test_connection_db_common( for key, query in queries.items(): test_fn[key] = partial(test_query, statement=query, engine=engine) - test_connection_steps( + result = test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, @@ -361,6 +352,8 @@ def test_connection_db_common( kill_active_connections(engine) + return result + def test_connection_db_schema_sources( metadata: OpenMetadata, @@ -369,7 +362,7 @@ def test_connection_db_schema_sources( automation_workflow: Optional[AutomationWorkflow] = None, queries: dict = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -418,7 +411,7 @@ def custom_executor(engine_: Engine, inspector_fn_str: str): for key, query in queries.items(): test_fn[key] = partial(test_query, statement=query, engine=engine) - test_connection_steps( + result = test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, @@ -428,6 +421,8 @@ def custom_executor(engine_: Engine, inspector_fn_str: str): kill_active_connections(engine) + return result + def test_query(engine: Engine, statement: str): """ diff --git a/ingestion/src/metadata/ingestion/source/api/api_service.py b/ingestion/src/metadata/ingestion/source/api/api_service.py index b5f10c99c255..6b5e7380c785 100644 --- a/ingestion/src/metadata/ingestion/source/api/api_service.py +++ b/ingestion/src/metadata/ingestion/source/api/api_service.py @@ -39,6 +39,9 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.topology import ( NodeStage, @@ -175,7 +178,10 @@ def close(self): def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) def mark_api_collections_as_deleted(self) -> Iterable[Either[DeleteEntity]]: """Method to mark the api collection as deleted""" diff --git a/ingestion/src/metadata/ingestion/source/api/rest/connection.py b/ingestion/src/metadata/ingestion/source/api/rest/connection.py index 676a053f8069..2aa2f09c165c 100644 --- a/ingestion/src/metadata/ingestion/source/api/rest/connection.py +++ b/ingestion/src/metadata/ingestion/source/api/rest/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.api.restConnection import ( RestConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -56,7 +59,7 @@ def test_connection( service_connection: RestConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -81,7 +84,7 @@ def custom_schema_exec(): test_fn = {"CheckURL": custom_url_exec, "CheckSchema": custom_schema_exec} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 5a30430c4eec..96df9f912f78 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -55,6 +55,9 @@ from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import C, TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification @@ -554,7 +557,10 @@ def get_dashboard(self) -> Any: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) def prepare(self): """By default, nothing to prepare""" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py index ee60049c5d87..7bc32cdf63d0 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.domoDashboardConnection import ( DomoDashboardConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -58,7 +61,7 @@ def test_connection( service_connection: DomoDashboardConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -73,7 +76,7 @@ def custom_test_page_list(): "GetCharts": client.custom.test_list_cards, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/lightdash/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/connection.py index aa8b0554e630..1d0cb137eb0c 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/lightdash/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.lightdashConnection import ( LightdashConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -50,7 +53,7 @@ def test_connection( service_connection: LightdashConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -61,7 +64,7 @@ def custom_executor(): test_fn = {"GetDashboards": custom_executor} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/connection.py index 4803d544cca2..cee7c9f2d68a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/connection.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import ( LookerConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -51,7 +54,7 @@ def test_connection( service_connection: LookerConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -78,7 +81,7 @@ def validate_api_version(): "ListLookMLModels": list_datamodels_test, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py index 6ac232c2a3fc..f8c09bc85992 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import ( MetabaseConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.metabase.client import MetabaseClient @@ -39,7 +42,7 @@ def test_connection( service_connection: MetabaseConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -51,7 +54,7 @@ def custom_executor(): test_fn = {"GetDashboards": custom_executor} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/mode/connection.py index ece0bcd97d23..96a670538a4c 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mode/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mode/connection.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.modeConnection import ( ModeConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.mode.client import ModeApiClient @@ -40,7 +43,7 @@ def test_connection( service_connection: ModeConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -52,7 +55,7 @@ def test_connection( ) } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mstr/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/mstr/connection.py index fc193ddf6734..156022a8aaca 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mstr/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mstr/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.mstrConnection import ( MstrConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.mstr.client import MSTRClient @@ -39,7 +42,7 @@ def test_connection( service_connection: MstrConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"GetProjects": client.get_projects_list} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/connection.py index 2ac8f064866f..a7a73e7f406a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.powerBIConnection import ( PowerBIConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.powerbi.client import ( @@ -48,14 +51,14 @@ def test_connection( service_connection: PowerBIConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ test_fn = {"GetDashboards": client.api_client.fetch_dashboards} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/connection.py index 257c58623a9f..784d5088f2a0 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.qlikCloudConnection import ( QlikCloudConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.qlikcloud.client import QlikCloudClient @@ -39,7 +42,7 @@ def test_connection( service_connection: QlikCloudConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"GetDashboards": client.get_dashboards_list_test_conn} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/connection.py index 2cbd25bdb7cc..6ef88f4066a7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.qlikSenseConnection import ( QlikSenseConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.qliksense.client import QlikSenseClient @@ -39,7 +42,7 @@ def test_connection( service_connection: QlikSenseConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"GetDashboards": client.get_dashboard_for_test_connection} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/connection.py index 22786be835ee..6926a140eb8e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.quickSightConnection import ( QuickSightConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -45,7 +48,7 @@ def test_connection( service_connection: QuickSightConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -57,7 +60,7 @@ def test_connection( ) } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/redash/connection.py index cfb43fcc1b04..a0aa013aa01d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash/connection.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import ( RedashConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -47,7 +50,7 @@ def test_connection( service_connection: RedashConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -55,7 +58,7 @@ def test_connection( test_fn = {"GetDashboards": client.dashboards} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/sigma/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/sigma/connection.py index 0cb9c78306d5..a6d33c67d930 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/sigma/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/sigma/connection.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.sigmaConnection import ( SigmaConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -47,7 +50,7 @@ def test_connection( service_connection: SigmaConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -55,7 +58,7 @@ def test_connection( test_fn = {"GetToken": client.get_auth_token, "GetWorkbooks": client.get_dashboards} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/connection.py index e4aac1aa13ba..1aa87e343831 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/connection.py @@ -29,6 +29,9 @@ from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( PostgresConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.generated.schema.entity.utils.supersetApiConnection import ( SupersetApiConnection, ) @@ -71,7 +74,7 @@ def test_connection( service_connection: SupersetConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -93,7 +96,7 @@ def test_connection( else: test_fn["GetCharts"] = partial(test_query, client, FETCH_ALL_CHARTS_TEST) - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py index 191cf83334d1..e3e976e62b8d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import ( TableauConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.generated.schema.security.credentials.accessTokenAuth import ( AccessTokenAuth, ) @@ -72,7 +75,7 @@ def test_connection( service_connection: TableauConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -103,7 +106,7 @@ def test_connection( "GetDataModels": client.test_get_datamodels, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/athena/connection.py b/ingestion/src/metadata/ingestion/source/database/athena/connection.py index 67d422df14fe..59baec43250e 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/connection.py @@ -27,6 +27,9 @@ from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( AthenaConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -96,7 +99,7 @@ def test_connection( service_connection: AthenaConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -123,7 +126,7 @@ def custom_executor_for_view(): "GetViews": custom_executor_for_view, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql/connection.py b/ingestion/src/metadata/ingestion/source/database/azuresql/connection.py index e2a47dbe409f..fe01c321b157 100644 --- a/ingestion/src/metadata/ingestion/source/database/azuresql/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/azuresql/connection.py @@ -28,6 +28,9 @@ from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( MssqlConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -107,12 +110,12 @@ def test_connection( service_connection: AzureSQLConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py b/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py index ac4d4031e4c9..132f0671953d 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/connection.py @@ -27,6 +27,9 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( BigQueryConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.generated.schema.security.credentials.gcpCredentials import ( GcpCredentialsPath, ) @@ -111,7 +114,7 @@ def test_connection( service_connection: BigQueryConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -165,7 +168,7 @@ def test_connection_inner(engine): ), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, @@ -173,7 +176,7 @@ def test_connection_inner(engine): timeout_seconds=timeout_seconds, ) - test_connection_inner(engine) + return test_connection_inner(engine) def get_table_view_names(connection, schema=None): diff --git a/ingestion/src/metadata/ingestion/source/database/bigtable/connection.py b/ingestion/src/metadata/ingestion/source/database/bigtable/connection.py index 303453c129d2..4665e1004124 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigtable/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/bigtable/connection.py @@ -19,6 +19,9 @@ from metadata.generated.schema.entity.services.connections.database.bigTableConnection import ( BigTableConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.generated.schema.security.credentials.gcpValues import ( GcpCredentialsValues, SingleProjectId, @@ -97,7 +100,7 @@ def test_connection( service_connection: BigTableConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -110,7 +113,7 @@ def test_connection( "GetRows": tester.get_row, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse/connection.py b/ingestion/src/metadata/ingestion/source/database/clickhouse/connection.py index 07357ef071c0..020f8ab3df0b 100644 --- a/ingestion/src/metadata/ingestion/source/database/clickhouse/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/clickhouse/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( ClickhouseConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -69,7 +72,7 @@ def test_connection( service_connection: ClickhouseConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -77,7 +80,7 @@ def test_connection( queries = {"GetQueries": CLICKHOUSE_SQL_STATEMENT_TEST} - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/couchbase/connection.py b/ingestion/src/metadata/ingestion/source/database/couchbase/connection.py index 537c1896ccc2..8a6a8f67547e 100644 --- a/ingestion/src/metadata/ingestion/source/database/couchbase/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/couchbase/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.couchbaseConnection import ( CouchbaseConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -51,7 +54,7 @@ def test_connection( service_connection: CouchbaseConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -82,7 +85,7 @@ def test_get_collections(client: Cluster, holder: SchemaHolder): "GetCollections": partial(test_get_collections, client, holder), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index aee239c55efe..600f211170fb 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -58,6 +58,9 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.life_cycle import OMetaLifeCycleData from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.topology import ( @@ -609,4 +612,7 @@ def yield_external_table_lineage(self) -> Iterable[Either[AddLineageRequest]]: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/connection.py b/ingestion/src/metadata/ingestion/source/database/databricks/connection.py index e737122b55e9..14d3d3e392e9 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/connection.py @@ -25,6 +25,9 @@ from metadata.generated.schema.entity.services.connections.database.databricksConnection import ( DatabricksConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -73,7 +76,7 @@ def test_connection( service_connection: DatabricksConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -106,7 +109,7 @@ def test_database_query(engine: Engine, statement: str): "GetQueries": client.test_query_api_access, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/connection.py b/ingestion/src/metadata/ingestion/source/database/datalake/connection.py index fed07e769625..fdc11f6021d0 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/connection.py @@ -31,6 +31,9 @@ from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( DatalakeConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.datalake.clients.azure_blob import ( @@ -93,7 +96,7 @@ def test_connection( service_connection: DatalakeConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -104,7 +107,7 @@ def test_connection( ), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/db2/connection.py b/ingestion/src/metadata/ingestion/source/database/db2/connection.py index 1b974804a828..cadde6775f38 100644 --- a/ingestion/src/metadata/ingestion/source/database/db2/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/db2/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.db2Connection import ( Db2Connection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -49,12 +52,12 @@ def test_connection( service_connection: Db2Connection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake/connection.py b/ingestion/src/metadata/ingestion/source/database/deltalake/connection.py index 41f7c79621cd..20d799007a1e 100644 --- a/ingestion/src/metadata/ingestion/source/database/deltalake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/deltalake/connection.py @@ -31,6 +31,9 @@ from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import ( DeltaLakeConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -85,7 +88,7 @@ def test_connection( service_connection: DeltaLakeConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -99,7 +102,7 @@ def test_connection( ), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py index e822002145c0..3dbe0729e52f 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.domoDatabaseConnection import ( DomoDatabaseConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -53,7 +56,7 @@ def test_connection( service_connection: DomoDatabaseConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -67,7 +70,7 @@ def custom_executor(): "GetTables": custom_executor, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/doris/connection.py b/ingestion/src/metadata/ingestion/source/database/doris/connection.py index c77c1f701cd2..4707ea926637 100644 --- a/ingestion/src/metadata/ingestion/source/database/doris/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/doris/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.dorisConnection import ( DorisConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -51,12 +54,12 @@ def test_connection( service_connection: DorisConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_schema_sources( + return test_connection_db_schema_sources( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/druid/connection.py b/ingestion/src/metadata/ingestion/source/database/druid/connection.py index 840add585b52..a43b76df34c7 100644 --- a/ingestion/src/metadata/ingestion/source/database/druid/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/druid/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.druidConnection import ( DruidConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -54,12 +57,12 @@ def test_connection( service_connection: DruidConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/dynamodb/connection.py b/ingestion/src/metadata/ingestion/source/database/dynamodb/connection.py index b6aee09810be..085b37e366a0 100644 --- a/ingestion/src/metadata/ingestion/source/database/dynamodb/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/dynamodb/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import ( DynamoDBConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -50,7 +53,7 @@ def test_connection( service_connection: DynamoDBConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -60,7 +63,7 @@ def test_connection( "ListTables": partial(check_list_tables, client), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/glue/connection.py b/ingestion/src/metadata/ingestion/source/database/glue/connection.py index 90ebbc8aa2e2..d1f8f4649bb7 100644 --- a/ingestion/src/metadata/ingestion/source/database/glue/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.glueConnection import ( GlueConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -41,7 +44,7 @@ def test_connection( service_connection: GlueConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -66,7 +69,7 @@ def custom_executor_for_table(): "GetTables": custom_executor_for_table, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/connection.py b/ingestion/src/metadata/ingestion/source/database/greenplum/connection.py index 976bb94ca727..b9ad6eff9d01 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.greenplumConnection import ( GreenplumConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -51,7 +54,7 @@ def test_connection( service_connection: GreenplumConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -59,7 +62,7 @@ def test_connection( queries = { "GetDatabases": GREENPLUM_GET_DATABASE, } - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/hive/connection.py b/ingestion/src/metadata/ingestion/source/database/hive/connection.py index 8569e02642b5..0681f72c20ca 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/connection.py @@ -34,6 +34,9 @@ from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( PostgresConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -183,7 +186,7 @@ def test_connection( service_connection: HiveConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -207,7 +210,7 @@ def test_connection( raise ValueError("Invalid metastore connection") engine = get_metastore_connection(service_connection.metastoreConnection) - test_connection_db_schema_sources( + return test_connection_db_schema_sources( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/iceberg/connection.py b/ingestion/src/metadata/ingestion/source/database/iceberg/connection.py index 352b83004437..6b8b804dc80b 100644 --- a/ingestion/src/metadata/ingestion/source/database/iceberg/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/iceberg/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.icebergConnection import ( IcebergConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.iceberg.catalog import IcebergCatalogFactory @@ -44,7 +47,7 @@ def test_connection( service_connection: IcebergConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -62,7 +65,7 @@ def custom_executor_for_tables(): "GetTables": custom_executor_for_tables, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/impala/connection.py b/ingestion/src/metadata/ingestion/source/database/impala/connection.py index 62920aa20259..4a984214ae27 100644 --- a/ingestion/src/metadata/ingestion/source/database/impala/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/impala/connection.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.services.connections.database.impalaConnection import ( ImpalaConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -110,12 +113,12 @@ def test_connection( service_connection: ImpalaConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_schema_sources( + return test_connection_db_schema_sources( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/mariadb/connection.py b/ingestion/src/metadata/ingestion/source/database/mariadb/connection.py index c4e15423a072..1af77fab582e 100644 --- a/ingestion/src/metadata/ingestion/source/database/mariadb/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/mariadb/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import ( MariaDBConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -51,12 +54,12 @@ def test_connection( service_connection: MariaDBConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_schema_sources( + return test_connection_db_schema_sources( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/mongodb/connection.py b/ingestion/src/metadata/ingestion/source/database/mongodb/connection.py index 9f9697a274a6..0d77e2c16568 100644 --- a/ingestion/src/metadata/ingestion/source/database/mongodb/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/mongodb/connection.py @@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import ( MongoDBConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import get_connection_url_common from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -46,7 +49,7 @@ def test_connection( service_connection: MongoDBConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -72,7 +75,7 @@ def test_get_collections(client_: MongoClient, holder_: SchemaHolder): "GetCollections": partial(test_get_collections, client, holder), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/connection.py b/ingestion/src/metadata/ingestion/source/database/mssql/connection.py index d1664cc2f5ae..92d8b1281226 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( MssqlConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -62,7 +65,7 @@ def test_connection( service_connection: MssqlConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -71,7 +74,7 @@ def test_connection( "GetQueries": MSSQL_TEST_GET_QUERIES, "GetDatabases": MSSQL_GET_DATABASE, } - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/mysql/connection.py b/ingestion/src/metadata/ingestion/source/database/mysql/connection.py index a8bd34f77863..86dc523ef8b2 100644 --- a/ingestion/src/metadata/ingestion/source/database/mysql/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/mysql/connection.py @@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( MysqlConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -65,12 +68,12 @@ def test_connection( service_connection: MysqlConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_schema_sources( + return test_connection_db_schema_sources( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/oracle/connection.py b/ingestion/src/metadata/ingestion/source/database/oracle/connection.py index 2323c6875a07..83665684cbc2 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle/connection.py @@ -31,6 +31,9 @@ OracleServiceName, OracleTNSConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -134,7 +137,7 @@ def test_connection( service_connection: OracleConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -142,7 +145,7 @@ def test_connection( test_conn_queries = {"CheckAccess": CHECK_ACCESS_TO_ALL} - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/pinotdb/connection.py b/ingestion/src/metadata/ingestion/source/database/pinotdb/connection.py index 4051da4fd1e3..8f148f3718a3 100644 --- a/ingestion/src/metadata/ingestion/source/database/pinotdb/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/pinotdb/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import ( PinotDBConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -65,12 +68,12 @@ def test_connection( service_connection: PinotDBConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/connection.py b/ingestion/src/metadata/ingestion/source/database/postgres/connection.py index fab91e4e9215..f447f13dfb9e 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/connection.py @@ -27,6 +27,9 @@ from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( PostgresConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -73,7 +76,7 @@ def test_connection( service_connection: PostgresConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -86,7 +89,7 @@ def test_connection( "GetDatabases": POSTGRES_GET_DATABASE, "GetTags": POSTGRES_TEST_GET_TAGS, } - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/presto/connection.py b/ingestion/src/metadata/ingestion/source/database/presto/connection.py index 00222d50f503..b45499700912 100644 --- a/ingestion/src/metadata/ingestion/source/database/presto/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/presto/connection.py @@ -25,6 +25,9 @@ from metadata.generated.schema.entity.services.connections.database.prestoConnection import ( PrestoConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -86,7 +89,7 @@ def test_connection( service_connection: PrestoConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -110,7 +113,7 @@ def custom_executor_for_table(): "GetTables": custom_executor_for_table, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/query_parser_source.py b/ingestion/src/metadata/ingestion/source/database/query_parser_source.py index f502c576a3e3..20d8fbaec0dc 100644 --- a/ingestion/src/metadata/ingestion/source/database/query_parser_source.py +++ b/ingestion/src/metadata/ingestion/source/database/query_parser_source.py @@ -20,6 +20,9 @@ ) from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.steps import Source +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_test_connection_fn from metadata.utils.helpers import get_start_and_end @@ -121,4 +124,5 @@ def close(self): def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.engine) + result = test_connection_fn(self.engine) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/connection.py b/ingestion/src/metadata/ingestion/source/database/redshift/connection.py index 4a7db793a633..e904ba8a95db 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/connection.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( RedshiftConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -64,7 +67,7 @@ def test_connection( service_connection: RedshiftConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -98,7 +101,7 @@ def test_get_queries_permissions(engine_: Engine): ), } - test_connection_steps( + result = test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, @@ -107,3 +110,5 @@ def test_get_queries_permissions(engine_: Engine): ) kill_active_connections(engine) + + return result diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce/connection.py b/ingestion/src/metadata/ingestion/source/database/salesforce/connection.py index d3cac4f4722e..6a41e4389c47 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.salesforceConnection import ( SalesforceConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -50,14 +53,14 @@ def test_connection( service_connection: SalesforceConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ test_fn = {"CheckAccess": client.describe} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py index ddc6ad4e1665..a40ac76fa694 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py @@ -48,6 +48,9 @@ from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -375,4 +378,5 @@ def close(self): def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.client, self.service_connection) + result = test_connection_fn(self.client, self.service_connection) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/database/saperp/connection.py b/ingestion/src/metadata/ingestion/source/database/saperp/connection.py index 2b4ae95e0295..ba5bac40dbb1 100644 --- a/ingestion/src/metadata/ingestion/source/database/saperp/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/saperp/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.database.sapErpConnection import ( SapErpConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.saperp.client import SapErpClient @@ -39,12 +42,12 @@ def test_connection( service_connection: SapErpConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: test_fn = { "GetTables": client.test_table_api, "GetColumns": client.test_column_api, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/connection.py b/ingestion/src/metadata/ingestion/source/database/saphana/connection.py index b3a4739acb17..ae8fe0ce333c 100644 --- a/ingestion/src/metadata/ingestion/source/database/saphana/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/saphana/connection.py @@ -30,6 +30,9 @@ from metadata.generated.schema.entity.services.connections.database.sapHanaConnection import ( SapHanaConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -158,13 +161,13 @@ def test_connection( service_connection: SapHanaConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=_build_test_fn_dict(engine, service_connection), service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py b/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py index 0c2bdf824adc..c474c4bfea6f 100644 --- a/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py @@ -29,6 +29,9 @@ ) from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException, Source +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_test_connection_fn from metadata.ingestion.source.database.saphana.cdata_parser import ( @@ -156,4 +159,5 @@ def parse_cdata( def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.engine) + result = test_connection_fn(self.engine) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/database/sas/connection.py b/ingestion/src/metadata/ingestion/source/database/sas/connection.py index 9acbf4d140cb..014c1286dd67 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.database.sasConnection import ( SASConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.sas.client import SASClient @@ -39,9 +42,9 @@ def test_connection( service_connection: SASConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: test_fn = {"CheckAccess": client.check_connection} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index aadfc8c3c9fd..c6fbc8fb0c29 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -71,6 +71,9 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -907,4 +910,7 @@ def close(self) -> None: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py b/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py index 0513a641cde1..c3aa666f4ad0 100644 --- a/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/singlestore/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( SingleStoreConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -52,12 +55,12 @@ def test_connection( service_connection: SingleStoreConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_schema_sources( + return test_connection_db_schema_sources( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py index e85522c5ba36..5cbc9479fbf8 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py @@ -28,6 +28,9 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( SnowflakeConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -147,7 +150,7 @@ def test_connection( service_connection: SnowflakeConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow. @@ -192,7 +195,7 @@ def test_connection( ), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/sqlite/connection.py b/ingestion/src/metadata/ingestion/source/database/sqlite/connection.py index ff4cbdaaa88d..f7215cca7c6b 100644 --- a/ingestion/src/metadata/ingestion/source/database/sqlite/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/sqlite/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( SQLiteConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -54,12 +57,12 @@ def test_connection( service_connection: SQLiteConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/teradata/connection.py b/ingestion/src/metadata/ingestion/source/database/teradata/connection.py index f1c371b4255f..be8d1e856bd9 100644 --- a/ingestion/src/metadata/ingestion/source/database/teradata/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/teradata/connection.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.services.connections.database.teradataConnection import ( TeradataConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -84,14 +87,14 @@ def test_connection( service_connection: TeradataConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ queries = {"GetDatabases": TERADATA_GET_DATABASE} - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/trino/connection.py b/ingestion/src/metadata/ingestion/source/database/trino/connection.py index 2a987a7a7d52..6ee18950850a 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/connection.py @@ -29,6 +29,9 @@ from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( TrinoConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -126,7 +129,7 @@ def test_connection( service_connection: TrinoConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -135,7 +138,7 @@ def test_connection( "GetDatabases": TRINO_GET_DATABASE, } - test_connection_db_schema_sources( + return test_connection_db_schema_sources( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py index 3a37052dea90..b17be4dd0222 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/connection.py @@ -23,6 +23,9 @@ from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( UnityCatalogConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient @@ -56,7 +59,7 @@ def test_connection( service_connection: UnityCatalogConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -94,7 +97,7 @@ def get_tables(connection: WorkspaceClient, table_obj: DatabricksTable): "GetQueries": client.test_query_api_access, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index cafde0b443f5..3189102eec3e 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -30,6 +30,9 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException, Source +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_test_connection_fn @@ -159,4 +162,7 @@ def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/connection.py b/ingestion/src/metadata/ingestion/source/database/vertica/connection.py index 39f3b41e607d..945564d3ac7e 100644 --- a/ingestion/src/metadata/ingestion/source/database/vertica/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/vertica/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.database.verticaConnection import ( VerticaConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import ( create_generic_db_connection, get_connection_args_common, @@ -53,7 +56,7 @@ def test_connection( service_connection: VerticaConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -62,7 +65,7 @@ def test_connection( "GetQueries": VERTICA_TEST_GET_QUERIES, "GetDatabases": VERTICA_LIST_DATABASES, } - test_connection_db_common( + return test_connection_db_common( metadata=metadata, engine=engine, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py index 519844534bca..66c05a2d368e 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py @@ -29,6 +29,9 @@ from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import ( RedpandaConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -122,7 +125,7 @@ def test_connection( service_connection: Union[KafkaConnection, RedpandaConnection], automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -151,7 +154,7 @@ def schema_registry_test(): "CheckSchemaRegistry": schema_registry_test, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/messaging/kinesis/connection.py b/ingestion/src/metadata/ingestion/source/messaging/kinesis/connection.py index b9bc119c1dab..573dd986a65b 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kinesis/connection.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kinesis/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.messaging.kinesisConnection import ( KinesisConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -43,7 +46,7 @@ def test_connection( service_connection: KinesisConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -51,7 +54,7 @@ def test_connection( test_fn = {"GetTopics": client.list_streams} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index faa033784a0f..b6d5f0d8eb1f 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -34,6 +34,9 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.topology import ( NodeStage, @@ -198,7 +201,10 @@ def prepare(self): def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) def mark_topics_as_deleted(self) -> Iterable[Either[DeleteEntity]]: """Method to mark the topics as deleted""" diff --git a/ingestion/src/metadata/ingestion/source/messaging/redpanda/connection.py b/ingestion/src/metadata/ingestion/source/messaging/redpanda/connection.py index b923c5c8a025..34c01ceb9dcc 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/redpanda/connection.py +++ b/ingestion/src/metadata/ingestion/source/messaging/redpanda/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import ( RedpandaConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.messaging.kafka.connection import KafkaClient from metadata.ingestion.source.messaging.kafka.connection import ( @@ -47,13 +50,13 @@ def test_connection( service_connection: RedpandaConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ - test_kafka_connection( + return test_kafka_connection( metadata=metadata, client=client, service_connection=service_connection, diff --git a/ingestion/src/metadata/ingestion/source/metadata/alationsink/connection.py b/ingestion/src/metadata/ingestion/source/metadata/alationsink/connection.py index a3f58a4ef027..64a6b1998925 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/alationsink/connection.py +++ b/ingestion/src/metadata/ingestion/source/metadata/alationsink/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.metadata.alationSinkConnection import ( AlationSinkConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.metadata.alationsink.client import AlationSinkClient @@ -39,7 +42,7 @@ def test_connection( service_connection: AlationSinkConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"CheckAccess": client.list_native_datasources} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/metadata/alationsink/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/alationsink/metadata.py index fa83c71ddd52..c29b81f7173b 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/alationsink/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/alationsink/metadata.py @@ -33,6 +33,9 @@ ) from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.steps import InvalidSourceException, Source +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.utils import model_str from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -459,6 +462,7 @@ def close(self): def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn( + result = test_connection_fn( self.metadata, self.alation_sink_client, self.service_connection ) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen/connection.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen/connection.py index 394834f4928a..8eabc792c96e 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen/connection.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen/connection.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.metadata.amundsenConnection import ( AmundsenConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -58,7 +61,7 @@ def test_connection( service_connection: AmundsenConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -68,7 +71,7 @@ def test_connection( "CheckAccess": partial(client.execute_query, query=NEO4J_AMUNDSEN_USER_QUERY) } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py index a561aa7ab136..8c9d077e434d 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py @@ -56,6 +56,9 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException, Source +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -460,4 +463,7 @@ def get_database_service(self, service_name: str) -> DatabaseService: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas/connection.py b/ingestion/src/metadata/ingestion/source/metadata/atlas/connection.py index a5218c666715..4a5f934a051b 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas/connection.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.metadata.atlasConnection import ( AtlasConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.metadata.atlas.client import AtlasClient @@ -39,7 +42,7 @@ def test_connection( service_connection: AtlasConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"CheckAccess": client.list_entities} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py index 1f967e35a921..2711c5ca1469 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py @@ -41,6 +41,9 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either, Entity, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException, Source +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -477,4 +480,7 @@ def get_lineage_entity_ref( def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/connection.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/connection.py index 335d4e4e4144..3957227936ad 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/connection.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import ( MlflowConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -43,7 +46,7 @@ def test_connection( service_connection: MlflowConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -51,7 +54,7 @@ def test_connection( test_fn = {"GetModels": client.search_registered_models} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index f49baae8cf97..e9517f819895 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -38,6 +38,9 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.topology import ( NodeStage, @@ -176,7 +179,10 @@ def close(self): def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) def mark_mlmodels_as_deleted(self) -> Iterable[Either[DeleteEntity]]: """Method to mark the mlmodels as deleted""" diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/connection.py b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/connection.py index 9c67e8f67ffb..ddf5f308e5e8 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/connection.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/sagemaker/connection.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.mlmodel.sageMakerConnection import ( SageMakerConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -39,7 +42,7 @@ def test_connection( service_connection: SageMakerConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"GetModels": client.list_models} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/connection.py index d4387b78b931..03393d90fea1 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import ( AirbyteConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.airbyte.client import AirbyteClient @@ -39,7 +42,7 @@ def test_connection( service_connection: AirbyteConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"GetPipelines": client.list_workspaces} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/connection.py index 2bf2a6786d61..70dacd993d54 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/connection.py @@ -36,6 +36,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import ( BackendConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_engine_step, @@ -105,7 +108,7 @@ def test_connection( service_connection: AirflowConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -113,7 +116,7 @@ def test_connection( test_fn = {"CheckAccess": partial(test_connection_engine_step, engine)} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dagster/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/dagster/connection.py index 9884e97060e8..b17564bd2fa1 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dagster/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dagster/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.dagsterConnection import ( DagsterConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.dagster.client import DagsterClient @@ -40,7 +43,7 @@ def test_connection( service_connection: DagsterConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -51,7 +54,7 @@ def custom_executor_for_pipeline(): test_fn = {"GetPipelines": custom_executor_for_pipeline} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py index 6faf3e976a75..adbb9707ddcf 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.databricksPipelineConnection import ( DatabricksPipelineConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.databricks.client import DatabricksClient @@ -40,7 +43,7 @@ def test_connection( service_connection: DatabricksPipelineConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -48,7 +51,7 @@ def test_connection( test_fn = {"GetPipelines": client.list_jobs_test_connection} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py index 992089be64f2..0c98d03bd84e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.dbtCloudConnection import ( DBTCloudConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.dbtcloud.client import DBTCloudClient @@ -41,7 +44,7 @@ def test_connection( service_connection: DBTCloudConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -54,7 +57,7 @@ def test_connection( "GetRuns": partial(client.get_runs, job_id=job_id), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py index 1ff928dd59c4..eed706734aeb 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/connection.py @@ -24,6 +24,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.domoPipelineConnection import ( DomoPipelineConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -49,7 +52,7 @@ def test_connection( service_connection: DomoPipelineConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -61,7 +64,7 @@ def custom_executor(): test_fn = {"GetPipelines": custom_executor} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/connection.py index b60828feab48..5096b62f0268 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import ( FivetranConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.fivetran.client import FivetranClient @@ -39,7 +42,7 @@ def test_connection( service_connection: FivetranConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -47,7 +50,7 @@ def test_connection( test_fn = {"GetPipelines": client.list_groups} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/flink/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/flink/connection.py index a868ae01ddf5..a633b3096571 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/flink/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/flink/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.flinkConnection import ( FlinkConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.flink.client import FlinkClient @@ -39,14 +42,14 @@ def test_connection( service_connection: FlinkConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow """ test_fn = {"GetPipelines": client.get_jobs} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/connection.py index 222ed9a84ab0..b953934147b8 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import ( GluePipelineConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -40,7 +43,7 @@ def test_connection( service_connection: GluePipelineConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -48,7 +51,7 @@ def test_connection( test_fn = {"GetPipelines": client.list_workflows} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/connection.py index c8a3f4e87d94..38ea63920f0b 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/connection.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import ( KafkaConnectConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.kafkaconnect.client import KafkaConnectClient @@ -40,7 +43,7 @@ def test_connection( service_connection: KafkaConnectConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -52,7 +55,7 @@ def test_connection( "GetPlugins": client.get_connector_plugins, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py index 1b4b86e8cef0..097b8cc1dbb4 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/nifi/connection.py @@ -21,6 +21,9 @@ BasicAuthentication, NifiConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.nifi.client import NifiClient @@ -55,7 +58,7 @@ def test_connection( service_connection: NifiConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -66,7 +69,7 @@ def custom_executor(): test_fn = {"GetPipelines": custom_executor} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/connection.py index f578531672f5..ec7abe8b80a1 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/connection.py @@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( SecurityProtocol as KafkaSecProtocol, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import ( SourceConnectionException, test_connection_steps, @@ -78,7 +81,7 @@ def test_connection( service_connection: OpenLineageConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -91,7 +94,7 @@ def custom_executor(): test_fn = {"GetWatermarkOffsets": custom_executor} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 3df3712277a4..d9e8129aac91 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -35,6 +35,9 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest @@ -251,7 +254,10 @@ def get_pipeline(self) -> Any: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) def register_record(self, pipeline_request: CreatePipelineRequest) -> None: """Mark the pipeline record as scanned and update the pipeline_source_state""" diff --git a/ingestion/src/metadata/ingestion/source/pipeline/spline/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/spline/connection.py index 2b4d0dcfe8bd..acee709f941f 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/spline/connection.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/spline/connection.py @@ -20,6 +20,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.splineConnection import ( SplineConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.spline.client import SplineClient @@ -40,7 +43,7 @@ def test_connection( service_connection: SplineConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -48,7 +51,7 @@ def test_connection( test_fn = {"GetPipelines": client.get_pipelines_test_connection} - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/search/elasticsearch/connection.py b/ingestion/src/metadata/ingestion/source/search/elasticsearch/connection.py index dbd726756985..19c1b2e2c15a 100644 --- a/ingestion/src/metadata/ingestion/source/search/elasticsearch/connection.py +++ b/ingestion/src/metadata/ingestion/source/search/elasticsearch/connection.py @@ -40,6 +40,9 @@ from metadata.generated.schema.entity.services.connections.search.elasticSearchConnection import ( ElasticsearchConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.builders import init_empty_connection_arguments from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -186,7 +189,7 @@ def test_connection( service_connection: ElasticsearchConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -200,7 +203,7 @@ def test_get_search_indexes(): "GetSearchIndexes": test_get_search_indexes, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/search/search_service.py b/ingestion/src/metadata/ingestion/source/search/search_service.py index ce49c3245a1f..6bf2eaadd374 100644 --- a/ingestion/src/metadata/ingestion/source/search/search_service.py +++ b/ingestion/src/metadata/ingestion/source/search/search_service.py @@ -41,6 +41,9 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.search_index_data import OMetaIndexSampleData from metadata.ingestion.models.topology import ( @@ -192,7 +195,10 @@ def prepare(self): def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) def mark_search_indexes_as_deleted(self) -> Iterable[Either[DeleteEntity]]: """Method to mark the search index as deleted""" diff --git a/ingestion/src/metadata/ingestion/source/storage/gcs/connection.py b/ingestion/src/metadata/ingestion/source/storage/gcs/connection.py index cafb53cdc159..5d1f78ade555 100644 --- a/ingestion/src/metadata/ingestion/source/storage/gcs/connection.py +++ b/ingestion/src/metadata/ingestion/source/storage/gcs/connection.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.storage.gcsConnection import ( GcsConnection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.generated.schema.security.credentials.gcpValues import ( GcpCredentialsValues, SingleProjectId, @@ -138,7 +141,7 @@ def test_connection( service_connection: GcsConnection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -153,7 +156,7 @@ def test_connection( "GetMetrics": tester.get_metrics, } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/connection.py b/ingestion/src/metadata/ingestion/source/storage/s3/connection.py index 3c745dda162c..d2ba001b6ea9 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/connection.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/connection.py @@ -27,6 +27,9 @@ from metadata.generated.schema.entity.services.connections.storage.s3Connection import ( S3Connection, ) +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.ingestion.connections.test_connections import test_connection_steps from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import THREE_MIN @@ -55,7 +58,7 @@ def test_connection( service_connection: S3Connection, automation_workflow: Optional[AutomationWorkflow] = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -77,7 +80,7 @@ def test_buckets(connection: S3Connection, client: S3ObjectStoreClient): ), } - test_connection_steps( + return test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 5afb3b19b1d7..9a57e5231253 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -40,6 +40,9 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.topology import ( @@ -236,7 +239,10 @@ def register_record(self, container_request: CreateContainerRequest) -> None: def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) - test_connection_fn(self.metadata, self.connection_obj, self.service_connection) + result = test_connection_fn( + self.metadata, self.connection_obj, self.service_connection + ) + raise_test_connection_exception(result) def mark_containers_as_deleted(self) -> Iterable[Either[DeleteEntity]]: """Method to mark the containers as deleted""" diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py index 4354b8c96bc7..5f8d692d2f20 100644 --- a/ingestion/src/metadata/workflow/profiler.py +++ b/ingestion/src/metadata/workflow/profiler.py @@ -20,6 +20,9 @@ OpenMetadataWorkflowConfig, ) from metadata.ingestion.api.steps import Processor, Sink +from metadata.ingestion.connections.test_connections import ( + raise_test_connection_exception, +) from metadata.ingestion.source.connections import get_test_connection_fn from metadata.pii.processor import PIIProcessor from metadata.profiler.processor.processor import ProfilerProcessor @@ -75,12 +78,13 @@ def set_steps(self): else: self.steps = (profiler_processor, sink) - def test_connection(self): + def test_connection(self) -> None: service_config = self.config.source.serviceConnection.root.config conn = get_ssl_connection(service_config) test_connection_fn = get_test_connection_fn(service_config) - test_connection_fn(self.metadata, conn, service_config) + result = test_connection_fn(self.metadata, conn, service_config) + raise_test_connection_exception(result) def _get_sink(self) -> Sink: sink_type = self.config.sink.type From 781989e5bca4e3bfdc3cfec73f0456e35a7db71f Mon Sep 17 00:00:00 2001 From: Teddy Date: Fri, 18 Oct 2024 12:07:11 +0200 Subject: [PATCH 3/3] MINOR - live index on test suite creation (#18317) * fix: live index on test suite creation * fix: make live indexing use entityInterface --- .../service/jdbi3/TestSuiteRepository.java | 29 ++++++++++ .../dqtests/TestSuiteResourceTest.java | 56 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestSuiteRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestSuiteRepository.java index 61654fb453fa..a582ea559982 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestSuiteRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestSuiteRepository.java @@ -8,12 +8,14 @@ import static org.openmetadata.service.Entity.TEST_CASE; import static org.openmetadata.service.Entity.TEST_CASE_RESULT; import static org.openmetadata.service.Entity.TEST_SUITE; +import static org.openmetadata.service.Entity.getEntity; import static org.openmetadata.service.Entity.getEntityTimeSeriesRepository; import static org.openmetadata.service.util.FullyQualifiedName.quoteName; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import javax.json.JsonArray; @@ -23,6 +25,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.sqlobject.transaction.Transaction; +import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.tests.DataQualityReport; import org.openmetadata.schema.tests.ResultSummary; @@ -40,6 +43,8 @@ import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.SearchIndexUtils; import org.openmetadata.service.search.SearchListFilter; +import org.openmetadata.service.search.indexes.SearchIndex; +import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.JsonUtils; @@ -240,6 +245,30 @@ public TestSummary getTestSummary(UUID testSuiteId) { return null; } + @Override + protected void postCreate(TestSuite entity) { + super.postCreate(entity); + if (Boolean.TRUE.equals(entity.getExecutable()) + && entity.getExecutableEntityReference() != null) { + // Update table index with test suite field + EntityInterface entityInterface = + getEntity(entity.getExecutableEntityReference(), "testSuite", ALL); + IndexMapping indexMapping = + searchRepository.getIndexMapping(entity.getExecutableEntityReference().getType()); + SearchClient searchClient = searchRepository.getSearchClient(); + SearchIndex index = + searchRepository + .getSearchIndexFactory() + .buildIndex(entity.getExecutableEntityReference().getType(), entityInterface); + Map doc = index.buildSearchIndexDoc(); + searchClient.updateEntity( + indexMapping.getIndexName(searchRepository.getClusterAlias()), + entity.getExecutableEntityReference().getId().toString(), + doc, + "ctx._source.testSuite = params.testSuite;"); + } + } + @SneakyThrows private List getResultSummary(UUID testSuiteId) { List resultSummaries = new ArrayList<>(); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestSuiteResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestSuiteResourceTest.java index c708b690f787..b4e0cff94a6e 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestSuiteResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/dqtests/TestSuiteResourceTest.java @@ -3,6 +3,7 @@ import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -13,10 +14,13 @@ import static org.openmetadata.service.util.TestUtils.assertResponse; import static org.openmetadata.service.util.TestUtils.assertResponseContains; +import es.org.elasticsearch.client.Request; +import es.org.elasticsearch.client.RestClient; import java.io.IOException; import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -25,6 +29,7 @@ import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; import org.apache.http.client.HttpResponseException; +import org.apache.http.util.EntityUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -45,6 +50,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.resources.EntityResourceTest; import org.openmetadata.service.resources.databases.TableResourceTest; +import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.TestUtils; @@ -733,6 +739,56 @@ void get_listTestSuiteFromSearchWithPagination(TestInfo testInfo) throws IOExcep } } + @Test + void create_executableTestSuiteAndCheckSearchClient(TestInfo test) throws IOException { + TableResourceTest tableResourceTest = new TableResourceTest(); + CreateTable tableReq = + tableResourceTest + .createRequest(test) + .withColumns( + List.of( + new Column() + .withName(C1) + .withDisplayName("c1") + .withDataType(ColumnDataType.VARCHAR) + .withDataLength(10))); + Table table = tableResourceTest.createEntity(tableReq, ADMIN_AUTH_HEADERS); + CreateTestSuite createTestSuite = createRequest(table.getFullyQualifiedName()); + TestSuite testSuite = createExecutableTestSuite(createTestSuite, ADMIN_AUTH_HEADERS); + RestClient searchClient = getSearchClient(); + IndexMapping index = Entity.getSearchRepository().getIndexMapping(Entity.TABLE); + es.org.elasticsearch.client.Response response; + Request request = + new Request( + "GET", + String.format( + "%s/_search", index.getIndexName(Entity.getSearchRepository().getClusterAlias()))); + String query = + String.format( + "{\"size\": 10,\"query\":{\"bool\":{\"must\":[{\"term\":{\"_id\":\"%s\"}}]}}}", + table.getId().toString()); + request.setJsonEntity(query); + try { + response = searchClient.performRequest(request); + } finally { + searchClient.close(); + } + String jsonString = EntityUtils.toString(response.getEntity()); + HashMap map = + (HashMap) JsonUtils.readOrConvertValue(jsonString, HashMap.class); + LinkedHashMap hits = (LinkedHashMap) map.get("hits"); + ArrayList> hitsList = + (ArrayList>) hits.get("hits"); + assertNotEquals(0, hitsList.size()); + assertTrue( + hitsList.stream() + .allMatch( + hit -> + ((LinkedHashMap) hit.get("_source")) + .get("id") + .equals(table.getId().toString()))); + } + public ResultList getTestSuites( Integer limit, String fields,