diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index 02e77a76c6..b9048a6a29 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -45,6 +45,13 @@ When run, the processor will parse the message into the following output: * `exclude_keys` - An array specifying the parsed keys which should not be added to the event. By default no keys will be excluded. * Default: `[]` * Example: `exclude_keys` is `["key2"]`. `key1=value1&key2=value2` will parse into `{"key1": "value1"}` +* `default_values` - A hash specifying the default keys and their values which should be added to the event in case these keys do not exist in the source field being parsed. + * Default: `{}` + * Example: `default_values` is `{"defaultkey": "defaultvalue"}`. `key1=value1` will parse into `{"key1": "value1", "defaultkey": "defaultvalue"}` + * If the default key already exists in the message, the value is not changed. + * Example: `default_values` is `{"value1": "abc"}`. `key1=value1` will parse into `{"key1": "value1"}` + * It should be noted that the include_keys filter will be applied to the message first, and then default keys. + * Example: `include_keys` is `["key1"]`, and `default_keys` is `{"key2": "value2"}`. `key1=value1&key2=abc` will parse into `{"key1": "value1", "key2": "value2"}` * `key_value_delimiter_regex` - A regex specifying the delimiter between a key and a value. Special regex characters such as `[` and `]` must be escaped using `\\`. * There is no default. * Note: This cannot be defined at the same time as `value_split_characters` diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index 0a9fe30225..bca9b11540 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -37,6 +37,8 @@ public class KeyValueProcessor extends AbstractProcessor, Record includeKeysSet = new HashSet(); private final Set excludeKeysSet = new HashSet(); + private final HashMap defaultValuesMap = new HashMap<>(); + private final Set defaultValuesSet = new HashSet(); private final String lowercaseKey = "lowercase"; private final String uppercaseKey = "uppercase"; private final String capitalizeKey = "capitalize"; @@ -102,9 +104,13 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces includeKeysSet.addAll(keyValueProcessorConfig.getIncludeKeys()); excludeKeysSet.addAll(keyValueProcessorConfig.getExcludeKeys()); + defaultValuesMap.putAll(keyValueProcessorConfig.getDefaultValues()); + if (!defaultValuesMap.isEmpty()) { + defaultValuesSet.addAll(defaultValuesMap.keySet()); + } - validateKeySets(includeKeysSet, excludeKeysSet); - + validateKeySets(includeKeysSet, excludeKeysSet, defaultValuesSet); + if (!validTransformOptionSet.contains(keyValueProcessorConfig.getTransformKey())) { throw new IllegalArgumentException(String.format("The transform_key value: %s is not a valid option", keyValueProcessorConfig.getTransformKey())); } @@ -155,11 +161,18 @@ private boolean validateRegex(final String pattern) return true; } - private void validateKeySets(final Set includeSet, final Set excludeSet) { - Set intersectionSet = new HashSet(includeSet); - intersectionSet.retainAll(excludeSet); - if (!intersectionSet.isEmpty()) { - throw new IllegalArgumentException("Include keys and exclude keys set cannot have any overlap", null); + private void validateKeySets(final Set includeSet, final Set excludeSet, final Set defaultSet) { + final Set includeIntersectionSet = new HashSet(includeSet); + final Set defaultIntersectionSet = new HashSet(defaultSet); + + includeIntersectionSet.retainAll(excludeSet); + if (!includeIntersectionSet.isEmpty()) { + throw new IllegalArgumentException("Include keys and exclude keys set cannot have any overlap"); + } + + defaultIntersectionSet.retainAll(excludeSet); + if (!defaultIntersectionSet.isEmpty()) { + throw new IllegalArgumentException("Cannot exclude a default key!"); } } @@ -171,18 +184,19 @@ public Collection> doExecute(final Collection> recor final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0); + for(final String group : groups) { final String[] terms = keyValueDelimiterPattern.split(group, 2); String key = terms[0]; Object value; if (!includeKeysSet.isEmpty() && !includeKeysSet.contains(key)) { - LOG.debug(String.format("Skipping not included key: '%s'", key)); + LOG.debug("Skipping not included key: '{}'", key); continue; } - if (!excludeKeysSet.isEmpty() && excludeKeysSet.contains(key)) { - LOG.debug(String.format("Key is being excluded: '%s'", key)); + if (excludeKeysSet.contains(key)) { + LOG.debug("Key is being excluded: '{}'", key); continue; } @@ -194,7 +208,7 @@ public Collection> doExecute(final Collection> recor if (terms.length == 2) { value = terms[1]; } else { - LOG.debug(String.format("Unsuccessful match: '%s'", terms[0])); + LOG.debug("Unsuccessful match: '{}'", terms[0]); value = keyValueProcessorConfig.getNonMatchValue(); } @@ -226,6 +240,14 @@ public Collection> doExecute(final Collection> recor addKeyValueToMap(parsedMap, key, value); } + for (Map.Entry pair : defaultValuesMap.entrySet()) { + if (parsedMap.containsKey(pair.getKey())) { + LOG.debug("Skipping already included default key: '{}'", pair.getKey()); + continue; + } + parsedMap.put(pair.getKey(), pair.getValue()); + } + recordEvent.put(keyValueProcessorConfig.getDestination(), parsedMap); } diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index 57af39de9b..62a2b718f6 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; public class KeyValueProcessorConfig { static final String DEFAULT_SOURCE = "message"; @@ -18,6 +19,7 @@ public class KeyValueProcessorConfig { public static final String DEFAULT_FIELD_SPLIT_CHARACTERS = "&"; static final List DEFAULT_INCLUDE_KEYS = new ArrayList<>(); static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); + static final Map DEFAULT_DEFAULT_VALUES = Map.of(); public static final String DEFAULT_VALUE_SPLIT_CHARACTERS = "="; static final Object DEFAULT_NON_MATCH_VALUE = null; static final String DEFAULT_PREFIX = ""; @@ -49,6 +51,10 @@ public class KeyValueProcessorConfig { @NotNull private List excludeKeys = DEFAULT_EXCLUDE_KEYS; + @JsonProperty("default_values") + @NotNull + private Map defaultValues = DEFAULT_DEFAULT_VALUES; + @JsonProperty("key_value_delimiter_regex") private String keyValueDelimiterRegex; @@ -109,6 +115,10 @@ public List getExcludeKeys() { return excludeKeys; } + public Map getDefaultValues() { + return defaultValues; + } + public String getKeyValueDelimiterRegex() { return keyValueDelimiterRegex; } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 6c946c98b1..db50b7a403 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -13,6 +13,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -58,6 +60,7 @@ void setup() { lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(defaultConfig.getFieldSplitCharacters()); lenient().when(mockConfig.getIncludeKeys()).thenReturn(defaultConfig.getIncludeKeys()); lenient().when(mockConfig.getExcludeKeys()).thenReturn(defaultConfig.getExcludeKeys()); + lenient().when(mockConfig.getDefaultValues()).thenReturn(defaultConfig.getDefaultValues()); lenient().when(mockConfig.getKeyValueDelimiterRegex()).thenReturn(defaultConfig.getKeyValueDelimiterRegex()); lenient().when(mockConfig.getValueSplitCharacters()).thenReturn(defaultConfig.getValueSplitCharacters()); lenient().when(mockConfig.getNonMatchValue()).thenReturn(defaultConfig.getNonMatchValue()); @@ -315,6 +318,103 @@ void testIncludeExcludeKeysOverlapKeyValueProcessor() { assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); } + @Test + void testDefaultKeysNoOverlapsBetweenEventKvProcessor() { + final Map defaultMap = Map.of("dKey", "dValue"); + when(mockConfig.getDefaultValues()).thenReturn(defaultMap); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("key1=value1"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "key1", "value1"); + assertThatKeyEquals(parsed_message, "dKey", "dValue"); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testDefaultKeysAlreadyInMessageKvProcessor(boolean skipDuplicateValues) { + final Map defaultMap = Map.of("dKey", "dValue"); + when(mockConfig.getDefaultValues()).thenReturn(defaultMap); + when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("key1=value1&dKey=abc"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "key1", "value1"); + assertThatKeyEquals(parsed_message, "dKey", "abc"); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testDefaultIncludeKeysOverlapKvProcessor(boolean skipDuplicateValues) { + final Map defaultMap = Map.of("key1", "abc"); + final List includeKeys = List.of("key1"); + when(mockConfig.getDefaultValues()).thenReturn(defaultMap); + when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); + when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("key1=value1&key2=value2"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(1)); + assertThatKeyEquals(parsed_message, "key1", "value1"); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testDefaultPrioritizeIncludeKeysKvProcessor(boolean skipDuplicateValues) { + final Map defaultMap = Map.of("key2", "value2"); + final List includeKeys = List.of("key1"); + when(mockConfig.getDefaultValues()).thenReturn(defaultMap); + when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); + when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("key1=value1&key2=abc"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "key1", "value1"); + assertThatKeyEquals(parsed_message, "key2", "value2"); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testIncludeKeysNotInRecordMessageKvProcessor(boolean skipDuplicateValues) { + final Map defaultMap = Map.of("key2", "value2"); + final List includeKeys = List.of("key1"); + when(mockConfig.getDefaultValues()).thenReturn(defaultMap); + when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); + when(mockConfig.getSkipDuplicateValues()).thenReturn(skipDuplicateValues); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("key2=abc"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(1)); + assertThatKeyEquals(parsed_message, "key2", "value2"); + } + + @Test + void testDefaultExcludeKeysOverlapKeyValueProcessor() { + final Map defaultMap = Map.of("dKey", "dValue"); + final List excludeKeys = List.of("dKey"); + when(mockConfig.getDefaultValues()).thenReturn(defaultMap); + when(mockConfig.getExcludeKeys()).thenReturn(excludeKeys); + + assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig)); + } + @Test void testCustomPrefixKvProcessor() { when(mockConfig.getPrefix()).thenReturn("TEST_"); @@ -444,10 +544,10 @@ void testCapitalizeTransformKvProcessor() { final Record record = getMessage("key1=value1"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); - final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + final LinkedHashMap parsedMessage = getLinkedHashMap(editedRecords); - assertThat(parsed_message.size(), equalTo(1)); - assertThatKeyEquals(parsed_message, "KEY1", "value1"); + assertThat(parsedMessage.size(), equalTo(1)); + assertThatKeyEquals(parsedMessage, "KEY1", "value1"); } @Test