Skip to content

Commit

Permalink
Updated Kafka security configuration (#2994)
Browse files Browse the repository at this point in the history
* Add Kafka Security Configurations

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified kafka security config. Added new fields to AwsConfig

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified AwsConfig to have msk option that can take multiple options

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Signed-off-by: George Chen <[email protected]>
  • Loading branch information
2 people authored and chenqi0805 committed Jul 19, 2023
1 parent 5ffeee2 commit 3fc21ca
Show file tree
Hide file tree
Showing 17 changed files with 423 additions and 57 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation 'io.confluent:kafka-avro-serializer:7.3.3'
implementation 'io.confluent:kafka-schema-registry-client:7.3.3'
implementation 'io.confluent:kafka-schema-registry:7.3.3:tests'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.6'
testImplementation 'org.mockito:mockito-inline:4.1.0'
testImplementation 'org.yaml:snakeyaml:2.0'
testImplementation testLibs.spring.test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,73 @@

package org.opensearch.dataprepper.plugins.kafka.configuration;


import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.Valid;

import java.util.stream.Stream;

/**
* * A helper class that helps to read auth related configuration values from
* A helper class that helps to read auth related configuration values from
* pipelines.yaml
*/
public class AuthConfig {
@JsonProperty("sasl_plaintext")
private PlainTextAuthConfig plainTextAuthConfig;

@JsonProperty("sasl_oauth")
private OAuthConfig oAuthConfig;
public static class SaslAuthConfig {
@JsonProperty("plaintext")
private PlainTextAuthConfig plainTextAuthConfig;

@JsonProperty("oauth")
private OAuthConfig oAuthConfig;

@JsonProperty("aws_iam")
private AwsIamAuthConfig awsIamAuthConfig;

public AwsIamAuthConfig getAwsIamAuthConfig() {
return awsIamAuthConfig;
}

public PlainTextAuthConfig getPlainTextAuthConfig() {
return plainTextAuthConfig;
}

public OAuthConfig getOAuthConfig() {
return oAuthConfig;
}

@AssertTrue(message = "Only one of AwsIam or oAuth or PlainText auth config must be specified")
public boolean hasOnlyOneConfig() {
return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n!=null).count() == 1;
}

public OAuthConfig getoAuthConfig() {
return oAuthConfig;
}

public void setoAuthConfig(OAuthConfig oAuthConfig) {
this.oAuthConfig = oAuthConfig;
public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication

public SslAuthConfig() {
}
}

public void setPlainTextAuthConfig(PlainTextAuthConfig plainTextAuthConfig) {
this.plainTextAuthConfig = plainTextAuthConfig;
@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;

@Valid
@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}

public PlainTextAuthConfig getPlainTextAuthConfig() {
return plainTextAuthConfig;
public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}

@AssertTrue(message = "Only one of SSL or SASL auth config must be specified")
public boolean hasSaslOrSslConfig() {
return Stream.of(sslAuthConfig, saslAuthConfig).filter(n -> n!=null).count() == 1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,50 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import jakarta.validation.Valid;

public class AwsConfig {
@JsonProperty("msk_arn")
@Size(min = 20, max = 2048, message = "mskArn length should be between 20 and 2048 characters")
private String awsMskArn;

public String getAwsMskArn() {
return awsMskArn;
public static class AwsMskConfig {
@Valid
@Size(min = 20, max = 2048, message = "msk_arn length should be between 20 and 2048 characters")
@JsonProperty("arn")
private String arn;

@JsonProperty("broker_connection_type")
private MskBrokerConnectionType brokerConnectionType;

public String getArn() {
return arn;
}

public MskBrokerConnectionType getBrokerConnectionType() {
return brokerConnectionType;
}
}

@JsonProperty("msk")
private AwsMskConfig awsMskConfig;

@Valid
@Size(min = 1, message = "Region cannot be empty string")
@JsonProperty("region")
private String region;

@Valid
@Size(min = 20, max = 2048, message = "sts_role_arn length should be between 20 and 2048 characters")
@JsonProperty("sts_role_arn")
private String stsRoleArn;

public AwsMskConfig getAwsMskConfig() {
return awsMskConfig;
}

public String getRegion() {
return region;
}

public String getStsRoleArn() {
return stsRoleArn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

public enum AwsIamAuthConfig {
ROLE("role"),
DEFAULT("default");
//TODO add "PROFILE" option

private static final Map<String, AwsIamAuthConfig> OPTIONS_MAP = Arrays.stream(AwsIamAuthConfig.values())
.collect(Collectors.toMap(
value -> value.option,
value -> value
));

private final String option;

AwsIamAuthConfig(final String option) {
this.option = option;
}

@JsonCreator
static AwsIamAuthConfig fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

public enum EncryptionType {
SSL("ssl"),
PLAINTEXT("plaintext");

private static final Map<String, EncryptionType> OPTIONS_MAP = Arrays.stream(EncryptionType.values())
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

private final String type;

EncryptionType(final String type) {
this.type = type;
}

@JsonCreator
static EncryptionType fromTypeValue(final String type) {
return OPTIONS_MAP.get(type.toLowerCase());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ public class KafkaSourceConfig {
@Valid
private SchemaConfig schemaConfig;

@Valid
@JsonProperty("authentication")
private AuthConfig authConfig;

@JsonProperty("encryption")
private EncryptionType encryptionType = EncryptionType.SSL;

@JsonProperty("aws")
@Valid
private AwsConfig awsConfig;

@JsonProperty("acknowledgments")
Expand Down Expand Up @@ -83,6 +88,14 @@ public AuthConfig getAuthConfig() {
return authConfig;
}

public EncryptionType getEncryptionType() {
return encryptionType;
}

public AwsConfig getAwsConfig() {
return awsConfig;
}

public void setAuthConfig(AuthConfig authConfig) {
this.authConfig = authConfig;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

public enum MskBrokerConnectionType {
PUBLIC("public"),
SINGLE_VPC("single_vpc"),
MULTI_VPC("multi_vpc");

private static final Map<String, MskBrokerConnectionType> OPTIONS_MAP = Arrays.stream(MskBrokerConnectionType.values())
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

private final String type;

MskBrokerConnectionType(final String type) {
this.type = type;
}

@JsonCreator
static MskBrokerConnectionType fromTypeValue(final String type) {
return OPTIONS_MAP.get(type.toLowerCase());
}
}
Loading

0 comments on commit 3fc21ca

Please sign in to comment.