containeruptime.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "k8s.io/klog"
  6. v1 "k8s.io/api/core/v1"
  7. "k8s.io/apimachinery/pkg/fields"
  8. "k8s.io/apimachinery/pkg/util/runtime"
  9. "k8s.io/apimachinery/pkg/util/wait"
  10. "k8s.io/client-go/kubernetes"
  11. "k8s.io/client-go/tools/cache"
  12. "k8s.io/client-go/util/workqueue"
  13. )
  14. type Controller struct {
  15. indexer cache.Indexer
  16. queue workqueue.RateLimitingInterface
  17. informer cache.Controller
  18. }
  19. func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
  20. return &Controller{
  21. informer: informer,
  22. indexer: indexer,
  23. queue: queue,
  24. }
  25. }
  26. func (c *Controller) processNextItem() bool {
  27. // Wait until there is a new item in the working queue
  28. key, quit := c.queue.Get()
  29. if quit {
  30. return false
  31. }
  32. // Tell the queue that we are done with processing this key. This unblocks the key for other workers
  33. // This allows safe parallel processing because two pods with the same key are never processed in
  34. // parallel.
  35. defer c.queue.Done(key)
  36. // Invoke the method containing the business logic
  37. err := c.syncToPrometheus(key.(string))
  38. // Handle the error if something went wrong during the execution of the business logic
  39. c.handleErr(err, key)
  40. return true
  41. }
  42. // syncToPrometheus is the business logic of the controller. In this controller it simply prints
  43. // information about the pod to stdout. In case an error happened, it has to simply return the error.
  44. // The retry logic should not be part of the business logic.
  45. func (c *Controller) syncToPrometheus(key string) error {
  46. obj, exists, err := c.indexer.GetByKey(key)
  47. if err != nil {
  48. klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
  49. return err
  50. }
  51. if !exists {
  52. // Below we will warm up our cache with a Pod, so that we will see a delete for one pod
  53. klog.V(1).Infof("Pod %s does not exist anymore\n", key)
  54. } else {
  55. // Note that you also have to check the uid if you have a local controlled resource, which
  56. // is dependent on the actual instance, to detect that a Pod was recreated with the same name
  57. klog.V(1).Infof("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
  58. }
  59. return nil
  60. }
  61. func (c *Controller) GetAll() []*v1.Pod {
  62. objs := c.indexer.List()
  63. var pods []*v1.Pod
  64. for _, obj := range objs {
  65. pods = append(pods, obj.(*v1.Pod))
  66. }
  67. return pods
  68. }
  69. // handleErr checks if an error happened and makes sure we will retry later.
  70. func (c *Controller) handleErr(err error, key interface{}) {
  71. if err == nil {
  72. // Forget about the #AddRateLimited history of the key on every successful synchronization.
  73. // This ensures that future processing of updates for this key is not delayed because of
  74. // an outdated error history.
  75. c.queue.Forget(key)
  76. return
  77. }
  78. // This controller retries 5 times if something goes wrong. After that, it stops trying.
  79. if c.queue.NumRequeues(key) < 5 {
  80. klog.Infof("Error syncing pod %v: %v", key, err)
  81. // Re-enqueue the key rate limited. Based on the rate limiter on the
  82. // queue and the re-enqueue history, the key will be processed later again.
  83. c.queue.AddRateLimited(key)
  84. return
  85. }
  86. c.queue.Forget(key)
  87. // Report to an external entity that, even after several retries, we could not successfully process this key
  88. runtime.HandleError(err)
  89. klog.Infof("Dropping pod %q out of the queue: %v", key, err)
  90. }
  91. func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
  92. defer runtime.HandleCrash()
  93. // Let the workers stop when we are done
  94. defer c.queue.ShutDown()
  95. klog.Info("Starting Pod controller")
  96. go c.informer.Run(stopCh)
  97. // Wait for all involved caches to be synced, before processing items from the queue is started
  98. if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
  99. runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
  100. return
  101. }
  102. for i := 0; i < threadiness; i++ {
  103. go wait.Until(c.runWorker, time.Second, stopCh)
  104. }
  105. <-stopCh
  106. klog.Info("Stopping Pod controller")
  107. }
  108. func (c *Controller) runWorker() {
  109. for c.processNextItem() {
  110. }
  111. }
  112. func ContainerUptimeWatcher(clientset kubernetes.Interface) {
  113. podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
  114. // create the workqueue
  115. queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
  116. // Bind the workqueue to a cache with the help of an informer. This way we make sure that
  117. // whenever the cache is updated, the pod key is added to the workqueue.
  118. // Note that when we finally process the item from the workqueue, we might see a newer version
  119. // of the Pod than the version which was responsible for triggering the update.
  120. indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
  121. AddFunc: func(obj interface{}) {
  122. key, err := cache.MetaNamespaceKeyFunc(obj)
  123. if err == nil {
  124. queue.Add(key)
  125. }
  126. },
  127. UpdateFunc: func(old interface{}, new interface{}) {
  128. key, err := cache.MetaNamespaceKeyFunc(new)
  129. if err == nil {
  130. queue.Add(key)
  131. }
  132. },
  133. DeleteFunc: func(obj interface{}) {
  134. // IndexerInformer uses a delta queue, therefore for deletes we have to use this
  135. // key function.
  136. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  137. if err == nil {
  138. queue.Add(key)
  139. }
  140. },
  141. }, cache.Indexers{})
  142. controller := NewController(queue, indexer, informer)
  143. /*
  144. podList, _ := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
  145. for _, pod := range podList.Items {
  146. indexer.Add(&pod)
  147. }
  148. */
  149. // Now let's start the controller
  150. stop := make(chan struct{})
  151. //defer close(stop)
  152. go controller.Run(1, stop)
  153. }