Skip to content

Commit

Permalink
Fix kafka plugin dependencies. (opensearch-project#4169)
Browse files Browse the repository at this point in the history
1. Integration test dependencies were being pulled in when compiling
   source code and unit tests.
2. The wrong namespace for json-schema-validator was being used.
3. Remove catching BrokerEndPointNotAvailableException because that
   exception will not be thrown by Kafka clients.3. Remove catching
BrokerEndPointNotAvailableException because that exception will not be
thrown by Kafka clients.3. Remove catching
BrokerEndPointNotAvailableException because that exception will not be
thrown by Kafka clients.
Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 authored Feb 21, 2024
1 parent 049732a commit cad5e3a
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
18 changes: 9 additions & 9 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,16 @@ dependencies {
implementation 'io.micrometer:micrometer-core'
implementation libs.commons.lang3
implementation 'io.confluent:kafka-avro-serializer:7.4.0'
implementation 'io.confluent:kafka-json-schema-serializer:7.4.0'
implementation 'io.confluent:kafka-schema-registry-client:7.4.0'
implementation ('io.confluent:kafka-schema-registry:7.4.0:tests') {
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
exclude group: 'org.glassfish.jersey.ext', module: 'jersey-bean-validation'
}
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:kafka'
implementation 'software.amazon.awssdk:kms'
implementation 'software.amazon.msk:aws-msk-iam-auth:2.0.3'
implementation 'software.amazon.glue:schema-registry-serde:1.1.15'
implementation 'io.confluent:kafka-json-schema-serializer:7.4.0'
implementation project(':data-prepper-plugins:failures-common')
implementation 'com.github.fge:json-schema-validator:2.2.14'
implementation 'com.github.java-json-tools:json-schema-validator:2.2.14'
implementation 'commons-collections:commons-collections:3.2.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
Expand All @@ -65,7 +60,6 @@ dependencies {
testImplementation 'org.apache.kafka:kafka_2.13:3.6.1'
testImplementation 'org.apache.kafka:kafka_2.13:3.6.1:test'
testImplementation 'org.apache.curator:curator-test:5.5.0'
testImplementation 'io.confluent:kafka-schema-registry:7.4.0'
testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39')
testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9'
testImplementation project(':data-prepper-plugins:otel-metrics-source')
Expand All @@ -74,8 +68,15 @@ dependencies {
testImplementation libs.protobuf.util
testImplementation libs.commons.io
testImplementation libs.armeria.grpc
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'

integrationTestImplementation testLibs.junit.vintage
integrationTestImplementation 'io.confluent:kafka-schema-registry:7.4.0'
integrationTestImplementation ('io.confluent:kafka-schema-registry:7.4.0:tests') {
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
exclude group: 'org.glassfish.jersey.ext', module: 'jersey-bean-validation'
}

constraints {
implementation('org.mozilla:rhino') {
Expand Down Expand Up @@ -130,4 +131,3 @@ task integrationTest(type: Test) {
includeTestsMatching '*IT'
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import kafka.common.BrokerEndPointNotAvailableException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -108,8 +107,7 @@ public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConf

});
} catch (Exception e) {
if (e instanceof BrokerNotAvailableException ||
e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) {
if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) {
LOG.error("The Kafka broker is not available.");
} else {
LOG.error("Failed to setup the Kafka Source Plugin.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import kafka.common.BrokerEndPointNotAvailableException;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -146,8 +145,7 @@ public void start(Buffer<Record<Event>> buffer) {
executorService.submit(consumer);
});
} catch (Exception e) {
if (e instanceof BrokerNotAvailableException ||
e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) {
if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) {
LOG.error("The kafka broker is not available...");
} else {
LOG.error("Failed to setup the Kafka Source Plugin.", e);
Expand Down

0 comments on commit cad5e3a

Please sign in to comment.