diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/dagster_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/dagster_code.py new file mode 100644 index 0000000000000..9fbf4f54d4083 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/dagster_code.py @@ -0,0 +1,52 @@ +# start_asset_marker +import os + +import boto3 +from dagster_aws.pipes import PipesEMRClient + +from dagster import AssetExecutionContext, asset + + +@asset +def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient): + return pipes_emr_client.run( + context=context, + run_job_flow_params={ + "Name": "Example Job", + "Instances": { + "MasterInstanceType": "m5.xlarge", + "SlaveInstanceType": "m5.xlarge", + "InstanceCount": 3, + }, + "Steps": [ + { + "Name": "Example Step", + "ActionOnFailure": "CONTINUE", + "HadoopJarStep": { + "Jar": "command-runner.jar", + "Args": [ + "spark-submit", + "--deploy-mode", + "cluster", + "example.py", + ], + }, + } + ], + }, + ).get_materialize_result() + + +# end_asset_marker + +# start_definitions_marker + +from dagster import Definitions # noqa + + +defs = Definitions( + assets=[glue_pipes_asset], + resources={"pipes_emr_client": PipesEMRClient()}, +) + +# end_definitions_marker diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/emr_script.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/emr_script.py new file mode 100644 index 0000000000000..df7c61a10fd4d --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr/emr_script.py @@ -0,0 +1,26 @@ +import boto3 +from dagster_pipes import ( + PipesCliArgsParamsLoader, + PipesS3ContextLoader, + open_dagster_pipes, +) + +client = boto3.client("s3") +context_loader = PipesS3ContextLoader(client) +params_loader = PipesCliArgsParamsLoader() + + +def main(): + with open_dagster_pipes( + context_loader=context_loader, + params_loader=params_loader, + ) as pipes: + pipes.log.info("Hello from AWS Glue job!") + pipes.report_asset_materialization( + metadata={"some_metric": {"raw_value": 0, "type": "int"}}, + data_version="alpha", + ) + + +if __name__ == "__main__": + main()