Переглянути джерело

feat: support run job commands via api (1/2) (#3403)

Feroze Mohideen 2 роки тому
батько
коміт
6fd9dfa1db

+ 110 - 0
api/server/handlers/porter_app/run_command.go

@@ -0,0 +1,110 @@
+package porter_app
+
+import (
+	"net/http"
+	"strings"
+
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/requestutils"
+	"github.com/porter-dev/porter/api/types"
+	utils "github.com/porter-dev/porter/api/utils/porter_app"
+	"github.com/porter-dev/porter/internal/kubernetes/porter_app"
+	"github.com/porter-dev/porter/internal/models"
+	"github.com/porter-dev/porter/internal/telemetry"
+)
+
+// RunPorterAppCommandHandler runs a command on a porter app
+type RunPorterAppCommandHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+// NewRunPorterAppCommandHandler returns a new RunPorterAppCommandHandler
+func NewRunPorterAppCommandHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *RunPorterAppCommandHandler {
+	return &RunPorterAppCommandHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+func (c *RunPorterAppCommandHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
+	cluster, _ := ctx.Value(types.ClusterScope).(*models.Cluster)
+
+	ctx, span := telemetry.NewSpan(r.Context(), "serve-run-porter-app-command")
+	defer span.End()
+
+	request := &types.RunPorterAppCommandRequest{}
+	if ok := c.DecodeAndValidate(w, r, request); !ok {
+		err := telemetry.Error(ctx, span, nil, "error decoding request")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+
+	appName, reqErr := requestutils.GetURLParamString(r, types.URLParamPorterAppName)
+	if reqErr != nil {
+		err := telemetry.Error(ctx, span, reqErr, "error getting app name from url")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+	namespace := utils.NamespaceFromPorterAppName(appName)
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "application-name", Value: appName})
+
+	app, err := c.Repo().PorterApp().ReadPorterAppByName(cluster.ID, appName)
+	if err != nil {
+		err = telemetry.Error(ctx, span, err, "error reading app from DB")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+	if app == nil {
+		err = telemetry.Error(ctx, span, nil, "app with name does not exist in project")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusForbidden))
+		return
+	}
+
+	k8sAgent, err := c.GetAgent(r, cluster, namespace)
+	if err != nil {
+		err = telemetry.Error(ctx, span, err, "error getting k8s agent")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	podList, err := k8sAgent.GetPodsByLabel(porter_app.LabelKey_PorterApplication, namespace)
+	if err != nil {
+		err = telemetry.Error(ctx, span, err, "error getting pods by label")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	if len(podList.Items) == 0 {
+		err = telemetry.Error(ctx, span, err, "no pods found to run command on")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	selectedPod := podList.Items[0]
+	execArgs := strings.Split(request.Command, " ")
+	if app.Builder != "" &&
+		(strings.Contains(app.Builder, "heroku") ||
+			strings.Contains(app.Builder, "paketo")) &&
+		execArgs[0] != "/cnb/lifecycle/launcher" &&
+		execArgs[0] != "launcher" {
+		// this is a buildpacks release using a heroku builder, so we prepend commands with launcher command
+		execArgs = append([]string{"/cnb/lifecycle/launcher"}, execArgs...)
+	}
+
+	err = k8sAgent.RunCommandOnPod(ctx, &selectedPod, execArgs)
+	if err != nil {
+		err = telemetry.Error(ctx, span, err, "error running command on pod")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+}

+ 29 - 0
api/server/router/porter_app.go

@@ -454,6 +454,35 @@ func getPorterAppRoutes(
 		Router:   r,
 	})
 
+	// POST /api/projects/{project_id}/clusters/{cluster_id}/applications/{porter_app_name}/run -> porter_app.NewRunPorterAppCommandHandler
+	runPorterAppCommandEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbCreate,
+			Method: types.HTTPVerbPost,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: fmt.Sprintf("%s/{%s}/run", relPath, types.URLParamPorterAppName),
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+			},
+		},
+	)
+
+	runPorterAppCommandHandler := porter_app.NewRunPorterAppCommandHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &router.Route{
+		Endpoint: runPorterAppCommandEndpoint,
+		Handler:  runPorterAppCommandHandler,
+		Router:   r,
+	})
+
 	// TODO: remove these three endpoints once these three 'stacks' routes are no longer used in telemetry
 
 	// GET /api/projects/{project_id}/clusters/{cluster_id}/stacks/{name} -> porter_app.NewPorterAppGetHandler

+ 5 - 0
api/types/porter_app.go

@@ -68,6 +68,11 @@ type UpdatePorterAppRequest struct {
 	PullRequestURL string `json:"pull_request_url"`
 }
 
+// RunPorterAppCommandRequest represents a request to run a command on a pod of a porter app
+type RunPorterAppCommandRequest struct {
+	Command string `json:"command" form:"required"`
+}
+
 type RollbackPorterAppRequest struct {
 	Revision int `json:"revision" form:"required"`
 }

+ 1 - 1
dashboard/src/main/home/app-dashboard/AppDashboard.tsx

@@ -227,7 +227,7 @@ const AppDashboard: React.FC<Props> = ({ }) => {
                   <Spacer y={1} />
 
                   <Text color={"helper"}>
-                    Get started by deploying your app
+                    Get started by deploying your app.
                   </Text>
                   <Spacer y={.5} />
                   <PorterLink to="/apps/new/app">

+ 3 - 3
dashboard/src/main/home/app-dashboard/expanded-app/activity-feed/ActivityFeed.tsx

@@ -276,9 +276,9 @@ const Dot = styled.div<{ shouldAnimate: boolean }>`
   height: 7px;
   background: #fff;
   border-radius: 50%;
-  position: absolute;
-  left: 0;
-  top: 36px;
+  margin-left: -29px;
+  margin-right: 20px;
+  z-index: 1;
   opacity: ${(props) => props.shouldAnimate ? "0" : "1"};
   ${(props) => props.shouldAnimate && "animation: fadeIn 0.3s 0.1s;"}
   ${(props) => props.shouldAnimate && "animation-fill-mode: forwards;"}

+ 1 - 1
dashboard/src/main/home/project-settings/Metadata.tsx

@@ -36,7 +36,7 @@ const Metadata: React.FC<Props> = ({
                 <Icon src={icon} height={"14px"} />
                 <IconWithName>{name}</IconWithName>
                 <CopyContainer>
-                    <IdText>Id: {id}</IdText>
+                    <IdText>ID: {id}</IdText>
                     <CopyToClipboard text={id.toString()}>
                         <CopyIcon src={copy} alt="copy" />
                     </CopyToClipboard>

+ 1 - 1
dashboard/src/main/home/sidebar/ProjectSelectionModal.tsx

@@ -116,7 +116,7 @@ const ProjectSelectionModal: React.FC<Props> = ({
 
 
           <BlockDescription>
-            Project Id: {project.id}
+            Project ID: {project.id}
           </BlockDescription>
         </IdContainer>
       );

+ 137 - 0
internal/kubernetes/agent.go

@@ -18,6 +18,7 @@ import (
 	goerrors "errors"
 
 	"github.com/porter-dev/porter/api/server/shared/websocket"
+	"github.com/porter-dev/porter/cli/cmd/utils"
 	"github.com/porter-dev/porter/internal/models"
 	"github.com/porter-dev/porter/internal/registry"
 	"github.com/porter-dev/porter/internal/repository"
@@ -31,6 +32,7 @@ import (
 	netv1 "k8s.io/api/networking/v1"
 	netv1beta1 "k8s.io/api/networking/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -1983,6 +1985,53 @@ func (a *Agent) CreateImagePullSecrets(
 	return res, nil
 }
 
+// RunCommandOnPod creates an ephemeral pod from the given pod with the given args as its start command.
+func (a *Agent) RunCommandOnPod(ctx context.Context, p *v1.Pod, args []string) error {
+	container := p.Spec.Containers[0].Name
+
+	newPod, err := a.createEphemeralPodFromExisting(ctx, p, container, args)
+	if err != nil {
+		return err
+	}
+	podName := newPod.ObjectMeta.Name
+
+	err, _ = a.waitForPod(newPod)
+	if err != nil {
+		return err
+	}
+
+	// refresh pod info for latest status
+	newPod, err = a.Clientset.CoreV1().
+		Pods(newPod.Namespace).
+		Get(ctx, newPod.Name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	req := a.Clientset.CoreV1().RESTClient().Post().
+		Resource("pods").
+		Name(podName).
+		Namespace(newPod.Namespace).
+		SubResource("attach")
+
+	req.Param("stdin", "true")
+	req.Param("stdout", "true")
+	req.Param("tty", "true")
+	req.Param("container", container)
+
+	restConf, err := a.RESTClientGetter.ToRESTConfig()
+	if err != nil {
+		return err
+	}
+
+	_, err = remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // helper that waits for pod to be ready
 func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
 	var (
@@ -2057,3 +2106,91 @@ func isPodReady(pod *v1.Pod) bool {
 func isPodExited(pod *v1.Pod) bool {
 	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
 }
+
+func (a *Agent) createEphemeralPodFromExisting(
+	ctx context.Context,
+	existing *v1.Pod,
+	container string,
+	args []string,
+) (*v1.Pod, error) {
+	newPod := existing.DeepCopy()
+
+	// only copy the pod spec, overwrite metadata
+	newPod.ObjectMeta = metav1.ObjectMeta{
+		Name:      strings.ToLower(fmt.Sprintf("%s-copy-%s", existing.ObjectMeta.Name, utils.String(4))),
+		Namespace: existing.ObjectMeta.Namespace,
+	}
+
+	newPod.Status = v1.PodStatus{}
+
+	// set restart policy to never
+	newPod.Spec.RestartPolicy = v1.RestartPolicyNever
+
+	// change the command in the pod to the passed in pod command
+	cmdRoot := args[0]
+	cmdArgs := make([]string, 0)
+
+	// annotate with the ephemeral pod tag
+	newPod.Labels = make(map[string]string)
+	newPod.Labels["porter/ephemeral-pod"] = "true"
+
+	if len(args) > 1 {
+		cmdArgs = args[1:]
+	}
+
+	for i := 0; i < len(newPod.Spec.Containers); i++ {
+		if newPod.Spec.Containers[i].Name == container {
+			newPod.Spec.Containers[i].Command = []string{cmdRoot}
+			newPod.Spec.Containers[i].Args = cmdArgs
+			newPod.Spec.Containers[i].TTY = true
+			newPod.Spec.Containers[i].Stdin = true
+			newPod.Spec.Containers[i].StdinOnce = true
+
+			var newCpu int
+			if newPod.Spec.Containers[i].Resources.Requests.Cpu() != nil && newPod.Spec.Containers[i].Resources.Requests.Cpu().MilliValue() > 500 {
+				newCpu = 500
+			}
+			if newCpu != 0 {
+				newPod.Spec.Containers[i].Resources.Limits[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%dm", newCpu))
+				newPod.Spec.Containers[i].Resources.Requests[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%dm", newCpu))
+
+				for j := 0; j < len(newPod.Spec.Containers[i].Env); j++ {
+					if newPod.Spec.Containers[i].Env[j].Name == "PORTER_RESOURCES_CPU" {
+						newPod.Spec.Containers[i].Env[j].Value = fmt.Sprintf("%dm", newCpu)
+						break
+					}
+				}
+			}
+
+			var newMemory int
+			if newPod.Spec.Containers[i].Resources.Requests.Memory() != nil && newPod.Spec.Containers[i].Resources.Requests.Memory().Value() > 1000*1024*1024 {
+				newMemory = 1000
+			}
+			if newMemory != 0 {
+				newPod.Spec.Containers[i].Resources.Limits[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", newMemory))
+				newPod.Spec.Containers[i].Resources.Requests[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", newMemory))
+
+				for j := 0; j < len(newPod.Spec.Containers[i].Env); j++ {
+					if newPod.Spec.Containers[i].Env[j].Name == "PORTER_RESOURCES_RAM" {
+						newPod.Spec.Containers[i].Env[j].Value = fmt.Sprintf("%dMi", newMemory)
+						break
+					}
+				}
+			}
+		}
+
+		// remove health checks and probes
+		newPod.Spec.Containers[i].LivenessProbe = nil
+		newPod.Spec.Containers[i].ReadinessProbe = nil
+		newPod.Spec.Containers[i].StartupProbe = nil
+	}
+
+	newPod.Spec.NodeName = ""
+
+	// create the pod and return it
+	return a.Clientset.CoreV1().Pods(existing.ObjectMeta.Namespace).Create(
+		ctx,
+		newPod,
+		metav1.CreateOptions{},
+	)
+}