From 027721104f8a166e829a82aab00fa129774c7969 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Tue, 25 Jun 2024 09:43:48 -0400 Subject: [PATCH 1/8] merge definitions --- hooli_data_eng/definitions.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 34aa6be..14a4c9c 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -7,7 +7,7 @@ multiprocess_executor, ) -from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets +from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets, asset_group1, asset_group2 from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job from hooli_data_eng.assets.marketing import check_avg_orders from hooli_data_eng.assets.raw_data import check_users, raw_data_schema_checks @@ -44,6 +44,9 @@ dbt_assets = load_assets_from_modules([dbt_assets]) +asset_group1 = load_assets_from_modules([asset_group1], group_name="GROUP1") +asset_group2 = load_assets_from_modules([asset_group2], group_name="GROUP2") + dbt_asset_checks = build_column_schema_change_checks(assets=[*dbt_assets]) # Our final set of assets represent Python code that @@ -55,6 +58,7 @@ marketing_assets = load_assets_from_package_module(marketing, group_name="MARKETING") + # --------------------------------------------------- # Definitions From 2e145b45e6fb1210796c67a9e0604760311ad9da Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Thu, 27 Jun 2024 21:25:33 -0400 Subject: [PATCH 2/8] add execute_queries for testing --- hooli_snowflake_insights/definitions.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index c5412ec..3fa2026 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -1,6 +1,6 @@ import os -from dagster import Definitions, EnvVar, ResourceDefinition +from dagster import Definitions, EnvVar, ResourceDefinition,asset from dagster_cloud.dagster_insights import ( create_snowflake_insights_asset_and_schedule, ) @@ -35,6 +35,15 @@ def get_env(): }, } + +@asset +def test_execute_queries(snowflake: SnowflakeResource): + queries = [ + "select 1", + "select 2" + ] + snowflake.execute_queries(sql_queries=queries, fetch_results=False) + # Creates an asset (poll_snowflake_query_history_hour) and sets its schedule snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule( "2023-10-29-00:00", @@ -43,7 +52,7 @@ def get_env(): ) defs = Definitions( - assets=[*snowflake_insights_definitions.assets,], + assets=[*snowflake_insights_definitions.assets,test_execute_queries], schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], ) From 658b8743d40ca562c59cb48a487809a0dded6877 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 28 Jun 2024 09:21:47 -0400 Subject: [PATCH 3/8] fix resource used --- hooli_snowflake_insights/definitions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index 3fa2026..9b3741d 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -37,12 +37,12 @@ def get_env(): @asset -def test_execute_queries(snowflake: SnowflakeResource): +def test_execute_queries(snowflake_insights: SnowflakeResource): queries = [ "select 1", "select 2" ] - snowflake.execute_queries(sql_queries=queries, fetch_results=False) + snowflake_insights.execute_queries(sql_queries=queries, fetch_results=False) # Creates an asset (poll_snowflake_query_history_hour) and sets its schedule snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule( From e56efc7dd4a956f774d416b6c8e864dc0e5a534a Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 28 Jun 2024 09:38:06 -0400 Subject: [PATCH 4/8] fix imports --- hooli_data_eng/definitions.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 14a4c9c..61dab9e 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -7,7 +7,7 @@ multiprocess_executor, ) -from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets, asset_group1, asset_group2 +from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job from hooli_data_eng.assets.marketing import check_avg_orders from hooli_data_eng.assets.raw_data import check_users, raw_data_schema_checks @@ -44,9 +44,6 @@ dbt_assets = load_assets_from_modules([dbt_assets]) -asset_group1 = load_assets_from_modules([asset_group1], group_name="GROUP1") -asset_group2 = load_assets_from_modules([asset_group2], group_name="GROUP2") - dbt_asset_checks = build_column_schema_change_checks(assets=[*dbt_assets]) # Our final set of assets represent Python code that From cb80f96c3f6071e6994baf626afe748dbe52ace8 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 28 Jun 2024 11:36:30 -0400 Subject: [PATCH 5/8] add new execute_queries method --- hooli_snowflake_insights/definitions.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index 9b3741d..e7f3371 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -1,10 +1,11 @@ import os -from dagster import Definitions, EnvVar, ResourceDefinition,asset +from dagster import Definitions, EnvVar, ResourceDefinition,asset, AssetExectionContext from dagster_cloud.dagster_insights import ( create_snowflake_insights_asset_and_schedule, ) from dagster_snowflake import SnowflakeResource +from contextlib import closing # Used to derive environment (LOCAL, BRANCH, PROD) def get_env(): @@ -36,6 +37,22 @@ def get_env(): } +def execute_snowflake_queries(context, snowflake: SnowflakeResource, queries: list): + with snowflake.get_connection() as conn: + with closing(conn.cursor()) as cursor: + for query in queries: + cursor.execute(query) + context.log.info("Executing query: " + query) + + +@asset +def test_execute_queries_with_get_connection(context: AssetExectionContext, snowflake_insights: SnowflakeResource): + queries = [ + "select 1", + "select 2" + ] + execute_snowflake_queries(snowflake_insights, queries) + @asset def test_execute_queries(snowflake_insights: SnowflakeResource): queries = [ From ff11b6ef6c651ba988c52395c0bd3e8ff2dd214c Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 28 Jun 2024 11:52:10 -0400 Subject: [PATCH 6/8] fix spelling --- hooli_snowflake_insights/definitions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index e7f3371..1d6072b 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -1,6 +1,6 @@ import os -from dagster import Definitions, EnvVar, ResourceDefinition,asset, AssetExectionContext +from dagster import Definitions, EnvVar, ResourceDefinition,asset, AssetExecutionContext from dagster_cloud.dagster_insights import ( create_snowflake_insights_asset_and_schedule, ) @@ -46,7 +46,7 @@ def execute_snowflake_queries(context, snowflake: SnowflakeResource, queries: li @asset -def test_execute_queries_with_get_connection(context: AssetExectionContext, snowflake_insights: SnowflakeResource): +def test_execute_queries_with_get_connection(context: AssetExecutionContext, snowflake_insights: SnowflakeResource): queries = [ "select 1", "select 2" From b9507b4c05e82778bb03c289aa0b430d76bed803 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 28 Jun 2024 11:53:46 -0400 Subject: [PATCH 7/8] actually add --- hooli_snowflake_insights/definitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index 1d6072b..3c0c0d4 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -69,7 +69,7 @@ def test_execute_queries(snowflake_insights: SnowflakeResource): ) defs = Definitions( - assets=[*snowflake_insights_definitions.assets,test_execute_queries], + assets=[*snowflake_insights_definitions.assets,test_execute_queries, test_execute_queries_with_get_connection], schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], ) From d172cb4d5f0f9c0fbc15653901f21e61560ed03a Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 28 Jun 2024 11:55:35 -0400 Subject: [PATCH 8/8] fix arguments --- hooli_snowflake_insights/definitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index 3c0c0d4..dff3859 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -51,7 +51,7 @@ def test_execute_queries_with_get_connection(context: AssetExecutionContext, sno "select 1", "select 2" ] - execute_snowflake_queries(snowflake_insights, queries) + execute_snowflake_queries(context, snowflake_insights, queries) @asset def test_execute_queries(snowflake_insights: SnowflakeResource):