Remove unused configurations from the Avro and Parquet codecs #3205

Merged

Changes from all commits
@@ -52,12 +52,6 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) {

if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getFileLocation() != null) {
schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
} else if (config.getSchemaRegistryUrl() != null) {
schema = parseSchema(AvroSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl()));
} else if (checkS3SchemaValidity()) {
schema = AvroSchemaParserFromS3.parseSchema(config);
}
}

@@ -118,8 +112,4 @@ Schema parseSchema(final String schemaString) {
throw new RuntimeException("There is an error in the schema: " + e.getMessage());
}
}

private boolean checkS3SchemaValidity() {
return config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null;
}
}
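
With the file, schema-registry, and S3 branches deleted, the codec's constructor resolves a schema from exactly one place: the inline schema string on the config. A self-contained sketch of what that surviving path does, assuming Apache Avro on the classpath; the schema literal and class name are illustrative, not from the PR:

    import org.apache.avro.Schema;

    public class InlineSchemaDemo {
        public static void main(String[] args) {
            // The only schema source left in the codec: a JSON schema string
            // handed straight to Avro's parser.
            String inline = "{\"type\":\"record\",\"name\":\"Event\","
                    + "\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}";
            Schema schema = new Schema.Parser().parse(inline);
            System.out.println(schema.getField("message").schema().getType()); // STRING
        }
    }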
@@ -5,69 +5,19 @@
package org.opensearch.dataprepper.plugins.codec.avro;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Size;

/**
* Configuration class for {@link AvroOutputCodec}.
*/
public class AvroOutputCodecConfig {

@JsonProperty("schema")
private String schema;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("schema_file_location")
private String fileLocation;

@Valid
@Size(max = 0, message = "Schema from schema registry is not supported.")
@JsonProperty("schema_registry_url")
private String schemaRegistryUrl;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("region")
private String region;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("bucket_name")
private String bucketName;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("file_key")
private String fileKey;

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.schema = schema;
}

public String getFileLocation() {
return fileLocation;
}

public String getSchemaRegistryUrl() {
return schemaRegistryUrl;
}

public String getRegion() {
return region;
}

public String getBucketName() {
return bucketName;
}

public String getFileKey() {
return fileKey;
}


}
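
A note on the deleted fields: before this PR removed them outright, they had been soft-retired with a Bean Validation trick. @Size(max = 0) on a String rejects any non-empty value with the configured message, while an unset (null) field still passes, so a pipeline that still set one of these options failed validation instead of being silently ignored. A minimal sketch of that pattern, assuming a Bean Validation provider such as Hibernate Validator on the classpath (SizeZeroDemo and ExampleConfig are hypothetical names):

    import jakarta.validation.Validation;
    import jakarta.validation.Validator;
    import jakarta.validation.constraints.Size;

    public class SizeZeroDemo {
        static class ExampleConfig {
            // Any non-empty value violates max = 0; @Size treats null as
            // valid, so leaving the retired option unset remains legal.
            @Size(max = 0, message = "Schema from file is not supported.")
            String fileLocation;
        }

        public static void main(String[] args) {
            Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
            ExampleConfig cfg = new ExampleConfig();
            cfg.fileLocation = "schema.json";
            // Prints the configured message once, as a single violation.
            validator.validate(cfg).forEach(v -> System.out.println(v.getMessage()));
        }
    }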

This file was deleted.

This file was deleted.

This file was deleted.

@@ -354,7 +354,6 @@ void verify_flushed_records_into_s3_bucket_Parquet() throws IOException {
private void configureParquetCodec() {
parquetOutputCodecConfig = new ParquetOutputCodecConfig();
parquetOutputCodecConfig.setSchema(parseSchema().toString());
parquetOutputCodecConfig.setPathPrefix(PATH_PREFIX);
codec = new ParquetOutputCodec(parquetOutputCodecConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE));
}
@@ -19,29 +19,23 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.plugins.fs.LocalInputFile;
import org.opensearch.dataprepper.plugins.s3keyindex.S3ObjectIndexUtility;
import org.opensearch.dataprepper.plugins.sink.s3.S3OutputCodecContext;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

@DataPrepperPlugin(name = "parquet", pluginType = OutputCodec.class, pluginConfigurationType = ParquetOutputCodecConfig.class)
public class ParquetOutputCodec implements OutputCodec {
private static final String PARQUET = "parquet";
private final ParquetOutputCodecConfig config;
private static Schema schema;
private final AvroEventConverter avroEventConverter;
private final AvroAutoSchemaGenerator avroAutoSchemaGenerator;
private ParquetWriter<GenericRecord> writer;
private OutputCodecContext codecContext;
private static final String PARQUET = "parquet";

private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}";
private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(TIME_PATTERN_REGULAR_EXPRESSION);
private String key;


@DataPrepperPluginConstructor
@@ -79,16 +73,9 @@ public boolean isCompressionInternal() {
void buildSchemaAndKey(final Event event) throws IOException {
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getFileLocation() != null) {
schema = ParquetSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
} else if (config.getSchemaRegistryUrl() != null) {
schema = parseSchema(ParquetSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl()));
} else if (checkS3SchemaValidity()) {
schema = ParquetSchemaParserFromS3.parseSchema(config);
} else {
schema = buildInlineSchemaFromEvent(event);
}
key = generateKey();
}

public Schema buildInlineSchemaFromEvent(final Event event) throws IOException {
@@ -141,42 +128,4 @@ public String getExtension() {
static Schema parseSchema(final String schemaString) {
return new Schema.Parser().parse(schemaString);
}

/**
* Generate the s3 object path prefix and object file name.
*
* @return object key path.
*/
protected String generateKey() {
final String pathPrefix = buildObjectPath(config.getPathPrefix());
final String namePattern = buildObjectFileName(config.getNamePattern());
return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern;
}

private static String buildObjectPath(final String pathPrefix) {
final StringBuilder s3ObjectPath = new StringBuilder();
if (pathPrefix != null && !pathPrefix.isEmpty()) {
String[] pathPrefixList = pathPrefix.split("\\/");
for (final String prefixPath : pathPrefixList) {
if (SIMPLE_DURATION_PATTERN.matcher(prefixPath).find()) {
s3ObjectPath.append(S3ObjectIndexUtility.getObjectPathPrefix(prefixPath)).append("/");
} else {
s3ObjectPath.append(prefixPath).append("/");
}
}
}
return s3ObjectPath.toString();
}

private String buildObjectFileName(final String configNamePattern) {
return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern) + "." + getExtension();
}

boolean checkS3SchemaValidity() {
if (config.getSchemaBucket() != null && config.getFileKey() != null && config.getSchemaRegion() != null) {
return true;
} else {
return false;
}
}
}
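
The deleted key-generation helpers decided, per path-prefix segment, whether to expand a %{...} date token via S3ObjectIndexUtility; key generation is now handled outside the codec by the KeyGenerator seen in the integration test above. The check they keyed on is plain java.util.regex. A standalone sketch using the exact expression from the removed SIMPLE_DURATION_PATTERN, with illustrative prefixes:

    import java.util.regex.Pattern;

    public class PrefixPatternDemo {
        // Same regular expression the removed code compiled.
        private static final Pattern TIME_PATTERN = Pattern.compile("\\%\\{.*?\\}");

        public static void main(String[] args) {
            // Segments containing a %{...} token were date-expanded;
            // everything else passed through verbatim.
            System.out.println(TIME_PATTERN.matcher("%{yyyy-MM-dd}").find()); // true
            System.out.println(TIME_PATTERN.matcher("static-logs").find());   // false
        }
    }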