瀏覽代碼

add log streaming using remotecommand for incidents

Mohammed Nafees 3 年之前
父節點
當前提交
c5f63aed06

+ 4 - 13
api/server/handlers/cluster/get_incident_event_logs.go

@@ -8,8 +8,8 @@ import (
 	"github.com/porter-dev/porter/api/server/shared"
 	"github.com/porter-dev/porter/api/server/shared/apierrors"
 	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/websocket"
 	"github.com/porter-dev/porter/api/types"
-	porter_agent "github.com/porter-dev/porter/internal/kubernetes/porter_agent/v2"
 	"github.com/porter-dev/porter/internal/models"
 )
 
@@ -30,6 +30,7 @@ func NewGetIncidentEventLogsHandler(
 }
 
 func (c *GetIncidentEventLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	safeRW := r.Context().Value(types.RequestCtxWebsocketKey).(*websocket.WebsocketSafeReadWriter)
 	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
 
 	request := &types.GetIncidentEventLogsRequest{}
@@ -38,27 +39,17 @@ func (c *GetIncidentEventLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.R
 		return
 	}
 
-	agent, err := c.GetAgent(r, cluster, "")
+	k8sAgent, err := c.GetAgent(r, cluster, "monitoring")
 
 	if err != nil {
 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
 		return
 	}
 
-	// get agent service
-	agentSvc, err := porter_agent.GetAgentService(agent.Clientset)
+	err = k8sAgent.StreamPorterAgentLokiLog(request.LogID, safeRW)
 
 	if err != nil {
 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
 		return
 	}
-
-	logs, err := porter_agent.GetLogs(agent.Clientset, agentSvc, request.LogID)
-
-	if err != nil {
-		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
-		return
-	}
-
-	c.WriteResult(w, r, logs)
 }

+ 1 - 0
api/server/router/cluster.go

@@ -1168,6 +1168,7 @@ func getClusterRoutes(
 				types.ProjectScope,
 				types.ClusterScope,
 			},
+			IsWebsocket: true,
 		},
 	)
 

+ 1 - 1
api/types/cluster.go

@@ -280,7 +280,7 @@ type GetIncidentsRequest struct {
 }
 
 type GetIncidentEventLogsRequest struct {
-	LogID string `schema:"log_id"`
+	LogID string `schema:"log_id" form:"required"`
 }
 
 type IncidentNotifyRequest struct {

+ 121 - 0
internal/kubernetes/agent.go

@@ -46,6 +46,7 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/remotecommand"
+	"k8s.io/kubectl/pkg/scheme"
 
 	rspb "helm.sh/helm/v3/pkg/release"
 )
@@ -1754,6 +1755,126 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 	return a.RunWebsocketTask(run)
 }
 
+func (a *Agent) StreamPorterAgentLokiLog(logID string, rw *websocket.WebsocketSafeReadWriter) error {
+	run := func() error {
+		errorchan := make(chan error)
+
+		var wg sync.WaitGroup
+		var once sync.Once
+		var err error
+
+		wg.Add(2)
+
+		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
+			defer wg.Done()
+
+			// listens for websocket closing handshake
+			for {
+				if _, _, err := rw.ReadMessage(); err != nil {
+					errorchan <- nil
+					return
+				}
+			}
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
+			defer wg.Done()
+
+			podList, err := a.Clientset.CoreV1().Pods("monitoring").List(context.Background(), metav1.ListOptions{
+				LabelSelector: "", // FIXME: add label selector for porter agent deployment
+			})
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+
+			if len(podList.Items) == 0 {
+				errorchan <- fmt.Errorf("no porter agent pods found")
+				return
+			}
+
+			// FIXME: choose a pod randomly
+			pod := podList.Items[0]
+
+			restConf, err := a.RESTClientGetter.ToRESTConfig()
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+
+			req := a.Clientset.CoreV1().RESTClient().
+				Post().
+				Resource("pods").
+				Name(pod.Name).
+				Namespace(pod.Namespace).
+				SubResource("exec")
+
+			opts := &v1.PodExecOptions{
+				Command: []string{
+					"logzy",
+					logID,
+				},
+				Stdout: true,
+				Stderr: true,
+			}
+
+			req.VersionedParams(
+				opts,
+				scheme.ParameterCodec,
+			)
+
+			exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+
+			err = exec.Stream(remotecommand.StreamOptions{
+				Stdin:  nil,
+				Stdout: rw,
+				Stderr: rw,
+			})
+
+			if err != nil {
+				errorchan <- err
+				return
+			}
+		}()
+
+		for err = range errorchan {
+			once.Do(func() {
+				rw.Close()
+			})
+		}
+
+		return err
+	}
+
+	return a.RunWebsocketTask(run)
+}
+
 // CreateImagePullSecrets will create the required image pull secrets and
 // return a map from the registry name to the name of the secret.
 func (a *Agent) CreateImagePullSecrets(