diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index 92150cccd7..6e1ffc8ec2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -74,6 +74,8 @@ * @author Gary Russell * @author Artem Bilan * @author Adrian Gygax + * @author Sanghyeok An + * @author Valentina Armenise * * @since 1.3 */ @@ -213,6 +215,15 @@ public void setClusterId(String clusterId) { this.clusterId = clusterId; } + /** + * Get the clusterId property. + * @return the cluster id. + * @since 3.1.8 + */ + public String getClusterId() { + return this.clusterId; + } + @Override public Map getConfigurationProperties() { Map configs2 = new HashMap<>(this.configs); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index a1e6043310..c4f89d8a27 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -102,6 +102,7 @@ * @author Thomas Strauß * @author Soby Chacko * @author Gurps Bassi + * @author Valentina Armenise */ public class KafkaTemplate implements KafkaOperations, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean, SmartInitializingSingleton { @@ -485,13 +486,17 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin != null) { Object producerServers = this.producerFactory.getConfigurationProperties() .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); - String adminServers = this.kafkaAdmin.getBootstrapServers(); + String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers); int opTo = this.kafkaAdmin.getOperationTimeout(); + String clusterId = this.kafkaAdmin.getClusterId(); this.kafkaAdmin = new KafkaAdmin(props); this.kafkaAdmin.setOperationTimeout(opTo); + if (clusterId != null && !clusterId.isEmpty()) { + this.kafkaAdmin.setClusterId(clusterId); + } } } } @@ -501,6 +506,21 @@ else if (this.micrometerEnabled) { } } + private String getAdminBootstrapAddress() { + // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available + String adminServers = this.kafkaAdmin.getBootstrapServers(); + + // Fallback to configuration properties if bootstrap servers are not set + if (adminServers == null) { + adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + "" + ).toString(); + } + + return adminServers; + } + @Nullable private String clusterId() { if (this.kafkaAdmin != null && this.clusterId == null) {