Add batch iterator example #370

Open: wants to merge 5 commits into base: main
1 change: 1 addition & 0 deletions .scripts/list-of-samples.json
@@ -3,6 +3,7 @@
"activities-cancellation-heartbeating",
"activities-dependency-injection",
"activities-examples",
"batch",
"child-workflows",
"continue-as-new",
"cron-workflows",
3 changes: 3 additions & 0 deletions batch/.eslintignore
@@ -0,0 +1,3 @@
node_modules
lib
.eslintrc.js
48 changes: 48 additions & 0 deletions batch/.eslintrc.js
@@ -0,0 +1,48 @@
const { builtinModules } = require('module');

const ALLOWED_NODE_BUILTINS = new Set(['assert']);

module.exports = {
  root: true,
  parser: '@typescript-eslint/parser',
  parserOptions: {
    project: './tsconfig.json',
    tsconfigRootDir: __dirname,
  },
  plugins: ['@typescript-eslint', 'deprecation'],
  extends: [
    'eslint:recommended',
    'plugin:@typescript-eslint/eslint-recommended',
    'plugin:@typescript-eslint/recommended',
    'prettier',
  ],
  rules: {
    // recommended for safety
    '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad
    'deprecation/deprecation': 'warn',

    // code style preference
    'object-shorthand': ['error', 'always'],

    // relaxed rules, for convenience
    '@typescript-eslint/no-unused-vars': [
      'warn',
      {
        argsIgnorePattern: '^_',
        varsIgnorePattern: '^_',
      },
    ],
    '@typescript-eslint/no-explicit-any': 'off',
  },
  overrides: [
    {
      files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'],
      rules: {
        'no-restricted-imports': [
          'error',
          ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]),
        ],
      },
    },
  ],
};
2 changes: 2 additions & 0 deletions batch/.gitignore
@@ -0,0 +1,2 @@
lib
node_modules
1 change: 1 addition & 0 deletions batch/.npmrc
@@ -0,0 +1 @@
package-lock=false
1 change: 1 addition & 0 deletions batch/.nvmrc
@@ -0,0 +1 @@
16
18 changes: 18 additions & 0 deletions batch/.post-create
@@ -0,0 +1,18 @@
To begin development, install the Temporal CLI:

  Mac: {cyan brew install temporal}
  Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest

Start Temporal Server:

  {cyan temporal server start-dev}

Use Node version 16+:

  Mac: {cyan brew install node@16}
  Other: https://nodejs.org/en/download/

Then, in the project directory, using two other shells, run these commands:

  {cyan npm run start-iterator.watch}
  {cyan npm run workflow-iterator}
1 change: 1 addition & 0 deletions batch/.prettierignore
@@ -0,0 +1 @@
lib
2 changes: 2 additions & 0 deletions batch/.prettierrc
@@ -0,0 +1,2 @@
printWidth: 120
singleQuote: true
3 changes: 3 additions & 0 deletions batch/README.md
@@ -0,0 +1,3 @@
# Batch Examples

## [Iterator](./src/iterator/README.md)
42 changes: 42 additions & 0 deletions batch/package.json
@@ -0,0 +1,42 @@
{
  "name": "batch",
  "version": "0.1.0",
  "private": true,
  "scripts": {
    "build": "tsc --build",
    "build.watch": "tsc --build --watch",
    "lint": "eslint .",
    "start-iterator": "ts-node src/iterator/worker.ts",
    "start-iterator.watch": "nodemon src/iterator/worker.ts",
    "workflow-iterator": "ts-node src/iterator/client.ts",
    "format": "prettier --config .prettierrc 'src/**/*.ts' --write"
  },
  "nodemonConfig": {
    "execMap": {
      "ts": "ts-node"
    },
    "ext": "ts",
    "watch": [
      "src"
    ]
  },
  "dependencies": {
    "@temporalio/activity": "^1.10.1",
    "@temporalio/client": "^1.10.1",
    "@temporalio/worker": "^1.10.1",
    "@temporalio/workflow": "^1.10.1"
  },
  "devDependencies": {
    "@tsconfig/node16": "^1.0.0",
    "@types/node": "^16.11.43",
    "@typescript-eslint/eslint-plugin": "^5.0.0",
    "@typescript-eslint/parser": "^5.0.0",
    "eslint": "^7.32.0",
    "eslint-config-prettier": "^8.3.0",
    "eslint-plugin-deprecation": "^1.2.1",
    "nodemon": "^2.0.22",
    "prettier": "^2.8.8",
    "ts-node": "^10.9.2",
    "typescript": "^4.4.2"
  }
}
21 changes: 21 additions & 0 deletions batch/src/iterator/README.md
@@ -0,0 +1,21 @@
# Batch Iterator

A sample implementation of the Workflow iterator pattern.

A parent Workflow loads one page of records and starts one Child Workflow per record, so the page size sets how many children run in parallel. Each child processes a single record.
After all children close (complete or fail), the parent calls continue-as-new and starts the children for the next page of records.

The parent tracks and returns the total number of records processed and the number of failed ones.

Because each page is handled by a fresh Workflow execution with its own Event History, this allows processing a set of records of any size. The advantage of this approach is simplicity.
The main disadvantage is that it processes records in batches, with each batch waiting for its slowest Child Workflow.

A variation of this pattern runs activities instead of child workflows.
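
The control flow described above can be sketched without the Temporal SDK. In this plain-TypeScript simulation, `fetchPage`, `processRecord`, and `runIterator` are hypothetical names standing in for the `getRecords` Activity, the outcome of one Child Workflow, and the parent Workflow. The loop that reassigns `batch` and `result` only approximates what continue-as-new does between executions, so treat it as an illustration of the bookkeeping rather than the sample's implementation.

```typescript
interface Batch {
  pageSize: number;
  offset: number;
}

interface Result {
  totalProcessedRecords: number;
  failedRecords: number;
}

// Hypothetical stand-in for the getRecords Activity: serves record ids in [0, total).
function fetchPage(batch: Batch, total: number): number[] {
  const ids: number[] = [];
  const end = Math.min(batch.offset + batch.pageSize, total);
  for (let id = batch.offset; id < end; id++) {
    ids.push(id);
  }
  return ids;
}

// Hypothetical stand-in for one child: every fifth id fails, as in the sample.
function processRecord(id: number): boolean {
  return id % 5 !== 0;
}

function runIterator(pageSize: number, total: number): Result {
  let batch: Batch = { pageSize, offset: 0 };
  let result: Result = { totalProcessedRecords: 0, failedRecords: 0 };
  // Each iteration models one Workflow execution; carrying `batch` and
  // `result` into the next iteration models the continue-as-new arguments.
  for (;;) {
    const ids = fetchPage(batch, total);
    if (ids.length === 0) {
      return result; // an empty page ends the chain
    }
    const failed = ids.filter((id) => !processRecord(id)).length;
    result = {
      totalProcessedRecords: result.totalProcessedRecords + ids.length,
      failedRecords: result.failedRecords + failed,
    };
    batch = { pageSize: batch.pageSize, offset: batch.offset + ids.length };
  }
}

console.log(runIterator(5, 10)); // prints { totalProcessedRecords: 10, failedRecords: 2 }
```

With a page size of 5 and 10 records, the simulation runs three iterations: two full pages and one empty page that stops the chain. Records 0 and 5 are the two failures.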

## Running this sample

1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation).
2. Navigate to the parent directory (`batch`), and run:
   1. `npm install` to install dependencies.
   2. `npm run start-iterator.watch` to start the Worker.
   3. In another shell, `npm run workflow-iterator` to run the Workflow.
20 changes: 20 additions & 0 deletions batch/src/iterator/activities.ts
@@ -0,0 +1,20 @@
export async function getRecords(pageSize: number, offset: number): Promise<Record[]> {
  // This stub always returns 2 pages. A real implementation would iterate over an existing dataset or file.
  const PAGE_COUNT = 2;
  const result: Record[] = [];
  if (offset < pageSize * PAGE_COUNT) {
    for (let i = 0; i < pageSize; i++) {
      result.push(new Record(offset + i));
    }
  }
  return result;
}

export class Record {
  public readonly id: number;
  public readonly description: string;
  constructor(id: number) {
    this.id = id;
    this.description = 'record number ' + this.id;
  }
}
25 changes: 25 additions & 0 deletions batch/src/iterator/client.ts
@@ -0,0 +1,25 @@
import { Connection, Client } from '@temporalio/client';
import { processBatch } from './workflows';

async function run() {
  const connection = await Connection.connect();
  const client = new Client({ connection });

  const handle = await client.workflow.start(processBatch, {
    taskQueue: 'tq-iterator-wf',
    workflowId: 'iterator-wf',
    args: [
      {
        pageSize: 5,
        offset: 0,
      },
    ],
  });
  const result = await handle.result();
  console.log('Execution result:', result);
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
17 changes: 17 additions & 0 deletions batch/src/iterator/worker.ts
@@ -0,0 +1,17 @@
import { Worker } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflows'),
    activities,
    taskQueue: 'tq-iterator-wf',
  });

  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
89 changes: 89 additions & 0 deletions batch/src/iterator/workflows.ts
@@ -0,0 +1,89 @@
import {
  ApplicationFailure,
  ChildWorkflowHandle,
  continueAsNew,
  log,
  proxyActivities,
  sleep,
  startChild,
  workflowInfo,
} from '@temporalio/workflow';
// Type-only imports keep Activity code out of the Workflow bundle.
import type * as activities from './activities';
import type { Record } from './activities';

const { getRecords } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

export async function processBatch(batch: Batch, previousExecutionResult?: Result): Promise<Result> {
  // Load the records to process in this batch.
  const records: Record[] = await getRecords(batch.pageSize, batch.offset);

  // Start one child per record; all children run in parallel.
  const handles: Array<ChildWorkflowHandle<any>> = await Promise.all(
    records.map((record) => {
      return startChild(recordProcessor, {
        workflowId: workflowInfo().workflowId + '/child-' + record.id,
        args: [record],
      });
    })
  );

  const totalProcessedRecords = previousExecutionResult
    ? previousExecutionResult.totalProcessedRecords + handles.length
    : handles.length;
  let failedRecords = previousExecutionResult ? previousExecutionResult.failedRecords : 0;

  // Wait for all child workflows to close.
  for (const handle of handles) {
    await handle.result().catch(() => {
      // One in five children fails intentionally; count each failure.
      failedRecords++;
    });
  }

  const executionResult = {
    totalProcessedRecords,
    failedRecords,
  };

  // Complete the workflow if there are no more records to process.
  if (records.length === 0) {
    return executionResult;
  }

  // Continue as new to process the next batch, carrying the running totals.
  return continueAsNew<typeof processBatch>(
    {
      pageSize: batch.pageSize,
      offset: batch.offset + records.length,
    },
    executionResult
  );
}

export async function recordProcessor(record: Record): Promise<void> {
  log.info(`Processing record ${JSON.stringify(record)} in child workflow`);

  const maxSleep = 2000;
  const minSleep = 1000;

  // Sleep between minSleep and maxSleep milliseconds to simulate record processing.
  await sleep(Math.floor(Math.random() * (maxSleep - minSleep + 1) + minSleep));

  // Intentionally fail one in five child workflows.
  if (record.id % 5 === 0) {
    throw ApplicationFailure.nonRetryable(
      `Intentionally failing the child workflow with input ${JSON.stringify(record)}`
    );
  }
}

export class Batch {
  public readonly pageSize: number;
  public readonly offset: number;

  constructor(pageSize: number, offset: number) {
    this.pageSize = pageSize;
    this.offset = offset;
  }
}

interface Result {
  totalProcessedRecords: number;
  failedRecords: number;
}
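
Review note: the sequential `for` loop over `handle.result()` in `processBatch` could also be expressed with `Promise.allSettled`, which waits for every promise and reports each outcome. A minimal sketch, assuming plain promises as stand-ins for the child handles' results; `countFailures` and `outcomes` are hypothetical names and not part of this PR:

```typescript
// Counts rejected promises. Each promise stands in for `handle.result()` of one child.
async function countFailures(results: Array<Promise<unknown>>): Promise<number> {
  const settled = await Promise.allSettled(results);
  return settled.filter((s) => s.status === 'rejected').length;
}

// Hypothetical outcomes: ids divisible by 5 reject, mirroring recordProcessor.
const outcomes = [0, 1, 2, 3, 4].map((id) =>
  id % 5 === 0 ? Promise.reject(new Error(`record ${id} failed`)) : Promise.resolve()
);

countFailures(outcomes).then((failed) => console.log(`failed: ${failed}`)); // prints "failed: 1"
```

Because the children already run in parallel, both forms wait for the slowest child. The difference is only in how the failure count is gathered.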
12 changes: 12 additions & 0 deletions batch/tsconfig.json
@@ -0,0 +1,12 @@
{
  "extends": "@tsconfig/node16/tsconfig.json",
  "version": "4.4.2",
  "compilerOptions": {
    "declaration": true,
    "declarationMap": true,
    "sourceMap": true,
    "rootDir": "./src",
    "outDir": "./lib"
  },
  "include": ["src/**/*.ts"]
}