Pārlūkot izejas kodu

add casing to only close channels once and recover from goroutine panics

Alexander Belanger 4 gadi atpakaļ
vecāks
revīzija
ebdeecb97e

+ 58 - 8
internal/kubernetes/agent.go

@@ -610,6 +610,7 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 	errorchan := make(chan error)
 
 	var wg sync.WaitGroup
+	var once sync.Once
 	wg.Add(2)
 
 	go func() {
@@ -618,6 +619,13 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 	}()
 
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
 		// listens for websocket closing handshake
 		defer wg.Done()
 
@@ -630,6 +638,13 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 	}()
 
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
 		defer wg.Done()
 
 		for {
@@ -648,8 +663,11 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 	}()
 
 	for err = range errorchan {
-		rw.Close()
-		podLogs.Close()
+		// only call these methods a single time
+		once.Do(func() {
+			rw.Close()
+			podLogs.Close()
+		})
 	}
 
 	return err
@@ -842,6 +860,7 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 		errorchan := make(chan error)
 
 		var wg sync.WaitGroup
+		var once sync.Once
 		var err error
 
 		wg.Add(2)
@@ -852,6 +871,13 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 		}()
 
 		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
 			// listens for websocket closing handshake
 			defer wg.Done()
 
@@ -864,6 +890,13 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 		}()
 
 		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
 			// listens for websocket closing handshake
 			defer wg.Done()
 
@@ -918,8 +951,10 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 		}()
 
 		for err = range errorchan {
-			close(stopper)
-			rw.Close()
+			once.Do(func() {
+				close(stopper)
+				rw.Close()
+			})
 		}
 
 		return err
@@ -1016,6 +1051,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 		errorchan := make(chan error)
 
 		var wg sync.WaitGroup
+		var once sync.Once
 		var err error
 
 		wg.Add(2)
@@ -1026,6 +1062,13 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 		}()
 
 		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
 			// listens for websocket closing handshake
 			defer wg.Done()
 
@@ -1037,9 +1080,14 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 			}
 		}()
 
-		go informer.Run(stopper)
-
 		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
 			// listens for websocket closing handshake
 			defer wg.Done()
 
@@ -1134,8 +1182,10 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 		}()
 
 		for err = range errorchan {
-			close(stopper)
-			rw.Close()
+			once.Do(func() {
+				close(stopper)
+				rw.Close()
+			})
 		}
 
 		return err

+ 14 - 0
internal/kubernetes/provisioner/resource_stream.go

@@ -12,6 +12,13 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 	errorchan := make(chan error)
 
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
 		// listens for websocket closing handshake
 		for {
 			if _, _, err := rw.ReadMessage(); err != nil {
@@ -22,6 +29,13 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 	}()
 
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
 		lastID := "0-0"
 
 		for {