浏览代码

wrap informers in goroutine to prevent errorchan panics

Alexander Belanger 4 年之前
父节点
当前提交
1f3518b7ef
共有 1 个文件被更改,包括 148 次插入116 次删除
  1. 148 116
      internal/kubernetes/agent.go

+ 148 - 116
internal/kubernetes/agent.go

@@ -841,43 +841,10 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 		stopper := make(chan struct{})
 		errorchan := make(chan error)
 
-		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-			if strings.HasSuffix(err.Error(), ": Unauthorized") {
-				errorchan <- &AuthError{}
-			}
-		})
-
-		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			UpdateFunc: func(oldObj, newObj interface{}) {
-				msg := Message{
-					EventType: "UPDATE",
-					Object:    newObj,
-					Kind:      strings.ToLower(kind),
-				}
-				rw.WriteJSONWithChannel(msg)
-			},
-			AddFunc: func(obj interface{}) {
-				msg := Message{
-					EventType: "ADD",
-					Object:    obj,
-					Kind:      strings.ToLower(kind),
-				}
-				rw.WriteJSONWithChannel(msg)
-			},
-			DeleteFunc: func(obj interface{}) {
-				msg := Message{
-					EventType: "DELETE",
-					Object:    obj,
-					Kind:      strings.ToLower(kind),
-				}
-				rw.WriteJSONWithChannel(msg)
-			},
-		})
-
 		var wg sync.WaitGroup
 		var err error
 
-		wg.Add(1)
+		wg.Add(2)
 
 		go func() {
 			wg.Wait()
@@ -896,7 +863,59 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 			}
 		}()
 
-		go informer.Run(stopper)
+		go func() {
+			// listens for websocket closing handshake
+			defer wg.Done()
+
+			informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+				if strings.HasSuffix(err.Error(), ": Unauthorized") {
+					errorchan <- &AuthError{}
+				}
+			})
+
+			informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+				UpdateFunc: func(oldObj, newObj interface{}) {
+					msg := Message{
+						EventType: "UPDATE",
+						Object:    newObj,
+						Kind:      strings.ToLower(kind),
+					}
+					err := rw.WriteJSONWithChannel(msg)
+
+					if err != nil {
+						errorchan <- err
+					}
+				},
+				AddFunc: func(obj interface{}) {
+					msg := Message{
+						EventType: "ADD",
+						Object:    obj,
+						Kind:      strings.ToLower(kind),
+					}
+
+					err := rw.WriteJSONWithChannel(msg)
+
+					if err != nil {
+						errorchan <- err
+					}
+				},
+				DeleteFunc: func(obj interface{}) {
+					msg := Message{
+						EventType: "DELETE",
+						Object:    obj,
+						Kind:      strings.ToLower(kind),
+					}
+
+					err := rw.WriteJSONWithChannel(msg)
+
+					if err != nil {
+						errorchan <- err
+					}
+				},
+			})
+
+			informer.Run(stopper)
+		}()
 
 		for err = range errorchan {
 			close(stopper)
@@ -996,116 +1015,129 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 		stopper := make(chan struct{})
 		errorchan := make(chan error)
 
-		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-			if strings.HasSuffix(err.Error(), ": Unauthorized") {
-				errorchan <- &AuthError{}
-			}
-		})
+		var wg sync.WaitGroup
+		var err error
 
-		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			UpdateFunc: func(oldObj, newObj interface{}) {
-				secretObj, ok := newObj.(*v1.Secret)
+		wg.Add(2)
 
-				if !ok {
-					errorchan <- fmt.Errorf("could not cast to secret")
-					return
-				}
+		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
 
-				helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
+		go func() {
+			// listens for websocket closing handshake
+			defer wg.Done()
 
-				if isNotHelmRelease && err == nil {
+			for {
+				if _, _, err := rw.ReadMessage(); err != nil {
+					errorchan <- nil
 					return
 				}
+			}
+		}()
 
-				if err != nil {
-					errorchan <- err
-					return
+		go informer.Run(stopper)
+
+		go func() {
+			// listens for websocket closing handshake
+			defer wg.Done()
+
+			informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+				if strings.HasSuffix(err.Error(), ": Unauthorized") {
+					errorchan <- &AuthError{}
 				}
+			})
 
-				msg := Message{
-					EventType: "UPDATE",
-					Object:    helm_object,
+			informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+				if strings.HasSuffix(err.Error(), ": Unauthorized") {
+					errorchan <- &AuthError{}
 				}
+			})
 
-				rw.WriteJSONWithChannel(msg)
-			},
-			AddFunc: func(obj interface{}) {
-				secretObj, ok := obj.(*v1.Secret)
+			informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+				UpdateFunc: func(oldObj, newObj interface{}) {
+					secretObj, ok := newObj.(*v1.Secret)
 
-				if !ok {
-					errorchan <- fmt.Errorf("could not cast to secret")
-					return
-				}
+					if !ok {
+						errorchan <- fmt.Errorf("could not cast to secret")
+						return
+					}
 
-				helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
+					helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
 
-				if isNotHelmRelease && err == nil {
-					return
-				}
+					if isNotHelmRelease && err == nil {
+						return
+					}
 
-				if err != nil {
-					errorchan <- err
-					return
-				}
+					if err != nil {
+						errorchan <- err
+						return
+					}
 
-				msg := Message{
-					EventType: "ADD",
-					Object:    helm_object,
-				}
+					msg := Message{
+						EventType: "UPDATE",
+						Object:    helm_object,
+					}
 
-				rw.WriteJSONWithChannel(msg)
-			},
-			DeleteFunc: func(obj interface{}) {
-				secretObj, ok := obj.(*v1.Secret)
+					rw.WriteJSONWithChannel(msg)
+				},
+				AddFunc: func(obj interface{}) {
+					secretObj, ok := obj.(*v1.Secret)
 
-				if !ok {
-					errorchan <- fmt.Errorf("could not cast to secret")
-					return
-				}
+					if !ok {
+						errorchan <- fmt.Errorf("could not cast to secret")
+						return
+					}
 
-				helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
+					helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
 
-				if isNotHelmRelease && err == nil {
-					return
-				}
+					if isNotHelmRelease && err == nil {
+						return
+					}
 
-				if err != nil {
-					errorchan <- err
-					return
-				}
+					if err != nil {
+						errorchan <- err
+						return
+					}
 
-				msg := Message{
-					EventType: "DELETE",
-					Object:    helm_object,
-				}
+					msg := Message{
+						EventType: "ADD",
+						Object:    helm_object,
+					}
 
-				rw.WriteJSONWithChannel(msg)
-			},
-		})
+					rw.WriteJSONWithChannel(msg)
+				},
+				DeleteFunc: func(obj interface{}) {
+					secretObj, ok := obj.(*v1.Secret)
 
-		var wg sync.WaitGroup
-		var err error
+					if !ok {
+						errorchan <- fmt.Errorf("could not cast to secret")
+						return
+					}
 
-		wg.Add(1)
+					helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
 
-		go func() {
-			wg.Wait()
-			close(errorchan)
-		}()
+					if isNotHelmRelease && err == nil {
+						return
+					}
 
-		go func() {
-			// listens for websocket closing handshake
-			defer wg.Done()
+					if err != nil {
+						errorchan <- err
+						return
+					}
 
-			for {
-				if _, _, err := rw.ReadMessage(); err != nil {
-					errorchan <- nil
-					return
-				}
-			}
-		}()
+					msg := Message{
+						EventType: "DELETE",
+						Object:    helm_object,
+					}
 
-		go informer.Run(stopper)
+					rw.WriteJSONWithChannel(msg)
+				},
+			})
+
+			informer.Run(stopper)
+		}()
 
 		for err = range errorchan {
 			close(stopper)