streamwatcher.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package watch
  14. import (
  15. "fmt"
  16. "io"
  17. "sync"
  18. "k8s.io/klog/v2"
  19. "k8s.io/apimachinery/pkg/runtime"
  20. "k8s.io/apimachinery/pkg/util/net"
  21. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  22. )
  23. // Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
  24. type Decoder interface {
  25. // Decode should return the type of event, the decoded object, or an error.
  26. // An error will cause StreamWatcher to call Close(). Decode should block until
  27. // it has data or an error occurs.
  28. Decode() (action EventType, object runtime.Object, err error)
  29. // Close should close the underlying io.Reader, signalling to the source of
  30. // the stream that it is no longer being watched. Close() must cause any
  31. // outstanding call to Decode() to return with an error of some sort.
  32. Close()
  33. }
  34. // Reporter hides the details of how an error is turned into a runtime.Object for
  35. // reporting on a watch stream since this package may not import a higher level report.
  36. type Reporter interface {
  37. // AsObject must convert err into a valid runtime.Object for the watch stream.
  38. AsObject(err error) runtime.Object
  39. }
  40. // StreamWatcher turns any stream for which you can write a Decoder interface
  41. // into a watch.Interface.
  42. type StreamWatcher struct {
  43. logger klog.Logger
  44. sync.Mutex
  45. source Decoder
  46. reporter Reporter
  47. result chan Event
  48. done chan struct{}
  49. }
  50. // NewStreamWatcher creates a StreamWatcher from the given decoder.
  51. //
  52. // Contextual logging: NewStreamWatcherWithLogger should be used instead of NewStreamWatcher in code which supports contextual logging.
  53. func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
  54. return NewStreamWatcherWithLogger(klog.Background(), d, r)
  55. }
  56. // NewStreamWatcherWithLogger creates a StreamWatcher from the given decoder and logger.
  57. func NewStreamWatcherWithLogger(logger klog.Logger, d Decoder, r Reporter) *StreamWatcher {
  58. sw := &StreamWatcher{
  59. logger: logger,
  60. source: d,
  61. reporter: r,
  62. // It's easy for a consumer to add buffering via an extra
  63. // goroutine/channel, but impossible for them to remove it,
  64. // so nonbuffered is better.
  65. result: make(chan Event),
  66. // If the watcher is externally stopped there is no receiver anymore
  67. // and the send operations on the result channel, especially the
  68. // error reporting might block forever.
  69. // Therefore a dedicated stop channel is used to resolve this blocking.
  70. done: make(chan struct{}),
  71. }
  72. go sw.receive()
  73. return sw
  74. }
  75. // ResultChan implements Interface.
  76. func (sw *StreamWatcher) ResultChan() <-chan Event {
  77. return sw.result
  78. }
  79. // Stop implements Interface.
  80. func (sw *StreamWatcher) Stop() {
  81. // Call Close() exactly once by locking and setting a flag.
  82. sw.Lock()
  83. defer sw.Unlock()
  84. // closing a closed channel always panics, therefore check before closing
  85. select {
  86. case <-sw.done:
  87. default:
  88. close(sw.done)
  89. sw.source.Close()
  90. }
  91. }
  92. // receive reads result from the decoder in a loop and sends down the result channel.
  93. func (sw *StreamWatcher) receive() {
  94. defer utilruntime.HandleCrashWithLogger(sw.logger)
  95. defer close(sw.result)
  96. defer sw.Stop()
  97. for {
  98. action, obj, err := sw.source.Decode()
  99. if err != nil {
  100. switch err {
  101. case io.EOF:
  102. // watch closed normally
  103. case io.ErrUnexpectedEOF:
  104. sw.logger.V(1).Info("Unexpected EOF during watch stream event decoding", "err", err)
  105. default:
  106. if net.IsProbableEOF(err) || net.IsTimeout(err) {
  107. sw.logger.V(5).Info("Unable to decode an event from the watch stream", "err", err)
  108. } else {
  109. select {
  110. case <-sw.done:
  111. case sw.result <- Event{
  112. Type: Error,
  113. Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
  114. }:
  115. }
  116. }
  117. }
  118. return
  119. }
  120. select {
  121. case <-sw.done:
  122. return
  123. case sw.result <- Event{
  124. Type: action,
  125. Object: obj,
  126. }:
  127. }
  128. }
  129. }