Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Nov 22, 2023
1 parent 5ace3a7 commit 2e9c478
Show file tree
Hide file tree
Showing 16 changed files with 237 additions and 159 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814))
- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Framework changes ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753)
- [Tiered caching] Defining interfaces, service and extending IndicesRequestCache with basic Tier support ([#10753] (https://github.com/opensearch-project/OpenSearch/pull/10753))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

package org.opensearch.common.cache;

import org.opensearch.common.cache.tier.TierType;

/**
* Notification when an element is removed from the cache
*
Expand All @@ -44,17 +42,11 @@ public class RemovalNotification<K, V> {
private final K key;
private final V value;
private final RemovalReason removalReason;
private final TierType tierType;

public RemovalNotification(K key, V value, RemovalReason removalReason) {
this(key, value, removalReason, TierType.ON_HEAP);
}

public RemovalNotification(K key, V value, RemovalReason removalReason, TierType tierType) {
this.key = key;
this.value = value;
this.removalReason = removalReason;
this.tierType = tierType;
}

public K getKey() {
Expand All @@ -68,8 +60,4 @@ public V getValue() {
public RemovalReason getRemovalReason() {
return removalReason;
}

public TierType getTierType() {
return tierType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.tier.enums.CacheStoreType;
import org.opensearch.common.cache.tier.listeners.TieredCacheRemovalListener;

/**
* Caching tier interface. Can be implemented/extended by concrete classes to provide different flavors of cache like
Expand All @@ -28,15 +29,15 @@ public interface CachingTier<K, V> {

V compute(K key, TieredCacheLoader<K, V> loader) throws Exception;

void setRemovalListener(RemovalListener<K, V> removalListener);
void setRemovalListener(TieredCacheRemovalListener<K, V> removalListener);

void invalidateAll();

Iterable<K> keys();

int count();

TierType getTierType();
CacheStoreType getTierType();

/**
* Force any outstanding size-based and time-based evictions to occur
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.tier.enums.CacheStoreType;
import org.opensearch.common.cache.tier.listeners.TieredCacheRemovalListener;
import org.opensearch.common.unit.TimeValue;

import java.util.Objects;
Expand All @@ -26,7 +28,7 @@
public class OpenSearchOnHeapCache<K, V> implements OnHeapCachingTier<K, V>, RemovalListener<K, V> {

private final Cache<K, V> cache;
private RemovalListener<K, V> removalListener;
private TieredCacheRemovalListener<K, V> removalListener;

private OpenSearchOnHeapCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.weigher);
Expand All @@ -41,7 +43,7 @@ private OpenSearchOnHeapCache(Builder<K, V> builder) {
}

@Override
public void setRemovalListener(RemovalListener<K, V> removalListener) {
public void setRemovalListener(TieredCacheRemovalListener<K, V> removalListener) {
this.removalListener = removalListener;
}

Expand All @@ -61,8 +63,8 @@ public int count() {
}

@Override
public TierType getTierType() {
return TierType.ON_HEAP;
public CacheStoreType getTierType() {
return CacheStoreType.ON_HEAP;
}

@Override
Expand Down Expand Up @@ -97,7 +99,8 @@ public void refresh() {

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
removalListener.onRemoval(notification);
removalListener.onRemoval(new TieredCacheRemovalNotification<>(notification.getKey(), notification.getValue()
, notification.getRemovalReason(), CacheStoreType.ON_HEAP));
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.tier.enums.CacheStoreType;

/**
* Notification when an element is removed from tiered cache.
* @param <K> Type of key
* @param <V> Type of value
*
* @opensearch.internal
*/
public class TieredCacheRemovalNotification<K, V> extends RemovalNotification<K, V> {
private final CacheStoreType cacheStoreType;

public TieredCacheRemovalNotification(K key, V value, RemovalReason removalReason) {
super(key, value, removalReason);
this.cacheStoreType = CacheStoreType.ON_HEAP;
}

public TieredCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) {
super(key, value, removalReason);
this.cacheStoreType = cacheStoreType;
}

public CacheStoreType getTierType() {
return cacheStoreType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
* compatible open source license.
*/

package org.opensearch.common.cache.tier;
package org.opensearch.common.cache.tier.enums;

/**
* Tier types in cache.
* Cache store types in tiered cache.
*/
public enum TierType {
public enum CacheStoreType {

ON_HEAP,
DISK;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier.listeners;

import org.opensearch.common.cache.tier.enums.CacheStoreType;
import org.opensearch.common.cache.tier.TieredCacheRemovalNotification;

/**
* This can be used to listen to tiered caching events
* @param <K> Type of key
* @param <V> Type of value
*/
public interface TieredCacheEventListener<K, V> {

void onMiss(K key, CacheStoreType cacheStoreType);

void onRemoval(TieredCacheRemovalNotification<K, V> notification);

void onHit(K key, V value, CacheStoreType cacheStoreType);

void onCached(K key, V value, CacheStoreType cacheStoreType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier.listeners;

import org.opensearch.common.cache.tier.TieredCacheRemovalNotification;

/**
* Listener for removing an element from tiered cache
*
* @opensearch.internal
*/
@FunctionalInterface
public interface TieredCacheRemovalListener<K, V> {
void onRemoval(TieredCacheRemovalNotification<K, V> notification);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
* compatible open source license.
*/

package org.opensearch.common.cache.tier;
package org.opensearch.common.cache.tier.service;

import org.opensearch.common.cache.tier.DiskCachingTier;
import org.opensearch.common.cache.tier.OnHeapCachingTier;
import org.opensearch.common.cache.tier.TieredCacheLoader;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
* compatible open source license.
*/

package org.opensearch.common.cache.tier;
package org.opensearch.common.cache.tier.service;

import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.tier.enums.CacheStoreType;
import org.opensearch.common.cache.tier.CachingTier;
import org.opensearch.common.cache.tier.DiskCachingTier;
import org.opensearch.common.cache.tier.OnHeapCachingTier;
import org.opensearch.common.cache.tier.TieredCacheLoader;
import org.opensearch.common.cache.tier.TieredCacheRemovalNotification;
import org.opensearch.common.cache.tier.listeners.TieredCacheEventListener;
import org.opensearch.common.cache.tier.listeners.TieredCacheRemovalListener;

import java.util.Arrays;
import java.util.List;
Expand All @@ -24,7 +30,7 @@
* @param <K> Type of key
* @param <V> Type of value
*/
public class TieredCacheSpilloverStrategyService<K, V> implements TieredCacheService<K, V>, RemovalListener<K, V> {
public class TieredCacheSpilloverStrategyService<K, V> implements TieredCacheService<K, V>, TieredCacheRemovalListener<K, V> {

private final OnHeapCachingTier<K, V> onHeapCachingTier;

Expand Down Expand Up @@ -63,7 +69,7 @@ public V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception
if (cacheValue == null) {
// Add the value to the onHeap cache. Any items if evicted will be moved to lower tier.
V value = onHeapCachingTier.compute(key, loader);
tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP);
tieredCacheEventListener.onCached(key, value, CacheStoreType.ON_HEAP);
return value;
}
return cacheValue.value;
Expand Down Expand Up @@ -126,13 +132,13 @@ public long count() {
* @param notification Contains info about the removal like reason, key/value etc.
*/
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
public void onRemoval(TieredCacheRemovalNotification<K, V> notification) {
if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) {
switch (notification.getTierType()) {
case ON_HEAP:
diskCachingTier.ifPresent(diskTier -> {
diskTier.put(notification.getKey(), notification.getValue());
tieredCacheEventListener.onCached(notification.getKey(), notification.getValue(), TierType.DISK);
tieredCacheEventListener.onCached(notification.getKey(), notification.getValue(), CacheStoreType.DISK);
});
break;
default:
Expand Down Expand Up @@ -185,9 +191,9 @@ private Function<K, CacheValue<V>> getValueFromTierCache(boolean trackStats) {
*/
public static class CacheValue<V> {
V value;
TierType source;
CacheStoreType source;

CacheValue(V value, TierType source) {
CacheValue(V value, CacheStoreType source) {
this.value = value;
this.source = source;
}
Expand Down
Loading

0 comments on commit 2e9c478

Please sign in to comment.