diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 04147619b7..45a1638c16 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -27,7 +27,6 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; @@ -35,12 +34,13 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory; -import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingUncompressedBulkRequest; @@ -75,8 +75,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; - @DataPrepperPlugin(name = "opensearch", pluginType = Sink.class) public class OpenSearchSink extends AbstractSink> { public static final String BULKREQUEST_LATENCY = "bulkRequestLatency"; @@ -387,7 +385,7 @@ private void logFailureForDlqObjects(final List dlqObjects, final Thr BulkOperationWriter.dlqObjectToString(dlqObject), message)); dlqObject.releaseEventHandle(true); } catch (final IOException e) { - LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e); + LOG.error("Failed to write a document to the DLQ", e); dlqObject.releaseEventHandle(false); } }); @@ -399,13 +397,17 @@ private void logFailureForDlqObjects(final List dlqObjects, final Thr }); } catch (final IOException e) { dlqObjects.forEach(dlqObject -> { - LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e); + LOG.error("Failed to write a document to the DLQ", e); dlqObject.releaseEventHandle(false); }); } } else { dlqObjects.forEach(dlqObject -> { - LOG.warn(SENSITIVE, "Document [{}] has failure. DLQ not configured", dlqObject.getFailedData(), failure); + + final FailedDlqData failedDlqData = (FailedDlqData) dlqObject.getFailedData(); + + final String message = failure == null ? failedDlqData.getMessage() : failure.getMessage(); + LOG.warn("Document failed to write to OpenSearch with error code {}. Configure a DLQ to save failed documents. Error: {}", failedDlqData.getStatus(), message); dlqObject.releaseEventHandle(false); }); }