Skip to content

Commit

Permalink
Fix uncaught exception causing pipeline shutdown (#3189)
Browse files Browse the repository at this point in the history
* Catch ClassCastException in JacksonOtelLog.toJsonString()
* Add overwrite option to parse-json processor

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Aug 21, 2023
1 parent 2f2c047 commit 8bb4178
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,21 @@ public static JacksonOtelLog.Builder builder() {

@Override
public String toJsonString() {
final ObjectNode attributesNode = (ObjectNode) getJsonNode().get("attributes");
final ObjectNode flattenedJsonNode = getJsonNode().deepCopy();
if (attributesNode != null) {
Object anyAttributes = getJsonNode().get("attributes");
if(anyAttributes instanceof ObjectNode) {
final ObjectNode flattenedJsonNode = getJsonNode().deepCopy();
flattenedJsonNode.remove("attributes");
for (Iterator<Map.Entry<String, JsonNode>> it = attributesNode.fields(); it.hasNext(); ) {

for (Iterator<Map.Entry<String, JsonNode>> it = ((ObjectNode) anyAttributes).fields(); it.hasNext(); ) {
Map.Entry<String, JsonNode> entry = it.next();
String field = entry.getKey();
if (!flattenedJsonNode.has(field)) {
flattenedJsonNode.set(field, entry.getValue());
}
}
return flattenedJsonNode.toString();
}
return flattenedJsonNode.toString();
return super.toJsonString();
}
/**
* Builder for creating {@link JacksonLog}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,16 @@ public void testHistogramToJsonString() throws JSONException {
String expected = String.format(file, TEST_TIME_KEY1, TEST_KEY2);
JSONAssert.assertEquals(expected, actual, false);
}

@Test
public void test_non_object_attributes_toJsonString_serializes_as_is() {
JacksonOtelLog testLog = JacksonOtelLog.builder()
.withAttributes(Map.of("key", "value"))
.build();
assertThat(testLog.toJsonString(), equalTo("{\"key\":\"value\"}"));

testLog.put("attributes", "a string");
assertThat(testLog.toJsonString(), equalTo("{\"attributes\":\"a string\"}"));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<
private final String pointer;
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfDestinationExists;

private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -54,6 +55,7 @@ public ParseJsonProcessor(final PluginMetrics pluginMetrics,
pointer = parseJsonProcessorConfig.getPointer();
parseWhen = parseJsonProcessorConfig.getParseWhen();
tagsOnFailure = parseJsonProcessorConfig.getTagsOnFailure();
overwriteIfDestinationExists = parseJsonProcessorConfig.getOverwriteIfDestinationExists();
this.expressionEvaluator = expressionEvaluator;
}

Expand Down Expand Up @@ -85,7 +87,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

if (doWriteToRoot) {
writeToRoot(event, parsedJson);
} else {
} else if (overwriteIfDestinationExists || !event.containsKey(destination)) {
event.put(destination, parsedJson);
}
} catch (final JsonProcessingException jsonException) {
Expand Down Expand Up @@ -169,7 +171,9 @@ private String trimPointer(String pointer) {

private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
event.put(entry.getKey(), entry.getValue());
if (overwriteIfDestinationExists || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class ParseJsonProcessorConfig {
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

/**
* The field of the Event that contains the JSON data.
*
Expand Down Expand Up @@ -68,6 +71,10 @@ public List<String> getTagsOnFailure() {

public String getParseWhen() { return parseWhen; }

public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}

@AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
boolean isValidDestination() {
if (Objects.isNull(destination)) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value
assertThat(objectUnderTest.getDestination(), equalTo(null));
assertThat(objectUnderTest.getPointer(), equalTo(null));
assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null));
assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true));
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void setup() {
when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination());
when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer());
when(processorConfig.getParseWhen()).thenReturn(null);
when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true);
}

private ParseJsonProcessor createObjectUnderTest() {
Expand Down Expand Up @@ -94,6 +95,38 @@ void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() {
assertThatKeyEquals(parsedEvent, "key", "value");
}

@Test
void test_when_dataFieldEqualToRootField_then_notOverwritesOriginalFields() {
final String source = "root_source";
when(processorConfig.getSource()).thenReturn(source);
when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(false);
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used

final Map<String, Object> data = Map.of(source,"value_that_will_not_be_overwritten");

final String serializedMessage = convertMapToJSONString(data);
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThatKeyEquals(parsedEvent, source, "{\"root_source\":\"value_that_will_not_be_overwritten\"}");
}

@Test
void test_when_dataFieldEqualToDestinationField_then_notOverwritesOriginalFields() {
final String source = "root_source";
when(processorConfig.getSource()).thenReturn(source);
when(processorConfig.getDestination()).thenReturn(source); // write back to source
when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(false);
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used

final Map<String, Object> data = Map.of("key","value");

final String serializedMessage = convertMapToJSONString(data);
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThatKeyEquals(parsedEvent, source, "{\"key\":\"value\"}");
assertThat(parsedEvent.containsKey("key"), equalTo(false));
}

@Test
void test_when_valueIsEmpty_then_notParsed() {
parseJsonProcessor = createObjectUnderTest();
Expand Down

0 comments on commit 8bb4178

Please sign in to comment.