Skip to content

Commit

Permalink
[dagster-aws] [docs] add docs for PipesEMRClient
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Oct 4, 2024
1 parent c158f37 commit 1b18d97
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# start_asset_marker
import os

import boto3
from dagster_aws.pipes import PipesEMRClient

from dagster import AssetExecutionContext, asset


@asset
def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
    # Launches an EMR job flow through the injected Pipes client and returns
    # the materialize result produced by that run.
    #
    # NOTE(review): the function name says "glue" but the job is submitted via
    # PipesEMRClient; the name is kept because other code refers to it.
    #
    # Plain comments are used instead of a docstring on purpose: Dagster may
    # surface an asset function's docstring as the asset description.

    # Cluster layout: three m5.xlarge instances in total.
    cluster_instances = {
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
    }

    # Single step that spark-submits example.py in cluster deploy mode.
    spark_submit_step = {
        "Name": "Example Step",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster", "example.py"],
        },
    }

    # Presumably forwarded to the EMR RunJobFlow API -- confirm against the
    # PipesEMRClient documentation.
    job_flow_params = {
        "Name": "Example Job",
        "Instances": cluster_instances,
        "Steps": [spark_submit_step],
    }

    invocation = pipes_emr_client.run(
        context=context,
        run_job_flow_params=job_flow_params,
    )
    return invocation.get_materialize_result()


# end_asset_marker

# start_definitions_marker

from dagster import Definitions # noqa


# Register the example asset together with the resource it asks for. The
# "pipes_emr_client" key matches the asset function's parameter name.
defs = Definitions(
    resources={"pipes_emr_client": PipesEMRClient()},
    assets=[glue_pipes_asset],
)

# end_definitions_marker
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import boto3
from dagster_pipes import (
PipesCliArgsParamsLoader,
PipesS3ContextLoader,
open_dagster_pipes,
)

# Loader for the Pipes bootstrap parameters (presumably read from the
# script's command-line arguments -- confirm in the dagster-pipes docs).
params_loader = PipesCliArgsParamsLoader()

# S3 client handed to the context loader below.
client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)


def main():
    """Open a Dagster Pipes session and report one asset materialization.

    The session is opened with the module-level ``context_loader`` and
    ``params_loader``. Inside it, the script logs a single info message and
    reports a materialization carrying one integer metadata entry
    (``some_metric`` = 0) and the data version ``"alpha"``.
    """
    with open_dagster_pipes(
        context_loader=context_loader,
        params_loader=params_loader,
    ) as pipes:
        # This script is the EMR example; the greeting previously named
        # AWS Glue, which was misleading in the orchestrator's logs.
        pipes.log.info("Hello from AWS EMR job!")
        pipes.report_asset_materialization(
            metadata={"some_metric": {"raw_value": 0, "type": "int"}},
            data_version="alpha",
        )


# Run the example only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

0 comments on commit 1b18d97

Please sign in to comment.