Jelajahi Sumber

Merge pull request #1655 from porter-dev/belanger/hotfix-websocket-panic

Fix send on closed channel errors from websockets
abelanger5 4 tahun lalu
induk
melakukan
db16c2a057

+ 6 - 4
api/server/shared/websocket/response_writer.go

@@ -12,18 +12,20 @@ type WebsocketSafeReadWriter struct {
 	conn *websocket.Conn
 }
 
-func (w *WebsocketSafeReadWriter) WriteJSONWithChannel(v interface{}, errorChan chan<- error) {
+func (w *WebsocketSafeReadWriter) WriteJSONWithChannel(v interface{}) error {
 	err := w.conn.WriteJSON(v)
 
 	if err != nil {
 		if errOr(err, websocket.ErrCloseSent, syscall.EPIPE, syscall.ECONNRESET) {
 			// if close has been sent, or error is broken pipe error or connection reset, we want to
 			// send a message to the error channel to ensure closure but we ignore the error
-			errorChan <- nil
-		} else if err != nil {
-			errorChan <- err
+			return nil
 		}
+
+		return err
 	}
+
+	return nil
 }
 
 func (w *WebsocketSafeReadWriter) Write(data []byte) (int, error) {

+ 6 - 6
internal/kubernetes/agent.go

@@ -854,7 +854,7 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 					Object:    newObj,
 					Kind:      strings.ToLower(kind),
 				}
-				rw.WriteJSONWithChannel(msg, errorchan)
+				rw.WriteJSONWithChannel(msg)
 			},
 			AddFunc: func(obj interface{}) {
 				msg := Message{
@@ -862,7 +862,7 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 					Object:    obj,
 					Kind:      strings.ToLower(kind),
 				}
-				rw.WriteJSONWithChannel(msg, errorchan)
+				rw.WriteJSONWithChannel(msg)
 			},
 			DeleteFunc: func(obj interface{}) {
 				msg := Message{
@@ -870,7 +870,7 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 					Object:    obj,
 					Kind:      strings.ToLower(kind),
 				}
-				rw.WriteJSONWithChannel(msg, errorchan)
+				rw.WriteJSONWithChannel(msg)
 			},
 		})
 
@@ -1027,7 +1027,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 					Object:    helm_object,
 				}
 
-				rw.WriteJSONWithChannel(msg, errorchan)
+				rw.WriteJSONWithChannel(msg)
 			},
 			AddFunc: func(obj interface{}) {
 				secretObj, ok := obj.(*v1.Secret)
@@ -1053,7 +1053,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 					Object:    helm_object,
 				}
 
-				rw.WriteJSONWithChannel(msg, errorchan)
+				rw.WriteJSONWithChannel(msg)
 			},
 			DeleteFunc: func(obj interface{}) {
 				secretObj, ok := obj.(*v1.Secret)
@@ -1079,7 +1079,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 					Object:    helm_object,
 				}
 
-				rw.WriteJSONWithChannel(msg, errorchan)
+				rw.WriteJSONWithChannel(msg)
 			},
 		})
 

+ 1 - 1
internal/kubernetes/provisioner/resource_stream.go

@@ -40,7 +40,7 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 			messages := xstream[0].Messages
 			lastID = messages[len(messages)-1].ID
 
-			rw.WriteJSONWithChannel(messages, errorchan)
+			rw.WriteJSONWithChannel(messages)
 		}
 	}()