Skip to content

Commit

Permalink
Add support for fully async acknowledgments in source coordination (o…
Browse files Browse the repository at this point in the history
…pensearch-project#3384)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Sep 26, 2023
1 parent 3689b12 commit 6546901
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ public Long getLongOrDefault(final String attribute, final long defaultValue) {
throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute));
}

private <T> void checkObjectType(final String attribute, final Object object, final Class<T> type) {
if (object != null && !(type.isAssignableFrom(object.getClass()))){
/**
 * Verifies that an attribute value is compatible with the type the caller expects.
 * A {@code null} value is always accepted.
 *
 * @param attribute the attribute name, used only to build the error message
 * @param object the value to check; may be {@code null}
 * @param type the class the value must be assignable to
 * @throws IllegalArgumentException if {@code object} is non-null and its class is not assignable to {@code type}
 */
private <T> void checkObjectType(final String attribute, final Object object, final Class<T> type) {
    // Nothing to check when there is no value.
    if (object == null) {
        return;
    }

    final Class<?> actualType = object.getClass();
    if (type.isAssignableFrom(actualType)) {
        return;
    }

    throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, actualType, attribute));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public interface Event extends Serializable {
* Returns formatted parts of the input string replaced by their values in the event or the values from the result
* of a Data Prepper expression
* @param format input format
* @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions
* @return returns a string with no formatted parts, returns null if no value is found
* @throws RuntimeException if the input string is not properly formatted
* @since 2.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public interface SourceCoordinator<T> {
* If the global state map is not needed, then it can be ignored. Updating the global state map will save it, so the next time the supplier function is run,
* it will contain the most recent state from the previous supplier function run.
* @return {@link SourcePartition} with the details about how to process this partition. Will repeatedly return the partition until
* {@link SourceCoordinator#completePartition(String)}
* or {@link SourceCoordinator#closePartition(String, Duration, int)} are called by the source,
* {@link SourceCoordinator#completePartition(String, Boolean)}
* or {@link SourceCoordinator#closePartition(String, Duration, int, Boolean)} are called by the source,
* or until the partition ownership times out.
* @since 2.2
*/
Expand All @@ -49,9 +49,10 @@ public interface SourceCoordinator<T> {
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException if the partition is not owned by this instance of SourceCoordinator
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition can not be completed
* @param partitionKey - The partition key that uniquely identifies the partition of work that was fully processed
* @param fromAcknowledgmentsCallback - Whether this method is being called from an acknowledgment callback or not
* @since 2.2
*/
void completePartition(final String partitionKey);
void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback);

/**
* Should be called by the source when it has processed all that it can up to this point in time for a given partition,
Expand All @@ -62,12 +63,13 @@ public interface SourceCoordinator<T> {
* @param reopenAfter - The duration from the current time to wait before this partition should be processed further at a later date
* @param maxClosedCount - The number of times to allow this partition to be closed. Will mark the partition as completed if the partition has been closed this many times or more
* in the past
* @param fromAcknowledgmentsCallback - Whether this method is being called from an acknowledgment callback or not
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException if the partition key could not be found in the distributed store
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException if the partition is not owned by this instance of SourceCoordinator
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition can not be closed
* @since 2.2
*/
void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount);
void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount, final Boolean fromAcknowledgmentsCallback);

/**
* Should be called by the source when it has completed some work for a partition, and needs to save its progress before continuing work on the partition.
Expand All @@ -94,4 +96,14 @@ public interface SourceCoordinator<T> {
* @since 2.2
*/
void giveUpPartitions();


/**
* Should be called by the source when acknowledgments are enabled, to keep ownership of the partition for {@code ackowledgmentTimeout} amount of time
* before another instance of Data Prepper can pick it up for processing. Allows the source to acquire another partition immediately for processing.
* @param partitionKey - the partition to update for ack timeout
* @param ackowledgmentTimeout - the amount of time during which this partition can still be completed by the acknowledgment callback before another instance of Data Prepper
* can pick it up for processing
*/
void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private static final Logger LOG = LoggerFactory.getLogger(LeaseBasedSourceCoordinator.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private static final String COMPLETE_ACTION = "complete";
private static final String CLOSE_ACTION = "close";
private static final String SAVE_STATE_ACTION = "saveState";

static final String PARTITION_CREATION_SUPPLIER_INVOCATION_COUNT = "partitionCreationSupplierInvocations";
static final String NO_PARTITIONS_ACQUIRED_COUNT = "noPartitionsAcquired";
static final String PARTITION_CREATED_COUNT = "partitionsCreatedCount";
Expand Down Expand Up @@ -121,9 +125,9 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
this.partitionsGivenUpCounter = pluginMetrics.counter(PARTITION_OWNERSHIP_GIVEN_UP_COUNT);
this.partitionNotFoundErrorCounter = pluginMetrics.counter(PARTITION_NOT_FOUND_ERROR_COUNT);
this.partitionNotOwnedErrorCounter = pluginMetrics.counter(PARTITION_NOT_OWNED_ERROR_COUNT);
this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "saveState");
this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "close");
this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "complete");
this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, SAVE_STATE_ACTION);
this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, CLOSE_ACTION);
this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, COMPLETE_ACTION);
}

@Override
Expand Down Expand Up @@ -202,10 +206,10 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
}

@Override
public void completePartition(final String partitionKey) {
public void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback) {
validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "complete");
final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, COMPLETE_ACTION, fromAcknowledgmentsCallback);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwner(null);
Expand All @@ -220,17 +224,19 @@ public void completePartition(final String partitionKey) {
throw e;
}

partitionManager.removeActivePartition();
if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was completed by owner {}.", partitionKey, ownerId);
partitionsCompletedCounter.increment();
}

@Override
public void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount) {
public void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount, final Boolean fromAcknowledgmentsCallback) {
validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "close");
final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, CLOSE_ACTION, fromAcknowledgmentsCallback);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwner(null);
Expand Down Expand Up @@ -262,7 +268,9 @@ public void closePartition(final String partitionKey, final Duration reopenAfter
partitionsClosedCounter.increment();
}

partitionManager.removeActivePartition();
if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was closed by owner {}. The resulting status of the partition is now {}", partitionKey, ownerId, itemToUpdate.getSourcePartitionStatus());
}
Expand All @@ -271,7 +279,7 @@ public void closePartition(final String partitionKey, final Duration reopenAfter
public <S extends T> void saveProgressStateForPartition(final String partitionKey, final S partitionProgressState) {
validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "save state");
final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, SAVE_STATE_ACTION);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT));
Expand All @@ -290,6 +298,20 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe
saveProgressStateInvocationSuccessCounter.increment();
}

/**
 * Moves the ownership timeout of an owned partition out to "now plus {@code ackowledgmentTimeout}",
 * writes the item back to the coordination store, and then clears the partition manager's active
 * partition.
 *
 * @param partitionKey the key of the partition to update
 * @param ackowledgmentTimeout how far from the current time the ownership timeout is set
 */
@Override
public void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout) {
    validateIsInitialized();

    final SourcePartitionStoreItem ownedPartition = validateAndGetSourcePartitionStoreItem(partitionKey, "update for ack wait");
    validatePartitionOwnership(ownedPartition);

    // The new ownership deadline is measured from the moment of this call.
    final Instant ackDeadline = Instant.now().plus(ackowledgmentTimeout);
    ownedPartition.setPartitionOwnershipTimeout(ackDeadline);
    sourceCoordinationStore.tryUpdateSourcePartitionItem(ownedPartition);

    // Only after the store update is the active partition released locally.
    partitionManager.removeActivePartition();
}

@Override
public void giveUpPartitions() {

Expand Down Expand Up @@ -380,6 +402,10 @@ private SourcePartitionStoreItem validateAndGetSourcePartitionStoreItem(final St
);
}

return getSourcePartitionStoreItem(partitionKey, action);
}

private SourcePartitionStoreItem getSourcePartitionStoreItem(final String partitionKey, final String action) {
final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionKey);

if (optionalPartitionItem.isEmpty()) {
Expand Down Expand Up @@ -441,4 +467,10 @@ private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionS
LOG.warn("There was an error saving global state after creating partitions.", e);
}
}

/**
 * Fetches the store item for a partition, choosing the lookup path based on who is calling.
 *
 * @param partitionKey the key of the partition to fetch
 * @param action the name of the action being performed, passed through to the lookup
 * @param fromAcknowledgmentsCallback whether the call originates from an acknowledgment callback
 * @return the store item for the partition
 */
private SourcePartitionStoreItem getItemWithAction(final String partitionKey, final String action, final Boolean fromAcknowledgmentsCallback) {
    if (fromAcknowledgmentsCallback) {
        // An acknowledgment can arrive while this instance is already working on a different partition,
        // so the active-partition validation in the partition manager would fail here and must be skipped.
        return getSourcePartitionStoreItem(partitionKey, action);
    }

    return validateAndGetSourcePartitionStoreItem(partitionKey, action);
}
}
Loading

0 comments on commit 6546901

Please sign in to comment.