
Integrate with Kafka Connect and Debezium MySQL/Postgres/MongoDb plugin for CDC #3236

Closed
wants to merge 2 commits

Conversation

wanghd89
Contributor

@wanghd89 wanghd89 commented Aug 24, 2023

Description

Integrate with Kafka Connect and Debezium MySQL/Postgres/MongoDb plugin for CDC

Issues Resolved

Resolves #3233 #3234 #3235

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@sharraj

sharraj commented Aug 24, 2023

@engechas @dinujoh Please help with review.

@sharraj

sharraj commented Aug 24, 2023

@hshardeesi @kkondaka as well to help with review.

@@ -143,13 +143,13 @@ subprojects {
}
implementation('org.eclipse.jetty:jetty-http') {
version {
require '11.0.15'
require '9.4.48.v20220622'
Collaborator

Why are we going to an old revision?

Contributor Author

Kafka Connect doesn't work with 11.0.15; the latest Jetty version it works with is 9.4.48.v20220622.
I mentioned this as the first point in the high-level design, and it has been approved by Rajs and Dinu.

Member

This could be a bit of a problem.

Also, why does this even need Jetty? Perhaps we can disable the admin server?

Contributor Author

Kafka Connect needs it

Contributor Author

The newer version is 9.4.51.v20230217.

* Unique with single instance for each pipeline.
*/
public class KafkaConnect {
public static final long CONNECTOR_TIMEOUT_MS = 30000L; // 30 seconds
Collaborator

We should make this and the other statics below configurable.

Contributor Author

Sounds like a good idea; I can make these two configurable.
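As a rough illustration, lifting those statics into configuration could look something like this (a sketch only; the class and field names are hypothetical, not the actual Data Prepper configuration surface, and the second default is illustrative):

```java
// Hypothetical sketch: replacing the hard-coded statics with a config class.
public class KafkaConnectTimeoutConfig {
    // Default preserves the current hard-coded value (30 seconds).
    private long connectorTimeoutMs = 30_000L;
    // Second illustrative timeout; the real default would come from the plugin.
    private long connectStartTimeoutMs = 60_000L;

    public long getConnectorTimeoutMs() {
        return connectorTimeoutMs;
    }

    public void setConnectorTimeoutMs(final long connectorTimeoutMs) {
        this.connectorTimeoutMs = connectorTimeoutMs;
    }

    public long getConnectStartTimeoutMs() {
        return connectStartTimeoutMs;
    }

    public void setConnectStartTimeoutMs(final long connectStartTimeoutMs) {
        this.connectStartTimeoutMs = connectStartTimeoutMs;
    }
}
```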

@kkondaka
Collaborator

@wanghd89 I have a general question. How do we support codecs (like avro, json, protobuf, etc) with the Kafka connect?

@wanghd89
Contributor Author

@wanghd89 I have a general question. How do we support codecs (like avro, json, protobuf, etc) with the Kafka connect?

Kafka Connect supports those using key.converter/value.converter; it has its own converter classes.
But in our current phase (Debezium connectors reading from MySQL/Postgres/MongoDB), JSON is the only codec we support, because the MySQL and Postgres connectors read from the database's change log (binlog for MySQL, WAL for Postgres) and translate records to JSON, and MongoDB documents are JSON already.

If we later bring in more Kafka connectors that read data in other codecs, Kafka Connect has configuration support for those.
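For reference, the converter settings mentioned above are plain Kafka Connect worker/connector properties; built as a map they look roughly like this (the schemas.enable flags are optional and shown only as an example of disabling embedded schemas):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the key.converter/value.converter properties discussed above.
// JsonConverter ships with Kafka Connect itself.
public class JsonConverterConfigExample {
    public static Map<String, String> converterProperties() {
        final Map<String, String> props = new HashMap<>();
        props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        // Whether each JSON record embeds its schema; disabled here so
        // downstream consumers see plain JSON payloads.
        props.put("key.converter.schemas.enable", "false");
        props.put("value.converter.schemas.enable", "false");
        return props;
    }
}
```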

@kkondaka
Collaborator

@wanghd89 please fix the code verification failure.

implementation 'javax.validation:validation-api:2.0.1.Final'
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
implementation 'org.eclipse.jetty:jetty-server:9.4.48.v20220622'
Member

Can we exclude Jetty?

Contributor Author

We cannot; Kafka Connect has to run a REST API.


@@ -85,9 +86,11 @@ public DataPrepperConfiguration(
final Duration sinkShutdownTimeout,
@JsonProperty("circuit_breakers") final CircuitBreakerConfig circuitBreakerConfig,
@JsonProperty("source_coordination") final SourceCoordinationConfig sourceCoordinationConfig,
@JsonProperty("kafka_cluster_config") final KafkaClusterConfig kafkaClusterConfig,
Member

This is a perfect place to use Data Prepper extensions.

See my comment on your issue: #3233 (comment)

*/

@SuppressWarnings("deprecation")
@DataPrepperPlugin(name = "kafka_connect", pluginType = Source.class, pluginConfigurationType = KafkaConnectSourceConfig.class)
Member

Please see this comment:

#3233 (comment)

As noted there, we should have different sources for each of the databases: PostgreSQL, MySQL, MongoDB.

A good way to solve this would be to add a Kafka Connect extension. See the aws-plugin-api and aws-plugin projects for examples.

You could have a kafka-connect-api project which provides a basic API. Then, the mysql-source project would depend on it (similarly for postgres-source and mongodb-source).

The implementation can be an extension in a project such as kafka-connect-extension.
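Under those assumptions, the API split might look roughly like this (every name below is hypothetical, mirroring the aws-plugin-api pattern rather than any existing Data Prepper code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical kafka-connect-api surface that mysql-source, postgres-source,
// and mongodb-source could each depend on, while the shared Kafka Connect
// worker lives in a kafka-connect-extension project.
interface KafkaConnectService {
    // Register a connector configuration with the shared worker.
    void registerConnector(String name, Map<String, String> connectorConfig);
}

// Trivial in-memory stand-in for the extension's real implementation.
class InMemoryKafkaConnectService implements KafkaConnectService {
    private final Map<String, Map<String, String>> connectors = new HashMap<>();

    @Override
    public void registerConnector(final String name, final Map<String, String> connectorConfig) {
        connectors.put(name, connectorConfig);
    }

    Map<String, Map<String, String>> registeredConnectors() {
        return connectors;
    }
}
```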

@@ -36,6 +36,8 @@ dependencyResolutionManagement {
library('spring-context', 'org.springframework', 'spring-context').versionRef('spring')
version('guava', '32.0.1-jre')
library('guava-core', 'com.google.guava', 'guava').versionRef('guava')
version('reflections', '0.9.12')
Member

I'm a bit concerned about downgrading such a core component.

@@ -3,20 +3,42 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;
package org.opensearch.dataprepper.model.plugin.kafka;
Member

Let's keep these out of data-prepper-api. We should reserve this project for the most common APIs, not specific technologies.

You can make use of extensions, or perhaps add a kafka-common project if you want to share with the kafka source/sink.

this.encryptionConfig = encryptionConfig;
}

// @Override
Member

Please remove all commented-out code.

@@ -0,0 +1,30 @@
package org.opensearch.dataprepper.plugins.sink;
Member

All new files need a header. Use the following if the code is exclusively new code:

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

There is another if you are pulling from another open-source project. Let us know if you need it.

@wanghd89 wanghd89 closed this Sep 7, 2023
@wanghd89
Contributor Author

wanghd89 commented Sep 7, 2023

There is a major change to this PR due to the introduction of extension plugins.
I have closed this PR; please review the new one: #3313

@wanghd89 wanghd89 deleted the main branch October 24, 2023 01:12