diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java index cd9f00eef3..23127b153e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java @@ -109,19 +109,21 @@ public static JacksonOtelLog.Builder builder() { @Override public String toJsonString() { - final ObjectNode attributesNode = (ObjectNode) getJsonNode().get("attributes"); - final ObjectNode flattenedJsonNode = getJsonNode().deepCopy(); - if (attributesNode != null) { + Object anyAttributes = getJsonNode().get("attributes"); + if(anyAttributes instanceof ObjectNode) { + final ObjectNode flattenedJsonNode = getJsonNode().deepCopy(); flattenedJsonNode.remove("attributes"); - for (Iterator> it = attributesNode.fields(); it.hasNext(); ) { + + for (Iterator> it = ((ObjectNode) anyAttributes).fields(); it.hasNext(); ) { Map.Entry entry = it.next(); String field = entry.getKey(); if (!flattenedJsonNode.has(field)) { flattenedJsonNode.set(field, entry.getValue()); } } + return flattenedJsonNode.toString(); } - return flattenedJsonNode.toString(); + return super.toJsonString(); } /** * Builder for creating {@link JacksonLog}. diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java index db0d3934ef..85c763ba38 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java @@ -149,5 +149,16 @@ public void testHistogramToJsonString() throws JSONException { String expected = String.format(file, TEST_TIME_KEY1, TEST_KEY2); JSONAssert.assertEquals(expected, actual, false); } + + @Test + public void test_non_object_attributes_toJsonString_serializes_as_is() { + JacksonOtelLog testLog = JacksonOtelLog.builder() + .withAttributes(Map.of("key", "value")) + .build(); + assertThat(testLog.toJsonString(), equalTo("{\"key\":\"value\"}")); + + testLog.put("attributes", "a string"); + assertThat(testLog.toJsonString(), equalTo("{\"attributes\":\"a string\"}")); + } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java index e076b7de10..c770fac74d 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java @@ -40,6 +40,7 @@ public class ParseJsonProcessor extends AbstractProcessor, Record< private final String pointer; private final String parseWhen; private final List tagsOnFailure; + private final boolean overwriteIfDestinationExists; private final ExpressionEvaluator expressionEvaluator; @@ -54,6 +55,7 @@ public ParseJsonProcessor(final PluginMetrics pluginMetrics, pointer = parseJsonProcessorConfig.getPointer(); parseWhen = parseJsonProcessorConfig.getParseWhen(); tagsOnFailure = parseJsonProcessorConfig.getTagsOnFailure(); + overwriteIfDestinationExists = parseJsonProcessorConfig.getOverwriteIfDestinationExists(); this.expressionEvaluator = expressionEvaluator; } @@ -85,7 +87,7 @@ public Collection> doExecute(final Collection> recor if (doWriteToRoot) { writeToRoot(event, parsedJson); - } else { + } else if (overwriteIfDestinationExists || !event.containsKey(destination)) { event.put(destination, parsedJson); } } catch (final JsonProcessingException jsonException) { @@ -169,7 +171,9 @@ private String trimPointer(String pointer) { private void writeToRoot(final Event event, final Map parsedJson) { for (Map.Entry entry : parsedJson.entrySet()) { - event.put(entry.getKey(), entry.getValue()); + if (overwriteIfDestinationExists || !event.containsKey(entry.getKey())) { + event.put(entry.getKey(), entry.getValue()); + } } } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java index fa6dbe199b..6295f12fba 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java @@ -31,6 +31,9 @@ public class ParseJsonProcessorConfig { @JsonProperty("tags_on_failure") private List tagsOnFailure; + @JsonProperty("overwrite_if_destination_exists") + private boolean overwriteIfDestinationExists = true; + /** * The field of the Event that contains the JSON data. * @@ -68,6 +71,10 @@ public List getTagsOnFailure() { public String getParseWhen() { return parseWhen; } + public boolean getOverwriteIfDestinationExists() { + return overwriteIfDestinationExists; + } + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") boolean isValidDestination() { if (Objects.isNull(destination)) return true; diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java index ea3ef5a7f7..8a08e5bd36 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value assertThat(objectUnderTest.getDestination(), equalTo(null)); assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); + assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); } @Nested diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java index 7fce6ecbe5..8e2ea861ab 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java @@ -52,6 +52,7 @@ void setup() { when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination()); when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); when(processorConfig.getParseWhen()).thenReturn(null); + when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); } private ParseJsonProcessor createObjectUnderTest() { @@ -94,6 +95,38 @@ void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() { assertThatKeyEquals(parsedEvent, "key", "value"); } + @Test + void test_when_dataFieldEqualToRootField_then_notOverwritesOriginalFields() { + final String source = "root_source"; + when(processorConfig.getSource()).thenReturn(source); + when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(false); + parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used + + final Map data = Map.of(source,"value_that_will_not_be_overwritten"); + + final String serializedMessage = convertMapToJSONString(data); + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThatKeyEquals(parsedEvent, source, "{\"root_source\":\"value_that_will_not_be_overwritten\"}"); + } + + @Test + void test_when_dataFieldEqualToDestinationField_then_notOverwritesOriginalFields() { + final String source = "root_source"; + when(processorConfig.getSource()).thenReturn(source); + when(processorConfig.getDestination()).thenReturn(source); // write back to source + when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(false); + parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used + + final Map data = Map.of("key","value"); + + final String serializedMessage = convertMapToJSONString(data); + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThatKeyEquals(parsedEvent, source, "{\"key\":\"value\"}"); + assertThat(parsedEvent.containsKey("key"), equalTo(false)); + } + @Test void test_when_valueIsEmpty_then_notParsed() { parseJsonProcessor = createObjectUnderTest();