diff --git a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java index 70e36921fb7ad..3051bf3e4ff6e 100644 --- a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java @@ -40,6 +40,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -116,6 +117,94 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio ); } + public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception { + int numberOfSegments = getNumberOfSegments(); + int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this + // just a bit higher so that each segment can atleast hold 1 entry. + int onHeapCacheSizeInBytes = onHeapCacheSizePerSegmentInBytes * numberOfSegments; + internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments)).build()); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + // Set a very high value for took time policy so that no items evicted from onHeap cache are spilled + // to disk. And then hit requests so that few items are cached into cache. + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(100, TimeUnit.SECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + int numberOfIndexedItems = numberOfSegments + 1; // Best case if all keys are distributed among different + // segment, atleast one of the segment will have 2 entries and we will see evictions. + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + long perQuerySizeInCacheInBytes = -1; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + // Considering disk cache won't be used due to took time policy having a high value, we expect overall cache + // size to be less than or equal to onHeapCache size. + assertTrue(requestCacheStats.getMemorySizeInBytes() <= onHeapCacheSizeInBytes); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + // We should atleast one eviction considering disk cache isn't able to hold anything due to policy. + assertTrue(requestCacheStats.getEvictions() > 0); + assertEquals(0, requestCacheStats.getHitCount()); + long lastEvictionSeen = requestCacheStats.getEvictions(); + + // Decrease took time policy to zero so that disk cache also comes into play. Now we should be able + // to cache all entries. + updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.MILLISECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + for (int iterator = 0; iterator < numberOfIndexedItems * 2; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery(UUID.randomUUID().toString(), UUID.randomUUID().toString())) + .get(); + assertSearchResponse(resp); + } + + requestCacheStats = getRequestCacheStats(client, "index"); + // We shouldn't see any new evictions now. + assertEquals(lastEvictionSeen, requestCacheStats.getEvictions()); + } + public void testWithDynamicTookTimePolicy() throws Exception { int onHeapCacheSizeInBytes = 2000; internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());