Explorar el Código

pod logs backend with websockets

sunguroku hace 5 años
padre
commit
77c20d7bcd

+ 0 - 1
dashboard/src/main/home/dashboard/expanded-chart/ExpandedChart.tsx

@@ -126,7 +126,6 @@ export default class ExpandedChart extends Component<PropsType, StateType> {
     if (this.state.revisionPreview) {
       chart = this.state.revisionPreview;
     }
-    console.log(chart)
     
     if (this.state.currentTab === 'graph') {
       return (

+ 21 - 1
dashboard/src/main/home/dashboard/expanded-chart/LogSection.tsx

@@ -1,25 +1,45 @@
 import React, { Component } from 'react';
 import styled from 'styled-components';
+import api from '../../../../shared/api';
+import { Context } from '../../../../shared/Context';
 
 type PropsType = {
 };
 
 type StateType = {
+  logs: string
 };
 
 export default class LogSection extends Component<PropsType, StateType> {
   state = {
+    logs: ""
+  }
+
+  componentDidMount() {
+    const { currentCluster } = this.context;
+
+    api.getPodLogs('<token>', { context: currentCluster }, {}, (err: any, res: any) => {
+      if (err) {
+        this.setState({logs: "ERROR"})
+        // this.setState({ namespaceOptions: [{ label: 'All', value: '' }] });
+      } else {
+        this.setState({logs: res.data});
+      }
+    });
   }
 
   render() {
     return (
       <StyledLogSection>
-        (Logs unimplemented)
+        {this.state.logs}
       </StyledLogSection>
     );
   }
 }
 
+LogSection.contextType = Context;
+
+
 const StyledLogSection = styled.div`
   width: 100%;
   height: 100%;

+ 5 - 0
dashboard/src/shared/api.tsx

@@ -70,6 +70,10 @@ const getNamespaces = baseApi<{
   context: string
 }>('GET', '/api/k8s/namespaces');
 
+const getPodLogs = baseApi<{
+  context: string
+}>('GET', '/api/k8s/pods/logs');
+
 const getRevisions = baseApi<{
   namespace: string,
   context: string,
@@ -109,6 +113,7 @@ export default {
   getChart,
   getChartComponents,
   getNamespaces,
+  getPodLogs,
   getRevisions,
   rollbackChart,
   upgradeChartValues

+ 1 - 0
go.mod

@@ -25,6 +25,7 @@ require (
 	github.com/google/go-cmp v0.5.1
 	github.com/gorilla/securecookie v1.1.1
 	github.com/gorilla/sessions v1.2.1
+	github.com/gorilla/websocket v1.4.2
 	github.com/imdario/mergo v0.3.11 // indirect
 	github.com/jinzhu/gorm v1.9.16
 	github.com/joeshaw/envdecode v0.0.0-20200121155833-099f1fc765bd

+ 2 - 0
go.sum

@@ -438,6 +438,8 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg
 github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
 github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/gosuri/uitable v0.0.4 h1:IG2xLKRvErL3uhY6e1BylFzG+aJiwQviDDTfOKeKTpY=
 github.com/gosuri/uitable v0.0.4/go.mod h1:tKR86bXuXPZazfOTG1FIzvjIdXzd0mo4Vtn16vt0PJo=
 github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=

+ 0 - 2
internal/helm/grapher/relation.go

@@ -1,7 +1,6 @@
 package grapher
 
 import (
-	"fmt"
 	"strconv"
 )
 
@@ -71,7 +70,6 @@ func (parsed *ParsedObjs) GetControlRel() {
 			kind = ""
 		}
 
-		fmt.Println(kind.(string))
 		switch kind.(string) {
 		// Parse for all possible controller types
 		case "Deployment", "StatefulSet", "ReplicaSet", "DaemonSet", "Job":

+ 11 - 8
internal/kubernetes/agent.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 
+	"github.com/gorilla/websocket"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
@@ -28,22 +29,24 @@ func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
 }
 
 // GetPodLogs streams real-time logs from a given pod.
-func (a *Agent) GetPodLogs(pod *v1.Pod) (string, error) {
-	podLogOpts := v1.PodLogOptions{}
-	req := a.Clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
+func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
+	// follow logs
+	podLogOpts := v1.PodLogOptions{Follow: true}
+	req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
 	podLogs, err := req.Stream(context.TODO())
+
 	if err != nil {
-		return "Error: Cannot open log stream.", fmt.Errorf("Cannot open log stream for pod %s", pod.Name)
+		return fmt.Errorf("Cannot open log stream for pod %s", name)
 	}
 	defer podLogs.Close()
 
 	buf := new(bytes.Buffer)
 	_, err = io.Copy(buf, podLogs)
+	log := buf.String()
 
-	if err != nil {
-		return "Error: Cannot encode Pod logs.", fmt.Errorf("Cannot copy logs from pod %s to buf", pod.Name)
+	if writeErr := conn.WriteMessage(websocket.TextMessage, []byte(log)); writeErr != nil {
+		return writeErr
 	}
-	str := buf.String()
 
-	return str, nil
+	return err
 }

+ 26 - 0
server/api/errors.go

@@ -19,6 +19,14 @@ type HTTPError struct {
 type ErrorCode int64
 
 var (
+	// ErrorUpgradeWebsocket describes an error while upgrading http to a websocket endpoint.
+	ErrorUpgradeWebsocket = HTTPError{
+		Code: 500,
+		Errors: []string{
+			"could not upgrade to websocket",
+		},
+	}
+
 	// ErrorDataWrite describes an error in writing to the database
 	ErrorDataWrite = HTTPError{
 		Code: 500,
@@ -27,6 +35,14 @@ var (
 		},
 	}
 
+	// ErrorWebsocketWrite describes an error in writing to websocket connection
+	ErrorWebsocketWrite = HTTPError{
+		Code: 500,
+		Errors: []string{
+			"could not write data via websocket",
+		},
+	}
+
 	// ErrorDataRead describes an error when reading from the database
 	ErrorDataRead = HTTPError{
 		Code: 500,
@@ -129,6 +145,16 @@ func (app *App) handleErrorDataWrite(err error, w http.ResponseWriter) {
 	app.sendExternalError(err, http.StatusInternalServerError, ErrorDataWrite, w)
 }
 
+// handleErrorWebsocketWrite handles an error from websocket.WriteMessage
+func (app *App) handleErrorWebsocketWrite(err error, w http.ResponseWriter) {
+	app.sendExternalError(err, http.StatusInternalServerError, ErrorWebsocketWrite, w)
+}
+
+// handleErrorUpgradeWebsocket handles error in upgrading a http endpoint to websocket conn
+func (app *App) handleErrorUpgradeWebsocket(err error, w http.ResponseWriter) {
+	app.sendExternalError(err, http.StatusInternalServerError, ErrorUpgradeWebsocket, w)
+}
+
 // handleErrorDataRead handles a database read error due to an internal error, such as
 // the database connection or gorm internals
 func (app *App) handleErrorDataRead(err error, w http.ResponseWriter) {

+ 80 - 0
server/api/k8s_handler.go

@@ -5,8 +5,10 @@ import (
 	"net/http"
 	"net/url"
 
+	"github.com/go-chi/chi"
 	"github.com/porter-dev/porter/internal/kubernetes"
 
+	"github.com/gorilla/websocket"
 	"github.com/porter-dev/porter/internal/forms"
 )
 
@@ -16,6 +18,11 @@ const (
 	ErrK8sValidate
 )
 
+var upgrader = websocket.Upgrader{
+	ReadBufferSize:  1024,
+	WriteBufferSize: 1024,
+}
+
 // HandleListNamespaces retrieves a list of namespaces
 func (app *App) HandleListNamespaces(w http.ResponseWriter, r *http.Request) {
 	session, err := app.store.Get(r, app.cookieName)
@@ -69,3 +76,76 @@ func (app *App) HandleListNamespaces(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 }
+
+// HandleGetPodLogs returns real-time logs of the pod via websockets
+func (app *App) HandleGetPodLogs(w http.ResponseWriter, r *http.Request) {
+	// get session to retrieve correct kubeconfig
+	session, err := app.store.Get(r, app.cookieName)
+
+	// get path parameters
+	namespace := chi.URLParam(r, "namespace")
+	podName := chi.URLParam(r, "name")
+
+	if err != nil {
+		app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
+		return
+	}
+
+	vals, err := url.ParseQuery(r.URL.RawQuery)
+
+	if err != nil {
+		app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
+		return
+	}
+
+	// get the filter options
+	form := &forms.K8sForm{
+		OutOfClusterConfig: &kubernetes.OutOfClusterConfig{},
+	}
+	form.PopulateK8sOptionsFromQueryParams(vals)
+
+	if sessID, ok := session.Values["user_id"].(uint); ok {
+		form.PopulateK8sOptionsFromUserID(sessID, app.repo.User)
+	}
+
+	// validate the form
+	if err := app.validator.Struct(form); err != nil {
+		app.handleErrorFormValidation(err, ErrK8sValidate, w)
+		return
+	}
+
+	// create a new agent
+	var agent *kubernetes.Agent
+
+	if app.testing {
+		agent = app.TestAgents.K8sAgent
+	} else {
+		agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
+	}
+
+	// allow all hosts for now
+	upgrader.CheckOrigin = func(r *http.Request) bool { return true }
+
+	// upgrade to websocket.
+	conn, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		app.handleErrorUpgradeWebsocket(err, w)
+	}
+
+	err = agent.GetPodLogs(namespace, podName, conn)
+
+	if err != nil {
+		app.handleErrorWebsocketWrite(err, w)
+		return
+	}
+
+	if err != nil {
+		app.handleErrorFormValidation(err, ErrK8sValidate, w)
+		return
+	}
+
+	if err := json.NewEncoder(w).Encode(logs); err != nil {
+		app.handleErrorFormDecoding(err, ErrK8sDecode, w)
+		return
+	}
+}

+ 1 - 0
server/router/router.go

@@ -45,6 +45,7 @@ func New(
 
 		// /api/k8s routes
 		r.Method("GET", "/k8s/namespaces", auth.BasicAuthenticate(requestlog.NewHandler(a.HandleListNamespaces, l)))
+		r.Method("GET", "/k8s/{namespace}/pod/{name}/logs", auth.BasicAuthenticate(requestlog.NewHandler(a.HandleGetPodLogs, l)))
 	})
 
 	fs := http.FileServer(http.Dir(staticFilePath))