| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "fmt"
- "sync"
- "time"
- "k8s.io/apimachinery/pkg/util/sets"
- utiltrace "k8s.io/utils/trace"
- )
- // ThreadSafeStore is an interface that allows concurrent indexed
- // access to a storage backend. It is like Indexer but does not
- // (necessarily) know how to extract the Store key from a given
- // object.
- //
- // TL;DR caveats: you must not modify anything returned by Get or List as it will break
- // the indexing feature in addition to not being thread safe.
- //
- // The guarantees of thread safety provided by List/Get are only valid if the caller
- // treats returned items as read-only. For example, a pointer inserted in the store
- // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
- // on the same key and modify the pointer in a non-thread-safe way. Also note that
- // modifying objects stored by the indexers (if any) will *not* automatically lead
- // to a re-index. So it's not a good idea to directly modify the objects returned by
- // Get/List, in general.
- type ThreadSafeStore interface {
- Add(key string, obj interface{})
- Update(key string, obj interface{})
- Delete(key string)
- Get(key string) (item interface{}, exists bool)
- List() []interface{}
- ListKeys() []string
- Replace(map[string]interface{}, string)
- Index(indexName string, obj interface{}) ([]interface{}, error)
- IndexKeys(indexName, indexedValue string) ([]string, error)
- ListIndexFuncValues(name string) []string
- ByIndex(indexName, indexedValue string) ([]interface{}, error)
- GetIndexers() Indexers
- // AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
- AddIndexers(newIndexers Indexers) error
- // Resync is a no-op and is deprecated
- Resync() error
- }
- // ThreadSafeStoreWithTransaction is a store that can batch execute multiple transactions.
- type ThreadSafeStoreWithTransaction interface {
- ThreadSafeStore
- // Transaction allows performing multiple writes in one call.
- Transaction(fns ...ThreadSafeStoreTransaction)
- }
- // ThreadSafeStoreTransaction embeds a Transaction and includes the specific Key identifying the affected object.
- type ThreadSafeStoreTransaction struct {
- Transaction
- Key string
- }
- // storeIndex implements the indexing functionality for Store interface
- type storeIndex struct {
- // indexers maps a name to an IndexFunc
- indexers Indexers
- // indices maps a name to an Index
- indices Indices
- }
- func (i *storeIndex) reset() {
- i.indices = Indices{}
- }
- func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.Set[string], error) {
- indexFunc := i.indexers[indexName]
- if indexFunc == nil {
- return nil, fmt.Errorf("Index with name %s does not exist", indexName)
- }
- indexedValues, err := indexFunc(obj)
- if err != nil {
- return nil, err
- }
- index := i.indices[indexName]
- var storeKeySet sets.Set[string]
- if len(indexedValues) == 1 {
- // In majority of cases, there is exactly one value matching.
- // Optimize the most common path - deduping is not needed here.
- storeKeySet = index[indexedValues[0]]
- } else {
- // Need to de-dupe the return list.
- // Since multiple keys are allowed, this can happen.
- storeKeySet = sets.Set[string]{}
- for _, indexedValue := range indexedValues {
- for key := range index[indexedValue] {
- storeKeySet.Insert(key)
- }
- }
- }
- return storeKeySet, nil
- }
- func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.Set[string], error) {
- indexFunc := i.indexers[indexName]
- if indexFunc == nil {
- return nil, fmt.Errorf("Index with name %s does not exist", indexName)
- }
- index := i.indices[indexName]
- return index[indexedValue], nil
- }
- func (i *storeIndex) getIndexValues(indexName string) []string {
- index := i.indices[indexName]
- names := make([]string, 0, len(index))
- for key := range index {
- names = append(names, key)
- }
- return names
- }
- func (i *storeIndex) addIndexers(newIndexers Indexers) error {
- oldKeys := sets.KeySet(i.indexers)
- newKeys := sets.KeySet(newIndexers)
- if oldKeys.HasAny(sets.List(newKeys)...) {
- return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
- }
- for k, v := range newIndexers {
- i.indexers[k] = v
- }
- return nil
- }
- // updateSingleIndex modifies the objects location in the named index:
- // - for create you must provide only the newObj
- // - for update you must provide both the oldObj and the newObj
- // - for delete you must provide only the oldObj
- // updateSingleIndex must be called from a function that already has a lock on the cache
- func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
- var oldIndexValues, indexValues []string
- indexFunc, ok := i.indexers[name]
- if !ok {
- // Should never happen. Caller is responsible for ensuring this exists, and should call with lock
- // held to avoid any races.
- panic(fmt.Errorf("indexer %q does not exist", name))
- }
- if oldObj != nil {
- var err error
- oldIndexValues, err = indexFunc(oldObj)
- if err != nil {
- panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
- }
- } else {
- oldIndexValues = oldIndexValues[:0]
- }
- if newObj != nil {
- var err error
- indexValues, err = indexFunc(newObj)
- if err != nil {
- panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
- }
- } else {
- indexValues = indexValues[:0]
- }
- idx := i.indices[name]
- if idx == nil {
- idx = index{}
- i.indices[name] = idx
- }
- if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
- // We optimize for the most common case where indexFunc returns a single value which has not been changed
- return
- }
- for _, value := range oldIndexValues {
- i.deleteKeyFromIndex(key, value, idx)
- }
- for _, value := range indexValues {
- i.addKeyToIndex(key, value, idx)
- }
- }
- // updateIndices modifies the objects location in the managed indexes:
- // - for create you must provide only the newObj
- // - for update you must provide both the oldObj and the newObj
- // - for delete you must provide only the oldObj
- // updateIndices must be called from a function that already has a lock on the cache
- func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
- for name := range i.indexers {
- i.updateSingleIndex(name, oldObj, newObj, key)
- }
- }
- func (i *storeIndex) addKeyToIndex(key, indexValue string, index index) {
- set := index[indexValue]
- if set == nil {
- set = sets.Set[string]{}
- index[indexValue] = set
- }
- set.Insert(key)
- }
- func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index index) {
- set := index[indexValue]
- if set == nil {
- return
- }
- set.Delete(key)
- // If we don't delete the set when zero, indices with high cardinality
- // short lived resources can cause memory to increase over time from
- // unused empty sets. See `kubernetes/kubernetes/issues/84959`.
- if len(set) == 0 {
- delete(index, indexValue)
- }
- }
- // threadSafeMap implements ThreadSafeStore
- type threadSafeMap struct {
- lock sync.RWMutex
- items map[string]interface{}
- // index implements the indexing functionality
- index *storeIndex
- }
- func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
- c.lock.Lock()
- defer c.lock.Unlock()
- trace := utiltrace.New("ThreadSafeMap Transaction Process",
- utiltrace.Field{Key: "Size", Value: len(txns)},
- utiltrace.Field{Key: "Reason", Value: "Slow batch process due to too many items"})
- defer trace.LogIfLong(min(500*time.Millisecond*time.Duration(len(txns)), 5*time.Second))
- for _, txn := range txns {
- switch txn.Type {
- case TransactionTypeAdd:
- c.addLocked(txn.Key, txn.Object)
- case TransactionTypeUpdate:
- c.updateLocked(txn.Key, txn.Object)
- case TransactionTypeDelete:
- c.deleteLocked(txn.Key)
- }
- }
- }
- func (c *threadSafeMap) Add(key string, obj interface{}) {
- c.Update(key, obj)
- }
- func (c *threadSafeMap) addLocked(key string, obj interface{}) {
- c.updateLocked(key, obj)
- }
- func (c *threadSafeMap) Update(key string, obj interface{}) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.updateLocked(key, obj)
- }
- func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
- oldObject := c.items[key]
- c.items[key] = obj
- c.index.updateIndices(oldObject, obj, key)
- }
- func (c *threadSafeMap) Delete(key string) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.deleteLocked(key)
- }
- func (c *threadSafeMap) deleteLocked(key string) {
- if obj, exists := c.items[key]; exists {
- c.index.updateIndices(obj, nil, key)
- delete(c.items, key)
- }
- }
- func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- item, exists = c.items[key]
- return item, exists
- }
- func (c *threadSafeMap) List() []interface{} {
- c.lock.RLock()
- defer c.lock.RUnlock()
- list := make([]interface{}, 0, len(c.items))
- for _, item := range c.items {
- list = append(list, item)
- }
- return list
- }
- // ListKeys returns a list of all the keys of the objects currently
- // in the threadSafeMap.
- func (c *threadSafeMap) ListKeys() []string {
- c.lock.RLock()
- defer c.lock.RUnlock()
- list := make([]string, 0, len(c.items))
- for key := range c.items {
- list = append(list, key)
- }
- return list
- }
- func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.items = items
- // rebuild any index
- c.index.reset()
- for key, item := range c.items {
- c.index.updateIndices(nil, item, key)
- }
- }
- // Index returns a list of items that match the given object on the index function.
- // Index is thread-safe so long as you treat all items as immutable.
- func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
- if err != nil {
- return nil, err
- }
- list := make([]interface{}, 0, storeKeySet.Len())
- for storeKey := range storeKeySet {
- list = append(list, c.items[storeKey])
- }
- return list, nil
- }
- // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
- func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- set, err := c.index.getKeysByIndex(indexName, indexedValue)
- if err != nil {
- return nil, err
- }
- list := make([]interface{}, 0, set.Len())
- for key := range set {
- list = append(list, c.items[key])
- }
- return list, nil
- }
- // IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
- // IndexKeys is thread-safe so long as you treat all items as immutable.
- func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- set, err := c.index.getKeysByIndex(indexName, indexedValue)
- if err != nil {
- return nil, err
- }
- return sets.List(set), nil
- }
- func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
- c.lock.RLock()
- defer c.lock.RUnlock()
- return c.index.getIndexValues(indexName)
- }
- func (c *threadSafeMap) GetIndexers() Indexers {
- return c.index.indexers
- }
- func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
- c.lock.Lock()
- defer c.lock.Unlock()
- if err := c.index.addIndexers(newIndexers); err != nil {
- return err
- }
- // If there are already items, index them
- for key, item := range c.items {
- for name := range newIndexers {
- c.index.updateSingleIndex(name, nil, item, key)
- }
- }
- return nil
- }
- func (c *threadSafeMap) Resync() error {
- // Nothing to do
- return nil
- }
- // NewThreadSafeStore creates a new instance of ThreadSafeStore.
- func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
- return &threadSafeMap{
- items: map[string]interface{}{},
- index: &storeIndex{
- indexers: indexers,
- indices: indices,
- },
- }
- }
|