Revise snowflake plugins doc #1127

Open · wants to merge 4 commits into master
@@ -9,7 +9,7 @@
#
# A structured dataset is a superset of Flyte Schema.
#
# The `StructuredDataset` Transformer can write a dataframe to BigQuery, S3, Snowflake, or any other storage by registering a new structured dataset encoder and decoder.
#
# Flytekit makes it possible to return or accept a {py:class}`pandas.DataFrame` which is automatically
# converted into Flyte's abstract representation of a structured dataset object.
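# %% [markdown]
# As a minimal sketch of that automatic conversion (the task and column names are illustrative,
# and the imports are repeated here only so the cell is self-contained):
# %%
import pandas as pd
from flytekit import task


@task
def produce_df() -> pd.DataFrame:
    # flytekit wraps the returned DataFrame in a structured dataset behind the scenes
    return pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})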
@@ -81,24 +81,34 @@ def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]:
def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]:
    df = df.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    # When specifying a BigQuery or Snowflake URI for a StructuredDataset, flytekit writes the pandas DataFrame to a table in BigQuery or Snowflake.
    return StructuredDataset(dataframe=df)


# %% [markdown]
# ## StructuredDataset with `uri` Argument
#
# Both the Snowflake and BigQuery `uri` let you load and retrieve data from the cloud.
# The `uri` is prefixed with `bq://` for BigQuery and `snowflake://` for Snowflake, followed by the path to the table.
# If you specify a BigQuery or Snowflake `uri` for a StructuredDataset, a table is created at the location the `uri` points to.
# The `uri` in StructuredDataset reads from or writes to S3, GCS, BigQuery, Snowflake, or any other storage.
# Let's walk through an example of converting a pandas DataFrame to a BigQuery or Snowflake table and vice versa.
#
# Before writing a DataFrame to a BigQuery table:
#
# 1. Create a [GCP account](https://cloud.google.com/docs/authentication/getting-started) and create a service account.
# 2. Create a project and add the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to your `.bashrc` file (see the sketch after this list).
# 3. Create a dataset in your project.
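#
# A minimal sketch of step 2, assuming the service-account key sits at a hypothetical path:
#
# ```bash
# # the key-file path is illustrative; point it at your downloaded service-account key
# export GOOGLE_APPLICATION_CREDENTIALS="$HOME/.gcp/service-account-key.json"
# ```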

# Before writing a DataFrame to a Snowflake table:
#
# 1. Create a [Snowflake account](https://signup.snowflake.com/) and set up a service account.
# 2. Create a database and schema to write to.
# 3. Use [Key Pair Authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) to set up the connection with Snowflake (a key-generation sketch follows this list).
# 4. Run the following command to create the secret:
#
# ```bash
# kubectl create secret generic snowflake --namespace=flytesnacks-development --from-file=private_key={your_private_key_above}
# ```
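#
# A minimal sketch of step 3, using the standard `openssl` commands from the Snowflake key-pair
# docs (an unencrypted key is shown for brevity):
#
# ```bash
# # generate a 2048-bit RSA private key in PKCS#8 format
# openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
# # derive the matching public key to register with your Snowflake user
# openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# ```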
# %% [markdown]
# Import the dependencies.
# %%
@@ -118,6 +128,19 @@ def pandas_to_bq() -> StructuredDataset:
    return StructuredDataset(dataframe=df, uri="bq://sample-project-1-352610.sample_352610.test1")


# %% [markdown]
# Define a task that converts a pandas DataFrame to a Snowflake table.
# %%
@task
def pandas_to_sf() -> StructuredDataset:
    # create a pandas dataframe
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    # convert the dataframe to StructuredDataset
    return StructuredDataset(
        dataframe=df, uri="snowflake://<user>:<your_account>/<warehouse>/<database>/<schema>/<table>"
    )


# %% [markdown]
# :::{note}
# The BigQuery uri's format is `bq://<project_name>.<dataset_name>.<table_name>`.
@@ -132,6 +155,20 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
    return sd.open(pd.DataFrame).all()


# %% [markdown]
# :::{note}
# The Snowflake uri's format is `snowflake://<user>:<your_account>/<warehouse>/<database>/<schema>/<table>`.
# :::
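#
# For instance, a filled-in Snowflake `uri` might look like
# `snowflake://alice:xy12345/COMPUTE_WH/FLYTE_DB/PUBLIC/TEST1` (all identifiers here are made up).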

# %% [markdown]
# Define a task that converts the Snowflake table to a pandas DataFrame.
# %%
@task
def sf_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
    # convert to pandas dataframe
    return sd.open(pd.DataFrame).all()


# %% [markdown]
# :::{note}
# Upon BigQuery query execution, Flyte creates the table inside the dataset in the project.
@@ -141,8 +178,13 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
# Trigger the tasks locally.
# %%
if __name__ == "__main__":
    obj_bq_1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1"))
    obj_bq_2 = pandas_to_bq()

    obj_sf_1 = sf_to_pandas(
        sd=StructuredDataset(uri="snowflake://<user>:<your_account>/<warehouse>/<database>/<schema>/<table>")
    )
    obj_sf_2 = pandas_to_sf()


# %% [markdown]
8 changes: 7 additions & 1 deletion examples/snowflake_plugin/snowflake_plugin/snowflake.py
@@ -11,6 +11,7 @@
# %% [markdown]
# Instantiate a {py:class}`~flytekitplugins.snowflake.SnowflakeTask` to execute a query.
# Incorporate {py:class}`~flytekitplugins.snowflake.SnowflakeConfig` within the task to define the appropriate configuration.
# `USERNAME` is the username you use to log in to the Snowflake website.
# %%
snowflake_task_no_io = SnowflakeTask(
name="sql.snowflake.no_io",
@@ -22,6 +23,8 @@
database="SNOWFLAKE_SAMPLE_DATA",
schema="TPCH_SF1000",
warehouse="COMPUTE_WH",
table="<TABLE_TO_INTERACT>",
user="<USERNAME>",
),
)
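# %% [markdown]
# As a minimal sketch (the workflow name is illustrative, and the `workflow` import may duplicate
# one already present in the file's elided import block), the task can be called from a workflow
# like any other Flyte task:
# %%
from flytekit import workflow


@workflow
def no_io_wf():
    snowflake_task_no_io()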

@@ -58,6 +61,7 @@
#
# Let us explore how we can parameterize our query to filter results for a specific country.
# This country will be provided as user input, using a nation key to specify it.
# `USERNAME` is the username you use to log in to the Snowflake website.
# %%
snowflake_task_templatized_query = SnowflakeTask(
name="sql.snowflake.w_io",
@@ -68,8 +72,10 @@
database="SNOWFLAKE_SAMPLE_DATA",
schema="TPCH_SF1000",
warehouse="COMPUTE_WH",
table="<TABLE_TO_INTERACT>",
user="<USERNAME>",
),
query_template="SELECT * from CUSTOMER where C_NATIONKEY = {{ .inputs.nation_key }} limit 100",
query_template="SELECT * from CUSTOMER where C_NATIONKEY = %(nation_key)s limit 100",
)
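# %% [markdown]
# A minimal usage sketch for the parameterized query (the workflow name is illustrative, and it
# assumes the task declares an integer `nation_key` input, which the diff elides):
# %%
@workflow
def templatized_wf(nation_key: int = 10):
    snowflake_task_templatized_query(nation_key=nation_key)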

