From 2662cb3e137bb5f64e41d9ac7060f7f8c0431a25 Mon Sep 17 00:00:00 2001 From: sahiler Date: Tue, 16 Apr 2024 13:14:40 -0500 Subject: [PATCH 01/12] added upload to s3 function and todo's --- .../interactive-workflow.py | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/flows/machine_learning/interactive-workflows/interactive-workflow.py b/flows/machine_learning/interactive-workflows/interactive-workflow.py index ea7cf24..3f2e6ff 100644 --- a/flows/machine_learning/interactive-workflows/interactive-workflow.py +++ b/flows/machine_learning/interactive-workflows/interactive-workflow.py @@ -5,6 +5,8 @@ from prefect.blocks.system import JSON from prefect.input import RunInput from pydantic import Field +from prefect_aws.s3 import S3Bucket + URL = "https://randomuser.me/api/" @@ -22,10 +24,8 @@ "nat", ] - -class CreateArtifact(RunInput): - create_artifact: bool = Field(description="Would you like to create an artifact?") - +class userApproval(RunInput): + approve: bool = Field(description="Would you like to approve?") class CleanedInput(RunInput): features_to_keep: list[str] @@ -82,11 +82,11 @@ def create_artifact(): logger = get_run_logger() create_artifact_input = pause_flow_run( - wait_for_input=CreateArtifact.with_initial_data( - description=description_md, create_artifact=False + wait_for_input=userApproval.with_initial_data( + description=description_md, approve=False ) ) - if create_artifact_input.create_artifact: + if create_artifact_input.approve: logger.info("Report approved! Creating artifact...") create_table_artifact( key="table-of-users", table=JSON.load("all-users-json").value @@ -119,8 +119,42 @@ def create_names(): JSON(value=df).save("all-users-json", overwrite=True) return df +@flow(name="Upload to S3") +def upload_to_s3(results): + logger = get_run_logger() + logger.info(f"Uploading to S3: {results}") + + description_md = ( + "### Features available:\n" + f"```{results}```\n" + "### Would you like to upload to s3?" + ) + + logger = get_run_logger() + upload_to_s3_input = pause_flow_run( + wait_for_input=userApproval.with_initial_data( + description=description_md, approve=False + ) + ) + + if upload_to_s3_input.approve: + s3_bucket_block = S3Bucket.load("interactive-workflow-output") + + logger.info("Report approved! Uploading to s3...") + with open("./marvin_result.txt", "w") as outfile: + outfile.write(str(results)) + pass + + s3_bucket_block.upload_from_path( + "./marvin_result.txt", "marvin_result.txt" + ) + return results + if __name__ == "__main__": list_of_names = create_names() create_artifact() - ai_functions.extract_information() + # TODO add deployment for this entire workflow + # add smart naming convention for file names (potentially use marvin?) + results = ai_functions.extract_information() + upload_to_s3(results) From 1e8af1b2d5916464c7a765cb2a12babc49079b54 Mon Sep 17 00:00:00 2001 From: sahiler Date: Wed, 17 Apr 2024 11:01:28 -0500 Subject: [PATCH 02/12] finished marvin naming convention function --- .../interactive-workflow.py | 39 +-------- .../interactive-workflows/marvin_extension.py | 83 +++++++++++++++++-- 2 files changed, 82 insertions(+), 40 deletions(-) diff --git a/flows/machine_learning/interactive-workflows/interactive-workflow.py b/flows/machine_learning/interactive-workflows/interactive-workflow.py index 3f2e6ff..7effd1b 100644 --- a/flows/machine_learning/interactive-workflows/interactive-workflow.py +++ b/flows/machine_learning/interactive-workflows/interactive-workflow.py @@ -5,7 +5,6 @@ from prefect.blocks.system import JSON from prefect.input import RunInput from pydantic import Field -from prefect_aws.s3 import S3Bucket URL = "https://randomuser.me/api/" @@ -56,7 +55,7 @@ def user_input_remove_features(url: str): raw_data = fetch(url) features = "\n".join(raw_data.get("results")[0].keys()) - + print(f"type(features): {type(features)}") description_md = ( "## Features available:" f"\n```json{features}\n```\n" @@ -75,7 +74,7 @@ def user_input_remove_features(url: str): def create_artifact(): features = JSON.load("all-users-json").value description_md = ( - "### Features available:\n" + "### Artifact Object:\n" f"```{features}```\n" "### Would you like to create an artifact?" ) @@ -119,42 +118,12 @@ def create_names(): JSON(value=df).save("all-users-json", overwrite=True) return df -@flow(name="Upload to S3") -def upload_to_s3(results): - logger = get_run_logger() - logger.info(f"Uploading to S3: {results}") - - description_md = ( - "### Features available:\n" - f"```{results}```\n" - "### Would you like to upload to s3?" - ) - - logger = get_run_logger() - upload_to_s3_input = pause_flow_run( - wait_for_input=userApproval.with_initial_data( - description=description_md, approve=False - ) - ) - - if upload_to_s3_input.approve: - s3_bucket_block = S3Bucket.load("interactive-workflow-output") - - logger.info("Report approved! Uploading to s3...") - with open("./marvin_result.txt", "w") as outfile: - outfile.write(str(results)) - pass - - s3_bucket_block.upload_from_path( - "./marvin_result.txt", "marvin_result.txt" - ) - return results if __name__ == "__main__": list_of_names = create_names() create_artifact() # TODO add deployment for this entire workflow - # add smart naming convention for file names (potentially use marvin?) + # add smart naming convention for file names (potentially use) results = ai_functions.extract_information() - upload_to_s3(results) + ai_functions.upload_to_s3(results) diff --git a/flows/machine_learning/interactive-workflows/marvin_extension.py b/flows/machine_learning/interactive-workflows/marvin_extension.py index b74d547..f8b1fa5 100644 --- a/flows/machine_learning/interactive-workflows/marvin_extension.py +++ b/flows/machine_learning/interactive-workflows/marvin_extension.py @@ -2,23 +2,42 @@ from prefect import flow, get_run_logger, pause_flow_run from prefect.blocks.system import JSON, Secret from prefect.input import RunInput +from prefect_aws.s3 import S3Bucket +from pydantic import BaseModel, constr, Field + +from prefect.variables import Variable + DEFAULT_EXTRACT_QUERY = "Group by location and count the number of users in each location." # Create a table of a users name, location, coordinates, and continent the user is located +GENERATE_SUGGESTED_FILE_NAME = "10-letter phrase that describes the user's query: " +# "Please provide a single word for a file name that describes the user's query: " + + +class userApprovalAndFileName(RunInput): + file_name: constr(pattern=r"^[a-zA-Z]+$", max_length=10) + approve: bool = Field(description="Would you like to approve?") class InputQuery(RunInput): input_instructions: str +class generatedFileName(BaseModel): + fixed_length_string: constr(pattern=r"^[a-zA-Z]+$", min_length=10, max_length=10) + + @flow(name="Extract User Insights") def extract_information(): secret_block = Secret.load("openai-creds-interactive-workflows") marvin.settings.openai.api_key = secret_block.get() - description_md = f""" - The most recent user information: {JSON.load("all-users-json")} - What would you like to gain insights on? - """ + features = JSON.load("all-users-json").value + description_md = ( + "### Features available:\n" + f"```{features}```\n" + "### Please provide a query to extract user insights.\n" + ) + logger = get_run_logger() user_input = pause_flow_run( wait_for_input=InputQuery.with_initial_data( @@ -36,9 +55,63 @@ def extract_information(): """ ) result = marvin.extract( - JSON.load("all-users-json"), + JSON.load("all-users-json").value, target=str, instructions=user_input.input_instructions, ) + Variable.set(name="user_query", value=user_input.input_instructions, overwrite=True) + logger.info(f"Query results: {result}") return result + + +@flow(name="Upload to S3", log_prints=True) +def upload_to_s3(results): + logger = get_run_logger() + logger.info(f"Uploading to S3: {results}") + + description_md = ( + "## Query results:\n" + f"```{results}```\n" + "## Would you like to upload the results to s3?\n" + "### Please provide a file name based on the query from results.\n" + "### A suggestion is provided:" + ) + user_query = Variable.get("user_query") + + instructions = f"{GENERATE_SUGGESTED_FILE_NAME} + {user_query.value}" + print(instructions) + marvin_annotated_file_name = marvin.extract( + results, + target=generatedFileName, + instructions=instructions, + ) + + output_file_name = marvin_annotated_file_name[0].fixed_length_string + + Variable.set(name=output_file_name, value=output_file_name, overwrite=True) + + print(f"marvin_annotated_file_name: {marvin_annotated_file_name}") + print(f"marvin_annotated_file_name.fixed_length_string: {output_file_name}") + logger = get_run_logger() + upload_to_s3_input = pause_flow_run( + wait_for_input=userApprovalAndFileName.with_initial_data( + description=description_md, file_name=output_file_name, approve=False + ) + ) + + if upload_to_s3_input.approve: + s3_bucket_block = S3Bucket.load("interactive-workflow-output") + + logger.info("Report approved! Uploading to s3...") + with open(f"./{output_file_name}.txt", "w") as outfile: + outfile.write(str(results)) + pass + + s3_bucket_block.upload_from_path( + f"./{output_file_name}.txt", f"{output_file_name}.txt" + ) + else: + raise Exception("User did not approve") + + return results From 76822493e2b3f50de79b64b1ae68f3d72244c7be Mon Sep 17 00:00:00 2001 From: sahiler Date: Wed, 17 Apr 2024 13:47:50 -0500 Subject: [PATCH 03/12] adding in s3 example, removed marvin file naming task --- .../interactive-workflows/README.md | 7 ++- .../interactive-workflow.py | 11 ++-- .../interactive-workflows/marvin_extension.py | 55 ++++++++++--------- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/flows/machine_learning/interactive-workflows/README.md b/flows/machine_learning/interactive-workflows/README.md index 4fe87ad..275d455 100644 --- a/flows/machine_learning/interactive-workflows/README.md +++ b/flows/machine_learning/interactive-workflows/README.md @@ -20,6 +20,7 @@ Interactive Workflow Demo is designed to guide you through the process of settin Before starting, ensure you have the following installed: - Python 3.6+ - Prefect +- Marvin - Any other dependencies listed in `requirements.txt`. ### Installation @@ -45,7 +46,11 @@ pip install -r requirements.txt Make sure Prefect is running and you are logged into Prefect Cloud or your local Prefect instance. -2. **Execute the Workflow** +2. **Prefect & Marvin environment is set up** + +Ensure your S3Bucket block is correctly set up to upload files to that specific region. Additionally, ensure you have the OpenAI api key correctly loaded into Marvin. Once you are authenticated to the correct workspace, you are good to go to run the workflow. + +2. **Execute the Deployment script** Run the main script to execute the interactive workflow diff --git a/flows/machine_learning/interactive-workflows/interactive-workflow.py b/flows/machine_learning/interactive-workflows/interactive-workflow.py index 7effd1b..6d55688 100644 --- a/flows/machine_learning/interactive-workflows/interactive-workflow.py +++ b/flows/machine_learning/interactive-workflows/interactive-workflow.py @@ -119,11 +119,12 @@ def create_names(): return df - -if __name__ == "__main__": - list_of_names = create_names() +@flow(name="Interactive Workflow") +def interactive(): + create_names() create_artifact() - # TODO add deployment for this entire workflow - # add smart naming convention for file names (potentially use) results = ai_functions.extract_information() ai_functions.upload_to_s3(results) + +if __name__ == "__main__": + interactive() \ No newline at end of file diff --git a/flows/machine_learning/interactive-workflows/marvin_extension.py b/flows/machine_learning/interactive-workflows/marvin_extension.py index f8b1fa5..9b27952 100644 --- a/flows/machine_learning/interactive-workflows/marvin_extension.py +++ b/flows/machine_learning/interactive-workflows/marvin_extension.py @@ -1,20 +1,21 @@ import marvin -from prefect import flow, get_run_logger, pause_flow_run +from prefect import flow, task, get_run_logger, pause_flow_run from prefect.blocks.system import JSON, Secret from prefect.input import RunInput from prefect_aws.s3 import S3Bucket from pydantic import BaseModel, constr, Field - from prefect.variables import Variable DEFAULT_EXTRACT_QUERY = "Group by location and count the number of users in each location." # Create a table of a users name, location, coordinates, and continent the user is located -GENERATE_SUGGESTED_FILE_NAME = "10-letter phrase that describes the user's query: " -# "Please provide a single word for a file name that describes the user's query: " +GENERATE_SUGGESTED_FILE_NAME = ( + "Output a 10-letter single word that describes the user's query: " +) class userApprovalAndFileName(RunInput): - file_name: constr(pattern=r"^[a-zA-Z]+$", max_length=10) + #file_name: constr(pattern=r"^[a-zA-Z]+$", max_length=10) + file_name: str approve: bool = Field(description="Would you like to approve?") @@ -30,6 +31,7 @@ class generatedFileName(BaseModel): def extract_information(): secret_block = Secret.load("openai-creds-interactive-workflows") marvin.settings.openai.api_key = secret_block.get() + logger = get_run_logger() features = JSON.load("all-users-json").value description_md = ( @@ -46,8 +48,6 @@ def extract_information(): ) ) - logger = get_run_logger() - logger.info( f""" Extracting user insights... \n @@ -65,7 +65,22 @@ def extract_information(): return result -@flow(name="Upload to S3", log_prints=True) +@task(name="Generate Suggested File Name") +def generate_suggested_file_name(results): + user_query = Variable.get("user_query") + instructions = f"{GENERATE_SUGGESTED_FILE_NAME} + {user_query}" + marvin_annotated_file_name = marvin.extract( + results, + target=generatedFileName, + instructions=instructions, + ) + + output_file_name = marvin_annotated_file_name[0].fixed_length_string + Variable.set(name=output_file_name, value=output_file_name, overwrite=True) + return output_file_name + + +@flow(name="Upload to S3") def upload_to_s3(results): logger = get_run_logger() logger.info(f"Uploading to S3: {results}") @@ -77,28 +92,17 @@ def upload_to_s3(results): "### Please provide a file name based on the query from results.\n" "### A suggestion is provided:" ) - user_query = Variable.get("user_query") - instructions = f"{GENERATE_SUGGESTED_FILE_NAME} + {user_query.value}" - print(instructions) - marvin_annotated_file_name = marvin.extract( - results, - target=generatedFileName, - instructions=instructions, - ) - - output_file_name = marvin_annotated_file_name[0].fixed_length_string + # extract providing flaky results, in the meantime just use a static file name + # output_file_name = generate_suggested_file_name(results) + output_file_name = "marvin_extracted_results" - Variable.set(name=output_file_name, value=output_file_name, overwrite=True) - - print(f"marvin_annotated_file_name: {marvin_annotated_file_name}") - print(f"marvin_annotated_file_name.fixed_length_string: {output_file_name}") - logger = get_run_logger() upload_to_s3_input = pause_flow_run( wait_for_input=userApprovalAndFileName.with_initial_data( description=description_md, file_name=output_file_name, approve=False ) ) + output_file_name = upload_to_s3_input.file_name if upload_to_s3_input.approve: s3_bucket_block = S3Bucket.load("interactive-workflow-output") @@ -106,10 +110,11 @@ def upload_to_s3(results): logger.info("Report approved! Uploading to s3...") with open(f"./{output_file_name}.txt", "w") as outfile: outfile.write(str(results)) - pass + pass s3_bucket_block.upload_from_path( - f"./{output_file_name}.txt", f"{output_file_name}.txt" + f"./{output_file_name}.txt", + f"{output_file_name}.txt", ) else: raise Exception("User did not approve") From 2bdaeff8091386530837ed3362d82823f896142d Mon Sep 17 00:00:00 2001 From: sahiler Date: Wed, 17 Apr 2024 13:50:16 -0500 Subject: [PATCH 04/12] adding in updates for readme --- .../interactive-workflows/README.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/flows/machine_learning/interactive-workflows/README.md b/flows/machine_learning/interactive-workflows/README.md index 275d455..bf9760c 100644 --- a/flows/machine_learning/interactive-workflows/README.md +++ b/flows/machine_learning/interactive-workflows/README.md @@ -60,7 +60,24 @@ python interactive-workflows.py 3. **Follow the interactive prompts to navigate through the workflow.** ### Understanding the Workflow -The demo includes detailed comments explaining each step of the workflow and how it integrates with Prefect's features. Pay special attention to the use of blocks for reusable logic and artifacts for visualizing workflow outputs. +#### 1. deploy-interactive-workflow.py + +- **Purpose**: Deploys an interactive workflow using Prefect to a managed execution environment. +- **Key Components**: Prefect flows, job variable configuration for Python packages. +- **Usage**: Run this script to deploy the workflow defined in the `interactive-workflow.py` script to an execution environment. Ensure you have Prefect and other dependencies installed. + +#### 2. interactive-workflow.py + +- **Purpose**: Defines an interactive data processing workflow that fetches data from an API, allows users to specify data cleaning preferences, and manages user approvals for further actions. +- **Key Components**: Prefect flows and tasks, user interaction for data processing, logging and error handling. +- **Dependencies**: Requires `marvin_extension` module, `requests` for API calls, and Prefect libraries for workflow management. +- **Usage**: Run this script in an environment where Prefect is set up and the `marvin_extension.py` is accessible. + +#### 3. marvin_extension.py + +- **Purpose**: Provides custom functionalities used by the interactive workflow, likely data processing or feature extraction methods tailored for specific needs. +- **Usage**: This script is imported and used in `interactive-workflow.py`. It should be present in the same directory or properly installed as a module. + ### Contributing We welcome contributions to improve this demo! Please feel free to fork the repository, make your changes, and submit a pull request. Whether it's adding new features, fixing bugs, or improving the documentation, your contributions are highly appreciated. From f9f7ff4e15af4bd566ccb4ac8cc625922d019f3e Mon Sep 17 00:00:00 2001 From: Sahiler Date: Wed, 17 Apr 2024 13:52:15 -0500 Subject: [PATCH 05/12] "moving file structure around" --- .../interactive-workflows => aws/interactive-workflow}/README.md | 0 .../interactive-workflow}/interactive-workflow.py | 0 .../interactive-workflow}/marvin_extension.py | 0 .../interactive-workflow}/requirements.txt | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename flows/{machine_learning/interactive-workflows => aws/interactive-workflow}/README.md (100%) rename flows/{machine_learning/interactive-workflows => aws/interactive-workflow}/interactive-workflow.py (100%) rename flows/{machine_learning/interactive-workflows => aws/interactive-workflow}/marvin_extension.py (100%) rename flows/{machine_learning/interactive-workflows => aws/interactive-workflow}/requirements.txt (100%) diff --git a/flows/machine_learning/interactive-workflows/README.md b/flows/aws/interactive-workflow/README.md similarity index 100% rename from flows/machine_learning/interactive-workflows/README.md rename to flows/aws/interactive-workflow/README.md diff --git a/flows/machine_learning/interactive-workflows/interactive-workflow.py b/flows/aws/interactive-workflow/interactive-workflow.py similarity index 100% rename from flows/machine_learning/interactive-workflows/interactive-workflow.py rename to flows/aws/interactive-workflow/interactive-workflow.py diff --git a/flows/machine_learning/interactive-workflows/marvin_extension.py b/flows/aws/interactive-workflow/marvin_extension.py similarity index 100% rename from flows/machine_learning/interactive-workflows/marvin_extension.py rename to flows/aws/interactive-workflow/marvin_extension.py diff --git a/flows/machine_learning/interactive-workflows/requirements.txt b/flows/aws/interactive-workflow/requirements.txt similarity index 100% rename from flows/machine_learning/interactive-workflows/requirements.txt rename to flows/aws/interactive-workflow/requirements.txt From af536744ef552ff5c1d59d5b2417fee1e4282ad5 Mon Sep 17 00:00:00 2001 From: Sahiler Date: Wed, 17 Apr 2024 13:54:42 -0500 Subject: [PATCH 06/12] add in deploy file --- .../deploy-interactive-workflow.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 flows/aws/interactive-workflow/deploy-interactive-workflow.py diff --git a/flows/aws/interactive-workflow/deploy-interactive-workflow.py b/flows/aws/interactive-workflow/deploy-interactive-workflow.py new file mode 100644 index 0000000..af6647c --- /dev/null +++ b/flows/aws/interactive-workflow/deploy-interactive-workflow.py @@ -0,0 +1,10 @@ +from prefect import flow + +flow.from_source( +source="https://github.com/Sahiler/interactive-workflow-demo.git", +entrypoint="interactive.py:interactive", +).deploy( + name="interactive-workflow-demo", + work_pool_name="managed-execution", + job_variables={"pip_packages": ["marvin", "prefect-aws"]} +) From bd05e08d316d2df065380d9ffe0e913dd60d207a Mon Sep 17 00:00:00 2001 From: Sahiler Date: Wed, 17 Apr 2024 13:55:28 -0500 Subject: [PATCH 07/12] Update requirements.txt --- flows/aws/interactive-workflow/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/flows/aws/interactive-workflow/requirements.txt b/flows/aws/interactive-workflow/requirements.txt index 6babbeb..cc2c9c3 100644 --- a/flows/aws/interactive-workflow/requirements.txt +++ b/flows/aws/interactive-workflow/requirements.txt @@ -1,2 +1,3 @@ prefect marvin +prefect-aws From 67675b367ff8348b79af545a43070270f1427d41 Mon Sep 17 00:00:00 2001 From: sahiler Date: Wed, 17 Apr 2024 14:29:31 -0500 Subject: [PATCH 08/12] testing precommits & update to readme --- flows/aws/interactive-workflow/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flows/aws/interactive-workflow/README.md b/flows/aws/interactive-workflow/README.md index bf9760c..f0ba578 100644 --- a/flows/aws/interactive-workflow/README.md +++ b/flows/aws/interactive-workflow/README.md @@ -59,7 +59,7 @@ python interactive-workflows.py ``` 3. **Follow the interactive prompts to navigate through the workflow.** -### Understanding the Workflow +## Understanding the Workflow #### 1. deploy-interactive-workflow.py - **Purpose**: Deploys an interactive workflow using Prefect to a managed execution environment. From b61dd5075cbeccf81e3043cd75770da5f0445188 Mon Sep 17 00:00:00 2001 From: sahiler Date: Wed, 17 Apr 2024 14:38:22 -0500 Subject: [PATCH 09/12] got pre-commits working, commiting changes --- .../deploy-interactive-workflow.py | 6 +++--- .../aws/interactive-workflow/interactive-workflow.py | 6 ++++-- flows/aws/interactive-workflow/marvin_extension.py | 11 +++++------ 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/flows/aws/interactive-workflow/deploy-interactive-workflow.py b/flows/aws/interactive-workflow/deploy-interactive-workflow.py index af6647c..bb23092 100644 --- a/flows/aws/interactive-workflow/deploy-interactive-workflow.py +++ b/flows/aws/interactive-workflow/deploy-interactive-workflow.py @@ -1,10 +1,10 @@ from prefect import flow flow.from_source( -source="https://github.com/Sahiler/interactive-workflow-demo.git", -entrypoint="interactive.py:interactive", + source="https://github.com/Sahiler/interactive-workflow-demo.git", + entrypoint="interactive.py:interactive", ).deploy( name="interactive-workflow-demo", work_pool_name="managed-execution", - job_variables={"pip_packages": ["marvin", "prefect-aws"]} + job_variables={"pip_packages": ["marvin", "prefect-aws"]}, ) diff --git a/flows/aws/interactive-workflow/interactive-workflow.py b/flows/aws/interactive-workflow/interactive-workflow.py index 6d55688..cd84199 100644 --- a/flows/aws/interactive-workflow/interactive-workflow.py +++ b/flows/aws/interactive-workflow/interactive-workflow.py @@ -6,7 +6,6 @@ from prefect.input import RunInput from pydantic import Field - URL = "https://randomuser.me/api/" DEFAULT_FEATURES_TO_DROP = [ @@ -23,9 +22,11 @@ "nat", ] + class userApproval(RunInput): approve: bool = Field(description="Would you like to approve?") + class CleanedInput(RunInput): features_to_keep: list[str] @@ -126,5 +127,6 @@ def interactive(): results = ai_functions.extract_information() ai_functions.upload_to_s3(results) + if __name__ == "__main__": - interactive() \ No newline at end of file + interactive() diff --git a/flows/aws/interactive-workflow/marvin_extension.py b/flows/aws/interactive-workflow/marvin_extension.py index 9b27952..d3faea2 100644 --- a/flows/aws/interactive-workflow/marvin_extension.py +++ b/flows/aws/interactive-workflow/marvin_extension.py @@ -1,11 +1,10 @@ import marvin -from prefect import flow, task, get_run_logger, pause_flow_run +from prefect import flow, get_run_logger, pause_flow_run, task from prefect.blocks.system import JSON, Secret from prefect.input import RunInput -from prefect_aws.s3 import S3Bucket -from pydantic import BaseModel, constr, Field from prefect.variables import Variable - +from prefect_aws.s3 import S3Bucket +from pydantic import BaseModel, Field, constr DEFAULT_EXTRACT_QUERY = "Group by location and count the number of users in each location." # Create a table of a users name, location, coordinates, and continent the user is located GENERATE_SUGGESTED_FILE_NAME = ( @@ -14,7 +13,7 @@ class userApprovalAndFileName(RunInput): - #file_name: constr(pattern=r"^[a-zA-Z]+$", max_length=10) + # file_name: constr(pattern=r"^[a-zA-Z]+$", max_length=10) file_name: str approve: bool = Field(description="Would you like to approve?") @@ -102,7 +101,7 @@ def upload_to_s3(results): description=description_md, file_name=output_file_name, approve=False ) ) - output_file_name = upload_to_s3_input.file_name + output_file_name = upload_to_s3_input.file_name if upload_to_s3_input.approve: s3_bucket_block = S3Bucket.load("interactive-workflow-output") From 11a6bdd8bcada7c207572408aeb95c8049db0c2e Mon Sep 17 00:00:00 2001 From: Sahiler Date: Wed, 17 Apr 2024 18:12:13 -0500 Subject: [PATCH 10/12] datetime and small fixes --- .../aws/interactive-workflow/interactive-workflow.py | 5 ++--- flows/aws/interactive-workflow/marvin_extension.py | 11 ++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flows/aws/interactive-workflow/interactive-workflow.py b/flows/aws/interactive-workflow/interactive-workflow.py index cd84199..da99c86 100644 --- a/flows/aws/interactive-workflow/interactive-workflow.py +++ b/flows/aws/interactive-workflow/interactive-workflow.py @@ -23,7 +23,7 @@ ] -class userApproval(RunInput): +class UserApproval(RunInput): approve: bool = Field(description="Would you like to approve?") @@ -56,7 +56,6 @@ def user_input_remove_features(url: str): raw_data = fetch(url) features = "\n".join(raw_data.get("results")[0].keys()) - print(f"type(features): {type(features)}") description_md = ( "## Features available:" f"\n```json{features}\n```\n" @@ -82,7 +81,7 @@ def create_artifact(): logger = get_run_logger() create_artifact_input = pause_flow_run( - wait_for_input=userApproval.with_initial_data( + wait_for_input=UserApproval.with_initial_data( description=description_md, approve=False ) ) diff --git a/flows/aws/interactive-workflow/marvin_extension.py b/flows/aws/interactive-workflow/marvin_extension.py index d3faea2..725bbac 100644 --- a/flows/aws/interactive-workflow/marvin_extension.py +++ b/flows/aws/interactive-workflow/marvin_extension.py @@ -5,6 +5,8 @@ from prefect.variables import Variable from prefect_aws.s3 import S3Bucket from pydantic import BaseModel, Field, constr +from datetime import datetime + DEFAULT_EXTRACT_QUERY = "Group by location and count the number of users in each location." # Create a table of a users name, location, coordinates, and continent the user is located GENERATE_SUGGESTED_FILE_NAME = ( @@ -13,7 +15,6 @@ class userApprovalAndFileName(RunInput): - # file_name: constr(pattern=r"^[a-zA-Z]+$", max_length=10) file_name: str approve: bool = Field(description="Would you like to approve?") @@ -22,7 +23,7 @@ class InputQuery(RunInput): input_instructions: str -class generatedFileName(BaseModel): +class GeneratedFileName(BaseModel): fixed_length_string: constr(pattern=r"^[a-zA-Z]+$", min_length=10, max_length=10) @@ -54,7 +55,7 @@ def extract_information(): """ ) result = marvin.extract( - JSON.load("all-users-json").value, + features, target=str, instructions=user_input.input_instructions, ) @@ -70,7 +71,7 @@ def generate_suggested_file_name(results): instructions = f"{GENERATE_SUGGESTED_FILE_NAME} + {user_query}" marvin_annotated_file_name = marvin.extract( results, - target=generatedFileName, + target=GeneratedFileName, instructions=instructions, ) @@ -94,7 +95,7 @@ def upload_to_s3(results): # extract providing flaky results, in the meantime just use a static file name # output_file_name = generate_suggested_file_name(results) - output_file_name = "marvin_extracted_results" + output_file_name = f"{datetime.now()}_marvin_extracted_results" upload_to_s3_input = pause_flow_run( wait_for_input=userApprovalAndFileName.with_initial_data( From 35dbcedc219802efd8374e24e157a3f0c5fdfb9c Mon Sep 17 00:00:00 2001 From: Sahiler Date: Mon, 22 Apr 2024 07:47:58 -0500 Subject: [PATCH 11/12] readme updates and added in s3 logic --- flows/aws/interactive-workflow/README.md | 11 +++-------- .../{deploy-interactive-workflow.py => deploy.py} | 0 flows/aws/interactive-workflow/marvin_extension.py | 11 +---------- 3 files changed, 4 insertions(+), 18 deletions(-) rename flows/aws/interactive-workflow/{deploy-interactive-workflow.py => deploy.py} (100%) diff --git a/flows/aws/interactive-workflow/README.md b/flows/aws/interactive-workflow/README.md index f0ba578..b893f78 100644 --- a/flows/aws/interactive-workflow/README.md +++ b/flows/aws/interactive-workflow/README.md @@ -1,6 +1,6 @@ # Interactive Workflow Demo with Prefect -This repository demonstrates the integration of Prefect with interactive workflows, aiming to provide a hands-on example for orchestrating complex data pipelines interactively. It showcases how to leverage Prefect's powerful scheduling, execution, and management capabilities to build efficient and scalable data workflows. +This repository demonstrates the integration of Prefect with interactive workflows, aiming to provide a hands-on example for orchestrating complex data pipelines interactively. It showcases how to leverage Prefect's powerful scheduling, execution, and management capabilities to build efficient and scalable data workflows. Prefect allows for dynamic data cleaning and kicking off jobs based on human involvement. This repo can be great for folks looking to add in more QA steps to their pre-existing pipelines. ## Overview @@ -18,12 +18,11 @@ Interactive Workflow Demo is designed to guide you through the process of settin ### Prerequisites Before starting, ensure you have the following installed: -- Python 3.6+ - Prefect - Marvin - Any other dependencies listed in `requirements.txt`. -### Installation +### Setup 1. Clone this repository: ```bash @@ -41,14 +40,13 @@ source venv/bin/activate # On Windows use `venv\Scripts\activate` pip install -r requirements.txt ``` -### Running the Demo 1. **Start Prefect** Make sure Prefect is running and you are logged into Prefect Cloud or your local Prefect instance. 2. **Prefect & Marvin environment is set up** -Ensure your S3Bucket block is correctly set up to upload files to that specific region. Additionally, ensure you have the OpenAI api key correctly loaded into Marvin. Once you are authenticated to the correct workspace, you are good to go to run the workflow. +Ensure your [S3Bucket](https://prefecthq.github.io/prefect-aws/s3/) block is correctly set up to upload files to that specific region. Additionally, ensure you have the OpenAI api key correctly loaded into [Marvin](https://www.askmarvin.ai/welcome/installation/). Once you are authenticated to the correct workspace, you are good to go to run the workflow. 2. **Execute the Deployment script** @@ -79,9 +77,6 @@ python interactive-workflows.py - **Usage**: This script is imported and used in `interactive-workflow.py`. It should be present in the same directory or properly installed as a module. -### Contributing -We welcome contributions to improve this demo! Please feel free to fork the repository, make your changes, and submit a pull request. Whether it's adding new features, fixing bugs, or improving the documentation, your contributions are highly appreciated. - ### Acknowledgments Thanks to the Prefect community for providing the tools and support to make this demo possible. Join us in exploring the capabilities of Prefect with interactive workflows, and see how it can transform your data pipeline management and execution. diff --git a/flows/aws/interactive-workflow/deploy-interactive-workflow.py b/flows/aws/interactive-workflow/deploy.py similarity index 100% rename from flows/aws/interactive-workflow/deploy-interactive-workflow.py rename to flows/aws/interactive-workflow/deploy.py diff --git a/flows/aws/interactive-workflow/marvin_extension.py b/flows/aws/interactive-workflow/marvin_extension.py index 725bbac..3c173f8 100644 --- a/flows/aws/interactive-workflow/marvin_extension.py +++ b/flows/aws/interactive-workflow/marvin_extension.py @@ -105,17 +105,8 @@ def upload_to_s3(results): output_file_name = upload_to_s3_input.file_name if upload_to_s3_input.approve: - s3_bucket_block = S3Bucket.load("interactive-workflow-output") + s3_havanese_path = s3_bucket_block.write_path(path=f"./{output_file_name}.txt", content=str(results)) - logger.info("Report approved! Uploading to s3...") - with open(f"./{output_file_name}.txt", "w") as outfile: - outfile.write(str(results)) - pass - - s3_bucket_block.upload_from_path( - f"./{output_file_name}.txt", - f"{output_file_name}.txt", - ) else: raise Exception("User did not approve") From a826e62480da8544a9c1e7d81b830cdaeaece56b Mon Sep 17 00:00:00 2001 From: Sahiler Date: Mon, 22 Apr 2024 07:59:18 -0500 Subject: [PATCH 12/12] add in more updates to the readme --- flows/aws/interactive-workflow/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flows/aws/interactive-workflow/README.md b/flows/aws/interactive-workflow/README.md index b893f78..36074df 100644 --- a/flows/aws/interactive-workflow/README.md +++ b/flows/aws/interactive-workflow/README.md @@ -4,7 +4,7 @@ This repository demonstrates the integration of Prefect with interactive workflo ## Overview -Interactive Workflow Demo is designed to guide you through the process of setting up and running interactive data pipelines using Prefect. It covers the basics of Prefect's workflow orchestration capabilities, including the use of tasks, flows, parameters, and Prefect Cloud for monitoring and managing workflows. +Interactive Workflow Demo is designed to guide you through the process of setting up and running interactive data pipelines using Prefect. It covers the basics of Prefect's workflow orchestration capabilities, including the use of tasks, flows, parameters, and Prefect Cloud for monitoring and managing workflows. Human interaction allows for greater oversight over automated processes. Users can intervene in workflows when necessary, for example, to handle exceptions, perform approvals, or make decisions that are too complex or sensitive for automated systems. This adds an important layer of control, ensuring that operations conform to business rules and regulatory requirements. Human operators can modify workflows based on real-time insights and changing conditions, which might be outside the scope of predefined automation rules. ## Features @@ -76,7 +76,6 @@ python interactive-workflows.py - **Purpose**: Provides custom functionalities used by the interactive workflow, likely data processing or feature extraction methods tailored for specific needs. - **Usage**: This script is imported and used in `interactive-workflow.py`. It should be present in the same directory or properly installed as a module. - ### Acknowledgments Thanks to the Prefect community for providing the tools and support to make this demo possible. Join us in exploring the capabilities of Prefect with interactive workflows, and see how it can transform your data pipeline management and execution.