2
0
Эх сурвалжийг харах

fix goroutine leaks from stream methods

Mohammed Nafees 4 жил өмнө
parent
commit
f1475c8ba1

+ 4 - 0
api/server/shared/websocket/response_writer.go

@@ -46,6 +46,10 @@ func (w *WebsocketSafeReadWriter) ReadMessage() (messageType int, p []byte, err
 	return w.conn.ReadMessage()
 }
 
+func (w *WebsocketSafeReadWriter) Close() error {
+	return w.conn.Close()
+}
+
 type WebsocketResponseWriter struct {
 	conn       *websocket.Conn
 	safeWriter *WebsocketSafeReadWriter

+ 37 - 19
internal/kubernetes/agent.go

@@ -606,8 +606,6 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 		return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
 	}
 
-	defer podLogs.Close()
-
 	r := bufio.NewReader(podLogs)
 	errorchan := make(chan error)
 
@@ -626,7 +624,6 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 		for {
 			if _, _, err := rw.ReadMessage(); err != nil {
 				errorchan <- nil
-				podLogs.Close()
 				return
 			}
 		}
@@ -651,12 +648,11 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 	}()
 
 	for err = range errorchan {
-		if err != nil {
-			return err
-		}
+		rw.Close()
+		podLogs.Close()
 	}
 
-	return nil
+	return err
 }
 
 // GetPodLogs streams real-time logs from a given pod.
@@ -844,7 +840,6 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 
 		stopper := make(chan struct{})
 		errorchan := make(chan error)
-		defer close(stopper)
 
 		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
 			if strings.HasSuffix(err.Error(), ": Unauthorized") {
@@ -879,8 +874,20 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 			},
 		})
 
+		var wg sync.WaitGroup
+		var err error
+
+		wg.Add(1)
+
+		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
+
 		go func() {
 			// listens for websocket closing handshake
+			defer wg.Done()
+
 			for {
 				if _, _, err := rw.ReadMessage(); err != nil {
 					errorchan <- nil
@@ -891,12 +898,12 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 
 		go informer.Run(stopper)
 
-		for {
-			select {
-			case err := <-errorchan:
-				return err
-			}
+		for err = range errorchan {
+			close(stopper)
+			rw.Close()
 		}
+
+		return err
 	}
 
 	return a.RunWebsocketTask(run)
@@ -988,7 +995,6 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 
 		stopper := make(chan struct{})
 		errorchan := make(chan error)
-		defer close(stopper)
 
 		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
 			if strings.HasSuffix(err.Error(), ": Unauthorized") {
@@ -1077,8 +1083,20 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 			},
 		})
 
+		var wg sync.WaitGroup
+		var err error
+
+		wg.Add(1)
+
+		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
+
 		go func() {
 			// listens for websocket closing handshake
+			defer wg.Done()
+
 			for {
 				if _, _, err := rw.ReadMessage(); err != nil {
 					errorchan <- nil
@@ -1089,12 +1107,12 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 
 		go informer.Run(stopper)
 
-		for {
-			select {
-			case err := <-errorchan:
-				return err
-			}
+		for err = range errorchan {
+			close(stopper)
+			rw.Close()
 		}
+
+		return err
 	}
 
 	return a.RunWebsocketTask(run)