Ivan Galakhov 4 лет назад
Родитель
Сommit
df393caf45
2 измененных файлов с 46 добавлено и 22 удалено
  1. 9 2
      internal/kubernetes/agent.go
  2. 37 20
      server/api/k8s_handler.go

+ 9 - 2
internal/kubernetes/agent.go

@@ -69,6 +69,12 @@ type ListOptions struct {
 	FieldSelector string
 }
 
+type AuthError struct{}
+
+func (e *AuthError) Error() string {
+	return "Unauthorized error"
+}
+
 // CreateConfigMap creates the configmap given the key-value pairs and namespace
 func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
 	return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
@@ -577,8 +583,9 @@ func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, select
 	defer close(stopper)
 
 	informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-		fmt.Println("ERROR ERROR ERROR")
-		fmt.Println(err)
+		if strings.HasSuffix(err.Error(), ": Unauthorized") {
+			errorchan <- &AuthError{}
+		}
 	})
 
 	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{

+ 37 - 20
server/api/k8s_handler.go

@@ -2,11 +2,8 @@ package api
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
-	"net/http"
-	"net/url"
-	"strconv"
-
 	"github.com/go-chi/chi"
 	"github.com/gorilla/schema"
 	"github.com/gorilla/websocket"
@@ -16,6 +13,10 @@ import (
 	"github.com/porter-dev/porter/internal/kubernetes/prometheus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/tools/clientcmd"
+	"net/http"
+	"net/url"
+	"strconv"
+	"time"
 )
 
 // Enumeration of k8s API error codes, represented as int64
@@ -1134,13 +1135,12 @@ func (app *App) HandleStreamControllerStatus(w http.ResponseWriter, r *http.Requ
 		return
 	}
 
-	// create a new agent
-	var agent *kubernetes.Agent
+	// get path parameters
+	kind := chi.URLParam(r, "kind")
 
-	if app.ServerConf.IsTesting {
-		agent = app.TestAgents.K8sAgent
-	} else {
-		agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
+	selectors := ""
+	if vals["selectors"] != nil {
+		selectors = vals["selectors"][0]
 	}
 
 	upgrader.CheckOrigin = func(r *http.Request) bool { return true }
@@ -1152,18 +1152,35 @@ func (app *App) HandleStreamControllerStatus(w http.ResponseWriter, r *http.Requ
 		app.handleErrorUpgradeWebsocket(err, w)
 	}
 
-	// get path parameters
-	kind := chi.URLParam(r, "kind")
+	lastTime := int64(0)
 
-	selectors := ""
-	if vals["selectors"] != nil {
-		selectors = vals["selectors"][0]
-	}
-	err = agent.StreamControllerStatus(conn, kind, selectors)
+	for {
+		// create a new agent
+		var agent *kubernetes.Agent
 
-	if err != nil {
-		app.handleErrorWebsocketWrite(err, w)
-		return
+		if app.ServerConf.IsTesting {
+			agent = app.TestAgents.K8sAgent
+		} else {
+			agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
+		}
+
+		err = agent.StreamControllerStatus(conn, kind, selectors)
+
+		if err == nil {
+			return
+		}
+
+		if !errors.Is(err, &kubernetes.AuthError{}) {
+			app.handleErrorWebsocketWrite(err, w)
+			return
+		}
+
+		if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
+			app.handleErrorWebsocketWrite(err, w)
+			return
+		}
+
+		lastTime = time.Now().Unix()
 	}
 }