Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate with Kafka Connect and Debezium MySQL/Postgres/MongoDb plugin for CDC #3236

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ subprojects {
}
implementation('org.eclipse.jetty:jetty-http') {
version {
require '11.0.15'
require '9.4.48.v20220622'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we going to an old revision?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka Connect doesn't work with 11.0.15; the latest version it works with is 9.4.48.v20220622.
I mentioned this as the first point in the high-level design, and it has been approved by Rajs and Dinu.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a bit of a problem.

Also, why does this even need Jetty? Perhaps we can disable the admin server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka Connect needs it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the newest 9.4.x release is 9.4.51.v20230217

}
because 'CVE from transitive dependencies'
}
implementation('org.eclipse.jetty:jetty-server') {
version {
require '11.0.15'
require '9.4.48.v20220622'
}
because 'CVE from transitive dependencies'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

Expand Down Expand Up @@ -78,8 +79,11 @@ public <T> T gauge(String name, T obj, ToDoubleFunction<T> valueFunction) {
return Metrics.gauge(getMeterName(name), obj, valueFunction);
}

/**
 * Registers (or looks up) a gauge on the global Micrometer registry under this
 * plugin's prefixed meter name, carrying the supplied tags.
 *
 * @param name          un-prefixed metric name; the plugin prefix is prepended
 * @param tags          tags to attach to the gauge
 * @param obj           object whose state backs the gauge value
 * @param valueFunction function extracting the gauge value from {@code obj}
 * @return {@code obj}, so the call can be chained inline
 */
public <T> T gaugeWithTags(String name, Iterable<Tag> tags, T obj, ToDoubleFunction<T> valueFunction) {
    final String prefixedName = getMeterName(name);
    return Metrics.gauge(prefixedName, tags, obj, valueFunction);
}

/**
 * Builds the fully-qualified meter name by joining the plugin's metrics prefix
 * and the supplied name with the standard metric-name delimiter.
 */
private String getMeterName(final String name) {
    return String.join(MetricNames.DELIMITER, metricsPrefix, name);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,42 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep these out of data-prepper-api. We should reserve this project for the most common APIs, not specific technologies.

You can make use of extensions, or perhaps add a kafka-common project if you want to share with the kafka source/sink.


import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;

import java.util.stream.Stream;

/**
* A helper class that helps to read auth related configuration values from
* pipelines.yaml
*/
public class AuthConfig {

@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

/*
* TODO
public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication

public SslAuthConfig() {
}
}

@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}

*/

public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}

public static class SaslAuthConfig {
@JsonProperty("plaintext")
private PlainTextAuthConfig plainTextAuthConfig;
Expand Down Expand Up @@ -45,40 +67,6 @@ public OAuthConfig getOAuthConfig() {
public String getSslEndpointAlgorithm() {
return sslEndpointAlgorithm;
}

@AssertTrue(message = "Only one of AwsIam or oAuth or PlainText auth config must be specified")
public boolean hasOnlyOneConfig() {
return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n != null).count() == 1;
}

}


/*
* TODO
public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication

public SslAuthConfig() {
}
}

@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}

*/

@Valid
@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import jakarta.validation.Valid;

public class AwsConfig {

public static class AwsMskConfig {
@Valid
@Size(min = 20, max = 2048, message = "msk_arn length should be between 20 and 2048 characters")
@JsonProperty("arn")
private String arn;

Expand All @@ -32,13 +28,9 @@ public MskBrokerConnectionType getBrokerConnectionType() {
@JsonProperty("msk")
private AwsMskConfig awsMskConfig;

@Valid
@Size(min = 1, message = "Region cannot be empty string")
@JsonProperty("region")
private String region;

@Valid
@Size(min = 20, max = 2048, message = "sts_role_arn length should be between 20 and 2048 characters")
@JsonProperty("sts_role_arn")
private String stsRoleArn;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.Map;
Expand All @@ -28,7 +28,7 @@ public enum AwsIamAuthConfig {
}

@JsonCreator
static AwsIamAuthConfig fromOptionValue(final String option) {
/**
 * Resolves a configuration string (case-insensitively) to its enum constant.
 * Returns {@code null} when the option is not a known value, since
 * {@code Map.get} yields {@code null} for absent keys.
 * NOTE(review): throws NullPointerException if {@code option} is null — confirm
 * whether Jackson callers can ever pass null here.
 */
public static AwsIamAuthConfig fromOptionValue(final String option) {
    final String normalizedOption = option.toLowerCase();
    return OPTIONS_MAP.get(normalizedOption);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.plugin.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Configuration for transport encryption of a Kafka cluster connection,
 * deserialized by Jackson from pipeline/data-prepper configuration.
 */
public class EncryptionConfig {
    // Encryption type to use; defaults to SSL when not configured.
    @JsonProperty("type")
    private EncryptionType type = EncryptionType.SSL;

    // When true, certificate verification may be relaxed; defaults to false.
    // NOTE(review): exact effect of "insecure" is defined by the consuming
    // plugin — not visible here; confirm before documenting further.
    @JsonProperty("insecure")
    private boolean insecure = false;

    public EncryptionType getType() {
        return type;
    }

    // NOTE(review): idiomatic JavaBeans naming for a boolean would be
    // isInsecure(); kept as-is since callers may rely on this accessor name.
    public boolean getInsecure() {
        return insecure;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;

Expand All @@ -28,7 +28,7 @@ public enum EncryptionType {
}

@JsonCreator
static EncryptionType fromTypeValue(final String type) {
/**
 * Maps a configuration string (case-insensitively) to its enum constant.
 * Unknown values resolve to {@code null} because {@code Map.get} returns
 * {@code null} for absent keys.
 */
public static EncryptionType fromTypeValue(final String type) {
    final String normalizedType = type.toLowerCase();
    return OPTIONS_MAP.get(normalizedType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.plugin.kafka;

/**
 * Read-side contract for a Kafka cluster's connection settings: AWS options,
 * authentication, encryption, and the bootstrap-server list.
 */
public interface KafkaClusterAuthConfig {
    // AWS-specific settings (MSK, region, STS role), or null semantics per implementer.
    AwsConfig getAwsConfig();

    // SASL/SSL authentication settings.
    AuthConfig getAuthConfig();

    // Transport-encryption settings.
    EncryptionConfig getEncryptionConfig();

    // Kafka bootstrap servers.
    // NOTE(review): "BootStrap" casing is inconsistent with
    // UsesKafkaClusterConfig.setBootstrapServers — consider unifying before release.
    String getBootStrapServers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;

Expand All @@ -29,7 +29,7 @@ public enum MskBrokerConnectionType {
}

@JsonCreator
static MskBrokerConnectionType fromTypeValue(final String type) {
public static MskBrokerConnectionType fromTypeValue(final String type) {
return OPTIONS_MAP.get(type.toLowerCase());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.plugin.kafka;

import java.util.List;

/**
 * Contract implemented by a plugin that consumes a shared Kafka cluster
 * configuration (applied when kafka_cluster_config is configured).
 */
public interface UsesKafkaClusterConfig {

    // Receives the cluster's bootstrap servers list.
    void setBootstrapServers(final List<String> bootstrapServers);

    // Receives the cluster's authentication, AWS, and encryption settings.
    void setKafkaClusterAuthConfig(final AuthConfig authConfig,
                                   final AwsConfig awsConfig,
                                   final EncryptionConfig encryptionConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -125,6 +126,18 @@ public void testReferenceGauge() {
assertEquals(3, gauge.length());
}

@Test
public void testReferenceGaugeWithTags() {
    // Register a gauge (with an empty tag set) backed by a reference string.
    final String referencedObject = "abc";
    final String returned = objectUnderTest.gaugeWithTags("gauge", emptyList(), referencedObject, String::length);

    // The meter must be registered under the pipeline/plugin-prefixed name.
    final String expectedMeterName = new StringJoiner(MetricNames.DELIMITER)
            .add(PIPELINE_NAME)
            .add(PLUGIN_NAME)
            .add("gauge")
            .toString();
    assertNotNull(Metrics.globalRegistry.get(expectedMeterName).meter());

    // gaugeWithTags returns the backing object unchanged.
    assertEquals(3, returned.length());
}


@Test
public void testEmptyPipelineName() {
assertThrows(
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
testImplementation project(':data-prepper-plugins:common').sourceSets.test.output
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation "org.reflections:reflections:0.10.2"
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
implementation 'io.micrometer:micrometer-registry-prometheus'
implementation 'io.micrometer:micrometer-registry-cloudwatch2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.plugin.kafka.UsesKafkaClusterConfig;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.parser.model.KafkaClusterConfig;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
Expand Down Expand Up @@ -233,6 +235,8 @@ private void buildPipelineFromConfiguration(

final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes());

configPluginsUsesKafka(source, buffer, decoratedProcessorSets, sinks);

final Pipeline pipeline = new Pipeline(pipelineName, source, buffer, decoratedProcessorSets, sinks, router,
eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, processorThreads, readBatchDelay,
dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(),
Expand All @@ -247,6 +251,20 @@ private void buildPipelineFromConfiguration(

}

/**
 * Injects the shared Kafka cluster configuration into any pipeline component
 * that opts in via {@link UsesKafkaClusterConfig}.
 * Currently only the source is wired; buffer, processors, and sinks are
 * accepted for future extension and are intentionally unused for now.
 *
 * @throws IllegalStateException if the source requires Kafka cluster settings
 *                               but no kafka_cluster_config was provided
 */
private void configPluginsUsesKafka(final Source source,
                                    final Buffer buffer,
                                    final List<List<Processor>> decoratedProcessorSets,
                                    final List<DataFlowComponent<Sink>> sinks) {
    if (!(source instanceof UsesKafkaClusterConfig)) {
        return;
    }
    final KafkaClusterConfig kafkaClusterConfig = dataPrepperConfiguration.getKafkaClusterConfig();
    // Fail with a clear message instead of an opaque NPE when the shared
    // cluster configuration was never supplied.
    if (kafkaClusterConfig == null) {
        throw new IllegalStateException(
                "Pipeline source requires kafka_cluster_config, but none was found in the Data Prepper configuration.");
    }
    // Cast once rather than repeating it on every call.
    final UsesKafkaClusterConfig kafkaAwareSource = (UsesKafkaClusterConfig) source;
    kafkaAwareSource.setBootstrapServers(kafkaClusterConfig.getBootStrapServers());
    kafkaAwareSource.setKafkaClusterAuthConfig(
            kafkaClusterConfig.getAuthConfig(),
            kafkaClusterConfig.getAwsConfig(),
            kafkaClusterConfig.getEncryptionConfig());
}

private List<IdentifiedComponent<Processor>> newProcessor(final PluginSetting pluginSetting) {
final List<Processor> processors = pluginFactory.loadPlugins(
Processor.class,
Expand Down
Loading
Loading