ソースを参照

add mutex for preventing synchronous ws writes, remove duplicate err informer

Alexander Belanger 4 年 前
コミット
d9b645362a

+ 7 - 1
api/server/shared/websocket/response_writer.go

@@ -3,6 +3,7 @@ package websocket
 import (
 	"errors"
 	"net/http"
+	"sync"
 	"syscall"
 
 	"github.com/gorilla/websocket"
@@ -10,9 +11,12 @@ import (
 
 type WebsocketSafeReadWriter struct {
 	conn *websocket.Conn
+	mu   sync.Mutex
 }
 
-func (w *WebsocketSafeReadWriter) WriteJSONWithChannel(v interface{}) error {
+func (w *WebsocketSafeReadWriter) WriteJSON(v interface{}) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
 	err := w.conn.WriteJSON(v)
 
 	if err != nil {
@@ -29,6 +33,8 @@ func (w *WebsocketSafeReadWriter) WriteJSONWithChannel(v interface{}) error {
 }
 
 func (w *WebsocketSafeReadWriter) Write(data []byte) (int, error) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
 	err := w.conn.WriteMessage(websocket.TextMessage, data)
 
 	if err != nil {

+ 4 - 1
api/server/shared/websocket/upgrader.go

@@ -27,7 +27,10 @@ func (u *Upgrader) Upgrade(
 
 	conn, err := u.WSUpgrader.Upgrade(w, r, responseHeader)
 
-	safeWriter := &WebsocketSafeReadWriter{conn}
+	safeWriter := &WebsocketSafeReadWriter{
+		conn: conn,
+	}
+
 	rw := &WebsocketResponseWriter{conn, safeWriter}
 
 	return conn, rw, safeWriter, err

+ 6 - 12
internal/kubernetes/agent.go

@@ -880,7 +880,7 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 						Object:    newObj,
 						Kind:      strings.ToLower(kind),
 					}
-					err := rw.WriteJSONWithChannel(msg)
+					err := rw.WriteJSON(msg)
 
 					if err != nil {
 						errorchan <- err
@@ -893,7 +893,7 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 						Kind:      strings.ToLower(kind),
 					}
 
-					err := rw.WriteJSONWithChannel(msg)
+					err := rw.WriteJSON(msg)
 
 					if err != nil {
 						errorchan <- err
@@ -906,7 +906,7 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 						Kind:      strings.ToLower(kind),
 					}
 
-					err := rw.WriteJSONWithChannel(msg)
+					err := rw.WriteJSON(msg)
 
 					if err != nil {
 						errorchan <- err
@@ -1049,12 +1049,6 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 				}
 			})
 
-			informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-				if strings.HasSuffix(err.Error(), ": Unauthorized") {
-					errorchan <- &AuthError{}
-				}
-			})
-
 			informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 				UpdateFunc: func(oldObj, newObj interface{}) {
 					secretObj, ok := newObj.(*v1.Secret)
@@ -1080,7 +1074,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 						Object:    helm_object,
 					}
 
-					rw.WriteJSONWithChannel(msg)
+					rw.WriteJSON(msg)
 				},
 				AddFunc: func(obj interface{}) {
 					secretObj, ok := obj.(*v1.Secret)
@@ -1106,7 +1100,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 						Object:    helm_object,
 					}
 
-					rw.WriteJSONWithChannel(msg)
+					rw.WriteJSON(msg)
 				},
 				DeleteFunc: func(obj interface{}) {
 					secretObj, ok := obj.(*v1.Secret)
@@ -1132,7 +1126,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 						Object:    helm_object,
 					}
 
-					rw.WriteJSONWithChannel(msg)
+					rw.WriteJSON(msg)
 				},
 			})
 

+ 1 - 1
internal/kubernetes/provisioner/resource_stream.go

@@ -40,7 +40,7 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 			messages := xstream[0].Messages
 			lastID = messages[len(messages)-1].ID
 
-			rw.WriteJSONWithChannel(messages)
+			rw.WriteJSON(messages)
 		}
 	}()