Skip to content

Commit

Permalink
KAFKA-17395 Flaky test testMissingOffsetNoResetPolicy for new consumer (
Browse files Browse the repository at this point in the history
apache#17056)

In AsyncKafkaConsumer, FindCoordinatorRequest is sent by background thread. In MockClient#prepareResponseFrom, it only matches the response to a future request. If there is some race condition, FindCoordinatorResponse may not match to a FindCoordinatorRequest. It's better to put MockClient#prepareResponseFrom before the request to avoid flaky test.

Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
FrankYang0529 authored Aug 31, 2024
1 parent 3efa785 commit 4a2577b
Showing 1 changed file with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -924,16 +924,14 @@ public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws I
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));

client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

// lookup committed offset and find nothing
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);

consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));

if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
// by the background thread, so it can realize there are no committed offsets and then
Expand Down

0 comments on commit 4a2577b

Please sign in to comment.