diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml
index 7a4c83689..226ebd46e 100644
--- a/google-cloud-firestore/clirr-ignored-differences.xml
+++ b/google-cloud-firestore/clirr-ignored-differences.xml
@@ -209,6 +209,11 @@
com/google/cloud/firestore/Firestore
com.google.cloud.firestore.BulkWriter bulkWriter(*)
+
+ 7013
+ com/google/cloud/firestore/BulkWriterOptions
+ java.lang.Integer getMaxPendingOps()
+
7006
com/google/cloud/firestore/UpdateBuilder
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
index 4c71a3f18..fd0feafa2 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
@@ -29,15 +29,16 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
@@ -116,11 +117,11 @@ enum OperationType {
/**
* The default maximum number of pending operations that can be enqueued onto a BulkWriter
- * instance. An operation is considered pending if BulkWriter has sent it via RPC and is awaiting
- * the result. BulkWriter buffers additional writes after this many pending operations in order to
- * avoiding going OOM.
+ * instance. An operation is considered in-flight if BulkWriter has sent it via RPC and is
+ * awaiting the result. BulkWriter buffers additional writes after this many in-flight operations
+ * in order to avoid going OOM.
*/
- private static final int DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500;
+ static final int DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS = 500;
/**
* The default jitter to apply to the exponential backoff used in retries. For example, a factor
@@ -167,24 +168,22 @@ enum OperationType {
private final RateLimiter rateLimiter;
/**
- * The number of pending operations enqueued on this BulkWriter instance. An operation is
+ * The number of in-flight operations enqueued on this BulkWriter instance. An operation is
* considered pending if BulkWriter has sent it via RPC and is awaiting the result.
*/
- @GuardedBy("lock")
- private int pendingOpsCount = 0;
+ private AtomicInteger inFlightCount = new AtomicInteger();
/**
* An array containing buffered BulkWriter operations after the maximum number of pending
* operations has been enqueued.
*/
- @GuardedBy("lock")
- private final List bufferedOperations = new ArrayList<>();
+ private final BlockingQueue bufferedOperations;
/**
- * The maximum number of pending operations that can be enqueued onto this BulkWriter instance.
- * Once the this number of writes have been enqueued, subsequent writes are buffered.
+ * The maximum number of concurrent in-flight operations that can be sent by BulkWriter instance.
+ * Once this number of writes have been sent, subsequent writes are buffered.
*/
- private int maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT;
+ private int maxInFlight;
/**
* The batch that is currently used to schedule operations. Once this batch reaches maximum
@@ -235,6 +234,13 @@ enum OperationType {
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);
+ this.maxInFlight = options.getMaxInFlightOps();
+ if (options.getMaxPendingOps() == null) {
+ this.bufferedOperations = new LinkedBlockingQueue<>();
+ } else {
+ int maxBufferedOps = options.getMaxPendingOps() - options.getMaxInFlightOps();
+ this.bufferedOperations = new LinkedBlockingQueue<>(maxBufferedOps);
+ }
if (!options.getThrottlingEnabled()) {
this.rateLimiter =
@@ -577,6 +583,10 @@ public ApiFuture update(
batch.update(documentReference, precondition, fieldPath, value, moreFieldsAndValues));
}
+ boolean isReady() {
+ return bufferedOperations.remainingCapacity() > 0;
+ }
+
/**
* Schedules the provided write operation and runs the user success callback when the write result
* is obtained.
@@ -617,20 +627,35 @@ private ApiFuture executeWrite(
lastOperation,
aVoid -> silenceFuture(operation.getFuture()),
MoreExecutors.directExecutor());
+ }
- // Schedule the operation if the BulkWriter has fewer than the maximum number of allowed
- // pending operations, or add the operation to the buffer.
- if (pendingOpsCount < maxPendingOpCount) {
- pendingOpsCount++;
+ // Schedule the operation if the BulkWriter has fewer than the maximum number of allowed
+ // pending operations, or add the operation to the buffer.
+ if (incrementInFlightCountIfLessThanMax()) {
+ synchronized (lock) {
sendOperationLocked(enqueueOperationOnBatchCallback, operation);
- } else {
- bufferedOperations.add(
+ }
+ } else {
+ try {
+ bufferedOperations.put(
() -> {
synchronized (lock) {
- pendingOpsCount++;
sendOperationLocked(enqueueOperationOnBatchCallback, operation);
}
});
+ } catch (InterruptedException exception) {
+ operation.onException(new FirestoreException(exception));
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(exception);
+ }
+
+ // If another operation completed during buffering, then we can process the next buffered
+ // operation now. This overcomes the small chance that an in-flight operation completes
+ // before another operation has been added to buffer.
+ if (incrementInFlightCountIfLessThanMax()) {
+ if (!processNextBufferedOperation()) {
+ inFlightCount.decrementAndGet();
+ }
}
}
@@ -638,18 +663,22 @@ private ApiFuture executeWrite(
ApiFutures.transformAsync(
operation.getFuture(),
result -> {
- pendingOpsCount--;
- processBufferedOperations();
+ // pendingOpsCount remains the same if a buffered operation is sent.
+ if (!processNextBufferedOperation()) {
+ inFlightCount.decrementAndGet();
+ }
return ApiFutures.immediateFuture(result);
},
MoreExecutors.directExecutor());
- return ApiFutures.catchingAsync(
+ return ApiFutures.catching(
processedOperationFuture,
ApiException.class,
e -> {
- pendingOpsCount--;
- processBufferedOperations();
+ // pendingOpsCount remains the same if a buffered operation is sent.
+ if (!processNextBufferedOperation()) {
+ inFlightCount.decrementAndGet();
+ }
throw e;
},
MoreExecutors.directExecutor());
@@ -659,11 +688,22 @@ private ApiFuture executeWrite(
* Manages the pending operation counter and schedules the next BulkWriter operation if we're
* under the maximum limit.
*/
- private void processBufferedOperations() {
- if (pendingOpsCount < maxPendingOpCount && bufferedOperations.size() > 0) {
- Runnable nextOp = bufferedOperations.remove(0);
- nextOp.run();
- }
+ private boolean processNextBufferedOperation() {
+ Runnable nextOp = bufferedOperations.poll();
+ if (nextOp == null) return false;
+ nextOp.run();
+ return true;
+ }
+
+ /**
+ * Atomically increments inFlightCount if less than `maxInFlight`
+ *
+ * @return boolean indicating whether increment occurred.
+ */
+ private boolean incrementInFlightCountIfLessThanMax() {
+ int previousInFlightCount =
+ inFlightCount.getAndAccumulate(0, (v, x) -> v < maxInFlight ? v + 1 : v);
+ return previousInFlightCount < maxInFlight;
}
/**
@@ -945,8 +985,8 @@ int getBufferedOperationsCount() {
}
@VisibleForTesting
- void setMaxPendingOpCount(int newMax) {
- maxPendingOpCount = newMax;
+ void setMaxInFlight(int newMax) {
+ maxInFlight = newMax;
}
/**
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java
index 8cfe0fdbf..58e86d647 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java
@@ -16,6 +16,8 @@
package com.google.cloud.firestore;
+import static com.google.cloud.firestore.BulkWriter.DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS;
+
import com.google.auto.value.AutoValue;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
@@ -57,11 +59,23 @@ public abstract class BulkWriterOptions {
@Nullable
public abstract ScheduledExecutorService getExecutor();
+ /**
+ * Limit on the total number of mutations in-memory.
+ *
+ * @return The maximum number of operations that will be queued.
+ */
+ @Nullable
+ public abstract Integer getMaxPendingOps();
+
+ abstract int getMaxInFlightOps();
+
public static Builder builder() {
return new AutoValue_BulkWriterOptions.Builder()
.setMaxOpsPerSecond(null)
.setInitialOpsPerSecond(null)
.setThrottlingEnabled(true)
+ .setMaxInFlightOps(DEFAULT_MAXIMUM_IN_FLIGHT_OPERATIONS)
+ .setMaxPendingOps(null)
.setExecutor(null);
}
@@ -121,6 +135,19 @@ public Builder setMaxOpsPerSecond(int maxOpsPerSecond) {
*/
public abstract Builder setExecutor(@Nullable ScheduledExecutorService executor);
+ /**
+ * Limit on the total number of mutations in-memory.
+ *
+ * @return The maximum number of operations that will be queued.
+ */
+ public Builder setMaxPendingOps(int maxPending) {
+ return setMaxPendingOps(Integer.valueOf(maxPending));
+ }
+
+ abstract Builder setMaxPendingOps(@Nullable Integer maxPending);
+
+ abstract Builder setMaxInFlightOps(int maxInFlight);
+
public abstract BulkWriterOptions autoBuild();
@Nonnull
@@ -128,6 +155,8 @@ public BulkWriterOptions build() {
BulkWriterOptions options = autoBuild();
Double initialRate = options.getInitialOpsPerSecond();
Double maxRate = options.getMaxOpsPerSecond();
+ int maxInFlightOps = options.getMaxInFlightOps();
+ Integer maxPendingOps = options.getMaxPendingOps();
if (initialRate != null && initialRate < 1) {
throw FirestoreException.forInvalidArgument(
@@ -150,6 +179,19 @@ public BulkWriterOptions build() {
throw FirestoreException.forInvalidArgument(
"Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false.");
}
+
+ if (maxInFlightOps < 1) {
+ throw FirestoreException.forInvalidArgument(
+ "Value for argument 'maxInFlightOps' must be greater than 1, but was :"
+ + maxInFlightOps);
+ }
+
+ if (maxPendingOps != null && maxInFlightOps > maxPendingOps) {
+ throw FirestoreException.forInvalidArgument(
+ "Value for argument 'maxPendingOps' must be greater than `maxInFlightOps`, but was :"
+ + maxPendingOps);
+ }
+
return options;
}
}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java
index 0c1c8dab2..5ee667a39 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java
@@ -25,7 +25,7 @@
/** A Firestore Service exception. */
public class FirestoreException extends BaseGrpcServiceException {
- private Status status;
+ private final Status status;
FirestoreException(String reason, Status status) {
this(reason, status, null);
@@ -33,20 +33,26 @@ public class FirestoreException extends BaseGrpcServiceException {
private FirestoreException(String reason, Status status, @Nullable Throwable cause) {
super(reason, cause, status.getCode().value(), false);
-
this.status = status;
}
+ FirestoreException(InterruptedException cause) {
+ super("Executing thread was interrupted", cause, Status.ABORTED.getCode().value(), true);
+ this.status = Status.ABORTED;
+ }
+
private FirestoreException(String reason, ApiException exception) {
super(
reason,
exception,
exception.getStatusCode().getCode().getHttpStatusCode(),
exception.isRetryable());
+ this.status = null;
}
private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
+ this.status = null;
}
/**
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
index 90e29d50f..6621138f8 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
@@ -24,6 +24,9 @@
import static com.google.cloud.firestore.LocalFirestoreHelper.update;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
@@ -45,6 +48,7 @@
import com.google.protobuf.GeneratedMessageV3;
import com.google.rpc.Code;
import io.grpc.Status;
+import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -56,6 +60,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
@@ -94,6 +99,7 @@ public class BulkWriterTest {
new IllegalStateException("Mock batchWrite failed in test"),
GrpcStatusCode.of(Status.Code.RESOURCE_EXHAUSTED),
true));
+ public static final int VERIFY_TIMEOUT_MS = 200;
@Rule public Timeout timeout = new Timeout(2, TimeUnit.SECONDS);
@@ -121,14 +127,20 @@ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit)
private BulkWriter bulkWriter;
private DocumentReference doc1;
private DocumentReference doc2;
+ private DocumentReference doc3;
+ private DocumentReference doc4;
private ScheduledExecutorService timeoutExecutor;
public static ApiFuture successResponse(int updateTimeSeconds) {
+ return ApiFutures.immediateFuture(success(updateTimeSeconds));
+ }
+
+ private static BatchWriteResponse success(int updateTimeSeconds) {
BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder();
response.addWriteResultsBuilder().getUpdateTimeBuilder().setSeconds(updateTimeSeconds).build();
response.addStatusBuilder().build();
- return ApiFutures.immediateFuture(response.build());
+ return response.build();
}
public static ApiFuture failedResponse(int code) {
@@ -171,6 +183,8 @@ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit)
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
doc1 = firestoreMock.document("coll/doc1");
doc2 = firestoreMock.document("coll/doc2");
+ doc3 = firestoreMock.document("coll/doc3");
+ doc4 = firestoreMock.document("coll/doc4");
}
@After
@@ -459,7 +473,7 @@ public void buffersSubsequentOpsAfterReachingMaxPendingOpCount() throws Exceptio
};
responseStubber.initializeStub(batchWriteCapture, firestoreMock);
- bulkWriter.setMaxPendingOpCount(3);
+ bulkWriter.setMaxInFlight(3);
bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.set(firestoreMock.document("coll/doc3"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
@@ -1401,4 +1415,117 @@ public void optionsInitialAndMaxRatesAreProperlySet() throws Exception {
assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), Integer.MAX_VALUE);
assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), Integer.MAX_VALUE);
}
+
+ @Test
+ public void blocksWriteWhenBufferIsFull() throws Exception {
+ BulkWriter bulkWriter =
+ firestoreMock.bulkWriter(
+ BulkWriterOptions.builder()
+ .setMaxInFlightOps(1)
+ .setMaxPendingOps(2)
+ .setExecutor(timeoutExecutor)
+ .build());
+
+ SettableApiFuture response1 = SettableApiFuture.create();
+ SettableApiFuture response2 = SettableApiFuture.create();
+ SettableApiFuture response3 = SettableApiFuture.create();
+ SettableApiFuture response4 = SettableApiFuture.create();
+
+ Mockito.doReturn(response1, response2, response3, response4)
+ .when(firestoreMock)
+ .sendRequest(any(), any());
+
+ bulkWriter.setMaxBatchSize(1);
+
+ // First call sent as part of batch immediately.
+ ApiFuture result1 = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
+
+ // The buffer is not full, so bulk writer is ready to receive more writes.
+ assertTrue(bulkWriter.isReady());
+
+ // Second call is buffered.
+ ApiFuture result2 = bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
+
+ // The buffer is full, so bulk writer is not ready to write.
+ assertFalse(bulkWriter.isReady());
+
+ // Third call is blocked, fourth hasn't been attempted.
+ AtomicReference> threadResult3 = new AtomicReference<>();
+ AtomicReference> threadResult4 = new AtomicReference<>();
+ Thread thread =
+ new Thread(
+ () -> {
+ try {
+ threadResult3.set(bulkWriter.set(doc3, LocalFirestoreHelper.SINGLE_FIELD_MAP));
+ threadResult4.set(bulkWriter.set(doc4, LocalFirestoreHelper.SINGLE_FIELD_MAP));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+
+ // Only one batch should have been sent, since everything else is buffered or blocked.
+ verify(firestoreMock, timeout(VERIFY_TIMEOUT_MS).times(1)).sendRequest(any(), any());
+
+ // Expect thread to be waiting because blocked by adding write to buffer.
+ assertEquals(State.WAITING, threadStateAfterRunning(thread));
+
+ // Thread should not have received `ApiFuture` yet.
+ assertNull(threadResult3.get());
+ assertNull(threadResult4.get());
+
+ // First call should not have returned a result yet.
+ assertFalse(result1.isDone());
+ assertFalse(result2.isDone());
+
+ // Complete first call, and verify calls 2 and 3 make progress.
+ response1.set(success(1));
+ assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(1, 0)), result1.get());
+
+ // Thread should be unblocked, run set method for doc3, then blocked again.
+ assertEquals(State.WAITING, threadStateAfterRunning(thread));
+ ApiFuture result3 = threadResult3.get();
+ assertNotNull(result3);
+ assertNull(threadResult4.get());
+
+ // Now two batches should have been sent.
+ verify(firestoreMock, timeout(VERIFY_TIMEOUT_MS).times(2)).sendRequest(any(), any());
+ assertFalse(result2.isDone());
+ assertFalse(result3.isDone());
+
+ // Buffer should still be full
+ assertFalse(bulkWriter.isReady());
+
+ // Complete second call, and verify third call makes progress.
+ response2.set(success(2));
+ assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(2, 0)), result2.get());
+
+ // Thread should be unblocked, run set method for doc4, then terminate.
+ assertEquals(State.TERMINATED, threadStateAfterRunning(thread));
+ ApiFuture result4 = threadResult4.get();
+ assertNotNull(result4);
+
+ // Now three batches should have been sent.
+ verify(firestoreMock, timeout(VERIFY_TIMEOUT_MS).times(3)).sendRequest(any(), any());
+ assertFalse(result3.isDone());
+ assertFalse(result4.isDone());
+
+ // Complete third call, and verify response.
+ response3.set(success(3));
+ assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(3, 0)), result3.get());
+ assertFalse(result4.isDone());
+
+ // Complete fourth call, and verify response.
+ response4.set(success(4));
+ assertEquals(new WriteResult(Timestamp.ofTimeSecondsAndNanos(4, 0)), result4.get());
+ }
+
+ private static State threadStateAfterRunning(Thread thread) throws InterruptedException {
+ State state = thread.getState();
+ while (state == State.RUNNABLE) {
+ Thread.sleep(1);
+ state = thread.getState();
+ }
+ return state;
+ }
}