
feat: add ability to wait for a one-off job to complete after execution (#4138)

jose-fully-ported 2 years ago
Parent
Commit
8a2522e0ee

+ 54 - 3
api/client/porter_app.go

@@ -801,10 +801,10 @@ func (c *Client) RunAppJob(
 	projectID, clusterID uint,
 	appName string, jobName string,
 	deploymentTargetID string,
-) (*porter_app.AppRunResponse, error) {
-	resp := &porter_app.AppRunResponse{}
+) (*porter_app.RunAppJobResponse, error) {
+	resp := &porter_app.RunAppJobResponse{}
 
-	req := &porter_app.AppRunRequest{
+	req := &porter_app.RunAppJobRequest{
 		ServiceName:        jobName,
 		DeploymentTargetID: deploymentTargetID,
 	}
@@ -821,3 +821,54 @@ func (c *Client) RunAppJob(
 
 	return resp, err
 }
+
+// RunAppJobStatusInput contains all the information necessary to check the status of a job
+type RunAppJobStatusInput struct {
+	// AppName is the name of the app associated with the job
+	AppName string
+
+	// ClusterID is the id of the cluster to retrieve a kubernetes agent for
+	ClusterID uint
+
+	// DeploymentTargetID is the id of the deployment target the job was run against
+	DeploymentTargetID string
+
+	// DeploymentTargetNamespace is the namespace in which the job was deployed
+	DeploymentTargetNamespace string
+
+	// ServiceName is the name of the app service that was triggered
+	ServiceName string
+
+	// JobRunID is the UID returned from the /apps/{porter_app_name}/run endpoint
+	JobRunID string
+
+	// ProjectID is the project in which the cluster exists
+	ProjectID uint
+}
+
+// RunAppJobStatus gets the status of an app job run
+func (c *Client) RunAppJobStatus(
+	ctx context.Context,
+	input RunAppJobStatusInput,
+) (*porter_app.AppJobRunStatusResponse, error) {
+	resp := &porter_app.AppJobRunStatusResponse{}
+
+	req := &porter_app.AppJobRunStatusRequest{
+		DeploymentTargetID: input.DeploymentTargetID,
+		JobRunID:           input.JobRunID,
+		Namespace:          input.DeploymentTargetNamespace,
+		ServiceName:        input.ServiceName,
+	}
+
+	err := c.getRequest(
+		fmt.Sprintf(
+			"/projects/%d/clusters/%d/apps/%s/run-status",
+			input.ProjectID, input.ClusterID,
+			input.AppName,
+		),
+		req,
+		resp,
+	)
+
+	return resp, err
+}
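
Callers are expected to poll RunAppJobStatus until a terminal status is returned; the CLI's full implementation appears in cli/cmd/v2/run_app_job.go below. A minimal sketch, assuming ctx, an api.Client named client, the jobRunID returned by RunAppJob, and placeholder project/cluster/target values:

	// Sketch only: ids and names are placeholders, not repository values.
	input := api.RunAppJobStatusInput{
		AppName:                   "my-app",
		ProjectID:                 1,
		ClusterID:                 1,
		DeploymentTargetID:        "00000000-0000-0000-0000-000000000000",
		DeploymentTargetNamespace: "default",
		ServiceName:               "db-migrate",
		JobRunID:                  jobRunID, // returned by RunAppJob
	}
	for {
		statusResp, err := client.RunAppJobStatus(ctx, input)
		if err != nil {
			return err
		}
		if statusResp.Status == porter_app.InstanceStatusDescriptor_Succeeded ||
			statusResp.Status == porter_app.InstanceStatusDescriptor_Failed {
			break
		}
		time.Sleep(5 * time.Second)
	}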

+ 14 - 14
api/server/handlers/porter_app/app_run.go → api/server/handlers/porter_app/run_app_job.go

@@ -20,38 +20,38 @@ import (
 	"github.com/porter-dev/porter/internal/models"
 )
 
-// AppRunHandler handles requests to the /apps/{porter_app_name}/run endpoint
-type AppRunHandler struct {
+// RunAppJobHandler handles requests to the /apps/{porter_app_name}/run endpoint
+type RunAppJobHandler struct {
 	handlers.PorterHandlerReadWriter
 	authz.KubernetesAgentGetter
 }
 
-// NewAppRunHandler returns a new AppRunHandler
-func NewAppRunHandler(
+// NewRunAppJobHandler returns a new RunAppJobHandler
+func NewRunAppJobHandler(
 	config *config.Config,
 	decoderValidator shared.RequestDecoderValidator,
 	writer shared.ResultWriter,
-) *AppRunHandler {
-	return &AppRunHandler{
+) *RunAppJobHandler {
+	return &RunAppJobHandler{
 		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
 		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
 	}
 }
 
-// AppRunRequest is the request object for the /apps/{porter_app_name}/run endpoint
-type AppRunRequest struct {
+// RunAppJobRequest is the request object for the /apps/{porter_app_name}/run endpoint
+type RunAppJobRequest struct {
 	ServiceName        string `json:"service_name"`
 	DeploymentTargetID string `json:"deployment_target_id"`
 }
 
-// AppRunResponse is the response object for the /apps/{porter_app_name}/run endpoint
-type AppRunResponse struct {
+// RunAppJobResponse is the response object for the /apps/{porter_app_name}/run endpoint
+type RunAppJobResponse struct {
 	JobRunID string `json:"job_run_id"`
 }
 
 // ServeHTTP runs a one-off command in the same environment as the provided service, app and deployment target
-func (c *AppRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	ctx, span := telemetry.NewSpan(r.Context(), "serve-app-run")
+func (c *RunAppJobHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ctx, span := telemetry.NewSpan(r.Context(), "serve-app-job-run")
 	defer span.End()
 
 	project, _ := ctx.Value(types.ProjectScope).(*models.Project)
@@ -65,7 +65,7 @@ func (c *AppRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "app-name", Value: appName})
 
-	request := &AppRunRequest{}
+	request := &RunAppJobRequest{}
 	if ok := c.DecodeAndValidate(w, r, request); !ok {
 		err := telemetry.Error(ctx, span, nil, "error decoding request")
 		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
@@ -109,7 +109,7 @@ func (c *AppRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	response := AppRunResponse{
+	response := RunAppJobResponse{
 		JobRunID: serviceResp.Msg.JobRunId,
 	}
 
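With the rename the wire format is unchanged: the request body still carries service_name and deployment_target_id, and the response still returns the job_run_id that the new run-status endpoint (next file) takes as input.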

+ 224 - 0
api/server/handlers/porter_app/run_app_job_status.go

@@ -0,0 +1,224 @@
+package porter_app
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"strings"
+
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/shared/requestutils"
+
+	"github.com/porter-dev/porter/internal/kubernetes"
+	"github.com/porter-dev/porter/internal/porter_app"
+	"github.com/porter-dev/porter/internal/telemetry"
+
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/models"
+)
+
+// AppJobRunStatusHandler handles requests to the /apps/{porter_app_name}/run-status endpoint
+type AppJobRunStatusHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+// NewAppJobRunStatusHandler returns a new AppJobRunStatusHandler
+func NewAppJobRunStatusHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *AppJobRunStatusHandler {
+	return &AppJobRunStatusHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+// AppJobRunStatusRequest is the request object for the /apps/{porter_app_name}/run-status endpoint
+type AppJobRunStatusRequest struct {
+	// DeploymentTargetID is the id of the deployment target the job was run against
+	DeploymentTargetID string `json:"deployment_target_id"`
+
+	// JobRunID is the UID returned from the /apps/{porter_app_name}/run endpoint
+	JobRunID string `json:"job_id"`
+
+	// ServiceName is the name of the app service that was triggered
+	ServiceName string `json:"service_name"`
+
+	// Namespace is the namespace in which the job was deployed
+	Namespace string `json:"namespace"`
+}
+
+// AppJobRunStatusResponse is the response object for the /apps/{porter_app_name}/run-status endpoint
+type AppJobRunStatusResponse struct {
+	Status porter_app.InstanceStatusDescriptor `json:"status"`
+}
+
+// ServeHTTP gets the status of a one-off command in the same environment as the provided service, app and deployment target
+func (c *AppJobRunStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ctx, span := telemetry.NewSpan(r.Context(), "serve-app-job-run-status")
+	defer span.End()
+
+	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
+
+	appName, reqErr := requestutils.GetURLParamString(r, types.URLParamPorterAppName)
+	if reqErr != nil {
+		e := telemetry.Error(ctx, span, reqErr, "error parsing app name from url")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(e, http.StatusBadRequest))
+		return
+	}
+
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "app-name", Value: appName})
+
+	request := &AppJobRunStatusRequest{}
+	if ok := c.DecodeAndValidate(w, r, request); !ok {
+		err := telemetry.Error(ctx, span, nil, "error decoding request")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+
+	if request.JobRunID == "" {
+		err := telemetry.Error(ctx, span, nil, "job id is required")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "job-run-id", Value: request.JobRunID})
+
+	if request.Namespace == "" {
+		err := telemetry.Error(ctx, span, nil, "namespace is required")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "namespace", Value: request.Namespace})
+
+	if request.ServiceName == "" {
+		err := telemetry.Error(ctx, span, nil, "service name is required")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "service-name", Value: request.ServiceName})
+
+	if request.DeploymentTargetID == "" {
+		err := telemetry.Error(ctx, span, nil, "deployment target id is required")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "deployment-target-id", Value: request.DeploymentTargetID})
+
+	agent, err := c.GetAgent(r, cluster, "")
+	if err != nil {
+		err := telemetry.Error(ctx, span, err, "unable to get kubernetes agent")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	if agent == nil {
+		err := telemetry.Error(ctx, span, nil, "no kubernetes agent returned")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	status, err := c.getJobStatus(ctx, getJobStatusInput{
+		AppName:            appName,
+		DeploymentTargetID: request.DeploymentTargetID,
+		ClusterK8sAgent:    *agent,
+		JobRunID:           request.JobRunID,
+		Namespace:          request.Namespace,
+		ServiceName:        request.ServiceName,
+	})
+	if err != nil {
+		err := telemetry.Error(ctx, span, err, "error getting job status")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+
+	response := AppJobRunStatusResponse{
+		Status: status,
+	}
+
+	c.WriteResult(w, r, response)
+}
+
+type getJobStatusInput struct {
+	// AppName is the name of the app associated with the job
+	AppName string
+
+	// DeploymentTargetID is the id of the deployment target the job was run against
+	DeploymentTargetID string
+
+	// ClusterK8sAgent is a kubernetes agent
+	ClusterK8sAgent kubernetes.Agent
+
+	// JobRunID is the UID returned from the /apps/{porter_app_name}/run endpoint
+	JobRunID string
+
+	// Namespace is the namespace in which the job was deployed
+	Namespace string
+
+	// ServiceName is the name of the app service that was triggered
+	ServiceName string
+}
+
+func (c *AppJobRunStatusHandler) getJobStatus(ctx context.Context, input getJobStatusInput) (porter_app.InstanceStatusDescriptor, error) {
+	ctx, span := telemetry.NewSpan(ctx, "get-job-status")
+	defer span.End()
+
+	if input.AppName == "" {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "missing app name in input")
+	}
+	if input.DeploymentTargetID == "" {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "missing deployment target id in input")
+	}
+	if input.JobRunID == "" {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "missing job run id in input")
+	}
+	if input.Namespace == "" {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "missing namespace in input")
+	}
+	if input.ServiceName == "" {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "missing service name in input")
+	}
+
+	selectors := []string{
+		fmt.Sprintf("batch.kubernetes.io/controller-uid=%s", input.JobRunID),
+		fmt.Sprintf("porter.run/app-name=%s", input.AppName),
+		fmt.Sprintf("porter.run/deployment-target-id=%s", input.DeploymentTargetID),
+		fmt.Sprintf("porter.run/service-name=%s", input.ServiceName),
+	}
+	labelSelector := strings.Join(selectors, ",")
+
+	podsList, err := input.ClusterK8sAgent.GetPodsByLabel(labelSelector, input.Namespace)
+	if err != nil {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, err, "error getting jobs from cluster")
+	}
+
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "pod-count", Value: len(podsList.Items)})
+
+	if len(podsList.Items) == 0 {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "no matching pods found for specified job id")
+	}
+
+	if len(podsList.Items) != 1 {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "too many pods found for specified job id")
+	}
+
+	status, err := porter_app.InstanceStatusFromPod(ctx, porter_app.InstanceStatusFromPodInput{
+		Pod:         podsList.Items[0],
+		AppName:     input.AppName,
+		ServiceName: input.ServiceName,
+	})
+	if err != nil {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, err, "unable to fetch instance status from job pod")
+	}
+
+	if status.Status == porter_app.InstanceStatusDescriptor_Unknown {
+		return porter_app.InstanceStatusDescriptor_Unknown, telemetry.Error(ctx, span, nil, "unknown status for job")
+	}
+
+	return status.Status, nil
+}
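
getJobStatus locates the job's pod purely by labels. A minimal sketch of the selector it builds, with placeholder values (the label keys and the single-pod expectation come from the handler above; agent is the kubernetes agent obtained earlier):

	selector := strings.Join([]string{
		"batch.kubernetes.io/controller-uid=8f6d2c1e-0000-0000-0000-000000000000", // JobRunID (placeholder)
		"porter.run/app-name=my-app",                                              // placeholder
		"porter.run/deployment-target-id=11111111-2222-3333-4444-555555555555",    // placeholder
		"porter.run/service-name=db-migrate",                                      // placeholder
	}, ",")
	podsList, err := agent.GetPodsByLabel(selector, "default") // exactly one pod is expected back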

+ 34 - 5
api/server/router/porter_app.go

@@ -1676,8 +1676,8 @@ func getPorterAppRoutes(
 		Router:   r,
 	})
 
-	// POST /api/projects/{project_id}/clusters/{cluster_id}/apps/{app_name}/run -> porter_app.NewAppRunHandler
-	appRunEndpoint := factory.NewAPIEndpoint(
+	// POST /api/projects/{project_id}/clusters/{cluster_id}/apps/{app_name}/run -> porter_app.NewRunAppJobHandler
+	runAppJobEndpoint := factory.NewAPIEndpoint(
 		&types.APIRequestMetadata{
 			Verb:   types.APIVerbUpdate,
 			Method: types.HTTPVerbPost,
@@ -1693,15 +1693,44 @@ func getPorterAppRoutes(
 		},
 	)
 
-	appRunHandler := porter_app.NewAppRunHandler(
+	runAppJobHandler := porter_app.NewRunAppJobHandler(
 		config,
 		factory.GetDecoderValidator(),
 		factory.GetResultWriter(),
 	)
 
 	routes = append(routes, &router.Route{
-		Endpoint: appRunEndpoint,
-		Handler:  appRunHandler,
+		Endpoint: runAppJobEndpoint,
+		Handler:  runAppJobHandler,
+		Router:   r,
+	})
+
+	// GET /api/projects/{project_id}/clusters/{cluster_id}/apps/{app_name}/run-status -> porter_app.NewAppJobRunStatusHandler
+	appJobRunStatusEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbGet,
+			Method: types.HTTPVerbGet,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: fmt.Sprintf("%s/{%s}/run-status", relPathV2, types.URLParamPorterAppName),
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+			},
+		},
+	)
+
+	appJobRunStatusHandler := porter_app.NewAppJobRunStatusHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &router.Route{
+		Endpoint: appJobRunStatusEndpoint,
+		Handler:  appJobRunStatusHandler,
 		Router:   r,
 	})
 

+ 25 - 17
cli/cmd/commands/app.go

@@ -35,17 +35,17 @@ import (
 )
 
 var (
-	appNamespace                string
-	appVerbose                  bool
-	appExistingPod              bool
-	appInteractive              bool
-	appContainerName            string
-	appTag                      string
-	deploymentTarget            string
-	appCpuMilli                 int
-	appMemoryMi                 int
-	jobName                     string
-	waitForSuccessfulDeployment bool
+	appContainerName string
+	appCpuMilli      int
+	appExistingPod   bool
+	appInteractive   bool
+	appMemoryMi      int
+	appNamespace     string
+	appTag           string
+	appVerbose       bool
+	appWait          bool
+	deploymentTarget string
+	jobName          string
 )
 
 const (
@@ -112,7 +112,7 @@ func registerCommand_App(cliConf config.CLIConfig) *cobra.Command {
 	}
 
 	appUpdateTagCmd.PersistentFlags().BoolVarP(
-		&waitForSuccessfulDeployment,
+		&appWait,
 		"wait",
 		"w",
 		false,
@@ -177,6 +177,13 @@ func appRunFlags(appRunCmd *cobra.Command) {
 		"whether to run in interactive mode (default false)",
 	)
 
+	appRunCmd.PersistentFlags().BoolVar(
+		&appWait,
+		"wait",
+		false,
+		"whether to wait for the command to complete before exiting (non-interactive mode only, default false)",
+	)
+
 	appRunCmd.PersistentFlags().IntVarP(
 		&appCpuMilli,
 		"cpu",
@@ -267,10 +274,11 @@ func appRun(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client a
 		}
 
 		return v2.RunAppJob(ctx, v2.RunAppJobInput{
-			CLIConfig: cliConfig,
-			Client:    client,
-			AppName:   args[0],
-			JobName:   jobName,
+			CLIConfig:   cliConfig,
+			Client:      client,
+			AppName:     args[0],
+			JobName:     jobName,
+			WaitForExit: appWait,
 		})
 	}
 
@@ -1265,7 +1273,7 @@ func appUpdateTag(ctx context.Context, user *types.GetAuthenticatedUserResponse,
 			DeploymentTargetName:        deploymentTarget,
 			Tag:                         appTag,
 			Client:                      client,
-			WaitForSuccessfulDeployment: waitForSuccessfulDeployment,
+			WaitForSuccessfulDeployment: appWait,
 		})
 		if err != nil {
 			return fmt.Errorf("error updating tag: %w", err)

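With the shared appWait flag, a hypothetical invocation such as porter app run my-app --job db-migrate --wait now blocks until the triggered job finishes instead of returning right after the run is created.
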
+ 2 - 2
cli/cmd/commands/apply.go

@@ -108,7 +108,7 @@ applying a configuration:
 	applyCmd.PersistentFlags().StringVarP(&porterYAML, "file", "f", "", "path to porter.yaml")
 	applyCmd.PersistentFlags().BoolVarP(&previewApply, "preview", "p", false, "apply as preview environment based on current git branch")
 	applyCmd.PersistentFlags().BoolVarP(
-		&waitForSuccessfulDeployment,
+		&appWait,
 		"wait",
 		"w",
 		false,
@@ -148,7 +148,7 @@ func apply(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client ap
 			PorterYamlPath:              porterYAML,
 			AppName:                     appName,
 			PreviewApply:                previewApply,
-			WaitForSuccessfulDeployment: waitForSuccessfulDeployment,
+			WaitForSuccessfulDeployment: appWait,
 		}
 		err := v2.Apply(ctx, inp)
 		if err != nil {

+ 1 - 1
cli/cmd/commands/kubectl.go

@@ -70,7 +70,7 @@ func runKubectl(ctx context.Context, _ *types.GetAuthenticatedUserResponse, clie
 
 	err = execCommand.Run()
 	if err != nil {
-		return fmt.Errorf("error running helm: %w", err)
+		return fmt.Errorf("error running kubectl: %w", err)
 	}
 
 	return nil

+ 125 - 0
cli/cmd/v2/run_app_job.go

@@ -0,0 +1,125 @@
+package v2
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+	"time"
+
+	"github.com/fatih/color"
+
+	"github.com/porter-dev/api-contracts/generated/go/helpers"
+	porterv1 "github.com/porter-dev/api-contracts/generated/go/porter/v1"
+	api "github.com/porter-dev/porter/api/client"
+	"github.com/porter-dev/porter/cli/cmd/config"
+	porter_app_internal "github.com/porter-dev/porter/internal/porter_app"
+)
+
+// WaitIntervalInSeconds is the amount of time to wait when polling for job status
+const WaitIntervalInSeconds = 5 * time.Second
+
+// RunAppJobInput is the input for the RunAppJob function
+type RunAppJobInput struct {
+	// CLIConfig is the CLI configuration
+	CLIConfig config.CLIConfig
+	// Client is the Porter API client
+	Client api.Client
+
+	AppName string
+	JobName string
+
+	// WaitForExit indicates whether to wait for the job to complete before returning
+	WaitForExit bool
+}
+
+// RunAppJob triggers a job run for an app and optionally waits for it to complete
+func RunAppJob(ctx context.Context, inp RunAppJobInput) error {
+	targetResp, err := inp.Client.DefaultDeploymentTarget(ctx, inp.CLIConfig.Project, inp.CLIConfig.Cluster)
+	if err != nil {
+		return fmt.Errorf("error calling default deployment target endpoint: %w", err)
+	}
+
+	currentAppRevisionResp, err := inp.Client.CurrentAppRevision(ctx, inp.CLIConfig.Project, inp.CLIConfig.Cluster, inp.AppName, targetResp.DeploymentTargetID) // nolint:staticcheck
+	if err != nil {
+		return fmt.Errorf("error getting current app revision: %w", err)
+	}
+
+	resp, err := inp.Client.RunAppJob(ctx, inp.CLIConfig.Project, inp.CLIConfig.Cluster, inp.AppName, inp.JobName, targetResp.DeploymentTargetID) // nolint:staticcheck
+	if err != nil {
+		return fmt.Errorf("unable to run job: %w", err)
+	}
+
+	triggeredColor := color.FgGreen
+	if inp.WaitForExit {
+		triggeredColor = color.FgBlue
+	}
+	color.New(triggeredColor).Println("Triggered job with id:", resp.JobRunID) // nolint:errcheck,gosec
+
+	if !inp.WaitForExit {
+		return nil
+	}
+
+	decoded, err := base64.StdEncoding.DecodeString(currentAppRevisionResp.AppRevision.B64AppProto)
+	if err != nil {
+		return fmt.Errorf("unable to decode base64 app for revision: %w", err)
+	}
+
+	app := &porterv1.PorterApp{}
+	err = helpers.UnmarshalContractObject(decoded, app)
+	if err != nil {
+		return fmt.Errorf("unable to unmarshal app for revision: %w", err)
+	}
+
+	timeoutSeconds := 1800 * time.Second
+	for _, service := range app.ServiceList {
+		if inp.JobName != service.Name {
+			continue
+		}
+		if service.GetJobConfig() == nil {
+			return fmt.Errorf("no job config found for service %s; unable to determine job timeout", service.Name)
+		}
+
+		timeoutSeconds = time.Duration(service.GetJobConfig().TimeoutSeconds) * time.Second
+	}
+
+	deadline := time.Now().Add(timeoutSeconds)
+
+	color.New(color.FgBlue).Printf("Waiting up to %.f seconds for job to complete\n", timeoutSeconds.Seconds()) // nolint:errcheck,gosec
+	time.Sleep(2 * time.Second)
+
+	input := api.RunAppJobStatusInput{
+		AppName:                   inp.AppName,
+		ClusterID:                 inp.CLIConfig.Cluster,
+		DeploymentTargetID:        targetResp.DeploymentTargetID, // nolint:staticcheck
+		DeploymentTargetNamespace: targetResp.Namespace,
+		ServiceName:               inp.JobName,
+		JobRunID:                  resp.JobRunID,
+		ProjectID:                 inp.CLIConfig.Project,
+	}
+
+	for time.Now().Before(deadline) {
+		statusResp, err := inp.Client.RunAppJobStatus(ctx, input)
+		if err != nil {
+			return fmt.Errorf("unable to get job status: %w", err)
+		}
+
+		switch statusResp.Status {
+		case porter_app_internal.InstanceStatusDescriptor_Pending:
+			fmt.Print(".")
+			time.Sleep(WaitIntervalInSeconds)
+		case porter_app_internal.InstanceStatusDescriptor_Running:
+			fmt.Print(".")
+			time.Sleep(WaitIntervalInSeconds)
+		case porter_app_internal.InstanceStatusDescriptor_Succeeded:
+			fmt.Println()
+			color.New(color.FgGreen).Println("Job completed successfully") // nolint:errcheck,gosec
+			return nil
+		case porter_app_internal.InstanceStatusDescriptor_Failed:
+			return fmt.Errorf("job exited with non-zero exit code")
+		case porter_app_internal.InstanceStatusDescriptor_Unknown:
+			return fmt.Errorf("unknown job status")
+		}
+	}
+
+	return fmt.Errorf("timeout exceeded")
+}
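
The wait deadline above is taken from the matched service's job config (TimeoutSeconds), falling back to 1800 seconds when no service in the app definition matches the job name; the run-status endpoint is then polled every WaitIntervalInSeconds (5s) until a terminal status is returned or the deadline is reached.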

+ 0 - 39
cli/cmd/v2/run_job.go

@@ -1,39 +0,0 @@
-package v2
-
-import (
-	"context"
-	"fmt"
-
-	"github.com/fatih/color"
-
-	api "github.com/porter-dev/porter/api/client"
-	"github.com/porter-dev/porter/cli/cmd/config"
-)
-
-// RunAppJobInput is the input for the RunAppJob function
-type RunAppJobInput struct {
-	// CLIConfig is the CLI configuration
-	CLIConfig config.CLIConfig
-	// Client is the Porter API client
-	Client api.Client
-
-	AppName string
-	JobName string
-}
-
-// RunAppJob triggers a job run for an app and returns without waiting for the job to complete
-func RunAppJob(ctx context.Context, inp RunAppJobInput) error {
-	targetResp, err := inp.Client.DefaultDeploymentTarget(ctx, inp.CLIConfig.Project, inp.CLIConfig.Cluster)
-	if err != nil {
-		return fmt.Errorf("error calling default deployment target endpoint: %w", err)
-	}
-
-	resp, err := inp.Client.RunAppJob(ctx, inp.CLIConfig.Project, inp.CLIConfig.Cluster, inp.AppName, inp.JobName, targetResp.DeploymentTargetID)
-	if err != nil {
-		return fmt.Errorf("unable to run job: %w", err)
-	}
-
-	color.New(color.FgGreen).Println("Triggered job with id:", resp.JobRunID) // nolint:errcheck,gosec
-
-	return nil
-}

+ 17 - 7
internal/porter_app/status.go

@@ -40,12 +40,16 @@ type RevisionStatus struct {
 type InstanceStatusDescriptor string
 
 const (
+	// InstanceStatusDescriptor_Failed means the instance has failed
+	InstanceStatusDescriptor_Failed InstanceStatusDescriptor = "FAILED"
 	// InstanceStatusDescriptor_Pending means the instance is pending
 	InstanceStatusDescriptor_Pending InstanceStatusDescriptor = "PENDING"
 	// InstanceStatusDescriptor_Running means the instance is running normally
 	InstanceStatusDescriptor_Running InstanceStatusDescriptor = "RUNNING"
-	// InstanceStatusDescriptor_Failed means the instance has failed
-	InstanceStatusDescriptor_Failed InstanceStatusDescriptor = "FAILED"
+	// InstanceStatusDescriptor_Succeeded means the instance has succeeded
+	InstanceStatusDescriptor_Succeeded InstanceStatusDescriptor = "SUCCEEDED"
+	// InstanceStatusDescriptor_Unknown means the instance status is unknown
+	InstanceStatusDescriptor_Unknown InstanceStatusDescriptor = "UNKNOWN"
 )
 
 // CrashLoopBackOff is a string that describes the status of a pod that is in a crash loop backoff
@@ -153,7 +157,7 @@ func revisionStatusFromPods(ctx context.Context, inp revisionStatusFromPodsInput
 			instanceStatusList = []InstanceStatus{}
 		}
 
-		instanceStatus, err := instanceStatusFromPod(ctx, instanceStatusFromPodInput{
+		instanceStatus, err := InstanceStatusFromPod(ctx, InstanceStatusFromPodInput{
 			Pod:         pod,
 			AppName:     inp.AppName,
 			ServiceName: inp.ServiceName,
@@ -187,13 +191,15 @@ func revisionStatusFromPods(ctx context.Context, inp revisionStatusFromPodsInput
 	return revisionStatusList, nil
 }
 
-type instanceStatusFromPodInput struct {
+// InstanceStatusFromPodInput contains all the data necessary to get the status of the primary service container from a pod
+type InstanceStatusFromPodInput struct {
 	Pod         v1.Pod
 	AppName     string
 	ServiceName string
 }
 
-func instanceStatusFromPod(ctx context.Context, inp instanceStatusFromPodInput) (InstanceStatus, error) {
+// InstanceStatusFromPod gets the status of the primary service container from a pod
+func InstanceStatusFromPod(ctx context.Context, inp InstanceStatusFromPodInput) (InstanceStatus, error) {
 	ctx, span := telemetry.NewSpan(ctx, "instance-status-from-pod")
 	defer span.End()
 
@@ -220,12 +226,16 @@ func instanceStatusFromPod(ctx context.Context, inp instanceStatusFromPodInput)
 	instanceStatus.RestartCount = int(appContainerStatus.RestartCount)
 
 	switch inp.Pod.Status.Phase {
+	case v1.PodFailed:
+		instanceStatus.Status = InstanceStatusDescriptor_Failed
 	case v1.PodPending:
 		instanceStatus.Status = InstanceStatusDescriptor_Pending
 	case v1.PodRunning:
 		instanceStatus.Status = InstanceStatusDescriptor_Running
-	case v1.PodFailed:
-		instanceStatus.Status = InstanceStatusDescriptor_Failed
+	case v1.PodSucceeded:
+		instanceStatus.Status = InstanceStatusDescriptor_Succeeded
+	case v1.PodUnknown:
+		instanceStatus.Status = InstanceStatusDescriptor_Unknown
 	}
 
 	if appContainerStatus.State.Waiting != nil && appContainerStatus.State.Waiting.Reason == CrashLoopBackOff {