Skip to content

Commit

Permalink
Merge pull request #52 from guidesmiths/chore/handler-subscription-fi…
Browse files Browse the repository at this point in the history
…ltering

fix: subscription handler filtering using wrong property
  • Loading branch information
Bounteous17 authored Sep 29, 2021
2 parents e63eae9 + 4b82d92 commit 6a6c7df
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 19 deletions.
6 changes: 3 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ module.exports = () => {
debug(`Enqueued items increase | ${enqueuedItems} items`);
debug(`Handling message on topic ${topic}`);
const { applicationProperties } = brokeredMessage;
const { subscriptionName } = applicationProperties;
const { subscriptionName: messageSubscription } = applicationProperties;

if (!subscriptionName || subscriptionId === subscriptionName) {
if (!messageSubscription || subscription === messageSubscription) {
/**
* The handler is only going to run if the "subscriptionName" property
* The handler is only going to run if the "messageSubscription" property
* does not exists. Or if it exists and is the current subscription from all
* the different ones that the topic can contain.
* But the message confirmation operation will always be done, even if the handler
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "systemic-azure-bus",
"version": "3.1.0",
"version": "3.1.1",
"description": "A systemic component for azure bus",
"main": "index.js",
"scripts": {
Expand Down
46 changes: 31 additions & 15 deletions test/topics/e2e.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require('dotenv').config();
const expect = require('expect.js');
const { bus, createPayload, schedule, attack } = require('../helper'); // eslint-disable-line object-curly-newline
const { bus, createPayload, schedule, attack, sleep } = require('../helper'); // eslint-disable-line object-curly-newline

const stressTopic = 'stress.test';

Expand Down Expand Up @@ -77,32 +77,48 @@ describe('Topics - Systemic Azure Bus API', () => {

it('publish a message with explicit messageId and check structure on receiving if the "subscriptionName" is the one',
() => new Promise(async resolve => {
const payload = { ...createPayload(), applicationProperties: { subscriptionName: 'mocha-test' } };
const payload = createPayload();
const messageId = '8734258619';
const publish = busApi.publish('fire');

const handler = async msg => {
process.env.SUBSCRIPTION_FILTERED = true;
expect(msg.properties.messageId).to.be.eql(messageId);
const messagesConsumed = async () => {
// Not active messages should exists
const messagesActive = await busApi.peek('assess', 10);
expect(messagesActive.length).to.be(0);
};

const handler = async ({ properties, applicationProperties }) => {
process.env.HANDLER_EXPECTS_EXECUTED = true;
expect(applicationProperties.subscriptionName).to.be.eql(`${stressTopic}.assess`);
expect(properties.messageId).to.be.eql(messageId);
};
busApi.safeSubscribe('assess', handler);
await publish(payload, { messageId });
expect(process.env.SUBSCRIPTION_FILTERED).to.be.eql(undefined);
await publish({
...payload,

await publish(payload, {
messageId,
applicationProperties: {
subscriptionName: 'mocha-test',
},
});
await sleep(2000);
await messagesConsumed();
process.env.HANDLER_EXPECTS_EXECUTED = undefined;

await publish(payload, {
messageId,
applicationProperties: {
subscriptionName: `${stressTopic}.assess`,
},
}, { messageId });
});
await sleep(2000);
// Not active messages should exists
await messagesConsumed();
process.env.HANDLER_EXPECTS_EXECUTED = 'true';

// DLQ should be empty
const messagesInDlq = await busApi.peekDlq('assess', 10);
expect(messagesInDlq.length).to.be(0);
// Not active messages should exists
const messagesActive = await busApi.peek('assess', 10);
expect(messagesActive.length).to.be(0);
// After the last publish with the right 'subscriptionName' property value the hander should be done
expect(process.env.SUBSCRIPTION_FILTERED).to.be.eql('true');

resolve();
}));

Expand Down

0 comments on commit 6a6c7df

Please sign in to comment.