-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8711ece
commit e9a2b0e
Showing
10 changed files
with
143 additions
and
34 deletions.
There are no files selected for viewing
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
27 changes: 27 additions & 0 deletions
27
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/Dockerfile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR | ||
|
||
FROM amazonlinux:2 AS builder | ||
|
||
RUN yum install -y python3 | ||
|
||
WORKDIR /build | ||
|
||
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv | ||
|
||
ENV VIRTUAL_ENV=/build/.venv | ||
ENV PATH="$VIRTUAL_ENV/bin:$PATH" | ||
|
||
RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16 | ||
|
||
RUN uv venv .venv | ||
|
||
RUN uv pip install pex dagster-pipes boto3 pyspark | ||
|
||
RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex | ||
|
||
# test imports | ||
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;" | ||
|
||
FROM scratch AS export | ||
|
||
COPY --from=builder /output/venv.pex /venv.pex |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 0 additions & 26 deletions
26
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/emr_script.py
This file was deleted.
Oops, something went wrong.
34 changes: 34 additions & 0 deletions
34
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/script.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import boto3 | ||
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes | ||
from pyspark.sql import SparkSession | ||
|
||
|
||
def main(): | ||
with open_dagster_pipes( | ||
message_writer=PipesS3MessageWriter(client=boto3.client("s3")) | ||
) as pipes: | ||
pipes.log.info("Hello from AWS EMR!") | ||
|
||
spark = SparkSession.builder.appName("HelloWorld").getOrCreate() | ||
|
||
df = spark.createDataFrame( | ||
[(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)], | ||
["id", "name", "age"], | ||
) | ||
|
||
# calculate a really important statistic | ||
avg_age = float(df.agg({"age": "avg"}).collect()[0][0]) | ||
|
||
# attach it to the asset materialization in Dagster | ||
pipes.report_asset_materialization( | ||
metadata={"average_age": {"raw_value": avg_age, "type": "float"}}, | ||
data_version="alpha", | ||
) | ||
|
||
spark.stop() | ||
|
||
print("Hello from stdout!") # noqa: T201 | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
46 changes: 46 additions & 0 deletions
46
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/upload_artifacts.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# this script can be used to pack and upload a python .pex file to an s3 bucket | ||
# requires docker and AWS CLI | ||
|
||
import argparse | ||
import os | ||
import subprocess | ||
import sys | ||
import tempfile | ||
from pathlib import Path | ||
|
||
SCRIPT_DIR = Path(__file__).parent | ||
|
||
REQUIREMENTS_TXT = SCRIPT_DIR / "requirements.txt" | ||
DAGSTER_DIR = Path(*SCRIPT_DIR.parts[: SCRIPT_DIR.parts.index("examples")]) | ||
|
||
DAGSTER_PIPES_DIR = DAGSTER_DIR / "python_modules/dagster-pipes" | ||
|
||
parser = argparse.ArgumentParser(description="Upload a python virtualenv to an s3 path") | ||
parser.add_argument( | ||
"--python", type=str, help="python version to use", default="3.9.16" | ||
) | ||
parser.add_argument( | ||
"--s3-dir", type=str, help="s3 directory to copy files into", required=True | ||
) | ||
|
||
|
||
def main(): | ||
args = parser.parse_args() | ||
|
||
with tempfile.TemporaryDirectory() as temp_dir: | ||
os.chdir(temp_dir) | ||
subprocess.run( | ||
" && \\\n".join( | ||
[ | ||
f"DOCKER_BUILDKIT=1 docker build --output type=local,dest=./output -f {SCRIPT_DIR}/Dockerfile .", | ||
f"aws s3 cp ./output/venv.pex {os.path.join(args.s3_dir, 'venv.pex')}", | ||
f"aws s3 cp {SCRIPT_DIR / 'script.py'} {os.path.join(args.s3_dir, 'script.py')}", | ||
] | ||
), | ||
shell=True, | ||
check=True, | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |