Skip to content

Commit

Permalink
Default keys (#3075)
Browse files Browse the repository at this point in the history
* readme and config

Signed-off-by: Kat Shen <[email protected]>

* skeleton logic written

Signed-off-by: Kat Shen <[email protected]>

* default keys impl and tests

Signed-off-by: Kat Shen <[email protected]>

* finish tests

Signed-off-by: Kat Shen <[email protected]>

* rerun checks

Signed-off-by: Kat Shen <[email protected]>

* change impl to have parity with logstash

Signed-off-by: Kat Shen <[email protected]>

* add clarifying example to readme, fix edge cases, add tests

Signed-off-by: Kat Shen <[email protected]>

* rename test for clarity

Signed-off-by: Kat Shen <[email protected]>

* change logging statements from string.format()

Signed-off-by: Kat Shen <[email protected]>

* fix default key check error

Signed-off-by: Kat Shen <[email protected]>

* change default config name to default_values, fix to have parity with logstash, enhance relevant tests

Signed-off-by: Kat Shen <[email protected]>

* rerun checks

Signed-off-by: Kat Shen <[email protected]>

* fix nits

Signed-off-by: Kat Shen <[email protected]>

* remove extraneous test

Signed-off-by: Kat Shen <[email protected]>

* clean up illegal argument statements, parameterize tests

Signed-off-by: Kat Shen <[email protected]>

---------

Signed-off-by: Kat Shen <[email protected]>
Co-authored-by: Kat Shen <[email protected]>
  • Loading branch information
shenkw1 and shenkw1 authored Aug 8, 2023
1 parent 5262eea commit 956e5ad
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 14 deletions.
7 changes: 7 additions & 0 deletions data-prepper-plugins/key-value-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ When run, the processor will parse the message into the following output:
* `exclude_keys` - An array specifying the parsed keys which should not be added to the event. By default no keys will be excluded.
* Default: `[]`
* Example: `exclude_keys` is `["key2"]`. `key1=value1&key2=value2` will parse into `{"key1": "value1"}`
* `default_values` - A hash specifying the default keys and their values which should be added to the event in case these keys do not exist in the source field being parsed.
* Default: `{}`
* Example: `default_values` is `{"defaultkey": "defaultvalue"}`. `key1=value1` will parse into `{"key1": "value1", "defaultkey": "defaultvalue"}`
* If the default key already exists in the message, the value is not changed.
* Example: `default_values` is `{"key1": "abc"}`. `key1=value1` will parse into `{"key1": "value1"}`
* Note that the `include_keys` filter is applied to the message first, and the default values are added afterwards.
* Example: `include_keys` is `["key1"]`, and `default_values` is `{"key2": "value2"}`. `key1=value1&key2=abc` will parse into `{"key1": "value1", "key2": "value2"}`
* `key_value_delimiter_regex` - A regex specifying the delimiter between a key and a value. Special regex characters such as `[` and `]` must be escaped using `\\`.
* There is no default.
* Note: This cannot be defined at the same time as `value_split_characters`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
private final Pattern keyValueDelimiterPattern;
private final Set<String> includeKeysSet = new HashSet<String>();
private final Set<String> excludeKeysSet = new HashSet<String>();
private final HashMap<String, Object> defaultValuesMap = new HashMap<>();
private final Set<String> defaultValuesSet = new HashSet<String>();
private final String lowercaseKey = "lowercase";
private final String uppercaseKey = "uppercase";
private final String capitalizeKey = "capitalize";
Expand Down Expand Up @@ -102,9 +104,13 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces

includeKeysSet.addAll(keyValueProcessorConfig.getIncludeKeys());
excludeKeysSet.addAll(keyValueProcessorConfig.getExcludeKeys());
defaultValuesMap.putAll(keyValueProcessorConfig.getDefaultValues());
if (!defaultValuesMap.isEmpty()) {
defaultValuesSet.addAll(defaultValuesMap.keySet());
}

validateKeySets(includeKeysSet, excludeKeysSet);

validateKeySets(includeKeysSet, excludeKeysSet, defaultValuesSet);
if (!validTransformOptionSet.contains(keyValueProcessorConfig.getTransformKey())) {
throw new IllegalArgumentException(String.format("The transform_key value: %s is not a valid option", keyValueProcessorConfig.getTransformKey()));
}
Expand Down Expand Up @@ -155,11 +161,18 @@ private boolean validateRegex(final String pattern)
return true;
}

private void validateKeySets(final Set<String> includeSet, final Set<String> excludeSet) {
Set<String> intersectionSet = new HashSet<String>(includeSet);
intersectionSet.retainAll(excludeSet);
if (!intersectionSet.isEmpty()) {
throw new IllegalArgumentException("Include keys and exclude keys set cannot have any overlap", null);
/**
 * Checks that the configured key sets do not contradict each other.
 *
 * <p>The include/exclude overlap is checked first, then the default/exclude overlap.
 *
 * @param includeSet keys that should be kept
 * @param excludeSet keys that should be dropped
 * @param defaultSet keys that have a configured default value
 * @throws IllegalArgumentException if a key is both included and excluded,
 *                                  or if a key with a default value is excluded
 */
private void validateKeySets(final Set<String> includeSet, final Set<String> excludeSet, final Set<String> defaultSet) {
    // A key cannot be requested and rejected at the same time.
    final boolean includedKeyIsExcluded = includeSet.stream().anyMatch(excludeSet::contains);
    if (includedKeyIsExcluded) {
        throw new IllegalArgumentException("Include keys and exclude keys set cannot have any overlap");
    }

    // A key with a default value cannot also be excluded.
    final boolean defaultKeyIsExcluded = defaultSet.stream().anyMatch(excludeSet::contains);
    if (defaultKeyIsExcluded) {
        throw new IllegalArgumentException("Cannot exclude a default key!");
    }
}

Expand All @@ -171,18 +184,19 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0);

for(final String group : groups) {
final String[] terms = keyValueDelimiterPattern.split(group, 2);
String key = terms[0];
Object value;

if (!includeKeysSet.isEmpty() && !includeKeysSet.contains(key)) {
LOG.debug(String.format("Skipping not included key: '%s'", key));
LOG.debug("Skipping not included key: '{}'", key);
continue;
}

if (!excludeKeysSet.isEmpty() && excludeKeysSet.contains(key)) {
LOG.debug(String.format("Key is being excluded: '%s'", key));
if (excludeKeysSet.contains(key)) {
LOG.debug("Key is being excluded: '{}'", key);
continue;
}

Expand All @@ -194,7 +208,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if (terms.length == 2) {
value = terms[1];
} else {
LOG.debug(String.format("Unsuccessful match: '%s'", terms[0]));
LOG.debug("Unsuccessful match: '{}'", terms[0]);
value = keyValueProcessorConfig.getNonMatchValue();
}

Expand Down Expand Up @@ -226,6 +240,14 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
addKeyValueToMap(parsedMap, key, value);
}

for (Map.Entry<String,Object> pair : defaultValuesMap.entrySet()) {
if (parsedMap.containsKey(pair.getKey())) {
LOG.debug("Skipping already included default key: '{}'", pair.getKey());
continue;
}
parsedMap.put(pair.getKey(), pair.getValue());
}

recordEvent.put(keyValueProcessorConfig.getDestination(), parsedMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KeyValueProcessorConfig {
static final String DEFAULT_SOURCE = "message";
static final String DEFAULT_DESTINATION = "parsed_message";
public static final String DEFAULT_FIELD_SPLIT_CHARACTERS = "&";
static final List<String> DEFAULT_INCLUDE_KEYS = new ArrayList<>();
static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
static final Map<String, Object> DEFAULT_DEFAULT_VALUES = Map.of();
public static final String DEFAULT_VALUE_SPLIT_CHARACTERS = "=";
static final Object DEFAULT_NON_MATCH_VALUE = null;
static final String DEFAULT_PREFIX = "";
Expand Down Expand Up @@ -49,6 +51,10 @@ public class KeyValueProcessorConfig {
@NotNull
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

@JsonProperty("default_values")
@NotNull
private Map<String, Object> defaultValues = DEFAULT_DEFAULT_VALUES;

@JsonProperty("key_value_delimiter_regex")
private String keyValueDelimiterRegex;

Expand Down Expand Up @@ -109,6 +115,10 @@ public List<String> getExcludeKeys() {
return excludeKeys;
}

/**
 * Returns the value bound to the {@code default_values} configuration property.
 *
 * @return the configured default key/value pairs
 */
public Map<String, Object> getDefaultValues() {
    return this.defaultValues;
}

public String getKeyValueDelimiterRegex() {
return keyValueDelimiterRegex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

Expand Down Expand Up @@ -58,6 +60,7 @@ void setup() {
lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(defaultConfig.getFieldSplitCharacters());
lenient().when(mockConfig.getIncludeKeys()).thenReturn(defaultConfig.getIncludeKeys());
lenient().when(mockConfig.getExcludeKeys()).thenReturn(defaultConfig.getExcludeKeys());
lenient().when(mockConfig.getDefaultValues()).thenReturn(defaultConfig.getDefaultValues());
lenient().when(mockConfig.getKeyValueDelimiterRegex()).thenReturn(defaultConfig.getKeyValueDelimiterRegex());
lenient().when(mockConfig.getValueSplitCharacters()).thenReturn(defaultConfig.getValueSplitCharacters());
lenient().when(mockConfig.getNonMatchValue()).thenReturn(defaultConfig.getNonMatchValue());
Expand Down Expand Up @@ -315,6 +318,103 @@ void testIncludeExcludeKeysOverlapKeyValueProcessor() {
assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig));
}

/**
 * Expects a configured default key that is absent from the source message
 * to appear in the parsed output alongside the parsed key.
 */
@Test
void testDefaultKeysNoOverlapsBetweenEventKvProcessor() {
    // Arrange: a default key the message does not contain.
    final Map<String, Object> configuredDefaults = Map.of("dKey", "dValue");
    when(mockConfig.getDefaultValues()).thenReturn(configuredDefaults);
    keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

    // Act
    final Record<Event> inputRecord = getMessage("key1=value1");
    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(inputRecord));
    final LinkedHashMap<String, Object> parsedMessage = getLinkedHashMap(processedRecords);

    // Assert: the parsed key and the default key are both present.
    assertThat(parsedMessage.size(), equalTo(2));
    assertThatKeyEquals(parsedMessage, "key1", "value1");
    assertThatKeyEquals(parsedMessage, "dKey", "dValue");
}

/**
 * Expects the value parsed from the message to be kept when the message
 * already contains a key that also has a configured default value.
 * Runs with {@code skipDuplicateValues} both on and off.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testDefaultKeysAlreadyInMessageKvProcessor(boolean skipDuplicateValues) {
    // Arrange: the default key "dKey" also appears in the message below.
    final Map<String, Object> configuredDefaults = Map.of("dKey", "dValue");
    when(mockConfig.getDefaultValues()).thenReturn(configuredDefaults);
    when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues);
    keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

    // Act
    final Record<Event> inputRecord = getMessage("key1=value1&dKey=abc");
    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(inputRecord));
    final LinkedHashMap<String, Object> parsedMessage = getLinkedHashMap(processedRecords);

    // Assert: the message value "abc" is kept, not the default "dValue".
    assertThat(parsedMessage.size(), equalTo(2));
    assertThatKeyEquals(parsedMessage, "key1", "value1");
    assertThatKeyEquals(parsedMessage, "dKey", "abc");
}

/**
 * Expects the parsed value to be kept when the same key is listed in the
 * include keys and also has a configured default value; keys outside the
 * include list are expected to be absent from the output.
 * Runs with {@code skipDuplicateValues} both on and off.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testDefaultIncludeKeysOverlapKvProcessor(boolean skipDuplicateValues) {
    // Arrange: "key1" is both an include key and a default key.
    final Map<String, Object> configuredDefaults = Map.of("key1", "abc");
    final List<String> configuredIncludeKeys = List.of("key1");
    when(mockConfig.getDefaultValues()).thenReturn(configuredDefaults);
    when(mockConfig.getIncludeKeys()).thenReturn(configuredIncludeKeys);
    when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues);
    keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

    // Act
    final Record<Event> inputRecord = getMessage("key1=value1&key2=value2");
    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(inputRecord));
    final LinkedHashMap<String, Object> parsedMessage = getLinkedHashMap(processedRecords);

    // Assert: only "key1" remains, carrying the value from the message.
    assertThat(parsedMessage.size(), equalTo(1));
    assertThatKeyEquals(parsedMessage, "key1", "value1");
}

/**
 * Expects the default value to be used for a key that is present in the
 * message but is not listed in the include keys.
 * Runs with {@code skipDuplicateValues} both on and off.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testDefaultPrioritizeIncludeKeysKvProcessor(boolean skipDuplicateValues) {
    // Arrange: "key2" has a default value but is not an include key.
    final Map<String, Object> configuredDefaults = Map.of("key2", "value2");
    final List<String> configuredIncludeKeys = List.of("key1");
    when(mockConfig.getDefaultValues()).thenReturn(configuredDefaults);
    when(mockConfig.getIncludeKeys()).thenReturn(configuredIncludeKeys);
    when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues);
    keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

    // Act
    final Record<Event> inputRecord = getMessage("key1=value1&key2=abc");
    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(inputRecord));
    final LinkedHashMap<String, Object> parsedMessage = getLinkedHashMap(processedRecords);

    // Assert: "key2" carries the default "value2", not the message value "abc".
    assertThat(parsedMessage.size(), equalTo(2));
    assertThatKeyEquals(parsedMessage, "key1", "value1");
    assertThatKeyEquals(parsedMessage, "key2", "value2");
}

/**
 * Expects only the default key/value pair in the output when none of the
 * include keys appear in the message.
 * Runs with {@code skipDuplicateValues} both on and off.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testIncludeKeysNotInRecordMessageKvProcessor(boolean skipDuplicateValues) {
    // Arrange: the include key "key1" never occurs in the message below.
    final Map<String, Object> configuredDefaults = Map.of("key2", "value2");
    final List<String> configuredIncludeKeys = List.of("key1");
    when(mockConfig.getDefaultValues()).thenReturn(configuredDefaults);
    when(mockConfig.getIncludeKeys()).thenReturn(configuredIncludeKeys);
    when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues);
    keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

    // Act
    final Record<Event> inputRecord = getMessage("key2=abc");
    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(inputRecord));
    final LinkedHashMap<String, Object> parsedMessage = getLinkedHashMap(processedRecords);

    // Assert: only the default entry for "key2" is present.
    assertThat(parsedMessage.size(), equalTo(1));
    assertThatKeyEquals(parsedMessage, "key2", "value2");
}

/**
 * Expects construction to fail with {@link IllegalArgumentException} when a
 * key with a configured default value is also listed in the exclude keys.
 */
@Test
void testDefaultExcludeKeysOverlapKeyValueProcessor() {
    // Arrange: "dKey" is both a default key and an excluded key.
    final Map<String, Object> configuredDefaults = Map.of("dKey", "dValue");
    final List<String> configuredExcludeKeys = List.of("dKey");
    when(mockConfig.getDefaultValues()).thenReturn(configuredDefaults);
    when(mockConfig.getExcludeKeys()).thenReturn(configuredExcludeKeys);

    // Act + assert: the conflicting configuration is rejected at construction time.
    assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig));
}

@Test
void testCustomPrefixKvProcessor() {
when(mockConfig.getPrefix()).thenReturn("TEST_");
Expand Down Expand Up @@ -444,10 +544,10 @@ void testCapitalizeTransformKvProcessor() {

final Record<Event> record = getMessage("key1=value1");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);
final LinkedHashMap<String, Object> parsedMessage = getLinkedHashMap(editedRecords);

assertThat(parsed_message.size(), equalTo(1));
assertThatKeyEquals(parsed_message, "KEY1", "value1");
assertThat(parsedMessage.size(), equalTo(1));
assertThatKeyEquals(parsedMessage, "KEY1", "value1");
}

@Test
Expand Down

0 comments on commit 956e5ad

Please sign in to comment.