edit docs

danielgafni committed Oct 18, 2024
1 parent 968ebbf commit 614c8da
Showing 16 changed files with 298 additions and 61 deletions.
Binary file added dagster_pipes.zip
8 changes: 8 additions & 0 deletions docs/content/_navigation.json
@@ -386,6 +386,14 @@
       "title": "Dagster Pipes + AWS Glue",
       "path": "/concepts/dagster-pipes/aws-glue"
     },
+    {
+      "title": "Dagster Pipes + AWS EMR",
+      "path": "/concepts/dagster-pipes/aws-emr"
+    },
+    {
+      "title": "Dagster Pipes + AWS EMR Serverless",
+      "path": "/concepts/dagster-pipes/aws-emr-serverless"
+    },
     {
       "title": "Dagster Pipes + AWS Lambda",
       "path": "/concepts/dagster-pipes/aws-lambda"
Binary file modified docs/content/api/modules.json.gz
Binary file modified docs/content/api/searchindex.json.gz
Binary file modified docs/content/api/sections.json.gz
8 changes: 8 additions & 0 deletions docs/content/concepts.mdx
@@ -232,6 +232,14 @@ Dagster Pipes is a toolkit for building integrations between Dagster and external
   <ArticleListItem
     title="Dagster Pipes + AWS Glue"
     href="/concepts/dagster-pipes/aws-glue"
   ></ArticleListItem>
+  <ArticleListItem
+    title="Dagster Pipes + AWS EMR"
+    href="/concepts/dagster-pipes/aws-emr"
+  ></ArticleListItem>
+  <ArticleListItem
+    title="Dagster Pipes + AWS EMR Serverless"
+    href="/concepts/dagster-pipes/aws-emr-serverless"
+  ></ArticleListItem>
   <ArticleListItem
     title="Dagster Pipes + AWS Lambda"
     href="/concepts/dagster-pipes/aws-lambda"
193 changes: 193 additions & 0 deletions docs/content/concepts/dagster-pipes/aws-emr.mdx
@@ -0,0 +1,193 @@
---
title: "Integrating AWS EMR with Dagster Pipes | Dagster Docs"
description: "Learn to integrate Dagster Pipes with AWS EMR to launch external code from Dagster assets."
---

# AWS EMR & Dagster Pipes

This tutorial gives a short overview on how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS EMR](https://aws.amazon.com/emr/).

The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesEMRClient" module="dagster_aws.pipes" /> resource, which can be used to launch AWS EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.

---

## Prerequisites

- **In the orchestration environment**, you'll need to:

- Install the following packages:

```shell
pip install dagster dagster-webserver dagster-aws
```

Refer to the [Dagster installation guide](/getting-started/install) for more info.

- **AWS authentication credentials configured.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html). A minimal sketch of configuring credentials follows this list.
- **In AWS**:
- An existing AWS account
- Prepared infrastructure such as S3 buckets, IAM roles, and other resources required for your EMR job
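
As a minimal sketch, AWS credentials can be supplied through environment variables (all values below are placeholders):

```shell
# placeholder values; use your own credentials and region
export AWS_ACCESS_KEY_ID=AKIAEXAMPLE
export AWS_SECRET_ACCESS_KEY=example-secret-key
export AWS_DEFAULT_REGION=us-east-1
```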

---

## Step 1: Install the dagster-pipes module

Choose one of the [options](https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#python-package-management) to install `dagster-pipes` in the EMR environment.

For example, this `Dockerfile` can be used to package all required dependencies into a single [PEX](https://docs.pex-tool.org/) file, which in practice is often the most straightforward way to package Python dependencies for EMR jobs:
```Dockerfile file=/guides/dagster/dagster_pipes/emr/Dockerfile
# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR
FROM amazonlinux:2 AS builder

RUN yum install -y python3

WORKDIR /build

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

ENV VIRTUAL_ENV=/build/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16

RUN uv venv .venv

RUN --mount=type=cache,target=/root/.cache/uv \
    uv pip install pex dagster-pipes boto3 pyspark

RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex

# test imports
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"

FROM scratch AS export

COPY --from=builder /output/venv.pex /venv.pex
```

The build can be launched with:

```shell
DOCKER_BUILDKIT=1 docker build --output type=local,dest=./output .
```

Then, upload the `output` directory to an S3 bucket and use the `.pex` file with `spark-submit` in your EMR step:

```shell
spark-submit ... --files s3://your-bucket/.../venv.pex --conf spark.pyspark.python=./venv.pex
```
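
For example, the PEX file produced by the build above could be uploaded with the AWS CLI (the bucket and key are placeholders):

```shell
# placeholder bucket/key; pick a location the EMR job's role can read
aws s3 cp ./output/venv.pex s3://your-bucket/pex/venv.pex
```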

---

## Step 2: Add dagster-pipes to the EMR job script

Call `open_dagster_pipes` in the EMR script to create a context that can be used to send messages to Dagster:
```python file=/guides/dagster/dagster_pipes/emr/script.py
import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3"))
    ) as pipes:
        pipes.log.info("Hello from AWS EMR!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()

        print("Hello from stdout!")


if __name__ == "__main__":
    main()
```
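
The same context can stream other events back to Dagster as well. As a sketch (the check name and the condition below are hypothetical, not part of the tutorial's script), an asset check result could be reported from inside the `with` block:

```python
# hypothetical sketch: report an asset check result from the EMR script,
# reusing the `pipes` context and `avg_age` computed above
pipes.report_asset_check(
    check_name="age_is_positive",  # assumed check name defined on the Dagster side
    passed=bool(avg_age > 0),
    metadata={"average_age": avg_age},
)
```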

---

## Step 3: Create an asset using the PipesEMRClient to launch the job

In the Dagster asset/op code, use the `PipesEMRClient` resource to launch the job:
```python file=/guides/dagster/dagster_pipes/emr/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker
import os

import boto3
from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
from mypy_boto3_emr.type_defs import InstanceFleetTypeDef

from dagster import AssetExecutionContext, asset


@asset
def emr_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
    return pipes_emr_client.run(
        context=context,
        # see full reference here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr/client/run_job_flow.html#EMR.Client.run_job_flow
        run_job_flow_params={},
    ).get_materialize_result()
```

This will launch the AWS EMR job and wait for its completion. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.

The `stdout` and `stderr` of the EMR application steps will be forwarded to the Dagster process.
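
Because the full `run_job_flow` schema is large, here is a sketch of what `run_job_flow_params` might contain for submitting a Spark script (instance types, counts, names, and the script path are illustrative placeholders; see the boto3 reference above for the full schema):

```python
# illustrative placeholder values only
run_job_flow_params = {
    "Name": "Example Job",
    "Instances": {
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
    },
    "Steps": [
        {
            "Name": "Example Step",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode",
                    "cluster",
                    "example.py",
                ],
            },
        }
    ],
}
```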

---

## Step 4: Create Dagster definitions

Next, add the `PipesEMRClient` resource to your project's <PyObject object="Definitions" /> object:

```python file=/guides/dagster/dagster_pipes/emr/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions  # noqa


defs = Definitions(
    assets=[emr_pipes_asset],
    resources={
        "pipes_emr_client": PipesEMRClient(
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"), bucket=os.environ["DAGSTER_PIPES_BUCKET"]
            )
        )
    },
)
```
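
To try the definitions locally, you could point the Dagster webserver at the file containing this code (the file name and bucket below are placeholders):

```shell
# assumes the snippets above live in dagster_code.py
export DAGSTER_PIPES_BUCKET=your-bucket
dagster dev -f dagster_code.py
```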

Dagster will now be able to launch the AWS EMR job from the `emr_pipes_asset` asset, and receive logs and events from the job.

---

## Related

<ArticleList>
<ArticleListItem
title="Dagster Pipes"
href="/concepts/dagster-pipes"
></ArticleListItem>
<ArticleListItem
title="AWS EMR Pipes API reference"
href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesEMRClient"
></ArticleListItem>
</ArticleList>
Binary file modified docs/next/public/objects.inv
4 changes: 4 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
@@ -122,6 +122,10 @@ Clients
 
 .. autoclass:: dagster_aws.pipes.PipesECSClient
 
+.. autoclass:: dagster_aws.pipes.PipesEMRClient
+
+.. autoclass:: dagster_aws.pipes.PipesEMRServerlessClient
+
 Legacy
 --------
 
4 changes: 3 additions & 1 deletion docs/vale/styles/config/vocabularies/Dagster/accept.txt
@@ -8,6 +8,8 @@ AWS
 AWS Athena
 AWS CloudWatch
 AWS Glue
+AWS EMR
+AWS EMR Serverless
 AWS Lambda
 AWS Redshift
 AWS Secrets Manager
@@ -152,4 +154,4 @@ backfills
 anonymized
 boolean
 python_file
-dev
+dev
@@ -0,0 +1,28 @@
# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR

FROM amazonlinux:2 AS builder

RUN yum install -y python3

WORKDIR /build

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

ENV VIRTUAL_ENV=/build/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16

RUN uv venv .venv

RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install pex dagster-pipes boto3 pyspark

RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex

# test imports
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"

FROM scratch AS export

COPY --from=builder /output/venv.pex /venv.pex
@@ -2,38 +2,18 @@
 import os
 
 import boto3
-from dagster_aws.pipes import PipesEMRClient
+from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
 from mypy_boto3_emr.type_defs import InstanceFleetTypeDef
 
 from dagster import AssetExecutionContext, asset
 
 
 @asset
-def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
+def emr_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
     return pipes_emr_client.run(
         context=context,
-        run_job_flow_params={
-            "Name": "Example Job",
-            "Instances": {
-                "MasterInstanceType": "m5.xlarge",
-                "SlaveInstanceType": "m5.xlarge",
-                "InstanceCount": 3,
-            },
-            "Steps": [
-                {
-                    "Name": "Example Step",
-                    "ActionOnFailure": "CONTINUE",
-                    "HadoopJarStep": {
-                        "Jar": "command-runner.jar",
-                        "Args": [
-                            "spark-submit",
-                            "--deploy-mode",
-                            "cluster",
-                            "example.py",
-                        ],
-                    },
-                }
-            ],
-        },
+        # see full reference here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr/client/run_job_flow.html#EMR.Client.run_job_flow
+        run_job_flow_params={},  # type: ignore
     ).get_materialize_result()
@@ -45,8 +25,14 @@ def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
 
 
 defs = Definitions(
-    assets=[glue_pipes_asset],
-    resources={"pipes_emr_client": PipesEMRClient()},
+    assets=[emr_pipes_asset],
+    resources={
+        "pipes_emr_client": PipesEMRClient(
+            message_reader=PipesS3MessageReader(
+                client=boto3.client("s3"), bucket=os.environ["DAGSTER_PIPES_BUCKET"]
+            )
+        )
+    },
 )
 
 # end_definitions_marker
This file was deleted.
