Преглед изворни кода

move to stream status branch

sunguroku пре 5 година
родитељ
комит
3dbbccc74b

+ 2 - 0
dashboard/src/main/home/cluster-dashboard/chart/Chart.tsx

@@ -37,6 +37,8 @@ export default class Chart extends Component<PropsType, StateType> {
   render() {
     let { chart, setCurrentChart } = this.props;
 
+    console.log(chart)
+
     return ( 
       <StyledChart
         onMouseEnter={() => this.setState({ expand: true })}

+ 2 - 1
dashboard/src/main/home/cluster-dashboard/chart/ChartList.tsx

@@ -45,7 +45,8 @@ export default class ChartList extends Component<PropsType, StateType> {
       limit: 20,
       skip: 0,
       byDate: false,
-      statusFilter: ['deployed']
+      statusFilter: ['deployed', 'uninstalled', 'pending', 'pending_upgrade',
+        'pending_rollback','superseded','failed']
     }, { id: currentProject.id }, (err: any, res: any) => {
         if (err) {
         console.log(err)

+ 1 - 0
dashboard/src/main/home/cluster-dashboard/expanded-chart/log/Logs.tsx

@@ -31,6 +31,7 @@ export default class Logs extends Component<PropsType, StateType> {
   componentDidMount() {
     let { currentCluster, currentProject } = this.context;
     let ws = new WebSocket(`ws://localhost:8080/api/projects/${currentProject.id}/k8s/default/pod/${this.props.selectedPod}/logs?cluster_id=${currentCluster.id}&service_account_id=${currentCluster.service_account_id}`)
+    // let ws = new WebSocket(`ws://localhost:8080/api/projects/${currentProject.id}/k8s/deployment/status?cluster_id=${currentCluster.id}&service_account_id=${currentCluster.service_account_id}`)
 
     ws.onopen = () => {
       console.log('connected to websocket')

+ 37 - 2
internal/kubernetes/agent.go

@@ -7,10 +7,13 @@ import (
 	"io"
 
 	"github.com/gorilla/websocket"
+	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
 )
 
 // Agent is a Kubernetes agent for performing operations that interact with the
@@ -52,12 +55,11 @@ func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn)
 	if err != nil {
 		return fmt.Errorf("Cannot open log stream for pod %s", name)
 	}
-	defer podLogs.Close()
 
 	r := bufio.NewReader(podLogs)
 	for {
 		bytes, err := r.ReadBytes('\n')
-
+		fmt.Println(bytes)
 		if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
 			return writeErr
 		}
@@ -70,3 +72,36 @@ func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn)
 		}
 	}
 }
+
+// StreamDeploymentStatus streams deployment status.
+func (a *Agent) StreamDeploymentStatus(conn *websocket.Conn) error {
+	fmt.Println("===========================streaming dep status============================")
+
+	factory := informers.NewSharedInformerFactory(a.Clientset, 0)
+	informer := factory.Apps().V1().Deployments().Informer()
+	stopper := make(chan struct{})
+	defer close(stopper)
+	defer fmt.Println("closing...")
+
+	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			d := obj.(*appsv1.Deployment)
+			fmt.Printf("adding deployment %s\n", d.Name)
+			fmt.Println(d.Status.Replicas == d.Status.AvailableReplicas)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			d := newObj.(*appsv1.Deployment)
+			fmt.Printf("updating deployment %s\n", d.Name)
+			fmt.Println(d.Status.Replicas == d.Status.AvailableReplicas)
+			fmt.Println(d.Status.Conditions[0].Message)
+		},
+		DeleteFunc: func(obj interface{}) {
+			d := obj.(*appsv1.Deployment)
+			fmt.Printf("deleting deployment %s\n", d.Name)
+			fmt.Println(d.Status.Replicas == d.Status.AvailableReplicas)
+		},
+	})
+
+	informer.Run(stopper)
+	return nil
+}

+ 60 - 0
server/api/k8s_handler.go

@@ -208,3 +208,63 @@ func (app *App) HandleListPods(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 }
+
+// HandleStreamDeployment test calls
+// TODO: Refactor repeated calls.
+func (app *App) HandleStreamDeployment(w http.ResponseWriter, r *http.Request) {
+
+	// get session to retrieve correct kubeconfig
+	_, err := app.store.Get(r, app.cookieName)
+
+	if err != nil {
+		app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
+		return
+	}
+
+	vals, err := url.ParseQuery(r.URL.RawQuery)
+
+	if err != nil {
+		app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
+		return
+	}
+
+	// get the filter options
+	form := &forms.K8sForm{
+		OutOfClusterConfig: &kubernetes.OutOfClusterConfig{
+			UpdateTokenCache: app.updateTokenCache,
+		},
+	}
+
+	form.PopulateK8sOptionsFromQueryParams(vals, app.repo.ServiceAccount)
+
+	// validate the form
+	if err := app.validator.Struct(form); err != nil {
+		app.handleErrorFormValidation(err, ErrK8sValidate, w)
+		return
+	}
+
+	// create a new agent
+	var agent *kubernetes.Agent
+
+	if app.testing {
+		agent = app.TestAgents.K8sAgent
+	} else {
+		agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
+	}
+
+	upgrader.CheckOrigin = func(r *http.Request) bool { return true }
+
+	// upgrade to websocket.
+	conn, err := upgrader.Upgrade(w, r, nil)
+
+	if err != nil {
+		app.handleErrorUpgradeWebsocket(err, w)
+	}
+
+	err = agent.StreamDeploymentStatus(conn)
+
+	if err != nil {
+		app.handleErrorWebsocketWrite(err, w)
+		return
+	}
+}

+ 14 - 0
server/router/router.go

@@ -288,6 +288,20 @@ func New(
 			),
 		)
 
+		r.Method(
+			"GET",
+			"/projects/{project_id}/k8s/deployment/status",
+			auth.DoesUserHaveProjectAccess(
+				auth.DoesUserHaveServiceAccountAccess(
+					requestlog.NewHandler(a.HandleStreamDeployment, l),
+					mw.URLParam,
+					mw.QueryParam,
+				),
+				mw.URLParam,
+				mw.ReadAccess,
+			),
+		)
+
 		r.Method(
 			"GET",
 			"/projects/{project_id}/k8s/pods",