diff --git a/dagster_pipes.zip b/dagster_pipes.zip
new file mode 100644
index 0000000000000..e3d84dec45601
Binary files /dev/null and b/dagster_pipes.zip differ
diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz
index 511f75a02c426..608d7e367ab3a 100644
Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ
diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz
index 223ce9d8c80ec..2e0014c61518b 100644
Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ
diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz
index 0ede0a2ce9625..8ca2237877fb2 100644
Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ
diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv
index b72f3e38eea76..a676a598d34fa 100644
Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/Dockerfile b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/Dockerfile
new file mode 100644
index 0000000000000..469c05e752c49
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/Dockerfile
@@ -0,0 +1,27 @@
+# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR
+
+FROM amazonlinux:2 AS builder
+
+RUN yum install -y python3
+
+WORKDIR /build
+
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
+
+ENV VIRTUAL_ENV=/build/.venv
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+
+RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16
+
+RUN uv venv .venv
+
+RUN uv pip install pex dagster-pipes boto3 pyspark
+
+RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex
+
+# test imports
+RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"
+
+FROM scratch AS export
+
+COPY --from=builder /output/venv.pex /venv.pex
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/dagster_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/dagster_code.py
index 9fbf4f54d4083..2bf3ba15323c0 100644
--- a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/dagster_code.py
+++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/dagster_code.py
@@ -2,35 +2,57 @@
 import os
 
 import boto3
-from dagster_aws.pipes import PipesEMRClient
+from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
+from mypy_boto3_emr.type_defs import InstanceFleetTypeDef
 
 from dagster import AssetExecutionContext, asset
 
 
 @asset
-def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
+def emr_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
     return pipes_emr_client.run(
         context=context,
         run_job_flow_params={
-            "Name": "Example Job",
+            "Name": "Dagster Pipes",
+            "LogUri": "s3://aws-glue-assets-467123434025-eu-north-1/emr/logs",
+            "JobFlowRole": "arn:aws:iam::467123434025:instance-profile/AmazonEMR-InstanceProfile-20241001T134828",
+            "ServiceRole": "arn:aws:iam::467123434025:role/service-role/AmazonEMR-ServiceRole-20241001T134845",
+            "ReleaseLabel": "emr-7.3.0",
             "Instances": {
                 "MasterInstanceType": "m5.xlarge",
                 "SlaveInstanceType": "m5.xlarge",
                 "InstanceCount": 3,
+                "Ec2KeyName": "YubiKey",
             },
+            "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
+            "StepConcurrencyLevel": 1,
             "Steps": [
                 {
-                    "Name": "Example Step",
-                    "ActionOnFailure": "CONTINUE",
+                    "Name": "Main",
+                    "ActionOnFailure": "TERMINATE_CLUSTER",
                     "HadoopJarStep": {
                         "Jar": "command-runner.jar",
                         "Args": [
                             "spark-submit",
                             "--deploy-mode",
                             "cluster",
-                            "example.py",
+                            "--master",
+                            "yarn",
+                            "--files",
+                            "s3://aws-glue-assets-467123434025-eu-north-1/envs/emr/pipes/venv.pex",
+                            "--conf",
+                            "spark.pyspark.python=./venv.pex",
+                            "--conf",
+                            "spark.yarn.submit.waitAppCompletion=true",
+                            "s3://aws-glue-assets-467123434025-eu-north-1/envs/emr/pipes/script.py",
                         ],
                     },
+                },
+            ],
+            "Tags": [
+                {
+                    "Key": "for-use-with-amazon-emr-managed-policies",
+                    "Value": "true",
                 }
             ],
         },
@@ -45,8 +67,14 @@ def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRC
 
 
 defs = Definitions(
-    assets=[glue_pipes_asset],
-    resources={"pipes_emr_client": PipesEMRClient()},
+    assets=[emr_pipes_asset],
+    resources={
+        "pipes_emr_client": PipesEMRClient(
+            message_reader=PipesS3MessageReader(
+                bucket=os.environ["DAGSTER_PIPES_BUCKET"]
+            )
+        )
+    },
 )
 
 # end_definitions_marker
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/emr_script.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/emr_script.py
deleted file mode 100644
index df7c61a10fd4d..0000000000000
--- a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/emr_script.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import boto3
-from dagster_pipes import (
-    PipesCliArgsParamsLoader,
-    PipesS3ContextLoader,
-    open_dagster_pipes,
-)
-
-client = boto3.client("s3")
-context_loader = PipesS3ContextLoader(client)
-params_loader = PipesCliArgsParamsLoader()
-
-
-def main():
-    with open_dagster_pipes(
-        context_loader=context_loader,
-        params_loader=params_loader,
-    ) as pipes:
-        pipes.log.info("Hello from AWS Glue job!")
-        pipes.report_asset_materialization(
-            metadata={"some_metric": {"raw_value": 0, "type": "int"}},
-            data_version="alpha",
-        )
-
-
-if __name__ == "__main__":
-    main()
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/script.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/script.py
new file mode 100644
index 0000000000000..d211c4ee85b4f
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/script.py
@@ -0,0 +1,34 @@
+import boto3
+from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
+from pyspark.sql import SparkSession
+
+
+def main():
+    with open_dagster_pipes(
+        message_writer=PipesS3MessageWriter(client=boto3.client("s3"))
+    ) as pipes:
+        pipes.log.info("Hello from AWS EMR!")
+
+        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
+
+        df = spark.createDataFrame(
+            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
+            ["id", "name", "age"],
+        )
+
+        # calculate a really important statistic
+        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])
+
+        # attach it to the asset materialization in Dagster
+        pipes.report_asset_materialization(
+            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
+            data_version="alpha",
+        )
+
+        spark.stop()
+
+    print("Hello from stdout!")  # noqa: T201
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/upload_artifacts.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/upload_artifacts.py
new file mode 100644
index 0000000000000..7a5c243676efe
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/upload_artifacts.py
@@ -0,0 +1,46 @@
+# this script can be used to pack and upload a python .pex file to an s3 bucket
+# requires docker and AWS CLI
+
+import argparse
+import os
+import subprocess
+import sys
+import tempfile
+from pathlib import Path
+
+SCRIPT_DIR = Path(__file__).parent
+
+REQUIREMENTS_TXT = SCRIPT_DIR / "requirements.txt"
+DAGSTER_DIR = Path(*SCRIPT_DIR.parts[: SCRIPT_DIR.parts.index("examples")])
+
+DAGSTER_PIPES_DIR = DAGSTER_DIR / "python_modules/dagster-pipes"
+
+parser = argparse.ArgumentParser(description="Upload a python virtualenv to an s3 path")
+parser.add_argument(
+    "--python", type=str, help="python version to use", default="3.9.16"
+)
+parser.add_argument(
+    "--s3-dir", type=str, help="s3 directory to copy files into", required=True
+)
+
+
+def main():
+    args = parser.parse_args()
+
+    with tempfile.TemporaryDirectory() as temp_dir:
+        os.chdir(temp_dir)
+        subprocess.run(
+            " && \\\n".join(
+                [
+                    f"DOCKER_BUILDKIT=1 docker build --output type=local,dest=./output -f {SCRIPT_DIR}/Dockerfile .",
+                    f"aws s3 cp ./output/venv.pex {os.path.join(args.s3_dir, 'venv.pex')}",
+                    f"aws s3 cp {SCRIPT_DIR / 'script.py'} {os.path.join(args.s3_dir, 'script.py')}",
+                ]
+            ),
+            shell=True,
+            check=True,
+        )
+
+
+if __name__ == "__main__":
+    main()
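+
+# A minimal usage sketch. The bucket and prefix below are hypothetical --
+# substitute a path you own; the chosen prefix must match the --files and
+# script URIs referenced in dagster_code.py:
+#
+#   python upload_artifacts.py --s3-dir s3://my-bucket/envs/emr/pipes
+#
+# This builds venv.pex via the Dockerfile above, then copies venv.pex and
+# script.py to the given S3 prefix.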