Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nats Subject Consumer #488

Draft
wants to merge 4 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/ci_staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ jobs:
echo WEEKLY_REPORT_ENABLE= >> .env
- run: yarn nx affected --target format --base $BASE
- run: yarn nx affected:lint --base $BASE
- name: Install NATS server
run: |
wget https://github.com/nats-io/nats-server/releases/download/v2.9.15/nats-server-v2.9.15-amd64.deb
sudo dpkg -i nats-server-v2.9.15-amd64.deb
- name: Start NATS server
run: |
nats-server --jetstream & echo $! > nats-server.pid
- name: Test
run: yarn nx affected:test --base $BASE
- name: Add frontend env vars
Expand Down
1 change: 0 additions & 1 deletion apps/protocol-frontend/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"alwaysStrict": true,
"noImplicitAny": true,
"strict": true,
// "strict": true,
"types": ["vite/client", "@nxext/react/client"]
},
"files": [],
Expand Down
3 changes: 3 additions & 0 deletions libs/govrn-nats/.babelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"presets": [["@nrwl/web/babel", { "useBuiltIns": "usage" }]]
}
18 changes: 18 additions & 0 deletions libs/govrn-nats/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"extends": ["../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}
11 changes: 11 additions & 0 deletions libs/govrn-nats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# govrn-nats

This library was generated with [Nx](https://nx.dev).

## Running unit tests

Run `nx test govrn-nats` to execute the unit tests via [Jest](https://jestjs.io).

## Running lint

Run `nx lint govrn-nats` to execute the lint via [ESLint](https://eslint.org/).
16 changes: 16 additions & 0 deletions libs/govrn-nats/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* eslint-disable */
export default {
displayName: 'govrn-nats',
preset: '../../jest.preset.js',
globals: {
'ts-jest': {
tsconfig: '<rootDir>/tsconfig.spec.json',
},
},
testEnvironment: 'node',
transform: {
'^.+\\.[tj]sx?$': 'ts-jest',
},
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx'],
coverageDirectory: '../../coverage/libs/govrn-nats',
};
33 changes: 33 additions & 0 deletions libs/govrn-nats/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"$schema": "../../node_modules/nx/schemas/project-schema.json",
"sourceRoot": "libs/govrn-nats/src",
"projectType": "library",
"targets": {
"build": {
"executor": "@nrwl/js:tsc",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/libs/govrn-nats",
"main": "libs/govrn-nats/src/index.ts",
"tsConfig": "libs/govrn-nats/tsconfig.lib.json",
"assets": ["libs/govrn-nats/*.md"]
}
},
"lint": {
"executor": "@nrwl/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["libs/govrn-nats/**/*.ts"]
}
},
"test": {
"executor": "@nrwl/jest:jest",
"outputs": ["coverage/libs/govrn-nats"],
"options": {
"jestConfig": "libs/govrn-nats/jest.config.ts",
"passWithNoTests": true
}
}
},
"tags": []
}
1 change: 1 addition & 0 deletions libs/govrn-nats/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './lib/SubjectConsumer';
88 changes: 88 additions & 0 deletions libs/govrn-nats/src/lib/SubjectConsumer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import SubjectConsumer from './SubjectConsumer';
import { NatsConnection, JSONCodec } from 'nats';

interface Message {
id: string;
}

const generateStreamName = () =>
`test-stream-${Math.floor(Math.random() * 10)}`;

const generateMessages = (count: number): Message[] => {
const messages: Message[] = [];
for (let i = 0; i < count; i++) {
messages.push({ id: `test-message-${i}` });
}
return messages;
};
const publishMessages = async (
nc: NatsConnection,
streamName: string,
subjectName: string,
messages: Message[],
) => {
const js = await nc.jetstream();
const subj = `${streamName}.row`;
for (const msg of messages) {
await js.publish(subj, JSONCodec().encode(`${msg}`));
}
};

const deleteStream = async (nc: NatsConnection, streamName: string) => {
const js = await nc.jetstreamManager();
await js.streams.delete(streamName);
};

const createStream = async (nc: NatsConnection, streamName: string) => {
const js = await nc.jetstreamManager();
await js.streams.add({
name: streamName,
subjects: [`${streamName}.row`],
});
};

describe('SubjectConsumer', () => {
let consumer: SubjectConsumer<Message>;
let streamName: string;

jest.setTimeout(10000);

beforeAll(async () => {
streamName = generateStreamName();
consumer = await SubjectConsumer.create({
servers: ['localhost'],
});
await createStream(consumer.natsConnection, streamName);
});

afterAll(async () => {
await deleteStream(consumer.natsConnection, streamName);
await consumer.close();
});

it('should retrieve n published messages', done => {
const data = generateMessages(10);
const callback = jest.fn();

(async () => {
await publishMessages(
consumer.natsConnection,
streamName,
`${streamName}.row`,
data,
);
let tracker = 0;
await consumer.pull(`${streamName}.row`, async msg => {
expect(msg.id).toBe(data[tracker].id);
tracker++;

callback(msg.id);
if (callback.mock.calls.length === data.length) {
expect(callback.mock.calls.length).toEqual(data.length);
expect(callback).toHaveBeenCalledWith(data.map(d => d.id));
}
done();
});
})();
});
});
86 changes: 86 additions & 0 deletions libs/govrn-nats/src/lib/SubjectConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import {
AckPolicy,
connect,
ConnectionOptions,
nanos,
NatsConnection,
} from 'nats';
import { decodeJSON, streamExists } from './utils';

class SubjectConsumer<MessageType> {
private connection: NatsConnection | null = null;

private constructor(
private readonly connectionOptions: Partial<ConnectionOptions>,
) {}

private async init() {
this.connection = await connect(this.connectionOptions);
}

public static async create<MessageType>(
connectionOptions: Partial<ConnectionOptions>,
) {
const consumer = new SubjectConsumer<MessageType>(connectionOptions);
await consumer.init();
return consumer;
}

public async pull(
subject: string,
callback: (data: MessageType) => void,
expires = 5000,
batch = 10,
) {
if (!this.connection) {
throw new Error(':: Connection not initialized');
}

const [streamName] = subject.split('.');
if (!(await streamExists(this.connection, streamName))) {
throw new Error(`Stream ${streamName} does not exist`);
}

console.log(`:: Consuming messages from ${streamName}`);
const js = this.connection.jetstream();

const subscription = await js.pullSubscribe(subject, {
mack: true,
config: {
durable_name: `durable-${streamName}-${Date.now()}`,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(4000),
},
});

const done = (async () => {
await subscription.pull({ no_wait: true, batch, expires });
for await (const m of subscription) {
await callback(decodeJSON<MessageType>(m));
m.ack();
}
})();

await done;
subscription.unsubscribe();
}

public get natsConnection() {
if (!this.connection) {
throw new Error(':: Connection not initialized');
}
return this.connection;
}

public async close() {
if (!this.connection) {
throw new Error(':: Connection not initialized');
}

await this.connection.drain();
await this.connection.close();
console.log(':: Connection closed');
}
}

export default SubjectConsumer;
23 changes: 23 additions & 0 deletions libs/govrn-nats/src/lib/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {
JetStreamClient,
JetStreamManager,
JsMsg,
JSONCodec,
NatsConnection,
} from 'nats';

export const decodeJSON = <T>(msg: JsMsg): T =>
JSONCodec().decode(msg.data) as T;

export const streamExists = async (
connection: NatsConnection,
streamName: string,
) => {
const js = await connection.jetstreamManager();
try {
await js.streams.info(streamName);
return true;
} catch (e) {
return false;
}
};
19 changes: 19 additions & 0 deletions libs/govrn-nats/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"extends": "../../tsconfig.base.json",
"files": [],
"include": [],
"references": [
{
"path": "./tsconfig.lib.json"
},
{
"path": "./tsconfig.spec.json"
}
],
"compilerOptions": {
"forceConsistentCasingInFileNames": true,
"strict": true,
"noImplicitReturns": true,
"noFallthroughCasesInSwitch": true
}
}
10 changes: 10 additions & 0 deletions libs/govrn-nats/tsconfig.lib.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "../../dist/out-tsc",
"declaration": true,
"types": ["node"]
},
"include": ["**/*.ts"],
"exclude": ["jest.config.ts", "**/*.spec.ts"]
}
20 changes: 20 additions & 0 deletions libs/govrn-nats/tsconfig.spec.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "../../dist/out-tsc",
"module": "commonjs",
"types": ["jest", "node"]
},
"include": [
"jest.config.ts",
"**/*.test.ts",
"**/*.spec.ts",
"**/*.test.tsx",
"**/*.spec.tsx",
"**/*.test.js",
"**/*.spec.js",
"**/*.test.jsx",
"**/*.spec.jsx",
"**/*.d.ts"
]
}
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"graphql-tag": "^2.12.6",
"ipfs-http-client": "56.0.0",
"lodash": "^4.17.21",
"nats": "^2.13.1",
"notistack": "^1.0.10",
"nx": "14.7.13",
"pg": "^8.8.0",
Expand Down Expand Up @@ -126,6 +127,7 @@
"@nrwl/eslint-plugin-nx": "14.7.13",
"@nrwl/express": "14.7.13",
"@nrwl/jest": "14.7.13",
"@nrwl/js": "14.7.13",
"@nrwl/linter": "14.7.13",
"@nrwl/node": "14.7.13",
"@nrwl/react": "14.7.13",
Expand Down
1 change: 1 addition & 0 deletions tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"skipDefaultLibCheck": true,
"baseUrl": ".",
"paths": {
"@govrn-monorepo/govrn-nats": ["libs/govrn-nats/src/index.ts"],
"@govrn/govrn-contract-client": [
"libs/govrn-contract-client/src/index.ts"
],
Expand Down
1 change: 1 addition & 0 deletions workspace.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"govrn-contract": "apps/govrn-contract",
"govrn-contract-client": "libs/govrn-contract-client",
"govrn-contract-subgraph": "apps/govrn-contract-subgraph",
"nats": "libs/govrn-nats",
"govrn-subgraph-client": "libs/govrn-subgraph-client",
"jobs-contribution-verification": "apps/jobs/contribution-verification",
"kevin-malone": "apps/kevin-malone",
Expand Down
Loading