[dagster-aws] [docs] add docs for PipesEMRClient #25011

Open
wants to merge 2 commits into
base: master
Binary file added dagster_pipes.zip
Binary file not shown.
8 changes: 8 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,14 @@
"title": "Dagster Pipes + AWS Glue",
"path": "/concepts/dagster-pipes/aws-glue"
},
{
"title": "Dagster Pipes + AWS EMR",
"path": "/concepts/dagster-pipes/aws-emr"
},
{
"title": "Dagster Pipes + AWS EMR Serverless",
"path": "/concepts/dagster-pipes/aws-emr-serverless"
},
{
"title": "Dagster Pipes + AWS Lambda",
"path": "/concepts/dagster-pipes/aws-lambda"
Expand Down
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
8 changes: 8 additions & 0 deletions docs/content/concepts.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ Dagster Pipes is a toolkit for building integrations between Dagster and externa
title="Dagster Pipes + AWS Glue"
href="/concepts/dagster-pipes/aws-glue"
></ArticleListItem>
<ArticleListItem
title="Dagster Pipes + AWS EMR"
href="/concepts/dagster-pipes/aws-emr"
></ArticleListItem>
<ArticleListItem
title="Dagster Pipes + AWS EMR Serverless"
href="/concepts/dagster-pipes/aws-emr-serverless"
></ArticleListItem>
<ArticleListItem
title="Dagster Pipes + AWS Lambda"
href="/concepts/dagster-pipes/aws-lambda"
Expand Down
4 changes: 2 additions & 2 deletions docs/content/concepts/dagster-pipes/aws-ecs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides

## Prerequisites

- **In the orchestration environment**, you'll need to:
- **In the Dagster environment**, you'll need to:

- Install the following packages:

Expand All @@ -32,7 +32,7 @@ The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides

---

## Step 1: Install the dagster-pipes module
## Step 1: Install the dagster-pipes module in your ECS environment

Install the `dagster-pipes` module in the image used for your ECS task. For example, you can install the dependency with `pip` in your image Dockerfile:

Expand Down
4 changes: 2 additions & 2 deletions docs/content/concepts/dagster-pipes/aws-emr-serverless.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides

## Prerequisites

- **In the orchestration environment**, you'll need to:
- **In the Dagster environment**, you'll need to:

- Install the following packages:

Expand All @@ -40,7 +40,7 @@ The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides

---

## Step 1: Install the dagster-pipes module
## Step 1: Install the dagster-pipes module in your EMR Serverless environment

There are a [few options](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python-libraries.html) available for shipping Python packages to a PySpark job. For example, [install it in your Docker image](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/application-custom-image.html):

Expand Down
199 changes: 199 additions & 0 deletions docs/content/concepts/dagster-pipes/aws-emr.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
---
title: "Integrating AWS EMR with Dagster Pipes | Dagster Docs"
description: "Learn to integrate Dagster Pipes with AWS EMR to launch external code from Dagster assets."
---

# AWS EMR & Dagster Pipes

This tutorial gives a short overview of how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS EMR](https://aws.amazon.com/emr/).

The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesEMRClient" module="dagster_aws.pipes" /> resource, which can be used to launch AWS EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.

---

## Prerequisites

- **In the Dagster environment**, you'll need to:

- Install the following packages:

```shell
pip install dagster dagster-webserver dagster-aws
```

Refer to the [Dagster installation guide](/getting-started/install) for more info.

- **AWS authentication credentials configured.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html).

- **In AWS**:

- An existing AWS account
- Prepared infrastructure such as S3 buckets, IAM roles, and other resources required for your EMR job

---

## Step 1: Install the dagster-pipes module in your EMR environment

Choose one of the [options](https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#python-package-management) to install `dagster-pipes` in the EMR environment.

For example, this `Dockerfile` can be used to package all required dependencies into a single [PEX](https://docs.pex-tool.org/) file (in practice, the most straightforward way to package Python dependencies for EMR jobs):

```Dockerfile file=/guides/dagster/dagster_pipes/emr/Dockerfile
# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR

FROM amazonlinux:2 AS builder

RUN yum install -y python3

WORKDIR /build

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

ENV VIRTUAL_ENV=/build/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16

RUN uv venv .venv

RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install pex dagster-pipes boto3 pyspark

RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex

# test imports
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"

FROM scratch AS export

COPY --from=builder /output/venv.pex /venv.pex
```

The build can be launched with:

```shell
DOCKER_BUILDKIT=1 docker build --output type=local,dest=./output .
```

Then, upload the produced `output/venv.pex` file to an S3 bucket:

```shell
aws s3 cp output/venv.pex s3://your-bucket/venv.pex
```
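
If you prefer to do the upload from Python, the same step can be sketched with boto3. This is a minimal sketch, not part of the original guide: `upload_pex` is a hypothetical helper name, and the bucket and key are placeholders. It assumes `s3_client` is a boto3 S3 client (for example `boto3.client("s3")`) and relies only on its `upload_file` method.

```python
from pathlib import Path


def upload_pex(s3_client, local_path: str, bucket: str, key: str = "venv.pex") -> str:
    """Upload the PEX file and return its s3:// URI.

    `s3_client` is assumed to be a boto3 S3 client, e.g. boto3.client("s3").
    `upload_pex` itself is a hypothetical helper, not a Dagster or boto3 API.
    """
    path = Path(local_path)
    if not path.is_file():
        # fail early with a clear message if the Docker build output is missing
        raise FileNotFoundError(f"PEX file not found: {path}")

    # upload_file(Filename, Bucket, Key) is boto3's managed S3 upload
    s3_client.upload_file(str(path), bucket, key)
    return f"s3://{bucket}/{key}"
```

The returned URI is the value that would later be passed to `--files` in the `spark-submit` command.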

Finally, use the `--files` and `spark.pyspark.python` options to specify the path to the PEX file in the `spark-submit` command:

```shell
spark-submit ... --files s3://your-bucket/venv.pex --conf spark.pyspark.python=./venv.pex
```
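
When the job is submitted as an EMR step, the `spark-submit` command is usually passed as a list of arguments rather than a shell string. The sketch below shows one way to assemble that list from the options above. `build_spark_submit_args` is a hypothetical helper, the S3 URIs are placeholders, and the `--deploy-mode cluster` default is an assumption that may not match your setup.

```python
from typing import List


def build_spark_submit_args(
    pex_uri: str, script_uri: str, deploy_mode: str = "cluster"
) -> List[str]:
    """Build the spark-submit argument list for a PEX-packaged PySpark job.

    Hypothetical helper: the deploy mode default is an assumption.
    """
    # Spark copies files passed via --files into the working directory,
    # so the interpreter path is the PEX file name relative to "./"
    pex_name = pex_uri.rsplit("/", 1)[-1]
    return [
        "spark-submit",
        "--deploy-mode",
        deploy_mode,
        "--files",
        pex_uri,
        "--conf",
        f"spark.pyspark.python=./{pex_name}",
        script_uri,
    ]
```

For example, `build_spark_submit_args("s3://your-bucket/venv.pex", "s3://your-bucket/script.py")` produces a list that ends with the script location and points `spark.pyspark.python` at `./venv.pex`.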

---

## Step 2: Add dagster-pipes to the EMR job script

Call `open_dagster_pipes` in the EMR script to create a context that can be used to send messages to Dagster:

```python file=/guides/dagster/dagster_pipes/emr/script.py
import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3"))
    ) as pipes:
        pipes.log.info("Hello from AWS EMR!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()

        print("Hello from stdout!")


if __name__ == "__main__":
    main()
```
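
The script above passes metadata as a dictionary of `{"raw_value": ..., "type": ...}` entries. If a job reports several values, a small helper can build those entries consistently. The following is only a sketch: `to_pipes_metadata` is a hypothetical function, and it covers just the `bool`, `int`, `float`, and `text` type labels, which are assumed here to be among the labels Dagster Pipes accepts.

```python
from typing import Any, Dict


def to_pipes_metadata(values: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Wrap plain Python values in the {"raw_value", "type"} metadata format.

    Hypothetical helper covering only a few simple types (an assumption).
    """
    metadata: Dict[str, Dict[str, Any]] = {}
    for key, value in values.items():
        # bool must be checked before int because bool is a subclass of int
        if isinstance(value, bool):
            type_label = "bool"
        elif isinstance(value, int):
            type_label = "int"
        elif isinstance(value, float):
            type_label = "float"
        elif isinstance(value, str):
            type_label = "text"
        else:
            raise TypeError(f"Unsupported metadata type for {key!r}: {type(value)}")
        metadata[key] = {"raw_value": value, "type": type_label}
    return metadata
```

Inside the `open_dagster_pipes` block, the result could then be passed as `metadata=to_pipes_metadata({"average_age": avg_age})`.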

---

## Step 3: Create an asset using the PipesEMRClient to launch the job

In the Dagster asset/op code, use the `PipesEMRClient` resource to launch the job:

```python file=/guides/dagster/dagster_pipes/emr/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker
import os

import boto3
from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
from mypy_boto3_emr.type_defs import InstanceFleetTypeDef

from dagster import AssetExecutionContext, asset


@asset
def emr_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
    return pipes_emr_client.run(
        context=context,
        # see full reference here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr/client/run_job_flow.html#EMR.Client.run_job_flow
        run_job_flow_params={},
    ).get_materialize_result()
```

This will launch the AWS EMR job and wait for its completion. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.

The `stdout` and `stderr` of EMR application steps will be forwarded to the Dagster process.
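
The asset above passes an empty `run_job_flow_params` dictionary, which needs to be filled in for a real job. As a rough sketch of what it might contain, the hypothetical helper below builds a single-node cluster definition with one Spark step. The keys follow the boto3 `run_job_flow` request shape, but every value (release label, instance type, role names, S3 paths) is a placeholder assumption to adapt to your account.

```python
from typing import Any, Dict, List


def build_run_job_flow_params(
    name: str,
    log_uri: str,
    spark_submit_args: List[str],
    release_label: str = "emr-6.15.0",  # placeholder, pick the release you use
    instance_type: str = "m5.xlarge",  # placeholder instance type
) -> Dict[str, Any]:
    """Build a minimal run_job_flow request (hypothetical helper)."""
    return {
        "Name": name,
        "LogUri": log_uri,
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "primary",
                    "InstanceRole": "MASTER",
                    "InstanceType": instance_type,
                    "InstanceCount": 1,
                }
            ],
            # terminate the cluster once all steps have finished
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        "Steps": [
            {
                "Name": f"{name}-step",
                "ActionOnFailure": "TERMINATE_CLUSTER",
                "HadoopJarStep": {
                    # command-runner.jar runs the given command on the cluster
                    "Jar": "command-runner.jar",
                    "Args": list(spark_submit_args),
                },
            }
        ],
        # default EMR role names; assumed to already exist in the account
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }
```

The resulting dictionary could be passed as `run_job_flow_params=build_run_job_flow_params(...)` in the asset. Whether a single primary node is enough depends on the workload.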

---

## Step 4: Create Dagster definitions

Next, add the `PipesEMRClient` resource to your project's <PyObject object="Definitions" /> object:

```python file=/guides/dagster/dagster_pipes/emr/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions # noqa


defs = Definitions(
    assets=[emr_pipes_asset],
    resources={
        "pipes_emr_client": PipesEMRClient(
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"), bucket=os.environ["DAGSTER_PIPES_BUCKET"]
            )
        )
    },
)
```

Dagster will now be able to launch the AWS EMR job from the `emr_pipes_asset` asset, and receive logs and events from the job.

---

## Related

<ArticleList>
<ArticleListItem
title="Dagster Pipes"
href="/concepts/dagster-pipes"
></ArticleListItem>
<ArticleListItem
title="AWS EMR Pipes API reference"
href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesEMRClient"
></ArticleListItem>
</ArticleList>
4 changes: 2 additions & 2 deletions docs/content/concepts/dagster-pipes/aws-glue.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides

## Prerequisites

- **In the orchestration environment**, you'll need to:
- **In the Dagster environment**, you'll need to:

- Install the following packages:

Expand All @@ -32,7 +32,7 @@ The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides

---

## Step 1: Provide the dagster-pipes module
## Step 1: Provide the dagster-pipes module in your Glue environment

Provide the `dagster-pipes` module to the AWS Glue job either by installing it in the Glue job environment or packaging it along with the job script.

Expand Down
2 changes: 1 addition & 1 deletion docs/content/concepts/dagster-pipes/aws-lambda.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Pipes allows your code to interact with Dagster outside of a full Dagster enviro

To use Dagster Pipes with AWS Lambda, you’ll need:

- **In the orchestration environment**, you'll need to:
- **In the Dagster environment**, you'll need to:

- Install the following packages:

Expand Down
2 changes: 1 addition & 1 deletion docs/content/concepts/dagster-pipes/databricks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Pipes allows your Databricks jobs to stream logs (including `stdout` and `stderr

To use Dagster Pipes with Databricks:

- **In the orchestration environment**, you'll need to install the following packages:
- **In the Dagster environment**, you'll need to install the following packages:

```shell
pip install dagster dagster-webserver dagster-databricks
Expand Down
2 changes: 1 addition & 1 deletion docs/content/concepts/dagster-pipes/kubernetes.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Pipes allows your code to interact with Dagster outside of a full Dagster enviro

To use Dagster Pipes with Kubernetes, you’ll need:

- **In the orchestration environment**, you'll need to install the following packages:
- **In the Dagster environment**, you'll need to install the following packages:

```shell
pip install dagster dagster-webserver dagster-k8s
Expand Down
Binary file modified docs/next/public/objects.inv
Binary file not shown.
4 changes: 4 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ Clients

.. autoclass:: dagster_aws.pipes.PipesECSClient

.. autoclass:: dagster_aws.pipes.PipesEMRClient

.. autoclass:: dagster_aws.pipes.PipesEMRServerlessClient
@danielgafni danielgafni Oct 18, 2024


Whoops... was missing


Legacy
--------

Expand Down
4 changes: 3 additions & 1 deletion docs/vale/styles/config/vocabularies/Dagster/accept.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ AWS
AWS Athena
AWS CloudWatch
AWS Glue
AWS EMR
AWS EMR Serverless
AWS Lambda
AWS Redshift
AWS Secrets Manager
Expand Down Expand Up @@ -152,4 +154,4 @@ backfills
anonymized
boolean
python_file
dev
dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR

FROM amazonlinux:2 AS builder

RUN yum install -y python3

WORKDIR /build

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

ENV VIRTUAL_ENV=/build/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16

RUN uv venv .venv

RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install pex dagster-pipes boto3 pyspark

RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex

# test imports
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"

FROM scratch AS export

COPY --from=builder /output/venv.pex /venv.pex