Kaynağa Gözat

Merge pull request #1157 from kubecost/bolt/cachegroup

CacheGroup[T] Utility
Matt Bolt 4 yıl önce
ebeveyn
işleme
e524f24a8d

+ 157 - 0
pkg/util/cache/cachegroup.go

@@ -0,0 +1,157 @@
+package cache
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/util/interval"
+	"golang.org/x/sync/singleflight"
+)
+
+// cacheEntry pairs a cached item of type T with the time it was added to the
+// cache. The timestamp is what both the capacity-based eviction and the
+// expiration-based eviction compare against.
+type cacheEntry[T comparable] struct {
+	// item is the value produced by the factory function for the entry's key.
+	item T
+	// ts is the insertion time, taken from time.Now() when the entry is stored.
+	ts   time.Time
+}
+
+// CacheGroup provides single flighting for grouping repeated calls for the same workload, as well
+// as a cache that extends the lifetime of the returned result by a specific duration.
+//
+// Results are stored per group key. Entries leave the cache either because the number of
+// keys reached max (the oldest entry is evicted) or because an expiration policy installed
+// with WithExpiration removed them.
+type CacheGroup[T comparable] struct {
+	// lock guards cache.
+	lock             sync.Mutex
+	// cache maps a group key to its most recent result and insertion time.
+	cache            map[string]*cacheEntry[T]
+	// group collapses concurrent Do calls that share a key into one factory execution.
+	group            singleflight.Group
+	// expirationLock guards expirationRunner.
+	expirationLock   sync.Mutex
+	// expirationRunner periodically removes expired entries; nil until WithExpiration is called.
+	expirationRunner *interval.IntervalRunner
+	// expiry is the age at which the eviction process removes a cached entry.
+	expiry           time.Duration
+	// max is the number of cached keys at which adding a new entry evicts the oldest one.
+	max              int
+}
+
+// NewCacheGroup creates an empty CacheGroup[T] that caches at most max keys.
+// Once the cache is at capacity, storing a result for another key evicts the
+// oldest entry. No expiration policy is active until WithExpiration is called.
+func NewCacheGroup[T comparable](max int) *CacheGroup[T] {
+	cg := new(CacheGroup[T])
+	cg.max = max
+	cg.cache = map[string]*cacheEntry[T]{}
+	return cg
+}
+
+// Do accepts a group key and a factory function to execute a workload request. Any executions
+// of Do() using an identical key will wait on the originating request rather than executing a
+// new request, and the final result will be shared among any callers sharing the same key.
+// Additionally, once returned, the workload for that key will remain cached. An expiration
+// policy can be added for this cache by calling the WithExpiration method.
+//
+// Errors returned by factory are passed through to every waiting caller and are never cached;
+// in that case the zero value of T is returned alongside the error.
+func (cg *CacheGroup[T]) Do(key string, factory func() (T, error)) (T, error) {
+	// Fast path: serve straight from the cache when the key is already present.
+	cg.lock.Lock()
+	if entry, ok := cg.cache[key]; ok {
+		cg.lock.Unlock()
+		return entry.item, nil
+	}
+	cg.lock.Unlock()
+
+	// single flight the group using the group key
+	item, err, _ := cg.group.Do(key, func() (any, error) {
+		// Re-check the cache now that we own the flight for this key. A caller can miss
+		// the fast path above just before an earlier flight stores its result; without
+		// this check it would run the factory again and evict an unrelated entry to
+		// make room for a key that is already cached.
+		cg.lock.Lock()
+		if entry, ok := cg.cache[key]; ok {
+			cg.lock.Unlock()
+			return entry.item, nil
+		}
+		cg.lock.Unlock()
+
+		i, err := factory()
+		if err != nil {
+			return nil, err
+		}
+
+		// assign cache once a result for the group key is returned
+		cg.lock.Lock()
+		cg.removeOldestBeyondCapacity()
+		cg.cache[key] = &cacheEntry[T]{
+			item: i,
+			ts:   time.Now(),
+		}
+		cg.lock.Unlock()
+		return i, nil
+	})
+
+	if err != nil {
+		return defaultValue[T](), err
+	}
+
+	// When T is an interface type and the factory legitimately returned a nil T, the
+	// value boxed into `any` is a nil interface and the type assertion below would
+	// fail. Treat it as the successful zero value it is instead of reporting an error.
+	if item == nil {
+		return defaultValue[T](), nil
+	}
+
+	tItem, ok := item.(T)
+	if !ok {
+		return defaultValue[T](), fmt.Errorf("Failed to convert single flight result")
+	}
+
+	return tItem, nil
+}
+
+// WithExpiration assigns a cache expiration to cached entries, and starts an eviction process,
+// which runs on the specified interval. It returns the receiver so it can be chained onto
+// NewCacheGroup.
+//
+// The expiry is only applied when this call actually starts the eviction process; if the
+// process is already running, the existing expiry and interval are left unchanged.
+//
+// NOTE(review): evictionInterval ends up in time.NewTicker inside the interval runner, which
+// panics for non-positive durations, so callers should pass a positive interval.
+func (cg *CacheGroup[T]) WithExpiration(expiry time.Duration, evictionInterval time.Duration) *CacheGroup[T] {
+	cg.expirationLock.Lock()
+	defer cg.expirationLock.Unlock()
+
+	if cg.expirationRunner == nil {
+		cg.expirationRunner = interval.NewIntervalRunner(func() {
+			cg.lock.Lock()
+			defer cg.lock.Unlock()
+
+			cg.removeExpired()
+		}, evictionInterval)
+	}
+
+	if cg.expirationRunner.Start() {
+		// The eviction goroutine reads cg.expiry while holding cg.lock, so the write must
+		// happen under that same lock; expirationLock alone does not order the two.
+		cg.lock.Lock()
+		cg.expiry = expiry
+		cg.lock.Unlock()
+	}
+	return cg
+}
+
+// DisableExpiration will shutdown the expiration process which allows cache entries to remain until 'max' is
+// exceeded. It is a no-op when no expiration process has been configured, so it is safe to call
+// at any time and more than once.
+func (cg *CacheGroup[T]) DisableExpiration() {
+	cg.expirationLock.Lock()
+	defer cg.expirationLock.Unlock()
+
+	// Only an existing runner can be stopped. Stopping on a nil runner would
+	// dereference nil, and skipping a live runner would leave it running forever.
+	if cg.expirationRunner != nil {
+		cg.expirationRunner.Stop()
+		cg.expirationRunner = nil
+	}
+}
+
+// locates the oldest entry and removes it from the map. caller should lock
+// prior to calling
+func (cg *CacheGroup[T]) removeOldestBeyondCapacity() {
+	// only remove the oldest entries if we're at max capacity
+	if len(cg.cache) < cg.max {
+		return
+	}
+
+	oldest := time.Now()
+	oldestKey := ""
+
+	for k, v := range cg.cache {
+		if v.ts.Before(oldest) {
+			oldest = v.ts
+			oldestKey = k
+		}
+	}
+
+	delete(cg.cache, oldestKey)
+}
+
+// removeExpired drops every entry whose age has reached the configured expiry.
+// The caller should hold the cache lock prior to calling.
+func (cg *CacheGroup[T]) removeExpired() {
+	if len(cg.cache) > 0 {
+		now := time.Now()
+		for key, entry := range cg.cache {
+			age := now.Sub(entry.ts)
+			if age < cg.expiry {
+				// still fresh, keep it
+				continue
+			}
+			delete(cg.cache, key)
+		}
+	}
+}
+
+// default value helper function to returns the initialized value for a T instance
+// (ie: for value types, typically the 0 value. For pointer types, nil)
+func defaultValue[T any]() T {
+	var t T
+	return t
+}

+ 187 - 0
pkg/util/cache/cachegroup_test.go

@@ -0,0 +1,187 @@
+package cache
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+// Obj is a minimal payload type used as the cached value in the tests below.
+type Obj struct {
+	// Value lets a test confirm which object a Do call returned.
+	Value int
+}
+
+func TestGroupCacheSingleFlighting(t *testing.T) {
+	g := NewCacheGroup[*Obj](3)
+
+	factory := func() (*Obj, error) {
+		time.Sleep(2 * time.Second)
+		return &Obj{10}, nil
+	}
+
+	next := make(chan struct{})
+	done := make(chan struct{})
+
+	go func() {
+		now := time.Now()
+		o, _ := g.Do("a", func() (*Obj, error) {
+			next <- struct{}{}
+			return factory()
+		})
+		t.Logf("Took: %d ms, Obj Value: %d\n", time.Now().Sub(now).Milliseconds(), o.Value)
+	}()
+
+	go func() {
+		<-next
+
+		time.Sleep(1 * time.Second)
+
+		now := time.Now()
+		o, _ := g.Do("a", factory)
+		delta := time.Now().Sub(now)
+		t.Logf("Other Go Routine Took: %d ms, Obj Value: %d\n", delta.Milliseconds(), o.Value)
+
+		if delta > (time.Duration(1250 * time.Millisecond)) {
+			t.Errorf("Delta Time > 1250ms. Delta: %d, Expected 1000ms\n", delta)
+		}
+		done <- struct{}{}
+	}()
+
+	<-done
+}
+
+func TestGroupCacheAfterSingleFlighting(t *testing.T) {
+	g := NewCacheGroup[*Obj](3)
+
+	factory := func() (*Obj, error) {
+		time.Sleep(2 * time.Second)
+		return &Obj{10}, nil
+	}
+
+	next := make(chan struct{})
+	done := make(chan struct{})
+
+	go func() {
+		now := time.Now()
+		o, _ := g.Do("a", func() (*Obj, error) {
+			next <- struct{}{}
+			return factory()
+		})
+		t.Logf("Took: %d ms, Obj Value: %d\n", time.Now().Sub(now).Milliseconds(), o.Value)
+	}()
+
+	go func() {
+		<-next
+		// wait the full 2 seconds and then some, which will ensure we are no longer
+		// single flighting, and should reach into the cache
+		time.Sleep(2500 * time.Millisecond)
+
+		now := time.Now()
+		o, _ := g.Do("a", factory)
+		delta := time.Now().Sub(now)
+		t.Logf("Other Go Routine Took: %d ms, Obj Value: %d\n", delta.Milliseconds(), o.Value)
+
+		if delta > (time.Duration(1250 * time.Millisecond)) {
+			t.Errorf("Delta Time > 1250ms. Delta: %d, Expected 1000ms\n", delta)
+		}
+
+		done <- struct{}{}
+	}()
+
+	<-done
+}
+
+// TestGroupCacheMany fires a series of requests for one key, some while the original
+// request is in flight and some after it has been cached, and fails if they do not all
+// complete within the timeout.
+func TestGroupCacheMany(t *testing.T) {
+	// Apologies this test can be difficult to follow. (Concurrent tests are hard)
+	// The idea here is that we test a "request" that takes 1 second to return an
+	// Obj{10} result (factory).
+	// * To test the single flight behavior, we make a series of requests that will
+	//   happen while the initial request is in flight.
+	// * The second half of requests will be made after the original request returns
+	//   to ensure that we pull from cache.
+	// * The failure case is if all of these actions takes too long to execute, which
+	//   _should_ indicate a deadlock or problem with the API.
+	g := NewCacheGroup[*Obj](3).WithExpiration(10*time.Second, 5*time.Second)
+	// shut the eviction process down again so it does not outlive the test
+	defer g.DisableExpiration()
+
+	factory := func() (*Obj, error) {
+		time.Sleep(1 * time.Second)
+		return &Obj{10}, nil
+	}
+
+	next := make(chan struct{})
+
+	// The wait group tracks the originating request as well as the follow-ups.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		now := time.Now()
+		o, err := g.Do("a", func() (*Obj, error) {
+			next <- struct{}{}
+			return factory()
+		})
+		if err != nil {
+			t.Errorf("Originating request failed: %s", err)
+			return
+		}
+		t.Logf("Took: %d ms, Obj Value: %d\n", time.Since(now).Milliseconds(), o.Value)
+	}()
+
+	<-next
+	wg.Add(10)
+	for i := 0; i < 10; i++ {
+		go func(ii int) {
+			defer wg.Done()
+
+			t.Logf("Created Go Routine: %d\n", ii)
+			now := time.Now()
+			o, err := g.Do("a", factory)
+			delta := time.Since(now)
+			if err != nil {
+				t.Errorf("Go Routine[%d] failed: %s", ii, err)
+				return
+			}
+			t.Logf("Go Routine[%d] Took: %d ms, Obj Value: %d\n", ii, delta.Milliseconds(), o.Value)
+		}(i)
+		time.Sleep(250 * time.Millisecond)
+	}
+
+	select {
+	case <-waitChannelFor(&wg):
+		t.Logf("Successfully returned values for all requests.")
+	case <-time.After(time.Second * 8):
+		// This is the failure case described above, so it must fail the test rather
+		// than just log.
+		t.Errorf("Failed to complete after 8 second timeout")
+	}
+}
+
+// TestCacheGroupExpirationPolicy verifies that an entry is removed by the eviction
+// process once it is older than the configured expiry.
+func TestCacheGroupExpirationPolicy(t *testing.T) {
+	g := NewCacheGroup[*Obj](3).WithExpiration(2*time.Second, time.Second)
+	// shut the eviction process down again so it does not outlive the test
+	defer g.DisableExpiration()
+
+	if _, err := g.Do("a", func() (*Obj, error) {
+		return &Obj{10}, nil
+	}); err != nil {
+		t.Fatalf("Failed to populate cache: %s", err)
+	}
+
+	// Eviction only happens on 1 second ticks. If the entry was stamped slightly after
+	// the ticker started, it is still a hair under 2 seconds old at the second tick and
+	// is only removed at the third, so wait past that one.
+	time.Sleep(3100 * time.Millisecond)
+
+	// The eviction goroutine mutates the map under g.lock; read it under the same lock.
+	g.lock.Lock()
+	remaining := len(g.cache)
+	g.lock.Unlock()
+
+	if remaining > 0 {
+		t.Errorf("Expected cache to be empty (expired). Cache length was: %d\n", remaining)
+	}
+}
+
+// TestCacheGroupMaxRollOff fills a cache group limited to 3 keys with 4 keys and
+// expects the first key inserted to have been evicted.
+func TestCacheGroupMaxRollOff(t *testing.T) {
+	g := NewCacheGroup[*Obj](3)
+
+	newObj := func() (*Obj, error) {
+		return &Obj{1}, nil
+	}
+
+	// one more key than the group can hold, inserted in order
+	for _, key := range []string{"a", "b", "c", "d"} {
+		g.Do(key, newObj)
+	}
+
+	_, stillCached := g.cache["a"]
+	if stillCached {
+		t.Errorf("Expected 'a' group cache to be evicted")
+	}
+}
+
+// waitChannelFor adapts a WaitGroup to a channel so it can be used in a select:
+// the returned channel becomes readable once wg.Wait() returns.
+func waitChannelFor(wg *sync.WaitGroup) <-chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+		wg.Wait()
+		// Close instead of sending: a send on this unbuffered channel would block
+		// forever (leaking the goroutine) if the receiver already gave up, e.g.
+		// after a timeout in a select.
+		close(ch)
+	}()
+	return ch
+}

+ 72 - 0
pkg/util/interval/intervalrunner.go

@@ -0,0 +1,72 @@
+package interval
+
+import (
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/util/atomic"
+)
+
+// IntervalRunner executes an action repeatedly on a fixed interval in a background
+// goroutine until it is explicitly stopped. It uses an AtomicRunState to coordinate
+// starting and stopping, and serves as an example of how AtomicRunState is used.
+type IntervalRunner struct {
+	// runState tracks and coordinates the start/stop lifecycle.
+	runState atomic.AtomicRunState
+	// action is the function executed each time the interval elapses.
+	action   func()
+	// interval is the period of the ticker that drives the action.
+	interval time.Duration
+}
+
+// NewIntervalRunner creates a new interval runner that, once started, executes the
+// provided action each time the given interval elapses, until explicitly stopped.
+// The runner is returned idle; call Start to begin execution.
+func NewIntervalRunner(action func(), interval time.Duration) *IntervalRunner {
+	ir := new(IntervalRunner)
+	ir.interval = interval
+	ir.action = action
+	return ir
+}
+
+// Start begins the interval execution. It returns true if the interval execution
+// successfully starts, and false if the interval execution is already running.
+func (ir *IntervalRunner) Start() bool {
+	// A previous Stop() may still be in the middle of being processed. Wait for the
+	// run state to be reset before attempting to start; this is the usual pattern
+	// when working with an AtomicRunState.
+	ir.runState.WaitForReset()
+
+	// Atomically check that we are allowed to run and advance the state. A false
+	// result means the runner has already been started.
+	started := ir.runState.Start()
+	if !started {
+		return false
+	}
+
+	// The run state is advanced: spawn the goroutine that executes the action each
+	// time the interval elapses.
+	go func() {
+		ticker := time.NewTicker(ir.interval)
+		for {
+			// block until either a stop is requested or the interval elapses,
+			// whichever happens first
+			select {
+			case <-ir.runState.OnStop():
+				// Stop() was called explicitly: release the ticker, return the
+				// run state to its initial idle state and exit the goroutine
+				ticker.Stop()
+				ir.runState.Reset()
+				return
+
+			case <-ticker.C:
+				// the interval elapsed: execute the function, then loop back
+				// and wait again
+				ir.action()
+			}
+		}
+	}()
+
+	return true
+}
+
+// Stop requests that the interval runner stop executing and returns the result
+// reported by the underlying run state. The background goroutine only observes the
+// request between actions, so an action that is already executing finishes before
+// the stop is processed. Any attempt to Start() during the stopping phase blocks
+// until the run state has been reset.
+func (ir *IntervalRunner) Stop() bool {
+	stopped := ir.runState.Stop()
+	return stopped
+}