Skip to content

Commit

Permalink
Fix NPE on s3 source stopping without sqs, stop s3 scan worker thread…
Browse files Browse the repository at this point in the history
… on stopping of the s3 source (#3178)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Aug 16, 2023
1 parent 91b9eb6 commit 6949780
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public synchronized void shutdown() {
stopRequested.set(true);
} catch (Exception ex) {
LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " +
"proceeding with termination of process workers", name);
"proceeding with termination of process workers", name, ex);
}

shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis(), "processor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public void start() {
scanObjectWorkerThread.start();
}

public void stop() {
scanObjectWorkerThread.interrupt();
}

/**
* This Method Used to fetch the scan options details from {@link S3SourceConfig} amd build the
* all the s3 scan buckets information in list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,13 @@ public void start(Buffer<Record<Event>> buffer) {

@Override
public void stop() {
sqsService.stop();
if (Objects.nonNull(sourceCoordinator)) {

if (Objects.nonNull(sqsService)) {
sqsService.stop();
}

if (Objects.nonNull(s3ScanService) && Objects.nonNull(sourceCoordinator)) {
s3ScanService.stop();
sourceCoordinator.giveUpPartitions();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class ScanObjectWorker implements Runnable{
private final AcknowledgementSetManager acknowledgementSetManager;

// Should there be a duration or time that is configured in the source to stop processing? Otherwise will only stop when data prepper is stopped
private final boolean shouldStopProcessing = false;
private boolean shouldStopProcessing = false;
private final boolean deleteS3ObjectsOnRead;
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
private final PluginMetrics pluginMetrics;
Expand Down Expand Up @@ -109,6 +109,7 @@ public void run() {
Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
} catch (InterruptedException ex) {
LOG.error("S3 Scan worker thread interrupted while backing off.", ex);
return;
}
}

Expand All @@ -129,7 +130,7 @@ private void startProcessingObject(final int waitTimeMillis) {
try {
Thread.sleep(waitTimeMillis);
} catch (InterruptedException e) {
e.printStackTrace();
shouldStopProcessing = true;
}
return;
}
Expand Down

0 comments on commit 6949780

Please sign in to comment.