Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into sns-sink-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
udaych20 authored Jul 17, 2023
2 parents 23b12c7 + 0b804fd commit 69a0ca0
Show file tree
Hide file tree
Showing 40 changed files with 1,490 additions and 431 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ default Boolean evaluateConditional(final String statement, final Event context)
throw new ClassCastException("Unexpected expression return type of " + result.getClass());
}
}

Boolean isValidExpressionStatement(final String statement);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class TestExpressionEvaluator implements ExpressionEvaluator {
public Object evaluate(final String statement, final Event event) {
return event.get(statement, Object.class);
}

@Override
public Boolean isValidExpressionStatement(final String statement) {
return true;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,15 @@ public Object evaluate(final String statement, final Event context) {
throw new ExpressionEvaluationException("Unable to evaluate statement \"" + statement + "\"", exception);
}
}

@Override
public Boolean isValidExpressionStatement(final String statement) {
try {
parser.parse(statement);
return true;
}
catch (final Exception exception) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.UUID;
import java.util.Random;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -93,5 +94,30 @@ void testGivenEvaluatorThrowsExceptionThenExceptionThrown() {
verify(evaluator).evaluate(eq(parseTree), eq(event));
}

@Test
void isValidExpressionStatement_returns_true_when_parse_does_not_throw() {
final String statement = UUID.randomUUID().toString();
final ParseTree parseTree = mock(ParseTree.class);

doReturn(parseTree).when(parser).parse(eq(statement));

final boolean result = statementEvaluator.isValidExpressionStatement(statement);

assertThat(result, equalTo(true));

verify(parser).parse(eq(statement));
}

@Test
void isValidExpressionStatement_returns_false_when_parse_throws() {
final String statement = UUID.randomUUID().toString();

doThrow(RuntimeException.class).when(parser).parse(eq(statement));

final boolean result = statementEvaluator.isValidExpressionStatement(statement);

assertThat(result, equalTo(false));
}

}

1 change: 1 addition & 0 deletions data-prepper-plugins/geoip-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ task integrationTest(type: Test) {
useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.geoipProcessor.maxmindLicenseKey', System.getProperty('tests.geoipProcessor.maxmindLicenseKey')

filter {
includeTestsMatching '*IT'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor;

public class GeoIPInputJson {
Peer PeerObject;
private String status;

// Getter Methods
public Peer getPeer() {
return PeerObject;
}
public String getStatus() {
return status;
}
// Setter Methods
public void setPeer( Peer peerObject ) {
this.PeerObject = peerObject;
}
public void setStatus( String status ) {
this.status = status;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;

import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

@ExtendWith(MockitoExtension.class)
public class GeoIPProcessorUrlServiceIT {

private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));
private String tempPath;
private GeoIPProcessorConfig geoIPProcessorConfig;
private String maxmindLicenseKey;
private GeoIPProcessorService geoIPProcessorService;
private GeoIPInputJson geoIPInputJson;
private String jsonInput;
private static final String TEMP_PATH_FOLDER = "GeoIP";
public static final String DATABASE_1 = "first_database";
public static final String URL_SUFFIX = "&suffix=tar.gz";

@BeforeEach
public void setUp() throws JsonProcessingException {

maxmindLicenseKey = System.getProperty("tests.geoipProcessor.maxmindLicenseKey");

jsonInput = "{\"peer\": {\"ip\": \"8.8.8.8\", \"host\": \"example.org\" }, \"status\": \"success\"}";

geoIPInputJson = objectMapper.readValue(jsonInput, GeoIPInputJson.class);
tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;

String asnUrl = "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-ASN&license_key=" + maxmindLicenseKey + URL_SUFFIX;
String cityUrl = "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City&license_key=" + maxmindLicenseKey + URL_SUFFIX;
String countryUrl = "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country&license_key=" + maxmindLicenseKey + URL_SUFFIX;

String pipelineConfig = " aws:\n" +
" region: us-east-2\n" +
" sts_role_arn: \"arn:aws:iam::123456789:role/data-prepper-execution-role\"\n" +
" keys:\n" +
" - key:\n" +
" source: \"/peer/ip\"\n" +
" target: \"target1\"\n" +
" - key:\n" +
" source: \"/peer/ip2\"\n" +
" target: \"target2\"\n" +
" attributes: [\"city_name\",\"country_name\"]\n" +
" service_type:\n" +
" maxmind:\n" +
" database_path:\n" +
" - url: " + asnUrl + "\n" +
" - url: " + cityUrl + "\n" +
" - url: " + countryUrl + "\n" +
" load_type: \"cache\"\n" +
" cache_size: 8192\n" +
" cache_refresh_schedule: PT3M";

objectMapper.registerModule(new JavaTimeModule());
this.geoIPProcessorConfig = objectMapper.readValue(pipelineConfig, GeoIPProcessorConfig.class);
}

public GeoIPProcessorService createObjectUnderTest() {
return new GeoIPProcessorService(geoIPProcessorConfig, tempPath);
}

@Test
void verify_enrichment_of_data_from_maxmind_url() throws UnknownHostException {

Map<String, Object> geoData = new HashMap<>();
this.geoIPProcessorService = createObjectUnderTest();
String ipAddress = geoIPInputJson.getPeer().getIp();
if (IPValidationcheck.isPublicIpAddress(ipAddress)) {
InetAddress inetAddress = InetAddress.getByName(ipAddress);
//All attributes are considered by default with the null value
geoData = geoIPProcessorService.getGeoData(inetAddress, null);

assertThat(geoData.get("country_iso_code"), equalTo("US"));
assertThat(geoData.get("ip"), equalTo("8.8.8.8"));
assertThat(geoData.get("country_name"), equalTo("United States"));
assertThat(geoData.get("organization_name"), equalTo("GOOGLE"));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor;

public class Peer {
private String ip;
private String host;

// Getter Methods
public String getIp() {
return ip;
}
public String getHost() {
return host;
}
// Setter Methods
public void setIp( String ip ) {
this.ip = ip;
}
public void setHost( String host ) {
this.host = host;
}
}
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation 'io.confluent:kafka-avro-serializer:7.3.3'
implementation 'io.confluent:kafka-schema-registry-client:7.3.3'
implementation 'io.confluent:kafka-schema-registry:7.3.3:tests'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.6'
testImplementation 'org.mockito:mockito-inline:4.1.0'
testImplementation 'org.yaml:snakeyaml:2.0'
testImplementation testLibs.spring.test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,73 @@

package org.opensearch.dataprepper.plugins.kafka.configuration;


import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.Valid;

import java.util.stream.Stream;

/**
* * A helper class that helps to read auth related configuration values from
* A helper class that helps to read auth related configuration values from
* pipelines.yaml
*/
public class AuthConfig {
@JsonProperty("sasl_plaintext")
private PlainTextAuthConfig plainTextAuthConfig;

@JsonProperty("sasl_oauth")
private OAuthConfig oAuthConfig;
public static class SaslAuthConfig {
@JsonProperty("plaintext")
private PlainTextAuthConfig plainTextAuthConfig;

@JsonProperty("oauth")
private OAuthConfig oAuthConfig;

@JsonProperty("aws_iam")
private AwsIamAuthConfig awsIamAuthConfig;

public AwsIamAuthConfig getAwsIamAuthConfig() {
return awsIamAuthConfig;
}

public PlainTextAuthConfig getPlainTextAuthConfig() {
return plainTextAuthConfig;
}

public OAuthConfig getOAuthConfig() {
return oAuthConfig;
}

@AssertTrue(message = "Only one of AwsIam or oAuth or PlainText auth config must be specified")
public boolean hasOnlyOneConfig() {
return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n!=null).count() == 1;
}

public OAuthConfig getoAuthConfig() {
return oAuthConfig;
}

public void setoAuthConfig(OAuthConfig oAuthConfig) {
this.oAuthConfig = oAuthConfig;
public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication

public SslAuthConfig() {
}
}

public void setPlainTextAuthConfig(PlainTextAuthConfig plainTextAuthConfig) {
this.plainTextAuthConfig = plainTextAuthConfig;
@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;

@Valid
@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}

public PlainTextAuthConfig getPlainTextAuthConfig() {
return plainTextAuthConfig;
public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}

@AssertTrue(message = "Only one of SSL or SASL auth config must be specified")
public boolean hasSaslOrSslConfig() {
return Stream.of(sslAuthConfig, saslAuthConfig).filter(n -> n!=null).count() == 1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,50 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import jakarta.validation.Valid;

public class AwsConfig {
@JsonProperty("msk_arn")
@Size(min = 20, max = 2048, message = "mskArn length should be between 20 and 2048 characters")
private String awsMskArn;

public String getAwsMskArn() {
return awsMskArn;
public static class AwsMskConfig {
@Valid
@Size(min = 20, max = 2048, message = "msk_arn length should be between 20 and 2048 characters")
@JsonProperty("arn")
private String arn;

@JsonProperty("broker_connection_type")
private MskBrokerConnectionType brokerConnectionType;

public String getArn() {
return arn;
}

public MskBrokerConnectionType getBrokerConnectionType() {
return brokerConnectionType;
}
}

@JsonProperty("msk")
private AwsMskConfig awsMskConfig;

@Valid
@Size(min = 1, message = "Region cannot be empty string")
@JsonProperty("region")
private String region;

@Valid
@Size(min = 20, max = 2048, message = "sts_role_arn length should be between 20 and 2048 characters")
@JsonProperty("sts_role_arn")
private String stsRoleArn;

public AwsMskConfig getAwsMskConfig() {
return awsMskConfig;
}

public String getRegion() {
return region;
}

public String getStsRoleArn() {
return stsRoleArn;
}
}
Loading

0 comments on commit 69a0ca0

Please sign in to comment.