Skip to content

Commit

Permalink
Write to root when destination is set to null; add overwrite option (o…
Browse files Browse the repository at this point in the history
  • Loading branch information
oeyh authored Sep 22, 2023
1 parent 542b451 commit dc10c2f
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 14 deletions.
4 changes: 3 additions & 1 deletion data-prepper-plugins/key-value-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ When run, the processor will parse the message into the following output:
## Configuration
* `source` - The field in the message that will be parsed.
* Default: `message`
* `destination` - The field the parsed source will be output to. This will overwrite any preexisting data for that key.
* `destination` - The field the parsed source will be output to. This will overwrite any preexisting data for that key. If `destination` is set to `null`, the parsed fields will be written to the root of the event.
* Default: `parsed_message`
* `field_delimiter_regex` - A regex specifying the delimiter between key/value pairs. Special regex characters such as `[` and `]` must be escaped using `\\`.
* There is no default.
Expand Down Expand Up @@ -98,6 +98,8 @@ When run, the processor will parse the message into the following output:
* While `recursive` is `true`, `remove_brackets` cannot also be `true`.
* While `recursive` is `true`, `skip_duplicate_values` will always be `true`.
* While `recursive` is `true`, `whitespace` will always be `"strict"`.
* `overwrite_if_destination_exists` - Specify whether to overwrite existing fields if there are key conflicts when writing parsed fields to the event.
* Default: `true`

## Developer Guide
This plugin is compatible with Java 14. See
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,32 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

final Map<String, Object> processedMap = executeConfigs(outputMap);

recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap);
if (Objects.isNull(keyValueProcessorConfig.getDestination())) {
writeToRoot(recordEvent, processedMap);
} else {
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() ||
!recordEvent.containsKey(keyValueProcessorConfig.getDestination())) {
recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap);
}
}
}

return records;
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}

private ObjectNode recurse(final String input, final ObjectMapper mapper) {
Stack<Character> bracketStack = new Stack<Character>();
Map<Character, Character> bracketMap = initBracketMap();
Expand Down Expand Up @@ -495,16 +515,11 @@ private void addKeyValueToMap(final Map<String, Object> parsedMap, final String
}
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class KeyValueProcessorConfig {
@NotEmpty
private String source = DEFAULT_SOURCE;

@NotEmpty
private String destination = DEFAULT_DESTINATION;

@JsonProperty("field_delimiter_regex")
Expand Down Expand Up @@ -96,6 +95,9 @@ public class KeyValueProcessorConfig {
@NotNull
private boolean recursive = DEFAULT_RECURSIVE;

@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

public String getSource() {
return source;
}
Expand Down Expand Up @@ -167,4 +169,8 @@ public boolean getRemoveBrackets() {
public boolean getRecursive() {
return recursive;
}

public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ void setup() {
lenient().when(mockConfig.getSkipDuplicateValues()).thenReturn(defaultConfig.getSkipDuplicateValues());
lenient().when(mockConfig.getRemoveBrackets()).thenReturn(defaultConfig.getRemoveBrackets());
lenient().when(mockConfig.getRecursive()).thenReturn(defaultConfig.getRecursive());
lenient().when(mockConfig.getOverwriteIfDestinationExists()).thenReturn(defaultConfig.getOverwriteIfDestinationExists());

keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);
}
Expand All @@ -97,6 +98,76 @@ void testMultipleKvToObjectKeyValueProcessor() {
assertThatKeyEquals(parsed_message, "key2", "value2");
}

@Test
void testWriteToRoot() {
when(mockConfig.getDestination()).thenReturn(null);
final Record<Event> record = getMessage("key1=value1&key2=value2");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));

final Event event = editedRecords.get(0).getData();
assertThat(event.containsKey("parsed_message"), is(false));

assertThat(event.containsKey("key1"), is(true));
assertThat(event.containsKey("key2"), is(true));
assertThat(event.get("key1", Object.class), is("value1"));
assertThat(event.get("key2", Object.class), is("value2"));
}

@Test
void testWriteToRootWithOverwrite() {
when(mockConfig.getDestination()).thenReturn(null);
final Record<Event> record = getMessage("key1=value1&key2=value2");
record.getData().put("key1", "value to be overwritten");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));

final Event event = editedRecords.get(0).getData();

assertThat(event.containsKey("key1"), is(true));
assertThat(event.containsKey("key2"), is(true));
assertThat(event.get("key1", Object.class), is("value1"));
assertThat(event.get("key2", Object.class), is("value2"));
}

@Test
void testWriteToDestinationWithOverwrite() {
final Record<Event> record = getMessage("key1=value1&key2=value2");
record.getData().put("parsed_message", "value to be overwritten");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);

assertThat(parsed_message.size(), equalTo(2));
assertThatKeyEquals(parsed_message, "key1", "value1");
assertThatKeyEquals(parsed_message, "key2", "value2");
}

@Test
void testWriteToRootWithOverwriteDisabled() {
when(mockConfig.getDestination()).thenReturn(null);
when(mockConfig.getOverwriteIfDestinationExists()).thenReturn(false);
final Record<Event> record = getMessage("key1=value1&key2=value2");
record.getData().put("key1", "value will not be overwritten");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));

final Event event = editedRecords.get(0).getData();

assertThat(event.containsKey("key1"), is(true));
assertThat(event.containsKey("key2"), is(true));
assertThat(event.get("key1", Object.class), is("value will not be overwritten"));
assertThat(event.get("key2", Object.class), is("value2"));
}

@Test
void testWriteToDestinationWithOverwriteDisabled() {
when(mockConfig.getOverwriteIfDestinationExists()).thenReturn(false);
final Record<Event> record = getMessage("key1=value1&key2=value2");
record.getData().put("parsed_message", "value will not be overwritten");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final Event event = editedRecords.get(0).getData();

assertThat(event.containsKey("parsed_message"), is(true));
assertThat(event.get("parsed_message", Object.class), is("value will not be overwritten"));
}

@Test
void testSingleRegexFieldDelimiterKvToObjectKeyValueProcessor() {
when(mockConfig.getFieldDelimiterRegex()).thenReturn(":_*:");
Expand Down

0 comments on commit dc10c2f

Please sign in to comment.