Skip to content

Commit

Permalink
Merge pull request #1498 from bcgov/fix/EDX-1565
Browse files Browse the repository at this point in the history
EDX-1565 - Waits for new exchange message saga to finish before refre…
  • Loading branch information
SoLetsDev authored Jul 13, 2023
2 parents 437ef22 + 3958cd9 commit 22e96dd
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
17 changes: 11 additions & 6 deletions backend/src/messaging/handlers/edx-event-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const NATS = require('../message-pub-sub');
const {StringCodec} = require('nats');
const cacheService = require('../../components/cache-service');
const sc = StringCodec();
const TOPICS = [CONSTANTS.WS_MOVE_SCHOOL_TOPIC, CONSTANTS.WS_NEW_SECURE_MESSAGE_TOPIC];


function broadCastMessageToWebSocketClients(msg) {
Expand All @@ -21,23 +22,27 @@ function broadCastMessageToWebSocketClients(msg) {
}
}

async function subscribeToWebSocketMessageTopic(nats) {
async function subscribeToWebSocketMessageTopic(nats, topic) {
const opts = {};
const sub = nats.subscribe(CONSTANTS.WS_MOVE_SCHOOL_TOPIC, opts);
log.info(` listening to ${CONSTANTS.WS_MOVE_SCHOOL_TOPIC}`);
const sub = nats.subscribe(topic, opts);
log.info(` listening to ${topic}`);
for await (const msg of sub) {
const dataStr = sc.decode(msg.data);
const data = JSON.parse(dataStr);
log.info(`Received message, on ${msg.subject} , Subscription Id :: [${msg.sid}], Reply to :: [${msg.reply}] :: Data ::`, data);
await cacheService.loadAllSchoolsToMap();
if (topic === CONSTANTS.WS_MOVE_SCHOOL_TOPIC) {
await cacheService.loadAllSchoolsToMap();
}
broadCastMessageToWebSocketClients(dataStr);
}
}


const EdxSagaMessageHandler = {
subscribe() {
subscribeToWebSocketMessageTopic(NATS.getConnection());
async subscribe() {
for (const topic of TOPICS) {
await subscribeToWebSocketMessageTopic(NATS.getConnection(), topic);
}
},

};
Expand Down
11 changes: 7 additions & 4 deletions backend/src/messaging/handlers/edx-jetstream-subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ const handleJetStreamMessage = async (err, msg) => {
logger.debug(`Received message, on ${msg.subject} , Sequence :: [${msg.seq}], sid :: [${msg.sid}], redelivered :: [${msg.redelivered}] :: Data ::`, data);
try {
if (data.eventType === CONSTANTS.EVENT_TYPE.COPY_USERS_TO_NEW_SCHOOL && data.eventOutcome === CONSTANTS.EVENT_OUTCOME.USERS_TO_NEW_SCHOOL_COPIED) {
await handleEdxMoveEvent(data);
await handleEdxEvent(data, CONSTANTS.WS_MOVE_SCHOOL_TOPIC);
}
else if (data.eventType === CONSTANTS.EVENT_TYPE.SEND_EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE && data.eventOutcome === CONSTANTS.EVENT_OUTCOME.EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE_SENT) {
await handleEdxEvent(data, CONSTANTS.WS_NEW_SECURE_MESSAGE_TOPIC);
}
msg.ack(); // acknowledge to JetStream
} catch (e) {
logger.error('Error while handling school data from update school event', e);
}
};

async function handleEdxMoveEvent(data) {
async function handleEdxEvent(data, topic) {
logger.debug('Received edx message: ' + JSON.stringify(data.eventPayload));
NATS.publishMessage(CONSTANTS.WS_MOVE_SCHOOL_TOPIC, StringCodec().encode(safeStringify(data))).then(() => { // publish the message only if key was present in redis, otherwise just acknowledge to STAN.
logger.debug(`Message published to ${CONSTANTS.WS_MOVE_SCHOOL_TOPIC}`, data);
NATS.publishMessage(topic, StringCodec().encode(safeStringify(data))).then(() => { // publish the message only if key was present in redis, otherwise just acknowledge to STAN.
logger.info(`Message published to ${topic}`, data);
});
}

Expand Down
10 changes: 7 additions & 3 deletions backend/src/util/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,22 @@ const EVENT_TYPE = Object.freeze({
CREATE_DISTRICT: 'CREATE_DISTRICT',
UPDATE_AUTHORITY: 'UPDATE_AUTHORITY',
CREATE_AUTHORITY: 'CREATE_AUTHORITY',
COPY_USERS_TO_NEW_SCHOOL: 'COPY_USERS_TO_NEW_SCHOOL'
COPY_USERS_TO_NEW_SCHOOL: 'COPY_USERS_TO_NEW_SCHOOL',
SEND_EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE: 'SEND_EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE'
});
const EVENT_OUTCOME = Object.freeze({
STUDENT_UPDATED: 'STUDENT_UPDATED',
STUDENT_CREATED: 'STUDENT_CREATED',
SCHOOL_UPDATED: 'SCHOOL_UPDATED',
SCHOOL_CREATED: 'SCHOOL_CREATED',
SCHOOL_MOVED: 'SCHOOL_MOVED',
USERS_TO_NEW_SCHOOL_COPIED: 'USERS_TO_NEW_SCHOOL_COPIED'
USERS_TO_NEW_SCHOOL_COPIED: 'USERS_TO_NEW_SCHOOL_COPIED',
EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE_SENT: 'EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE_SENT'
});
const EVENT_WS_TOPIC = 'EVENT_WS_TOPIC';
const INSTITUTE_CACHE_REFRESH_TOPIC = 'INSTITUTE_CACHE_REFRESH_TOPIC';
const WS_MOVE_SCHOOL_TOPIC = 'WS_MOVE_SCHOOL_TOPIC';
const WS_NEW_SECURE_MESSAGE_TOPIC = 'WS_NEW_SECURE_MESSAGE_TOPIC';
module.exports = {
FILTER_OPERATION,
CONDITION,
Expand All @@ -174,5 +177,6 @@ module.exports = {
EVENT_WS_TOPIC,
INSTITUTE_CACHE_REFRESH_TOPIC,
CACHE_KEYS,
WS_MOVE_SCHOOL_TOPIC
WS_MOVE_SCHOOL_TOPIC,
WS_NEW_SECURE_MESSAGE_TOPIC
};
10 changes: 8 additions & 2 deletions frontend/src/components/secure-message/ExchangePage.vue
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@
<script>
import ApiService from '../../common/apiService';
import {EDX_SAGA_REQUEST_DELAY_MILLISECONDS, Routes} from '@/utils/constants';
import {Routes} from '@/utils/constants';
import PrimaryButton from '../util/PrimaryButton.vue';
import NewMessagePage from './NewMessagePage.vue';
import {mapState} from 'pinia';
Expand All @@ -527,6 +527,7 @@ import alertMixin from '@/mixins/alertMixin';
import {edxStore} from '@/store/modules/edx';
import {appStore} from '@/store/modules/app';
import {authStore} from '@/store/modules/auth';
import {notificationsStore} from '@/store/modules/notifications';
export default {
name: 'ExchangeInbox',
Expand Down Expand Up @@ -600,6 +601,7 @@ export default {
...mapState(authStore, ['userInfo']),
...mapState(appStore, ['schoolMap', 'districtMap', 'mincodeSchoolNames']),
...mapState(edxStore, ['statuses', 'ministryTeams']),
...mapState(notificationsStore, ['notification']),
secureExchangeStatusCodes() {
return this.statuses;
},
Expand Down Expand Up @@ -642,6 +644,11 @@ export default {
},
pageNumber() {
this.getExchanges();
},
notification(notificationData) {
if (notificationData?.eventType === 'SEND_EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE' && notificationData?.eventOutcome === 'EMAIL_NOTIFICATION_FOR_NEW_SECURE_EXCHANGE_SENT') {
this.getExchanges();
}
}
},
created() {
Expand All @@ -664,7 +671,6 @@ export default {
},
messageSent() {
this.newMessageSheet = !this.newMessageSheet;
setTimeout(this.getExchanges, EDX_SAGA_REQUEST_DELAY_MILLISECONDS);
},
getMinistryTeamNameByGroupRoleID() {
this.ministryTeamName = this.ministryTeams.find(item => item.groupRoleIdentifier === this.ministryOwnershipGroupRoleID).teamName;
Expand Down

0 comments on commit 22e96dd

Please sign in to comment.