From a793d85896f3362341376b354e2bd3d96d2adb64 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Thu, 12 Sep 2024 09:40:46 +0200 Subject: [PATCH 1/2] [proxima-direct-core] Allow committing transaction without updating --- .../TransactionResourceManager.java | 6 ++- .../TransactionalOnlineAttributeWriter.java | 15 ++------ ...ransactionalOnlineAttributeWriterTest.java | 37 +++++++++++++++++++ 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java index 8921ec149..b09674cb2 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java @@ -552,7 +552,11 @@ private BiConsumer> createTransactionUpdateCon if (element.getAttributeDescriptor().equals(getStateDesc())) { Optional state = getStateDesc().valueOf(element); if (state.isPresent()) { - getOrCreateCachedTransaction(element.getKey(), state.get()).touch(element.getStamp()); + CachedTransaction openTransaction = + getOrCreateCachedTransaction(element.getKey(), state.get()); + if (openTransaction != null) { + openTransaction.touch(element.getStamp()); + } } } delegate.accept(element, cached); diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java index fd8538609..35d1e80b9 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java @@ -30,7 +30,6 @@ import cz.o2.proxima.core.transaction.State; import cz.o2.proxima.core.transform.ElementWiseTransformation; import cz.o2.proxima.core.transform.ElementWiseTransformation.Collector; -import cz.o2.proxima.core.util.ExceptionUtils; import cz.o2.proxima.core.util.Optionals; import cz.o2.proxima.core.util.Pair; import cz.o2.proxima.direct.core.CommitCallback; @@ -71,16 +70,7 @@ public static TransactionalOnlineAttributeWriter global(DirectDataOperator direc ClientTransactionManager manager = direct.getClientTransactionManager(); // write each output to the _transaction.commit, even if there is single output return new TransactionalOnlineAttributeWriter( - direct, Optionals.get(direct.getWriter(manager.getCommitDesc()))) { - - @Override - public Transaction begin() { - Transaction ret = super.begin(); - // this should never throw TransactionRejectedException - ExceptionUtils.unchecked(ret::beginGlobal); - return ret; - } - }; + direct, Optionals.get(direct.getWriter(manager.getCommitDesc()))); } /** Interface for a transformation to get access to {@link Transaction}. */ @@ -231,6 +221,9 @@ public void update(List addedInputs) { public void commitWrite(List outputs, CommitCallback callback) { commitAttempted = true; + if (state == State.Flags.UNKNOWN) { + beginGlobal(); + } waitForInFlight() .thenCompose(ign -> runTransforms(outputs)) // need to wait for any requests added during possible transforms diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java index 9ec8ef8c8..2c557c3dd 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java @@ -182,6 +182,43 @@ public void testTransactionCreateUpdateCommit() throws InterruptedException { assertEquals(stamp, res.get().getStamp()); } + @Test(timeout = 10_000) + public void testTransactionCreateCommit() throws InterruptedException { + CachedView view = Optionals.get(direct.getCachedView(status)); + view.assign(view.getPartitions()); + OnlineAttributeWriter writer = Optionals.get(direct.getWriter(status)); + assertTrue(writer.isTransactional()); + long stamp = 123456789000L; + // we successfully open and commit the transaction + toReturn.add(Response.forRequest(anyRequest()).open(1L, stamp)); + toReturn.add(Response.forRequest(anyRequest()).committed()); + CountDownLatch latch = new CountDownLatch(1); + try (TransactionalOnlineAttributeWriter.Transaction t = writer.transactional().begin()) { + t.commitWrite( + Collections.singletonList( + StreamElement.upsert( + gateway, + status, + UUID.randomUUID().toString(), + "key", + status.getName(), + System.currentTimeMillis(), + new byte[] {1, 2, 3})), + (succ, exc) -> { + assertTrue(succ); + assertNull(exc); + latch.countDown(); + }); + } + latch.await(); + Optional> res = view.get("key", status); + assertTrue(res.isPresent()); + assertEquals("key", res.get().getKey()); + assertTrue(res.get().hasSequentialId()); + assertEquals(1L, res.get().getSequentialId()); + assertEquals(stamp, res.get().getStamp()); + } + @Test(timeout = 10_000) public void testTransactionCreateUpdateCommitMultipleOutputs() throws InterruptedException, TransactionRejectedException { From 3cf82953b81fd5a8ee98414c601374ce9c93547f Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Thu, 12 Sep 2024 10:48:00 +0200 Subject: [PATCH 2/2] [proxima-direct-ingest-server] Fix flaky test --- .../o2/proxima/direct/server/transaction/TransactionsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java index 47f078439..297ed667f 100644 --- a/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java +++ b/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java @@ -118,7 +118,7 @@ public void testTransactionReadWrite() throws InterruptedException { String transactionId = response.getTransactionId(); assertFalse(transactionId.isEmpty()); long stamp = System.currentTimeMillis(); - replicatedLatch = new CountDownLatch(1); + replicatedLatch = new CountDownLatch(2); StatusBulk status = ingestBulk( transactionId,