mutation_detector.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "os"
  17. "reflect"
  18. "strconv"
  19. "sync"
  20. "time"
  21. "k8s.io/klog/v2"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/util/diff"
  24. )
  25. var mutationDetectionEnabled = false
  26. func init() {
  27. mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
  28. }
  29. // MutationDetector is able to monitor objects for mutation within a limited window of time
  30. type MutationDetector interface {
  31. // AddObject adds the given object to the set being monitored for a while from now
  32. AddObject(obj interface{})
  33. // Run starts the monitoring and does not return until the monitoring is stopped.
  34. Run(stopCh <-chan struct{})
  35. }
  36. // NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.
  37. func NewCacheMutationDetector(name string) MutationDetector {
  38. if !mutationDetectionEnabled {
  39. return dummyMutationDetector{}
  40. }
  41. //nolint:logcheck // This code shouldn't be used in production.
  42. klog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
  43. return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute}
  44. }
  45. type dummyMutationDetector struct{}
  46. func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
  47. }
  48. func (dummyMutationDetector) AddObject(obj interface{}) {
  49. }
  50. // defaultCacheMutationDetector gives a way to detect if a cached object has been mutated
  51. // It has a list of cached objects and their copies. I haven't thought of a way
  52. // to see WHO is mutating it, just that it's getting mutated.
  53. type defaultCacheMutationDetector struct {
  54. name string
  55. period time.Duration
  56. // compareLock ensures only a single call to CompareObjects runs at a time
  57. compareObjectsLock sync.Mutex
  58. // addLock guards addedObjs between AddObject and CompareObjects
  59. addedObjsLock sync.Mutex
  60. addedObjs []cacheObj
  61. cachedObjs []cacheObj
  62. retainDuration time.Duration
  63. lastRotated time.Time
  64. retainedCachedObjs []cacheObj
  65. // failureFunc is injectable for unit testing. If you don't have it, the process will panic.
  66. // This panic is intentional, since turning on this detection indicates you want a strong
  67. // failure signal. This failure is effectively a p0 bug and you can't trust process results
  68. // after a mutation anyway.
  69. failureFunc func(message string)
  70. }
  71. // cacheObj holds the actual object and a copy
  72. type cacheObj struct {
  73. cached interface{}
  74. copied interface{}
  75. }
  76. func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
  77. // we DON'T want protection from panics. If we're running this code, we want to die
  78. for {
  79. if d.lastRotated.IsZero() {
  80. d.lastRotated = time.Now()
  81. } else if time.Since(d.lastRotated) > d.retainDuration {
  82. d.retainedCachedObjs = d.cachedObjs
  83. d.cachedObjs = nil
  84. d.lastRotated = time.Now()
  85. }
  86. d.CompareObjects()
  87. select {
  88. case <-stopCh:
  89. return
  90. case <-time.After(d.period):
  91. }
  92. }
  93. }
  94. // AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object
  95. // but that covers the vast majority of our cached objects
  96. func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
  97. if _, ok := obj.(DeletedFinalStateUnknown); ok {
  98. return
  99. }
  100. if obj, ok := obj.(runtime.Object); ok {
  101. copiedObj := obj.DeepCopyObject()
  102. d.addedObjsLock.Lock()
  103. defer d.addedObjsLock.Unlock()
  104. d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
  105. }
  106. }
  107. func (d *defaultCacheMutationDetector) CompareObjects() {
  108. d.compareObjectsLock.Lock()
  109. defer d.compareObjectsLock.Unlock()
  110. // move addedObjs into cachedObjs under lock
  111. // this keeps the critical section small to avoid blocking AddObject while we compare cachedObjs
  112. d.addedObjsLock.Lock()
  113. d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
  114. d.addedObjs = nil
  115. d.addedObjsLock.Unlock()
  116. altered := false
  117. for i, obj := range d.cachedObjs {
  118. if !reflect.DeepEqual(obj.cached, obj.copied) {
  119. fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
  120. altered = true
  121. }
  122. }
  123. for i, obj := range d.retainedCachedObjs {
  124. if !reflect.DeepEqual(obj.cached, obj.copied) {
  125. fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
  126. altered = true
  127. }
  128. }
  129. if altered {
  130. msg := fmt.Sprintf("cache %s modified", d.name)
  131. if d.failureFunc != nil {
  132. d.failureFunc(msg)
  133. return
  134. }
  135. panic(msg)
  136. }
  137. }