diff --git a/flows/aws/interactive-workflow/README.md b/flows/aws/interactive-workflow/README.md new file mode 100644 index 0000000..36074df --- /dev/null +++ b/flows/aws/interactive-workflow/README.md @@ -0,0 +1,82 @@ +# Interactive Workflow Demo with Prefect + +This repository demonstrates the integration of Prefect with interactive workflows, providing a hands-on example of orchestrating complex data pipelines interactively. It showcases how to leverage Prefect's scheduling, execution, and management capabilities to build efficient and scalable data workflows. Prefect can pause a running flow to wait for human input, enabling dynamic data cleaning and human-approved job kickoffs. This repo is a good starting point for teams looking to add QA checkpoints to pre-existing pipelines. + +## Overview + +Interactive Workflow Demo is designed to guide you through the process of setting up and running interactive data pipelines using Prefect. It covers the basics of Prefect's workflow orchestration capabilities, including the use of tasks, flows, parameters, and Prefect Cloud for monitoring and managing workflows. Human interaction allows for greater oversight of automated processes. Users can intervene in workflows when necessary, for example, to handle exceptions, perform approvals, or make decisions that are too complex or sensitive for automated systems. This adds an important layer of control, ensuring that operations conform to business rules and regulatory requirements. Human operators can modify workflows based on real-time insights and changing conditions, which might be outside the scope of predefined automation rules. + +## Features + +- **Prefect Integration**: Learn how to integrate Prefect into Python-based workflows, ensuring smooth operation and scalability. +- **Interactive Workflows**: Dive into creating workflows that can dynamically adjust based on user input or external data.
+- **Task Management**: Understand how to define, organize, and manage tasks within Prefect to create comprehensive data pipelines. +- **Visualization**: Explore how to use Prefect's UI for monitoring and visualizing workflow execution. + +## Getting Started + +### Prerequisites + +Before starting, ensure you have the following installed: +- Prefect +- Marvin +- Any other dependencies listed in `requirements.txt`. + +### Setup + +1. Clone this repository: +```bash +git clone https://github.com/Sahiler/interactive-workflow-demo.git +cd interactive-workflow-demo +``` +2. Set up a virtual environment (optional but recommended): + +```bash +python -m venv venv +source venv/bin/activate # On Windows use `venv\Scripts\activate` +``` +3. Install dependencies: +```bash +pip install -r requirements.txt +``` + +### Running the Demo + +1. **Start Prefect** + + Make sure Prefect is running and you are logged into Prefect Cloud or your local Prefect instance. + +2. **Set up the Prefect & Marvin environment** + + Ensure your [S3Bucket](https://prefecthq.github.io/prefect-aws/s3/) block is configured to upload files to the intended region, and that your OpenAI API key is correctly loaded into [Marvin](https://www.askmarvin.ai/welcome/installation/). Once you are authenticated to the correct workspace, you are ready to run the workflow. + +3. **Execute the deployment script** + + Run the deployment script to deploy and kick off the interactive workflow: + +```bash +python deploy.py +``` +4. **Follow the interactive prompts to navigate through the workflow.** + +## Understanding the Workflow +#### 1. deploy.py + +- **Purpose**: Deploys the interactive workflow with Prefect to a managed execution environment. +- **Key Components**: Prefect flows, job variable configuration for Python packages. +- **Usage**: Run this script to deploy the workflow defined in `interactive-workflow.py` to an execution environment.
Ensure you have Prefect and the other dependencies installed. +#### 2. interactive-workflow.py + +- **Purpose**: Defines an interactive data processing workflow that fetches data from an API, allows users to specify data cleaning preferences, and manages user approvals for further actions. +- **Key Components**: Prefect flows and tasks, user interaction for data processing, logging and error handling. +- **Dependencies**: Requires the `marvin_extension` module, `requests` for API calls, and Prefect libraries for workflow management. +- **Usage**: Run this script in an environment where Prefect is set up and `marvin_extension.py` is accessible. + +#### 3. marvin_extension.py + +- **Purpose**: Provides the Marvin-powered helpers used by the interactive workflow: an `extract_information` flow that runs a natural-language query over the collected user data, a task that generates a suggested file name for the results, and an `upload_to_s3` flow that writes approved results to an S3 bucket. +- **Usage**: This script is imported and used in `interactive-workflow.py`. It should be present in the same directory or properly installed as a module. + +### Acknowledgments +Thanks to the Prefect community for providing the tools and support to make this demo possible. +Join us in exploring the capabilities of Prefect with interactive workflows, and see how it can transform your data pipeline management and execution.
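The pause prompt in `interactive-workflow.py` lists the features available in the fetched API payload by joining the keys of the first record in the response. A minimal, dependency-free sketch of that step (the helper name is illustrative and not part of the repo):

```python
# Hypothetical helper mirroring how interactive-workflow.py derives the
# feature list shown at the pause prompt: it reads the top-level keys of
# the first record in a randomuser.me-style payload ({"results": [{...}]}).
def list_available_features(payload: dict) -> list[str]:
    """Return top-level feature names from the first API result."""
    results = payload.get("results") or []
    if not results:
        return []
    return list(results[0].keys())


if __name__ == "__main__":
    sample = {"results": [{"name": "Ada", "location": "UK", "email": "a@b.c"}]}
    print(list_available_features(sample))  # → ['name', 'location', 'email']
```

In the actual flow, this list is embedded in the markdown description passed to `pause_flow_run`, so the user sees which features they can keep or drop before the run resumes.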
+ diff --git a/flows/aws/interactive-workflow/deploy.py b/flows/aws/interactive-workflow/deploy.py new file mode 100644 index 0000000..bb23092 --- /dev/null +++ b/flows/aws/interactive-workflow/deploy.py @@ -0,0 +1,10 @@ +from prefect import flow + +flow.from_source( + source="https://github.com/Sahiler/interactive-workflow-demo.git", + entrypoint="interactive.py:interactive", +).deploy( + name="interactive-workflow-demo", + work_pool_name="managed-execution", + job_variables={"pip_packages": ["marvin", "prefect-aws"]}, +) diff --git a/flows/machine_learning/interactive-workflows/interactive-workflow.py b/flows/aws/interactive-workflow/interactive-workflow.py similarity index 86% rename from flows/machine_learning/interactive-workflows/interactive-workflow.py rename to flows/aws/interactive-workflow/interactive-workflow.py index ea7cf24..da99c86 100644 --- a/flows/machine_learning/interactive-workflows/interactive-workflow.py +++ b/flows/aws/interactive-workflow/interactive-workflow.py @@ -23,8 +23,8 @@ ] -class CreateArtifact(RunInput): - create_artifact: bool = Field(description="Would you like to create an artifact?") +class UserApproval(RunInput): + approve: bool = Field(description="Would you like to approve?") class CleanedInput(RunInput): @@ -56,7 +56,6 @@ def user_input_remove_features(url: str): raw_data = fetch(url) features = "\n".join(raw_data.get("results")[0].keys()) - description_md = ( "## Features available:" f"\n```json{features}\n```\n" @@ -75,18 +74,18 @@ def user_input_remove_features(url: str): def create_artifact(): features = JSON.load("all-users-json").value description_md = ( - "### Features available:\n" + "### Artifact Object:\n" f"```{features}```\n" "### Would you like to create an artifact?" 
) logger = get_run_logger() create_artifact_input = pause_flow_run( - wait_for_input=CreateArtifact.with_initial_data( - description=description_md, create_artifact=False + wait_for_input=UserApproval.with_initial_data( + description=description_md, approve=False ) ) - if create_artifact_input.create_artifact: + if create_artifact_input.approve: logger.info("Report approved! Creating artifact...") create_table_artifact( key="table-of-users", table=JSON.load("all-users-json").value @@ -120,7 +119,13 @@ def create_names(): return df -if __name__ == "__main__": - list_of_names = create_names() +@flow(name="Interactive Workflow") +def interactive(): + create_names() create_artifact() - ai_functions.extract_information() + results = ai_functions.extract_information() + ai_functions.upload_to_s3(results) + + +if __name__ == "__main__": + interactive() diff --git a/flows/aws/interactive-workflow/marvin_extension.py b/flows/aws/interactive-workflow/marvin_extension.py new file mode 100644 index 0000000..3c173f8 --- /dev/null +++ b/flows/aws/interactive-workflow/marvin_extension.py @@ -0,0 +1,113 @@ +import marvin +from prefect import flow, get_run_logger, pause_flow_run, task +from prefect.blocks.system import JSON, Secret +from prefect.input import RunInput +from prefect.variables import Variable +from prefect_aws.s3 import S3Bucket +from pydantic import BaseModel, Field, constr +from datetime import datetime + + +DEFAULT_EXTRACT_QUERY = "Group by location and count the number of users in each location." 
# Create a table of a users name, location, coordinates, and continent the user is located +GENERATE_SUGGESTED_FILE_NAME = ( + "Output a 10-letter single word that describes the user's query: " +) + + +class userApprovalAndFileName(RunInput): + file_name: str + approve: bool = Field(description="Would you like to approve?") + + +class InputQuery(RunInput): + input_instructions: str + + +class GeneratedFileName(BaseModel): + fixed_length_string: constr(pattern=r"^[a-zA-Z]+$", min_length=10, max_length=10) + + +@flow(name="Extract User Insights") +def extract_information(): + secret_block = Secret.load("openai-creds-interactive-workflows") + marvin.settings.openai.api_key = secret_block.get() + logger = get_run_logger() + + features = JSON.load("all-users-json").value + description_md = ( + "### Features available:\n" + f"```{features}```\n" + "### Please provide a query to extract user insights.\n" + ) + + logger = get_run_logger() + user_input = pause_flow_run( + wait_for_input=InputQuery.with_initial_data( + description=description_md, + input_instructions=DEFAULT_EXTRACT_QUERY, + ) + ) + + logger.info( + f""" + Extracting user insights... 
\n + User input: {user_input.input_instructions} + """ + ) + result = marvin.extract( + features, + target=str, + instructions=user_input.input_instructions, + ) + Variable.set(name="user_query", value=user_input.input_instructions, overwrite=True) + + logger.info(f"Query results: {result}") + return result + + +@task(name="Generate Suggested File Name") +def generate_suggested_file_name(results): + user_query = Variable.get("user_query") + instructions = f"{GENERATE_SUGGESTED_FILE_NAME}{user_query}" + marvin_annotated_file_name = marvin.extract( + results, + target=GeneratedFileName, + instructions=instructions, + ) + + output_file_name = marvin_annotated_file_name[0].fixed_length_string + Variable.set(name="suggested_file_name", value=output_file_name, overwrite=True) + return output_file_name + + +@flow(name="Upload to S3") +def upload_to_s3(results): + logger = get_run_logger() + logger.info(f"Uploading to S3: {results}") + + description_md = ( + "## Query results:\n" + f"```{results}```\n" + "## Would you like to upload the results to S3?\n" + "### Please provide a file name based on the query from results.\n" + "### A suggestion is provided:" + ) + + # extract providing flaky results, in the meantime just use a timestamped file name + # output_file_name = generate_suggested_file_name(results) + output_file_name = f"{datetime.now():%Y%m%d_%H%M%S}_marvin_extracted_results" + + upload_to_s3_input = pause_flow_run( + wait_for_input=userApprovalAndFileName.with_initial_data( + description=description_md, file_name=output_file_name, approve=False + ) + ) + output_file_name = upload_to_s3_input.file_name + + if upload_to_s3_input.approve: + S3Bucket.load("s3-bucket-block").write_path(path=f"./{output_file_name}.txt", content=str(results))  # block slug "s3-bucket-block" is a placeholder; use your configured S3Bucket block name + + else: + raise Exception("User did not approve") + + return results diff --git a/flows/machine_learning/interactive-workflows/requirements.txt b/flows/aws/interactive-workflow/requirements.txt similarity index 55% rename from
flows/machine_learning/interactive-workflows/requirements.txt rename to flows/aws/interactive-workflow/requirements.txt index 6babbeb..cc2c9c3 100644 --- a/flows/machine_learning/interactive-workflows/requirements.txt +++ b/flows/aws/interactive-workflow/requirements.txt @@ -1,2 +1,3 @@ prefect marvin +prefect-aws diff --git a/flows/machine_learning/interactive-workflows/README.md b/flows/machine_learning/interactive-workflows/README.md deleted file mode 100644 index 4fe87ad..0000000 --- a/flows/machine_learning/interactive-workflows/README.md +++ /dev/null @@ -1,66 +0,0 @@ -# Interactive Workflow Demo with Prefect - -This repository demonstrates the integration of Prefect with interactive workflows, aiming to provide a hands-on example for orchestrating complex data pipelines interactively. It showcases how to leverage Prefect's powerful scheduling, execution, and management capabilities to build efficient and scalable data workflows. - -## Overview - -Interactive Workflow Demo is designed to guide you through the process of setting up and running interactive data pipelines using Prefect. It covers the basics of Prefect's workflow orchestration capabilities, including the use of tasks, flows, parameters, and Prefect Cloud for monitoring and managing workflows. - -## Features - -- **Prefect Integration**: Learn how to integrate Prefect into Python-based workflows, ensuring smooth operation and scalability. -- **Interactive Workflows**: Dive into creating workflows that can dynamically adjust based on user input or external data. -- **Task Management**: Understand how to define, organize, and manage tasks within Prefect to create comprehensive data pipelines. -- **Visualization**: Explore how to use Prefect's UI for monitoring and visualizing workflow execution. - -## Getting Started - -### Prerequisites - -Before starting, ensure you have the following installed: -- Python 3.6+ -- Prefect -- Any other dependencies listed in `requirements.txt`. 
- -### Installation - -1. Clone this repository: -```bash -git clone https://github.com/Sahiler/interactive-workflow-demo.git -cd interactive-workflow-demo -``` -2. Set up a virtual environment (optional but recommended) - -```bash -python -m venv venv -source venv/bin/activate # On Windows use `venv\Scripts\activate` -``` -3. Install dependencies -```bash -pip install -r requirements.txt -``` - -### Running the Demo -1. **Start Prefect** - - Make sure Prefect is running and you are logged into Prefect Cloud or your local Prefect instance. - -2. **Execute the Workflow** - - Run the main script to execute the interactive workflow - -```bash -python interactive-workflows.py -``` -3. **Follow the interactive prompts to navigate through the workflow.** - -### Understanding the Workflow -The demo includes detailed comments explaining each step of the workflow and how it integrates with Prefect's features. Pay special attention to the use of blocks for reusable logic and artifacts for visualizing workflow outputs. - -### Contributing -We welcome contributions to improve this demo! Please feel free to fork the repository, make your changes, and submit a pull request. Whether it's adding new features, fixing bugs, or improving the documentation, your contributions are highly appreciated. - -### Acknowledgments -Thanks to the Prefect community for providing the tools and support to make this demo possible. -Join us in exploring the capabilities of Prefect with interactive workflows, and see how it can transform your data pipeline management and execution. 
- diff --git a/flows/machine_learning/interactive-workflows/marvin_extension.py b/flows/machine_learning/interactive-workflows/marvin_extension.py deleted file mode 100644 index b74d547..0000000 --- a/flows/machine_learning/interactive-workflows/marvin_extension.py +++ /dev/null @@ -1,44 +0,0 @@ -import marvin -from prefect import flow, get_run_logger, pause_flow_run -from prefect.blocks.system import JSON, Secret -from prefect.input import RunInput - -DEFAULT_EXTRACT_QUERY = "Group by location and count the number of users in each location." # Create a table of a users name, location, coordinates, and continent the user is located - - -class InputQuery(RunInput): - input_instructions: str - - -@flow(name="Extract User Insights") -def extract_information(): - secret_block = Secret.load("openai-creds-interactive-workflows") - marvin.settings.openai.api_key = secret_block.get() - - description_md = f""" - The most recent user information: {JSON.load("all-users-json")} - What would you like to gain insights on? - """ - logger = get_run_logger() - user_input = pause_flow_run( - wait_for_input=InputQuery.with_initial_data( - description=description_md, - input_instructions=DEFAULT_EXTRACT_QUERY, - ) - ) - - logger = get_run_logger() - - logger.info( - f""" - Extracting user insights... \n - User input: {user_input.input_instructions} - """ - ) - result = marvin.extract( - JSON.load("all-users-json"), - target=str, - instructions=user_input.input_instructions, - ) - logger.info(f"Query results: {result}") - return result
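A note on the new `marvin_extension.py` above: it constrains Marvin's suggested file name with pydantic's `constr(pattern=r"^[a-zA-Z]+$", min_length=10, max_length=10)`, i.e. exactly ten ASCII letters. The same check can be sketched with the stdlib, which is handy for validating a suggested name outside a flow run (the helper name is illustrative, not part of the repo):

```python
import re

# Mirrors the constr(pattern=r"^[a-zA-Z]+$", min_length=10, max_length=10)
# constraint that marvin_extension.py places on Marvin's suggested file name.
FILE_STEM_RE = re.compile(r"^[a-zA-Z]{10}$")


def is_valid_file_stem(stem: str) -> bool:
    """Return True only for exactly ten ASCII letters."""
    return FILE_STEM_RE.fullmatch(stem) is not None


if __name__ == "__main__":
    print(is_valid_file_stem("insightful"))  # → True (10 letters)
    print(is_valid_file_stem("too_short"))   # → False (underscore, 9 chars)
```

Using a fixed-shape target like `GeneratedFileName` keeps the LLM output machine-checkable: if Marvin returns anything other than a ten-letter word, validation fails loudly instead of producing a malformed S3 key.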