watch.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package watch
  14. import (
  15. "fmt"
  16. "sync"
  17. "k8s.io/klog/v2"
  18. "k8s.io/apimachinery/pkg/runtime"
  19. "k8s.io/utils/ptr"
  20. )
  21. // Interface can be implemented by anything that knows how to watch and report changes.
  22. type Interface interface {
  23. // Stop tells the producer that the consumer is done watching, so the
  24. // producer should stop sending events and close the result channel. The
  25. // consumer should keep watching for events until the result channel is
  26. // closed.
  27. //
  28. // Because some implementations may create channels when constructed, Stop
  29. // must always be called, even if the consumer has not yet called
  30. // ResultChan().
  31. //
  32. // Only the consumer should call Stop(), not the producer. If the producer
  33. // errors and needs to stop the watch prematurely, it should instead send
  34. // an error event and close the result channel.
  35. Stop()
  36. // ResultChan returns a channel which will receive events from the event
  37. // producer. If an error occurs or Stop() is called, the producer must
  38. // close this channel and release any resources used by the watch.
  39. // Closing the result channel tells the consumer that no more events will be
  40. // sent.
  41. ResultChan() <-chan Event
  42. }
  43. // EventType defines the possible types of events.
  44. type EventType string
  45. const (
  46. Added EventType = "ADDED"
  47. Modified EventType = "MODIFIED"
  48. Deleted EventType = "DELETED"
  49. Bookmark EventType = "BOOKMARK"
  50. Error EventType = "ERROR"
  51. )
  52. var (
  53. DefaultChanSize int32 = 100
  54. )
  55. // Event represents a single event to a watched resource.
  56. // +k8s:deepcopy-gen=true
  57. type Event struct {
  58. Type EventType
  59. // Object is:
  60. // * If Type is Added or Modified: the new state of the object.
  61. // * If Type is Deleted: the state of the object immediately before deletion.
  62. // * If Type is Bookmark: the object (instance of a type being watched) where
  63. // only ResourceVersion field is set. On successful restart of watch from a
  64. // bookmark resourceVersion, client is guaranteed to not get repeat event
  65. // nor miss any events.
  66. // * If Type is Error: *api.Status is recommended; other types may make sense
  67. // depending on context.
  68. Object runtime.Object
  69. }
  70. type emptyWatch chan Event
  71. // NewEmptyWatch returns a watch interface that returns no results and is closed.
  72. // May be used in certain error conditions where no information is available but
  73. // an error is not warranted.
  74. func NewEmptyWatch() Interface {
  75. ch := make(chan Event)
  76. close(ch)
  77. return emptyWatch(ch)
  78. }
  79. // Stop implements Interface
  80. func (w emptyWatch) Stop() {
  81. }
  82. // ResultChan implements Interface
  83. func (w emptyWatch) ResultChan() <-chan Event {
  84. return chan Event(w)
  85. }
  86. // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
  87. type FakeWatcher struct {
  88. logger klog.Logger
  89. result chan Event
  90. stopped bool
  91. sync.Mutex
  92. }
  93. var _ Interface = &FakeWatcher{}
  94. // Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging.
  95. func NewFake() *FakeWatcher {
  96. return NewFakeWithOptions(FakeOptions{})
  97. }
  98. // Contextual logging: NewFakeWithOptions and a logger in the FakeOptions should be used instead in code which supports contextual logging.
  99. func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
  100. return NewFakeWithOptions(FakeOptions{ChannelSize: size})
  101. }
  102. func NewFakeWithOptions(options FakeOptions) *FakeWatcher {
  103. return &FakeWatcher{
  104. logger: ptr.Deref(options.Logger, klog.Background()),
  105. result: make(chan Event, options.ChannelSize),
  106. }
  107. }
  108. type FakeOptions struct {
  109. Logger *klog.Logger
  110. ChannelSize int
  111. }
  112. // Stop implements Interface.Stop().
  113. func (f *FakeWatcher) Stop() {
  114. f.Lock()
  115. defer f.Unlock()
  116. if !f.stopped {
  117. f.logger.V(4).Info("Stopping fake watcher")
  118. close(f.result)
  119. f.stopped = true
  120. }
  121. }
  122. func (f *FakeWatcher) IsStopped() bool {
  123. f.Lock()
  124. defer f.Unlock()
  125. return f.stopped
  126. }
  127. // Reset prepares the watcher to be reused.
  128. func (f *FakeWatcher) Reset() {
  129. f.Lock()
  130. defer f.Unlock()
  131. f.stopped = false
  132. f.result = make(chan Event)
  133. }
  134. func (f *FakeWatcher) ResultChan() <-chan Event {
  135. return f.result
  136. }
  137. // Add sends an add event.
  138. func (f *FakeWatcher) Add(obj runtime.Object) {
  139. f.result <- Event{Added, obj}
  140. }
  141. // Modify sends a modify event.
  142. func (f *FakeWatcher) Modify(obj runtime.Object) {
  143. f.result <- Event{Modified, obj}
  144. }
  145. // Delete sends a delete event.
  146. func (f *FakeWatcher) Delete(lastValue runtime.Object) {
  147. f.result <- Event{Deleted, lastValue}
  148. }
  149. // Error sends an Error event.
  150. func (f *FakeWatcher) Error(errValue runtime.Object) {
  151. f.result <- Event{Error, errValue}
  152. }
  153. // Action sends an event of the requested type, for table-based testing.
  154. func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
  155. f.result <- Event{action, obj}
  156. }
  157. // RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
  158. type RaceFreeFakeWatcher struct {
  159. logger klog.Logger
  160. result chan Event
  161. Stopped bool
  162. sync.Mutex
  163. }
  164. var _ Interface = &RaceFreeFakeWatcher{}
  165. // Contextual logging: RaceFreeFakeWatcherWithLogger should be used instead of NewRaceFreeFake in code which supports contextual logging.
  166. func NewRaceFreeFake() *RaceFreeFakeWatcher {
  167. return NewRaceFreeFakeWithLogger(klog.Background())
  168. }
  169. func NewRaceFreeFakeWithLogger(logger klog.Logger) *RaceFreeFakeWatcher {
  170. return &RaceFreeFakeWatcher{
  171. logger: logger,
  172. result: make(chan Event, DefaultChanSize),
  173. }
  174. }
  175. // Stop implements Interface.Stop().
  176. func (f *RaceFreeFakeWatcher) Stop() {
  177. f.Lock()
  178. defer f.Unlock()
  179. if !f.Stopped {
  180. f.logger.V(4).Info("Stopping fake watcher")
  181. close(f.result)
  182. f.Stopped = true
  183. }
  184. }
  185. func (f *RaceFreeFakeWatcher) IsStopped() bool {
  186. f.Lock()
  187. defer f.Unlock()
  188. return f.Stopped
  189. }
  190. // Reset prepares the watcher to be reused.
  191. func (f *RaceFreeFakeWatcher) Reset() {
  192. f.Lock()
  193. defer f.Unlock()
  194. f.Stopped = false
  195. f.result = make(chan Event, DefaultChanSize)
  196. }
  197. func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
  198. f.Lock()
  199. defer f.Unlock()
  200. return f.result
  201. }
  202. // Add sends an add event.
  203. func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
  204. f.Lock()
  205. defer f.Unlock()
  206. if !f.Stopped {
  207. select {
  208. case f.result <- Event{Added, obj}:
  209. return
  210. default:
  211. panic(fmt.Errorf("channel full"))
  212. }
  213. }
  214. }
  215. // Modify sends a modify event.
  216. func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
  217. f.Lock()
  218. defer f.Unlock()
  219. if !f.Stopped {
  220. select {
  221. case f.result <- Event{Modified, obj}:
  222. return
  223. default:
  224. panic(fmt.Errorf("channel full"))
  225. }
  226. }
  227. }
  228. // Delete sends a delete event.
  229. func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
  230. f.Lock()
  231. defer f.Unlock()
  232. if !f.Stopped {
  233. select {
  234. case f.result <- Event{Deleted, lastValue}:
  235. return
  236. default:
  237. panic(fmt.Errorf("channel full"))
  238. }
  239. }
  240. }
  241. // Error sends an Error event.
  242. func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
  243. f.Lock()
  244. defer f.Unlock()
  245. if !f.Stopped {
  246. select {
  247. case f.result <- Event{Error, errValue}:
  248. return
  249. default:
  250. panic(fmt.Errorf("channel full"))
  251. }
  252. }
  253. }
  254. // Action sends an event of the requested type, for table-based testing.
  255. func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
  256. f.Lock()
  257. defer f.Unlock()
  258. if !f.Stopped {
  259. select {
  260. case f.result <- Event{action, obj}:
  261. return
  262. default:
  263. panic(fmt.Errorf("channel full"))
  264. }
  265. }
  266. }
  267. // ProxyWatcher lets you wrap your channel in watch Interface. threadsafe.
  268. type ProxyWatcher struct {
  269. result chan Event
  270. stopCh chan struct{}
  271. mutex sync.Mutex
  272. stopped bool
  273. }
  274. var _ Interface = &ProxyWatcher{}
  275. // NewProxyWatcher creates new ProxyWatcher by wrapping a channel
  276. func NewProxyWatcher(ch chan Event) *ProxyWatcher {
  277. return &ProxyWatcher{
  278. result: ch,
  279. stopCh: make(chan struct{}),
  280. stopped: false,
  281. }
  282. }
  283. // Stop implements Interface
  284. func (pw *ProxyWatcher) Stop() {
  285. pw.mutex.Lock()
  286. defer pw.mutex.Unlock()
  287. if !pw.stopped {
  288. pw.stopped = true
  289. close(pw.stopCh)
  290. }
  291. }
  292. // Stopping returns true if Stop() has been called
  293. func (pw *ProxyWatcher) Stopping() bool {
  294. pw.mutex.Lock()
  295. defer pw.mutex.Unlock()
  296. return pw.stopped
  297. }
  298. // ResultChan implements Interface
  299. func (pw *ProxyWatcher) ResultChan() <-chan Event {
  300. return pw.result
  301. }
  302. // StopChan returns stop channel
  303. func (pw *ProxyWatcher) StopChan() <-chan struct{} {
  304. return pw.stopCh
  305. }
  306. // MockWatcher implements watch.Interface with mockable functions.
  307. type MockWatcher struct {
  308. StopFunc func()
  309. ResultChanFunc func() <-chan Event
  310. }
  311. var _ Interface = &MockWatcher{}
  312. // Stop calls StopFunc
  313. func (mw MockWatcher) Stop() {
  314. mw.StopFunc()
  315. }
  316. // ResultChan calls ResultChanFunc
  317. func (mw MockWatcher) ResultChan() <-chan Event {
  318. return mw.ResultChanFunc()
  319. }