Forráskód Böngészése

update log structure used by api clients (#4299)

ianedwards 2 éve
szülő
commit
e5439016c5

+ 5 - 1
api/server/handlers/namespace/stream_pod_logs_loki.go

@@ -13,6 +13,7 @@ import (
 	"github.com/porter-dev/porter/api/server/shared/websocket"
 	"github.com/porter-dev/porter/api/types"
 	"github.com/porter-dev/porter/internal/models"
+	"github.com/porter-dev/porter/internal/telemetry"
 )
 
 type StreamPodLogsLokiHandler struct {
@@ -32,6 +33,9 @@ func NewStreamPodLogsLokiHandler(
 }
 
 func (c *StreamPodLogsLokiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ctx, span := telemetry.NewSpan(r.Context(), "serve-stream-pod-logs")
+	defer span.End()
+
 	request := &types.GetLogRequest{}
 
 	if ok := c.DecodeAndValidate(w, r, request); !ok {
@@ -59,7 +63,7 @@ func (c *StreamPodLogsLokiHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ
 		return
 	}
 
-	err = agent.StreamPorterAgentLokiLog([]string{
+	err = agent.StreamPorterAgentLokiLog(ctx, []string{
 		fmt.Sprintf("pod=%s", request.PodSelector),
 		fmt.Sprintf("namespace=%s", request.Namespace),
 	}, string(startTime), request.SearchParam, 0, safeRW)

+ 90 - 40
api/server/handlers/porter_app/logs_apply_v2.go

@@ -15,6 +15,7 @@ import (
 	"github.com/porter-dev/porter/internal/deployment_target"
 	porter_agent "github.com/porter-dev/porter/internal/kubernetes/porter_agent/v2"
 	"github.com/porter-dev/porter/internal/models"
+	"github.com/porter-dev/porter/internal/porter_app"
 	"github.com/porter-dev/porter/internal/telemetry"
 )
 
@@ -38,15 +39,17 @@ func NewAppLogsHandler(
 
 // AppLogsRequest represents the accepted fields on a request to the /apps/logs endpoint
 type AppLogsRequest struct {
-	DeploymentTargetID string    `schema:"deployment_target_id"`
-	ServiceName        string    `schema:"service_name"`
-	AppID              uint      `schema:"app_id"`
-	Limit              uint      `schema:"limit"`
-	StartRange         time.Time `schema:"start_range,omitempty"`
-	EndRange           time.Time `schema:"end_range,omitempty"`
-	SearchParam        string    `schema:"search_param"`
-	Direction          string    `schema:"direction"`
-	AppRevisionID      string    `schema:"app_revision_id"`
+	DeploymentTargetID   string    `schema:"deployment_target_id"`
+	DeploymentTargetName string    `schema:"deployment_target_name"`
+	ServiceName          string    `schema:"service_name"`
+	AppID                uint      `schema:"app_id"`
+	Limit                uint      `schema:"limit"`
+	StartRange           time.Time `schema:"start_range,omitempty"`
+	EndRange             time.Time `schema:"end_range,omitempty"`
+	SearchParam          string    `schema:"search_param"`
+	Direction            string    `schema:"direction"`
+	AppRevisionID        string    `schema:"app_revision_id"`
+	JobRunName           string    `schema:"job_run_name"`
 }
 
 const (
@@ -55,9 +58,19 @@ const (
 	lokiLabel_PorterServiceName   = "porter_run_service_name"
 	lokiLabel_PorterAppRevisionID = "porter_run_app_revision_id"
 	lokiLabel_DeploymentTargetId  = "porter_run_deployment_target_id"
+	lokiLabel_JobRunName          = "job_name"
 	lokiLabel_Namespace           = "namespace"
 )
 
+const defaultLogLineLimit = 1000
+
+// AppLogsResponse represents the response to the /apps/logs endpoint
+type AppLogsResponse struct {
+	BackwardContinueTime *time.Time                 `json:"backward_continue_time,omitempty"`
+	ForwardContinueTime  *time.Time                 `json:"forward_continue_time,omitempty"`
+	Logs                 []porter_app.StructuredLog `json:"logs"`
+}
+
 // ServeHTTP gets logs for a given app, service, and deployment target
 func (c *AppLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	ctx, span := telemetry.NewSpan(r.Context(), "serve-app-logs")
@@ -81,12 +94,6 @@ func (c *AppLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "app-name", Value: appName})
 
-	if request.AppID == 0 {
-		err := telemetry.Error(ctx, span, nil, "must provide app id")
-		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
-		return
-	}
-
 	if request.ServiceName == "" {
 		err := telemetry.Error(ctx, span, nil, "must provide service name")
 		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
@@ -94,18 +101,32 @@ func (c *AppLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "service-name", Value: request.ServiceName})
 
-	if request.DeploymentTargetID == "" {
-		err := telemetry.Error(ctx, span, nil, "must provide deployment target id")
-		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
-		return
+	deploymentTargetName := request.DeploymentTargetName
+	if request.DeploymentTargetName == "" && request.DeploymentTargetID == "" {
+		defaultDeploymentTarget, err := defaultDeploymentTarget(ctx, defaultDeploymentTargetInput{
+			ProjectID:                 project.ID,
+			ClusterID:                 cluster.ID,
+			ClusterControlPlaneClient: c.Config().ClusterControlPlaneClient,
+		})
+		if err != nil {
+			err := telemetry.Error(ctx, span, err, "error getting default deployment target")
+			c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+			return
+		}
+		deploymentTargetName = defaultDeploymentTarget.Name
 	}
-	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID})
+
+	telemetry.WithAttributes(span,
+		telemetry.AttributeKV{Key: "deployment-target-name", Value: deploymentTargetName},
+		telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID},
+	)
 
 	deploymentTarget, err := deployment_target.DeploymentTargetDetails(ctx, deployment_target.DeploymentTargetDetailsInput{
-		ProjectID:          int64(project.ID),
-		ClusterID:          int64(cluster.ID),
-		DeploymentTargetID: request.DeploymentTargetID,
-		CCPClient:          c.Config().ClusterControlPlaneClient,
+		ProjectID:            int64(project.ID),
+		ClusterID:            int64(cluster.ID),
+		DeploymentTargetID:   request.DeploymentTargetID,
+		DeploymentTargetName: deploymentTargetName,
+		CCPClient:            c.Config().ClusterControlPlaneClient,
 	})
 	if err != nil {
 		err := telemetry.Error(ctx, span, err, "error getting deployment target details")
@@ -116,14 +137,32 @@ func (c *AppLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	namespace := deploymentTarget.Namespace
 	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "namespace", Value: namespace})
 
-	if request.StartRange.IsZero() || request.EndRange.IsZero() {
-		err := telemetry.Error(ctx, span, nil, "must provide start and end range")
-		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
-		return
+	startRange := request.StartRange
+	if request.StartRange.IsZero() {
+		dayAgo := time.Now().Add(-24 * time.Hour).UTC()
+		startRange = dayAgo
 	}
+
+	endRange := request.EndRange
+	if request.EndRange.IsZero() {
+		endRange = time.Now().UTC()
+	}
+
+	limit := request.Limit
+	if request.Limit == 0 {
+		limit = defaultLogLineLimit
+	}
+
+	direction := request.Direction
+	if request.Direction == "" {
+		direction = "backward"
+	}
+
 	telemetry.WithAttributes(span,
 		telemetry.AttributeKV{Key: "start-range", Value: request.StartRange.String()},
 		telemetry.AttributeKV{Key: "end-range", Value: request.EndRange.String()},
+		telemetry.AttributeKV{Key: "limit", Value: limit},
+		telemetry.AttributeKV{Key: "direction", Value: direction},
 	)
 
 	k8sAgent, err := c.GetAgent(r, cluster, "")
@@ -141,36 +180,47 @@ func (c *AppLogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 
 	matchLabels := map[string]string{
-		lokiLabel_Namespace:     namespace,
-		lokiLabel_PorterAppName: appName,
-		lokiLabel_PorterAppID:   fmt.Sprintf("%d", request.AppID),
+		lokiLabel_Namespace:          namespace,
+		lokiLabel_PorterAppName:      appName,
+		lokiLabel_DeploymentTargetId: request.DeploymentTargetID,
 	}
 
 	if request.ServiceName != "all" {
 		matchLabels[lokiLabel_PorterServiceName] = request.ServiceName
 	}
-
 	if request.AppRevisionID != "" {
 		matchLabels[lokiLabel_PorterAppRevisionID] = request.AppRevisionID
 	}
-
-	matchLabels[lokiLabel_DeploymentTargetId] = request.DeploymentTargetID
+	if request.JobRunName != "" {
+		matchLabels[lokiLabel_JobRunName] = request.JobRunName
+	}
 
 	logRequest := &types.LogRequest{
-		Limit:       request.Limit,
-		StartRange:  &request.StartRange,
-		EndRange:    &request.EndRange,
+		Limit:       limit,
+		StartRange:  &startRange,
+		EndRange:    &endRange,
 		MatchLabels: matchLabels,
-		Direction:   request.Direction,
+		Direction:   direction,
 		SearchParam: request.SearchParam,
 	}
 
-	logs, err := porter_agent.Logs(k8sAgent.Clientset, agentSvc, logRequest)
+	logs, err := porter_agent.Logs(ctx, k8sAgent.Clientset, agentSvc, logRequest)
 	if err != nil {
 		_ = telemetry.Error(ctx, span, err, "unable to get logs")
 		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(fmt.Errorf("unable to get logs"), http.StatusInternalServerError))
 		return
 	}
+	if logs == nil {
+		err := telemetry.Error(ctx, span, nil, "logs response is nil")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	res := AppLogsResponse{
+		Logs:                 porter_app.AgentLogToStructuredLog(logs.Logs),
+		ForwardContinueTime:  logs.ForwardContinueTime,
+		BackwardContinueTime: logs.BackwardContinueTime,
+	}
 
-	c.WriteResult(w, r, logs)
+	c.WriteResult(w, r, res)
 }

+ 27 - 11
api/server/handlers/porter_app/stream_logs.go

@@ -68,18 +68,32 @@ func (c *StreamLogsLokiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
 	}
 	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "service-name", Value: request.ServiceName})
 
-	if request.DeploymentTargetID == "" {
-		err := telemetry.Error(ctx, span, nil, "must provide deployment target id")
-		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
-		return
+	deploymentTargetName := request.DeploymentTargetName
+	if request.DeploymentTargetName == "" && request.DeploymentTargetID == "" {
+		defaultDeploymentTarget, err := defaultDeploymentTarget(ctx, defaultDeploymentTargetInput{
+			ProjectID:                 project.ID,
+			ClusterID:                 cluster.ID,
+			ClusterControlPlaneClient: c.Config().ClusterControlPlaneClient,
+		})
+		if err != nil {
+			err := telemetry.Error(ctx, span, err, "error getting default deployment target")
+			c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+			return
+		}
+		deploymentTargetName = defaultDeploymentTarget.Name
 	}
-	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID})
+
+	telemetry.WithAttributes(span,
+		telemetry.AttributeKV{Key: "deployment-target-name", Value: deploymentTargetName},
+		telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID},
+	)
 
 	deploymentTarget, err := deployment_target.DeploymentTargetDetails(ctx, deployment_target.DeploymentTargetDetailsInput{
-		ProjectID:          int64(project.ID),
-		ClusterID:          int64(cluster.ID),
-		DeploymentTargetID: request.DeploymentTargetID,
-		CCPClient:          c.Config().ClusterControlPlaneClient,
+		ProjectID:            int64(project.ID),
+		ClusterID:            int64(cluster.ID),
+		DeploymentTargetID:   request.DeploymentTargetID,
+		DeploymentTargetName: deploymentTargetName,
+		CCPClient:            c.Config().ClusterControlPlaneClient,
 	})
 	if err != nil {
 		err := telemetry.Error(ctx, span, err, "error getting deployment target details")
@@ -122,14 +136,16 @@ func (c *StreamLogsLokiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
 	if request.ServiceName != "all" {
 		labels = append(labels, fmt.Sprintf("%s=%s", lokiLabel_PorterServiceName, request.ServiceName))
 	}
-
 	if request.AppRevisionID != "" {
 		labels = append(labels, fmt.Sprintf("%s=%s", lokiLabel_PorterAppRevisionID, request.AppRevisionID))
 	}
+	if request.JobRunName != "" {
+		labels = append(labels, fmt.Sprintf("%s=%s", lokiLabel_JobRunName, request.JobRunName))
+	}
 
 	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "labels", Value: strings.Join(labels, ",")})
 
-	err = agent.StreamPorterAgentLokiLog(labels, string(startTime), request.SearchParam, 0, safeRW)
+	err = agent.StreamPorterAgentLokiLog(ctx, labels, string(startTime), request.SearchParam, 0, safeRW)
 	if err != nil {
 		err := telemetry.Error(ctx, span, err, "error streaming logs")
 		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))

+ 17 - 15
dashboard/src/main/home/app-dashboard/app-view/tabs/LogsTab.tsx

@@ -1,24 +1,26 @@
 import React from "react";
-import Logs from "../../validate-apply/logs/Logs"
+
+import Logs from "../../validate-apply/logs/Logs";
 import { useLatestRevision } from "../LatestRevisionContext";
 
 const LogsTab: React.FC = () => {
-    const { projectId, clusterId, latestProto, deploymentTarget, porterApp } = useLatestRevision();
+  const { projectId, clusterId, latestProto, deploymentTarget, porterApp } =
+    useLatestRevision();
 
-    const appName = latestProto.name
-    const serviceNames = Object.keys(latestProto.services)
+  const appName = latestProto.name;
+  const serviceNames = Object.keys(latestProto.services);
 
-    return (
-        <Logs
-            projectId={projectId}
-            clusterId={clusterId}
-            appName={appName}
-            serviceNames={serviceNames}
-            deploymentTargetId={deploymentTarget.id}
-            filterPredeploy={true}
-            appId={porterApp.id}
-        />
-    );
+  return (
+    <Logs
+      projectId={projectId}
+      clusterId={clusterId}
+      appName={appName}
+      serviceNames={serviceNames}
+      deploymentTargetId={deploymentTarget.id}
+      filterPredeploy={true}
+      appId={porterApp.id}
+    />
+  );
 };
 
 export default LogsTab;

+ 2 - 0
dashboard/src/main/home/app-dashboard/expanded-app/logs/types.ts

@@ -25,6 +25,8 @@ const rawLabelsValidator = z.object({
     porter_run_app_revision_id: z.string().optional(),
     porter_run_service_name: z.string().optional(),
     porter_run_service_type: z.string().optional(),
+    porter_run_deployment_target_id: z.string().optional(),
+    job_name: z.string().optional(),
     controller_uid: z.string().optional(),
 });
 export type RawLabels = z.infer<typeof rawLabelsValidator>;

+ 1 - 1
dashboard/src/main/home/app-dashboard/validate-apply/jobs/JobRunDetails.tsx

@@ -107,7 +107,7 @@ const JobRunDetails: React.FC<Props> = ({ jobRun }) => {
         }}
         appId={porterApp.id}
         defaultLatestRevision={false}
-        jobRunID={jobRun.id}
+        jobRunName={jobRun.name}
       />
     </>
   );

+ 6 - 5
dashboard/src/main/home/app-dashboard/validate-apply/logs/Logs.tsx

@@ -23,13 +23,13 @@ import api from "shared/api";
 import spinner from "assets/loading.gif";
 
 import { useLatestRevision } from "../../app-view/LatestRevisionContext";
-import StyledLogs from "../../expanded-app/logs/StyledLogs";
 import {
   Direction,
   GenericFilter,
   GenericFilterOption,
   type FilterName,
 } from "../../expanded-app/logs/types";
+import StyledLogs from "./StyledLogs";
 import { useLogs } from "./utils";
 
 type Props = {
@@ -50,7 +50,7 @@ type Props = {
   selectedRevisionId?: string;
   defaultScrollToBottomEnabled?: boolean;
   defaultLatestRevision?: boolean;
-  jobRunID?: string;
+  jobRunName?: string;
 };
 
 const DEFAULT_LOG_TIMEOUT_SECONDS = 60;
@@ -70,7 +70,7 @@ const Logs: React.FC<Props> = ({
   selectedRevisionId,
   defaultScrollToBottomEnabled = true,
   defaultLatestRevision = true,
-  jobRunID = "",
+  jobRunName = "",
 }) => {
   const { search } = useLocation();
   const queryParams = new URLSearchParams(search);
@@ -128,7 +128,7 @@ const Logs: React.FC<Props> = ({
     });
   }, [selectedService, selectedRevisionId]);
 
-  const { revisionIdToNumber } = useRevisionList({
+  const { revisionIdToNumber, numberToRevisionId } = useRevisionList({
     appName,
     deploymentTargetId,
     projectId,
@@ -251,12 +251,13 @@ const Logs: React.FC<Props> = ({
     notify,
     setLoading: setIsLoading,
     revisionIdToNumber,
+    revisionNumberToId: numberToRevisionId,
     setDate: selectedDate,
     appRevisionId,
     filterPredeploy,
     timeRange,
     appID: appId,
-    jobRunID,
+    jobRunName,
   });
 
   const {

+ 191 - 0
dashboard/src/main/home/app-dashboard/validate-apply/logs/StyledLogs.tsx

@@ -0,0 +1,191 @@
+import React from "react";
+import type Anser from "anser";
+import dayjs from "dayjs";
+import styled from "styled-components";
+import { match } from "ts-pattern";
+
+import { GenericFilter } from "../../expanded-app/logs/types";
+import { type PorterLog } from "./utils";
+
+export const getVersionTagColor = (version: string): string => {
+  const colors = ["#7B61FF", "#FF7B61", "#61FF7B"];
+
+  const versionInt = parseInt(version);
+  if (isNaN(versionInt)) {
+    return colors[0];
+  }
+  return colors[versionInt % colors.length];
+};
+
+type Props = {
+  logs: PorterLog[];
+  appName: string;
+  filters: GenericFilter[];
+};
+
+const StyledLogs: React.FC<Props> = ({ logs, filters }) => {
+  const renderFilterTagForLog = (
+    filter: GenericFilter,
+    log: PorterLog,
+    index: number
+  ): React.ReactNode => {
+    return match(filter.name)
+      .with("revision", () => {
+        return (
+          <StyledLogsTableData width={"100px"} key={index}>
+            <LogInnerPill
+              color={getVersionTagColor(log.revision)}
+              key={index}
+              onClick={() => {
+                filter.setValue(log.revision);
+              }}
+            >
+              {`Version: ${log.revision}`}
+            </LogInnerPill>
+          </StyledLogsTableData>
+        );
+      })
+      .with("pod_name", () => {
+        return (
+          <StyledLogsTableData width={"100px"} key={index}>
+            <LogInnerPill
+              color={"white"}
+              key={index}
+              onClick={() => {
+                filter.setValue(log.service_name);
+              }}
+            >
+              {log.service_name}
+            </LogInnerPill>
+          </StyledLogsTableData>
+        );
+      })
+      .with("service_name", () => {
+        return (
+          <StyledLogsTableData width={"100px"} key={index}>
+            <LogInnerPill
+              color={"white"}
+              key={index}
+              onClick={() => {
+                filter.setValue(
+                  log.service_name ??
+                    GenericFilter.getDefaultOption("service_name").value
+                );
+              }}
+            >
+              {log.service_name}
+            </LogInnerPill>
+          </StyledLogsTableData>
+        );
+      })
+      .otherwise(() => null);
+  };
+
+  return (
+    <StyledLogsTable>
+      <StyledLogsTableBody>
+        {logs.map((log, i) => {
+          return (
+            <StyledLogsTableRow key={[log.lineNumber, i].join(".")}>
+              <StyledLogsTableData width={"100px"}>
+                <LineTimestamp className="line-timestamp">
+                  {log.timestamp
+                    ? dayjs(log.timestamp).format("MM/DD HH:mm:ss")
+                    : "-"}
+                </LineTimestamp>
+              </StyledLogsTableData>
+              {filters.map((filter, j) => {
+                return renderFilterTagForLog(filter, log, j);
+              })}
+              <StyledLogsTableData>
+                <LogOuter key={[log.lineNumber, i].join(".")}>
+                  {log.line?.map((ansi, j) => {
+                    if (ansi.clearLine) {
+                      return null;
+                    }
+
+                    return (
+                      <LogInnerSpan
+                        key={[log.lineNumber, i, j].join(".")}
+                        ansi={ansi}
+                      >
+                        {ansi.content.replace(/ /g, "\u00a0")}
+                      </LogInnerSpan>
+                    );
+                  })}
+                </LogOuter>
+              </StyledLogsTableData>
+            </StyledLogsTableRow>
+          );
+        })}
+      </StyledLogsTableBody>
+    </StyledLogsTable>
+  );
+};
+
+export default StyledLogs;
+
+const StyledLogsTable = styled.table`
+  border-collapse: collapse;
+`;
+
+const StyledLogsTableBody = styled.tbody``;
+
+const StyledLogsTableRow = styled.tr``;
+
+const StyledLogsTableData = styled.td<{ width?: string }>`
+  padding: 2px;
+  vertical-align: top;
+  ${(props) => props.width && `width: ${props.width};`}
+`;
+
+const LineTimestamp = styled.div`
+  height: 100%;
+  color: #949effff;
+  opacity: 0.5;
+  font-family: monospace;
+  white-space: nowrap;
+`;
+
+const LogInnerPill = styled.div<{ color: string }>`
+  display: inline-block;
+  vertical-align: middle;
+  width: 120px;
+  padding: 0px 5px;
+  height: 20px;
+  color: black;
+  background-color: ${(props) => props.color};
+  border-radius: 5px;
+  opacity: 1;
+  font-family: monospace;
+  cursor: pointer;
+  hover: {
+    border: 1px solid #949effff;
+  }
+  overflow: hidden;
+  white-space: nowrap;
+  text-overflow: ellipsis;
+`;
+
+const LogOuter = styled.div`
+  user-select: text;
+  display: inline-block;
+  word-wrap: anywhere;
+  flex-grow: 1;
+  font-family: monospace, sans-serif;
+  font-size: 12px;
+`;
+
+const LogInnerSpan = styled.span`
+  user-select: text;
+  font-family: monospace, sans-serif;
+  font-size: 12px;
+  font-weight: ${(props: { ansi: Anser.AnserJsonEntry }) =>
+    props.ansi?.decoration && props.ansi?.decoration === "bold"
+      ? "700"
+      : "400"};
+  color: ${(props: { ansi: Anser.AnserJsonEntry }) =>
+    props.ansi?.fg ? `rgb(${props.ansi?.fg})` : "white"};
+  background-color: ${(props: { ansi: Anser.AnserJsonEntry }) =>
+    props.ansi?.bg ? `rgb(${props.ansi?.bg})` : "transparent"};
+`;

+ 162 - 126
dashboard/src/main/home/app-dashboard/validate-apply/logs/utils.ts

@@ -1,42 +1,76 @@
+import { useEffect, useRef, useState } from "react";
+import Anser from "anser";
 import dayjs, { type Dayjs } from "dayjs";
 import _ from "lodash";
-import { useEffect, useRef, useState } from "react";
+import { z } from "zod";
+
 import api from "shared/api";
-import Anser from "anser";
-import { useWebsockets, type NewWebsocketOptions } from "shared/hooks/useWebsockets";
 import {
-  type AgentLog,
+  useWebsockets,
+  type NewWebsocketOptions,
+} from "shared/hooks/useWebsockets";
+
+import {
   agentLogValidator,
   Direction,
-  type PorterLog,
-  type PaginationInfo,
+  GenericFilter,
+  type AgentLog,
   type FilterName,
-  GenericFilter
+  type PaginationInfo,
 } from "../../expanded-app/logs/types";
 
 const MAX_LOGS = 5000;
 const MAX_BUFFER_LOGS = 1000;
 const QUERY_LIMIT = 1000;
 
-export const parseLogs = (logs: any[] = []): PorterLog[] => {
-  return logs.map((log: any, idx) => {
+const porterLogValidator = z.object({
+  timestamp: z.string(),
+  line: z.string().transform((val) => Anser.ansiToJson(val)),
+  output_stream: z.string(),
+  service_name: z.string(),
+  app_revision_id: z.string(),
+  deployment_target_id: z.string(),
+  job_name: z.string().default(""),
+  job_run_id: z.string().default(""),
+  lineNumber: z.number().default(0),
+  revision: z.string().default("0"),
+});
+export type PorterLog = z.infer<typeof porterLogValidator>;
+
+export const parseLogsFromAgent = (logs: unknown[] = []): PorterLog[] => {
+  return logs.map((log, idx) => {
     try {
       const parsed: AgentLog = agentLogValidator.parse(log);
       // TODO Move log parsing to the render method
       const ansiLog = Anser.ansiToJson(parsed.line);
       return {
+        timestamp: parsed.timestamp,
         line: ansiLog,
+        output_stream: parsed.metadata?.output_stream ?? "",
+        service_name:
+          parsed.metadata?.raw_labels?.porter_run_service_name ?? "",
+        app_revision_id:
+          parsed.metadata?.raw_labels?.porter_run_app_revision_id ?? "",
+        deployment_target_id:
+          parsed.metadata?.raw_labels?.porter_run_deployment_target_id ?? "",
+        job_name: parsed.metadata?.raw_labels?.job_name ?? "",
+        job_run_id: parsed.metadata?.raw_labels?.controller_uid ?? "",
+        revision: "0",
         lineNumber: idx + 1,
-        timestamp: parsed.timestamp,
-        metadata: parsed.metadata,
       };
     } catch (err) {
-      console.log(err)
       return {
-        line: Anser.ansiToJson(log.toString()),
+        timestamp: "",
+        line: Anser.ansiToJson(JSON.stringify(log)),
+        output_stream: "",
+        service_name: "",
+        app_revision_id: "",
+        deployment_target_id: "",
+        job_name: "",
+        job_run_id: "",
+        revision: "0",
         lineNumber: idx + 1,
-        timestamp: undefined,
-      }
+      };
     }
   });
 };
@@ -51,35 +85,38 @@ export const useLogs = ({
   notify,
   setLoading,
   revisionIdToNumber,
+  revisionNumberToId,
   setDate,
   appRevisionId = "",
   timeRange,
   filterPredeploy,
   appID,
-  jobRunID = ""
+  jobRunName = "",
 }: {
-  projectID: number,
-  clusterID: number,
-  selectedFilterValues: Record<FilterName, string>,
-  appName: string,
-  deploymentTargetId: string,
-  searchParam: string,
-  notify: (message: string) => void,
-  setLoading: (isLoading: boolean) => void,
-  revisionIdToNumber: Record<string, number>,
+  projectID: number;
+  clusterID: number;
+  selectedFilterValues: Record<FilterName, string>;
+  appName: string;
+  deploymentTargetId: string;
+  searchParam: string;
+  notify: (message: string) => void;
+  setLoading: (isLoading: boolean) => void;
+  revisionIdToNumber: Record<string, number>;
+  revisionNumberToId: Record<number, string>;
   // if setDate is set, results are not live
-  setDate?: Date,
-  appRevisionId?: string,
+  setDate?: Date;
+  appRevisionId?: string;
   timeRange?: {
-    startTime?: Dayjs,
-    endTime?: Dayjs,
-  },
-  filterPredeploy: boolean,
-  appID: number,
-  jobRunID?: string,
-}
-) => {
-  const [isLive, setIsLive] = useState<boolean>(!setDate && (timeRange?.startTime == null && timeRange?.endTime == null));
+    startTime?: Dayjs;
+    endTime?: Dayjs;
+  };
+  filterPredeploy: boolean;
+  appID: number;
+  jobRunName?: string;
+}) => {
+  const [isLive, setIsLive] = useState<boolean>(
+    !setDate && timeRange?.startTime == null && timeRange?.endTime == null
+  );
   const logsBufferRef = useRef<PorterLog[]>([]);
   const [logs, setLogs] = useState<PorterLog[]>([]);
   const [paginationInfo, setPaginationInfo] = useState<PaginationInfo>({
@@ -98,16 +135,12 @@ export const useLogs = ({
   //   result of the initial query
   // - moving the cursor both forward and backward changes the start and end dates
 
-  const {
-    newWebsocket,
-    openWebsocket,
-    closeAllWebsockets,
-  } = useWebsockets();
+  const { newWebsocket, openWebsocket, closeAllWebsockets } = useWebsockets();
 
   const updateLogs = (
     newLogs: PorterLog[],
     direction: Direction = Direction.forward
-  ) => {
+  ): void => {
     // Nothing to update here
     if (!newLogs.length) {
       return;
@@ -187,7 +220,7 @@ export const useLogs = ({
       search_param: searchParam,
       app_revision_id: appRevisionId,
       app_id: appID.toString(),
-    }
+    };
 
     const q = new URLSearchParams(searchParams).toString();
 
@@ -202,31 +235,32 @@ export const useLogs = ({
         if (evt.data == null) {
           return;
         }
-        const jsonData = evt.data.trim().split("\n")
-        const newLogs: any[] = [];
-        jsonData.forEach((data: string) => {
-          try {
-            const jsonLog = JSON.parse(data);
-            newLogs.push(jsonLog)
-          } catch (err) {
-            // TODO: better error handling
-            // console.log(err)
+        const jsonData = evt.data.trim().split("\n");
+        const newLogs = jsonData.map((data: string) => {
+          const parsedLogData = z
+            .record(z.unknown())
+            .safeParse(JSON.parse(data));
+          if (!parsedLogData.success) {
+            return {};
           }
+
+          return parsedLogData.data;
         });
-        const newLogsParsed = parseLogs(newLogs);
-        newLogsParsed.filter((log) => {
-          return log.metadata?.raw_labels?.porter_run_app_revision_id != null
-            && revisionIdToNumber[log.metadata.raw_labels.porter_run_app_revision_id] != null
-            && revisionIdToNumber[log.metadata.raw_labels.porter_run_app_revision_id] != 0
-        }).forEach((log) => {
-          if (log.metadata?.raw_labels?.porter_run_app_revision_id != null) {
-            const revisionNumber = revisionIdToNumber[log.metadata.raw_labels.porter_run_app_revision_id];
-            if (revisionNumber != null && revisionNumber != 0) {
-              log.metadata.revision = revisionNumber.toString();
-            }
-          }
-        })
-        const newLogsFiltered = filterLogs(newLogsParsed);
+        const newLogsParsed = parseLogsFromAgent(newLogs);
+
+        const logsWithRevisionNumber = newLogsParsed
+          .filter(
+            (log) =>
+              !!log.app_revision_id &&
+              !!revisionIdToNumber[log.app_revision_id] &&
+              revisionIdToNumber[log.app_revision_id] !== 0
+          )
+          .map((log) => ({
+            ...log,
+            revision: revisionIdToNumber[log.app_revision_id].toString(),
+          }));
+
+        const newLogsFiltered = filterLogs(logsWithRevisionNumber);
         pushLogs(newLogsFiltered);
       },
       onclose: () => {
@@ -238,32 +272,17 @@ export const useLogs = ({
     openWebsocket(websocketKey);
   };
 
-  const filterLogs = (logs: PorterLog[]) => {
-    return logs.filter(log => {
-      if (log.metadata == null) {
-        return true;
-      }
-
-      if (jobRunID !== "" && log.metadata.raw_labels?.controller_uid !== jobRunID) {
-        return false;
-      }
-
-      if (selectedFilterValues.output_stream !== GenericFilter.getDefaultOption("output_stream").value &&
-        log.metadata.output_stream !== selectedFilterValues.output_stream) {
-        return false;
-      }
-
-      if (filterPredeploy && (log.metadata.raw_labels?.porter_run_service_name ?? "").endsWith("predeploy")) {
-        return false;
-      }
-
-      if (selectedFilterValues.revision !== GenericFilter.getDefaultOption("revision").value &&
-        log.metadata.revision !== selectedFilterValues.revision) {
+  const filterLogs = (logs: PorterLog[]): PorterLog[] => {
+    return logs.filter((log) => {
+      if (
+        selectedFilterValues.output_stream !==
+          GenericFilter.getDefaultOption("output_stream").value &&
+        log.output_stream !== selectedFilterValues.output_stream
+      ) {
         return false;
       }
 
-      if (selectedFilterValues.revision_id !== GenericFilter.getDefaultOption("revision_id").value &&
-        log.metadata.raw_labels?.porter_run_app_revision_id !== selectedFilterValues.revision_id) {
+      if (filterPredeploy && log.service_name.endsWith("predeploy")) {
         return false;
       }
 
@@ -283,7 +302,6 @@ export const useLogs = ({
   }> => {
     try {
       const getLogsReq = {
-        app_id: appID,
         service_name: selectedFilterValues.service_name,
         deployment_target_id: deploymentTargetId,
         search_param: searchParam,
@@ -291,20 +309,26 @@ export const useLogs = ({
         end_range: endDate,
         limit,
         direction,
-        app_revision_id: appRevisionId,
+        app_revision_id:
+          revisionNumberToId[parseInt(selectedFilterValues.revision)],
+        job_run_name: jobRunName,
       };
 
-      const logsResp = await api.appLogs(
-        "<token>",
-        getLogsReq,
-        {
-          cluster_id: clusterID,
-          project_id: projectID,
-          porter_app_name: appName,
-        }
-      )
+      const logsResp = await api.appLogs("<token>", getLogsReq, {
+        cluster_id: clusterID,
+        project_id: projectID,
+        porter_app_name: appName,
+      });
+
+      const parsedRes = z
+        .object({
+          logs: z.array(porterLogValidator),
+          backward_continue_time: z.string().nullable(),
+          forward_continue_time: z.string().nullable(),
+        })
+        .safeParse(logsResp.data);
 
-      if (logsResp.data == null) {
+      if (!parsedRes.success) {
         return {
           logs: [],
           previousCursor: null,
@@ -312,32 +336,32 @@ export const useLogs = ({
         };
       }
 
-      const newLogs = parseLogs(logsResp.data.logs);
+      const newLogs = parsedRes.data.logs;
       if (direction === Direction.backward) {
         newLogs.reverse();
       }
 
-      newLogs.filter((log) => {
-        return log.metadata?.raw_labels?.porter_run_app_revision_id != null
-          && revisionIdToNumber[log.metadata.raw_labels.porter_run_app_revision_id] != null
-          && revisionIdToNumber[log.metadata.raw_labels.porter_run_app_revision_id] != 0
-      }).forEach((log) => {
-        if (log.metadata?.raw_labels?.porter_run_app_revision_id != null) {
-          const revisionNumber = revisionIdToNumber[log.metadata.raw_labels.porter_run_app_revision_id];
-          if (revisionNumber != null && revisionNumber != 0) {
-            log.metadata.revision = revisionNumber.toString();
-          }
-        }
-      })
+      const logsWithRevisionNumber = newLogs
+        .filter(
+          (log) =>
+            !!log.app_revision_id &&
+            !!revisionIdToNumber[log.app_revision_id] &&
+            revisionIdToNumber[log.app_revision_id] !== 0
+        )
+        .map((log) => ({
+          ...log,
+          revision: revisionIdToNumber[log.app_revision_id].toString(),
+        }));
 
       return {
-        logs: newLogs,
+        logs: logsWithRevisionNumber,
         previousCursor:
           // There are no more historical logs so don't set the previous cursor
-          newLogs.length < QUERY_LIMIT && direction == Direction.backward
+          logsWithRevisionNumber.length < QUERY_LIMIT &&
+          direction === Direction.backward
             ? null
-            : logsResp.data.backward_continue_time,
-        nextCursor: logsResp.data.forward_continue_time,
+            : parsedRes.data.backward_continue_time,
+        nextCursor: parsedRes.data.forward_continue_time,
       };
     } catch {
       return {
@@ -352,10 +376,18 @@ export const useLogs = ({
     setLoading(true);
     setLogs([]);
     flushLogsBuffer(true);
-    const endDate = timeRange?.endTime != null ? timeRange.endTime : dayjs(setDate);
-    const oneDayAgo = timeRange?.startTime != null ? timeRange.startTime : endDate.subtract(1, "day");
-
-    const { logs: initialLogs, previousCursor, nextCursor } = await queryLogs(
+    const endDate =
+      timeRange?.endTime != null ? timeRange.endTime : dayjs(setDate);
+    const oneDayAgo =
+      timeRange?.startTime != null
+        ? timeRange.startTime
+        : endDate.subtract(1, "day");
+
+    const {
+      logs: initialLogs,
+      previousCursor,
+      nextCursor,
+    } = await queryLogs(
       oneDayAgo.toISOString(),
       endDate.toISOString(),
       Direction.backward
@@ -468,14 +500,18 @@ export const useLogs = ({
 
     const flushLogsBufferInterval = setInterval(flushLogsBuffer, 3000);
 
-    return () => { clearInterval(flushLogsBufferInterval); };
+    return () => {
+      clearInterval(flushLogsBufferInterval);
+    };
   }, []);
 
   useEffect(() => {
     if (Object.keys(revisionIdToNumber).length) {
       // if a complete time range is not given, then we are live
-      const isLive = !setDate && (timeRange?.startTime == null || timeRange?.endTime == null);
-      refresh({ isLive });
+      const isLive =
+        !setDate &&
+        (timeRange?.startTime == null || timeRange?.endTime == null);
+      void refresh({ isLive });
       setIsLive(isLive);
     }
   }, [
@@ -509,4 +545,4 @@ export const useLogs = ({
     paginationInfo,
     stopLogStream: closeAllWebsockets,
   };
-};
+};

+ 1 - 1
dashboard/src/shared/api.tsx

@@ -282,7 +282,6 @@ const getLogsWithinTimeRange = baseApi<
 
 const appLogs = baseApi<
   {
-    app_id: number;
     service_name: string;
     deployment_target_id: string;
     limit: number;
@@ -291,6 +290,7 @@ const appLogs = baseApi<
     search_param?: string;
     direction?: string;
     app_revision_id?: string;
+    job_run_name?: string;
   },
   {
     project_id: number;

+ 3 - 2
internal/kubernetes/agent.go

@@ -1781,6 +1781,7 @@ func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selecto
 }
 
 func (a *Agent) StreamPorterAgentLokiLog(
+	ctx context.Context,
 	labels []string,
 	startTime string,
 	searchParam string,
@@ -1830,7 +1831,7 @@ func (a *Agent) StreamPorterAgentLokiLog(
 
 			defer wg.Done()
 
-			podList, err := a.Clientset.CoreV1().Pods("porter-agent-system").List(context.Background(), metav1.ListOptions{
+			podList, err := a.Clientset.CoreV1().Pods("porter-agent-system").List(ctx, metav1.ListOptions{
 				LabelSelector: "control-plane=controller-manager",
 			})
 			if err != nil {
@@ -1893,7 +1894,7 @@ func (a *Agent) StreamPorterAgentLokiLog(
 				return
 			}
 
-			err = exec.Stream(remotecommand.StreamOptions{
+			err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
 				Stdin:  nil,
 				Stdout: rw,
 				Stderr: rw,

+ 9 - 5
internal/kubernetes/porter_agent/v2/agent_server.go

@@ -316,10 +316,14 @@ func GetHistoricalLogs(
 
 // Logs returns logs from the porter agent matching the provided labels and other query parameters
 func Logs(
+	ctx context.Context,
 	clientset kubernetes.Interface,
 	service *v1.Service,
 	req *types.LogRequest,
 ) (*types.GetLogResponse, error) {
+	ctx, span := telemetry.NewSpan(ctx, "agent-get-logs")
+	defer span.End()
+
 	vals := make(map[string]string)
 
 	if req.Limit != 0 {
@@ -329,7 +333,7 @@ func Logs(
 	if req.StartRange != nil {
 		startVal, err := req.StartRange.MarshalText()
 		if err != nil {
-			return nil, err
+			return nil, telemetry.Error(ctx, span, err, "unable to marshal start range")
 		}
 
 		vals["start_range"] = string(startVal)
@@ -338,7 +342,7 @@ func Logs(
 	if req.EndRange != nil {
 		endVal, err := req.EndRange.MarshalText()
 		if err != nil {
-			return nil, err
+			return nil, telemetry.Error(ctx, span, err, "unable to marshal end range")
 		}
 
 		vals["end_range"] = string(endVal)
@@ -368,16 +372,16 @@ func Logs(
 		vals,
 	)
 
-	rawQuery, err := resp.DoRaw(context.Background())
+	rawQuery, err := resp.DoRaw(ctx)
 	if err != nil {
-		return nil, err
+		return nil, telemetry.Error(ctx, span, err, "unable to get raw response")
 	}
 
 	logsResp := &types.GetLogResponse{}
 
 	err = json.Unmarshal(rawQuery, logsResp)
 	if err != nil {
-		return nil, err
+		return nil, telemetry.Error(ctx, span, err, "unable to unmarshal logs response")
 	}
 
 	return logsResp, nil

+ 57 - 0
internal/porter_app/logs.go

@@ -0,0 +1,57 @@
+package porter_app
+
+import (
+	"time"
+
+	"github.com/porter-dev/porter/api/types"
+)
+
+// StructuredLog represents a single log line with all necessary fields required by Porter API clients
+type StructuredLog struct {
+	Timestamp          time.Time `json:"timestamp"`
+	Line               string    `json:"line"`
+	OutputStream       string    `json:"output_stream"`
+	ServiceName        string    `json:"service_name"`
+	AppRevisionID      string    `json:"app_revision_id"`
+	DeploymentTargetID string    `json:"deployment_target_id"`
+	AppInstanceID      string    `json:"app_instance_id"`
+	JobName            string    `json:"job_name,omitempty"`
+	JobRunID           string    `json:"job_run_id,omitempty"`
+}
+
+const (
+	lokiLabel_PorterAppName       = "porter_run_app_name"
+	lokiLabel_PorterAppID         = "porter_run_app_id"
+	lokiLabel_PorterServiceName   = "porter_run_service_name"
+	lokiLabel_PorterAppRevisionID = "porter_run_app_revision_id"
+	lokiLabel_DeploymentTargetId  = "porter_run_deployment_target_id"
+	lokiLabel_AppInstanceID       = "porter_run_app_instance_id"
+	lokiLabel_JobRunName          = "job_name"
+	lokiLabel_ControllerUID       = "controller_uid"
+)
+
+// AgentLogToStructuredLog converts a set of raw logs from the agent to structured logs
+func AgentLogToStructuredLog(rawLogs []types.LogLine) []StructuredLog {
+	var logs []StructuredLog
+
+	for _, log := range rawLogs {
+		structuredLog := StructuredLog{
+			Line:               log.Line,
+			OutputStream:       log.Metadata.OutputStream,
+			ServiceName:        log.Metadata.RawLabels[lokiLabel_PorterServiceName],
+			AppRevisionID:      log.Metadata.RawLabels[lokiLabel_PorterAppRevisionID],
+			DeploymentTargetID: log.Metadata.RawLabels[lokiLabel_DeploymentTargetId],
+			JobName:            log.Metadata.RawLabels[lokiLabel_JobRunName],
+			JobRunID:           log.Metadata.RawLabels[lokiLabel_ControllerUID],
+			AppInstanceID:      log.Metadata.RawLabels[lokiLabel_AppInstanceID],
+		}
+
+		if log.Timestamp != nil {
+			structuredLog.Timestamp = *log.Timestamp
+		}
+
+		logs = append(logs, structuredLog)
+	}
+
+	return logs
+}