From d427f801fdc7182f61b95572228bf64694747e76 Mon Sep 17 00:00:00 2001 From: rajeshLovesToCode Date: Mon, 21 Aug 2023 16:06:36 +0530 Subject: [PATCH] -Support for kafka-sink Signed-off-by: rajeshLovesToCode --- .../dataprepper/plugins/kafka/source/KafkaSource.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index f67720f5db..ea021f8474 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -9,7 +9,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaJsonDeserializer; -//import kafka.common.BrokerEndPointNotAvailableException; +import kafka.common.BrokerEndPointNotAvailableException; import org.apache.avro.generic.GenericRecord; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -148,12 +148,12 @@ public void start(Buffer> buffer) { executorService.submit(consumer); }); } catch (Exception e) { - /* if (e instanceof BrokerNotAvailableException || + if (e instanceof BrokerNotAvailableException || e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) { LOG.error("The kafka broker is not available..."); } else { LOG.error("Failed to setup the Kafka Source Plugin.", e); - }*/ + } throw new RuntimeException(e); } LOG.info("Started Kafka source for topic " + topic.getName());