watchcontroller.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package clustercache
  2. import (
  3. "fmt"
  4. "reflect"
  5. "time"
  6. "github.com/opencost/opencost/pkg/log"
  7. "k8s.io/apimachinery/pkg/fields"
  8. rt "k8s.io/apimachinery/pkg/runtime"
  9. "k8s.io/apimachinery/pkg/util/runtime"
  10. "k8s.io/apimachinery/pkg/util/wait"
  11. "k8s.io/client-go/rest"
  12. "k8s.io/client-go/tools/cache"
  13. "k8s.io/client-go/util/workqueue"
  14. )
  15. // Type alias for a receiver func
  16. type WatchHandler = func(interface{})
  17. // WatchController defines a contract for an object which watches a specific resource set for
  18. // add, updates, and removals
  19. type WatchController interface {
  20. // Initializes the cache
  21. WarmUp(chan struct{})
  22. // Run starts the watching process
  23. Run(int, chan struct{})
  24. // GetAll returns all of the resources
  25. GetAll() []interface{}
  26. // SetUpdateHandler sets a specific handler for adding/updating individual resources
  27. SetUpdateHandler(WatchHandler) WatchController
  28. // SetRemovedHandler sets a specific handler for removing individual resources
  29. SetRemovedHandler(WatchHandler) WatchController
  30. }
  31. // CachingWatchController composites the watching behavior and a cache to ensure that all
  32. // up to date resources are readily available
  33. type CachingWatchController struct {
  34. indexer cache.Indexer
  35. queue workqueue.RateLimitingInterface
  36. informer cache.Controller
  37. resource string
  38. resourceType string
  39. updateHandler WatchHandler
  40. removeHandler WatchHandler
  41. }
  42. func NewCachingWatcher(restClient rest.Interface, resource string, resourceType rt.Object, namespace string, fieldSelector fields.Selector) WatchController {
  43. resourceCache := cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
  44. queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
  45. indexer, informer := cache.NewIndexerInformer(resourceCache, resourceType, 0, cache.ResourceEventHandlerFuncs{
  46. AddFunc: func(obj interface{}) {
  47. key, err := cache.MetaNamespaceKeyFunc(obj)
  48. if err == nil {
  49. queue.Add(key)
  50. }
  51. },
  52. UpdateFunc: func(old interface{}, new interface{}) {
  53. key, err := cache.MetaNamespaceKeyFunc(new)
  54. if err == nil {
  55. queue.Add(key)
  56. }
  57. },
  58. DeleteFunc: func(obj interface{}) {
  59. // IndexerInformer uses a delta queue, therefore for deletes we have to use this
  60. // key function.
  61. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  62. if err == nil {
  63. queue.Add(key)
  64. }
  65. },
  66. }, cache.Indexers{})
  67. return &CachingWatchController{
  68. indexer: indexer,
  69. queue: queue,
  70. informer: informer,
  71. resource: resource,
  72. resourceType: reflect.TypeOf(resourceType).String(),
  73. }
  74. }
  75. func (c *CachingWatchController) GetAll() []interface{} {
  76. list := c.indexer.List()
  77. // since the indexer returns the as-is pointer to the resource,
  78. // we deep copy the resources such that callers don't corrupt the
  79. // index
  80. cloneList := make([]interface{}, 0, len(list))
  81. for _, v := range list {
  82. if deepCopyable, ok := v.(rt.Object); ok {
  83. cloneList = append(cloneList, deepCopyable.DeepCopyObject())
  84. }
  85. }
  86. return cloneList
  87. }
  88. func (c *CachingWatchController) SetUpdateHandler(handler WatchHandler) WatchController {
  89. c.updateHandler = handler
  90. return c
  91. }
  92. func (c *CachingWatchController) SetRemovedHandler(handler WatchHandler) WatchController {
  93. c.removeHandler = handler
  94. return c
  95. }
  96. func (c *CachingWatchController) processNextItem() bool {
  97. // Wait until there is a new item in the working queue
  98. key, quit := c.queue.Get()
  99. if quit {
  100. return false
  101. }
  102. // Tell the queue that we are done with processing this key. This unblocks the key for other workers
  103. // This allows safe parallel processing because two pods with the same key are never processed in
  104. // parallel.
  105. defer c.queue.Done(key)
  106. // Invoke the method containing the business logic
  107. err := c.handle(key.(string))
  108. // Handle the error if something went wrong during the execution of the business logic
  109. c.handleErr(err, key)
  110. return true
  111. }
  112. // handle is the business logic of the controller.
  113. func (c *CachingWatchController) handle(key string) error {
  114. obj, exists, err := c.indexer.GetByKey(key)
  115. if err != nil {
  116. log.Errorf("Fetching %s with key %s from store failed with %v", c.resourceType, key, err)
  117. return err
  118. }
  119. if !exists {
  120. if c.removeHandler != nil {
  121. c.removeHandler(key)
  122. }
  123. } else {
  124. if c.updateHandler != nil {
  125. c.updateHandler(obj)
  126. }
  127. }
  128. return nil
  129. }
  130. // handleErr checks if an error happened and makes sure we will retry later.
  131. func (c *CachingWatchController) handleErr(err error, key interface{}) {
  132. if err == nil {
  133. // Forget about the #AddRateLimited history of the key on every successful synchronization.
  134. // This ensures that future processing of updates for this key is not delayed because of
  135. // an outdated error history.
  136. c.queue.Forget(key)
  137. return
  138. }
  139. // This controller retries 5 times if something goes wrong. After that, it stops trying.
  140. if c.queue.NumRequeues(key) < 5 {
  141. log.Errorf("Error syncing %s %v: %v", c.resourceType, key, err)
  142. // Re-enqueue the key rate limited. Based on the rate limiter on the
  143. // queue and the re-enqueue history, the key will be processed later again.
  144. c.queue.AddRateLimited(key)
  145. return
  146. }
  147. c.queue.Forget(key)
  148. // Report to an external entity that, even after several retries, we could not successfully process this key
  149. runtime.HandleError(err)
  150. log.Infof("Dropping %s %q out of the queue: %v", c.resourceType, key, err)
  151. }
  152. func (c *CachingWatchController) WarmUp(cancelCh chan struct{}) {
  153. go c.informer.Run(cancelCh)
  154. // Wait for all involved caches to be synced, before processing items from the queue is started
  155. if !cache.WaitForCacheSync(cancelCh, c.informer.HasSynced) {
  156. runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
  157. return
  158. }
  159. }
  160. func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
  161. defer runtime.HandleCrash()
  162. // Let the workers stop when we are done
  163. defer c.queue.ShutDown()
  164. log.Infof("Starting %s controller", c.resourceType)
  165. for i := 0; i < threadiness; i++ {
  166. go wait.Until(c.runWorker, time.Second, stopCh)
  167. }
  168. <-stopCh
  169. log.Infof("Stopping %s controller", c.resourceType)
  170. }
  171. func (c *CachingWatchController) runWorker() {
  172. for c.processNextItem() {
  173. }
  174. }