watchcontroller.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package costmodel
  2. import (
  3. "fmt"
  4. "reflect"
  5. "time"
  6. "k8s.io/klog"
  7. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  8. "k8s.io/apimachinery/pkg/fields"
  9. rt "k8s.io/apimachinery/pkg/runtime"
  10. "k8s.io/apimachinery/pkg/util/runtime"
  11. "k8s.io/apimachinery/pkg/util/wait"
  12. "k8s.io/client-go/rest"
  13. "k8s.io/client-go/tools/cache"
  14. "k8s.io/client-go/util/workqueue"
  15. )
  // WatchHandler is the callback signature used to deliver watch events. It is a
  // type alias rather than a new named type, so any plain func(interface{}) value
  // can be used wherever a WatchHandler is expected.
  type WatchHandler = func(obj interface{})
  18. // WatchController defines a contract for an object which watches a specific resource set for
  19. // add, updates, and removals
  20. type WatchController interface {
  21. // Initializes the cache
  22. WarmUp(chan struct{})
  23. // Run starts the watching process
  24. Run(int, chan struct{})
  25. // GetAll returns all of the resources
  26. GetAll() []interface{}
  27. // SetUpdateHandler sets a specific handler for adding/updating individual resources
  28. SetUpdateHandler(WatchHandler) WatchController
  29. // SetRemovedHandler sets a specific handler for removing individual resources
  30. SetRemovedHandler(WatchHandler) WatchController
  31. }
  32. // CachingWatchController composites the watching behavior and a cache to ensure that all
  33. // up to date resources are readily available
  34. type CachingWatchController struct {
  35. indexer cache.Indexer
  36. queue workqueue.RateLimitingInterface
  37. informer cache.Controller
  38. resource string
  39. resourceType string
  40. updateHandler WatchHandler
  41. removeHandler WatchHandler
  42. }
  43. func NewCachingWatcher(restClient rest.Interface, resource string, resourceType rt.Object, namespace string, fieldSelector fields.Selector) WatchController {
  44. resourceCache := cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
  45. queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
  46. indexer, informer := cache.NewIndexerInformer(resourceCache, resourceType, 0, cache.ResourceEventHandlerFuncs{
  47. AddFunc: func(obj interface{}) {
  48. key, err := cache.MetaNamespaceKeyFunc(obj)
  49. if err == nil {
  50. queue.Add(key)
  51. }
  52. },
  53. UpdateFunc: func(old interface{}, new interface{}) {
  54. key, err := cache.MetaNamespaceKeyFunc(new)
  55. if err == nil {
  56. queue.Add(key)
  57. }
  58. },
  59. DeleteFunc: func(obj interface{}) {
  60. // IndexerInformer uses a delta queue, therefore for deletes we have to use this
  61. // key function.
  62. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  63. if err == nil {
  64. queue.Add(key)
  65. }
  66. },
  67. }, cache.Indexers{})
  68. return &CachingWatchController{
  69. indexer: indexer,
  70. queue: queue,
  71. informer: informer,
  72. resource: resource,
  73. resourceType: reflect.TypeOf(resourceType).String(),
  74. }
  75. }
  76. func (c *CachingWatchController) GetAll() []interface{} {
  77. return c.indexer.List()
  78. }
  79. func (c *CachingWatchController) SetUpdateHandler(handler WatchHandler) WatchController {
  80. c.updateHandler = handler
  81. return c
  82. }
  83. func (c *CachingWatchController) SetRemovedHandler(handler WatchHandler) WatchController {
  84. c.removeHandler = handler
  85. return c
  86. }
  87. func (c *CachingWatchController) processNextItem() bool {
  88. // Wait until there is a new item in the working queue
  89. key, quit := c.queue.Get()
  90. if quit {
  91. return false
  92. }
  93. // Tell the queue that we are done with processing this key. This unblocks the key for other workers
  94. // This allows safe parallel processing because two pods with the same key are never processed in
  95. // parallel.
  96. defer c.queue.Done(key)
  97. // Invoke the method containing the business logic
  98. err := c.handle(key.(string))
  99. // Handle the error if something went wrong during the execution of the business logic
  100. c.handleErr(err, key)
  101. return true
  102. }
  103. // handle is the business logic of the controller.
  104. func (c *CachingWatchController) handle(key string) error {
  105. obj, exists, err := c.indexer.GetByKey(key)
  106. if err != nil {
  107. klog.Errorf("Fetching %s with key %s from store failed with %v", c.resourceType, key, err)
  108. return err
  109. }
  110. if !exists {
  111. klog.V(3).Infof("Removed %s for key: %s\n", c.resourceType, key)
  112. if c.removeHandler != nil {
  113. c.removeHandler(key)
  114. }
  115. } else {
  116. klog.V(3).Infof("Updated %s: %s\n", c.resourceType, obj.(v1.Object).GetName())
  117. if c.updateHandler != nil {
  118. c.updateHandler(obj)
  119. }
  120. }
  121. return nil
  122. }
  123. // handleErr checks if an error happened and makes sure we will retry later.
  124. func (c *CachingWatchController) handleErr(err error, key interface{}) {
  125. if err == nil {
  126. // Forget about the #AddRateLimited history of the key on every successful synchronization.
  127. // This ensures that future processing of updates for this key is not delayed because of
  128. // an outdated error history.
  129. c.queue.Forget(key)
  130. return
  131. }
  132. // This controller retries 5 times if something goes wrong. After that, it stops trying.
  133. if c.queue.NumRequeues(key) < 5 {
  134. klog.V(3).Infof("Error syncing %s %v: %v", c.resourceType, key, err)
  135. // Re-enqueue the key rate limited. Based on the rate limiter on the
  136. // queue and the re-enqueue history, the key will be processed later again.
  137. c.queue.AddRateLimited(key)
  138. return
  139. }
  140. c.queue.Forget(key)
  141. // Report to an external entity that, even after several retries, we could not successfully process this key
  142. runtime.HandleError(err)
  143. klog.Infof("Dropping %s %q out of the queue: %v", c.resourceType, key, err)
  144. }
  145. func (c *CachingWatchController) WarmUp(cancelCh chan struct{}) {
  146. go c.informer.Run(cancelCh)
  147. // Wait for all involved caches to be synced, before processing items from the queue is started
  148. if !cache.WaitForCacheSync(cancelCh, c.informer.HasSynced) {
  149. runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
  150. return
  151. }
  152. }
  153. func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
  154. defer runtime.HandleCrash()
  155. // Let the workers stop when we are done
  156. defer c.queue.ShutDown()
  157. klog.V(3).Infof("Starting %s controller", c.resourceType)
  158. for i := 0; i < threadiness; i++ {
  159. go wait.Until(c.runWorker, time.Second, stopCh)
  160. }
  161. <-stopCh
  162. klog.V(3).Infof("Stopping %s controller", c.resourceType)
  163. }
  164. func (c *CachingWatchController) runWorker() {
  165. for c.processNextItem() {
  166. }
  167. }