Skip to content

Commit

Permalink
MINOR: add helper function createTopic to ClusterInstance (apache#1…
Browse files Browse the repository at this point in the history
…6852)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
TaiJuWu authored Aug 31, 2024
1 parent 1841c07 commit 8f4d856
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
8 changes: 8 additions & 0 deletions core/src/test/java/kafka/test/ClusterInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import kafka.test.annotation.Type;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.test.TestUtils;
Expand Down Expand Up @@ -186,6 +187,13 @@ default Set<GroupProtocol> supportedGroupProtocols() {
default void waitTopicDeletion(String topic) throws InterruptedException {
waitForTopic(topic, 0);
}

default void createTopic(String topicName, int partitions, short replicas) throws InterruptedException {
try (Admin admin = createAdminClient()) {
admin.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicas)));
waitForTopic(topicName, partitions);
}
}

void waitForReadyBrokers() throws InterruptedException;

Expand Down
19 changes: 19 additions & 0 deletions core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.server.common.MetadataVersion;

Expand Down Expand Up @@ -232,6 +233,24 @@ public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size());
}



@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
public void testCreateTopic(ClusterInstance clusterInstance) throws Exception {
String topicName = "test";
int numPartition = 3;
short numReplicas = 3;
clusterInstance.createTopic(topicName, numPartition, numReplicas);

try (Admin admin = clusterInstance.createAdminClient()) {
Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s -> s.name().equals(topicName)));
List<TopicPartitionInfo> partitions = admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get()
.get(topicName).partitions();
Assertions.assertEquals(numPartition, partitions.size());
Assertions.assertTrue(partitions.stream().allMatch(partition -> partition.replicas().size() == numReplicas));
}
}

@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exception {
clusterInstance.waitForReadyBrokers();
Expand Down

0 comments on commit 8f4d856

Please sign in to comment.