Add Dissect Processor (opensearch-project#3363)
* Added Dissect Processor Functionality

Signed-off-by: Vishal Boinapalli <[email protected]>

* Fixed checkstyle issue

Signed-off-by: Vishal Boinapalli <[email protected]>

* Tweak readme and a unit test

Signed-off-by: Hai Yan <[email protected]>

* Fix build failures

Signed-off-by: Hai Yan <[email protected]>

* Address review comments - separate unit tests for dissector from processor; add delimiter and fieldhelper tests

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Vishal Boinapalli <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Co-authored-by: Vishal Boinapalli <[email protected]>
oeyh and vishalboin authored Sep 26, 2023
1 parent f2593a9 commit fee636c
Showing 18 changed files with 1,421 additions and 0 deletions.
129 changes: 129 additions & 0 deletions data-prepper-plugins/dissect-processor/README.md
@@ -0,0 +1,129 @@
# Dissect Processor

The Dissect processor is useful when dealing with log files or messages that have a known pattern or structure. It extracts specific pieces of information from the text and maps them to individual fields based on the defined dissect patterns.


## Basic Usage

To get started with the dissect processor using Data Prepper, create the following `pipeline.yaml`.
```yaml
dissect-pipeline:
  source:
    file:
      path: "/full/path/to/dissect_logs_json.log"
      record_type: "event"
      format: "json"
  processor:
    - dissect:
        map:
          log: "%{Date} %{Time} %{Log_Type}: %{Message}"
  sink:
    - stdout:
```
Create the following file named `dissect_logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```
{"log": "07-25-2023 10:00:00 ERROR: Some error"}
```
The Dissect processor will retrieve the necessary fields from the `log` message, such as `Date`, `Time`, `Log_Type`, and `Message`, with the help of the pattern `%{Date} %{Time} %{Log_Type}: %{Message}` configured in the pipeline.
When you run Data Prepper with this `pipeline.yaml` passed in, you should see the following standard output.
```
{
  "log" : "07-25-2023 10:00:00 ERROR: Some error",
  "Date" : "07-25-2023",
  "Time" : "10:00:00",
  "Log_Type" : "ERROR",
  "Message" : "Some error"
}
```
The fields `Date`, `Time`, `Log_Type`, and `Message` have been extracted from the `log` value.

## Configuration
* `map` (Required): Specifies the dissect patterns. It takes a `Map<String, String>` with fields as keys and the corresponding dissect patterns as values.
* `target_types` (Optional): A `Map<String, String>` that specifies the target type of a specific field. Valid options are `integer`, `double`, `string`, and `boolean`. By default, all values are `string`. Target types are applied after the dissection process.
* `dissect_when` (Optional): A Data Prepper Expression string following the [Data Prepper Expression syntax](../../docs/expression_syntax.md). When configured, the processor will evaluate the expression before proceeding with the dissection process and perform the dissection if the expression evaluates to `true`.
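
As an illustrative sketch of how the optional settings combine with `map` (the `Status_Code` field and the condition below are assumptions for illustration, not part of this commit):

```yaml
  processor:
    - dissect:
        map:
          log: "%{Date} %{Time} %{Status_Code} %{Message}"
        target_types:
          # Cast after dissection; valid options: integer, double, string, boolean
          Status_Code: "integer"
        # Hypothetical condition: only dissect events that carry a "log" key
        dissect_when: '/log != null'
```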
## Field Notations
The symbols `?`, `+`, `->`, `/`, and `&` can be used to perform logical extraction of data.
* **Normal Field** : A field without a suffix or prefix. The field will be added directly to the output event.
    Ex: `%{field_name}`
* **Skip Field** : `?` can be used as a prefix to a key to skip that field in the output JSON.
    * Skip field : `%{}`
    * Named skip field : `%{?field_name}`
* **Append Field** : To append multiple values and put the final value in the field, use `+` before the field name in the dissect pattern.
    * **Usage**:
        Pattern : "%{+field_name}, %{+field_name}"
        Text : "foo, bar"
        Output : {"field_name" : "foobar"}
    We can also define the order of concatenation with the suffix `/<digit>`.
    * **Usage**:
        Pattern : "%{+field_name/2}, %{+field_name/1}"
        Text : "foo, bar"
        Output : {"field_name" : "barfoo"}
    If the order is not mentioned, the append operation takes place in the order of the fields specified in the dissect pattern.<br><br>
* **Indirect Field** : While defining a pattern, prefix the field with `&` to assign the value found with this field to the value of another field found as the key.
    * **Usage**:
        Pattern : "%{?field_name}, %{&field_name}"
        Text : "foo, bar"
        Output : {"foo" : "bar"}
    Here `foo`, which was captured by the skip field `%{?field_name}`, becomes the key for the value captured by the field `%{&field_name}`.
    * **Usage**:
        Pattern : "%{field_name}, %{&field_name}"
        Text : "foo, bar"
        Output : {"field_name" : "foo", "foo" : "bar"}
    We can also indirectly assign the value to an appended field, along with `normal` and `skip` fields.
### Padding
* The `->` operator can be used as a suffix to a field to indicate that white spaces after this field can be ignored.
    * **Usage**:
        Pattern : "%{field1->} %{field2}"
        Text : "firstname    lastname"
        Output : {"field1" : "firstname", "field2" : "lastname"}
* This operator should be used as the rightmost suffix.
    * **Usage**:
        Pattern : "%{fieldname/1->} %{fieldname/2}"
    If `->` is used before `/<digit>`, the `->` operator will be considered part of the field name.
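
The core matching idea behind these patterns — a field's value is whatever lies between the literal delimiters of the pattern — can be sketched in a few lines of Java. This is an illustrative toy, not the plugin's actual implementation (it ignores the `?`, `+`, `&`, and `->` modifiers):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DissectSketch {
    private static final Pattern FIELD = Pattern.compile("%\\{([^}]*)\\}");

    // Walk the pattern left to right; each literal chunk between %{...}
    // tokens is a delimiter that bounds the preceding field's value.
    public static Map<String, String> dissect(String pattern, String text) {
        Map<String, String> result = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(pattern);
        int textPos = 0;
        int patternPos = 0;
        String pendingField = null;
        while (m.find()) {
            String delimiter = pattern.substring(patternPos, m.start());
            if (!delimiter.isEmpty()) {
                int idx = text.indexOf(delimiter, textPos);
                if (idx < 0) return Map.of(); // pattern does not match
                if (pendingField != null && !pendingField.isEmpty()) {
                    result.put(pendingField, text.substring(textPos, idx));
                }
                textPos = idx + delimiter.length();
            }
            pendingField = m.group(1);
            patternPos = m.end();
        }
        // A trailing delimiter (or end of text) closes the last field.
        String tail = pattern.substring(patternPos);
        int end = tail.isEmpty() ? text.length() : text.indexOf(tail, textPos);
        if (end < 0) return Map.of();
        if (pendingField != null && !pendingField.isEmpty()) {
            result.put(pendingField, text.substring(textPos, end));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(dissect("%{Date} %{Time} %{Log_Type}: %{Message}",
                "07-25-2023 10:00:00 ERROR: Some error"));
    }
}
```

Running this against the log line from the basic usage section yields the same `Date`, `Time`, `Log_Type`, and `Message` fields shown above.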
## Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
24 changes: 24 additions & 0 deletions data-prepper-plugins/dissect-processor/build.gradle
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}


dependencies {
    implementation project(':data-prepper-api')
    implementation project(path: ':data-prepper-plugins:mutate-event-processors')
    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0'
    implementation 'io.micrometer:micrometer-core'
    implementation 'org.apache.commons:commons-lang3:3.12.0'
    testImplementation project(':data-prepper-plugins:log-generator-source')
    testImplementation project(':data-prepper-test-common')
}

test {
    useJUnitPlatform()
}
Delimiter.java
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.dissect;

public class Delimiter {
    private final String delimiterString;
    private int start = -1;
    private int end = -1;
    private Delimiter next = null;
    private Delimiter prev = null;

    public Delimiter(String delimiterString) {
        this.delimiterString = delimiterString;
    }

    public int getStart() {
        return start;
    }

    public void setStart(int ind) {
        start = ind;
    }

    public int getEnd() {
        return end;
    }

    public void setEnd(int ind) {
        end = ind;
    }

    public Delimiter getNext() {
        return next;
    }

    public void setNext(Delimiter nextDelimiter) {
        next = nextDelimiter;
    }

    public Delimiter getPrev() {
        return prev;
    }

    public void setPrev(Delimiter prevDelimiter) {
        prev = prevDelimiter;
    }

    @Override
    public String toString() {
        return delimiterString;
    }
}
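
The class above is a plain doubly linked node that records where a delimiter occurs in the text. A minimal sketch of how a dissector might chain such nodes and read the field values out of the gaps between them (using a trimmed copy of the class so the sketch compiles on its own — this is not the plugin's `Dissector` logic):

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterChainSketch {
    // Trimmed stand-in for the Delimiter node above.
    static class Delimiter {
        final String s;
        int start = -1, end = -1;
        Delimiter next, prev;
        Delimiter(String s) { this.s = s; }
    }

    // Locate each delimiter left to right, scanning from the end of the
    // previous one; field values are the gaps before, between, and after.
    public static List<String> split(String text, Delimiter head) {
        int from = 0;
        for (Delimiter d = head; d != null; d = d.next) {
            d.start = text.indexOf(d.s, from);
            d.end = d.start + d.s.length();
            from = d.end;
        }
        List<String> fields = new ArrayList<>();
        int pos = 0;
        for (Delimiter d = head; d != null; d = d.next) {
            fields.add(text.substring(pos, d.start));
            pos = d.end;
        }
        fields.add(text.substring(pos));
        return fields;
    }

    public static void main(String[] args) {
        // Delimiters of the pattern "%{Date} %{Time} %{Log_Type}: %{Message}"
        Delimiter space1 = new Delimiter(" ");
        Delimiter space2 = new Delimiter(" ");
        Delimiter colon = new Delimiter(": ");
        space1.next = space2; space2.prev = space1;
        space2.next = colon;  colon.prev = space2;

        System.out.println(split("07-25-2023 10:00:00 ERROR: Some error", space1));
    }
}
```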
DissectProcessor.java
@@ -0,0 +1,122 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.dissect;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.dissect.Fields.Field;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;
import org.opensearch.dataprepper.typeconverter.TypeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;


@DataPrepperPlugin(name = "dissect", pluginType = Processor.class, pluginConfigurationType = DissectProcessorConfig.class)
public class DissectProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
    private static final Logger LOG = LoggerFactory.getLogger(DissectProcessor.class);
    private final DissectProcessorConfig dissectConfig;
    private final Map<String, Dissector> dissectorMap = new HashMap<>();
    private final Map<String, TargetType> targetTypeMap;
    private final ExpressionEvaluator expressionEvaluator;

    @DataPrepperPluginConstructor
    public DissectProcessor(PluginMetrics pluginMetrics, final DissectProcessorConfig dissectConfig, final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.dissectConfig = dissectConfig;
        this.expressionEvaluator = expressionEvaluator;
        this.targetTypeMap = dissectConfig.getTargetTypes();

        Map<String, String> patternsMap = dissectConfig.getMap();
        for (String key : patternsMap.keySet()) {
            Dissector dissector = new Dissector(patternsMap.get(key));
            dissectorMap.put(key, dissector);
        }
    }

    @Override
    public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
        for (final Record<Event> record : records) {
            Event event = record.getData();
            String dissectWhen = dissectConfig.getDissectWhen();
            if (Objects.nonNull(dissectWhen) && !expressionEvaluator.evaluateConditional(dissectWhen, event)) {
                continue;
            }
            try {
                for (String field : dissectorMap.keySet()) {
                    if (event.containsKey(field)) {
                        dissectField(event, field);
                    }
                }
            } catch (Exception ex) {
                LOG.error(EVENT, "Error dissecting the event [{}] ", record.getData(), ex);
            }
        }
        return records;
    }

    private void dissectField(Event event, String field) {
        Dissector dissector = dissectorMap.get(field);
        String text = event.get(field, String.class);
        if (dissector.dissectText(text)) {
            List<Field> dissectedFields = dissector.getDissectedFields();
            for (Field dissectedField : dissectedFields) {
                String dissectFieldName = dissectedField.getKey();
                Object dissectFieldValue = convertTargetType(dissectFieldName, dissectedField.getValue());
                event.put(dissectedField.getKey(), dissectFieldValue);
            }
        }
    }

    private Object convertTargetType(String fieldKey, String fieldValue) {
        if (targetTypeMap == null) {
            return fieldValue;
        }
        try {
            if (targetTypeMap.containsKey(fieldKey)) {
                TypeConverter converter = targetTypeMap.get(fieldKey).getTargetConverter();
                return converter.convert(fieldValue);
            } else {
                return fieldValue;
            }
        } catch (NumberFormatException ex) {
            LOG.error("Unable to convert [{}] to the target type mentioned", fieldKey);
            return fieldValue;
        }
    }

    @Override
    public void prepareForShutdown() {
    }

    @Override
    public boolean isReadyForShutdown() {
        return true;
    }

    @Override
    public void shutdown() {
    }
}
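
The fallback behavior of `convertTargetType` — convert when a target type is configured, return the raw string when conversion fails or no type is set — can be illustrated in isolation. The converter map below is a hypothetical stand-in for the plugin's `TargetType` converters, not the real API:

```java
import java.util.Map;
import java.util.function.Function;

public class ConvertSketch {
    // Stand-in for configured target types: field name -> parser.
    static final Map<String, Function<String, Object>> CONVERTERS =
            Map.of("Status_Code", Integer::parseInt);

    // Mirrors the fallback logic: convert when a converter exists,
    // return the raw string when conversion fails or none is configured.
    public static Object convert(String key, String value) {
        Function<String, Object> c = CONVERTERS.get(key);
        if (c == null) return value;
        try {
            return c.apply(value);
        } catch (NumberFormatException ex) {
            return value;
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("Status_Code", "200"));          // converted to Integer
        System.out.println(convert("Status_Code", "not-a-number")); // falls back to the raw string
        System.out.println(convert("Message", "Some error"));       // no converter: unchanged
    }
}
```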
DissectProcessorConfig.java
@@ -0,0 +1,28 @@
package org.opensearch.dataprepper.plugins.processor.dissect;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;

import java.util.Map;

public class DissectProcessorConfig {
    @NotNull
    @JsonProperty("map")
    private Map<String, String> map;

    @JsonProperty("target_types")
    private Map<String, TargetType> targetTypes;

    @JsonProperty("dissect_when")
    private String dissectWhen;

    public String getDissectWhen() {
        return dissectWhen;
    }

    public Map<String, String> getMap() {
        return map;
    }

    public Map<String, TargetType> getTargetTypes() {
        return targetTypes;
    }
}
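
The `@JsonProperty` names above are the keys the processor reads from pipeline YAML. A minimal configuration exercising all three fields might look like this (the condition is illustrative, not from the commit):

```yaml
- dissect:
    map:
      log: "%{Date} %{Time} %{Log_Type}: %{Message}"
    target_types:
      Log_Type: "string"
    dissect_when: '/log != null'
```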