
Commit c560e1f
Add bug fixes and improvements to DDB source (opensearch-project#3559)
Signed-off-by: Aiden Dai <[email protected]>
daixba authored Oct 31, 2023
1 parent 7323c9e · commit c560e1f
Showing 13 changed files with 255 additions and 125 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/dynamodb-source/build.gradle
@@ -31,6 +31,7 @@ dependencies {
 
     testImplementation platform('org.junit:junit-bom:5.9.1')
     testImplementation 'org.junit.jupiter:junit-jupiter'
+    testImplementation testLibs.mockito.inline
     testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
 }
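For context, mockito-inline is the Mockito artifact that enables mocking final classes and static methods through inline bytecode instrumentation, which plain mockito-core cannot do. A minimal sketch of what the new test dependency unlocks, assuming JUnit 5; the final class here is a made-up stand-in, not code from this commit:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;

class InlineMockingSketchTest {
    // A final class: plain mockito-core cannot mock this, mockito-inline can.
    static final class ExportClient {
        String describeExport() { return "real call"; }
    }

    @Test
    void mocksFinalClass() {
        ExportClient client = mock(ExportClient.class);
        when(client.describeExport()).thenReturn("stubbed");
        assertEquals("stubbed", client.describeExport());
    }
}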
data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
@@ -5,12 +5,21 @@
 
 package org.opensearch.dataprepper.plugins.source.dynamodb.export;
 
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
+import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
@@ -37,30 +46,37 @@ public class DataFileLoader {
      */
     private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000;
 
+    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
+    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
+
     private final String bucketName;
 
     private final String key;
 
     private final ExportRecordConverter recordConverter;
 
-    private final S3ObjectReader s3ObjectReader;
+    private final S3ObjectReader objectReader;
 
     private final DataFileCheckpointer checkpointer;
 
-    // Start Line is the checkpoint
+    /**
+     * Start Line is the checkpoint
+     */
     private final int startLine;
 
     private DataFileLoader(Builder builder) {
-        this.s3ObjectReader = builder.s3ObjectReader;
-        this.recordConverter = builder.recordConverter;
+        this.objectReader = builder.objectReader;
         this.bucketName = builder.bucketName;
         this.key = builder.key;
         this.checkpointer = builder.checkpointer;
         this.startLine = builder.startLine;
+
+        final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
+        recordConverter = new ExportRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics);
     }
 
-    public static Builder builder() {
-        return new Builder();
+    public static Builder builder(final S3ObjectReader s3ObjectReader, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
+        return new Builder(s3ObjectReader, pluginMetrics, buffer);
     }


@@ -69,9 +85,14 @@ public static Builder builder() {
      */
     static class Builder {
 
-        private S3ObjectReader s3ObjectReader;
+        private final S3ObjectReader objectReader;
+
+        private final PluginMetrics pluginMetrics;
+
+        private final Buffer<Record<Event>> buffer;
+
+        private TableInfo tableInfo;
 
-        private ExportRecordConverter recordConverter;
 
         private DataFileCheckpointer checkpointer;

@@ -81,13 +102,14 @@ static class Builder {
 
         private int startLine;
 
-        public Builder s3ObjectReader(S3ObjectReader s3ObjectReader) {
-            this.s3ObjectReader = s3ObjectReader;
-            return this;
+        public Builder(final S3ObjectReader objectReader, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
+            this.objectReader = objectReader;
+            this.pluginMetrics = pluginMetrics;
+            this.buffer = buffer;
         }
 
-        public Builder recordConverter(ExportRecordConverter recordConverter) {
-            this.recordConverter = recordConverter;
+        public Builder tableInfo(TableInfo tableInfo) {
+            this.tableInfo = tableInfo;
             return this;
         }

@@ -128,7 +150,9 @@ public void run() {
         int lineCount = 0;
         int lastLineProcessed = 0;
 
-        try (GZIPInputStream gzipInputStream = new GZIPInputStream(s3ObjectReader.readFile(bucketName, key))) {
+        try {
+            InputStream inputStream = objectReader.readFile(bucketName, key);
+            GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
             BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream));
 
             String line;
@@ -170,11 +194,15 @@ public void run() {
             }
 
             lines.clear();
+
+            reader.close();
+            gzipInputStream.close();
+            inputStream.close();
             LOG.info("Complete loading s3://{}/{}", bucketName, key);
-        } catch (Exception e) {
+        } catch (IOException e) {
             checkpointer.checkpoint(lineCount);
-            String errorMessage = String.format("Loading of s3://{}/{} completed with Exception: {}", bucketName, key, e.getMessage());
+
+            String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %S", bucketName, key, e.getMessage());
             throw new RuntimeException(errorMessage);
         }
     }
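Taken together, the run() hunks preserve the checkpoint-and-resume contract: startLine records how many lines a previous attempt already handled, and DEFAULT_CHECKPOINT_INTERVAL_MILLS paces progress updates. A condensed sketch of that loop as it would sit inside DataFileLoader, with processLine as a hypothetical stand-in for the real record conversion (not the commit's exact run() body):

// Sketch only: resume-from-checkpoint processing, assuming the fields shown above.
void loadFrom(BufferedReader reader) throws IOException {
    int lineCount = 0;
    long lastCheckpointTime = System.currentTimeMillis();
    String line;
    while ((line = reader.readLine()) != null) {
        lineCount++;
        if (lineCount <= startLine) {
            continue; // processed by a previous attempt; skip ahead
        }
        processLine(line); // hypothetical stand-in for converting and buffering the record
        if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
            checkpointer.checkpoint(lineCount); // persist progress so a restart resumes here
            lastCheckpointTime = System.currentTimeMillis();
        }
    }
}

Note that the rewritten catch block also checkpoints lineCount before rethrowing, so even a failed load records how far it got.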
data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java
@@ -5,25 +5,19 @@
 
 package org.opensearch.dataprepper.plugins.source.dynamodb.export;
 
-import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
-import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
 import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
 import software.amazon.awssdk.services.s3.S3Client;
 
-import java.time.Duration;
-
 /**
  * Factory class for DataFileLoader thread.
  */
 public class DataFileLoaderFactory {
-    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
-    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
 
     private final EnhancedSourceCoordinator coordinator;
@@ -39,17 +33,14 @@ public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator, S3Client s3C
     }
 
     public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableInfo tableInfo) {
-        final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
-        ExportRecordConverter recordProcessor = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
 
         DataFileCheckpointer checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition);
 
         // Start a data loader thread.
-        DataFileLoader loader = DataFileLoader.builder()
-                .s3ObjectReader(objectReader)
+        DataFileLoader loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
                 .bucketName(dataFilePartition.getBucket())
                 .key(dataFilePartition.getKey())
-                .recordConverter(recordProcessor)
+                .tableInfo(tableInfo)
                 .checkpointer(checkpointer)
                 .startLine(dataFilePartition.getProgressState().get().getLoaded())
                 .build();
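With record conversion now assembled inside DataFileLoader, the factory only wires up dependencies, and a loader can be built entirely from mocks in tests. A hedged sketch of the kind of test this enables (illustrative only; the bucket, key, and test-class names are made up and this is not the commit's test code):

import static org.mockito.Mockito.mock;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;

class DataFileLoaderBuilderSketchTest {
    @Test
    void buildsLoaderEntirelyFromMocks() {
        @SuppressWarnings("unchecked")
        final Buffer<Record<Event>> buffer = mock(Buffer.class);

        // All required collaborators now flow through the builder constructor.
        final DataFileLoader loader = DataFileLoader.builder(mock(S3ObjectReader.class), mock(PluginMetrics.class), buffer)
                .bucketName("test-bucket")                  // hypothetical test values
                .key("export/data.json.gz")
                .tableInfo(mock(TableInfo.class))
                .checkpointer(mock(DataFileCheckpointer.class))
                .startLine(0)
                .build();
        // loader is a Runnable; run() would stream s3://test-bucket/export/data.json.gz.
    }
}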
DataFileScheduler.java
@@ -63,8 +63,6 @@ public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFa
         this.coordinator = coordinator;
         this.pluginMetrics = pluginMetrics;
         this.loaderFactory = loaderFactory;
-
-
         executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
 
         this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT);
@@ -76,7 +74,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
         String tableArn = getTableArn(exportArn);
 
         TableInfo tableInfo = getTableInfo(tableArn);
-        
+
         Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo);
         CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor);
         runLoader.whenComplete(completeDataLoader(dataFilePartition));
@@ -166,6 +164,17 @@ private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) {
         };
     }
 
+    /**
+     * There is a global state with sourcePartitionKey the export Arn,
+     * to track the number of files are processed. <br/>
+     * Each time, load of a data file is completed,
+     * The state must be updated.<br/>
+     * Note that the state may be updated since multiple threads are updating the same state.
+     * Retry is required.
+     *
+     * @param exportArn Export Arn.
+     * @param loaded Number records Loaded.
+     */
     private void updateState(String exportArn, int loaded) {
 
         String streamArn = getStreamArn(exportArn);
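The new javadoc describes an optimistic, retry-based update of shared per-export state. A minimal sketch of that pattern using LoadStatus's toMap/fromMap round-trip; the in-memory ConcurrentHashMap here is a hypothetical stand-in for the EnhancedSourceCoordinator-backed global state, which this sketch does not reproduce:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: optimistic retry loop for updating shared LoadStatus state.
public class UpdateStateSketch {
    // Hypothetical stand-in for the coordinator's global state; assumes an initial state was seeded.
    private final Map<String, Map<String, Object>> store = new ConcurrentHashMap<>();

    void updateState(String exportArn, int loaded) {
        while (true) {
            Map<String, Object> current = store.get(exportArn);     // read the latest shared state
            LoadStatus status = LoadStatus.fromMap(current);
            status.setLoadedFiles(status.getLoadedFiles() + 1);
            status.setLoadedRecords(status.getLoadedRecords() + loaded);
            // Atomic compare-and-set: fails if another loader thread updated the state first.
            if (store.replace(exportArn, current, status.toMap())) {
                return;                                              // this update won the race
            }
            // Lost the race; re-read and retry, as the javadoc above requires.
        }
    }
}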
ExportSummary.java
@@ -57,7 +57,7 @@ public class ExportSummary {
     private long billedSizeBytes;
 
     @JsonProperty("itemCount")
-    private int itemCount;
+    private long itemCount;
 
     @JsonProperty("outputFormat")
     private String outputFormat;
@@ -115,7 +115,7 @@ public long getBilledSizeBytes() {
         return billedSizeBytes;
     }
 
-    public int getItemCount() {
+    public long getItemCount() {
         return itemCount;
     }
 
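Widening itemCount to long matters because a DynamoDB table export can report more items than Integer.MAX_VALUE (about 2.1 billion), and a 32-bit count would silently wrap. A tiny self-contained illustration of the failure mode (the count value is hypothetical):

public class ItemCountOverflowDemo {
    public static void main(String[] args) {
        long reportedItemCount = 3_000_000_000L; // plausible item count for a very large table
        int truncated = (int) reportedItemCount; // what a 32-bit field forces
        System.out.println(truncated);           // prints -1294967296: the count silently corrupts
    }
}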
LoadStatus.java
@@ -18,11 +18,11 @@ public class LoadStatus {
 
     private int loadedFiles;
 
-    private int totalRecords;
+    private long totalRecords;
 
-    private int loadedRecords;
+    private long loadedRecords;
 
-    public LoadStatus(int totalFiles, int loadedFiles, int totalRecords, int loadedRecords) {
+    public LoadStatus(int totalFiles, int loadedFiles, long totalRecords, long loadedRecords) {
         this.totalFiles = totalFiles;
         this.loadedFiles = loadedFiles;
         this.totalRecords = totalRecords;
@@ -45,19 +45,19 @@ public void setLoadedFiles(int loadedFiles) {
         this.loadedFiles = loadedFiles;
     }
 
-    public int getTotalRecords() {
+    public long getTotalRecords() {
         return totalRecords;
     }
 
     public void setTotalRecords(int totalRecords) {
         this.totalRecords = totalRecords;
     }
 
-    public int getLoadedRecords() {
+    public long getLoadedRecords() {
         return loadedRecords;
     }
 
-    public void setLoadedRecords(int loadedRecords) {
+    public void setLoadedRecords(long loadedRecords) {
         this.loadedRecords = loadedRecords;
     }

@@ -72,10 +72,10 @@ public Map<String, Object> toMap() {
 
     public static LoadStatus fromMap(Map<String, Object> map) {
         return new LoadStatus(
-                (int) map.get(TOTAL_FILES),
-                (int) map.get(LOADED_FILES),
-                (int) map.get(TOTAL_RECORDS),
-                (int) map.get(LOADED_RECORDS)
+                ((Number) map.get(TOTAL_FILES)).intValue(),
+                ((Number) map.get(LOADED_FILES)).intValue(),
+                ((Number) map.get(TOTAL_RECORDS)).longValue(),
+                ((Number) map.get(LOADED_RECORDS)).longValue()
         );
     }
 }
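The fromMap change fixes a brittle unboxing assumption: depending on how the state map was serialized and restored, a numeric value may come back as Integer or Long, and a direct (int) cast throws ClassCastException when the value is a Long. A small standalone demonstration of both paths (not plugin code):

import java.util.Map;

public class NumberCastDemo {
    public static void main(String[] args) {
        Map<String, Object> state = Map.of("totalRecords", 42L); // deserializers often produce Long here
        // int broken = (int) state.get("totalRecords");         // ClassCastException: Long cannot be cast to Integer
        long safe = ((Number) state.get("totalRecords")).longValue(); // works for Integer, Long, Double, ...
        System.out.println(safe);
    }
}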
[Diffs for the remaining 7 of the 13 changed files are not shown here.]
