Răsfoiți Sursa

Merge pull request #1308 from porter-dev/belanger/por-102-websocket-panic

[POR-102] Fix websocket panic and handle jobs+cronjobs in list of pods
abelanger5 4 ani în urmă
părinte
comite
d9d2fd52a7

+ 73 - 2
api/server/handlers/release/get_all_pods.go

@@ -1,6 +1,7 @@
 package release
 
 import (
+	"fmt"
 	"net/http"
 	"strings"
 
@@ -11,6 +12,7 @@ import (
 	"github.com/porter-dev/porter/api/server/shared/config"
 	"github.com/porter-dev/porter/api/types"
 	"github.com/porter-dev/porter/internal/helm/grapher"
+	"github.com/porter-dev/porter/internal/kubernetes"
 	"github.com/porter-dev/porter/internal/models"
 	"helm.sh/helm/v3/pkg/release"
 	v1 "k8s.io/api/core/v1"
@@ -58,8 +60,38 @@ func (c *GetAllPodsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 		selectors := make([]string, 0)
 
-		for key, val := range selector.MatchLabels {
-			selectors = append(selectors, key+"="+val)
+		if strings.ToLower(controller.Kind) == "cronjob" {
+			// in the case of cronjobs, getting the pod is non-arbitrary. We only get the pod
+			// declared by the manifest, which will have a certain revision attached. But the
+			// label on the pod is the job name, not the cronjob name. So we first find the
+			// list of jobs run by this cronjob, and then get the pods attached to that job.
+			jobLabels := make([]kubernetes.Label, 0)
+
+			for key, val := range selector.MatchLabels {
+				jobLabels = append(jobLabels, kubernetes.Label{
+					Key: key,
+					Val: val,
+				})
+			}
+
+			jobPods, err := getPodsForJobs(agent, helmRelease.Namespace, jobLabels)
+
+			if err != nil {
+				c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+				return
+			}
+
+			pods = append(pods, jobPods...)
+
+			continue
+		} else if strings.ToLower(controller.Kind) == "job" {
+			// in the case of jobs as the controller, we simply find the job matching the
+			// pod name.
+			selectors = append(selectors, "job-name="+controller.Name)
+		} else {
+			for key, val := range selector.MatchLabels {
+				selectors = append(selectors, key+"="+val)
+			}
 		}
 
 		podList, err := agent.GetPodsByLabel(strings.Join(selectors, ","), helmRelease.Namespace)
@@ -72,5 +104,44 @@ func (c *GetAllPodsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		pods = append(pods, podList.Items...)
 	}
 
+	// we also check for jobs attached to this release
+	labels := getJobLabels(helmRelease)
+
+	labels = append(labels, kubernetes.Label{
+		Key: "helm.sh/revision",
+		Val: fmt.Sprintf("%d", helmRelease.Version),
+	})
+
+	jobPods, err := getPodsForJobs(agent, helmRelease.Namespace, labels)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	pods = append(pods, jobPods...)
+
 	c.WriteResult(w, r, pods)
 }
+
+func getPodsForJobs(agent *kubernetes.Agent, namespace string, labels []kubernetes.Label) ([]v1.Pod, error) {
+	pods := make([]v1.Pod, 0)
+
+	jobs, err := agent.ListJobsByLabel(namespace, labels...)
+
+	if err != nil {
+		return nil, err
+	}
+
+	for _, job := range jobs {
+		podList, err := agent.GetPodsByLabel("job-name="+job.Name, namespace)
+
+		if err != nil {
+			return nil, err
+		}
+
+		pods = append(pods, podList.Items...)
+	}
+
+	return pods, nil
+}

+ 17 - 1
api/server/handlers/release/get_controllers.go

@@ -114,7 +114,23 @@ func getController(controller grapher.Object, agent *kubernetes.Agent) (rc inter
 			return nil, nil, err
 		}
 
-		return obj, obj.Spec.JobTemplate.Spec.Selector, nil
+		res := &metav1.LabelSelector{
+			MatchLabels: make(map[string]string),
+		}
+
+		for key, val := range obj.Spec.JobTemplate.Labels {
+			res.MatchLabels[key] = val
+		}
+
+		return obj, res, nil
+	case "job":
+		obj, err := agent.GetJob(controller)
+
+		if err != nil {
+			return nil, nil, err
+		}
+
+		return obj, obj.Spec.Selector, nil
 	}
 
 	return nil, nil, fmt.Errorf("not a valid controller")

+ 2 - 1
internal/kubernetes/provisioner/resource_stream.go

@@ -10,6 +10,7 @@ import (
 // ResourceStream performs an XREAD operation on the given stream and outputs it to the given websocket conn.
 func ResourceStream(client *redis.Client, streamName string, rw *websocket.WebsocketSafeReadWriter) error {
 	errorchan := make(chan error)
+	defer close(errorchan)
 
 	go func() {
 		// listens for websocket closing handshake
@@ -34,6 +35,7 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 			).Result()
 
 			if err != nil {
+				errorchan <- err
 				return
 			}
 
@@ -47,7 +49,6 @@ func ResourceStream(client *redis.Client, streamName string, rw *websocket.Webso
 	for {
 		select {
 		case err := <-errorchan:
-			close(errorchan)
 			client.Close()
 			return err
 		}