Adds a new integration test for the Kafka buffer which verifies that data written is correctly read and decrypted. This work will be used to verify upcoming changes to the Protobuf model. (opensearch-project#3973)

Signed-off-by: David Venable <[email protected]>
dlvenable authored Jan 17, 2024
1 parent 334239b commit 41eab73
Showing 2 changed files with 94 additions and 5 deletions.
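The tests in this diff hinge on a symmetric AES round-trip: the buffer encrypts event data before it is written to Kafka and decrypts it on read. As orientation, here is a minimal, self-contained sketch of that round-trip. It is not part of the commit, but the key and cipher handling mirrors the setUp method below.

    import javax.crypto.Cipher;
    import javax.crypto.KeyGenerator;
    import javax.crypto.SecretKey;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class AesRoundTripSketch {
        public static void main(String[] args) throws Exception {
            // Generate a 256-bit AES key, as the test's setUp does.
            final KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES");
            aesKeyGenerator.init(256);
            final SecretKey secretKey = aesKeyGenerator.generateKey();

            final Cipher encryptCipher = Cipher.getInstance("AES");
            encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
            final Cipher decryptCipher = Cipher.getInstance("AES");
            decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);

            final byte[] plaintext = "{\"message\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
            final byte[] ciphertext = encryptCipher.doFinal(plaintext);

            // Decrypting with a cipher built from the same key recovers the original bytes exactly.
            System.out.println(Arrays.equals(plaintext, decryptCipher.doFinal(ciphertext)));
        }
    }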
@@ -8,6 +8,7 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
@@ -27,6 +28,8 @@
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -283,16 +286,19 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep

@Nested
class Encrypted {
-private Cipher cipher;
+private Cipher decryptCipher;
+private Cipher encryptCipher;

@BeforeEach
void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException {
final KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES");
aesKeyGenerator.init(256);
final SecretKey secretKey = aesKeyGenerator.generateKey();

cipher = Cipher.getInstance("AES");
cipher.init(Cipher.DECRYPT_MODE, secretKey);
decryptCipher = Cipher.getInstance("AES");
decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);
encryptCipher = Cipher.getInstance("AES");
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded());
final String aesKey = new String(base64Bytes);
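The Base64-encoded key produced above is presumably what the buffer configuration receives; the consuming side would then rebuild the SecretKey from that string. A sketch of that reconstruction using the standard javax.crypto.spec.SecretKeySpec follows; it illustrates the mechanism and is not code from this commit.

    import javax.crypto.SecretKey;
    import javax.crypto.spec.SecretKeySpec;
    import java.util.Base64;

    final class AesKeyCodec {
        // Rebuilds an AES SecretKey from its Base64 form, as produced in setUp above.
        static SecretKey fromBase64(final String aesKey) {
            final byte[] keyBytes = Base64.getDecoder().decode(aesKey);
            return new SecretKeySpec(keyBytes, "AES");
        }
    }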

@@ -359,7 +365,7 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T
assertThat(innerData, notNullValue());
assertThrows(JsonParseException.class, () -> objectMapper.readValue(innerData, Map.class));

-final byte[] deserializedBytes = cipher.doFinal(innerData);
+final byte[] deserializedBytes = decryptCipher.doFinal(innerData);

final Map<String, Object> actualEventData = objectMapper.readValue(deserializedBytes, Map.class);
assertThat(actualEventData, notNullValue());
@@ -401,10 +407,47 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr
assertThat(innerData, notNullValue());
assertThat(innerData, not(equalTo(writtenBytes)));

-final byte[] decryptedBytes = cipher.doFinal(innerData);
+final byte[] decryptedBytes = decryptCipher.doFinal(innerData);

assertThat(decryptedBytes, equalTo(writtenBytes));
}

@Test
void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeException, BadPaddingException {
final KafkaBuffer objectUnderTest = createObjectUnderTest();
final TestProducer testProducer = new TestProducer(bootstrapServersCommaDelimited, topicName);

final Record<Event> record = createRecord();
final byte[] unencryptedBytes = record.getData().toJsonString().getBytes();
final byte[] encryptedBytes = encryptCipher.doFinal(unencryptedBytes);

final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder()
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
.setData(ByteString.copyFrom(encryptedBytes))
.build();

final byte[] unencryptedKeyBytes = createRandomBytes();
final byte[] encryptedKeyBytes = encryptCipher.doFinal(unencryptedKeyBytes);

final KafkaBufferMessage.BufferData keyData = KafkaBufferMessage.BufferData.newBuilder()
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
.setData(ByteString.copyFrom(encryptedKeyBytes))
.build();

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

final Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}
}

private byte[] createRandomBytes() {
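For context on the Protobuf wrapper that the new read_decrypts_data_from_the_predefined_key test builds by hand: every Kafka value appears to be a KafkaBufferMessage.BufferData envelope around the (possibly encrypted) payload. Below is a hedged sketch of both directions; the wrap side uses only calls shown in the test above, while parseFrom and getData on the unwrap side are assumed standard protobuf generated-code methods, not methods shown in this diff.

    // Wrap an encrypted payload the way the test does before publishing.
    static byte[] wrap(final byte[] encryptedPayload) {
        return KafkaBufferMessage.BufferData.newBuilder()
                .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
                .setData(ByteString.copyFrom(encryptedPayload))
                .build()
                .toByteArray();
    }

    // Unwrap the on-wire bytes; the payload is still encrypted at this point.
    static byte[] unwrap(final byte[] onWire) throws com.google.protobuf.InvalidProtocolBufferException {
        return KafkaBufferMessage.BufferData.parseFrom(onWire).getData().toByteArray();
    }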
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.util;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Properties;
import java.util.UUID;

/**
* Utility class to produce data to Kafka to help with testing.
*/
public class TestProducer {
private final Producer<byte[], byte[]> kafkaProducer;
private final String topicName;

public TestProducer(final String bootstrapServersCommaDelimited, final String topicName) {
this.topicName = topicName;
final String testGroupId = UUID.randomUUID().toString();
final Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersCommaDelimited);
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId);
kafkaProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProducer = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer());
kafkaProducer.flush();
}

/**
* Publishes a single record to Kafka.
*
* @param key The key as a byte[]
* @param value The value as a byte[]
*/
public void publishRecord(final byte[] key, final byte[] value) {
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topicName, key, value);
kafkaProducer.send(producerRecord);
kafkaProducer.flush();
}
}
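A quick usage sketch for the new helper; the broker address and topic name here are placeholders, not values from the commit.

    // Hypothetical usage of TestProducer; "localhost:9092" and "test-topic" are placeholder values.
    final TestProducer testProducer = new TestProducer("localhost:9092", "test-topic");
    final byte[] key = "example-key".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    final byte[] value = "example-value".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    testProducer.publishRecord(key, value);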
