Commit

Support for kafka-sink
Signed-off-by: rajeshLovesToCode <[email protected]>
rajeshLovesToCode committed Aug 16, 2023
2 parents 3a73019 + 252a0dd commit 6050fa6
Showing 43 changed files with 1,023 additions and 141 deletions.
@@ -34,7 +34,7 @@ public class InMemorySinkAccessor {
public List<Record<Event>> get(final String testingKey) {
lock.lock();
try {
return recordsMap.getOrDefault(testingKey, Collections.emptyList());
return new ArrayList<>(recordsMap.getOrDefault(testingKey, Collections.emptyList()));
} finally {
lock.unlock();
}
@@ -49,7 +49,7 @@ public List<Record<Event>> get(final String testingKey) {
public List<Record<Event>> getAndClear(final String testingKey) {
lock.lock();
try {
final List<Record<Event>> records = recordsMap.getOrDefault(testingKey, Collections.emptyList());
final List<Record<Event>> records = get(testingKey);

recordsMap.remove(testingKey);

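The two hunks above make get() return a defensive copy and make getAndClear() reuse get(), so the copy is taken before the backing entry is removed and callers can never mutate the accessor's internal list. Below is a minimal standalone sketch of the same pattern with simplified names (not the actual Data Prepper classes); note that ReentrantLock is reentrant, so calling get() while getAndClear() already holds the lock is safe.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class InMemoryStore<T> {
        private final Map<String, List<T>> recordsMap = new HashMap<>();
        private final Lock lock = new ReentrantLock();

        // Returns a copy so callers cannot mutate internal state.
        List<T> get(final String key) {
            lock.lock();
            try {
                return new ArrayList<>(recordsMap.getOrDefault(key, Collections.emptyList()));
            } finally {
                lock.unlock();
            }
        }

        // Reuses get(): the copy is taken before the backing entry is removed,
        // so the returned list stays valid after the clear.
        List<T> getAndClear(final String key) {
            lock.lock();
            try {
                final List<T> records = get(key);
                recordsMap.remove(key);
                return records;
            } finally {
                lock.unlock();
            }
        }
    }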
@@ -193,7 +193,7 @@ void testSimpleRelationalOperatorExpressionWithInValidLiteralType() {
@Test
void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() {
final String testKey = "testKey";
final int testValue = random.nextInt(1000);
final int testValue = random.nextInt(1000) + 2;
final Map<String, Integer> data = Map.of(testKey, testValue);
final Event testEvent = createTestEvent(data);
final String greaterThanStatement = String.format(" /%s > %d", testKey, testValue - 1);
@@ -207,7 +207,7 @@ void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() {
}

@Test
void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValue() {
void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValueWithPositiveInteger() {
final String testKey = "testKey";
final boolean testValue = true;
final Map<String, Boolean> data = Map.of(testKey, testValue);
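The offset added above matters because random.nextInt(1000) can return 0 or 1, in which case testValue - 1 in the greater-than statement would no longer be a positive integer, which is what the renamed test is meant to exercise. A small illustration of the bound (hypothetical key name, not the test's fixture):

    import java.util.Random;

    public class PositiveOperandSketch {
        public static void main(String[] args) {
            final Random random = new Random();
            // nextInt(1000) is uniform over 0..999; shifting by 2 gives 2..1001, so both
            // testValue and testValue - 1 are guaranteed to be positive integers.
            final int testValue = random.nextInt(1000) + 2;
            final String greaterThanStatement = String.format(" /%s > %d", "testKey", testValue - 1);
            System.out.println(greaterThanStatement);   // e.g. " /testKey > 41"
        }
    }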
@@ -5,7 +5,7 @@

package org.opensearch.dataprepper.plugins.processor.anomalydetector;

import io.micrometer.core.instrument.Counter;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
@@ -23,19 +23,20 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

@DataPrepperPlugin(name = "anomaly_detector", pluginType = Processor.class, pluginConfigurationType = AnomalyDetectorProcessorConfig.class)
public class AnomalyDetectorProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
public static final String DEVIATION_KEY = "deviation_from_expected";
public static final String GRADE_KEY = "grade";
static final String NUMBER_RCF_INSTANCES = "numberRCFInstances";
static final String NUMBER_RCF_INSTANCES = "RCFInstances";

private final Boolean verbose;
private final IdentificationKeysHasher identificationKeysHasher;
private final Counter numberRCFInstances;
private final List<String> keys;
private final PluginFactory pluginFactory;
private final HashMap<Integer, AnomalyDetectorMode> forestMap;
private final AtomicInteger cardinality;
private final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig;

@DataPrepperPluginConstructor
@@ -44,9 +45,9 @@ public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDete
this.identificationKeysHasher = new IdentificationKeysHasher(anomalyDetectorProcessorConfig.getIdentificationKeys());
this.anomalyDetectorProcessorConfig = anomalyDetectorProcessorConfig;
this.pluginFactory = pluginFactory;
this.numberRCFInstances = pluginMetrics.counter(NUMBER_RCF_INSTANCES);
this.keys = anomalyDetectorProcessorConfig.getKeys();
this.verbose = anomalyDetectorProcessorConfig.getVerbose();
this.cardinality = pluginMetrics.gauge(NUMBER_RCF_INSTANCES, new AtomicInteger());
forestMap = new HashMap<>();
}

@@ -71,10 +72,10 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
forest = loadAnomalyDetectorMode(pluginFactory);
forest.initialize(keys, verbose);
forestMap.put(identificationKeysMap.hashCode(), forest);
this.numberRCFInstances.increment();
}
recordsOut.addAll(forestMap.get(identificationKeysMap.hashCode()).handleEvents(List.of(record)));
}
cardinality.set(forestMap.size());
return recordsOut;
}

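The processor above swaps a monotonically increasing Counter for a gauge backed by an AtomicInteger and sets it to forestMap.size() on every batch, so the metric tracks the current number of RCF instances rather than the total ever created. A minimal sketch of that distinction using Micrometer directly (Data Prepper's PluginMetrics delegates to a Micrometer registry; the registry and meter name here are only illustrative):

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    public class GaugeCardinalitySketch {
        public static void main(String[] args) {
            final MeterRegistry registry = new SimpleMeterRegistry();
            // gauge() observes the AtomicInteger; keep a strong reference to it, since the
            // registry will not. Unlike a counter, the reported value can go down.
            final AtomicInteger rcfInstances = registry.gauge("RCFInstances", new AtomicInteger());

            final Map<Integer, Object> forestMap = new HashMap<>();
            forestMap.put(1, new Object());
            forestMap.put(2, new Object());
            rcfInstances.set(forestMap.size());
            System.out.println(registry.get("RCFInstances").gauge().value()); // 2.0

            forestMap.clear();
            rcfInstances.set(forestMap.size());
            System.out.println(registry.get("RCFInstances").gauge().value()); // 0.0
        }
    }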
@@ -10,6 +10,7 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.Collections;
import java.util.List;

public class AnomalyDetectorProcessorConfig {
@@ -22,7 +23,7 @@ public class AnomalyDetectorProcessorConfig {
private List<String> keys;

@JsonProperty("identification_keys")
private List<String> identificationKeys;
private List<String> identificationKeys = Collections.emptyList();

@JsonProperty("verbose")
private Boolean verbose = false;
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -48,7 +49,7 @@ public class AnomalyDetectorProcessorTests {
@Mock
private PluginMetrics pluginMetrics;
@Mock
private Counter numberRCFInstances;
private AtomicInteger numberRCFInstances;
@Mock
private Counter recordsIn;

@@ -81,7 +82,7 @@ void setUp() {
when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class)))
.thenAnswer(invocation -> new RandomCutForestMode(randomCutForestModeConfig));

when(pluginMetrics.counter(AnomalyDetectorProcessor.NUMBER_RCF_INSTANCES)).thenReturn(numberRCFInstances);
when(pluginMetrics.gauge(eq(AnomalyDetectorProcessor.NUMBER_RCF_INSTANCES), any())).thenReturn(numberRCFInstances);
when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(recordsIn);
when(pluginMetrics.counter(MetricNames.RECORDS_OUT)).thenReturn(recordsOut);
when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed);
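With the gauge stubbed as above, numberRCFInstances is itself a mock, so tests can only verify interactions on it. An alternative sketch, assuming PluginMetrics.gauge returns the object handed to it (which the processor change relies on): have the stub return its second argument, so the processor writes into a real AtomicInteger whose value the test can assert on directly.

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.when;

    import java.util.concurrent.atomic.AtomicInteger;
    import org.opensearch.dataprepper.metrics.PluginMetrics;

    final class GaugeStubbing {
        private GaugeStubbing() {}

        // Stub gauge() to hand back the backing AtomicInteger instead of a mock,
        // so assertions can read the value the processor sets.
        static void returnBackingGauge(final PluginMetrics pluginMetricsMock) {
            when(pluginMetricsMock.gauge(anyString(), any(AtomicInteger.class)))
                    .thenAnswer(invocation -> invocation.getArgument(1));
        }
    }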
@@ -21,13 +21,16 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;
import org.mockito.Mock;
import static org.mockito.Mockito.when;
@@ -205,6 +208,7 @@ void testRandomCutForestModeVerboseFalse() {
}

final List<Record<Event>> anomalyRecords = randomCutForestMode.handleEvents(recordsWithAnomaly).stream().collect(toList());;
assertThat(anomalyRecords.size(), equalTo(1));
// Due to inherent variance in the RCF algorithm, 1-3 anomalies will be detected after the level shift.
assertThat(anomalyRecords.size(), both(greaterThanOrEqualTo(1)).and(lessThanOrEqualTo(3)));
}
}
@@ -6,7 +6,7 @@
package org.opensearch.dataprepper.plugins.dlq.s3;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -33,9 +33,11 @@ public class S3DlqWriterConfig {
private static final String DEFAULT_AWS_REGION = "us-east-1";
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";
private static final String S3_PREFIX = "s3://";

@JsonProperty("bucket")
@NotNull
@Size(min = 3, max = 63, message = "bucket lengthy should be between 3 and 63 characters")
@NotEmpty
@Size(min = 3, max = 500, message = "bucket length should be at least 3 characters")
private String bucket;

@JsonProperty("key_path_prefix")
@@ -55,6 +57,9 @@ public class S3DlqWriterConfig {
private String stsExternalId;

public String getBucket() {
if (bucket.startsWith(S3_PREFIX)) {
return bucket.substring(S3_PREFIX.length());
}
return bucket;
}

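getBucket() above now strips a leading s3:// so users may configure either a bare bucket name or an S3 URI. A short sketch of why that matters downstream, assuming the DLQ writer builds AWS SDK v2 requests (the key shown is purely illustrative):

    import software.amazon.awssdk.services.s3.model.PutObjectRequest;

    public class DlqBucketSketch {
        private static final String S3_PREFIX = "s3://";

        // The SDK's request builders expect the bare bucket name, not an s3:// URI,
        // so the scheme is stripped before the name reaches a request like this one.
        static PutObjectRequest buildRequest(final String configuredBucket, final String keyPathPrefix) {
            final String bucket = configuredBucket.startsWith(S3_PREFIX)
                    ? configuredBucket.substring(S3_PREFIX.length())
                    : configuredBucket;
            return PutObjectRequest.builder()
                    .bucket(bucket)
                    .key(keyPathPrefix + "/dlq-records.json")
                    .build();
        }
    }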
@@ -7,6 +7,7 @@

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import software.amazon.awssdk.regions.Region;
@@ -42,6 +43,14 @@ public void getS3ClientWithInvalidStsRoleArnThrowException(final String stsRoleA
assertThrows(IllegalArgumentException.class, config::getS3Client);
}

@ParameterizedTest
@CsvSource({"bucket-name, bucket-name", "s3://bucket-name, bucket-name"})
public void getS3BucketNameShouldReturnCorrectBucketName(final String bucketName, final String expectedBucketName) throws NoSuchFieldException, IllegalAccessException {
final S3DlqWriterConfig config = new S3DlqWriterConfig();
reflectivelySetField(config, "bucket", bucketName);
assertThat(config.getBucket(), is(equalTo(expectedBucketName)));
}

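The new parameterized test relies on a reflectivelySetField helper to populate the private bucket field, since the config class exposes no setter. The helper's actual implementation is not shown in this diff; a typical sketch of such a test-only helper (an assumption, not the project's code) looks like:

    import java.lang.reflect.Field;

    final class ReflectionTestHelper {
        private ReflectionTestHelper() {}

        // Sets a private field on a config object that has no setter; for tests only.
        static void reflectivelySetField(final Object target, final String fieldName, final Object value)
                throws NoSuchFieldException, IllegalAccessException {
            final Field field = target.getClass().getDeclaredField(fieldName);
            try {
                field.setAccessible(true);
                field.set(target, value);
            } finally {
                field.setAccessible(false);
            }
        }
    }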
@ParameterizedTest
@NullSource
@ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"})
@@ -9,10 +9,10 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;

import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;

/**
* * A helper class that helps to read consumer configuration values from
* pipelines.yaml
@@ -22,8 +22,7 @@ public class TopicConfig {
static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
static final String DEFAULT_AUTO_OFFSET_RESET = "latest";

static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5);
@@ -70,7 +69,7 @@ public class TopicConfig {
private Duration maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;

@JsonProperty("serde_format")
private MessageFormat serdeFormat = MessageFormat.PLAINTEXT;
private MessageFormat serdeFormat= MessageFormat.PLAINTEXT;

@JsonProperty("auto_commit")
private Boolean autoCommit = DEFAULT_AUTO_COMMIT;
@@ -140,7 +139,7 @@ public class TopicConfig {
@JsonProperty("heart_beat_interval")
@Valid
@Size(min = 1)
private Duration heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL_DURATION;
private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION;

@JsonProperty("is_create")
private Boolean isCreate=Boolean.FALSE;
@@ -303,7 +302,6 @@ public void setName(String name) {
this.name = name;
}


public KafkaKeyMode getKafkaKeyMode() {
return kafkaKeyMode;
}
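Among the TopicConfig changes above, DEFAULT_AUTO_OFFSET_RESET moves from "latest" to "earliest", which determines where a consumer group with no committed offset starts reading. A minimal sketch of where that value usually ends up (hypothetical wiring; the real property assembly lives in the consumer-factory code, not shown in this diff):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class OffsetResetSketch {
        // "earliest" replays the topic from the beginning for a brand-new consumer group;
        // "latest" would silently skip anything produced before the group first polls.
        static Properties consumerProperties(final String autoOffsetReset) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return props;
        }

        public static void main(String[] args) {
            System.out.println(consumerProperties("earliest"));
        }
    }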
@@ -162,7 +162,7 @@ public <T> void consumeRecords() throws Exception {
Thread.sleep(10000);
} catch (RecordDeserializationException e) {
LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record",
e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
e.topicPartition().topic(), e.topicPartition().partition(), e.offset(), e);
topicMetrics.getNumberOfDeserializationErrors().increment();
consumer.seek(e.topicPartition(), e.offset()+1);
}
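The only change in this hunk is passing the exception itself as the final argument to LOG.warn: SLF4J records the stack trace when the last argument is a Throwable beyond the message placeholders, whereas the old call logged only the topic, partition, and offset. A standalone sketch of the surrounding seek-past-the-poison-record pattern (simplified; the real loop also sleeps and bumps a deserialization-error metric):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.errors.RecordDeserializationException;

    public class PoisonRecordSketch {
        // Polls, and on a record that cannot be deserialized, seeks one offset past it
        // so a single bad message does not stall the whole partition.
        static <K, V> ConsumerRecords<K, V> pollSkippingBadRecords(final Consumer<K, V> consumer) {
            while (true) {
                try {
                    return consumer.poll(Duration.ofSeconds(1));
                } catch (final RecordDeserializationException e) {
                    consumer.seek(e.topicPartition(), e.offset() + 1);
                }
            }
        }
    }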
@@ -94,9 +94,9 @@ public void produceRecords(final Record<Event> record) {
Event event = getEvent(record);
final String key = event.formatString(kafkaSinkConfig.getPartitionKey(), expressionEvaluator);
try {
if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
if (serdeFormat == MessageFormat.JSON.toString()) {
publishJsonMessage(record, key);
} else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) {
} else if (serdeFormat == MessageFormat.AVRO.toString()) {
publishAvroMessage(record, key);
} else {
publishPlaintextMessage(record, key);
@@ -48,8 +48,6 @@
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,6 +98,8 @@ public class KafkaSource implements Source<Record<Event>> {
private static final String SCHEMA_TYPE = "schemaType";
private final AcknowledgementSetManager acknowledgementSetManager;
private static CachedSchemaRegistryClient schemaRegistryClient;
private GlueSchemaRegistryKafkaDeserializer glueDeserializer;
private StringDeserializer stringDeserializer;

@DataPrepperPluginConstructor
public KafkaSource(final KafkaSourceConfig sourceConfig,
@@ -110,6 +110,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pipelineName = pipelineDescription.getPipelineName();
this.stringDeserializer = new StringDeserializer();
shutdownInProgress = new AtomicBoolean(false);
}

@@ -135,7 +136,12 @@ public void start(Buffer<Record<Event>> buffer) {
break;
case PLAINTEXT:
default:
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
if (Objects.nonNull(glueDeserializer)) {
kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
} else {
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
}
break;
}
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
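For the PLAINTEXT/default branch, the consumer is now built with explicit deserializer instances when a Glue Schema Registry deserializer is available, instead of naming deserializer classes in the properties (the setPropertiesForGlueSchemaRegistry method is removed further down). A minimal sketch of that constructor wiring, with generic types simplified; GlueSchemaRegistryKafkaDeserializer implements Kafka's Deserializer interface, which is what makes this possible:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class DeserializerWiringSketch {
        // Passing deserializer instances lets an already-configured value deserializer
        // be reused directly, instead of having the consumer instantiate and configure
        // one reflectively from class names in the properties.
        static KafkaConsumer<String, Object> buildConsumer(final Properties consumerProperties,
                                                           final Deserializer<Object> valueDeserializer) {
            if (valueDeserializer != null) {
                return new KafkaConsumer<>(consumerProperties, new StringDeserializer(), valueDeserializer);
            }
            // Fall back to whatever key/value deserializer classes the properties configure.
            return new KafkaConsumer<>(consumerProperties);
        }
    }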
@@ -296,7 +302,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
}

if (schemaConfig.getType() == SchemaRegistryType.AWS_GLUE) {
setPropertiesForGlueSchemaRegistry(properties);
return;
}

@@ -309,13 +314,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
}
}

private void setPropertiesForGlueSchemaRegistry(Properties properties) {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
properties.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
}

private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) {
MessageFormat dataFormat = topicConfig.getSerdeFormat();
schemaType = dataFormat.toString();