Skip to content

Commit

Permalink
Merge pull request #50 from guidesmiths/chore/handler-subscription-fi…
Browse files Browse the repository at this point in the history
…ltering

chore: handler subscription filtering
  • Loading branch information
Bounteous17 authored Sep 28, 2021
2 parents c7db762 + 47737cb commit e63eae9
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
21 changes: 20 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,26 @@ module.exports = () => {
enqueuedItems++;
debug(`Enqueued items increase | ${enqueuedItems} items`);
debug(`Handling message on topic ${topic}`);
await handler({ body: getBodyDecoded(brokeredMessage.body, brokeredMessage.applicationProperties.contentEncoding), applicationProperties: brokeredMessage.applicationProperties, properties: getProperties(brokeredMessage) });
const { applicationProperties } = brokeredMessage;
const { subscriptionName } = applicationProperties;

if (!subscriptionName || subscriptionId === subscriptionName) {
/**
* The handler is only going to run if the "subscriptionName" property
* does not exists. Or if it exists and is the current subscription from all
* the different ones that the topic can contain.
* But the message confirmation operation will always be done, even if the handler
* is not executed because of the comment above.
*/
await handler({
body: getBodyDecoded(
brokeredMessage.body,
applicationProperties.contentEncoding,
),
applicationProperties,
properties: getProperties(brokeredMessage),
});
}
await receiver.completeMessage(brokeredMessage);
} catch (e) {
const subscriptionErrorStrategy = (errorHandling || {}).strategy;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "systemic-azure-bus",
"version": "3.0.0",
"version": "3.1.0",
"description": "A systemic component for azure bus",
"main": "index.js",
"scripts": {
Expand Down
4 changes: 0 additions & 4 deletions test/topics/dlq.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,8 @@ describe('Topics - Systemic Azure Bus API - DLQ', () => {

expect(messagesInDlq.length).to.be(BULLETS);

await sleep(8000); // needed for correct peek

await busApi.emptyDlq('assess');

await sleep(8000); // needed for correct peek

const messagesInDlqAfterEmptying = await busApi.peekDlq('assess', BULLETS);

expect(messagesInDlqAfterEmptying.length).to.be(0);
Expand Down
31 changes: 31 additions & 0 deletions test/topics/e2e.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,37 @@ describe('Topics - Systemic Azure Bus API', () => {
await publish(payload, { messageId });
}));

it('publish a message with explicit messageId and check structure on receiving if the "subscriptionName" is the one',
() => new Promise(async resolve => {
const payload = { ...createPayload(), applicationProperties: { subscriptionName: 'mocha-test' } };
const messageId = '8734258619';
const publish = busApi.publish('fire');

const handler = async msg => {
process.env.SUBSCRIPTION_FILTERED = true;
expect(msg.properties.messageId).to.be.eql(messageId);
};

busApi.safeSubscribe('assess', handler);
await publish(payload, { messageId });
expect(process.env.SUBSCRIPTION_FILTERED).to.be.eql(undefined);
await publish({
...payload,
applicationProperties: {
subscriptionName: `${stressTopic}.assess`,
},
}, { messageId });
// DLQ should be empty
const messagesInDlq = await busApi.peekDlq('assess', 10);
expect(messagesInDlq.length).to.be(0);
// Not active messages should exists
const messagesActive = await busApi.peek('assess', 10);
expect(messagesActive.length).to.be(0);
// After the last publish with the right 'subscriptionName' property value the hander should be done
expect(process.env.SUBSCRIPTION_FILTERED).to.be.eql('true');
resolve();
}));

it('publishes lots of messages with no explicit messageId and receives them all', () => new Promise(async resolve => {
const BULLETS = 10;
const publishFire = busApi.publish('fire');
Expand Down

0 comments on commit e63eae9

Please sign in to comment.