Просмотр исходного кода

Adding stream helm releases to agent

jnfrati 4 лет назад
Родитель
Сommit
900004d412
2 измененных файлов с 179 добавлено и 1 удалено
  1. 116 0
      internal/kubernetes/agent.go
  2. 63 1
      server/api/k8s_handler.go

+ 116 - 0
internal/kubernetes/agent.go

@@ -3,10 +3,13 @@ package kubernetes
 import (
 	"bufio"
 	"bytes"
+	"compress/gzip"
 	"context"
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"strings"
 
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner"
@@ -45,6 +48,8 @@ import (
 	"k8s.io/client-go/tools/remotecommand"
 
 	"github.com/porter-dev/porter/internal/config"
+
+	rspb "helm.sh/helm/v3/pkg/release"
 )
 
 // Agent is a Kubernetes agent for performing operations that interact with the
@@ -624,6 +629,117 @@ func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, select
 	}
 }
 
+var b64 = base64.StdEncoding
+
+var magicGzip = []byte{0x1f, 0x8b, 0x08}
+
+func decodeRelease(data string) (*rspb.Release, error) {
+	// base64 decode string
+	b, err := b64.DecodeString(data)
+	if err != nil {
+		return nil, err
+	}
+
+	// For backwards compatibility with releases that were stored before
+	// compression was introduced we skip decompression if the
+	// gzip magic header is not found
+	if bytes.Equal(b[0:3], magicGzip) {
+		r, err := gzip.NewReader(bytes.NewReader(b))
+		if err != nil {
+			return nil, err
+		}
+		defer r.Close()
+		b2, err := ioutil.ReadAll(r)
+		if err != nil {
+			return nil, err
+		}
+		b = b2
+	}
+
+	var rls rspb.Release
+	// unmarshal release object bytes
+	if err := json.Unmarshal(b, &rls); err != nil {
+		return nil, err
+	}
+	return &rls, nil
+}
+
+func (a *Agent) StreamHelmReleases(conn *websocket.Conn, selectors string) error {
+	tweakListOptionsFunc := func(options *metav1.ListOptions) {
+		options.LabelSelector = selectors
+	}
+
+	factory := informers.NewSharedInformerFactoryWithOptions(
+		a.Clientset,
+		0,
+		informers.WithTweakListOptions(tweakListOptionsFunc),
+	)
+
+	informer := factory.Core().V1().Secrets().Informer()
+
+	stopper := make(chan struct{})
+	errorchan := make(chan error)
+	defer close(errorchan)
+
+	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			obj, _ := decodeRelease(newObj.Data.release)
+			msg := Message{
+				EventType: "UPDATE",
+				Object:    obj,
+			}
+
+			if writeErr := conn.WriteJSON(msg); writeErr != nil {
+				errorchan <- writeErr
+				return
+			}
+		},
+		AddFunc: func(obj interface{}) {
+			msg := Message{
+				EventType: "ADD",
+				Object:    obj,
+			}
+
+			if writeErr := conn.WriteJSON(msg); writeErr != nil {
+				errorchan <- writeErr
+				return
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			msg := Message{
+				EventType: "DELETE",
+				Object:    obj,
+			}
+
+			if writeErr := conn.WriteJSON(msg); writeErr != nil {
+				errorchan <- writeErr
+				return
+			}
+		},
+	})
+
+	go func() {
+		// listens for websocket closing handshake
+		for {
+			if _, _, err := conn.ReadMessage(); err != nil {
+				defer conn.Close()
+				defer close(stopper)
+				errorchan <- nil
+				return
+			}
+		}
+	}()
+
+	go informer.Run(stopper)
+
+	for {
+		select {
+		case err := <-errorchan:
+			return err
+		}
+	}
+}
+
 // ProvisionECR spawns a new provisioning pod that creates an ECR instance
 func (a *Agent) ProvisionECR(
 	projectID uint,

+ 63 - 1
server/api/k8s_handler.go

@@ -1064,7 +1064,7 @@ func (app *App) HandleStreamControllerStatus(w http.ResponseWriter, r *http.Requ
 	selectors := ""
 	if vals["selectors"] != nil {
 		selectors = vals["selectors"][0]
-	} 
+	}
 	err = agent.StreamControllerStatus(conn, kind, selectors)
 
 	if err != nil {
@@ -1073,6 +1073,68 @@ func (app *App) HandleStreamControllerStatus(w http.ResponseWriter, r *http.Requ
 	}
 }
 
+func (app *App) HandleStreamHelmReleases(w http.ResponseWriter, r *http.Request) {
+	vals, err := url.ParseQuery(r.URL.RawQuery)
+
+	if err != nil {
+		app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
+		return
+	}
+
+	// get session to retrieve correct kubeconfig
+	_, err = app.Store.Get(r, app.ServerConf.CookieName)
+
+	if err != nil {
+		app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
+		return
+	}
+
+	// get the filter options
+	form := &forms.K8sForm{
+		OutOfClusterConfig: &kubernetes.OutOfClusterConfig{
+			Repo:              app.Repo,
+			DigitalOceanOAuth: app.DOConf,
+		},
+	}
+
+	form.PopulateK8sOptionsFromQueryParams(vals, app.Repo.Cluster)
+
+	// validate the form
+	if err := app.validator.Struct(form); err != nil {
+		app.handleErrorFormValidation(err, ErrK8sValidate, w)
+		return
+	}
+
+	// create a new agent
+	var agent *kubernetes.Agent
+
+	if app.ServerConf.IsTesting {
+		agent = app.TestAgents.K8sAgent
+	} else {
+		agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
+	}
+
+	upgrader.CheckOrigin = func(r *http.Request) bool { return true }
+
+	// upgrade to websocket.
+	conn, err := upgrader.Upgrade(w, r, nil)
+
+	if err != nil {
+		app.handleErrorUpgradeWebsocket(err, w)
+	}
+
+	selectors := ""
+	if vals["selectors"] != nil {
+		selectors = vals["selectors"][0]
+	}
+	err = agent.StreamHelmReleases(conn, selectors)
+
+	if err != nil {
+		app.handleErrorWebsocketWrite(err, w)
+		return
+	}
+}
+
 // HandleDetectPrometheusInstalled detects a prometheus installation in the target cluster
 func (app *App) HandleDetectPrometheusInstalled(w http.ResponseWriter, r *http.Request) {
 	vals, err := url.ParseQuery(r.URL.RawQuery)