store.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "strings"
  18. "k8s.io/apimachinery/pkg/api/meta"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. )
  // Store is a generic object storage and processing interface. A
  // Store holds a map from string keys to accumulators, and has
  // operations to add, update, and delete a given object to/from the
  // accumulator currently associated with a given key. A Store also
  // knows how to extract the key from a given object, so many operations
  // are given only the object.
  //
  // In the simplest Store implementations each accumulator is simply
  // the last given object, or empty after Delete, and thus the Store's
  // behavior is simple storage.
  //
  // Reflector knows how to watch a server and update a Store. This
  // package provides a variety of implementations of Store.
  type Store interface {
  	// Add adds the given object to the accumulator associated with the
  	// given object's key.
  	Add(obj interface{}) error

  	// Update updates the given object in the accumulator associated with
  	// the given object's key.
  	Update(obj interface{}) error

  	// Delete deletes the given object from the accumulator associated
  	// with the given object's key.
  	Delete(obj interface{}) error

  	// List returns a list of all the currently non-empty accumulators.
  	List() []interface{}

  	// ListKeys returns a list of all the keys currently associated with
  	// non-empty accumulators.
  	ListKeys() []string

  	// Get returns the accumulator associated with the given object's key.
  	// exists reports whether such an accumulator was found.
  	Get(obj interface{}) (item interface{}, exists bool, err error)

  	// GetByKey returns the accumulator associated with the given key.
  	// exists reports whether such an accumulator was found.
  	GetByKey(key string) (item interface{}, exists bool, err error)

  	// Replace will delete the contents of the store, using instead the
  	// given list. Store takes ownership of the list, you should not reference
  	// it after calling this function.
  	// The string argument is the resource version that accompanies the
  	// list (the cache implementation in this file names it resourceVersion).
  	Replace([]interface{}, string) error

  	// Resync is meaningless in the terms appearing here but has
  	// meaning in some implementations that have non-trivial
  	// additional behavior (e.g., DeltaFIFO).
  	Resync() error
  }
  // TransactionType defines the type of a transaction operation. It is used to indicate whether
  // an object is being added, updated, or deleted.
  type TransactionType string

  // The transaction types a Transaction can carry.
  const (
  	// TransactionTypeAdd marks a transaction that adds its object.
  	TransactionTypeAdd TransactionType = "Add"
  	// TransactionTypeUpdate marks a transaction that updates its object.
  	TransactionTypeUpdate TransactionType = "Update"
  	// TransactionTypeDelete marks a transaction that deletes its object.
  	TransactionTypeDelete TransactionType = "Delete"
  )
  // Transaction represents a single operation or event in a process. It holds a generic Object
  // associated with the transaction and a Type indicating the kind of transaction being performed.
  type Transaction struct {
  	// Object is the object the operation applies to. The cache in this
  	// file derives the storage key from it with its KeyFunc.
  	Object interface{}
  	// Type says whether Object is being added, updated, or deleted.
  	Type TransactionType
  }
  // TransactionStore is implemented by stores that can apply several
  // operations in one call.
  type TransactionStore interface {
  	// Transaction allows multiple operations to occur within a single lock acquisition to
  	// ensure progress can be made when there is contention.
  	//
  	// The result is a *TransactionError rather than an error; the cache
  	// implementation in this file returns nil when nothing failed.
  	Transaction(txns ...Transaction) *TransactionError
  }
  // Compile-time check that *TransactionError satisfies the error interface.
  var _ error = (*TransactionError)(nil)

  // TransactionError describes the outcome of a batch of transactions in
  // which at least one transaction could not be executed.
  type TransactionError struct {
  	// SuccessfulIndices lists the positions, within the submitted
  	// transactions, of the ones that did not fail.
  	SuccessfulIndices []int
  	// TotalTransactions is the number of transactions that were submitted.
  	TotalTransactions int
  	// Errors holds the errors that were collected while processing the batch.
  	Errors []error
  }

  // Error reports how many of the submitted transactions failed, followed by
  // the collected errors. The failure count is TotalTransactions minus the
  // number of successful indices.
  func (t *TransactionError) Error() string {
  	total := t.TotalTransactions
  	failed := total - len(t.SuccessfulIndices)
  	return fmt.Sprintf("failed to execute (%d/%d) transactions failed due to: %v", failed, total, t.Errors)
  }
  // KeyFunc knows how to make a key from an object. Implementations should be deterministic.
  // The cache in this file wraps a non-nil error returned by a KeyFunc in a KeyError.
  type KeyFunc func(obj interface{}) (string, error)
  // KeyError is returned whenever a KeyFunc reports an error. It carries the
  // object the key could not be computed for together with the KeyFunc's error.
  type KeyError struct {
  	// Obj is the object at fault.
  	Obj interface{}
  	// Err is the error the KeyFunc returned.
  	Err error
  }

  // Error gives a human-readable description of the error, including the
  // offending object and the underlying cause.
  func (e KeyError) Error() string {
  	const format = "couldn't create key for object %+v: %v"
  	return fmt.Sprintf(format, e.Obj, e.Err)
  }

  // Unwrap returns the underlying KeyFunc error so that errors.Is and
  // errors.As can inspect it.
  func (e KeyError) Unwrap() error { return e.Err }
  // ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
  // the object but not the object itself. MetaNamespaceKeyFunc returns the
  // value unchanged, converted to a string.
  type ExplicitKey string
  106. // MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
  107. // keys for API objects which implement meta.Interface.
  108. // The key uses the format <namespace>/<name> unless <namespace> is empty, then
  109. // it's just <name>.
  110. //
  111. // Clients that want a structured alternative can use ObjectToName or MetaObjectToName.
  112. // Note: this would not be a client that wants a key for a Store because those are
  113. // necessarily strings.
  114. //
  115. // TODO maybe some day?: change Store to be keyed differently
  116. func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
  117. if key, ok := obj.(ExplicitKey); ok {
  118. return string(key), nil
  119. }
  120. objName, err := ObjectToName(obj)
  121. if err != nil {
  122. return "", err
  123. }
  124. return objName.String(), nil
  125. }
  126. // ObjectToName returns the structured name for the given object,
  127. // if indeed it can be viewed as a metav1.Object.
  128. func ObjectToName(obj interface{}) (ObjectName, error) {
  129. meta, err := meta.Accessor(obj)
  130. if err != nil {
  131. return ObjectName{}, fmt.Errorf("object has no meta: %v", err)
  132. }
  133. return MetaObjectToName(meta), nil
  134. }
  135. // MetaObjectToName returns the structured name for the given object
  136. func MetaObjectToName(obj metav1.Object) ObjectName {
  137. if len(obj.GetNamespace()) > 0 {
  138. return ObjectName{Namespace: obj.GetNamespace(), Name: obj.GetName()}
  139. }
  140. return ObjectName{Namespace: "", Name: obj.GetName()}
  141. }
  // SplitMetaNamespaceKey returns the namespace and name that
  // MetaNamespaceKeyFunc encoded into key.
  //
  // A key without a slash is a bare name and yields an empty namespace; a key
  // with exactly one slash is split into namespace and name around it. Any
  // other key is rejected with an error.
  //
  // TODO: replace key-as-string with a key-as-struct so that this
  // packing/unpacking won't be necessary.
  func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
  	switch strings.Count(key, "/") {
  	case 0:
  		// name only, no namespace
  		return "", key, nil
  	case 1:
  		// namespace and name
  		sep := strings.IndexByte(key, '/')
  		return key[:sep], key[sep+1:], nil
  	default:
  		return "", "", fmt.Errorf("unexpected key format: %q", key)
  	}
  }
  // `*cache` implements Indexer in terms of a ThreadSafeStore and an
  // associated KeyFunc.
  type cache struct {
  	// cacheStorage bears the burden of thread safety for the cache
  	cacheStorage ThreadSafeStore
  	// keyFunc is used to make the key for objects stored in and retrieved from items, and
  	// should be deterministic.
  	keyFunc KeyFunc
  	// transformer, when non-nil, is applied to objects before they are
  	// stored by Add, Update and Replace.
  	transformer TransformFunc
  }

  // Compile-time check that *cache implements Store.
  var _ Store = &cache{}
  171. func (c *cache) Transaction(txns ...Transaction) *TransactionError {
  172. txnStore, ok := c.cacheStorage.(ThreadSafeStoreWithTransaction)
  173. if !ok {
  174. return &TransactionError{
  175. TotalTransactions: len(txns),
  176. Errors: []error{
  177. errors.New("transaction not supported"),
  178. },
  179. }
  180. }
  181. keyedTxns := make([]ThreadSafeStoreTransaction, 0, len(txns))
  182. successfulIndices := make([]int, 0, len(txns))
  183. errs := make([]error, 0)
  184. for i := range txns {
  185. txn := txns[i]
  186. key, err := c.keyFunc(txn.Object)
  187. if err != nil {
  188. errs = append(errs, KeyError{txn.Object, err})
  189. continue
  190. }
  191. successfulIndices = append(successfulIndices, i)
  192. keyedTxns = append(keyedTxns, ThreadSafeStoreTransaction{txn, key})
  193. }
  194. txnStore.Transaction(keyedTxns...)
  195. if len(errs) > 0 {
  196. return &TransactionError{
  197. SuccessfulIndices: successfulIndices,
  198. TotalTransactions: len(txns),
  199. Errors: errs,
  200. }
  201. }
  202. return nil
  203. }
  204. // Add inserts an item into the cache.
  205. func (c *cache) Add(obj interface{}) error {
  206. key, err := c.keyFunc(obj)
  207. if err != nil {
  208. return KeyError{obj, err}
  209. }
  210. if c.transformer != nil {
  211. obj, err = c.transformer(obj)
  212. if err != nil {
  213. return fmt.Errorf("transforming: %w", err)
  214. }
  215. }
  216. c.cacheStorage.Add(key, obj)
  217. return nil
  218. }
  219. // Update sets an item in the cache to its updated state.
  220. func (c *cache) Update(obj interface{}) error {
  221. key, err := c.keyFunc(obj)
  222. if err != nil {
  223. return KeyError{obj, err}
  224. }
  225. if c.transformer != nil {
  226. obj, err = c.transformer(obj)
  227. if err != nil {
  228. return fmt.Errorf("transforming: %w", err)
  229. }
  230. }
  231. c.cacheStorage.Update(key, obj)
  232. return nil
  233. }
  234. // Delete removes an item from the cache.
  235. func (c *cache) Delete(obj interface{}) error {
  236. key, err := c.keyFunc(obj)
  237. if err != nil {
  238. return KeyError{obj, err}
  239. }
  240. c.cacheStorage.Delete(key)
  241. return nil
  242. }
  243. // List returns a list of all the items.
  244. // List is completely threadsafe as long as you treat all items as immutable.
  245. func (c *cache) List() []interface{} {
  246. return c.cacheStorage.List()
  247. }
  248. // ListKeys returns a list of all the keys of the objects currently
  249. // in the cache.
  250. func (c *cache) ListKeys() []string {
  251. return c.cacheStorage.ListKeys()
  252. }
  253. // GetIndexers returns the indexers of cache
  254. func (c *cache) GetIndexers() Indexers {
  255. return c.cacheStorage.GetIndexers()
  256. }
  257. // Index returns a list of items that match on the index function
  258. // Index is thread-safe so long as you treat all items as immutable
  259. func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
  260. return c.cacheStorage.Index(indexName, obj)
  261. }
  262. // IndexKeys returns the storage keys of the stored objects whose set of
  263. // indexed values for the named index includes the given indexed value.
  264. // The returned keys are suitable to pass to GetByKey().
  265. func (c *cache) IndexKeys(indexName, indexedValue string) ([]string, error) {
  266. return c.cacheStorage.IndexKeys(indexName, indexedValue)
  267. }
  268. // ListIndexFuncValues returns the list of generated values of an Index func
  269. func (c *cache) ListIndexFuncValues(indexName string) []string {
  270. return c.cacheStorage.ListIndexFuncValues(indexName)
  271. }
  272. // ByIndex returns the stored objects whose set of indexed values
  273. // for the named index includes the given indexed value.
  274. func (c *cache) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
  275. return c.cacheStorage.ByIndex(indexName, indexedValue)
  276. }
  277. func (c *cache) AddIndexers(newIndexers Indexers) error {
  278. return c.cacheStorage.AddIndexers(newIndexers)
  279. }
  280. // Get returns the requested item, or sets exists=false.
  281. // Get is completely threadsafe as long as you treat all items as immutable.
  282. func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
  283. key, err := c.keyFunc(obj)
  284. if err != nil {
  285. return nil, false, KeyError{obj, err}
  286. }
  287. return c.GetByKey(key)
  288. }
  289. // GetByKey returns the request item, or exists=false.
  290. // GetByKey is completely threadsafe as long as you treat all items as immutable.
  291. func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
  292. item, exists = c.cacheStorage.Get(key)
  293. return item, exists, nil
  294. }
  295. // Replace will delete the contents of 'c', using instead the given list.
  296. // 'c' takes ownership of the list, you should not reference the list again
  297. // after calling this function.
  298. func (c *cache) Replace(list []interface{}, resourceVersion string) error {
  299. items := make(map[string]interface{}, len(list))
  300. for _, item := range list {
  301. key, err := c.keyFunc(item)
  302. if err != nil {
  303. return KeyError{item, err}
  304. }
  305. if c.transformer != nil {
  306. item, err = c.transformer(item)
  307. if err != nil {
  308. return fmt.Errorf("transforming: %w", err)
  309. }
  310. }
  311. items[key] = item
  312. }
  313. c.cacheStorage.Replace(items, resourceVersion)
  314. return nil
  315. }
  316. // Resync is meaningless for one of these
  317. func (c *cache) Resync() error {
  318. return nil
  319. }
  // StoreOption configures a cache while it is being constructed. NewStore
  // applies the options it is given in order.
  type StoreOption = func(*cache)
  321. func WithTransformer(transformer TransformFunc) StoreOption {
  322. return func(c *cache) {
  323. c.transformer = transformer
  324. }
  325. }
  326. // NewStore returns a Store implemented simply with a map and a lock.
  327. func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store {
  328. c := &cache{
  329. cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
  330. keyFunc: keyFunc,
  331. }
  332. for _, opt := range opts {
  333. opt(c)
  334. }
  335. return c
  336. }
  337. // NewIndexer returns an Indexer implemented simply with a map and a lock.
  338. func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
  339. return &cache{
  340. cacheStorage: NewThreadSafeStore(indexers, Indices{}),
  341. keyFunc: keyFunc,
  342. }
  343. }