Commit 9c4dd59

Support input codecs in the file source. Resolves opensearch-project#4018. (opensearch-project#4019)

Signed-off-by: David Venable <[email protected]>
dlvenable authored Feb 9, 2024
1 parent bd9ae27 commit 9c4dd59
Showing 4 changed files with 387 additions and 185 deletions.
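This change adds an optional codec setting to the file source. When a codec is configured, the source loads the corresponding InputCodec plugin and parses the file through it; otherwise it keeps the existing line-by-line behavior. As a rough sketch of what this enables, a pipeline might look like the following; the pipeline name, file path, and the choice of a json codec are illustrative assumptions, not taken from this commit:

# Hypothetical pipeline sketch; names and path are assumed.
demo-pipeline:
  source:
    file:
      path: "/tmp/input-data.json"
      record_type: "event"      # a codec requires the event record type
      codec:
        json:                   # assumes a 'json' InputCodec plugin is available
  sink:
    - stdout: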
FileSource.java
@@ -5,21 +5,25 @@

package org.opensearch.dataprepper.plugins.source.file;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -34,13 +38,13 @@

@DataPrepperPlugin(name = "file", pluginType = Source.class, pluginConfigurationType = FileSourceConfig.class)
public class FileSource implements Source<Record<Object>> {

static final String MESSAGE_KEY = "message";
private static final Logger LOG = LoggerFactory.getLogger(FileSource.class);
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {};

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final FileSourceConfig fileSourceConfig;
private final FileStrategy fileStrategy;

private boolean isStopRequested;
private final int writeTimeout;
@@ -51,66 +55,113 @@
    public FileSource(final FileSourceConfig fileSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) {
        this.fileSourceConfig = fileSourceConfig;
        this.isStopRequested = false;
        this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT;

        if (fileSourceConfig.getCodec() != null) {
            fileStrategy = new CodecFileStrategy(pluginFactory);
        } else {
            fileStrategy = new ClassicFileStrategy();
        }
    }

    @Override
    public void start(final Buffer<Record<Object>> buffer) {
        checkNotNull(buffer, "Buffer cannot be null for file source to start");
        fileStrategy.start(buffer);
    }

    @Override
    public void stop() {
        isStopRequested = true;
    }

    private interface FileStrategy {
        void start(final Buffer<Record<Object>> buffer);
    }
    private class ClassicFileStrategy implements FileStrategy {
        @Override
        public void start(Buffer<Record<Object>> buffer) {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) {
                String line;
                while ((line = reader.readLine()) != null && !isStopRequested) {
                    writeLineAsEventOrString(line, buffer);
                }
            } catch (IOException | TimeoutException | IllegalArgumentException ex) {
                LOG.error("Error processing the input file path [{}]", fileSourceConfig.getFilePathToRead(), ex);
                throw new RuntimeException(format("Error processing the input file %s",
                        fileSourceConfig.getFilePathToRead()), ex);
            }
        }

        private Record<Object> getEventRecordFromLine(final String line) {
            Map<String, Object> structuredLine = new HashMap<>();

            switch (fileSourceConfig.getFormat()) {
                case JSON:
                    structuredLine = parseJson(line);
                    break;
                case PLAIN:
                    structuredLine.put(MESSAGE_KEY, line);
                    break;
            }

            return new Record<>(JacksonEvent
                    .builder()
                    .withEventType(fileSourceConfig.getRecordType())
                    .withData(structuredLine)
                    .build());
        }

        private Map<String, Object> parseJson(final String jsonString) {
            try {
                return OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
            } catch (JsonProcessingException e) {
                LOG.error(SENSITIVE, "Unable to parse json data [{}], assuming plain text", jsonString, e);
                final Map<String, Object> plainMap = new HashMap<>();
                plainMap.put(MESSAGE_KEY, jsonString);
                return plainMap;
            }
        }

        // Temporary function to support both trace and log ingestion pipelines.
        // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
        private void writeLineAsEventOrString(final String line, final Buffer<Record<Object>> buffer) throws TimeoutException, IllegalArgumentException {
            if (fileSourceConfig.getRecordType().equals(FileSourceConfig.EVENT_TYPE)) {
                buffer.write(getEventRecordFromLine(line), writeTimeout);
            } else if (fileSourceConfig.getRecordType().equals(FileSourceConfig.DEFAULT_TYPE)) {
                buffer.write(new Record<>(line), writeTimeout);
            }
        }
    }

    private class CodecFileStrategy implements FileStrategy {

        private final InputCodec codec;

        CodecFileStrategy(final PluginFactory pluginFactory) {
            final PluginModel codecConfiguration = fileSourceConfig.getCodec();
            final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
            codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
        }

        @Override
        public void start(final Buffer<Record<Object>> buffer) {
            try {
                codec.parse(new FileInputStream(fileSourceConfig.getFilePathToRead()), eventRecord -> {
                    try {
                        buffer.write((Record) eventRecord, writeTimeout);
                    } catch (TimeoutException e) {
                        throw new RuntimeException(e);
                    }
                });
            } catch (final IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

}
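CodecFileStrategy above depends only on the InputCodec contract: read the raw stream and hand each parsed record to a consumer. Below is a minimal, hypothetical codec sketch, not part of this commit, that would satisfy that contract; the demo_lines name and the one-event-per-line behavior are invented for illustration, assuming InputCodec declares void parse(InputStream, Consumer<Record<Event>>) throws IOException as used above:

package org.opensearch.dataprepper.plugins.codec.demo;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.function.Consumer;

// Hypothetical codec (not part of this commit): emits one event per input line.
@DataPrepperPlugin(name = "demo_lines", pluginType = InputCodec.class)
public class DemoLinesInputCodec implements InputCodec {
    @Override
    public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
        final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            // Wrap each line in an Event, mirroring the classic strategy's plain format.
            final Event event = JacksonEvent.builder()
                    .withEventType("event")
                    .withData(Collections.singletonMap("message", line))
                    .build();
            eventConsumer.accept(new Record<>(event));
        }
    }
}

Because the codec setting is modeled as a PluginModel, any class registered this way under pluginType = InputCodec.class can be selected by name from the pipeline configuration.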
FileSourceConfig.java
@@ -8,6 +8,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.model.configuration.PluginModel;

import java.util.Objects;

@@ -30,6 +32,9 @@
public class FileSourceConfig {
    @JsonProperty(ATTRIBUTE_TYPE)
    private String recordType = DEFAULT_TYPE;

    @JsonProperty("codec")
    private PluginModel codec;

    public String getFilePathToRead() {
        return filePathToRead;
    }
@@ -43,9 +48,18 @@
    public String getRecordType() {
        return recordType;
    }

    public PluginModel getCodec() {
        return codec;
    }

    void validate() {
        Objects.requireNonNull(filePathToRead, "File path is required");
        Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]");
        Preconditions.checkArgument(format.equals(DEFAULT_FORMAT) || format.equals("json"), "Invalid file format. Options are [json] and [plain]");
    }

    @AssertTrue(message = "The file source requires recordType to be event when using a codec.")
    boolean codeRequiresRecordTypeEvent() {
        return codec == null || recordType.equals(EVENT_TYPE);
    }
}
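Together with validate() above, the new @AssertTrue method means a codec may only be paired with the event record type. A hypothetical configuration such as the following (names and path invented for illustration) would now be rejected with the message above:

# Invalid sketch: a codec combined with record_type 'string'.
demo-pipeline:
  source:
    file:
      path: "/tmp/input-data.txt"
      record_type: "string"   # fails codeRequiresRecordTypeEvent()
      codec:
        json: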
FileSourceConfigTest.java
@@ -0,0 +1,46 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.source.file;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.configuration.PluginModel;

import java.util.Collections;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class FileSourceConfigTest {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @ParameterizedTest
    @ValueSource(strings = {FileSourceConfig.EVENT_TYPE, FileSourceConfig.DEFAULT_FORMAT})
    void codeRequiresRecordTypeEvent_returns_true_if_no_codec(final String recordType) {
        final Map<String, String> fileConfigMap = Map.of(FileSourceConfig.ATTRIBUTE_TYPE, recordType);
        final FileSourceConfig objectUnderTest = OBJECT_MAPPER.convertValue(fileConfigMap, FileSourceConfig.class);

        assertThat(objectUnderTest.codeRequiresRecordTypeEvent(), equalTo(true));
    }

    @ParameterizedTest
    @CsvSource({
            FileSourceConfig.EVENT_TYPE + ",true",
            FileSourceConfig.DEFAULT_FORMAT + ",false"
    })
    void codeRequiresRecordTypeEvent_returns_expected_value_when_there_is_a_codec(final String recordType, final boolean expected) {
        final Map<String, Object> fileConfigMap = Map.of(
                FileSourceConfig.ATTRIBUTE_TYPE, recordType,
                "codec", new PluginModel("fake_codec", Collections.emptyMap())
        );
        final FileSourceConfig objectUnderTest = OBJECT_MAPPER.convertValue(fileConfigMap, FileSourceConfig.class);

        assertThat(objectUnderTest.codeRequiresRecordTypeEvent(), equalTo(expected));
    }
}