Upload to s3 feature added into interactive workflow #84

Closed · wants to merge 12 commits
82 changes: 82 additions & 0 deletions flows/aws/interactive-workflow/README.md
@@ -0,0 +1,82 @@
# Interactive Workflow Demo with Prefect

This repository demonstrates how to integrate Prefect with interactive workflows, providing a hands-on example of orchestrating complex data pipelines interactively. It showcases Prefect's scheduling, execution, and management capabilities for building efficient, scalable data workflows, and shows how human input can drive dynamic data cleaning and kick off downstream jobs. This repo is a good starting point for anyone looking to add QA steps to an existing pipeline.

## Overview

The Interactive Workflow Demo guides you through setting up and running interactive data pipelines with Prefect. It covers the basics of Prefect's workflow orchestration, including tasks, flows, parameters, and Prefect Cloud for monitoring and managing workflows. Human interaction provides greater oversight of automated processes: users can intervene when necessary to handle exceptions, perform approvals, or make decisions that are too complex or sensitive for automated systems. This adds an important layer of control, ensuring operations conform to business rules and regulatory requirements, and it lets operators adapt workflows to real-time insights and changing conditions that fall outside predefined automation rules.

## Features

- **Prefect Integration**: Learn how to integrate Prefect into Python-based workflows, ensuring smooth operation and scalability.
- **Interactive Workflows**: Dive into creating workflows that can dynamically adjust based on user input or external data.
- **Task Management**: Understand how to define, organize, and manage tasks within Prefect to create comprehensive data pipelines.
- **Visualization**: Explore how to use Prefect's UI for monitoring and visualizing workflow execution.

## Getting Started

### Prerequisites

Before starting, ensure you have the following installed:
- Prefect
- Marvin
- Any other dependencies listed in `requirements.txt`.

### Setup

1. Clone this repository:
```bash
git clone https://github.com/Sahiler/interactive-workflow-demo.git
cd interactive-workflow-demo
```
2. Set up a virtual environment (optional but recommended):

```bash
python -m venv venv
source venv/bin/activate # On Windows use `venv\Scripts\activate`
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```

### Running the Workflow

1. **Start Prefect**

   Make sure Prefect is running and that you are logged into Prefect Cloud or your local Prefect instance.

2. **Set up the Prefect & Marvin environment**

   Ensure your [S3Bucket](https://prefecthq.github.io/prefect-aws/s3/) block is configured to upload files to the correct region, and that your OpenAI API key is loaded into [Marvin](https://www.askmarvin.ai/welcome/installation/). Once you are authenticated to the correct workspace, you are ready to run the workflow.
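If you have not set these up yet, the environment preparation can be sketched roughly as follows (the Marvin environment variable name reflects current Marvin docs; your block and key values will differ):

```shell
# Log in to Prefect Cloud (or point PREFECT_API_URL at a local server)
prefect cloud login

# Install prefect-aws and make its block types (including S3Bucket) available
pip install prefect-aws
prefect block register -m prefect_aws

# Marvin can read the OpenAI key from the environment
export MARVIN_OPENAI_API_KEY="sk-..."
```

The S3Bucket block itself (bucket name, credentials, region) can then be created in the Prefect UI under Blocks.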

3. **Execute the deployment script**

   Run the main script to execute the interactive workflow:

   ```bash
   python interactive-workflow.py
   ```

4. **Follow the interactive prompts to navigate through the workflow.**

## Understanding the Workflow
#### 1. deploy.py

- **Purpose**: Deploys an interactive workflow using Prefect to a managed execution environment.
- **Key Components**: Prefect flows, job variable configuration for Python packages.
- **Usage**: Run this script to deploy the workflow defined in the `interactive-workflow.py` script to an execution environment. Ensure you have Prefect and other dependencies installed.

#### 2. interactive-workflow.py

- **Purpose**: Defines an interactive data processing workflow that fetches data from an API, allows users to specify data cleaning preferences, and manages user approvals for further actions.
- **Key Components**: Prefect flows and tasks, user interaction for data processing, logging and error handling.
- **Dependencies**: Requires `marvin_extension` module, `requests` for API calls, and Prefect libraries for workflow management.
- **Usage**: Run this script in an environment where Prefect is set up and the `marvin_extension.py` is accessible.

#### 3. marvin_extension.py

- **Purpose**: Provides Marvin-powered helpers used by the interactive workflow: extracting user insights from the fetched data based on a user-supplied query, and uploading approved results to S3.
- **Usage**: This script is imported and used in `interactive-workflow.py`. It should be present in the same directory or properly installed as a module.

## Acknowledgments
Thanks to the Prefect community for providing the tools and support to make this demo possible.
Join us in exploring the capabilities of Prefect with interactive workflows, and see how it can transform your data pipeline management and execution.

10 changes: 10 additions & 0 deletions flows/aws/interactive-workflow/deploy.py
@@ -0,0 +1,10 @@
from prefect import flow

flow.from_source(
    source="https://github.com/Sahiler/interactive-workflow-demo.git",
    entrypoint="interactive.py:interactive",
).deploy(
    name="interactive-workflow-demo",
    work_pool_name="managed-execution",
    job_variables={"pip_packages": ["marvin", "prefect-aws"]},
)
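Assuming the `managed-execution` work pool already exists in your workspace, running this script registers the deployment with Prefect; a run can then be kicked off from the UI or the CLI (the flow and deployment names below mirror the script and may differ in your setup):

```shell
# Register the deployment defined above
python deploy.py

# Start a run on the managed work pool (flow-name/deployment-name)
prefect deployment run 'interactive/interactive-workflow-demo'
```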
@@ -23,8 +23,8 @@
]


-class CreateArtifact(RunInput):
-    create_artifact: bool = Field(description="Would you like to create an artifact?")
+class UserApproval(RunInput):
+    approve: bool = Field(description="Would you like to approve?")


class CleanedInput(RunInput):
@@ -56,7 +56,6 @@ def user_input_remove_features(url: str):
    raw_data = fetch(url)

    features = "\n".join(raw_data.get("results")[0].keys())
-
    description_md = (
        "## Features available:"
        f"\n```json{features}\n```\n"
@@ -75,18 +74,18 @@
def create_artifact():
    features = JSON.load("all-users-json").value
    description_md = (
-       "### Features available:\n"
+       "### Artifact Object:\n"
        f"```{features}```\n"
        "### Would you like to create an artifact?"
    )

    logger = get_run_logger()
    create_artifact_input = pause_flow_run(
-       wait_for_input=CreateArtifact.with_initial_data(
-           description=description_md, create_artifact=False
+       wait_for_input=UserApproval.with_initial_data(
+           description=description_md, approve=False
        )
    )
-   if create_artifact_input.create_artifact:
+   if create_artifact_input.approve:
        logger.info("Report approved! Creating artifact...")
        create_table_artifact(
            key="table-of-users", table=JSON.load("all-users-json").value
@@ -120,7 +119,13 @@ def create_names():
return df


-if __name__ == "__main__":
-    list_of_names = create_names()
+@flow(name="Interactive Workflow")
+def interactive():
+    create_names()
+    create_artifact()
-    ai_functions.extract_information()
+    results = ai_functions.extract_information()
+    ai_functions.upload_to_s3(results)
+
+
+if __name__ == "__main__":
+    interactive()
113 changes: 113 additions & 0 deletions flows/aws/interactive-workflow/marvin_extension.py
@@ -0,0 +1,113 @@
import marvin
from prefect import flow, get_run_logger, pause_flow_run, task
from prefect.blocks.system import JSON, Secret
from prefect.input import RunInput
from prefect.variables import Variable
from prefect_aws.s3 import S3Bucket
from pydantic import BaseModel, Field, constr
from datetime import datetime


# Alternative example query: "Create a table of a user's name, location,
# coordinates, and the continent the user is located on."
DEFAULT_EXTRACT_QUERY = "Group by location and count the number of users in each location."
GENERATE_SUGGESTED_FILE_NAME = (
    "Output a 10-letter single word that describes the user's query: "
)


class userApprovalAndFileName(RunInput):
    file_name: str
    approve: bool = Field(description="Would you like to approve?")


class InputQuery(RunInput):
    input_instructions: str


class GeneratedFileName(BaseModel):
    fixed_length_string: constr(pattern=r"^[a-zA-Z]+$", min_length=10, max_length=10)


@flow(name="Extract User Insights")
def extract_information():
    secret_block = Secret.load("openai-creds-interactive-workflows")
    marvin.settings.openai.api_key = secret_block.get()
    logger = get_run_logger()

    features = JSON.load("all-users-json").value
    description_md = (
        "### Features available:\n"
        f"```{features}```\n"
        "### Please provide a query to extract user insights.\n"
    )

    user_input = pause_flow_run(
        wait_for_input=InputQuery.with_initial_data(
            description=description_md,
            input_instructions=DEFAULT_EXTRACT_QUERY,
        )
    )

    logger.info(
        f"""
        Extracting user insights...
        User input: {user_input.input_instructions}
        """
    )
    result = marvin.extract(
        features,
        target=str,
        instructions=user_input.input_instructions,
    )
    Variable.set(name="user_query", value=user_input.input_instructions, overwrite=True)

    logger.info(f"Query results: {result}")
    return result


@task(name="Generate Suggested File Name")
def generate_suggested_file_name(results):
    user_query = Variable.get("user_query")
    instructions = f"{GENERATE_SUGGESTED_FILE_NAME}{user_query}"
    marvin_annotated_file_name = marvin.extract(
        results,
        target=GeneratedFileName,
        instructions=instructions,
    )

    output_file_name = marvin_annotated_file_name[0].fixed_length_string
    # Store the suggestion under a stable variable name so it can be looked up later
    Variable.set(name="suggested_file_name", value=output_file_name, overwrite=True)
    return output_file_name


@flow(name="Upload to S3")
def upload_to_s3(results):
    logger = get_run_logger()
    logger.info(f"Uploading to S3: {results}")

    description_md = (
        "## Query results:\n"
        f"```{results}```\n"
        "## Would you like to upload the results to s3?\n"
        "### Please provide a file name based on the query from results.\n"
        "### A suggestion is provided:"
    )

    # marvin.extract is giving flaky results; in the meantime, use a
    # timestamped file name instead of the generated suggestion.
    # output_file_name = generate_suggested_file_name(results)
    output_file_name = f"{datetime.now():%Y-%m-%d_%H-%M-%S}_marvin_extracted_results"

    upload_to_s3_input = pause_flow_run(
        wait_for_input=userApprovalAndFileName.with_initial_data(
            description=description_md, file_name=output_file_name, approve=False
        )
    )
    output_file_name = upload_to_s3_input.file_name

    if upload_to_s3_input.approve:
        # Block name assumed; load the S3Bucket block configured during setup
        s3_bucket_block = S3Bucket.load("s3-bucket-block")
        s3_bucket_block.write_path(
            path=f"./{output_file_name}.txt", content=str(results)
        )
    else:
        raise Exception("User did not approve")

    return results
@@ -1,2 +1,3 @@
prefect
marvin
prefect-aws
66 changes: 0 additions & 66 deletions flows/machine_learning/interactive-workflows/README.md

This file was deleted.
