浏览代码

add backing up to s3 bucket before deleting release revision

Mohammed Nafees 3 年之前
父节点
当前提交
3724f88be3

+ 3 - 0
internal/models/cluster.go

@@ -73,6 +73,9 @@ type Cluster struct {
 
 	// CertificateAuthorityData for the cluster, encrypted at rest
 	CertificateAuthorityData []byte `json:"certificate-authority-data,omitempty"`
+
+	// MonitorHelmReleases to trim down the number of revisions per release
+	MonitorHelmReleases bool `json:"monitor_helm_releases"`
 }
 
 // ToProjectType generates an external types.Project to be shared over REST

+ 1 - 3
internal/worker/dispatcher.go

@@ -37,7 +37,7 @@ func (d *Dispatcher) Run(jobQueue chan Job) error {
 		worker := NewWorker(uuid, d.WorkerPool)
 		d.workers = append(d.workers, worker)
 
-		log.Default().Printf("starting worker with UUID: %v", uuid)
+		log.Printf("starting worker with UUID: %v", uuid)
 
 		worker.Start()
 	}
@@ -66,8 +66,6 @@ func (d *Dispatcher) dispatch(jobQueue chan Job) {
 					w.Stop()
 				}
 
-				fmt.Println("exiting dispatcher")
-
 				return
 			}
 		}

+ 2 - 2
internal/worker/worker.go

@@ -40,10 +40,10 @@ func (w *Worker) Start() {
 			select {
 			case job := <-w.JobChannel:
 				if err := job.Run(); err != nil {
-					log.Default().Printf("error running job %s: %s", job.ID(), err.Error())
+					log.Printf("error running job %s: %s", job.ID(), err.Error())
 				}
 			case <-w.exitChan:
-				log.Default().Printf("quitting worker with UUID: %v", w.uuid)
+				log.Printf("quitting worker with UUID: %v", w.uuid)
 
 				return
 			}

+ 20 - 0
provisioner/integrations/storage/s3/s3.go

@@ -78,6 +78,26 @@ func (s *S3StorageClient) WriteFile(infra *models.Infra, name string, fileBytes
 	return err
 }
 
+func (s *S3StorageClient) WriteFileWithKey(fileBytes []byte, shouldEncrypt bool, key string) error {
+	body := fileBytes
+	var err error
+	if shouldEncrypt {
+		body, err = encryption.Encrypt(fileBytes, s.encryptionKey)
+
+		if err != nil {
+			return err
+		}
+	}
+
+	_, err = s.client.PutObject(&s3.PutObjectInput{
+		Body:   aws.ReadSeekCloser(bytes.NewReader(body)),
+		Bucket: &s.bucket,
+		Key:    aws.String(key),
+	})
+
+	return err
+}
+
 func (s *S3StorageClient) ReadFile(infra *models.Infra, name string, shouldDecrypt bool) ([]byte, error) {
 	output, err := s.client.GetObject(&s3.GetObjectInput{
 		Bucket: &s.bucket,

+ 1 - 0
workers/Dockerfile

@@ -11,6 +11,7 @@ COPY /api ./api
 COPY /ee ./ee
 COPY /internal ./internal
 COPY /pkg ./pkg
+COPY /provisioner ./provisioner
 COPY /workers ./workers
 
 RUN go build -ldflags '-w -s' -tags ee -a -o ./bin/worker-pool ./workers

+ 61 - 15
workers/jobs/helm_revisions_count_tracker.go

@@ -3,6 +3,9 @@
 package jobs
 
 import (
+	"encoding/json"
+	"fmt"
+	"log"
 	"os"
 	"sync"
 	"time"
@@ -10,6 +13,7 @@ import (
 	"github.com/porter-dev/porter/api/server/shared/config/env"
 	"github.com/porter-dev/porter/api/types"
 	"github.com/porter-dev/porter/pkg/logger"
+	"github.com/porter-dev/porter/provisioner/integrations/storage/s3"
 
 	"github.com/porter-dev/porter/ee/integrations/vault"
 	"github.com/porter-dev/porter/internal/adapter"
@@ -17,7 +21,7 @@ import (
 	"github.com/porter-dev/porter/internal/models"
 	"github.com/porter-dev/porter/internal/oauth"
 	"github.com/porter-dev/porter/internal/repository"
-	"github.com/porter-dev/porter/internal/repository/credentials"
+	rcreds "github.com/porter-dev/porter/internal/repository/credentials"
 	rgorm "github.com/porter-dev/porter/internal/repository/gorm"
 	"golang.org/x/oauth2"
 	"gorm.io/gorm"
@@ -27,20 +31,30 @@ import (
 var stepSize int = 100
 
 type helmRevisionsCountTracker struct {
-	enqueueTime time.Time
-	db          *gorm.DB
-	repo        repository.Repository
-	doConf      *oauth2.Config
-	dbConf      *env.DBConf
-	credBackend credentials.CredentialStorage
+	enqueueTime        time.Time
+	db                 *gorm.DB
+	repo               repository.Repository
+	doConf             *oauth2.Config
+	dbConf             *env.DBConf
+	credBackend        rcreds.CredentialStorage
+	awsAccessKeyID     string
+	awsSecretAccessKey string
+	awsRegion          string
+	s3BucketName       string
+	encryptionKey      *[32]byte
 }
 
 type HelmRevisionsCountTrackerOpts struct {
-	DBConf         *env.DBConf
-	DOClientID     string
-	DOClientSecret string
-	DOScopes       []string
-	ServerURL      string
+	DBConf             *env.DBConf
+	DOClientID         string
+	DOClientSecret     string
+	DOScopes           []string
+	ServerURL          string
+	AWSAccessKeyID     string
+	AWSSecretAccessKey string
+	AWSRegion          string
+	S3BucketName       string
+	EncryptionKey      *[32]byte
 }
 
 func NewHelmRevisionsCountTracker(
@@ -53,7 +67,7 @@ func NewHelmRevisionsCountTracker(
 		return nil, err
 	}
 
-	var credBackend credentials.CredentialStorage
+	var credBackend rcreds.CredentialStorage
 
 	if opts.DBConf.VaultAPIKey != "" && opts.DBConf.VaultServerURL != "" && opts.DBConf.VaultPrefix != "" {
 		credBackend = vault.NewClient(
@@ -80,6 +94,8 @@ func NewHelmRevisionsCountTracker(
 
 	return &helmRevisionsCountTracker{
 		enqueueTime, db, repo, doConf, opts.DBConf, credBackend,
+		opts.AWSAccessKeyID, opts.AWSSecretAccessKey, opts.AWSRegion,
+		opts.S3BucketName, opts.EncryptionKey,
 	}, nil
 }
 
@@ -103,7 +119,8 @@ func (t *helmRevisionsCountTracker) Run() error {
 	for i := 0; i < (int(count)/stepSize)+1; i++ {
 		var clusters []*models.Cluster
 
-		if err := t.db.Order("id asc").Offset(i * stepSize).Limit(stepSize).Find(&clusters).Error; err != nil {
+		if err := t.db.Order("id asc").Offset(i*stepSize).Limit(stepSize).Find(&clusters, "monitor_helm_releases = ?", "1").
+			Error; err != nil {
 			return err
 		}
 
@@ -125,6 +142,16 @@ func (t *helmRevisionsCountTracker) Run() error {
 					return
 				}
 
+				// create s3 client to store revisions that need to be deleted
+				s3Client, err := s3.NewS3StorageClient(&s3.S3Options{
+					t.awsRegion, t.awsAccessKeyID, t.awsSecretAccessKey, t.s3BucketName, t.encryptionKey,
+				})
+
+				if err != nil {
+					log.Printf("error creating S3 client for cluster with ID %d: %v. skipping cluster ...", cluster.ID, err)
+					return
+				}
+
 				namespaces, err := agent.K8sAgent.ListNamespaces()
 
 				if err != nil {
@@ -155,7 +182,26 @@ func (t *helmRevisionsCountTracker) Run() error {
 						for i := 100; i < len(revisions); i += 1 {
 							rev := revisions[i]
 
-							err := agent.DeleteReleaseRevision(rev.Name, rev.Version)
+							// store the revision in the s3 bucket before deleting it
+							data, err := json.Marshal(rev)
+
+							if err != nil {
+								log.Printf("error marshalling revision for release %s, number %d: %v. skipping revision ...",
+									rev.Name, rev.Version, err)
+								continue
+							}
+
+							// write to the bucket with key - <project_id>/<cluster_id>/<namespace>/<release_name>/<revision_number>
+							err = s3Client.WriteFileWithKey(data, true, fmt.Sprintf("%d/%d/%s/%s/%d", cluster.ProjectID,
+								cluster.ID, rel.Namespace, rel.Name, rev.Version))
+
+							if err != nil {
+								log.Printf("error backing up revision for release %s, number %d: %v. skipping revision ...",
+									rev.Name, rev.Version, err)
+								continue
+							}
+
+							err = agent.DeleteReleaseRevision(rev.Name, rev.Version)
 
 							if err != nil {
 								continue

+ 36 - 39
workers/main.go

@@ -8,7 +8,6 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
-	"strconv"
 	"syscall"
 	"time"
 
@@ -21,48 +20,41 @@ import (
 )
 
 var (
-	MaxWorkers = os.Getenv("MAX_WORKERS")
-	MaxQueue   = os.Getenv("MAX_QUEUE")
-
 	jobQueue   chan worker.Job
 	envDecoder = EnvConf{}
 )
 
 type EnvConf struct {
-	ServerURL      string `env:"SERVER_URL,default=http://localhost:8080"`
-	DOClientID     string `env:"DO_CLIENT_ID"`
-	DOClientSecret string `env:"DO_CLIENT_SECRET"`
-	DBConf         env.DBConf
+	ServerURL          string `env:"SERVER_URL,default=http://localhost:8080"`
+	DOClientID         string `env:"DO_CLIENT_ID"`
+	DOClientSecret     string `env:"DO_CLIENT_SECRET"`
+	DBConf             env.DBConf
+	MaxWorkers         uint      `env:"MAX_WORKERS,default=10"`
+	MaxQueue           uint      `env:"MAX_QUEUE,default=100"`
+	AWSAccessKeyID     string    `env:"AWS_ACCESS_KEY_ID"`
+	AWSSecretAccessKey string    `env:"AWS_SECRET_ACCESS_KEY"`
+	AWSRegion          string    `env:"AWS_REGION"`
+	S3BucketName       string    `env:"S3_BUCKET_NAME"`
+	EncryptionKey      *[32]byte `env:"ENCRYPTION_KEY"`
 }
 
 func main() {
 	if err := envdecode.StrictDecode(&envDecoder); err != nil {
-		log.Default().Fatalf("Failed to decode server conf: %v", err)
-	}
-
-	workerCount, err := strconv.Atoi(MaxWorkers)
-	if err != nil {
-		log.Default().Fatalln("invalid MAX_WORKERS value")
-	}
-
-	log.Default().Printf("setting max worker count to: %d\n", workerCount)
-
-	queueCount, err := strconv.Atoi(MaxQueue)
-	if err != nil {
-		log.Default().Fatalln("invalid MAX_QUEUE value")
+		log.Fatalf("Failed to decode server conf: %v", err)
 	}
 
-	log.Default().Printf("setting max job queue count to: %d\n", queueCount)
+	log.Printf("setting max worker count to: %d\n", envDecoder.MaxWorkers)
+	log.Printf("setting max job queue count to: %d\n", envDecoder.MaxQueue)
 
-	jobQueue = make(chan worker.Job, queueCount)
-	d := worker.NewDispatcher(workerCount)
+	jobQueue = make(chan worker.Job, envDecoder.MaxQueue)
+	d := worker.NewDispatcher(int(envDecoder.MaxWorkers))
 
-	log.Default().Println("starting worker dispatcher")
+	log.Println("starting worker dispatcher")
 
-	err = d.Run(jobQueue)
+	err := d.Run(jobQueue)
 
 	if err != nil {
-		log.Default().Fatalln(err)
+		log.Fatalln(err)
 	}
 
 	server := &http.Server{Addr: ":3000", Handler: httpService()}
@@ -74,7 +66,7 @@ func main() {
 	go func() {
 		<-sig
 
-		log.Default().Println("shutting down server")
+		log.Println("shutting down server")
 
 		shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
 		defer shutdownCtxCancel()
@@ -92,16 +84,16 @@ func main() {
 			log.Fatalln(err)
 		}
 
-		log.Default().Println("server shutdown completed")
+		log.Println("server shutdown completed")
 
 		serverStopCtx()
 	}()
 
-	log.Default().Println("starting HTTP server at :3000")
+	log.Println("starting HTTP server at :3000")
 
 	err = server.ListenAndServe()
 	if err != nil && err != http.ErrServerClosed {
-		log.Default().Fatalf("error starting HTTP server: %v", err)
+		log.Fatalf("error starting HTTP server: %v", err)
 	}
 
 	// Wait for server context to be stopped
@@ -111,7 +103,7 @@ func main() {
 }
 
 func httpService() http.Handler {
-	log.Default().Println("setting up HTTP router and adding middleware")
+	log.Println("setting up HTTP router and adding middleware")
 
 	r := chi.NewRouter()
 	r.Use(middleware.Logger)
@@ -119,7 +111,7 @@ func httpService() http.Handler {
 	r.Use(middleware.Heartbeat("/ping"))
 	r.Use(middleware.AllowContentType("application/json"))
 
-	log.Default().Println("setting up HTTP POST endpoint to enqueue jobs")
+	log.Println("setting up HTTP POST endpoint to enqueue jobs")
 
 	r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
 		job := getJob(chi.URLParam(r, "id"))
@@ -139,15 +131,20 @@ func httpService() http.Handler {
 func getJob(id string) worker.Job {
 	if id == "helm-revisions-count-tracker" {
 		newJob, err := jobs.NewHelmRevisionsCountTracker(time.Now().UTC(), &jobs.HelmRevisionsCountTrackerOpts{
-			DBConf:         &envDecoder.DBConf,
-			DOClientID:     envDecoder.DOClientID,
-			DOClientSecret: envDecoder.DOClientSecret,
-			DOScopes:       []string{"read", "write"},
-			ServerURL:      envDecoder.ServerURL,
+			DBConf:             &envDecoder.DBConf,
+			DOClientID:         envDecoder.DOClientID,
+			DOClientSecret:     envDecoder.DOClientSecret,
+			DOScopes:           []string{"read", "write"},
+			ServerURL:          envDecoder.ServerURL,
+			AWSAccessKeyID:     envDecoder.AWSAccessKeyID,
+			AWSSecretAccessKey: envDecoder.AWSSecretAccessKey,
+			AWSRegion:          envDecoder.AWSRegion,
+			S3BucketName:       envDecoder.S3BucketName,
+			EncryptionKey:      envDecoder.EncryptionKey,
 		})
 
 		if err != nil {
-			log.Default().Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
+			log.Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
 		}
 
 		return newJob