Переглянути джерело

first attempt at streaming job runs

Alexander Belanger 4 роки тому
батько
коміт
b3bc07b878

+ 56 - 0
api/server/handlers/namespace/stream_job_runs.go

@@ -0,0 +1,56 @@
+package namespace
+
+import (
+	"net/http"
+	"strings"
+
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/websocket"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/models"
+)
+
+type StreamJobRunsHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+func NewStreamJobRunsHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *StreamJobRunsHandler {
+	return &StreamJobRunsHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+func (c *StreamJobRunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	safeRW := r.Context().Value(types.RequestCtxWebsocketKey).(*websocket.WebsocketSafeReadWriter)
+	namespace := r.Context().Value(types.NamespaceScope).(string)
+
+	cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
+
+	agent, err := c.GetAgent(r, cluster, "")
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	if strings.ToLower(namespace) == "all" {
+		namespace = ""
+	}
+
+	err = agent.StreamJobs(namespace, "", safeRW)
+
+	if err != nil {
+		c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+}

+ 34 - 0
api/server/router/namespace.go

@@ -450,6 +450,40 @@ func getNamespaceRoutes(
 		Router:   r,
 	})
 
+	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/jobs/stream -> namespace.NewStreamJobRunsHandler
+	streamJobRunsEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbGet,
+			Method: types.HTTPVerbGet,
+			Path: &types.Path{
+				Parent: basePath,
+				RelativePath: fmt.Sprintf(
+					"%s/jobs/stream",
+					relPath,
+				),
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+				types.NamespaceScope,
+			},
+			IsWebsocket: true,
+		},
+	)
+
+	streamJobRunsHandler := namespace.NewStreamJobRunsHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &Route{
+		Endpoint: streamJobRunsEndpoint,
+		Handler:  streamJobRunsHandler,
+		Router:   r,
+	})
+
 	// GET /api/projects/{project_id}/clusters/{cluster_id}/namespaces/{namespace}/pod/{name}/previous_logs
 	getPreviousLogsEndpoint := factory.NewAPIEndpoint(
 		&types.APIRequestMetadata{

+ 93 - 0
internal/kubernetes/agent.go

@@ -702,6 +702,99 @@ func (a *Agent) ListAllJobs(namespace string) ([]batchv1.Job, error) {
 	return resp.Items, nil
 }
 
+// StreamJobs streams a list of jobs to the websocket writer, closing the connection once all jobs have been sent
+func (a *Agent) StreamJobs(namespace string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
+	run := func() error {
+		errorchan := make(chan error)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		var wg sync.WaitGroup
+		var once sync.Once
+		var err error
+
+		wg.Add(2)
+
+		go func() {
+			wg.Wait()
+			close(errorchan)
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
+			// listens for websocket closing handshake
+			defer wg.Done()
+
+			for {
+				if _, _, err := rw.ReadMessage(); err != nil {
+					errorchan <- nil
+					return
+				}
+			}
+		}()
+
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					// TODO: add method to alert on panic
+					return
+				}
+			}()
+
+			// listens for websocket closing handshake
+			defer wg.Done()
+
+			continueVal := ""
+
+			for {
+				if ctx.Err() != nil {
+					errorchan <- nil
+					return
+				}
+
+				jobs, err := a.Clientset.BatchV1().Jobs(namespace).List(
+					ctx,
+					metav1.ListOptions{
+						Limit:    100,
+						Continue: continueVal,
+					},
+				)
+
+				if err != nil {
+					errorchan <- err
+					return
+				}
+
+				for _, job := range jobs.Items {
+					err := rw.WriteJSON(job)
+
+					if err != nil {
+						errorchan <- err
+						return
+					}
+				}
+			}
+		}()
+
+		for err = range errorchan {
+			once.Do(func() {
+				rw.Close()
+				cancel()
+			})
+		}
+
+		return err
+	}
+
+	return a.RunWebsocketTask(run)
+}
+
 // DeleteJob deletes the job in the given name and namespace.
 func (a *Agent) DeleteJob(name, namespace string) error {
 	return a.Clientset.BatchV1().Jobs(namespace).Delete(