thread_safe_store.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. utiltrace "k8s.io/utils/trace"
  20. )
  21. // ThreadSafeStore is an interface that allows concurrent indexed
  22. // access to a storage backend. It is like Indexer but does not
  23. // (necessarily) know how to extract the Store key from a given
  24. // object.
  25. //
  26. // TL;DR caveats: you must not modify anything returned by Get or List as it will break
  27. // the indexing feature in addition to not being thread safe.
  28. //
  29. // The guarantees of thread safety provided by List/Get are only valid if the caller
  30. // treats returned items as read-only. For example, a pointer inserted in the store
  31. // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
  32. // on the same key and modify the pointer in a non-thread-safe way. Also note that
  33. // modifying objects stored by the indexers (if any) will *not* automatically lead
  34. // to a re-index. So it's not a good idea to directly modify the objects returned by
  35. // Get/List, in general.
  36. type ThreadSafeStore interface {
  37. Add(key string, obj interface{})
  38. Update(key string, obj interface{})
  39. Delete(key string)
  40. Get(key string) (item interface{}, exists bool)
  41. List() []interface{}
  42. ListKeys() []string
  43. Replace(map[string]interface{}, string)
  44. Index(indexName string, obj interface{}) ([]interface{}, error)
  45. IndexKeys(indexName, indexedValue string) ([]string, error)
  46. ListIndexFuncValues(name string) []string
  47. ByIndex(indexName, indexedValue string) ([]interface{}, error)
  48. GetIndexers() Indexers
  49. // AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
  50. AddIndexers(newIndexers Indexers) error
  51. // Resync is a no-op and is deprecated
  52. Resync() error
  53. }
  54. // ThreadSafeStoreWithTransaction is a store that can batch execute multiple transactions.
  55. type ThreadSafeStoreWithTransaction interface {
  56. ThreadSafeStore
  57. // Transaction allows performing multiple writes in one call.
  58. Transaction(fns ...ThreadSafeStoreTransaction)
  59. }
  60. // ThreadSafeStoreTransaction embeds a Transaction and includes the specific Key identifying the affected object.
  61. type ThreadSafeStoreTransaction struct {
  62. Transaction
  63. Key string
  64. }
  65. // storeIndex implements the indexing functionality for Store interface
  66. type storeIndex struct {
  67. // indexers maps a name to an IndexFunc
  68. indexers Indexers
  69. // indices maps a name to an Index
  70. indices Indices
  71. }
  72. func (i *storeIndex) reset() {
  73. i.indices = Indices{}
  74. }
  75. func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.Set[string], error) {
  76. indexFunc := i.indexers[indexName]
  77. if indexFunc == nil {
  78. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  79. }
  80. indexedValues, err := indexFunc(obj)
  81. if err != nil {
  82. return nil, err
  83. }
  84. index := i.indices[indexName]
  85. var storeKeySet sets.Set[string]
  86. if len(indexedValues) == 1 {
  87. // In majority of cases, there is exactly one value matching.
  88. // Optimize the most common path - deduping is not needed here.
  89. storeKeySet = index[indexedValues[0]]
  90. } else {
  91. // Need to de-dupe the return list.
  92. // Since multiple keys are allowed, this can happen.
  93. storeKeySet = sets.Set[string]{}
  94. for _, indexedValue := range indexedValues {
  95. for key := range index[indexedValue] {
  96. storeKeySet.Insert(key)
  97. }
  98. }
  99. }
  100. return storeKeySet, nil
  101. }
  102. func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.Set[string], error) {
  103. indexFunc := i.indexers[indexName]
  104. if indexFunc == nil {
  105. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  106. }
  107. index := i.indices[indexName]
  108. return index[indexedValue], nil
  109. }
  110. func (i *storeIndex) getIndexValues(indexName string) []string {
  111. index := i.indices[indexName]
  112. names := make([]string, 0, len(index))
  113. for key := range index {
  114. names = append(names, key)
  115. }
  116. return names
  117. }
  118. func (i *storeIndex) addIndexers(newIndexers Indexers) error {
  119. oldKeys := sets.KeySet(i.indexers)
  120. newKeys := sets.KeySet(newIndexers)
  121. if oldKeys.HasAny(sets.List(newKeys)...) {
  122. return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
  123. }
  124. for k, v := range newIndexers {
  125. i.indexers[k] = v
  126. }
  127. return nil
  128. }
  129. // updateSingleIndex modifies the objects location in the named index:
  130. // - for create you must provide only the newObj
  131. // - for update you must provide both the oldObj and the newObj
  132. // - for delete you must provide only the oldObj
  133. // updateSingleIndex must be called from a function that already has a lock on the cache
  134. func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
  135. var oldIndexValues, indexValues []string
  136. indexFunc, ok := i.indexers[name]
  137. if !ok {
  138. // Should never happen. Caller is responsible for ensuring this exists, and should call with lock
  139. // held to avoid any races.
  140. panic(fmt.Errorf("indexer %q does not exist", name))
  141. }
  142. if oldObj != nil {
  143. var err error
  144. oldIndexValues, err = indexFunc(oldObj)
  145. if err != nil {
  146. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
  147. }
  148. } else {
  149. oldIndexValues = oldIndexValues[:0]
  150. }
  151. if newObj != nil {
  152. var err error
  153. indexValues, err = indexFunc(newObj)
  154. if err != nil {
  155. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
  156. }
  157. } else {
  158. indexValues = indexValues[:0]
  159. }
  160. idx := i.indices[name]
  161. if idx == nil {
  162. idx = index{}
  163. i.indices[name] = idx
  164. }
  165. if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
  166. // We optimize for the most common case where indexFunc returns a single value which has not been changed
  167. return
  168. }
  169. for _, value := range oldIndexValues {
  170. i.deleteKeyFromIndex(key, value, idx)
  171. }
  172. for _, value := range indexValues {
  173. i.addKeyToIndex(key, value, idx)
  174. }
  175. }
  176. // updateIndices modifies the objects location in the managed indexes:
  177. // - for create you must provide only the newObj
  178. // - for update you must provide both the oldObj and the newObj
  179. // - for delete you must provide only the oldObj
  180. // updateIndices must be called from a function that already has a lock on the cache
  181. func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
  182. for name := range i.indexers {
  183. i.updateSingleIndex(name, oldObj, newObj, key)
  184. }
  185. }
  186. func (i *storeIndex) addKeyToIndex(key, indexValue string, index index) {
  187. set := index[indexValue]
  188. if set == nil {
  189. set = sets.Set[string]{}
  190. index[indexValue] = set
  191. }
  192. set.Insert(key)
  193. }
  194. func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index index) {
  195. set := index[indexValue]
  196. if set == nil {
  197. return
  198. }
  199. set.Delete(key)
  200. // If we don't delete the set when zero, indices with high cardinality
  201. // short lived resources can cause memory to increase over time from
  202. // unused empty sets. See `kubernetes/kubernetes/issues/84959`.
  203. if len(set) == 0 {
  204. delete(index, indexValue)
  205. }
  206. }
  207. // threadSafeMap implements ThreadSafeStore
  208. type threadSafeMap struct {
  209. lock sync.RWMutex
  210. items map[string]interface{}
  211. // index implements the indexing functionality
  212. index *storeIndex
  213. }
  214. func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
  215. c.lock.Lock()
  216. defer c.lock.Unlock()
  217. trace := utiltrace.New("ThreadSafeMap Transaction Process",
  218. utiltrace.Field{Key: "Size", Value: len(txns)},
  219. utiltrace.Field{Key: "Reason", Value: "Slow batch process due to too many items"})
  220. defer trace.LogIfLong(min(500*time.Millisecond*time.Duration(len(txns)), 5*time.Second))
  221. for _, txn := range txns {
  222. switch txn.Type {
  223. case TransactionTypeAdd:
  224. c.addLocked(txn.Key, txn.Object)
  225. case TransactionTypeUpdate:
  226. c.updateLocked(txn.Key, txn.Object)
  227. case TransactionTypeDelete:
  228. c.deleteLocked(txn.Key)
  229. }
  230. }
  231. }
  232. func (c *threadSafeMap) Add(key string, obj interface{}) {
  233. c.Update(key, obj)
  234. }
  235. func (c *threadSafeMap) addLocked(key string, obj interface{}) {
  236. c.updateLocked(key, obj)
  237. }
  238. func (c *threadSafeMap) Update(key string, obj interface{}) {
  239. c.lock.Lock()
  240. defer c.lock.Unlock()
  241. c.updateLocked(key, obj)
  242. }
  243. func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
  244. oldObject := c.items[key]
  245. c.items[key] = obj
  246. c.index.updateIndices(oldObject, obj, key)
  247. }
  248. func (c *threadSafeMap) Delete(key string) {
  249. c.lock.Lock()
  250. defer c.lock.Unlock()
  251. c.deleteLocked(key)
  252. }
  253. func (c *threadSafeMap) deleteLocked(key string) {
  254. if obj, exists := c.items[key]; exists {
  255. c.index.updateIndices(obj, nil, key)
  256. delete(c.items, key)
  257. }
  258. }
  259. func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
  260. c.lock.RLock()
  261. defer c.lock.RUnlock()
  262. item, exists = c.items[key]
  263. return item, exists
  264. }
  265. func (c *threadSafeMap) List() []interface{} {
  266. c.lock.RLock()
  267. defer c.lock.RUnlock()
  268. list := make([]interface{}, 0, len(c.items))
  269. for _, item := range c.items {
  270. list = append(list, item)
  271. }
  272. return list
  273. }
  274. // ListKeys returns a list of all the keys of the objects currently
  275. // in the threadSafeMap.
  276. func (c *threadSafeMap) ListKeys() []string {
  277. c.lock.RLock()
  278. defer c.lock.RUnlock()
  279. list := make([]string, 0, len(c.items))
  280. for key := range c.items {
  281. list = append(list, key)
  282. }
  283. return list
  284. }
  285. func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
  286. c.lock.Lock()
  287. defer c.lock.Unlock()
  288. c.items = items
  289. // rebuild any index
  290. c.index.reset()
  291. for key, item := range c.items {
  292. c.index.updateIndices(nil, item, key)
  293. }
  294. }
  295. // Index returns a list of items that match the given object on the index function.
  296. // Index is thread-safe so long as you treat all items as immutable.
  297. func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
  298. c.lock.RLock()
  299. defer c.lock.RUnlock()
  300. storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
  301. if err != nil {
  302. return nil, err
  303. }
  304. list := make([]interface{}, 0, storeKeySet.Len())
  305. for storeKey := range storeKeySet {
  306. list = append(list, c.items[storeKey])
  307. }
  308. return list, nil
  309. }
  310. // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
  311. func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
  312. c.lock.RLock()
  313. defer c.lock.RUnlock()
  314. set, err := c.index.getKeysByIndex(indexName, indexedValue)
  315. if err != nil {
  316. return nil, err
  317. }
  318. list := make([]interface{}, 0, set.Len())
  319. for key := range set {
  320. list = append(list, c.items[key])
  321. }
  322. return list, nil
  323. }
  324. // IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
  325. // IndexKeys is thread-safe so long as you treat all items as immutable.
  326. func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
  327. c.lock.RLock()
  328. defer c.lock.RUnlock()
  329. set, err := c.index.getKeysByIndex(indexName, indexedValue)
  330. if err != nil {
  331. return nil, err
  332. }
  333. return sets.List(set), nil
  334. }
  335. func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
  336. c.lock.RLock()
  337. defer c.lock.RUnlock()
  338. return c.index.getIndexValues(indexName)
  339. }
  340. func (c *threadSafeMap) GetIndexers() Indexers {
  341. return c.index.indexers
  342. }
  343. func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
  344. c.lock.Lock()
  345. defer c.lock.Unlock()
  346. if err := c.index.addIndexers(newIndexers); err != nil {
  347. return err
  348. }
  349. // If there are already items, index them
  350. for key, item := range c.items {
  351. for name := range newIndexers {
  352. c.index.updateSingleIndex(name, nil, item, key)
  353. }
  354. }
  355. return nil
  356. }
  357. func (c *threadSafeMap) Resync() error {
  358. // Nothing to do
  359. return nil
  360. }
  361. // NewThreadSafeStore creates a new instance of ThreadSafeStore.
  362. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
  363. return &threadSafeMap{
  364. items: map[string]interface{}{},
  365. index: &storeIndex{
  366. indexers: indexers,
  367. indices: indices,
  368. },
  369. }
  370. }