Skip to content

Commit

Permalink
fix: expiring cache entries during read and write
Browse files Browse the repository at this point in the history
  • Loading branch information
shaj13 committed Jun 17, 2022
1 parent aca3e78 commit a0b499e
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 5 deletions.
43 changes: 43 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package libcache_test

import (
"fmt"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -282,6 +283,48 @@ func TestOnExpired(t *testing.T) {
}
}

func TestExpiring(t *testing.T) {
for _, tt := range cacheTests {
t.Run("Test"+tt.cont.String()+"CacheExpiring", func(t *testing.T) {
cache := tt.cont.New(0)
keys := make([]interface{}, 10)
for i := 0; i < 10; i++ {
cache.StoreWithTTL(fmt.Sprintf("%v.100", i), i, time.Millisecond*100)
cache.StoreWithTTL(fmt.Sprintf("%v.200", i), i, time.Millisecond*200)
keys[i] = fmt.Sprintf("%v.200", i)
}

time.Sleep(time.Millisecond * 100)

cache.Peek("notfound") // should expire *.100
got := cache.Keys()
assert.ElementsMatch(t, keys, got)

time.Sleep(time.Millisecond * 100)
cache.Store("notfound", 0) // should expire *.200
got = cache.Keys()
assert.ElementsMatch(t, []string{"notfound"}, got)

cache.Purge()

// check remove element will keep other entries in heap.
// this has been added to make sure we remove right entry
// by its index.
cache.StoreWithTTL(1, 1, time.Millisecond*100)
cache.StoreWithTTL(2, 2, time.Millisecond*200)

cache.Delete(2)
got = cache.Keys()
assert.ElementsMatch(t, []int{1}, got)

time.Sleep(time.Millisecond * 100)
cache.Peek("")
assert.Equal(t, 0, cache.Len())

})
}
}

func BenchmarkCache(b *testing.B) {
for _, tt := range cacheTests {
b.Run("Benchmark"+tt.cont.String()+"Cache", func(b *testing.B) {
Expand Down
64 changes: 59 additions & 5 deletions internal/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"container/heap"
"time"
)

Expand All @@ -22,6 +23,7 @@ type Entry struct {
Element interface{}
Exp time.Time
timer *time.Timer
index int
cancel chan struct{}
}

Expand Down Expand Up @@ -50,6 +52,7 @@ func (e *Entry) stopTimer() {
// of the Cache interface to minimize the effort required to implement interface.
type Cache struct {
coll Collection
heap expiringHeap
entries map[interface{}]*Entry
onEvicted func(key, value interface{})
onExpired func(key, value interface{})
Expand All @@ -68,16 +71,14 @@ func (c *Cache) Peek(key interface{}) (interface{}, bool) {
}

func (c *Cache) get(key interface{}, peek bool) (v interface{}, found bool) {
// Run GC inline before return the entry.
c.gc()

e, ok := c.entries[key]
if !ok {
return
}

if !e.Exp.IsZero() && time.Now().UTC().After(e.Exp) {
c.evict(e)
return
}

if !peek {
c.coll.Move(e)
}
Expand All @@ -101,6 +102,9 @@ func (c *Cache) Store(key, value interface{}) {

// StoreWithTTL sets the key value with TTL overrides the default.
func (c *Cache) StoreWithTTL(key, value interface{}, ttl time.Duration) {
// Run GC inline before pushing the new entry.
c.gc()

if e, ok := c.entries[key]; ok {
c.removeEntry(e)
}
Expand All @@ -112,6 +116,7 @@ func (c *Cache) StoreWithTTL(key, value interface{}, ttl time.Duration) {
e.startTimer(ttl, c.onExpired)
}
e.Exp = time.Now().UTC().Add(ttl)
heap.Push(&c.heap, e)
}

c.entries[key] = e
Expand Down Expand Up @@ -205,6 +210,11 @@ func (c *Cache) removeEntry(e *Entry) {
c.coll.Remove(e)
e.stopTimer()
delete(c.entries, e.Key)
// Remove entry from the heap, the entry may does not exist because
// it has zero ttl or already popped up by gc
if len(c.heap) > 0 && e.index < len(c.heap) && e.Key == c.heap[e.index].Key {
heap.Remove(&c.heap, e.index)
}
}

// evict remove entry and fire on evicted callback.
Expand All @@ -215,6 +225,19 @@ func (c *Cache) evict(e *Entry) {
}
}

func (c *Cache) gc() {
now := time.Now()
for {
// Return from gc if the heap is empty or the next element is not yet
// expired
if len(c.heap) == 0 || now.Before(c.heap[0].Exp) {
return
}
e := heap.Pop(&c.heap).(*Entry)
c.removeEntry(e)
}
}

// TTL returns entries default TTL.
func (c *Cache) TTL() time.Duration {
return c.ttl
Expand Down Expand Up @@ -250,3 +273,34 @@ func New(c Collection, cap int) *Cache {
entries: make(map[interface{}]*Entry),
}
}

// expiringHeap is a min-heap ordered by expiration time of its entries. The
// expiring cache uses this as a priority queue to efficiently organize entries
// which will be garbage collected once they expire.
type expiringHeap []*Entry

var _ heap.Interface = &expiringHeap{}

func (cq expiringHeap) Len() int {
return len(cq)
}

func (cq expiringHeap) Less(i, j int) bool {
return cq[i].Exp.Before(cq[j].Exp)
}

func (cq expiringHeap) Swap(i, j int) {
cq[i].index, cq[j].index = cq[j].index, cq[i].index
cq[i], cq[j] = cq[j], cq[i]
}

func (cq *expiringHeap) Push(c interface{}) {
c.(*Entry).index = len(*cq)
*cq = append(*cq, c.(*Entry))
}

func (cq *expiringHeap) Pop() interface{} {
c := (*cq)[cq.Len()-1]
*cq = (*cq)[:cq.Len()-1]
return c
}

0 comments on commit a0b499e

Please sign in to comment.