From 9c4dd590afd3b5d006f4f8b53400b43d8b095846 Mon Sep 17 00:00:00 2001
From: David Venable
Date: Fri, 9 Feb 2024 15:16:46 -0800
Subject: [PATCH] Support input codecs in the file source. Resolves #4018. (#4019)

Signed-off-by: David Venable
---
 .../plugins/source/file/FileSource.java       | 139 ++++---
 .../plugins/source/file/FileSourceConfig.java |  14 +
 .../source/file/FileSourceConfigTest.java     |  46 +++
 .../plugins/source/file/FileSourceTests.java  | 373 +++++++++++-------
 4 files changed, 387 insertions(+), 185 deletions(-)
 create mode 100644 data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfigTest.java

diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java
index 2cc44732fa..d658bdf897 100644
--- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java
+++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java
@@ -5,21 +5,25 @@
 package org.opensearch.dataprepper.plugins.source.file;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
 import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.codec.InputCodec;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.event.JacksonEvent;
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.Source;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -34,13 +38,13 @@
 @DataPrepperPlugin(name = "file", pluginType = Source.class, pluginConfigurationType = FileSourceConfig.class)
 public class FileSource implements Source<Record<Object>> {
-
     static final String MESSAGE_KEY = "message";
     private static final Logger LOG = LoggerFactory.getLogger(FileSource.class);
     private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {};
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     private final FileSourceConfig fileSourceConfig;
+    private final FileStrategy fileStrategy;
 
     private boolean isStopRequested;
     private final int writeTimeout;
@@ -51,22 +55,19 @@ public FileSource(final FileSourceConfig fileSourceConfig, final PluginMetrics p
         this.fileSourceConfig = fileSourceConfig;
         this.isStopRequested = false;
         this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT;
+
+        if(fileSourceConfig.getCodec() != null) {
+            fileStrategy = new CodecFileStrategy(pluginFactory);
+        } else {
+            fileStrategy = new ClassicFileStrategy();
+        }
     }
 
     @Override
     public void start(final Buffer<Record<Object>> buffer) {
         checkNotNull(buffer, "Buffer cannot be null for file source to start");
-        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) {
-            String line;
-            while ((line = reader.readLine()) != null && !isStopRequested) {
-                writeLineAsEventOrString(line, buffer);
-            }
-        } catch (IOException | TimeoutException | IllegalArgumentException ex) {
-            LOG.error("Error processing the input file path [{}]", fileSourceConfig.getFilePathToRead(), ex);
-            throw new RuntimeException(format("Error processing the input file %s",
-                    fileSourceConfig.getFilePathToRead()), ex);
-        }
+        fileStrategy.start(buffer);
     }
 
     @Override
@@ -74,43 +75,93 @@ public void stop() {
         isStopRequested = true;
     }
 
-    private Record<Object> getEventRecordFromLine(final String line) {
-        Map<String, Object> structuredLine = new HashMap<>();
+    private interface FileStrategy {
+        void start(final Buffer<Record<Object>> buffer);
+    }
 
-        switch(fileSourceConfig.getFormat()) {
-            case JSON:
-                structuredLine = parseJson(line);
-                break;
-            case PLAIN:
-                structuredLine.put(MESSAGE_KEY, line);
-                break;
+    private class ClassicFileStrategy implements FileStrategy {
+        @Override
+        public void start(Buffer<Record<Object>> buffer) {
+            try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) {
+                String line;
+                while ((line = reader.readLine()) != null && !isStopRequested) {
+                    writeLineAsEventOrString(line, buffer);
+                }
+            } catch (IOException | TimeoutException | IllegalArgumentException ex) {
+                LOG.error("Error processing the input file path [{}]", fileSourceConfig.getFilePathToRead(), ex);
+                throw new RuntimeException(format("Error processing the input file %s",
+                        fileSourceConfig.getFilePathToRead()), ex);
+            }
         }
 
-        return new Record<>(JacksonEvent
-                .builder()
-                .withEventType(fileSourceConfig.getRecordType())
-                .withData(structuredLine)
-                .build());
-    }
+        private Record<Object> getEventRecordFromLine(final String line) {
+            Map<String, Object> structuredLine = new HashMap<>();
+
+            switch(fileSourceConfig.getFormat()) {
+                case JSON:
+                    structuredLine = parseJson(line);
+                    break;
+                case PLAIN:
+                    structuredLine.put(MESSAGE_KEY, line);
+                    break;
+            }
 
-    private Map<String, Object> parseJson(final String jsonString) {
-        try {
-            return OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
-        } catch (JsonProcessingException e) {
-            LOG.error(SENSITIVE, "Unable to parse json data [{}], assuming plain text", jsonString, e);
-            final Map<String, Object> plainMap = new HashMap<>();
-            plainMap.put(MESSAGE_KEY, jsonString);
-            return plainMap;
+            return new Record<>(JacksonEvent
+                    .builder()
+                    .withEventType(fileSourceConfig.getRecordType())
+                    .withData(structuredLine)
+                    .build());
+        }
+
+        private Map<String, Object> parseJson(final String jsonString) {
+            try {
+                return OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
+            } catch (JsonProcessingException e) {
+                LOG.error(SENSITIVE, "Unable to parse json data [{}], assuming plain text", jsonString, e);
+                final Map<String, Object> plainMap = new HashMap<>();
+                plainMap.put(MESSAGE_KEY, jsonString);
+                return plainMap;
+            }
+        }
+
+        // Temporary function to support both trace and log ingestion pipelines.
+        // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
+        private void writeLineAsEventOrString(final String line, final Buffer<Record<Object>> buffer) throws TimeoutException, IllegalArgumentException {
+            if (fileSourceConfig.getRecordType().equals(FileSourceConfig.EVENT_TYPE)) {
+                buffer.write(getEventRecordFromLine(line), writeTimeout);
+            } else if (fileSourceConfig.getRecordType().equals(FileSourceConfig.DEFAULT_TYPE)) {
+                buffer.write(new Record<>(line), writeTimeout);
+            }
         }
     }
 
-    // Temporary function to support both trace and log ingestion pipelines.
-    // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
-    private void writeLineAsEventOrString(final String line, final Buffer<Record<Object>> buffer) throws TimeoutException, IllegalArgumentException {
-        if (fileSourceConfig.getRecordType().equals(FileSourceConfig.EVENT_TYPE)) {
-            buffer.write(getEventRecordFromLine(line), writeTimeout);
-        } else if (fileSourceConfig.getRecordType().equals(FileSourceConfig.DEFAULT_TYPE)) {
-            buffer.write(new Record<>(line), writeTimeout);
+
+    private class CodecFileStrategy implements FileStrategy {
+
+        private final InputCodec codec;
+
+        CodecFileStrategy(final PluginFactory pluginFactory) {
+            final PluginModel codecConfiguration = fileSourceConfig.getCodec();
+            final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
+            codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
+
+        }
+
+        @Override
+        public void start(final Buffer<Record<Object>> buffer) {
+            try {
+                codec.parse(new FileInputStream(fileSourceConfig.getFilePathToRead()), eventRecord -> {
+                    try {
+                        buffer.write((Record) eventRecord, writeTimeout);
+                    } catch (TimeoutException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } catch (final IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java
index ba1cd06674..255857a4bb 100644
--- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java
+++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java
@@ -8,6 +8,8 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import jakarta.validation.constraints.AssertTrue;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
 
 import java.util.Objects;
 
@@ -30,6 +32,9 @@ public class FileSourceConfig {
     @JsonProperty(ATTRIBUTE_TYPE)
     private String recordType = DEFAULT_TYPE;
 
+    @JsonProperty("codec")
+    private PluginModel codec;
+
     public String getFilePathToRead() {
         return filePathToRead;
     }
@@ -43,9 +48,18 @@ public String getRecordType() {
         return recordType;
     }
 
+    public PluginModel getCodec() {
+        return codec;
+    }
+
     void validate() {
         Objects.requireNonNull(filePathToRead, "File path is required");
         Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]");
         Preconditions.checkArgument(format.equals(DEFAULT_FORMAT) || format.equals("json"), "Invalid file format. Options are [json] and [plain]");
     }
+
+    @AssertTrue(message = "The file source requires recordType to be event when using a codec.")
+    boolean codeRequiresRecordTypeEvent() {
+        return codec == null || recordType.equals(EVENT_TYPE);
+    }
 }
diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfigTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfigTest.java
new file mode 100644
index 0000000000..9208c52b66
--- /dev/null
+++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfigTest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+class FileSourceConfigTest {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ParameterizedTest
+    @ValueSource(strings = {FileSourceConfig.EVENT_TYPE, FileSourceConfig.DEFAULT_FORMAT})
+    void codeRequiresRecordTypeEvent_returns_true_if_no_codec(final String recordType) {
+        final Map<String, Object> fileConfigMap = Map.of(FileSourceConfig.ATTRIBUTE_TYPE, recordType);
+        final FileSourceConfig objectUnderTest = OBJECT_MAPPER.convertValue(fileConfigMap, FileSourceConfig.class);
+
+        assertThat(objectUnderTest.codeRequiresRecordTypeEvent(), equalTo(true));
+    }
+
+    @ParameterizedTest
+    @CsvSource({
+            FileSourceConfig.EVENT_TYPE + ",true",
+            FileSourceConfig.DEFAULT_FORMAT + ",false"
+    })
+    void codeRequiresRecordTypeEvent_returns_expected_value_when_there_is_a_codec(final String recordType, final boolean expected) {
+        final Map<String, Object> fileConfigMap = Map.of(
+                FileSourceConfig.ATTRIBUTE_TYPE, recordType,
+                "codec", new PluginModel("fake_codec", Collections.emptyMap())
+        );
+        final FileSourceConfig objectUnderTest = OBJECT_MAPPER.convertValue(fileConfigMap, FileSourceConfig.class);
+
+        assertThat(objectUnderTest.codeRequiresRecordTypeEvent(), equalTo(expected));
+    }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java
index 4cc796e47f..6bc886b52e 100644
--- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java
+++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java
@@ -5,8 +5,11 @@
 package org.opensearch.dataprepper.plugins.source.file;
 
+import org.junit.jupiter.api.Nested;
+import org.mockito.ArgumentCaptor;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.codec.InputCodec;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -24,31 +27,40 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 public class FileSourceTests {
     private static final Logger LOG = LoggerFactory.getLogger(FileSourceTests.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {};
-
-    private static final String TEST_PIPELINE_NAME = "pipeline";
+    private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {
+    };
     private static final String TEST_FILE_PATH_PLAIN = "src/test/resources/test-file-source-plain.tst";
-    private static final String TEST_FILE_PATH_JSON = "src/test/resources/test-file-source-json.tst";
-    private static final String TEST_FILE_PATH_INVALID_JSON = "src/test/resources/test-file-source-invalid-json.tst";
-    private static final String FILE_DOES_NOT_EXIST = "file_does_not_exist";
 
     private FileSourceConfig fileSourceConfig;
-    private FileSource fileSource;
+
+    private Map<String, Object> pluginSettings;
 
     @Mock
     private PluginMetrics pluginMetrics;
@@ -56,184 +68,263 @@ public class FileSourceTests {
     @Mock
     private PluginFactory pluginFactory;
 
-    private Buffer<Record<Object>> buffer;
-
-    private Map<String, Object> pluginSettings;
-
-    private List<Record<Object>> expectedEventsPlain;
-    private List<Record<Object>> expectedEventsJson;
-    private List<Record<Object>> expectedEventsInvalidJson;
-
     @BeforeEach
-    public void setup() {
+    void setUp() {
         pluginSettings = new HashMap<>();
-        expectedEventsPlain = new ArrayList<>();
-        expectedEventsJson = new ArrayList<>();
-        expectedEventsInvalidJson = new ArrayList<>();
         pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, FileSourceConfig.EVENT_TYPE);
         pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_PLAIN);
+    }
 
-        // plain
-        final String expectedPlainFirstLine = "THIS IS A PLAINTEXT LINE";
-        final String expectedPlainSecondLine = "THIS IS ANOTHER PLAINTEXT LINE";
+    private FileSource createObjectUnderTest() {
+        fileSourceConfig = OBJECT_MAPPER.convertValue(pluginSettings, FileSourceConfig.class);
+        return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory);
+    }
 
-        final Record<Object> firstEventPlain = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedPlainFirstLine);
-        final Record<Object> secondEventPlain = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedPlainSecondLine);
+    @Nested
+    class WithRecord {
+        private static final String TEST_PIPELINE_NAME = "pipeline";
+        private static final String TEST_FILE_PATH_JSON = "src/test/resources/test-file-source-json.tst";
+        private static final String TEST_FILE_PATH_INVALID_JSON = "src/test/resources/test-file-source-invalid-json.tst";
+        private static final String FILE_DOES_NOT_EXIST = "file_does_not_exist";
 
-        expectedEventsPlain.add(firstEventPlain);
-        expectedEventsPlain.add(secondEventPlain);
+        private FileSource fileSource;
 
-        //json
-        final Record<Object> firstEventJson = createRecordEventWithKeyValuePair("test_key", "test_value");
-        final Record<Object> secondEventJson = createRecordEventWithKeyValuePair("second_test_key", "second_test_value");
+        private Buffer<Record<Object>> buffer;
 
-        expectedEventsJson.add(firstEventJson);
-        expectedEventsJson.add(secondEventJson);
+        private List<Record<Object>> expectedEventsPlain;
+        private List<Record<Object>> expectedEventsJson;
+        private List<Record<Object>> expectedEventsInvalidJson;
 
-        // invalid json
-        final String expectedInvalidJsonFirstLine = "{\"test_key: test_value\"}";
-        final String expectedInvalidJsonSecondLine = "{\"second_test_key\": \"second_test_value\"";
+        @BeforeEach
+        public void setup() {
+            expectedEventsPlain = new ArrayList<>();
+            expectedEventsJson = new ArrayList<>();
+            expectedEventsInvalidJson = new ArrayList<>();
 
-        final Record<Object> firstEventInvalidJson = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedInvalidJsonFirstLine);
-        final Record<Object> secondEventInvalidJson = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedInvalidJsonSecondLine);
+            // plain
+            final String expectedPlainFirstLine = "THIS IS A PLAINTEXT LINE";
+            final String expectedPlainSecondLine = "THIS IS ANOTHER PLAINTEXT LINE";
 
-        expectedEventsInvalidJson.add(firstEventInvalidJson);
-        expectedEventsInvalidJson.add(secondEventInvalidJson);
+            final Record<Object> firstEventPlain = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedPlainFirstLine);
+            final Record<Object> secondEventPlain = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedPlainSecondLine);
+            expectedEventsPlain.add(firstEventPlain);
+            expectedEventsPlain.add(secondEventPlain);
 
-        buffer = getBuffer();
-    }
+            //json
+            final Record<Object> firstEventJson = createRecordEventWithKeyValuePair("test_key", "test_value");
+            final Record<Object> secondEventJson = createRecordEventWithKeyValuePair("second_test_key", "second_test_value");
+            expectedEventsJson.add(firstEventJson);
+            expectedEventsJson.add(secondEventJson);
 
-    private FileSource createObjectUnderTest() {
-        fileSourceConfig = OBJECT_MAPPER.convertValue(pluginSettings, FileSourceConfig.class);
-        return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory);
-    }
+            // invalid json
+            final String expectedInvalidJsonFirstLine = "{\"test_key: test_value\"}";
+            final String expectedInvalidJsonSecondLine = "{\"second_test_key\": \"second_test_value\"";
 
-    private BlockingBuffer<Record<Object>> getBuffer() {
-        final HashMap<String, Object> integerHashMap = new HashMap<>();
-        integerHashMap.put("buffer_size", 2);
-        integerHashMap.put("batch_size", 2);
-        final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap);
-        pluginSetting.setPipelineName(TEST_PIPELINE_NAME);
-        return new BlockingBuffer<>(pluginSetting);
-    }
+            final Record<Object> firstEventInvalidJson = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedInvalidJsonFirstLine);
+            final Record<Object> secondEventInvalidJson = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedInvalidJsonSecondLine);
+            expectedEventsInvalidJson.add(firstEventInvalidJson);
+            expectedEventsInvalidJson.add(secondEventInvalidJson);
 
-    @Test
-    public void testFileSourceWithEmptyFilePathThrowsRuntimeException() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, "");
-        fileSource = createObjectUnderTest();
-        assertThrows(RuntimeException.class, () -> fileSource.start(buffer));
-    }
+            buffer = getBuffer();
+        }
 
-    @Test
-    public void testFileSourceWithNonexistentFilePathThrowsRuntimeException() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, FILE_DOES_NOT_EXIST);
-        fileSource = createObjectUnderTest();
-        assertThrows(RuntimeException.class, () -> fileSource.start(buffer));
-    }
+        private BlockingBuffer<Record<Object>> getBuffer() {
+            final HashMap<String, Object> integerHashMap = new HashMap<>();
+            integerHashMap.put("buffer_size", 2);
+            integerHashMap.put("batch_size", 2);
+            final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap);
+            pluginSetting.setPipelineName(TEST_PIPELINE_NAME);
+            return new BlockingBuffer<>(pluginSetting);
+        }
 
-    @Test
-    public void testFileSourceWithNullFilePathThrowsNullPointerException() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, null);
-        assertThrows(NullPointerException.class, this::createObjectUnderTest);
-    }
+        @Test
+        public void testFileSourceWithEmptyFilePathThrowsRuntimeException() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, "");
+            fileSource = createObjectUnderTest();
+            assertThrows(RuntimeException.class, () -> fileSource.start(buffer));
+        }
 
-    @Test
-    public void testFileWithPlainTextAddsEventsToBufferCorrectly() {
-        fileSource = createObjectUnderTest();
-        fileSource.start(buffer);
+        @Test
+        public void testFileSourceWithNonexistentFilePathThrowsRuntimeException() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, FILE_DOES_NOT_EXIST);
+            fileSource = createObjectUnderTest();
+            assertThrows(RuntimeException.class, () -> fileSource.start(buffer));
+        }
 
-        final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
+        @Test
+        public void testFileSourceWithNullFilePathThrowsNullPointerException() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, null);
+            assertThrows(NullPointerException.class, FileSourceTests.this::createObjectUnderTest);
+        }
 
-        assertThat(bufferEvents.size(), equalTo(expectedEventsPlain.size()));
-        assertExpectedRecordsAreEqual(expectedEventsPlain, bufferEvents);
-    }
+        @Test
+        public void testFileWithPlainTextAddsEventsToBufferCorrectly() {
+            fileSource = createObjectUnderTest();
+            fileSource.start(buffer);
 
-    @Test
-    public void testFileWithJSONAddsEventsToBufferCorrectly() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_JSON);
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "json");
+            final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
 
-        fileSource = createObjectUnderTest();
-        fileSource.start(buffer);
+            assertThat(bufferEvents.size(), equalTo(expectedEventsPlain.size()));
+            assertExpectedRecordsAreEqual(expectedEventsPlain, bufferEvents);
+        }
 
-        final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
+        @Test
+        public void testFileWithJSONAddsEventsToBufferCorrectly() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_JSON);
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "json");
 
-        assertThat(bufferEvents.size(), equalTo(expectedEventsJson.size()));
-        assertExpectedRecordsAreEqual(expectedEventsJson, bufferEvents);
-    }
+            fileSource = createObjectUnderTest();
+            fileSource.start(buffer);
 
-    @Test
-    public void testFileWithInvalidJSONAddsEventsToBufferAsPlainText() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_INVALID_JSON);
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "json");
-        fileSource = createObjectUnderTest();
-        fileSource.start(buffer);
+            final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
 
-        final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
+            assertThat(bufferEvents.size(), equalTo(expectedEventsJson.size()));
+            assertExpectedRecordsAreEqual(expectedEventsJson, bufferEvents);
+        }
 
-        assertThat(bufferEvents.size(), equalTo(expectedEventsInvalidJson.size()));
-        assertExpectedRecordsAreEqual(expectedEventsInvalidJson, bufferEvents);
-    }
+        @Test
+        public void testFileWithInvalidJSONAddsEventsToBufferAsPlainText() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_INVALID_JSON);
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "json");
+            fileSource = createObjectUnderTest();
+            fileSource.start(buffer);
 
-    @Test
-    public void testStringTypeAddsStringsToBufferCorrectly() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, FileSourceConfig.DEFAULT_TYPE);
-        fileSource = createObjectUnderTest();
-        fileSource.start(buffer);
+            final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
 
-        final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
+            assertThat(bufferEvents.size(), equalTo(expectedEventsInvalidJson.size()));
+            assertExpectedRecordsAreEqual(expectedEventsInvalidJson, bufferEvents);
+        }
 
-        assertThat(bufferEvents.size(), equalTo(expectedEventsPlain.size()));
-        assertThat(bufferEvents.get(0).getData(), equalTo("THIS IS A PLAINTEXT LINE"));
-        assertThat(bufferEvents.get(1).getData(), equalTo("THIS IS ANOTHER PLAINTEXT LINE"));
+        @Test
+        public void testStringTypeAddsStringsToBufferCorrectly() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, FileSourceConfig.DEFAULT_TYPE);
+            fileSource = createObjectUnderTest();
+            fileSource.start(buffer);
 
-    }
+            final List<Record<Object>> bufferEvents = new ArrayList<>(buffer.read(1000).getKey());
 
-    @Test
-    public void testNonSupportedFileFormatThrowsIllegalArgumentException() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "unsupported");
-        assertThrows(IllegalArgumentException.class, this::createObjectUnderTest);
-    }
+            assertThat(bufferEvents.size(), equalTo(expectedEventsPlain.size()));
+            assertThat(bufferEvents.get(0).getData(), equalTo("THIS IS A PLAINTEXT LINE"));
+            assertThat(bufferEvents.get(1).getData(), equalTo("THIS IS ANOTHER PLAINTEXT LINE"));
 
-    @Test
-    public void testNonSupportedFileTypeThrowsIllegalArgumentException() {
-        pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, "bad_type");
-        assertThrows(IllegalArgumentException.class, this::createObjectUnderTest);
-    }
+        }
 
-    static void assertExpectedRecordsAreEqual(final List<Record<Object>> expectedEvents, final List<Record<Object>> actualEvents) {
-        for (int i = 0; i < expectedEvents.size(); i++) {
-            assertThat(actualEvents.get(i), notNullValue());
-            assertThat(actualEvents.get(i).getData(), notNullValue());
-            assertEventRecordsAreEqual(actualEvents.get(i), expectedEvents.get(i));
+        @Test
+        public void testNonSupportedFileFormatThrowsIllegalArgumentException() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "unsupported");
+            assertThrows(IllegalArgumentException.class, FileSourceTests.this::createObjectUnderTest);
+        }
+
+        @Test
+        public void testNonSupportedFileTypeThrowsIllegalArgumentException() {
+            pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, "bad_type");
+            assertThrows(IllegalArgumentException.class, FileSourceTests.this::createObjectUnderTest);
+        }
+
+        void assertExpectedRecordsAreEqual(final List<Record<Object>> expectedEvents, final List<Record<Object>> actualEvents) {
+            for (int i = 0; i < expectedEvents.size(); i++) {
+                assertThat(actualEvents.get(i), notNullValue());
+                assertThat(actualEvents.get(i).getData(), notNullValue());
+                assertEventRecordsAreEqual(actualEvents.get(i), expectedEvents.get(i));
+            }
+        }
+
+        void assertEventRecordsAreEqual(final Record<Object> first, final Record<Object> second) {
+            try {
+                final Event firstEvent = (Event) first.getData();
+                final Event secondEvent = (Event) second.getData();
+                final Map<String, Object> recordMapFirst = OBJECT_MAPPER.readValue(firstEvent.toJsonString(), MAP_TYPE_REFERENCE);
+                final Map<String, Object> recordMapSecond = OBJECT_MAPPER.readValue(secondEvent.toJsonString(), MAP_TYPE_REFERENCE);
+                assertThat(recordMapFirst, is(equalTo(recordMapSecond)));
+            } catch (JsonProcessingException e) {
+                LOG.error("Unable to parse Event as JSON");
+            }
+        }
+
+        private Record<Object> createRecordEventWithKeyValuePair(final String key, final String value) {
+            final Map<String, Object> eventData = new HashMap<>();
+            eventData.put(key, value);
+
+            return new Record<>(JacksonEvent
+                    .builder()
+                    .withEventType("event")
+                    .withData(eventData)
+                    .build());
         }
     }
 
-    static void assertEventRecordsAreEqual(final Record<Object> first, final Record<Object> second) {
-        try {
-            final Event firstEvent = (Event) first.getData();
-            final Event secondEvent = (Event) second.getData();
-            final Map<String, Object> recordMapFirst = OBJECT_MAPPER.readValue(firstEvent.toJsonString(), MAP_TYPE_REFERENCE);
-            final Map<String, Object> recordMapSecond = OBJECT_MAPPER.readValue(secondEvent.toJsonString(), MAP_TYPE_REFERENCE);
-            assertThat(recordMapFirst, is(equalTo(recordMapSecond)));
-        } catch (JsonProcessingException e) {
-            LOG.error("Unable to parse Event as JSON");
+    @Nested
+    class WithCodec {
+
+        @Mock
+        private InputCodec inputCodec;
+
+        @Mock
+        private Buffer buffer;
+
+        @BeforeEach
+        void setUp() {
+            Map<String, Object> codecConfiguration = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+            Map<String, Map<String, Object>> codecSettings = Map.of("fake_codec", codecConfiguration);
+            pluginSettings.put("codec", codecSettings);
+
+            when(pluginFactory.loadPlugin(eq(InputCodec.class), any(PluginSetting.class)))
+                    .thenReturn(inputCodec);
+        }
+
+        @Test
+        void start_will_parse_codec_with_correct_inputStream() throws IOException {
+            createObjectUnderTest().start(buffer);
+
+            final ArgumentCaptor<InputStream> inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class);
+
+            verify(inputCodec).parse(inputStreamArgumentCaptor.capture(), any(Consumer.class));
+
+            final InputStream actualInputStream = inputStreamArgumentCaptor.getValue();
+
+            final byte[] actualBytes = actualInputStream.readAllBytes();
+            final FileInputStream fileInputStream = new FileInputStream(TEST_FILE_PATH_PLAIN);
+            final byte[] expectedBytes = fileInputStream.readAllBytes();
+
+            assertThat(actualBytes, equalTo(expectedBytes));
         }
-    }
 
-    private Record<Object> createRecordEventWithKeyValuePair(final String key, final String value) {
-        final Map<String, Object> eventData = new HashMap<>();
-        eventData.put(key, value);
+        @Test
+        void start_will_parse_codec_with_a_Consumer_that_writes_to_the_buffer() throws IOException, TimeoutException {
+            createObjectUnderTest().start(buffer);
+
+            final ArgumentCaptor<Consumer> consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
+
+            verify(inputCodec).parse(any(InputStream.class), consumerArgumentCaptor.capture());
+
+            final Consumer<Record<Event>> actualConsumer = consumerArgumentCaptor.getValue();
+
+            final Record<Event> record = mock(Record.class);
+
+            actualConsumer.accept(record);
+            verify(buffer).write(record, FileSourceConfig.DEFAULT_TIMEOUT);
+        }
+
+        @Test
+        void start_will_throw_exception_if_codec_throws() throws IOException, TimeoutException {
+
+            final IOException mockedException = mock(IOException.class);
+            doThrow(mockedException)
+                    .when(inputCodec).parse(any(InputStream.class), any(Consumer.class));
+
+            FileSource objectUnderTest = createObjectUnderTest();
+
+            RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.start(buffer));
+
+            assertThat(actualException.getCause(), equalTo(mockedException));
+        }
 
-        return new Record<>(JacksonEvent
-                .builder()
-                .withEventType("event")
-                .withData(eventData)
-                .build());
     }
 }
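
Example usage, appended after the patch for reference (a minimal sketch only; the `newline` codec, the file path, and the `stdout` sink are illustrative assumptions and are not defined by this change). With this patch the file source accepts a `codec` block and hands parsing to the configured InputCodec plugin; the new @AssertTrue check in FileSourceConfig requires record_type to be event whenever a codec is configured.

  codec-file-pipeline:
    source:
      file:
        path: "/tmp/example-input.txt"     # assumed sample path, replace with a real file
        record_type: "event"               # required when a codec is configured
        codec:
          newline:                         # any registered InputCodec plugin may be used here
    sink:
      - stdout:

Without a codec block, the source keeps its previous line-by-line behavior (the ClassicFileStrategy path above), so existing pipelines are unaffected.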