소스 검색

ignore informer updates for other objects

sunguroku 5 년 전
부모
커밋
8c289fda4a

+ 102 - 23
dashboard/src/main/home/cluster-dashboard/chart/Chart.tsx

@@ -13,12 +13,14 @@ type PropsType = {
 type StateType = {
   expand: boolean,
   controllers: Record<string, boolean>,
+  websockets: Record<string, any>, 
 };
 
 export default class Chart extends Component<PropsType, StateType> {
   state = {
     expand: false,
     controllers: {} as Record<string, boolean>,
+    websockets : [] as any[],
   }
 
   renderIcon = () => {
@@ -38,29 +40,101 @@ export default class Chart extends Component<PropsType, StateType> {
     return `${time} on ${date}`;
   }
 
-  determineAvailability = (cs: any[]) => {
+  getAvailability = (kind: string, c: any) => {
+    switch (kind.toLowerCase()) {
+      case "deployment":
+      case "replicaset":
+        return (c.status.availableReplicas == c.status.replicas)
+      case "statefulset":
+       return (c.status.readyReplicas == c.status.replicas)
+      case "daemonset":
+        return (c.status.numberAvailable == c.status.desiredNumberScheduled)
+      }
+  }
+
+  setControllerStatus = (cs: any[]) => {
     let controllers = {} as Record<string, boolean>;
     cs.map((c) => {
-      switch (c.kind) {
-        case "Deployment":
-        case "ReplicaSet":
-          controllers[c.metadata.uid] = (c.status.availableReplicas == c.status.replicas)
-          break
-        case "StatefulSet":
-          controllers[c.metadata.uid] = (c.status.readyReplicas == c.status.replicas)
-          break
-        case "DaemonSet":
-          controllers[c.metadata.uid] = (c.status.numberAvailable == c.status.desiredNumberScheduled)
-          break
-        }
+      controllers[c.metadata.uid] = this.getAvailability(c.kind, c)
     })
     this.setState({ controllers })
   }
 
+  getChartStatus = (chartStatus: string) => {
+    if (chartStatus === 'deployed') {
+      for (var uid in this.state.controllers) {
+        if (!this.state.controllers[uid]) {
+          console.log(this.props.chart.name, uid)
+          return 'updating'
+        }
+      }
+      return 'deployed'
+    }
+    return chartStatus
+  }
+
+  setupWebsocket = async (uid: string, namespace: string, name: string, kind: string) => {
+    return new Promise((resolve, reject) => {
+      let { currentCluster, currentProject } = this.context;
+      let ws = new WebSocket(`ws://localhost:8080/api/projects/${currentProject.id}/k8s/${namespace}/${kind}/${name}/status?cluster_id=${currentCluster.id}&service_account_id=${currentCluster.service_account_id}`)
+      ws.onopen = () => {
+        console.log('connected to websocket')
+      }
+  
+      ws.onmessage = (evt: MessageEvent) => {
+        let event = JSON.parse(evt.data)
+        let object = event.Object
+        const { chart } = this.props;
+        if (!this.state.controllers[object.metadata.uid]) {
+          return;
+        }
+        this.setState({
+          controllers: {
+            ...this.state.controllers,
+            [object.metadata.uid]: this.getAvailability(event.Kind, object) 
+          }
+        })
+      }
+  
+      ws.onclose = () => {
+        console.log('closing websocket')
+      }
+  
+      ws.onerror = (err: ErrorEvent) => {
+        console.log(err)
+        ws.close()
+      }
+  
+      if (!this.state.websockets) {
+        reject("Cannot establish websocket connection for controllers.")
+      };
+  
+      this.setState({
+        websockets: {
+          ...this.state.websockets,
+          [uid]: ws
+        }
+      }, () => {
+        resolve()
+      })
+    })
+  }
+
+  setControllerWebsockets = (controllers: any[]) => {
+    let { setCurrentError } = this.context;
+    controllers.forEach(async (c: any) => {
+      this.setupWebsocket(c.metadata.uid, c.metadata.namespace || "default", 
+        c.metadata.name, c.kind)
+      .catch(setCurrentError)
+    })
+  }
+
   componentDidMount () {
-    let { currentCluster, currentProject } = this.context;
+    let { currentCluster, currentProject, setCurrentError } = this.context;
     const { chart } = this.props;
 
+    if (chart.info.status == 'failed') return; 
+
     api.getChartControllers('<token>', {
       namespace: chart.namespace,
       cluster_id: currentCluster.id,
@@ -71,20 +145,25 @@ export default class Chart extends Component<PropsType, StateType> {
       name: chart.name,
       revision: chart.version
     }, (err: any, res: any) => {
-      this.determineAvailability(res.data)
+      if (err) {
+        setCurrentError(JSON.stringify(err));
+        return
+      }
+      console.log(res.data)
+      this.setControllerStatus(res.data)
+      this.setControllerWebsockets(res.data)
     });
   }
 
-  getChartStatus = (chartStatus: string) => {
-    if (chartStatus === 'deployed') {
-      for (var uid in this.state.controllers) {
-        if (!this.state.controllers[uid]) {
-          return 'updating'
-        }
+  async componentWillUnmount () {
+    if (this.state.websockets) {
+      for (var uid in this.state.websockets) {
+        await new Promise(next => {
+          this.state.websockets[uid].close()
+          next()
+        })
       }
-      return 'deployed'
     }
-    return chartStatus
   }
 
   render() {

+ 2 - 2
dashboard/src/main/home/cluster-dashboard/chart/ChartList.tsx

@@ -67,7 +67,7 @@ export default class ChartList extends Component<PropsType, StateType> {
 
   setupWebsocket = () => {
     let { currentCluster, currentProject } = this.context;
-    let ws = new WebSocket(`ws://localhost:8080/api/projects/${currentProject.id}/k8s/deployment/status?cluster_id=${currentCluster.id}&service_account_id=${currentCluster.service_account_id}`)
+    let ws = new WebSocket(`ws://localhost:8080`)
 
     this.setState({ ws }, () => {
       if (!this.state.ws) return;
@@ -88,7 +88,7 @@ export default class ChartList extends Component<PropsType, StateType> {
 
   componentDidMount() {
     this.updateCharts();
-    this.setupWebsocket();
+    // this.setupWebsocket();
   }
 
   componentWillUnmount() {

+ 43 - 8
dashboard/src/main/home/cluster-dashboard/expanded-chart/log/LogSection.tsx

@@ -27,7 +27,7 @@ export default class LogSection extends Component<PropsType, StateType> {
   }
 
   renderLogs = () => {
-    return <Logs key={this.state.selectedPod.name} selectedPod={this.state.selectedPod} />
+    return <Logs key={this.state.selectedPod?.name} selectedPod={this.state.selectedPod} />
   }
 
   renderPodTabs = () => {
@@ -46,9 +46,26 @@ export default class LogSection extends Component<PropsType, StateType> {
     })
   }
 
+  renderLogSection = () => {
+    if (this.state.pods.length > 0) {
+      return (
+        <div>
+          <TabWrapper>
+            {this.renderPodTabs()}
+          </TabWrapper>
+          {this.renderLogs()}
+        </div>
+      )
+    } else {
+      return (
+        <NoPods> <i className="material-icons">category</i> No pods to display. </NoPods>
+      )
+    }
+  }
+
   componentDidMount() {
     const { selectors } = this.props;
-    let { currentCluster, currentProject } = this.context;
+    let { currentCluster, currentProject, setCurrenterror } = this.context;
 
     api.getMatchingPods('<token>', { 
       cluster_id: currentCluster.id,
@@ -57,10 +74,15 @@ export default class LogSection extends Component<PropsType, StateType> {
     }, {
       id: currentProject.id
     }, (err: any, res: any) => {
+      if (err) {
+        console.log(err)
+        setCurrenterror(JSON.stringify(err))
+        return
+      }
       let pods = res?.data?.map((pod: any) => {
         return {
-          namespace: pod.metadata.namespace, 
-          name: pod.metadata.name
+          namespace: pod?.metadata?.namespace, 
+          name: pod?.metadata?.name
         }
       })
       this.setState({ pods , selectedPod: pods[0]})
@@ -70,10 +92,7 @@ export default class LogSection extends Component<PropsType, StateType> {
   render() {
     return (
       <StyledLogSection>
-        <TabWrapper>
-          {this.renderPodTabs()}
-        </TabWrapper>
-        {this.renderLogs()}
+        {this.renderLogSection()}
       </StyledLogSection>
     );
   }
@@ -113,4 +132,20 @@ const StyledLogSection = styled.span`
   position: relative;
   padding: 0px;
   user-select: text;
+`;
+
+const NoPods = styled.div`
+  padding-top: 20%;
+  position: relative;
+  width: 100%;
+  display: flex;
+  justify-content: center;
+  align-items: center;
+  color: #ffffff44;
+  font-size: 14px;
+
+  > i {
+    font-size: 18px;
+    margin-right: 12px;
+  }
 `;

+ 34 - 22
internal/kubernetes/agent.go

@@ -5,6 +5,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"strings"
 
 	"github.com/gorilla/websocket"
 	"github.com/porter-dev/porter/internal/helm/grapher"
@@ -25,8 +26,13 @@ type Agent struct {
 }
 
 type Message struct {
-	MessageType string
-	Object      interface{}
+	EventType string
+	Object    interface{}
+	Kind      string
+}
+
+type ListOptions struct {
+	FieldSelector string
 }
 
 // ListNamespaces simply lists namespaces
@@ -146,38 +152,44 @@ func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn)
 	}
 }
 
-// StreamDeploymentStatus streams deployment status.
-func (a *Agent) StreamDeploymentStatus(conn *websocket.Conn) error {
+// StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
+// TODO: Support Jobs
+func (a *Agent) StreamControllerStatus(conn *websocket.Conn, namespace string,
+	name string, kind string) error {
+	factory := informers.NewSharedInformerFactoryWithOptions(
+		a.Clientset,
+		10,
+		informers.WithNamespace(namespace),
+	)
+	var informer cache.SharedInformer
+
+	// Spins up an informer depending on kind. Convert to lowercase for robustness
+	switch strings.ToLower(kind) {
+	case "deployment":
+		informer = factory.Apps().V1().Deployments().Informer()
+	case "statefulset":
+		informer = factory.Apps().V1().StatefulSets().Informer()
+	case "replicaset":
+		informer = factory.Apps().V1().ReplicaSets().Informer()
+	case "daemonset":
+		informer = factory.Apps().V1().DaemonSets().Informer()
+	}
 
-	factory := informers.NewSharedInformerFactory(a.Clientset, 0)
-	informer := factory.Apps().V1().Deployments().Informer()
 	stopper := make(chan struct{})
 	errorchan := make(chan error)
 	defer close(errorchan)
 
 	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			d := obj.(*appsv1.Deployment)
-			fmt.Printf("adding deployment %s\n", d.Name)
-			fmt.Println(d.Status.Replicas == d.Status.AvailableReplicas)
-		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
-			d := newObj.(*appsv1.Deployment)
-			fmt.Printf("updating deployment %s\n", d.Name)
-			fmt.Println(d.Status.Replicas == d.Status.AvailableReplicas)
-			fmt.Println(d.Status.Conditions[0].Message)
-		},
-		DeleteFunc: func(obj interface{}) {
-			d := obj.(*appsv1.Deployment)
 			msg := Message{
-				MessageType: "DELETION",
-				Object:      d,
+				EventType: "UPDATE",
+				Object:    newObj,
+				Kind:      strings.ToLower(kind),
 			}
 			if writeErr := conn.WriteJSON(msg); writeErr != nil {
 				errorchan <- writeErr
 				return
 			}
-			fmt.Printf("deleting deployment %s\n", d.Name)
 		},
 	})
 
@@ -187,7 +199,7 @@ func (a *Agent) StreamDeploymentStatus(conn *websocket.Conn) error {
 			if _, _, err := conn.ReadMessage(); err != nil {
 				defer conn.Close()
 				defer close(stopper)
-				defer fmt.Println("Successfully closed deployment status stream")
+				defer fmt.Println("Successfully closed controller status stream")
 				errorchan <- nil
 				return
 			}

+ 8 - 3
server/api/k8s_handler.go

@@ -210,9 +210,9 @@ func (app *App) HandleListPods(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-// HandleStreamDeployment test calls
+// HandleStreamControllerStatus test calls
 // TODO: Refactor repeated calls.
-func (app *App) HandleStreamDeployment(w http.ResponseWriter, r *http.Request) {
+func (app *App) HandleStreamControllerStatus(w http.ResponseWriter, r *http.Request) {
 
 	// get session to retrieve correct kubeconfig
 	_, err := app.store.Get(r, app.cookieName)
@@ -262,7 +262,12 @@ func (app *App) HandleStreamDeployment(w http.ResponseWriter, r *http.Request) {
 		app.handleErrorUpgradeWebsocket(err, w)
 	}
 
-	err = agent.StreamDeploymentStatus(conn)
+	// get path parameters
+	namespace := chi.URLParam(r, "namespace")
+	name := chi.URLParam(r, "name")
+	kind := chi.URLParam(r, "kind")
+
+	err = agent.StreamControllerStatus(conn, namespace, name, kind)
 
 	if err != nil {
 		app.handleErrorWebsocketWrite(err, w)

+ 2 - 2
server/router/router.go

@@ -304,10 +304,10 @@ func New(
 
 		r.Method(
 			"GET",
-			"/projects/{project_id}/k8s/deployment/status",
+			"/projects/{project_id}/k8s/{namespace}/{kind}/{name}/status",
 			auth.DoesUserHaveProjectAccess(
 				auth.DoesUserHaveServiceAccountAccess(
-					requestlog.NewHandler(a.HandleStreamDeployment, l),
+					requestlog.NewHandler(a.HandleStreamControllerStatus, l),
 					mw.URLParam,
 					mw.QueryParam,
 				),