diff --git a/examples/data_types_and_io/data_types_and_io/structured_dataset.py b/examples/data_types_and_io/data_types_and_io/structured_dataset.py
index 85db71718..86517e47b 100644
--- a/examples/data_types_and_io/data_types_and_io/structured_dataset.py
+++ b/examples/data_types_and_io/data_types_and_io/structured_dataset.py
@@ -9,7 +9,7 @@
 #
 # Structured dataset is a superset of Flyte Schema.
 #
-# The `StructuredDataset` Transformer can write a dataframe to BigQuery, s3, or any storage by registering new structured dataset encoder and decoder.
+# The `StructuredDataset` Transformer can write a dataframe to BigQuery, S3, Snowflake, or any other storage by registering a new structured dataset encoder and decoder.
 #
 # Flytekit makes it possible to return or accept a {py:class}`pandas.DataFrame` which is automatically
 # converted into Flyte's abstract representation of a structured dataset object.
@@ -81,17 +81,18 @@ def get_schema_df(a: int) -> FlyteSchema[superset_cols]:
 def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]:
     df = df.open(pd.DataFrame).all()
     df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
-    # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
+    # When specifying a BigQuery or Snowflake uri for a StructuredDataset, flytekit writes the pandas DataFrame to a table in BigQuery or Snowflake.
     return StructuredDataset(dataframe=df)


 # %% [markdown]
 # ## StructuredDataset with `uri` Argument
 #
-# BigQuery `uri` allows you to load and retrieve data from cloud using the `uri`. The `uri` comprises of the bucket name and the filename prefixed with `gs://`.
-# If you specify BigQuery `uri` for StructuredDataset, BigQuery creates a table in the location specified by the `uri`.
-# The `uri` in StructuredDataset reads from or writes to S3, GCP, BigQuery, or any storage.
-# Let's understand how to convert a pandas DataFrame to a BigQuery table and vice-versa through an example.
+# Both the BigQuery and Snowflake `uri` let you load and retrieve data from the cloud.
+# The `uri` comprises the path to the table, prefixed with `bq://` for BigQuery and `snowflake://` for Snowflake.
+# If you specify either a BigQuery or a Snowflake `uri` for a StructuredDataset, a table is created at the location specified by the `uri`.
+# The `uri` in StructuredDataset reads from or writes to S3, GCP, BigQuery, Snowflake, or any other storage.
+# Let's understand how to convert a pandas DataFrame to a BigQuery or Snowflake table and vice versa through an example.
 #
 # Before writing DataFrame to a BigQuery table,
 #
@@ -99,6 +100,15 @@ def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[St
 # 2. Create a project and add the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to your .bashrc file.
 # 3. Create a dataset in your project.

+# Before writing a DataFrame to a Snowflake table,
+#
+# 1. Create a [Snowflake account](https://signup.snowflake.com/) and a service account.
+# 2. Create a database and schema to write the table to.
+# 3. Use [Key Pair Authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) to set up the connection with Snowflake.
+# 4. Run the following command to create the secret that holds the private key:
+# ```bash
+# kubectl create secret generic snowflake --namespace=flytesnacks-development --from-file=private_key={your_private_key_above}
+# ```
 # %% [markdown]
 # Import the dependencies.
 # %%
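Since the key-pair setup above is the step that most often goes wrong, it can help to verify the key outside of Flyte first. The sketch below is purely illustrative and not part of the diff: it assumes an unencrypted PKCS#8 key saved as `rsa_key.p8` (as produced by the Snowflake key-pair guide) and placeholder `<username>`/`<account>` values, and simply opens a connection with `snowflake-connector-python` using that key.

```python
# Illustrative local check of the key pair; <username> and <account> are placeholders.
import snowflake.connector
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as f:
    private_key = serialization.load_pem_private_key(f.read(), password=None)

# The connector expects the private key as DER-encoded bytes.
private_key_der = private_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

conn = snowflake.connector.connect(
    user="<username>",
    account="<account>",
    private_key=private_key_der,
)
print(conn.cursor().execute("SELECT CURRENT_USER()").fetchone())
conn.close()
```

If this connects and prints your username, the same private key file is the one to load into the `snowflake` secret created above.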
@@ -118,6 +128,19 @@ def pandas_to_bq() -> StructuredDataset:
     return StructuredDataset(dataframe=df, uri="bq://sample-project-1-352610.sample_352610.test1")


+# %% [markdown]
+# Define a task that converts a pandas DataFrame to a Snowflake table.
+# %%
+@task
+def pandas_to_sf() -> StructuredDataset:
+    # create a pandas dataframe
+    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
+    # convert the dataframe to a StructuredDataset
+    return StructuredDataset(
+        dataframe=df, uri="snowflake://<user>:<account>/<database>/<schema>/<warehouse>/<table>"
+    )
+
+
 # %% [markdown]
 # :::{note}
 # The BigQuery uri's format is `bq://<project_name>.<dataset_name>.<table_name>`.
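The `uri` is not limited to warehouses; the same pattern writes the dataframe as a file to blob storage. A minimal sketch, assuming a hypothetical `s3://my-bucket/...` location you can write to (the bucket name and task name are assumptions, not part of the example):

```python
import pandas as pd
from flytekit import task
from flytekit.types.structured import StructuredDataset


@task
def pandas_to_s3() -> StructuredDataset:
    # create a pandas dataframe
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    # with a blob-store uri, the dataframe is serialized to a file
    # (Parquet by default) at that location instead of a warehouse table
    return StructuredDataset(dataframe=df, uri="s3://my-bucket/structured-dataset/df")
```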
@@ -132,6 +155,20 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
     return sd.open(pd.DataFrame).all()


+# %% [markdown]
+# :::{note}
+# The Snowflake uri's format is `snowflake://<user>:<account>/<database>/<schema>/<warehouse>/<table>`.
+# :::
+
+# %% [markdown]
+# Define a task that converts the Snowflake table to a pandas DataFrame.
+# %%
+@task
+def sf_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
+    # convert to a pandas dataframe
+    return sd.open(pd.DataFrame).all()
+
+
 # %% [markdown]
 # :::{note}
 # Flyte creates the table inside the dataset in the project upon BigQuery query execution.
@@ -141,8 +178,13 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
 # Trigger the tasks locally.
 # %%
 if __name__ == "__main__":
-    o1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1"))
-    o2 = pandas_to_bq()
+    obj_bq_1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1"))
+    obj_bq_2 = pandas_to_bq()
+
+    obj_sf_1 = sf_to_pandas(
+        sd=StructuredDataset(uri="snowflake://<user>:<account>/<database>/<schema>/<warehouse>/<table>")
+    )
+    obj_sf_2 = pandas_to_sf()


 # %% [markdown]
diff --git a/examples/snowflake_plugin/snowflake_plugin/snowflake.py b/examples/snowflake_plugin/snowflake_plugin/snowflake.py
index 4a466b3f0..7a2e77eb7 100644
--- a/examples/snowflake_plugin/snowflake_plugin/snowflake.py
+++ b/examples/snowflake_plugin/snowflake_plugin/snowflake.py
@@ -11,6 +11,7 @@
 # %% [markdown]
 # Instantiate a {py:class}`~flytekitplugins.snowflake.SnowflakeTask` to execute a query.
 # Incorporate {py:class}`~flytekitplugins.snowflake.SnowflakeConfig` within the task to define the appropriate configuration.
+# The `user` field is the username you use to log in to the Snowflake web console.
 # %%
 snowflake_task_no_io = SnowflakeTask(
     name="sql.snowflake.no_io",
@@ -22,6 +23,8 @@
         database="SNOWFLAKE_SAMPLE_DATA",
         schema="TPCH_SF1000",
         warehouse="COMPUTE_WH",
+        table="",
+        user="",
     ),
 )

@@ -58,6 +61,7 @@
 #
 # Let us explore how we can parameterize our query to filter results for a specific country.
 # This country will be provided as user input, using a nation key to specify it.
+# The `user` field is the username you use to log in to the Snowflake web console.
 # %%
 snowflake_task_templatized_query = SnowflakeTask(
     name="sql.snowflake.w_io",
@@ -68,8 +72,10 @@
         database="SNOWFLAKE_SAMPLE_DATA",
         schema="TPCH_SF1000",
         warehouse="COMPUTE_WH",
+        table="",
+        user="",
     ),
-    query_template="SELECT * from CUSTOMER where C_NATIONKEY = {{ .inputs.nation_key }} limit 100",
+    query_template="SELECT * from CUSTOMER where C_NATIONKEY = %(nation_key)s limit 100",
 )