Skip to content

Commit

Permalink
Moving query recompute out of write lock
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Jun 3, 2024
1 parent b340577 commit ae62259
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;
Expand Down Expand Up @@ -97,6 +101,8 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
}
}

Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> completableFutureMap = new ConcurrentHashMap<>();


static class SegmentedLock {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -254,13 +260,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try {
writeLock(key);
value = onHeapCache.computeIfAbsent(key, loader);
} finally {
unlockWriteLock(key);
}
V value = compute(key, loader);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
Expand Down Expand Up @@ -289,6 +289,64 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// A future that returns a pair of key/value.
CompletableFuture<Tuple<ICacheKey<K>, V>> completableFuture = new CompletableFuture<>();
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, completableFuture);
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, V> handler = (pair, ex) -> {
V value = null;
if (pair != null) {
try {
writeLock(key);
onHeapCache.put(pair.v1(), pair.v2());
} finally {
unlockWriteLock(key);
}
value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should
// be safe to assume if we got no exception and reached here.
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
return value;
};
CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V value;
try {
value = loader.load(key);
} catch (Exception ex) {
future.completeExceptionally(ex);
throw new ExecutionException(ex);
}
if (value == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Tuple<>(key, value));
}

} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("Future completed exceptionally but no error thrown");
}
} catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
return value;
}

@Override
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,8 @@ public void testInvalidateAll() throws Exception {
LoadAwareCacheLoader<ICacheKey<String>, String> tieredCacheLoader = getLoadAwareCacheLoader();
tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader);
}
System.out.println("onheap size = " + tieredSpilloverCache.getOnHeapCache().count());
System.out.println("disk size = " + tieredSpilloverCache.getDiskCache().count());
assertEquals(numOfItems1, tieredSpilloverCache.count());
tieredSpilloverCache.invalidateAll();
assertEquals(0, tieredSpilloverCache.count());
Expand Down

0 comments on commit ae62259

Please sign in to comment.