diff --git a/src/GoogleCloudPubSub/GoogleCloudPubSub.ts b/src/GoogleCloudPubSub/GoogleCloudPubSub.ts index fbb9ef80..89651108 100644 --- a/src/GoogleCloudPubSub/GoogleCloudPubSub.ts +++ b/src/GoogleCloudPubSub/GoogleCloudPubSub.ts @@ -168,7 +168,11 @@ export class GoogleCloudPubSub implements GCPubSub { data: Record, opts: EmitOptions = {}, ): Promise { - const topic: Topic = await this.getOrCreateTopic(event, opts.options?.topicOptions, opts.options?.publishOptions); + const topic: Topic = await this.getOrCreateTopic( + this.getTopicName(event), + opts.options?.topicOptions, + opts.options?.publishOptions, + ); this.logger.debug(`Found topic ${topic.name} for event ${event}`); const attributes: Attributes = { ...opts.options?.messageOptions?.attributes }; diff --git a/test/GoogleCloudPubSub.test.ts b/test/GoogleCloudPubSub.test.ts index 607fc828..132c7e5e 100644 --- a/test/GoogleCloudPubSub.test.ts +++ b/test/GoogleCloudPubSub.test.ts @@ -148,6 +148,51 @@ test('GPS001b - should properly emit and listen with ordering key', async (t: Ex }); }); +test('GPS001c - should properly emit and listen with a prefix', async (t: ExecutionContext): Promise => { + const topicName: string = generateRandomTopicName(); + const topicsPrefix: string = 'pref'; + const pubSub: GCPubSub = PubSubFactory.create({ + transport: Transport.GOOGLE_PUBSUB, + options: { + projectId, + topicsPrefix, + }, + }); + + await new Promise((resolve, reject) => { + void pubSub.listen(topicName, { + options: { + autoAck: true, + }, + onMessage(message: EmittedMessage): void { + const spy: sinon.SinonSpy = sinon.spy(message.getOriginalMessage(), 'ack'); + if (isPayloadError(message.payload)) { + return reject('Error in payload'); + } + + const payload: OnMessage = message.payload; + t.deepEqual(payload, { + hello: 'world', + }); + t.falsy(spy.called); + t.truthy(message.id); + t.truthy(message.ackId); + t.truthy(message.emittedAt); + t.truthy(message.receivedAt); + t.is(message.count, 0); + t.truthy(message.duration); + t.pass(`Listen successfully to the topic ${topicName}`); + resolve(true); + }, + onError(error) { + reject(error); + }, + }); + + emitAfterDelay(pubSub, topicName); + }); +}); + test('GPS002 - should properly emit but the ack method is never called - no ack', async (t: ExecutionContext): Promise => { const topicName: string = generateRandomTopicName(); const pubSub: GCPubSub = PubSubFactory.create({