Skip to content

Commit

Permalink
Write DiscoverGranules Distributed Map input to S3 (#299)
Browse files Browse the repository at this point in the history
Fixes #296
  • Loading branch information
chuckwondo authored Nov 28, 2023
1 parent db373b9 commit 6af1af1
Show file tree
Hide file tree
Showing 21 changed files with 469 additions and 133 deletions.
12 changes: 12 additions & 0 deletions app/stacks/cumulus/iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ locals {
#-------------------------------------------------------------------------------

data "aws_iam_policy_document" "allow_sfn_distributed_maps" {
# Allow StepFunctions to manage "child" executions for Distributed Maps.
statement {
effect = "Allow"
actions = [
Expand All @@ -35,6 +36,17 @@ data "aws_iam_policy_document" "allow_sfn_distributed_maps" {
]
resources = ["*"]
}

# Allow StepFunctions to read input from S3, which is necessary when the size
# of the input array message might exceed the quota (256KiB).
statement {
effect = "Allow"
actions = [
"s3:Get*",
"s3:List*"
]
resources = ["*"]
}
}

# Associate permissions above with a policy
Expand Down
11 changes: 10 additions & 1 deletion app/stacks/cumulus/templates/discover-granules-workflow.asl.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,16 @@
"End": true,
"MaxConcurrency": 10,
"ToleratedFailurePercentage": 0,
"ItemsPath": "$",
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "JSON"
},
"Parameters": {
"Bucket.$": "$.bucket",
"Key.$": "$.key"
}
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"lambda-archive-dir": "build/main"
},
"scripts": {
"build": "tsc -p tsconfig.json",
"build": "tsc --build",
"build:scripts": "yarn --cwd scripts build",
"clean:build": "rm -rf build/*",
"clean:dependencies": "rm -rf node_modules/* node_modules/.bin node_modules/.cache node_modules/.yarn-integrity",
Expand All @@ -29,7 +29,7 @@
"test": "run-s build test:*",
"test:lint": "eslint src --ext .ts",
"test:prettier": "prettier \"src/**/*.ts\" --check",
"test:unit": "nyc --silent ava",
"test:unit": "nyc --silent ava --node-arguments '--trace-warnings'",
"check-cli": "run-s test diff-integration-tests check-integration-tests",
"check-integration-tests": "run-s check-integration-test:*",
"diff-integration-tests": "mkdir -p diff && rm -rf diff/test && cp -r test diff/test && rm -rf diff/test/test-*/.git && cd diff && git init --quiet && git add -A && git commit --quiet --no-verify --allow-empty -m 'WIP' && echo '\\n\\nCommitted most recent integration test output in the \"diff\" directory. Review the changes with \"cd diff && git diff HEAD\" or your preferred git diff viewer.'",
Expand All @@ -48,7 +48,7 @@
"reset-hard": "git clean -dfx && git reset --hard && yarn",
"prepare-release": "run-s reset-hard test cov:check doc:html version doc:publish",
"lambda:install": "yarn install --production --no-bin-links --modules-folder ${PWD}/${npm_package_config_lambda_archive_dir}/node_modules && rm -rf ${PWD}/${npm_package_config_lambda_archive_dir}/node_modules/aws-sdk && rm -rf ${PWD}/${npm_package_config_lambda_archive_dir}/node_modules/'@types'",
"lambda:archive-exploded": "run-s clean:build test lambda:install",
"lambda:archive-exploded": "run-s build test lambda:install",
"tf:lambda:archive-exploded": "yarn lambda:archive-exploded >&2 && echo { '\"'dir'\"': '\"'${PWD}/${npm_package_config_lambda_archive_dir}'\"' }"
},
"engines": {
Expand Down Expand Up @@ -85,6 +85,7 @@
"@types/aws-lambda": "^8.10.85",
"@types/lodash": "^4.14.177",
"@types/node": "^16.11.1",
"@types/uuid": "^9.0.7",
"@typescript-eslint/eslint-plugin": "^5.1.0",
"@typescript-eslint/parser": "^5.1.0",
"ava": "^4.3.3",
Expand Down
54 changes: 34 additions & 20 deletions src/lib/aws/lambda.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import * as CMA from '@cumulus/cumulus-message-adapter-js';
import * as M from '@cumulus/types/message';
import * as L from 'aws-lambda';
import * as E from 'fp-ts/Either';
import { pipe } from 'fp-ts/function';
import * as E from 'fp-ts/lib/Either';
import { pipe } from 'fp-ts/lib/function';
import * as t from 'io-ts';
import * as fp from 'lodash/fp';

Expand Down Expand Up @@ -100,28 +100,42 @@ export const asyncHandlerFor =
* handler for use with the Cumulus Message Adapter (CMA)
*/
export const cmaAsyncHandler =
<A, B>(handler: AsyncHandler<A, B | ReadonlyArray<B>>) =>
async (
event: CMAEvent,
context: L.Context
): Promise<CMAResult | readonly CMAResult[]> => {
// eslint-disable-next-line functional/no-let
let handlerResult: B | ReadonlyArray<B> | undefined = undefined;
<A, B>(handler: AsyncHandler<A, B>) =>
async (event: CMAEvent, context: L.Context): Promise<CMAResult> =>
await CMA.runCumulusTask(handler, event, context);

/**
*
* @param handler
* @param indexName
* @returns
*/
export const cmaAsyncHandlerIndexed =
(indexName: string) =>
<A, B>(handler: AsyncHandler<A, ReadonlyArray<B>>) =>
async (event: CMAEvent, context: L.Context): Promise<ReadonlyArray<CMAResult>> => {
// We must capture the original result returned by the handler so we can iterate
// over it after the call to runCumulusTask. We are forced to take this approach
// because although the `payload` property of the value returned from runCumulusTask
// represents the value returned by the handler passed to runCumulusTask, the
// property will be empty in cases where it is written to S3. Therefore, this
// approach captures the value before it is written to S3, so we can iterate over
// it after runCumulusTask returns.

// eslint-disable-next-line functional/no-let
let handlerResult: ReadonlyArray<B>;
const handlerWrapper = async (event: A, context: L.Context) =>
// Capture the original result returned by the handler so we can inspect it after
// CMA.runCumulusTask returns to see whether or not the handler produced an array.
(handlerResult = await handler(event, context));

const result = await CMA.runCumulusTask(handlerWrapper, event, context);

// If the handler produced an array, that means we want to also return an array
// from this function. The resulting array is of the same length as the array
// produced by the handler, each element being a copy of the CMA result, along with
// a 0-based meta.batchIndex value corresponding to the item's index within the
// array. This is so that a corresponding "unbatch" handler can select individual
// batches for use with a Map state for parallelizing batch processing.
// We want to return an array the same length as the array returned by the handler,
// each element being a copy of the CMA result, along with a 0-based index value
// corresponding to the item's index within the array. The index value is inserted
// into the element's `meta` property using the specified indexName. This is so
// that a corresponding "get at index" handler can select individual array elements
// for use with a Map state for parallel processing.

return Array.isArray(handlerResult)
? Array.from(handlerResult, (_, i) => fp.set(['meta', 'batchIndex'], i, result))
: result;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return handlerResult!.map((_, index) => fp.set(['meta', indexName], index, result));
};
65 changes: 49 additions & 16 deletions src/lib/aws/s3.fixture.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import { Readable } from 'stream';

import { GetObjectCommandInput } from '@aws-sdk/client-s3';
import {
GetObjectCommandInput,
GetObjectCommandOutput,
PutObjectCommandInput,
PutObjectCommandOutput,
} from '@aws-sdk/client-s3';
import { sdkStreamMixin } from '@smithy/util-stream';

const store: { readonly [Bucket: string]: { readonly [Key: string]: string } } = {
// eslint-disable-next-line functional/prefer-readonly-type
const store: { [Bucket: string]: { [Key: string]: string } } = {
'my-bucket': {
'empty': '',
'empty.json': '{}',
Expand Down Expand Up @@ -34,23 +40,50 @@ const store: { readonly [Bucket: string]: { readonly [Key: string]: string } } =
},
};

export const getObject = async (args: GetObjectCommandInput) => {
export const mockGetObject = (
args: GetObjectCommandInput
): Promise<GetObjectCommandOutput> => {
const { Bucket, Key } = args;

// eslint-disable-next-line functional/no-throw-statement
if (!Bucket) throw new Error('No Bucket specified');
// eslint-disable-next-line functional/no-throw-statement
if (!store[Bucket]) throw new Error(`Bucket not found: ${Bucket}`);
// eslint-disable-next-line functional/no-throw-statement
if (!Key) throw new Error('No Key specified');
if (!store[Bucket][Key])
// eslint-disable-next-line functional/no-throw-statement
throw new Error(`Object not found in bucket ${Bucket}: ${Key}`);

return {
if (!Bucket) {
return Promise.reject(new Error('No Bucket specified'));
}
if (!store[Bucket]) {
return Promise.reject(new Error(`Bucket not found: ${Bucket}`));
}
if (!Key) {
return Promise.reject(new Error('No Key specified'));
}
if (!store[Bucket][Key]) {
return Promise.reject(new Error(`Object not found in bucket ${Bucket}: ${Key}`));
}

return Promise.resolve({
$metadata: {},
Body: sdkStreamMixin(Readable.from([store[Bucket][Key]])),
};
});
};

export const getObjectNotFound = async () => Promise.reject(new Error('not found'));
export const mockPutObject = (
args: PutObjectCommandInput
): Promise<PutObjectCommandOutput> => {
const { Bucket, Key, Body } = args;

if (!Bucket) {
return Promise.reject(new Error('No Bucket specified'));
}
if (!Key) {
return Promise.reject(new Error('No Key specified'));
}
if (!Body) {
return Promise.reject(new Error('No Body specified'));
}
if (!store[Bucket]) {
return Promise.reject(new Error(`Bucket not found: ${Bucket}`));
}

// eslint-disable-next-line functional/immutable-data
store[Bucket][Key] = Body.toString();

return Promise.resolve({ $metadata: {} });
};
42 changes: 34 additions & 8 deletions src/lib/aws/s3.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import test from 'ava';
import * as RT from 'fp-ts/ReaderTask';
import * as RTE from 'fp-ts/ReaderTaskEither';
import { pipe } from 'fp-ts/function';
import * as RT from 'fp-ts/lib/ReaderTask';
import * as RTE from 'fp-ts/lib/ReaderTaskEither';
import { pipe } from 'fp-ts/lib/function';

import * as S3 from './s3';
import { getObject, getObjectNotFound } from './s3.fixture';
import { mockGetObject, mockPutObject } from './s3.fixture';

test('safeGetObject gets the contents of an existing object', async (t) => {
const s3 = { getObject };
const s3 = { getObject: mockGetObject };
const program = pipe(
S3.safeGetObject({ Bucket: 'my-bucket', Key: 'greeting.txt' }),
RTE.matchE(
Expand All @@ -26,12 +26,38 @@ test('safeGetObject gets the contents of an existing object', async (t) => {
});

test('safeGetObject returns Left(Error) upon S3 failure', async (t) => {
const s3 = { getObject: getObjectNotFound };
const s3 = { getObject: mockGetObject };
const program = pipe(
S3.safeGetObject({ Bucket: 'my-bucket', Key: 'greeting.txt' }),
S3.safeGetObject({ Bucket: 'no-such-bucket', Key: 'greeting.txt' }),
RTE.matchE(
(e) => RT.of(t.regex(e.message, /not found/)),
(output) => RT.of(t.fail(`Unexpected output received: ${JSON.stringify(output)}`))
)
);

return program({ s3 })();
});

test('safePutObject stores an object in an existing bucket', async (t) => {
const s3 = { putObject: mockPutObject };
const program = pipe(
S3.safePutObject({ Bucket: 'my-bucket', Key: 'something-new', Body: 'foo' }),
RTE.matchE(
(e) => RT.of(t.fail(`Unexpected error: ${e}`)),
(output) => RT.of(t.pass(JSON.stringify(output)))
)
);

return program({ s3 })();
});

test('safePutObject returns Left(Error) upon S3 failure', async (t) => {
const s3 = { putObject: mockPutObject };
const program = pipe(
S3.safePutObject({ Bucket: 'no-such-bucket', Key: 'something-new', Body: 'foo' }),
RTE.matchE(
(e) => RT.of(t.regex(e.message, /not found/)),
(output) => RT.of(t.fail(`Unexpected output received: ${output}`))
(output) => RT.of(t.fail(`Unexpected output received: ${JSON.stringify(output)}`))
)
);

Expand Down
21 changes: 16 additions & 5 deletions src/lib/aws/s3.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { GetObjectCommandInput, GetObjectCommandOutput, S3 } from '@aws-sdk/client-s3';
import * as E from 'fp-ts/Either';
import * as RTE from 'fp-ts/ReaderTaskEither';
import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';
import {
GetObjectCommandInput,
GetObjectCommandOutput,
PutObjectCommandInput,
PutObjectCommandOutput,
S3,
} from '@aws-sdk/client-s3';
import * as E from 'fp-ts/lib/Either';
import * as RTE from 'fp-ts/lib/ReaderTaskEither';
import * as TE from 'fp-ts/lib/TaskEither';
import { pipe } from 'fp-ts/lib/function';

export type HasS3<K extends keyof S3> = { readonly s3: Pick<S3, K> };

Expand Down Expand Up @@ -42,3 +48,8 @@ export const safeReadObject = (
)
)
);

export const safePutObject = (
input: PutObjectCommandInput
): RTE.ReaderTaskEither<HasS3<'putObject'>, Error, PutObjectCommandOutput> =>
TE.tryCatchK(({ s3 }) => s3.putObject(input), E.toError);
Loading

0 comments on commit 6af1af1

Please sign in to comment.