Преглед изворни кода

Merge branch 'master' into clean-implement-rds-endpoints

Alexander Belanger пре 4 година
родитељ
комит
3963dc05fe

+ 16 - 4
api/server/shared/websocket/response_writer.go

@@ -3,6 +3,7 @@ package websocket
 import (
 	"errors"
 	"net/http"
+	"sync"
 	"syscall"
 
 	"github.com/gorilla/websocket"
@@ -10,23 +11,30 @@ import (
 
 type WebsocketSafeReadWriter struct {
 	conn *websocket.Conn
+	mu   sync.Mutex
 }
 
-func (w *WebsocketSafeReadWriter) WriteJSONWithChannel(v interface{}, errorChan chan<- error) {
+func (w *WebsocketSafeReadWriter) WriteJSON(v interface{}) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
 	err := w.conn.WriteJSON(v)
 
 	if err != nil {
 		if errOr(err, websocket.ErrCloseSent, syscall.EPIPE, syscall.ECONNRESET) {
 			// if close has been sent, or error is broken pipe error or connection reset, we want to
 			// send a message to the error channel to ensure closure but we ignore the error
-			errorChan <- nil
-		} else if err != nil {
-			errorChan <- err
+			return nil
 		}
+
+		return err
 	}
+
+	return nil
 }
 
 func (w *WebsocketSafeReadWriter) Write(data []byte) (int, error) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
 	err := w.conn.WriteMessage(websocket.TextMessage, data)
 
 	if err != nil {
@@ -46,6 +54,10 @@ func (w *WebsocketSafeReadWriter) ReadMessage() (messageType int, p []byte, err
 	return w.conn.ReadMessage()
 }
 
+func (w *WebsocketSafeReadWriter) Close() error {
+	return w.conn.Close()
+}
+
 type WebsocketResponseWriter struct {
 	conn       *websocket.Conn
 	safeWriter *WebsocketSafeReadWriter

+ 4 - 1
api/server/shared/websocket/upgrader.go

@@ -27,7 +27,10 @@ func (u *Upgrader) Upgrade(
 
 	conn, err := u.WSUpgrader.Upgrade(w, r, responseHeader)
 
-	safeWriter := &WebsocketSafeReadWriter{conn}
+	safeWriter := &WebsocketSafeReadWriter{
+		conn: conn,
+	}
+
 	rw := &WebsocketResponseWriter{conn, safeWriter}
 
 	return conn, rw, safeWriter, err

+ 232 - 130
internal/kubernetes/agent.go

@@ -12,6 +12,7 @@ import (
 	"io/ioutil"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	goerrors "errors"
@@ -952,13 +953,29 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 		return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
 	}
 
-	defer podLogs.Close()
-
 	r := bufio.NewReader(podLogs)
 	errorchan := make(chan error)
 
+	var wg sync.WaitGroup
+	var once sync.Once
+	wg.Add(2)
+
+	go func() {
+		wg.Wait()
+		close(errorchan)
+	}()
+
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
 		// listens for websocket closing handshake
+		defer wg.Done()
+
 		for {
 			if _, _, err := rw.ReadMessage(); err != nil {
 				errorchan <- nil
@@ -968,11 +985,20 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 	}()
 
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
+		defer wg.Done()
+
 		for {
 			bytes, err := r.ReadBytes('\n')
 
-			if err == io.EOF {
-				errorchan <- nil
+			if err != nil {
+				errorchan <- err
 				return
 			}
 
@@ -980,22 +1006,18 @@ func (a *Agent) GetPodLogs(namespace string, name string, selectedContainer stri
 				errorchan <- writeErr
 				return
 			}
-
-			select {
-			case <-errorchan:
-				return
-			default:
-			}
 		}
 	}()
 
-	for {
-		select {
-		case err = <-errorchan:
-			close(errorchan)
-			return err
-		}
+	for err = range errorchan {
+		// only call these methods a single time
+		once.Do(func() {
+			rw.Close()
+			podLogs.Close()
+		})
 	}
+
+	return err
 }
 
 // GetPodLogs streams real-time logs from a given pod.
@@ -1183,43 +1205,29 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 
 		stopper := make(chan struct{})
 		errorchan := make(chan error)
-		defer close(stopper)
 
-		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-			if strings.HasSuffix(err.Error(), ": Unauthorized") {
-				errorchan <- &AuthError{}
-			}
-		})
+		var wg sync.WaitGroup
+		var once sync.Once
+		var err error
 
-		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			UpdateFunc: func(oldObj, newObj interface{}) {
-				msg := Message{
-					EventType: "UPDATE",
-					Object:    newObj,
-					Kind:      strings.ToLower(kind),
-				}
-				rw.WriteJSONWithChannel(msg, errorchan)
-			},
-			AddFunc: func(obj interface{}) {
-				msg := Message{
-					EventType: "ADD",
-					Object:    obj,
-					Kind:      strings.ToLower(kind),
-				}
-				rw.WriteJSONWithChannel(msg, errorchan)
-			},
-			DeleteFunc: func(obj interface{}) {
-				msg := Message{
-					EventType: "DELETE",
-					Object:    obj,
-					Kind:      strings.ToLower(kind),
-				}
-				rw.WriteJSONWithChannel(msg, errorchan)
-			},
-		})
+		wg.Add(2)
 
 		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
 			// listens for websocket closing handshake
+			defer wg.Done()
+
 			for {
 				if _, _, err := rw.ReadMessage(); err != nil {
 					errorchan <- nil
@@ -1228,14 +1236,75 @@ func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websoc
 			}
 		}()
 
-		go informer.Run(stopper)
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
 
-		for {
-			select {
-			case err := <-errorchan:
-				return err
-			}
+			// listens for websocket closing handshake
+			defer wg.Done()
+
+			informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+				if strings.HasSuffix(err.Error(), ": Unauthorized") {
+					errorchan <- &AuthError{}
+				}
+			})
+
+			informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+				UpdateFunc: func(oldObj, newObj interface{}) {
+					msg := Message{
+						EventType: "UPDATE",
+						Object:    newObj,
+						Kind:      strings.ToLower(kind),
+					}
+					err := rw.WriteJSON(msg)
+
+					if err != nil {
+						errorchan <- err
+					}
+				},
+				AddFunc: func(obj interface{}) {
+					msg := Message{
+						EventType: "ADD",
+						Object:    obj,
+						Kind:      strings.ToLower(kind),
+					}
+
+					err := rw.WriteJSON(msg)
+
+					if err != nil {
+						errorchan <- err
+					}
+				},
+				DeleteFunc: func(obj interface{}) {
+					msg := Message{
+						EventType: "DELETE",
+						Object:    obj,
+						Kind:      strings.ToLower(kind),
+					}
+
+					err := rw.WriteJSON(msg)
+
+					if err != nil {
+						errorchan <- err
+					}
+				},
+			})
+
+			informer.Run(stopper)
+		}()
+
+		for err = range errorchan {
+			once.Do(func() {
+				close(stopper)
+				rw.Close()
+			})
 		}
+
+		return err
 	}
 
 	return a.RunWebsocketTask(run)
@@ -1327,113 +1396,146 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 
 		stopper := make(chan struct{})
 		errorchan := make(chan error)
-		defer close(stopper)
 
-		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-			if strings.HasSuffix(err.Error(), ": Unauthorized") {
-				errorchan <- &AuthError{}
-			}
-		})
+		var wg sync.WaitGroup
+		var once sync.Once
+		var err error
 
-		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			UpdateFunc: func(oldObj, newObj interface{}) {
-				secretObj, ok := newObj.(*v1.Secret)
+		wg.Add(2)
 
-				if !ok {
-					errorchan <- fmt.Errorf("could not cast to secret")
+		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
 					return
 				}
+			}()
 
-				helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
+			// listens for websocket closing handshake
+			defer wg.Done()
 
-				if isNotHelmRelease && err == nil {
+			for {
+				if _, _, err := rw.ReadMessage(); err != nil {
+					errorchan <- nil
 					return
 				}
+			}
+		}()
 
-				if err != nil {
-					errorchan <- err
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
 					return
 				}
+			}()
+
+			// listens for websocket closing handshake
+			defer wg.Done()
 
-				msg := Message{
-					EventType: "UPDATE",
-					Object:    helm_object,
+			informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+				if strings.HasSuffix(err.Error(), ": Unauthorized") {
+					errorchan <- &AuthError{}
 				}
+			})
 
-				rw.WriteJSONWithChannel(msg, errorchan)
-			},
-			AddFunc: func(obj interface{}) {
-				secretObj, ok := obj.(*v1.Secret)
+			informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+				UpdateFunc: func(oldObj, newObj interface{}) {
+					secretObj, ok := newObj.(*v1.Secret)
 
-				if !ok {
-					errorchan <- fmt.Errorf("could not cast to secret")
-					return
-				}
+					if !ok {
+						errorchan <- fmt.Errorf("could not cast to secret")
+						return
+					}
 
-				helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
+					helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
 
-				if isNotHelmRelease && err == nil {
-					return
-				}
+					if isNotHelmRelease && err == nil {
+						return
+					}
 
-				if err != nil {
-					errorchan <- err
-					return
-				}
+					if err != nil {
+						errorchan <- err
+						return
+					}
 
-				msg := Message{
-					EventType: "ADD",
-					Object:    helm_object,
-				}
+					msg := Message{
+						EventType: "UPDATE",
+						Object:    helm_object,
+					}
 
-				rw.WriteJSONWithChannel(msg, errorchan)
-			},
-			DeleteFunc: func(obj interface{}) {
-				secretObj, ok := obj.(*v1.Secret)
+					rw.WriteJSON(msg)
+				},
+				AddFunc: func(obj interface{}) {
+					secretObj, ok := obj.(*v1.Secret)
 
-				if !ok {
-					errorchan <- fmt.Errorf("could not cast to secret")
-					return
-				}
+					if !ok {
+						errorchan <- fmt.Errorf("could not cast to secret")
+						return
+					}
 
-				helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
+					helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
 
-				if isNotHelmRelease && err == nil {
-					return
-				}
+					if isNotHelmRelease && err == nil {
+						return
+					}
 
-				if err != nil {
-					errorchan <- err
-					return
-				}
+					if err != nil {
+						errorchan <- err
+						return
+					}
 
-				msg := Message{
-					EventType: "DELETE",
-					Object:    helm_object,
-				}
+					msg := Message{
+						EventType: "ADD",
+						Object:    helm_object,
+					}
 
-				rw.WriteJSONWithChannel(msg, errorchan)
-			},
-		})
+					rw.WriteJSON(msg)
+				},
+				DeleteFunc: func(obj interface{}) {
+					secretObj, ok := obj.(*v1.Secret)
 
-		go func() {
-			// listens for websocket closing handshake
-			for {
-				if _, _, err := rw.ReadMessage(); err != nil {
-					errorchan <- nil
-					return
-				}
-			}
-		}()
+					if !ok {
+						errorchan <- fmt.Errorf("could not cast to secret")
+						return
+					}
 
-		go informer.Run(stopper)
+					helm_object, isNotHelmRelease, err := ParseSecretToHelmRelease(*secretObj, chartList)
 
-		for {
-			select {
-			case err := <-errorchan:
-				return err
-			}
+					if isNotHelmRelease && err == nil {
+						return
+					}
+
+					if err != nil {
+						errorchan <- err
+						return
+					}
+
+					msg := Message{
+						EventType: "DELETE",
+						Object:    helm_object,
+					}
+
+					rw.WriteJSON(msg)
+				},
+			})
+
+			informer.Run(stopper)
+		}()
+
+		for err = range errorchan {
+			once.Do(func() {
+				close(stopper)
+				rw.Close()
+			})
 		}
+
+		return err
 	}
 
 	return a.RunWebsocketTask(run)

+ 15 - 1
internal/redis_stream/resource_stream.go

@@ -12,6 +12,13 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 	errorchan := make(chan error)
 
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
 		// listens for websocket closing handshake
 		for {
 			if _, _, err := rw.ReadMessage(); err != nil {
@@ -22,6 +29,13 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 	}()
 
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// TODO: add method to alert on panic
+				return
+			}
+		}()
+
 		lastID := "0-0"
 
 		for {
@@ -40,7 +54,7 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 			messages := xstream[0].Messages
 			lastID = messages[len(messages)-1].ID
 
-			rw.WriteJSONWithChannel(messages, errorchan)
+			rw.WriteJSON(messages)
 		}
 	}()