Skip to content

Commit

Permalink
Merge pull request #916: [proxima-direct-transaction-manager] Add tim…
Browse files Browse the repository at this point in the history
…eout for the server to terminate after error
  • Loading branch information
je-ik authored Jun 7, 2024
2 parents f273745 + 018ee21 commit 298c1ce
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ interface ServerTransactionConfig extends TransactionConfig {
InitialSequenceIdPolicy getInitialSeqIdPolicy();

TransactionMonitoringPolicy getTransactionMonitoringPolicy();

int getServerTerminationTimeoutSeconds();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public InitialSequenceIdPolicy getInitialSeqIdPolicy() {
public TransactionMonitoringPolicy getTransactionMonitoringPolicy() {
return transactionMonitoringPolicy;
}

@Override
public int getServerTerminationTimeoutSeconds() {
return serverTerminationTimeoutSeconds;
}
}

private class CachedWriters implements AutoCloseable {
Expand Down Expand Up @@ -374,6 +379,9 @@ public void withHandle(ObserveHandle handle) {
@Getter(AccessLevel.PACKAGE)
private final TransactionMonitoringPolicy transactionMonitoringPolicy;

@Getter(AccessLevel.PACKAGE)
private final int serverTerminationTimeoutSeconds;

@VisibleForTesting
public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> cfg) {
this.direct = direct;
Expand All @@ -386,6 +394,7 @@ public TransactionResourceManager(DirectDataOperator direct, Map<String, Object>
this.cleanupIntervalMs = getCleanupInterval(cfg);
this.initialSequenceIdPolicy = getInitialSequenceIdPolicy(cfg);
this.transactionMonitoringPolicy = getTransactionMonitoringPolicy(cfg);
this.serverTerminationTimeoutSeconds = getServerTerminationTimeout(cfg);

log.info(
"Created {} with transaction timeout {} ms",
Expand Down Expand Up @@ -428,6 +437,13 @@ private static TransactionMonitoringPolicy getTransactionMonitoringPolicy(
.orElse(TransactionMonitoringPolicy.nop());
}

private static int getServerTerminationTimeout(Map<String, Object> cfg) {
return Optional.ofNullable(cfg.get("server-termination-timeout"))
.map(Object::toString)
.map(Integer::valueOf)
.orElse(2);
}

@Override
public void close() {
openTransactionMap.forEach((k, v) -> v.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import cz.o2.proxima.typesafe.config.Config;
import cz.o2.proxima.typesafe.config.ConfigFactory;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -104,11 +105,30 @@ private TransactionLogObserverFactory getObserverFactory(Config conf) {
return new TransactionLogObserverFactory.WithOnErrorHandler(
error -> {
log.error("Error processing transactions. Bailing out for safety.", error);
stop();
System.exit(1);
asyncTerminate(this::stop, () -> System.exit(1));
});
}

@VisibleForTesting
void asyncTerminate(Runnable terminateWith, Runnable runAfter) {
CountDownLatch latch = new CountDownLatch(1);
Thread asyncThread =
new Thread(
() -> {
try {
terminateWith.run();
} catch (Throwable err) {
log.warn("Error during terminating", err);
}
latch.countDown();
});
asyncThread.start();
int timeout = manager.getCfg().getServerTerminationTimeoutSeconds();
ExceptionUtils.ignoringInterrupted(() -> latch.await(timeout, TimeUnit.SECONDS));
asyncThread.interrupt();
runAfter.run();
}

public void run() {
TransactionLogObserver observer = newTransactionLogObserver();
observer.run("transaction-manager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import cz.o2.proxima.core.repository.ConfigRepository;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.typesafe.config.Config;
import cz.o2.proxima.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;

/** Test suite for {@link TransactionManagerServer}. */
Expand Down Expand Up @@ -51,4 +54,23 @@ public void testServerRunTearDown() {
server.stop();
assertTrue(server.isStopped());
}

@Test(timeout = 10000)
public void testServerAsyncTerminate() {
AtomicBoolean terminated = new AtomicBoolean();
server.asyncTerminate(() -> {}, () -> terminated.set(true));
assertTrue(terminated.get());
terminated.set(false);
server.asyncTerminate(
() -> ExceptionUtils.ignoringInterrupted(() -> TimeUnit.SECONDS.sleep(100)),
() -> terminated.set(true));
assertTrue(terminated.get());
terminated.set(false);
server.asyncTerminate(
() -> {
throw new RuntimeException("Error");
},
() -> terminated.set(true));
assertTrue(terminated.get());
}
}

0 comments on commit 298c1ce

Please sign in to comment.