فهرست منبع

POR-2379 tail logs for a given app/service from the cli (#4523)

ianedwards 2 سال پیش
والد
کامیت
63987fec40
5فایلهای تغییر یافته به همراه271 افزوده شده و 4 حذف شده
  1. 61 2
      api/client/api.go
  2. 61 0
      api/client/porter_app.go
  3. 9 2
      api/server/handlers/porter_app/stream_logs.go
  4. 43 0
      cli/cmd/commands/app.go
  5. 97 0
      cli/cmd/v2/app_logs.go

+ 61 - 2
api/client/api.go

@@ -10,10 +10,12 @@ import (
 	"net/http"
 	"net/url"
 	"os"
+	"reflect"
 	"strings"
 	"time"
 
 	"github.com/gorilla/schema"
+	"github.com/gorilla/websocket"
 	"github.com/porter-dev/porter/api/types"
 )
 
@@ -80,6 +82,65 @@ func NewClientWithConfig(ctx context.Context, input NewClientInput) (Client, err
 // ErrNoAuthCredential returns an error when no auth credentials have been provided such as cookies or tokens
 var ErrNoAuthCredential = errors.New("unable to create an API session with cookie nor token")
 
+func (c *Client) websocketDial(relPath string, data interface{}) (*websocket.Conn, error) {
+	var conn *websocket.Conn
+
+	var header http.Header
+	if c.Token != "" {
+		header = http.Header{
+			"Authorization": []string{fmt.Sprintf("Bearer %s", c.Token)},
+		}
+	} else if cookie, _ := c.getCookie(); cookie != nil {
+		c.Cookie = cookie
+		header = http.Header{
+			"Cookie": []string{fmt.Sprintf("%s=%s", c.Cookie.Name, c.Cookie.Value)},
+		}
+	}
+
+	encoder := schema.NewEncoder()
+
+	// handle encoding of timestamps
+	encoder.RegisterEncoder(time.Time{}, func(t reflect.Value) string {
+		return t.Interface().(time.Time).Format(time.RFC3339)
+	})
+
+	vals := map[string][]string{}
+	err := encoder.Encode(data, vals)
+	if err != nil {
+		return conn, fmt.Errorf("error encoding data: %w", err)
+	}
+
+	urlVals := url.Values(vals)
+	encodedURLVals := urlVals.Encode()
+
+	baseURL, err := url.Parse(c.BaseURL)
+	if err != nil {
+		return conn, fmt.Errorf("error parsing base url: %w", err)
+	}
+
+	var wsScheme string
+	switch baseURL.Scheme {
+	case "http":
+		wsScheme = "ws"
+	case "https":
+		wsScheme = "wss"
+	}
+
+	u := url.URL{
+		Scheme:   wsScheme,
+		Host:     baseURL.Host,
+		Path:     fmt.Sprintf("/api%s", relPath),
+		RawQuery: encodedURLVals,
+	}
+
+	conn, _, err = websocket.DefaultDialer.Dial(u.String(), header)
+	if err != nil {
+		return nil, fmt.Errorf("error dialing websocket: %w", err)
+	}
+
+	return conn, err
+}
+
 // getRequestConfig defines configuration for a GET request
 type getRequestConfig struct {
 	retryCount uint
@@ -364,7 +425,6 @@ func (c *Client) getCookie() (*http.Cookie, error) {
 	cookie := &CookieStorage{}
 
 	err = json.Unmarshal(data, cookie)
-
 	if err != nil {
 		return nil, err
 	}
@@ -403,7 +463,6 @@ func GetProjectIDFromToken(token string) (uint, bool, error) {
 	res := &TokenProjectID{}
 
 	err = json.Unmarshal(decodedBytes, res)
-
 	if err != nil {
 		return 0, false, fmt.Errorf("could not get token project id: %v", err)
 	}

+ 61 - 0
api/client/porter_app.go

@@ -5,7 +5,9 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
+	"time"
 
+	"github.com/gorilla/websocket"
 	"github.com/porter-dev/porter/api/server/handlers/porter_app"
 	"github.com/porter-dev/porter/internal/models"
 	appInternal "github.com/porter-dev/porter/internal/porter_app"
@@ -262,6 +264,65 @@ func (c *Client) UpdateApp(
 	return resp, err
 }
 
+// AppLogsInput is the input struct to AppLogs and AppLogsStream
+type AppLogsInput struct {
+	ProjectID            uint
+	ClusterID            uint
+	AppName              string
+	ServiceName          string
+	DeploymentTargetName string
+	StartRange           time.Time
+}
+
+// AppLogs gets logs for an app
+func (c *Client) AppLogs(
+	ctx context.Context,
+	inp AppLogsInput,
+) (*porter_app.AppLogsResponse, error) {
+	resp := &porter_app.AppLogsResponse{}
+
+	req := &porter_app.AppLogsRequest{
+		ServiceName:          inp.ServiceName,
+		DeploymentTargetName: inp.DeploymentTargetName,
+		StartRange:           inp.StartRange,
+	}
+
+	err := c.getRequest(
+		fmt.Sprintf(
+			"/projects/%d/clusters/%d/apps/%s/logs",
+			inp.ProjectID, inp.ClusterID, inp.AppName,
+		),
+		req,
+		resp,
+	)
+
+	return resp, err
+}
+
+// AppLogsStream streams logs for an app
+func (c *Client) AppLogsStream(
+	ctx context.Context,
+	inp AppLogsInput,
+) (*websocket.Conn, error) {
+	req := &porter_app.AppLogsRequest{
+		ServiceName:          inp.ServiceName,
+		DeploymentTargetName: inp.DeploymentTargetName,
+	}
+
+	conn, err := c.websocketDial(
+		fmt.Sprintf(
+			"/projects/%d/clusters/%d/apps/%s/logs/loki",
+			inp.ProjectID, inp.ClusterID, inp.AppName,
+		),
+		req,
+	)
+	if err != nil {
+		return conn, err
+	}
+
+	return conn, nil
+}
+
 // DefaultDeploymentTarget returns the default deployment target for a given project and cluster
 func (c *Client) DefaultDeploymentTarget(
 	ctx context.Context,

+ 9 - 2
api/server/handlers/porter_app/stream_logs.go

@@ -117,6 +117,13 @@ func (c *StreamLogsLokiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
 	}
 	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "start-time", Value: string(startTime)})
 
+	app, err := c.Repo().PorterApp().ReadPorterAppByName(cluster.ID, appName)
+	if err != nil {
+		err := telemetry.Error(ctx, span, err, "error reading porter app by name")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
 	safeRW := r.Context().Value(types.RequestCtxWebsocketKey).(*websocket.WebsocketSafeReadWriter)
 
 	agent, err := c.GetAgent(r, cluster, "")
@@ -129,8 +136,8 @@ func (c *StreamLogsLokiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
 	labels := []string{
 		fmt.Sprintf("%s=%s", lokiLabel_Namespace, namespace),
 		fmt.Sprintf("%s=%s", lokiLabel_PorterAppName, appName),
-		fmt.Sprintf("%s=%s", lokiLabel_DeploymentTargetId, request.DeploymentTargetID),
-		fmt.Sprintf("%s=%s", lokiLabel_PorterAppID, fmt.Sprintf("%d", request.AppID)),
+		fmt.Sprintf("%s=%s", lokiLabel_DeploymentTargetId, deploymentTarget.ID),
+		fmt.Sprintf("%s=%s", lokiLabel_PorterAppID, fmt.Sprintf("%d", app.ID)),
 	}
 
 	if request.ServiceName != "all" {

+ 43 - 0
cli/cmd/commands/app.go

@@ -295,6 +295,19 @@ in that it only updates the app, but does not attempt to build a new image.`,
 	}
 	appCmd.AddCommand(appManifestsCmd)
 
+	// appLogsCmd represents the "porter app logs" subcommand
+	appLogsCmd := &cobra.Command{
+		Use:   "logs [application]",
+		Args:  cobra.MinimumNArgs(1),
+		Short: "Streams the latest logs for an application.",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return checkLoginAndRunWithConfig(cmd, cliConf, args, appLogs)
+		},
+	}
+	appLogsCmd.PersistentFlags().String("service", "", "the name of the service to get logs for")
+
+	appCmd.AddCommand(appLogsCmd)
+
 	return appCmd
 }
 
@@ -574,6 +587,36 @@ func appRollback(ctx context.Context, _ *types.GetAuthenticatedUserResponse, cli
 	return nil
 }
 
+func appLogs(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client api.Client, cliConfig config.CLIConfig, _ config.FeatureFlags, cmd *cobra.Command, args []string) error {
+	appName := args[0]
+	if appName == "" {
+		return fmt.Errorf("app name must be specified")
+	}
+
+	serviceFlag, err := cmd.Flags().GetString("service")
+	if err != nil {
+		return fmt.Errorf("error getting service flag: %w", err)
+	}
+
+	serviceName := v2.ServiceName_AllServices
+	if serviceFlag != "" {
+		serviceName = serviceFlag
+	}
+
+	err = v2.AppLogs(ctx, v2.AppLogsInput{
+		CLIConfig:            cliConfig,
+		Client:               client,
+		AppName:              appName,
+		DeploymentTargetName: deploymentTargetName,
+		ServiceName:          serviceName,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to get app logs: %w", err)
+	}
+
+	return nil
+}
+
 func appRun(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client api.Client, cliConfig config.CLIConfig, ff config.FeatureFlags, _ *cobra.Command, args []string) error {
 	if jobName != "" {
 		if !ff.ValidateApplyV2Enabled {

+ 97 - 0
cli/cmd/v2/app_logs.go

@@ -0,0 +1,97 @@
+package v2
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/fatih/color"
+	api "github.com/porter-dev/porter/api/client"
+	"github.com/porter-dev/porter/cli/cmd/config"
+)
+
+// AppLogsInput is the input for the AppLogs function
+type AppLogsInput struct {
+	// CLIConfig is the CLI configuration
+	CLIConfig config.CLIConfig
+	// Client is the Porter API client
+	Client api.Client
+	// DeploymentTargetName is the name of deployment target where the app is deployed
+	DeploymentTargetName string
+	// AppName is the name of the app to get logs for
+	AppName string
+	// ServiceName is an optional service name filter
+	ServiceName string
+}
+
+// ServiceName_AllServices is a special value for ServiceName that indicates all services should be included
+const ServiceName_AllServices = "all"
+
+// AppLogs gets logs for an app
+func AppLogs(ctx context.Context, inp AppLogsInput) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	termChan := make(chan os.Signal, 1)
+	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
+
+	color.New(color.FgGreen).Printf("Streaming logs for app %s...\n\n", inp.AppName) // nolint:errcheck,gosec
+
+	conn, err := inp.Client.AppLogsStream(ctx, api.AppLogsInput{
+		ProjectID:            inp.CLIConfig.Project,
+		ClusterID:            inp.CLIConfig.Cluster,
+		AppName:              inp.AppName,
+		DeploymentTargetName: inp.DeploymentTargetName,
+		ServiceName:          inp.ServiceName,
+	})
+	if err != nil {
+		return fmt.Errorf("error connecting to app logs stream: %w", err)
+	}
+	defer conn.Close() // nolint:errcheck
+
+	go func() {
+		select {
+		case <-termChan:
+			color.New(color.FgYellow).Println("Shutdown signal received, canceling processes") // nolint:errcheck,gosec
+
+			// ReadMessage will block until the next message is received, so we need to set a read deadline
+			conn.SetReadDeadline(time.Now()) // nolint:errcheck,gosec
+
+			cancel()
+		case <-ctx.Done():
+		}
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			_, message, _ := conn.ReadMessage()
+			if err != nil {
+				return err
+			}
+			if len(message) == 0 {
+				return nil
+			}
+
+			var line struct {
+				Line string `json:"line"`
+			}
+
+			err = json.Unmarshal(message, &line)
+			if err != nil {
+				return err
+			}
+
+			message = append([]byte(line.Line), '\n')
+			if _, err = os.Stdout.Write(message); err != nil {
+				return nil
+			}
+		}
+	}
+}