Speed Up initial snapshot from MongoDB (opensearch-project#3675)
Signed-off-by: Haidong <[email protected]>
wanghd89 authored Jan 10, 2024
1 parent e198637 commit fbc6fab
Showing 17 changed files with 1,400 additions and 44 deletions.
36 changes: 26 additions & 10 deletions data-prepper-plugins/kafka-connect-plugins/README.md
@@ -184,17 +184,26 @@ Snapshot Mode [ref](https://debezium.io/documentation/reference/stable/connector
| table_name | YES | | String | The table name to ingest, using *schemaName.tableName* format. |

### MongoDB
| Option | Required | Default | Type | Description |
|-----------------|----------|----------|--------------|---------------------------------------------------------------------------------------------------------------------|
| hostname | YES | | String | The hostname of MySQL. |
| port | NO | 27017 | String | The port of MySQL. |
| ssl | NO | FALSE | Boolean | Connector will use SSL to connect to MongoDB instances. |
| snapshot_mode | NO | initial | String | MongoDB snapshot mode. |
| credentials | YES | | Credentials | The Credentials to access the database. |
| collections | YES | | List\<Collection\> | The collections to ingest CDC data. |
| force_update | NO | FALSE | Boolean | When restarting or updating a pipeline, whether to force all connectors to update their config even if the connector name already exists. By default, if the connector name exists, the config will not be updated. The connector name is `<topic_prefix>.<table_name>`. |
| Option | Required | Default | Type | Description |
|----------------|----------|---------------|--------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hostname | YES | | String | The hostname of the MongoDB server. |
| port | NO | 27017 | String | The port of the MongoDB server. |
| ssl | NO | FALSE | Boolean | Whether the connector uses SSL to connect to MongoDB instances. |
| ingestion_mode | NO | export_stream | String | MongoDB ingestion mode. Available options: `export_stream`, `stream`, `export`. |
| export_config | NO | | ExportConfig | The export configuration; see the ExportConfig section below. |
| credentials | YES | | Credentials | The credentials used to access the database. |
| collections | YES | | List\<Collection\> | The collections from which to ingest CDC data. |
| force_update | NO | FALSE | Boolean | When restarting or updating a pipeline, whether to force all connectors to update their config even if the connector name already exists. By default, if the connector name exists, the config will not be updated. The connector name is `<topic_prefix>.<table_name>`. |
Snapshot Mode [ref](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-snapshot-mode)

#### ExportConfig
| Option | Required | Default | Type | Description |
|---------------------|------------|--------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| acknowledgments | No | FALSE | Boolean | When true, the source receives end-to-end acknowledgments once events are received by OpenSearch sinks. |
| items_per_partition | No | 4000 | Long | Number of items per partition during the initial export. |
| read_preference | No | secondaryPreferred | String | Operations typically read data from secondary members of the replica set. If the replica set has only a single primary member and no other members, operations read data from the primary member. |


#### Collection

| Option | Required | Default | Type | Description |
@@ -272,7 +281,14 @@ Each connector contains following metrics:
- `source-record-active-count-avg`: Average number of records polled by the task but not yet completely written to Kafka
- `source-record-active-count`: Most recent number of records polled by the task but not yet completely written to Kafka

## Developer Guide
## MongoDB Export Metrics
MongoDB export has the following metrics:
- `exportRecordsSuccessTotal`: Number of records successfully written to the buffer.
- `exportRecordsFailedTotal`: Number of records that failed to be written to the buffer.
- `exportPartitionSuccessTotal`: Number of partitions processed successfully.
- `exportPartitionFailureTotal`: Number of partitions that failed to be processed.

# Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-connect-plugins/build.gradle
@@ -12,6 +12,7 @@ configurations.all {
}

dependencies {
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:kafka-plugins')
implementation 'org.apache.kafka:connect-runtime:3.5.1'
@@ -62,7 +62,7 @@ public String getPassword() {
return password;
}

static class PlainText {
public static class PlainText {
private String username;
private String password;

@@ -74,7 +74,7 @@ public PlainText(@JsonProperty("username") String username,
}
}

static class SecretManager {
public static class SecretManager {
private String region;
private String secretId;
private String stsRoleArn;
@@ -5,11 +5,13 @@

package org.opensearch.dataprepper.plugins.kafkaconnect.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.kafkaconnect.util.Connector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -19,7 +21,7 @@ public class MongoDBConfig extends ConnectorConfig {
public static final String CONNECTOR_CLASS = "io.debezium.connector.mongodb.MongoDbConnector";
private static final String MONGODB_CONNECTION_STRING_FORMAT = "mongodb://%s:%s/?replicaSet=rs0&directConnection=true";
private static final String DEFAULT_PORT = "27017";
private static final String DEFAULT_SNAPSHOT_MODE = "initial";
private static final String DEFAULT_SNAPSHOT_MODE = "never";
private static final Boolean SSL_ENABLED = false;
private static final Boolean SSL_INVALID_HOST_ALLOWED = false;
private static final String DEFAULT_SNAPSHOT_FETCH_SIZE = "1000";
@@ -30,8 +32,10 @@ public class MongoDBConfig extends ConnectorConfig {
private String port = DEFAULT_PORT;
@JsonProperty("credentials")
private CredentialsConfig credentialsConfig;
@JsonProperty("snapshot_mode")
private String snapshotMode = DEFAULT_SNAPSHOT_MODE;
@JsonProperty("ingestion_mode")
private IngestionMode ingestionMode = IngestionMode.EXPORT_STREAM;
@JsonProperty("export_config")
private ExportConfig exportConfig = new ExportConfig();
@JsonProperty("snapshot_fetch_size")
private String snapshotFetchSize = DEFAULT_SNAPSHOT_FETCH_SIZE;
@JsonProperty("collections")
@@ -50,13 +54,45 @@ public List<Connector> buildConnectors() {
}).collect(Collectors.toList());
}

public IngestionMode getIngestionMode() {
return this.ingestionMode;
}

public CredentialsConfig getCredentialsConfig() {
return this.credentialsConfig;
}

public String getHostname() {
return this.hostname;
}

public String getPort() {
return this.port;
}

public Boolean getSSLEnabled() {
return this.ssl;
}

public Boolean getSSLInvalidHostAllowed() {
return this.sslInvalidHostAllowed;
}

public List<CollectionConfig> getCollections() {
return this.collections;
}

public ExportConfig getExportConfig() {
return this.exportConfig;
}

private Map<String, String> buildConfig(final CollectionConfig collection) {
Map<String, String> config = new HashMap<>();
config.put("connector.class", CONNECTOR_CLASS);
config.put("mongodb.connection.string", String.format(MONGODB_CONNECTION_STRING_FORMAT, hostname, port));
config.put("mongodb.user", credentialsConfig.getUsername());
config.put("mongodb.password", credentialsConfig.getPassword());
config.put("snapshot.mode", snapshotMode);
config.put("snapshot.mode", DEFAULT_SNAPSHOT_MODE);
config.put("snapshot.fetch.size", snapshotFetchSize);
config.put("topic.prefix", collection.getTopicPrefix());
config.put("collection.include.list", collection.getCollectionName());
@@ -71,7 +107,30 @@ private Map<String, String> buildConfig(final CollectionConfig collection) {
return config;
}

private static class CollectionConfig {
public enum IngestionMode {
EXPORT_STREAM("export_stream"),
EXPORT("export"),
STREAM("stream");

private static final Map<String, IngestionMode> OPTIONS_MAP = Arrays.stream(IngestionMode.values())
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

private final String type;

IngestionMode(final String type) {
this.type = type;
}

@JsonCreator
public static IngestionMode fromTypeValue(final String type) {
return OPTIONS_MAP.get(type.toLowerCase());
}
}

public static class CollectionConfig {
@JsonProperty("topic_prefix")
@NotNull
private String topicPrefix;
@@ -88,4 +147,27 @@ public String getTopicPrefix() {
return topicPrefix;
}
}

public static class ExportConfig {
private static int DEFAULT_ITEMS_PER_PARTITION = 4000;
private static String DEFAULT_READ_PREFERENCE = "secondaryPreferred";
@JsonProperty("acknowledgments")
private Boolean acknowledgments = false;
@JsonProperty("items_per_partition")
private Integer itemsPerPartition = DEFAULT_ITEMS_PER_PARTITION;
@JsonProperty("read_preference")
private String readPreference = DEFAULT_READ_PREFERENCE;

public boolean getAcknowledgements() {
return this.acknowledgments;
}

public Integer getItemsPerPartition() {
return this.itemsPerPartition;
}

public String getReadPreference() {
return this.readPreference;
}
}
}
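
The README options above map one-to-one onto the Jackson-annotated fields of `MongoDBConfig`. The following is a minimal, illustrative sketch of how a fragment with the new `ingestion_mode` and `export_config` options deserializes and which defaults apply. It assumes `jackson-dataformat-yaml` is on the classpath and uses a plain `ObjectMapper`, not Data Prepper's own plugin-configuration binding:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;

public class MongoDBConfigSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical pipeline fragment exercising the options documented in the README.
        final String yaml =
                "hostname: localhost\n" +
                "ingestion_mode: export\n" +
                "export_config:\n" +
                "  items_per_partition: 2000\n";

        // Plain Jackson binding, used here only to show the field mapping.
        final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        final MongoDBConfig config = mapper.readValue(yaml, MongoDBConfig.class);

        // ingestion_mode is resolved case-insensitively by IngestionMode.fromTypeValue.
        System.out.println(config.getIngestionMode());                        // EXPORT
        // items_per_partition overrides the default of 4000.
        System.out.println(config.getExportConfig().getItemsPerPartition());  // 2000
        // Unset export_config fields keep their defaults.
        System.out.println(config.getExportConfig().getReadPreference());     // secondaryPreferred
        // port falls back to the default of 27017.
        System.out.println(config.getPort());                                 // 27017
    }
}
```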
@@ -34,49 +34,59 @@
@SuppressWarnings("deprecation")
public abstract class KafkaConnectSource implements Source<Record<Object>> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectSource.class);
private final ConnectorConfig connectorConfig;
private final KafkaConnectConfig kafkaConnectConfig;
public final ConnectorConfig connectorConfig;
private final String pipelineName;
private KafkaConnectConfig kafkaConnectConfig;
private KafkaConnect kafkaConnect;

public KafkaConnectSource(final ConnectorConfig connectorConfig,
final PluginMetrics pluginMetrics,
final PipelineDescription pipelineDescription,
final KafkaClusterConfigSupplier kafkaClusterConfigSupplier,
final KafkaConnectConfigSupplier kafkaConnectConfigSupplier) {
if (kafkaClusterConfigSupplier == null || kafkaConnectConfigSupplier == null) {
throw new IllegalArgumentException("Extensions: KafkaClusterConfig and KafkaConnectConfig cannot be null");
}
this.connectorConfig = connectorConfig;
this.pipelineName = pipelineDescription.getPipelineName();
this.kafkaConnectConfig = kafkaConnectConfigSupplier.getConfig();
this.updateConfig(kafkaClusterConfigSupplier);
this.kafkaConnect = KafkaConnect.getPipelineInstance(
pipelineName,
pluginMetrics,
kafkaConnectConfig.getConnectStartTimeout(),
kafkaConnectConfig.getConnectorStartTimeout());
if (shouldStartKafkaConnect()) {
if (kafkaClusterConfigSupplier == null || kafkaConnectConfigSupplier == null) {
throw new IllegalArgumentException("Extensions: KafkaClusterConfig and KafkaConnectConfig cannot be null");
}
this.kafkaConnectConfig = kafkaConnectConfigSupplier.getConfig();
this.updateConfig(kafkaClusterConfigSupplier);
this.kafkaConnect = KafkaConnect.getPipelineInstance(
pipelineName,
pluginMetrics,
kafkaConnectConfig.getConnectStartTimeout(),
kafkaConnectConfig.getConnectorStartTimeout());
}
}

@Override
public void start(Buffer<Record<Object>> buffer) {
LOG.info("Starting Kafka Connect Source for pipeline: {}", pipelineName);
// Please make sure the worker properties are built before anything else.
final WorkerProperties workerProperties = this.kafkaConnectConfig.getWorkerProperties();
Map<String, String> workerProps = workerProperties.buildKafkaConnectPropertyMap();
if (workerProps.get(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG) == null || workerProps.get(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG).isEmpty()) {
throw new IllegalArgumentException("Bootstrap Servers cannot be null or empty");
if (shouldStartKafkaConnect()) {
LOG.info("Starting Kafka Connect Source for pipeline: {}", pipelineName);
// Please make sure the worker properties are built before anything else.
final WorkerProperties workerProperties = this.kafkaConnectConfig.getWorkerProperties();
Map<String, String> workerProps = workerProperties.buildKafkaConnectPropertyMap();
if (workerProps.get(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG) == null || workerProps.get(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG).isEmpty()) {
throw new IllegalArgumentException("Bootstrap Servers cannot be null or empty");
}
final List<Connector> connectors = this.connectorConfig.buildConnectors();
kafkaConnect.addConnectors(connectors);
kafkaConnect.initialize(workerProps);
kafkaConnect.start();
}
final List<Connector> connectors = this.connectorConfig.buildConnectors();
kafkaConnect.addConnectors(connectors);
kafkaConnect.initialize(workerProps);
kafkaConnect.start();
}

@Override
public void stop() {
LOG.info("Stopping Kafka Connect Source for pipeline: {}", pipelineName);
kafkaConnect.stop();
if (shouldStartKafkaConnect()) {
LOG.info("Stopping Kafka Connect Source for pipeline: {}", pipelineName);
kafkaConnect.stop();
}
}

public boolean shouldStartKafkaConnect() {
return true;
}

private void updateConfig(final KafkaClusterConfigSupplier kafkaClusterConfigSupplier) {
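
With the `KafkaConnectSource` changes above, all embedded Kafka Connect setup, start, and stop logic is gated behind `shouldStartKafkaConnect()`, which defaults to `true`. A subclass can therefore skip the embedded Kafka Connect cluster entirely, for example when a pipeline only needs the initial export. The sketch below illustrates that pattern; the class name and the choice to key off `IngestionMode.EXPORT` are assumptions for illustration, not code from this commit, and imports from the kafka-connect-plugins module are omitted.

```java
// Hypothetical subclass: runs without an embedded Kafka Connect cluster when the
// pipeline is configured for export-only ingestion. Illustrative names only.
public class MongoDBExportOnlySource extends KafkaConnectSource {

    public MongoDBExportOnlySource(final MongoDBConfig mongoDBConfig,
                                   final PluginMetrics pluginMetrics,
                                   final PipelineDescription pipelineDescription,
                                   final KafkaClusterConfigSupplier kafkaClusterConfigSupplier,
                                   final KafkaConnectConfigSupplier kafkaConnectConfigSupplier) {
        super(mongoDBConfig, pluginMetrics, pipelineDescription,
                kafkaClusterConfigSupplier, kafkaConnectConfigSupplier);
    }

    @Override
    public boolean shouldStartKafkaConnect() {
        // connectorConfig is now public on KafkaConnectSource, so subclasses can inspect it.
        final MongoDBConfig config = (MongoDBConfig) connectorConfig;
        // Skip Kafka Connect (and the extension null-checks) for export-only pipelines.
        return config.getIngestionMode() != MongoDBConfig.IngestionMode.EXPORT;
    }
}
```

Because the constructor only validates the Kafka Connect extensions inside the `shouldStartKafkaConnect()` branch, such a subclass can be constructed without any Kafka cluster configuration at all.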
