Просмотр исходного кода

Merge branch 'nafees/jobs-endpoints' of github.com:porter-dev/porter into nico/jobs-overhaul-job-run-list

jnfrati 4 лет назад
Родитель
Сommit
eea8450465

+ 135 - 0
api/server/handlers/namespace/get_job_runs.go

@@ -0,0 +1,135 @@
+package namespace
+
+import (
+	"net/http"
+	"sort"
+	"strings"
+
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/models"
+	v1 "k8s.io/api/batch/v1"
+)
+
+type GetJobRunsHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+func NewGetJobRunsHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *GetJobRunsHandler {
+	return &GetJobRunsHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+func (c *GetJobRunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	request := &types.GetJobRunsRequest{}
+
+	if ok := c.DecodeAndValidate(w, r, request); !ok {
+		return
+	}
+
+	namespace := r.Context().Value(types.NamespaceScope).(string)
+	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
+
+	agent, err := c.GetAgent(r, cluster, "")
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	allJobs, err := agent.ListAllJobs(namespace)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	var jobs []v1.Job
+
+	if strings.ToLower(request.Status) == "failed" {
+		for _, job := range allJobs {
+			if job.Status.Failed > 0 {
+				jobs = append(jobs, job)
+			}
+		}
+	} else if strings.ToLower(request.Status) == "succeeded" {
+		for _, job := range allJobs {
+			if job.Status.Succeeded > 0 {
+				jobs = append(jobs, job)
+			}
+		}
+	} else if strings.ToLower(request.Status) == "running" {
+		for _, job := range allJobs {
+			if job.Status.Active > 0 {
+				jobs = append(jobs, job)
+			}
+		}
+	} else {
+		// return all
+		jobs = append(jobs, allJobs...)
+	}
+
+	if strings.ToLower(request.Sort) == "oldest" {
+		sort.Sort(sortByOldest(jobs))
+	} else if strings.ToLower(request.Sort) == "alphabetical" {
+		sort.Sort(sortByAlphabetical(jobs))
+	} else {
+		// sort by newest
+		sort.Sort(sortByNewest(jobs))
+	}
+
+	c.WriteResult(w, r, jobs)
+}
+
+type sortByNewest []v1.Job
+
+func (s sortByNewest) Len() int {
+	return len(s)
+}
+
+func (s sortByNewest) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+func (s sortByNewest) Less(i, j int) bool {
+	return s[i].CreationTimestamp.Unix() > s[j].CreationTimestamp.Unix()
+}
+
+type sortByOldest []v1.Job
+
+func (s sortByOldest) Len() int {
+	return len(s)
+}
+
+func (s sortByOldest) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+func (s sortByOldest) Less(i, j int) bool {
+	return s[i].CreationTimestamp.Unix() < s[j].CreationTimestamp.Unix()
+}
+
+type sortByAlphabetical []v1.Job
+
+func (s sortByAlphabetical) Len() int {
+	return len(s)
+}
+
+func (s sortByAlphabetical) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+func (s sortByAlphabetical) Less(i, j int) bool {
+	return s[i].Name < s[j].Name
+}

+ 18 - 2
api/server/handlers/release/get_jobs.go

@@ -22,15 +22,22 @@ type GetJobsHandler struct {
 
 
 func NewGetJobsHandler(
 func NewGetJobsHandler(
 	config *config.Config,
 	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
 	writer shared.ResultWriter,
 	writer shared.ResultWriter,
 ) *GetJobsHandler {
 ) *GetJobsHandler {
 	return &GetJobsHandler{
 	return &GetJobsHandler{
-		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, nil, writer),
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
 		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
 		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
 	}
 	}
 }
 }
 
 
 func (c *GetJobsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (c *GetJobsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	request := &types.GetJobsRequest{}
+
+	if ok := c.DecodeAndValidate(w, r, request); !ok {
+		return
+	}
+
 	helmRelease, _ := r.Context().Value(types.ReleaseScope).(*release.Release)
 	helmRelease, _ := r.Context().Value(types.ReleaseScope).(*release.Release)
 	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
 	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
 	agent, err := c.GetAgent(r, cluster, "")
 	agent, err := c.GetAgent(r, cluster, "")
@@ -40,7 +47,16 @@ func (c *GetJobsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 
 
-	jobs, err := agent.ListJobsByLabel(helmRelease.Namespace, getJobLabels(helmRelease)...)
+	labels := getJobLabels(helmRelease)
+
+	if request.Revision != 0 {
+		labels = append(labels, kubernetes.Label{
+			Key: "helm.sh/revision",
+			Val: fmt.Sprintf("%d", request.Revision),
+		})
+	}
+
+	jobs, err := agent.ListJobsByLabel(helmRelease.Namespace, labels...)
 
 
 	if err != nil {
 	if err != nil {
 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
 		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))

+ 62 - 0
api/server/handlers/release/get_latest_job_run.go

@@ -0,0 +1,62 @@
+package release
+
+import (
+	"net/http"
+
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/models"
+	"helm.sh/helm/v3/pkg/release"
+)
+
+type GetLatestJobRunHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+func NewGetLatestJobRunHandler(
+	config *config.Config,
+	writer shared.ResultWriter,
+) *GetLatestJobRunHandler {
+	return &GetLatestJobRunHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, nil, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+func (c *GetLatestJobRunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	helmRelease, _ := r.Context().Value(types.ReleaseScope).(*release.Release)
+	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
+	agent, err := c.GetAgent(r, cluster, "")
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	jobs, err := agent.ListJobsByLabel(helmRelease.Namespace, getJobLabels(helmRelease)...)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	// get the most recent job
+	if len(jobs) > 0 {
+		mostRecentJob := jobs[0]
+
+		for _, job := range jobs {
+			createdAt := job.ObjectMeta.CreationTimestamp
+
+			if mostRecentJob.CreationTimestamp.Before(&createdAt) {
+				mostRecentJob = job
+			}
+		}
+
+		c.WriteResult(w, r, mostRecentJob)
+	}
+}

+ 32 - 1
api/server/router/namespace.go

@@ -680,7 +680,7 @@ func getNamespaceRoutes(
 	})
 	})
 
 
 	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/ingresses/{name} ->
 	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/ingresses/{name} ->
-	// release.NewGetJobsHandler
+	// namespace.NewGetIngressHandler
 	getIngressEndpoint := factory.NewAPIEndpoint(
 	getIngressEndpoint := factory.NewAPIEndpoint(
 		&types.APIRequestMetadata{
 		&types.APIRequestMetadata{
 			Verb:   types.APIVerbGet,
 			Verb:   types.APIVerbGet,
@@ -709,5 +709,36 @@ func getNamespaceRoutes(
 		Router:   r,
 		Router:   r,
 	})
 	})
 
 
+	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/job_runs ->
+	// namespace.NewGetJobRunsHandler
+	getJobRunsEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbGet,
+			Method: types.HTTPVerbGet,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: relPath + "/job_runs",
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+				types.NamespaceScope,
+			},
+		},
+	)
+
+	getJobRunsHandler := namespace.NewGetJobRunsHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &Route{
+		Endpoint: getJobRunsEndpoint,
+		Handler:  getJobRunsHandler,
+		Router:   r,
+	})
+
 	return routes, newPath
 	return routes, newPath
 }
 }

+ 32 - 0
api/server/router/release.go

@@ -681,6 +681,7 @@ func getReleaseRoutes(
 
 
 	getJobsHandler := release.NewGetJobsHandler(
 	getJobsHandler := release.NewGetJobsHandler(
 		config,
 		config,
+		factory.GetDecoderValidator(),
 		factory.GetResultWriter(),
 		factory.GetResultWriter(),
 	)
 	)
 
 
@@ -690,6 +691,37 @@ func getReleaseRoutes(
 		Router:   r,
 		Router:   r,
 	})
 	})
 
 
+	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/releases/{name}/{version}/latest_job_run ->
+	// release.NewGetLatestJobRunHandler
+	getLatestJobRunEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbGet,
+			Method: types.HTTPVerbGet,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: relPath + "/latest_job_run",
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+				types.NamespaceScope,
+				types.ReleaseScope,
+			},
+		},
+	)
+
+	getLatestJobRunHandler := release.NewGetLatestJobRunHandler(
+		config,
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &Route{
+		Endpoint: getLatestJobRunEndpoint,
+		Handler:  getLatestJobRunHandler,
+		Router:   r,
+	})
+
 	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/releases/{name}/{version}/jobs/status ->
 	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/releases/{name}/{version}/jobs/status ->
 	// release.NewGetJobsHandler
 	// release.NewGetJobsHandler
 	getJobsStatusEndpoint := factory.NewAPIEndpoint(
 	getJobsStatusEndpoint := factory.NewAPIEndpoint(

+ 9 - 0
api/types/namespace.go

@@ -176,3 +176,12 @@ type GetPreviousPodLogsRequest struct {
 type GetPreviousPodLogsResponse struct {
 type GetPreviousPodLogsResponse struct {
 	PrevLogs []string `json:"previous_logs"`
 	PrevLogs []string `json:"previous_logs"`
 }
 }
+
+type GetJobsRequest struct {
+	Revision uint `schema:"revision"`
+}
+
+type GetJobRunsRequest struct {
+	Status string `schema:"status"`
+	Sort   string `schema:"sort"`
+}

+ 13 - 0
internal/kubernetes/agent.go

@@ -689,6 +689,19 @@ func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Jo
 	return resp.Items, nil
 	return resp.Items, nil
 }
 }
 
 
+func (a *Agent) ListAllJobs(namespace string) ([]batchv1.Job, error) {
+	resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
+		context.TODO(),
+		metav1.ListOptions{},
+	)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return resp.Items, nil
+}
+
 // DeleteJob deletes the job in the given name and namespace.
 // DeleteJob deletes the job in the given name and namespace.
 func (a *Agent) DeleteJob(name, namespace string) error {
 func (a *Agent) DeleteJob(name, namespace string) error {
 	return a.Clientset.BatchV1().Jobs(namespace).Delete(
 	return a.Clientset.BatchV1().Jobs(namespace).Delete(