Skip to content

Commit

Permalink
modifying unit tests to catch rare deadlock issues
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Jun 4, 2024
1 parent 6eb3e05 commit 3b75273
Showing 1 changed file with 137 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -1323,7 +1324,7 @@ public void testTierStatsAddCorrectly() throws Exception {
}

public void testNumLocksTiming() throws Exception {
int onHeapCacheSize = randomIntBetween(2400, 2401);
int onHeapCacheSize = 10;//randomIntBetween(2400, 2401);
int diskCacheSize = randomIntBetween(5000, 10000);
int keyValueSize = 50;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Expand All @@ -1341,22 +1342,37 @@ public void testNumLocksTiming() throws Exception {
.build(),
0
);

long startTime = System.currentTimeMillis();
int numRequests = 100_000;
// Each thread will do this many requests for key with string value of i, and then that many again (for possible hits)
int numThreads = 8;
Thread[] threads = new Thread[numThreads];
Phaser phaser = new Phaser(numThreads + 1);
CountDownLatch countDownLatch = new CountDownLatch(numThreads);

new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Taking thread dump");
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
System.out.println(entry.getKey() + " " + entry.getKey().getState());
for (StackTraceElement ste : entry.getValue()) {
System.out.println("\tat " + ste);
}
System.out.println();
}
}).start();
// Precompute the keys each thread will request so we don't include that in the time estimate
List<List<ICacheKey<String>>> keysPerThread = new ArrayList<>();

for (int i = 0; i < numThreads; i++) {
keysPerThread.add(new ArrayList<>());
int finalI = i;
for (int j = 0; j < numRequests; j++) {
keysPerThread.get(i).add(getICacheKey(String.valueOf(randomInt(numRequests))));
keysPerThread.get(i).add(getICacheKey(String.valueOf(randomInt(10))));
}

threads[i] = new Thread(() -> {
Expand All @@ -1365,14 +1381,15 @@ public void testNumLocksTiming() throws Exception {
for (int j = 0; j < numRequests; j++) {
tieredSpilloverCache.computeIfAbsent(keysPerThread.get(finalI).get(j), getLoadAwareCacheLoader());
if (j % 100 == 0) {
System.out.println("Finished iter " + j);
//System.out.println("Finished iter " + j + " elapsed time = " + (System
// .currentTimeMillis() - startTime));
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
});
}, "testing-" + i);
threads[i].start();
}
long now = System.nanoTime();
Expand All @@ -1383,6 +1400,121 @@ public void testNumLocksTiming() throws Exception {
// or " + (float) elapsed / 1000000000 + " sec");
}

public void testDeadLockWithApproach2() throws Exception {
int onHeapCacheSize = 1;
int diskCacheSize = 0;
int keyValueSize = 50;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = initializeTieredSpilloverCache(
keyValueSize,
diskCacheSize,
removalListener,
Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build(),
0
);

String key1 = "test1";
String key2 = "test2";
ICacheKey<String> cacheKey = getICacheKey(key1);
ICacheKey<String> cacheKey1 = getICacheKey(key2);

// Thread thread2 = new Thread(() -> {
// try {
// tieredSpilloverCache.computeIfAbsent(cacheKey1, new LoadAwareCacheLoader<>() {
// boolean isLoaded = false;
//
// @Override
// public boolean isLoaded() {
// return isLoaded;
// }
//
// @Override
// public String load(ICacheKey<String> key) throws Exception {
// isLoaded = true;
// return key.key;
// }
// });
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });

AtomicInteger check = new AtomicInteger(0);

RemovalListener<String, String> mockRemovalListener = new RemovalListener<String, String>() {
@Override
public void onRemoval(RemovalNotification<String, String> notification) {
if (check.get() == 0) {
try {
new Thread(() -> {
try {
tieredSpilloverCache.computeIfAbsent(cacheKey1, new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public boolean isLoaded() {
return isLoaded;
}

@Override
public String load(ICacheKey<String> key) throws Exception {
isLoaded = true;
return key.key;
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
} catch (Exception e) {
throw new RuntimeException(e);
}
check.incrementAndGet();
}
}
};

tieredSpilloverCache.setMockListener(mockRemovalListener);

tieredSpilloverCache.computeIfAbsent(cacheKey1, new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public boolean isLoaded() {
return isLoaded;
}

@Override
public String load(ICacheKey<String> key) throws Exception {
isLoaded = true;
return key.key;
}
});


tieredSpilloverCache.computeIfAbsent(cacheKey, new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public boolean isLoaded() {
return isLoaded;
}

@Override
public String load(ICacheKey<String> key) throws Exception {
isLoaded = true;
return key.key;
}
});
}

private List<String> getMockDimensions() {
List<String> dims = new ArrayList<>();
for (String dimensionName : dimensionNames) {
Expand Down

0 comments on commit 3b75273

Please sign in to comment.