Skip to content

Commit

Permalink
Merge pull request #929: [proxima-direct-core] Allow committing trans…
Browse files Browse the repository at this point in the history
…action without updating
  • Loading branch information
je-ik authored Sep 12, 2024
2 parents 9f74486 + 3cf8295 commit 090d70d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,11 @@ private BiConsumer<StreamElement, Pair<Long, Object>> createTransactionUpdateCon
if (element.getAttributeDescriptor().equals(getStateDesc())) {
Optional<State> state = getStateDesc().valueOf(element);
if (state.isPresent()) {
getOrCreateCachedTransaction(element.getKey(), state.get()).touch(element.getStamp());
CachedTransaction openTransaction =
getOrCreateCachedTransaction(element.getKey(), state.get());
if (openTransaction != null) {
openTransaction.touch(element.getStamp());
}
}
}
delegate.accept(element, cached);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import cz.o2.proxima.core.transaction.State;
import cz.o2.proxima.core.transform.ElementWiseTransformation;
import cz.o2.proxima.core.transform.ElementWiseTransformation.Collector;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.core.util.Optionals;
import cz.o2.proxima.core.util.Pair;
import cz.o2.proxima.direct.core.CommitCallback;
Expand Down Expand Up @@ -71,16 +70,7 @@ public static TransactionalOnlineAttributeWriter global(DirectDataOperator direc
ClientTransactionManager manager = direct.getClientTransactionManager();
// write each output to the _transaction.commit, even if there is single output
return new TransactionalOnlineAttributeWriter(
direct, Optionals.get(direct.getWriter(manager.getCommitDesc()))) {

@Override
public Transaction begin() {
Transaction ret = super.begin();
// this should never throw TransactionRejectedException
ExceptionUtils.unchecked(ret::beginGlobal);
return ret;
}
};
direct, Optionals.get(direct.getWriter(manager.getCommitDesc())));
}

/** Interface for a transformation to get access to {@link Transaction}. */
Expand Down Expand Up @@ -231,6 +221,9 @@ public void update(List<KeyAttribute> addedInputs) {

public void commitWrite(List<StreamElement> outputs, CommitCallback callback) {
commitAttempted = true;
if (state == State.Flags.UNKNOWN) {
beginGlobal();
}
waitForInFlight()
.thenCompose(ign -> runTransforms(outputs))
// need to wait for any requests added during possible transforms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,43 @@ public void testTransactionCreateUpdateCommit() throws InterruptedException {
assertEquals(stamp, res.get().getStamp());
}

@Test(timeout = 10_000)
public void testTransactionCreateCommit() throws InterruptedException {
CachedView view = Optionals.get(direct.getCachedView(status));
view.assign(view.getPartitions());
OnlineAttributeWriter writer = Optionals.get(direct.getWriter(status));
assertTrue(writer.isTransactional());
long stamp = 123456789000L;
// we successfully open and commit the transaction
toReturn.add(Response.forRequest(anyRequest()).open(1L, stamp));
toReturn.add(Response.forRequest(anyRequest()).committed());
CountDownLatch latch = new CountDownLatch(1);
try (TransactionalOnlineAttributeWriter.Transaction t = writer.transactional().begin()) {
t.commitWrite(
Collections.singletonList(
StreamElement.upsert(
gateway,
status,
UUID.randomUUID().toString(),
"key",
status.getName(),
System.currentTimeMillis(),
new byte[] {1, 2, 3})),
(succ, exc) -> {
assertTrue(succ);
assertNull(exc);
latch.countDown();
});
}
latch.await();
Optional<KeyValue<byte[]>> res = view.get("key", status);
assertTrue(res.isPresent());
assertEquals("key", res.get().getKey());
assertTrue(res.get().hasSequentialId());
assertEquals(1L, res.get().getSequentialId());
assertEquals(stamp, res.get().getStamp());
}

@Test(timeout = 10_000)
public void testTransactionCreateUpdateCommitMultipleOutputs()
throws InterruptedException, TransactionRejectedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testTransactionReadWrite() throws InterruptedException {
String transactionId = response.getTransactionId();
assertFalse(transactionId.isEmpty());
long stamp = System.currentTimeMillis();
replicatedLatch = new CountDownLatch(1);
replicatedLatch = new CountDownLatch(2);
StatusBulk status =
ingestBulk(
transactionId,
Expand Down

0 comments on commit 090d70d

Please sign in to comment.