thread_safe_store.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "sync"
  17. "k8s.io/apimachinery/pkg/util/sets"
  18. )
  19. // ThreadSafeStore is an interface that allows concurrent indexed
  20. // access to a storage backend. It is like Indexer but does not
  21. // (necessarily) know how to extract the Store key from a given
  22. // object.
  23. //
  24. // TL;DR caveats: you must not modify anything returned by Get or List as it will break
  25. // the indexing feature in addition to not being thread safe.
  26. //
  27. // The guarantees of thread safety provided by List/Get are only valid if the caller
  28. // treats returned items as read-only. For example, a pointer inserted in the store
  29. // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
  30. // on the same key and modify the pointer in a non-thread-safe way. Also note that
  31. // modifying objects stored by the indexers (if any) will *not* automatically lead
  32. // to a re-index. So it's not a good idea to directly modify the objects returned by
  33. // Get/List, in general.
  34. type ThreadSafeStore interface {
  35. Add(key string, obj interface{})
  36. Update(key string, obj interface{})
  37. Delete(key string)
  38. Get(key string) (item interface{}, exists bool)
  39. List() []interface{}
  40. ListKeys() []string
  41. Replace(map[string]interface{}, string)
  42. Index(indexName string, obj interface{}) ([]interface{}, error)
  43. IndexKeys(indexName, indexKey string) ([]string, error)
  44. ListIndexFuncValues(name string) []string
  45. ByIndex(indexName, indexKey string) ([]interface{}, error)
  46. GetIndexers() Indexers
  47. // AddIndexers adds more indexers to this store. If you call this after you already have data
  48. // in the store, the results are undefined.
  49. AddIndexers(newIndexers Indexers) error
  50. // Resync is a no-op and is deprecated
  51. Resync() error
  52. }
  53. // threadSafeMap implements ThreadSafeStore
  54. type threadSafeMap struct {
  55. lock sync.RWMutex
  56. items map[string]interface{}
  57. // indexers maps a name to an IndexFunc
  58. indexers Indexers
  59. // indices maps a name to an Index
  60. indices Indices
  61. }
  62. func (c *threadSafeMap) Add(key string, obj interface{}) {
  63. c.lock.Lock()
  64. defer c.lock.Unlock()
  65. oldObject := c.items[key]
  66. c.items[key] = obj
  67. c.updateIndices(oldObject, obj, key)
  68. }
  69. func (c *threadSafeMap) Update(key string, obj interface{}) {
  70. c.lock.Lock()
  71. defer c.lock.Unlock()
  72. oldObject := c.items[key]
  73. c.items[key] = obj
  74. c.updateIndices(oldObject, obj, key)
  75. }
  76. func (c *threadSafeMap) Delete(key string) {
  77. c.lock.Lock()
  78. defer c.lock.Unlock()
  79. if obj, exists := c.items[key]; exists {
  80. c.deleteFromIndices(obj, key)
  81. delete(c.items, key)
  82. }
  83. }
  84. func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
  85. c.lock.RLock()
  86. defer c.lock.RUnlock()
  87. item, exists = c.items[key]
  88. return item, exists
  89. }
  90. func (c *threadSafeMap) List() []interface{} {
  91. c.lock.RLock()
  92. defer c.lock.RUnlock()
  93. list := make([]interface{}, 0, len(c.items))
  94. for _, item := range c.items {
  95. list = append(list, item)
  96. }
  97. return list
  98. }
  99. // ListKeys returns a list of all the keys of the objects currently
  100. // in the threadSafeMap.
  101. func (c *threadSafeMap) ListKeys() []string {
  102. c.lock.RLock()
  103. defer c.lock.RUnlock()
  104. list := make([]string, 0, len(c.items))
  105. for key := range c.items {
  106. list = append(list, key)
  107. }
  108. return list
  109. }
  110. func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
  111. c.lock.Lock()
  112. defer c.lock.Unlock()
  113. c.items = items
  114. // rebuild any index
  115. c.indices = Indices{}
  116. for key, item := range c.items {
  117. c.updateIndices(nil, item, key)
  118. }
  119. }
  120. // Index returns a list of items that match the given object on the index function.
  121. // Index is thread-safe so long as you treat all items as immutable.
  122. func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
  123. c.lock.RLock()
  124. defer c.lock.RUnlock()
  125. indexFunc := c.indexers[indexName]
  126. if indexFunc == nil {
  127. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  128. }
  129. indexedValues, err := indexFunc(obj)
  130. if err != nil {
  131. return nil, err
  132. }
  133. index := c.indices[indexName]
  134. var storeKeySet sets.String
  135. if len(indexedValues) == 1 {
  136. // In majority of cases, there is exactly one value matching.
  137. // Optimize the most common path - deduping is not needed here.
  138. storeKeySet = index[indexedValues[0]]
  139. } else {
  140. // Need to de-dupe the return list.
  141. // Since multiple keys are allowed, this can happen.
  142. storeKeySet = sets.String{}
  143. for _, indexedValue := range indexedValues {
  144. for key := range index[indexedValue] {
  145. storeKeySet.Insert(key)
  146. }
  147. }
  148. }
  149. list := make([]interface{}, 0, storeKeySet.Len())
  150. for storeKey := range storeKeySet {
  151. list = append(list, c.items[storeKey])
  152. }
  153. return list, nil
  154. }
  155. // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
  156. func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
  157. c.lock.RLock()
  158. defer c.lock.RUnlock()
  159. indexFunc := c.indexers[indexName]
  160. if indexFunc == nil {
  161. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  162. }
  163. index := c.indices[indexName]
  164. set := index[indexedValue]
  165. list := make([]interface{}, 0, set.Len())
  166. for key := range set {
  167. list = append(list, c.items[key])
  168. }
  169. return list, nil
  170. }
  171. // IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
  172. // IndexKeys is thread-safe so long as you treat all items as immutable.
  173. func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
  174. c.lock.RLock()
  175. defer c.lock.RUnlock()
  176. indexFunc := c.indexers[indexName]
  177. if indexFunc == nil {
  178. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  179. }
  180. index := c.indices[indexName]
  181. set := index[indexedValue]
  182. return set.List(), nil
  183. }
  184. func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
  185. c.lock.RLock()
  186. defer c.lock.RUnlock()
  187. index := c.indices[indexName]
  188. names := make([]string, 0, len(index))
  189. for key := range index {
  190. names = append(names, key)
  191. }
  192. return names
  193. }
  194. func (c *threadSafeMap) GetIndexers() Indexers {
  195. return c.indexers
  196. }
  197. func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
  198. c.lock.Lock()
  199. defer c.lock.Unlock()
  200. if len(c.items) > 0 {
  201. return fmt.Errorf("cannot add indexers to running index")
  202. }
  203. oldKeys := sets.StringKeySet(c.indexers)
  204. newKeys := sets.StringKeySet(newIndexers)
  205. if oldKeys.HasAny(newKeys.List()...) {
  206. return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
  207. }
  208. for k, v := range newIndexers {
  209. c.indexers[k] = v
  210. }
  211. return nil
  212. }
  213. // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
  214. // updateIndices must be called from a function that already has a lock on the cache
  215. func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
  216. // if we got an old object, we need to remove it before we add it again
  217. if oldObj != nil {
  218. c.deleteFromIndices(oldObj, key)
  219. }
  220. for name, indexFunc := range c.indexers {
  221. indexValues, err := indexFunc(newObj)
  222. if err != nil {
  223. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
  224. }
  225. index := c.indices[name]
  226. if index == nil {
  227. index = Index{}
  228. c.indices[name] = index
  229. }
  230. for _, indexValue := range indexValues {
  231. set := index[indexValue]
  232. if set == nil {
  233. set = sets.String{}
  234. index[indexValue] = set
  235. }
  236. set.Insert(key)
  237. }
  238. }
  239. }
  240. // deleteFromIndices removes the object from each of the managed indexes
  241. // it is intended to be called from a function that already has a lock on the cache
  242. func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
  243. for name, indexFunc := range c.indexers {
  244. indexValues, err := indexFunc(obj)
  245. if err != nil {
  246. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
  247. }
  248. index := c.indices[name]
  249. if index == nil {
  250. continue
  251. }
  252. for _, indexValue := range indexValues {
  253. set := index[indexValue]
  254. if set != nil {
  255. set.Delete(key)
  256. // If we don't delete the set when zero, indices with high cardinality
  257. // short lived resources can cause memory to increase over time from
  258. // unused empty sets. See `kubernetes/kubernetes/issues/84959`.
  259. if len(set) == 0 {
  260. delete(index, indexValue)
  261. }
  262. }
  263. }
  264. }
  265. }
  266. func (c *threadSafeMap) Resync() error {
  267. // Nothing to do
  268. return nil
  269. }
  270. // NewThreadSafeStore creates a new instance of ThreadSafeStore.
  271. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
  272. return &threadSafeMap{
  273. items: map[string]interface{}{},
  274. indexers: indexers,
  275. indices: indices,
  276. }
  277. }