Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BulkWriter buffer limit #1606

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@
<className>com/google/cloud/firestore/Firestore</className>
<method>com.google.cloud.firestore.BulkWriter bulkWriter(*)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/firestore/BulkWriterOptions</className>
<method>java.lang.Integer getMaxPendingOps()</method>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -116,11 +117,11 @@ enum OperationType {

/**
* The default maximum number of pending operations that can be enqueued onto a BulkWriter
* instance. An operation is considered pending if BulkWriter has sent it via RPC and is awaiting
* the result. BulkWriter buffers additional writes after this many pending operations in order to
* avoiding going OOM.
* instance. An operation is considered in-flight if BulkWriter has sent it via RPC and is
* awaiting the result. BulkWriter buffers additional writes after this many in-flight operations
* in order to avoid going OOM.
*/
private static final int DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500;
static final int DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS = 500;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why define it here, while it is only used in the "BulkWriterOptions" file


/**
* The default jitter to apply to the exponential backoff used in retries. For example, a factor
Expand Down Expand Up @@ -167,24 +168,22 @@ enum OperationType {
private final RateLimiter rateLimiter;

/**
* The number of pending operations enqueued on this BulkWriter instance. An operation is
* The number of in-flight operations enqueued on this BulkWriter instance. An operation is
* considered pending if BulkWriter has sent it via RPC and is awaiting the result.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

considered pending if BulkWriter should this be changed to "in-flight" as well?

*/
@GuardedBy("lock")
private int pendingOpsCount = 0;
private AtomicInteger inFlightCount = new AtomicInteger();

/**
* An array containing buffered BulkWriter operations after the maximum number of pending
* operations has been enqueued.
*/
@GuardedBy("lock")
private final List<Runnable> bufferedOperations = new ArrayList<>();
private final BlockingQueue<Runnable> bufferedOperations;

/**
* The maximum number of pending operations that can be enqueued onto this BulkWriter instance.
* Once the this number of writes have been enqueued, subsequent writes are buffered.
* The maximum number of concurrent in-flight operations that can be sent by BulkWriter instance.
* Once this number of writes have been sent, subsequent writes are buffered.
*/
private int maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT;
private int maxInFlight;

/**
* The batch that is currently used to schedule operations. Once this batch reaches maximum
Expand Down Expand Up @@ -235,6 +234,13 @@ enum OperationType {
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);
this.maxInFlight = options.getMaxInFlightOps();
if (options.getMaxPendingOps() == null) {
this.bufferedOperations = new LinkedBlockingQueue<>();
} else {
int maxBufferedOps = options.getMaxPendingOps() - options.getMaxInFlightOps();
this.bufferedOperations = new LinkedBlockingQueue<>(maxBufferedOps);
}

if (!options.getThrottlingEnabled()) {
this.rateLimiter =
Expand Down Expand Up @@ -577,6 +583,10 @@ public ApiFuture<WriteResult> update(
batch.update(documentReference, precondition, fieldPath, value, moreFieldsAndValues));
}

boolean isReady() {
return bufferedOperations.remainingCapacity() > 0;
}

/**
* Schedules the provided write operation and runs the user success callback when the write result
* is obtained.
Expand Down Expand Up @@ -617,39 +627,58 @@ private ApiFuture<WriteResult> executeWrite(
lastOperation,
aVoid -> silenceFuture(operation.getFuture()),
MoreExecutors.directExecutor());
}

// Schedule the operation if the BulkWriter has fewer than the maximum number of allowed
// pending operations, or add the operation to the buffer.
if (pendingOpsCount < maxPendingOpCount) {
pendingOpsCount++;
// Schedule the operation if the BulkWriter has fewer than the maximum number of allowed
// pending operations, or add the operation to the buffer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"allowed in-flight operations"

if (incrementInFlightCountIfLessThanMax()) {
synchronized (lock) {
sendOperationLocked(enqueueOperationOnBatchCallback, operation);
} else {
bufferedOperations.add(
}
} else {
try {
bufferedOperations.put(
() -> {
synchronized (lock) {
pendingOpsCount++;
sendOperationLocked(enqueueOperationOnBatchCallback, operation);
}
});
} catch (InterruptedException exception) {
operation.onException(new FirestoreException(exception));
Thread.currentThread().interrupt();
return ApiFutures.immediateFailedFuture(exception);
}

// If another operation completed during buffering, then we can process the next buffered
// operation now. This overcomes the small chance that an in-flight operation completes
// before another operation has been added to buffer.
if (incrementInFlightCountIfLessThanMax()) {
if (!processNextBufferedOperation()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks strange to do the same condition check twice to see if any operation has been completed during the last condition check.

inFlightCount.decrementAndGet();
}
}
}

ApiFuture<WriteResult> processedOperationFuture =
ApiFutures.transformAsync(
operation.getFuture(),
result -> {
pendingOpsCount--;
processBufferedOperations();
// pendingOpsCount remains the same if a buffered operation is sent.
if (!processNextBufferedOperation()) {
inFlightCount.decrementAndGet();
}
return ApiFutures.immediateFuture(result);
},
MoreExecutors.directExecutor());

return ApiFutures.catchingAsync(
return ApiFutures.catching(
processedOperationFuture,
ApiException.class,
e -> {
pendingOpsCount--;
processBufferedOperations();
// pendingOpsCount remains the same if a buffered operation is sent.
if (!processNextBufferedOperation()) {
inFlightCount.decrementAndGet();
}
throw e;
},
MoreExecutors.directExecutor());
Expand All @@ -659,11 +688,22 @@ private ApiFuture<WriteResult> executeWrite(
* Manages the pending operation counter and schedules the next BulkWriter operation if we're
* under the maximum limit.
*/
private void processBufferedOperations() {
if (pendingOpsCount < maxPendingOpCount && bufferedOperations.size() > 0) {
Runnable nextOp = bufferedOperations.remove(0);
nextOp.run();
}
private boolean processNextBufferedOperation() {
Runnable nextOp = bufferedOperations.poll();
if (nextOp == null) return false;
nextOp.run();
return true;
}

/**
* Atomically increments inFlightCount if less than `maxInFlight`
*
* @return boolean indicating whether increment occurred.
*/
private boolean incrementInFlightCountIfLessThanMax() {
int previousInFlightCount =
inFlightCount.getAndAccumulate(0, (v, x) -> v < maxInFlight ? v + 1 : v);
return previousInFlightCount < maxInFlight;
}

/**
Expand Down Expand Up @@ -945,8 +985,8 @@ int getBufferedOperationsCount() {
}

@VisibleForTesting
void setMaxPendingOpCount(int newMax) {
maxPendingOpCount = newMax;
void setMaxInFlight(int newMax) {
maxInFlight = newMax;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.firestore;

import static com.google.cloud.firestore.BulkWriter.DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS;

import com.google.auto.value.AutoValue;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -57,11 +59,23 @@ public abstract class BulkWriterOptions {
@Nullable
public abstract ScheduledExecutorService getExecutor();

/**
* Limit on the total number of mutations in-memory.
*
* @return The maximum number of operations that will be queued.
*/
@Nullable
public abstract Integer getMaxPendingOps();

abstract int getMaxInFlightOps();
Copy link
Contributor

@milaGGL milaGGL Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs a comment here.
BTW, maybe expalin the relationship between pendingOps and InFlightOps


public static Builder builder() {
return new AutoValue_BulkWriterOptions.Builder()
.setMaxOpsPerSecond(null)
.setInitialOpsPerSecond(null)
.setThrottlingEnabled(true)
.setMaxInFlightOps(DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS)
.setMaxPendingOps(null)
.setExecutor(null);
}

Expand Down Expand Up @@ -121,13 +135,28 @@ public Builder setMaxOpsPerSecond(int maxOpsPerSecond) {
*/
public abstract Builder setExecutor(@Nullable ScheduledExecutorService executor);

/**
* Limit on the total number of mutations in-memory.
*
* @return The maximum number of operations that will be queued.
*/
public Builder setMaxPendingOps(int maxPending) {
return setMaxPendingOps(Integer.valueOf(maxPending));
}

abstract Builder setMaxPendingOps(@Nullable Integer maxPending);

abstract Builder setMaxInFlightOps(int maxInFlight);

public abstract BulkWriterOptions autoBuild();

@Nonnull
public BulkWriterOptions build() {
BulkWriterOptions options = autoBuild();
Double initialRate = options.getInitialOpsPerSecond();
Double maxRate = options.getMaxOpsPerSecond();
int maxInFlightOps = options.getMaxInFlightOps();
Integer maxPendingOps = options.getMaxPendingOps();

if (initialRate != null && initialRate < 1) {
throw FirestoreException.forInvalidArgument(
Expand All @@ -150,6 +179,19 @@ public BulkWriterOptions build() {
throw FirestoreException.forInvalidArgument(
"Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false.");
}

if (maxInFlightOps < 1) {
throw FirestoreException.forInvalidArgument(
"Value for argument 'maxInFlightOps' must be greater than 1, but was :"
+ maxInFlightOps);
}

if (maxPendingOps != null && maxInFlightOps > maxPendingOps) {
throw FirestoreException.forInvalidArgument(
"Value for argument 'maxPendingOps' must be greater than `maxInFlightOps`, but was :"
+ maxPendingOps);
}

return options;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,34 @@

/** A Firestore Service exception. */
public class FirestoreException extends BaseGrpcServiceException {
private Status status;
private final Status status;

FirestoreException(String reason, Status status) {
this(reason, status, null);
}

private FirestoreException(String reason, Status status, @Nullable Throwable cause) {
super(reason, cause, status.getCode().value(), false);

this.status = status;
}

FirestoreException(InterruptedException cause) {
super("Executing thread was interrupted", cause, Status.ABORTED.getCode().value(), true);
this.status = Status.ABORTED;
}

private FirestoreException(String reason, ApiException exception) {
super(
reason,
exception,
exception.getStatusCode().getCode().getHttpStatusCode(),
exception.isRetryable());
this.status = null;
}

private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
this.status = null;
}

/**
Expand Down
Loading
Loading