Skip to content

Commit

Permalink
Integrate CDC data from MySQL/Postgres/MongoDb data source (opensearc…
Browse files Browse the repository at this point in the history
…h-project#3313)

Signed-off-by: Haidong <[email protected]>
  • Loading branch information
wanghd89 authored Oct 12, 2023
1 parent 633401a commit 5b822f3
Show file tree
Hide file tree
Showing 70 changed files with 4,045 additions and 32 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ subprojects {
}
dependencies {
implementation platform('com.fasterxml.jackson:jackson-bom:2.15.0')
implementation platform('org.eclipse.jetty:jetty-bom:11.0.17')
implementation platform('org.eclipse.jetty:jetty-bom:9.4.53.v20231009')
implementation platform('io.micrometer:micrometer-bom:1.10.5')
implementation libs.guava.core
implementation libs.slf4j.api
Expand Down Expand Up @@ -154,13 +154,13 @@ subprojects {
}
implementation('org.eclipse.jetty:http2-common') {
version {
require '11.0.17'
require '9.4.53.v20231009'
}
because 'Fixes CVE-2023-44487'
}
implementation('org.eclipse.jetty:http2-server') {
version {
require '11.0.17'
require '9.4.53.v20231009'
}
because 'Fixes CVE-2023-44487'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

Expand Down Expand Up @@ -78,8 +79,11 @@ public <T> T gauge(String name, T obj, ToDoubleFunction<T> valueFunction) {
return Metrics.gauge(getMeterName(name), obj, valueFunction);
}

public <T> T gaugeWithTags(String name, Iterable<Tag> tags, T obj, ToDoubleFunction<T> valueFunction) {
return Metrics.gauge(getMeterName(name), tags, obj, valueFunction);
}

private String getMeterName(final String name) {
return new StringJoiner(MetricNames.DELIMITER).add(metricsPrefix).add(name).toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -125,6 +126,18 @@ public void testReferenceGauge() {
assertEquals(3, gauge.length());
}

@Test
public void testReferenceGaugeWithTags() {
final String testString = "abc";
final String gauge = objectUnderTest.gaugeWithTags("gauge", emptyList(), testString, String::length);
assertNotNull(
Metrics.globalRegistry.get(new StringJoiner(MetricNames.DELIMITER)
.add(PIPELINE_NAME).add(PLUGIN_NAME)
.add("gauge").toString()).meter());
assertEquals(3, gauge.length());
}


@Test
public void testEmptyPipelineName() {
assertThrows(
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
testImplementation project(':data-prepper-plugins:common').sourceSets.test.output
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation "org.reflections:reflections:0.10.2"
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
implementation 'io.micrometer:micrometer-registry-prometheus'
implementation 'io.micrometer:micrometer-registry-cloudwatch2'
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {
implementation libs.commons.lang3
implementation libs.bouncycastle.bcprov
implementation libs.bouncycastle.bcpkix
implementation 'org.reflections:reflections:0.10.2'
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
testImplementation testLibs.junit.vintage
implementation 'org.apache.parquet:parquet-common:1.13.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;

import java.util.Collection;

@DataPrepperPlugin(name = "noop", pluginType = Sink.class)
public class NoopSink implements Sink<Record<Object>> {
@Override
public void output(Collection<Record<Object>> records) {
// empty by design.
}

@Override
public void shutdown() {
// empty by design.
}

@Override
public void initialize() {
// empty by design.
}

@Override
public boolean isReady() {
return true;
}
}
Loading

0 comments on commit 5b822f3

Please sign in to comment.