Просмотр исходного кода

Merge pull request #2240 from porter-dev/nafees/workers

[POR-638] Helm release revisions monitoring using a worker pool and job queue
abelanger5 3 лет назад
Родитель
Commit
ec3d4c049d

+ 84 - 60
.github/workflows/dev.yaml

@@ -22,7 +22,7 @@ jobs:
       - name: Install kubectl
         uses: azure/setup-kubectl@v2.0
         with:
-          version: 'v1.19.15'
+          version: "v1.19.15"
       - name: Log in to gcloud CLI
         run: gcloud auth configure-docker
       - name: Checkout
@@ -50,7 +50,7 @@ jobs:
       - name: Deploy to cluster
         run: |
           aws eks --region ${{ secrets.AWS_REGION }} update-kubeconfig --name dev
-            
+
           kubectl rollout restart deployment/porter
   deploy-provisioner:
     runs-on: ubuntu-latest
@@ -70,7 +70,7 @@ jobs:
       - name: Install kubectl
         uses: azure/setup-kubectl@v2.0
         with:
-          version: 'v1.19.15'
+          version: "v1.19.15"
       - name: Log in to gcloud CLI
         run: gcloud auth configure-docker
       - name: Checkout
@@ -84,67 +84,91 @@ jobs:
       - name: Deploy to cluster
         run: |
           aws eks --region ${{ secrets.AWS_REGION }} update-kubeconfig --name dev
-            
+
           kubectl rollout restart deployment/provisioner
   build-push-ecr-server:
     runs-on: ubuntu-latest
     steps:
-    - name: Checkout code
-      uses: actions/checkout@v2.3.4
-    - name: Set Github tag
-      id: vars
-      run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
-    - name: Configure AWS credentials
-      uses: aws-actions/configure-aws-credentials@v1
-      with:
-        aws-access-key-id: ${{ secrets.ECR_DEV_AWS_ACCESS_KEY_ID }}
-        aws-secret-access-key: ${{ secrets.ECR_DEV_AWS_ACCESS_SECRET_KEY }}
-        aws-region: us-east-2
-    - name: Login to ECR
-      id: login-ecr
-      run: |
-        aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin 801172602658.dkr.ecr.us-east-2.amazonaws.com
-    - name: Write Dashboard Environment Variables
-      run: |
-        cat >./dashboard/.env <<EOL
-        NODE_ENV=development
-        API_SERVER=dashboard.dev.getporter.dev
-        DISCORD_KEY=${{secrets.DISCORD_KEY}}
-        DISCORD_CID=${{secrets.DISCORD_CID}}
-        FEEDBACK_ENDPOINT=${{secrets.FEEDBACK_ENDPOINT}}
-        APPLICATION_CHART_REPO_URL=https://charts.dev.getporter.dev
-        ADDON_CHART_REPO_URL=https://chart-addons.dev.getporter.dev
-        ENABLE_SENTRY=true
-        SENTRY_DSN=${{secrets.SENTRY_DSN}}
-        SENTRY_ENV=frontend-development
-        EOL
-    - name: Build
-      run: |
-        DOCKER_BUILDKIT=1 docker build . -t 801172602658.dkr.ecr.us-east-2.amazonaws.com/porter:${{ steps.vars.outputs.sha_short }} -f ./ee/docker/ee.Dockerfile
-    - name: Push to ECR
-      run: |
-        docker push 801172602658.dkr.ecr.us-east-2.amazonaws.com/porter:${{ steps.vars.outputs.sha_short }}
+      - name: Checkout code
+        uses: actions/checkout@v2.3.4
+      - name: Set Github tag
+        id: vars
+        run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
+      - name: Configure AWS credentials
+        uses: aws-actions/configure-aws-credentials@v1
+        with:
+          aws-access-key-id: ${{ secrets.ECR_DEV_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.ECR_DEV_AWS_ACCESS_SECRET_KEY }}
+          aws-region: us-east-2
+      - name: Login to ECR
+        id: login-ecr
+        run: |
+          aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin 801172602658.dkr.ecr.us-east-2.amazonaws.com
+      - name: Write Dashboard Environment Variables
+        run: |
+          cat >./dashboard/.env <<EOL
+          NODE_ENV=development
+          API_SERVER=dashboard.dev.getporter.dev
+          DISCORD_KEY=${{secrets.DISCORD_KEY}}
+          DISCORD_CID=${{secrets.DISCORD_CID}}
+          FEEDBACK_ENDPOINT=${{secrets.FEEDBACK_ENDPOINT}}
+          APPLICATION_CHART_REPO_URL=https://charts.dev.getporter.dev
+          ADDON_CHART_REPO_URL=https://chart-addons.dev.getporter.dev
+          ENABLE_SENTRY=true
+          SENTRY_DSN=${{secrets.SENTRY_DSN}}
+          SENTRY_ENV=frontend-development
+          EOL
+      - name: Build
+        run: |
+          DOCKER_BUILDKIT=1 docker build . -t 801172602658.dkr.ecr.us-east-2.amazonaws.com/porter:${{ steps.vars.outputs.sha_short }} -f ./ee/docker/ee.Dockerfile
+      - name: Push to ECR
+        run: |
+          docker push 801172602658.dkr.ecr.us-east-2.amazonaws.com/porter:${{ steps.vars.outputs.sha_short }}
   build-push-ecr-provisioner:
     runs-on: ubuntu-latest
     steps:
-    - name: Checkout code
-      uses: actions/checkout@v2.3.4
-    - name: Set Github tag
-      id: vars
-      run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
-    - name: Configure AWS credentials
-      uses: aws-actions/configure-aws-credentials@v1
-      with:
-        aws-access-key-id: ${{ secrets.ECR_DEV_AWS_ACCESS_KEY_ID }}
-        aws-secret-access-key: ${{ secrets.ECR_DEV_AWS_ACCESS_SECRET_KEY }}
-        aws-region: us-east-2
-    - name: Login to ECR
-      id: login-ecr
-      run: |
-        aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin 801172602658.dkr.ecr.us-east-2.amazonaws.com
-    - name: Build
-      run: |
-        DOCKER_BUILDKIT=1 docker build . -t 801172602658.dkr.ecr.us-east-2.amazonaws.com/provisioner-service:${{ steps.vars.outputs.sha_short }} -f ./ee/docker/provisioner.Dockerfile
-    - name: Push to ECR
-      run: |
-        docker push 801172602658.dkr.ecr.us-east-2.amazonaws.com/provisioner-service:${{ steps.vars.outputs.sha_short }}
+      - name: Checkout code
+        uses: actions/checkout@v2.3.4
+      - name: Set Github tag
+        id: vars
+        run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
+      - name: Configure AWS credentials
+        uses: aws-actions/configure-aws-credentials@v1
+        with:
+          aws-access-key-id: ${{ secrets.ECR_DEV_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.ECR_DEV_AWS_ACCESS_SECRET_KEY }}
+          aws-region: us-east-2
+      - name: Login to ECR
+        id: login-ecr
+        run: |
+          aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin 801172602658.dkr.ecr.us-east-2.amazonaws.com
+      - name: Build
+        run: |
+          DOCKER_BUILDKIT=1 docker build . -t 801172602658.dkr.ecr.us-east-2.amazonaws.com/provisioner-service:${{ steps.vars.outputs.sha_short }} -f ./ee/docker/provisioner.Dockerfile
+      - name: Push to ECR
+        run: |
+          docker push 801172602658.dkr.ecr.us-east-2.amazonaws.com/provisioner-service:${{ steps.vars.outputs.sha_short }}
+  build-push-worker-pool:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v2.3.4
+      - name: Set Github tag
+        id: vars
+        run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
+      - name: Configure AWS credentials
+        uses: aws-actions/configure-aws-credentials@v1
+        with:
+          aws-access-key-id: ${{ secrets.ECR_DEV_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.ECR_DEV_AWS_ACCESS_SECRET_KEY }}
+          aws-region: us-east-2
+      - name: Login to ECR
+        id: login-ecr
+        run: |
+          aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin 801172602658.dkr.ecr.us-east-2.amazonaws.com
+      - name: Build
+        run: |
+          DOCKER_BUILDKIT=1 docker build . -t 801172602658.dkr.ecr.us-east-2.amazonaws.com/worker-pool:${{ steps.vars.outputs.sha_short }} -f ./workers/Dockerfile
+      - name: Push to ECR
+        run: |
+          docker push 801172602658.dkr.ecr.us-east-2.amazonaws.com/worker-pool:${{ steps.vars.outputs.sha_short }}

+ 28 - 0
.github/workflows/prerelease.yaml

@@ -78,6 +78,34 @@ jobs:
       - name: Push to ECR public
         run: |
           docker push public.ecr.aws/o1j4x7p4/provisioner-service:${{steps.tag_name.outputs.tag}}
+  build-push-worker-pool:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Get tag name
+        id: tag_name
+        run: |
+          tag=${GITHUB_TAG/refs\/tags\//}
+          echo ::set-output name=tag::$tag
+        env:
+          GITHUB_TAG: ${{ github.ref }}
+      - name: Checkout
+        uses: actions/checkout@v2.3.4
+      - name: Configure AWS credentials
+        uses: aws-actions/configure-aws-credentials@v1
+        with:
+          aws-access-key-id: ${{ secrets.ECR_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.ECR_AWS_SECRET_ACCESS_KEY }}
+          aws-region: us-east-2
+      - name: Login to ECR public
+        id: login-ecr
+        run: |
+          aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws/o1j4x7p4
+      - name: Build
+        run: |
+          DOCKER_BUILDKIT=1 docker build . -t public.ecr.aws/o1j4x7p4/worker-pool:${{steps.tag_name.outputs.tag}} -f ./workers/Dockerfile
+      - name: Push to ECR public
+        run: |
+          docker push public.ecr.aws/o1j4x7p4/worker-pool:${{steps.tag_name.outputs.tag}}
   build-linux:
     name: Build Linux binaries
     runs-on: ubuntu-latest

+ 1 - 0
go.mod

@@ -112,6 +112,7 @@ require (
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
 	github.com/xanzy/go-gitlab v0.68.0 // indirect
+	go.uber.org/goleak v1.1.12 // indirect
 )
 
 require (

+ 1 - 0
go.sum

@@ -2099,6 +2099,7 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
+go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
 go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=

+ 10 - 0
internal/helm/agent.go

@@ -141,6 +141,16 @@ func (a *Agent) GetRelease(
 	return release, err
 }
 
+// DeleteReleaseRevision deletes a specific revision of a release
+func (a *Agent) DeleteReleaseRevision(
+	name string,
+	version int,
+) error {
+	_, err := a.ActionConfig.Releases.Delete(name, version)
+
+	return err
+}
+
 // GetReleaseHistory returns a list of charts for a specific release
 func (a *Agent) GetReleaseHistory(
 	name string,

+ 3 - 0
internal/models/cluster.go

@@ -75,6 +75,9 @@ type Cluster struct {
 
 	// CertificateAuthorityData for the cluster, encrypted at rest
 	CertificateAuthorityData []byte `json:"certificate-authority-data,omitempty"`
+
+	// MonitorHelmReleases to trim down the number of revisions per release
+	MonitorHelmReleases bool
 }
 
 // ToProjectType generates an external types.Project to be shared over REST

+ 76 - 0
internal/worker/dispatcher.go

@@ -0,0 +1,76 @@
+package worker
+
+import (
+	"log"
+
+	"github.com/google/uuid"
+)
+
+// Dispatcher is responsible to maintain a global worker pool
+// and to dispatch jobs to the underlying workers, in random order
+type Dispatcher struct {
+	maxWorkers int
+	exitChan   chan bool
+
+	WorkerPool chan chan Job
+}
+
+// NewDispatcher creates a new instance of Dispatcher with
+// the given number of workers that should be in the worker pool
+func NewDispatcher(maxWorkers int) *Dispatcher {
+	pool := make(chan chan Job, maxWorkers)
+	return &Dispatcher{
+		maxWorkers: maxWorkers,
+		exitChan:   make(chan bool),
+
+		WorkerPool: pool,
+	}
+}
+
+// Run creates workers in the worker pool with the given
+// job queue and starts the workers
+func (d *Dispatcher) Run(jobQueue chan Job) error {
+	go func() {
+		var workers []*Worker
+
+		for i := 0; i < d.maxWorkers; i += 1 {
+			uuid, err := uuid.NewUUID()
+
+			if err != nil {
+				// FIXME: should let the parent thread know of this error
+				log.Printf("error creating UUID for worker: %v", err)
+				return
+			}
+
+			worker := NewWorker(uuid, d.WorkerPool)
+			workers = append(workers, worker)
+
+			log.Printf("starting worker with UUID: %v", uuid)
+
+			worker.Start()
+		}
+
+		for {
+			select {
+			case job := <-jobQueue:
+				go func() {
+					workerJobChan := <-d.WorkerPool
+					workerJobChan <- job
+				}()
+			case <-d.exitChan:
+				for _, w := range workers {
+					w.Stop()
+				}
+
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+// Exit instructs the dispatcher to quit processing any more jobs
+func (d *Dispatcher) Exit() {
+	d.exitChan <- true
+}

+ 22 - 0
internal/worker/dispatcher_test.go

@@ -0,0 +1,22 @@
+package worker
+
+import (
+	"testing"
+
+	"go.uber.org/goleak"
+)
+
+func TestDispatcher(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	jobChan := make(chan Job)
+
+	d := NewDispatcher(10)
+	err := d.Run(jobChan)
+
+	if err != nil {
+		panic(err)
+	}
+
+	d.Exit()
+}

+ 73 - 0
internal/worker/worker.go

@@ -0,0 +1,73 @@
+package worker
+
+import (
+	"log"
+	"time"
+
+	"github.com/google/uuid"
+)
+
+// Job is an interface which should be implemented by an individual
+// worker process in order to be enqueued in the worker pool
+type Job interface {
+	// The unique string ID of a job
+	ID() string
+
+	// The time in UTC when a job was enqueued to the worker pool queue
+	EnqueueTime() time.Time
+
+	// The main logic and control of a job
+	Run() error
+
+	// To set external data if a job needs it
+	SetData([]byte)
+}
+
+// Worker handles a single job or worker process
+type Worker struct {
+	exitChan chan bool
+	uuid     uuid.UUID
+
+	WorkerPool chan chan Job
+	JobChannel chan Job
+}
+
+// NewWorker creates a new instance of Worker with the given
+// RFC 4122 UUID and a global worker pool
+func NewWorker(uuid uuid.UUID, workerPool chan chan Job) *Worker {
+	return &Worker{
+		exitChan: make(chan bool),
+		uuid:     uuid,
+
+		WorkerPool: workerPool,
+		JobChannel: make(chan Job),
+	}
+}
+
+// Start spawns a goroutine to add itself to the global worker pool
+// and listens for incoming jobs as they come, in random order
+func (w *Worker) Start() {
+	go func() {
+		for {
+			w.WorkerPool <- w.JobChannel
+
+			select {
+			case job := <-w.JobChannel:
+				log.Printf("attempting to run job ID '%s' via worker '%s'", job.ID(), w.uuid.String())
+
+				if err := job.Run(); err != nil {
+					log.Printf("error running job %s: %s", job.ID(), err.Error())
+				}
+			case <-w.exitChan:
+				log.Printf("quitting worker with UUID: %v", w.uuid)
+
+				return
+			}
+		}
+	}()
+}
+
+// Stop instructs the worker to stop listening for incoming jobs
+func (w *Worker) Stop() {
+	w.exitChan <- true
+}

+ 25 - 0
internal/worker/worker_test.go

@@ -0,0 +1,25 @@
+package worker
+
+import (
+	"testing"
+
+	"github.com/google/uuid"
+	"go.uber.org/goleak"
+)
+
+func TestWorker(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	uuid, err := uuid.NewUUID()
+
+	if err != nil {
+		panic(err)
+	}
+
+	workerPool := make(chan chan Job, 10)
+
+	w := NewWorker(uuid, workerPool)
+
+	w.Start()
+	w.Stop()
+}

+ 20 - 0
provisioner/integrations/storage/s3/s3.go

@@ -78,6 +78,26 @@ func (s *S3StorageClient) WriteFile(infra *models.Infra, name string, fileBytes
 	return err
 }
 
+func (s *S3StorageClient) WriteFileWithKey(fileBytes []byte, shouldEncrypt bool, key string) error {
+	body := fileBytes
+	var err error
+	if shouldEncrypt {
+		body, err = encryption.Encrypt(fileBytes, s.encryptionKey)
+
+		if err != nil {
+			return err
+		}
+	}
+
+	_, err = s.client.PutObject(&s3.PutObjectInput{
+		Body:   aws.ReadSeekCloser(bytes.NewReader(body)),
+		Bucket: &s.bucket,
+		Key:    aws.String(key),
+	})
+
+	return err
+}
+
 func (s *S3StorageClient) ReadFile(infra *models.Infra, name string, shouldDecrypt bool) ([]byte, error) {
 	output, err := s.client.GetObject(&s3.GetObjectInput{
 		Bucket: &s.bucket,

+ 30 - 0
workers/Dockerfile

@@ -0,0 +1,30 @@
+# This Dockerfile is used for building the worker pool binary itself
+
+# Buildtime environment
+# -------------------------------------------
+FROM golang:1.18-alpine3.16 as build
+WORKDIR /app
+
+RUN apk update && apk add gcc binutils-gold musl-dev
+
+COPY go.mod .
+COPY go.sum .
+COPY /api ./api
+COPY /ee ./ee
+COPY /internal ./internal
+COPY /pkg ./pkg
+COPY /provisioner ./provisioner
+COPY /workers ./workers
+
+RUN go build -ldflags '-w -s' -tags ee -a -o ./bin/worker-pool ./workers
+
+# Runtime environment
+# ----------------------
+FROM alpine:3.16
+WORKDIR /app
+
+RUN apk update && apk add curl
+
+COPY --from=build /app/bin/worker-pool /usr/bin/
+
+ENTRYPOINT [ "worker-pool" ]

+ 28 - 0
workers/doc.go

@@ -0,0 +1,28 @@
+/*
+
+                            === Porter Worker Pool and Job Queue System ===
+
+This software is intended to be deployed alongside the main Porter server and dashboard and act as a background
+worker pool for certain jobs that the Porter server should be running as separate processes / goroutines periodically
+or at-will, depending on the task at hand.
+
+TERMINOLOGIES
+
+  - The terms `worker pool`, `pool`, `Go application` are interchangeably used to denote this application.
+  - Jobs should have their unique string identifiers, denoted as IDs for short.
+
+ARCHITECTURE
+
+  - The worker pool is a Go application that takes in environment variables `MAX_WORKERS` and `MAX_QUEUE` to
+    denote the maximum number of workers and maximum number of jobs in the queue, respectively.
+  - The worker pool has specific jobs that it can execute, written separately with their own logic flow.
+  - The individual jobs need to have a unique string identifier.
+  - The jobs should be registered at startup time with their respective unique identifiers for the worker pool
+    to correctly relay execution information to the correct job.
+  - The worker pool has an exposed HTTP POST endpoint to enqueue jobs with their IDs. Depending on the kind of job,
+    a job can expect to receive a body of JSON data in the HTTP request.
+  - By exposing an HTTP endpoint, the worker pool can be called to enqueue jobs using crontab and other sources.
+
+*/
+
+package main

+ 302 - 0
workers/jobs/helm_revisions_count_tracker.go

@@ -0,0 +1,302 @@
+//go:build ee
+
+/*
+
+                            === Helm Release Revisions Tracker Job ===
+
+This job keeps a track of helm releases and their revisions and deletes older revisions once they are
+backed up to an S3 bucket.
+
+  - The job looks for clusters which have the `monitor_helm_releases` set to true.
+  - The clusters are then checked for old helm release revisions.
+  - In a cluster, list of all namespaces is fetched.
+  - For every namespace, the list of releases is fetched.
+  - For every release, its revision history is fetched.
+  - If the number of revisions exceeds 100, then we intend to only keep the most recent 100 revisions.
+  - For this, the older revisions are first backed up to an S3 bucket and then deleted.
+
+*/
+
+package jobs
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/porter-dev/porter/api/server/shared/config/env"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/pkg/logger"
+	"github.com/porter-dev/porter/provisioner/integrations/storage/s3"
+
+	"github.com/porter-dev/porter/ee/integrations/vault"
+	"github.com/porter-dev/porter/internal/adapter"
+	"github.com/porter-dev/porter/internal/helm"
+	"github.com/porter-dev/porter/internal/kubernetes"
+	"github.com/porter-dev/porter/internal/models"
+	"github.com/porter-dev/porter/internal/oauth"
+	"github.com/porter-dev/porter/internal/repository"
+	rcreds "github.com/porter-dev/porter/internal/repository/credentials"
+	rgorm "github.com/porter-dev/porter/internal/repository/gorm"
+	"golang.org/x/oauth2"
+	"gorm.io/gorm"
+	"helm.sh/helm/v3/pkg/releaseutil"
+)
+
+var stepSize int = 100
+
+type helmRevisionsCountTracker struct {
+	enqueueTime        time.Time
+	db                 *gorm.DB
+	repo               repository.Repository
+	doConf             *oauth2.Config
+	dbConf             *env.DBConf
+	credBackend        rcreds.CredentialStorage
+	awsAccessKeyID     string
+	awsSecretAccessKey string
+	awsRegion          string
+	s3BucketName       string
+	encryptionKey      *[32]byte
+}
+
+// HelmRevisionsCountTrackerOpts holds the options required to run this job
+type HelmRevisionsCountTrackerOpts struct {
+	DBConf             *env.DBConf
+	DOClientID         string
+	DOClientSecret     string
+	DOScopes           []string
+	ServerURL          string
+	AWSAccessKeyID     string
+	AWSSecretAccessKey string
+	AWSRegion          string
+	S3BucketName       string
+	EncryptionKey      string
+}
+
+func NewHelmRevisionsCountTracker(
+	enqueueTime time.Time,
+	opts *HelmRevisionsCountTrackerOpts,
+) (*helmRevisionsCountTracker, error) {
+	db, err := adapter.New(opts.DBConf)
+
+	if err != nil {
+		return nil, err
+	}
+
+	var credBackend rcreds.CredentialStorage
+
+	if opts.DBConf.VaultAPIKey != "" && opts.DBConf.VaultServerURL != "" && opts.DBConf.VaultPrefix != "" {
+		credBackend = vault.NewClient(
+			opts.DBConf.VaultServerURL,
+			opts.DBConf.VaultAPIKey,
+			opts.DBConf.VaultPrefix,
+		)
+	}
+
+	var key [32]byte
+
+	for i, b := range []byte(opts.DBConf.EncryptionKey) {
+		key[i] = b
+	}
+
+	repo := rgorm.NewRepository(db, &key, credBackend)
+
+	doConf := oauth.NewDigitalOceanClient(&oauth.Config{
+		ClientID:     opts.DOClientID,
+		ClientSecret: opts.DOClientSecret,
+		Scopes:       opts.DOScopes,
+		BaseURL:      opts.ServerURL,
+	})
+
+	var s3Key [32]byte
+
+	for i, b := range []byte(opts.EncryptionKey) {
+		s3Key[i] = b
+	}
+
+	return &helmRevisionsCountTracker{
+		enqueueTime, db, repo, doConf, opts.DBConf, credBackend,
+		opts.AWSAccessKeyID, opts.AWSSecretAccessKey, opts.AWSRegion,
+		opts.S3BucketName, &s3Key,
+	}, nil
+}
+
+func (t *helmRevisionsCountTracker) ID() string {
+	return "helm-revisions-count-tracker"
+}
+
+func (t *helmRevisionsCountTracker) EnqueueTime() time.Time {
+	return t.enqueueTime
+}
+
+func (t *helmRevisionsCountTracker) Run() error {
+	var count int64
+
+	if err := t.db.Model(&models.Cluster{}).Count(&count).Error; err != nil {
+		return err
+	}
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < (int(count)/stepSize)+1; i++ {
+		var clusters []*models.Cluster
+
+		if err := t.db.Order("id asc").Offset(i*stepSize).Limit(stepSize).Find(&clusters, "monitor_helm_releases = ?", "1").
+			Error; err != nil {
+			return err
+		}
+
+		// go through each project
+		for _, cluster := range clusters {
+			wg.Add(1)
+
+			go func(projID, clusterID uint) {
+				defer wg.Done()
+
+				log.Printf("starting release revision monitoring for cluster with ID %d", clusterID)
+
+				cluster, err := t.repo.Cluster().ReadCluster(projID, clusterID)
+
+				if err != nil {
+					log.Printf("error reading cluster ID %d: %v. skipping cluster ...", clusterID, err)
+					return
+				}
+
+				// create s3 client to store revisions that need to be deleted
+				s3Client, err := s3.NewS3StorageClient(&s3.S3Options{
+					t.awsRegion, t.awsAccessKeyID, t.awsSecretAccessKey, t.s3BucketName, t.encryptionKey,
+				})
+
+				if err != nil {
+					log.Printf("error creating S3 client for cluster ID %d: %v. skipping cluster ...", cluster.ID, err)
+					return
+				}
+
+				k8sAgent, err := kubernetes.GetAgentOutOfClusterConfig(&kubernetes.OutOfClusterConfig{
+					Cluster:                   cluster,
+					Repo:                      t.repo,
+					DigitalOceanOAuth:         t.doConf,
+					AllowInClusterConnections: false,
+				})
+
+				if err != nil {
+					log.Printf("error getting k8s agent for cluster ID %d: %v. skipping cluster ...", cluster.ID, err)
+					return
+				}
+
+				namespaces, err := k8sAgent.ListNamespaces()
+
+				if err != nil {
+					log.Printf("error fetching namespaces for cluster ID %d: %v. skipping cluster ...", cluster.ID, err)
+					return
+				}
+
+				log.Printf("fetched %d namespaces for cluster ID %d", len(namespaces.Items), cluster.ID)
+
+				for _, ns := range namespaces.Items {
+					agent, err := helm.GetAgentOutOfClusterConfig(&helm.Form{
+						Cluster:                   cluster,
+						Namespace:                 ns.Name,
+						Repo:                      t.repo,
+						DigitalOceanOAuth:         t.doConf,
+						AllowInClusterConnections: false,
+					}, logger.New(true, os.Stdout))
+
+					if err != nil {
+						log.Printf("error fetching helm client for namespace %s in cluster ID %d: %v. "+
+							"skipping namespace ...", ns.Name, cluster.ID, err)
+						continue
+					}
+
+					releases, err := agent.ListReleases(ns.GetName(), &types.ReleaseListFilter{
+						ByDate: true,
+						StatusFilter: []string{
+							"deployed",
+							"pending",
+							"pending-install",
+							"pending-upgrade",
+							"pending-rollback",
+							"failed",
+						},
+					})
+
+					if err != nil {
+						log.Printf("error fetching releases for namespace %s in cluster ID %d: %v. skipping namespace ...",
+							ns.Name, cluster.ID, err)
+						continue
+					}
+
+					log.Printf("fetched %d releases for namespace %s in cluster ID %d", len(releases), ns.Name, cluster.ID)
+
+					for _, rel := range releases {
+						revisions, err := agent.GetReleaseHistory(rel.Name)
+
+						if err != nil {
+							log.Printf("error fetching release history for release %s in namespace %s of cluster ID %d: %v."+
+								" skipping release ...", rel.Name, ns.Name, cluster.ID, err)
+							continue
+						}
+
+						if len(revisions) <= 100 {
+							log.Printf("release %s of namespace %s in cluster ID %d has <= 100 revisions. "+
+								"skipping release...", rel.Name, ns.Name, cluster.ID)
+							continue
+						}
+
+						log.Printf("release %s of namespace %s in cluster ID %d has more than 100 revisions. attempting to "+
+							"delete the older ones.", rel.Name, ns.Name, cluster.ID)
+
+						// sort revisions from newest to oldest
+						releaseutil.Reverse(revisions, releaseutil.SortByRevision)
+
+						for i := 100; i < len(revisions); i += 1 {
+							rev := revisions[i]
+
+							// store the revision in the s3 bucket before deleting it
+							data, err := json.Marshal(rev)
+
+							if err != nil {
+								log.Printf("error marshalling revision for release %s, number %d: %v. skipping revision ...",
+									rev.Name, rev.Version, err)
+								continue
+							}
+
+							// write to the bucket with key - <project_id>/<cluster_id>/<namespace>/<release_name>/<revision_number>
+							err = s3Client.WriteFileWithKey(data, true, fmt.Sprintf("%d/%d/%s/%s/%d", cluster.ProjectID,
+								cluster.ID, rel.Namespace, rel.Name, rev.Version))
+
+							if err != nil {
+								log.Printf("error backing up revision for release %s, number %d: %v. skipping revision ...",
+									rev.Name, rev.Version, err)
+								continue
+							}
+
+							log.Printf("revision %d of release %s in namespace %s of cluster ID %d was successfully backed up.",
+								rev.Version, rel.Name, ns.Name, cluster.ID)
+
+							err = agent.DeleteReleaseRevision(rev.Name, rev.Version)
+
+							if err != nil {
+								log.Printf("error deleting revision %d of release %s in namespace %s of cluster ID %d: %v",
+									rev.Version, rel.Name, ns.Name, cluster.ID, err)
+								continue
+							}
+
+							log.Printf("revision %d of release %s in namespace %s of cluster ID %d was successfully deleted.",
+								rev.Version, rel.Name, ns.Name, cluster.ID)
+						}
+					}
+				}
+			}(cluster.ProjectID, cluster.ID)
+		}
+
+		wg.Wait()
+	}
+
+	return nil
+}
+
+func (t *helmRevisionsCountTracker) SetData([]byte) {}

+ 159 - 0
workers/main.go

@@ -0,0 +1,159 @@
+//go:build ee
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/go-chi/chi"
+	"github.com/go-chi/chi/middleware"
+	"github.com/joeshaw/envdecode"
+	"github.com/porter-dev/porter/api/server/shared/config/env"
+	"github.com/porter-dev/porter/internal/worker"
+	"github.com/porter-dev/porter/workers/jobs"
+)
+
+var (
+	jobQueue   chan worker.Job
+	envDecoder = EnvConf{}
+)
+
+// EnvConf holds the environment variables for this binary
+type EnvConf struct {
+	ServerURL          string `env:"SERVER_URL,default=http://localhost:8080"`
+	DOClientID         string `env:"DO_CLIENT_ID"`
+	DOClientSecret     string `env:"DO_CLIENT_SECRET"`
+	DBConf             env.DBConf
+	MaxWorkers         uint   `env:"MAX_WORKERS,default=10"`
+	MaxQueue           uint   `env:"MAX_QUEUE,default=100"`
+	AWSAccessKeyID     string `env:"AWS_ACCESS_KEY_ID"`
+	AWSSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
+	AWSRegion          string `env:"AWS_REGION"`
+	S3BucketName       string `env:"S3_BUCKET_NAME"`
+	EncryptionKey      string `env:"S3_ENCRYPTION_KEY"`
+
+	Port uint `env:"PORT,default=3000"`
+}
+
+func main() {
+	if err := envdecode.StrictDecode(&envDecoder); err != nil {
+		log.Fatalf("Failed to decode server conf: %v", err)
+	}
+
+	log.Printf("setting max worker count to: %d\n", envDecoder.MaxWorkers)
+	log.Printf("setting max job queue count to: %d\n", envDecoder.MaxQueue)
+
+	jobQueue = make(chan worker.Job, envDecoder.MaxQueue)
+	d := worker.NewDispatcher(int(envDecoder.MaxWorkers))
+
+	log.Println("starting worker dispatcher")
+
+	err := d.Run(jobQueue)
+
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	server := &http.Server{Addr: fmt.Sprintf(":%d", envDecoder.Port), Handler: httpService()}
+
+	serverCtx, serverStopCtx := context.WithCancel(context.Background())
+
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	go func() {
+		<-sig
+
+		log.Println("shutting down server")
+
+		shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
+		defer shutdownCtxCancel()
+
+		go func() {
+			<-shutdownCtx.Done()
+			if shutdownCtx.Err() == context.DeadlineExceeded {
+				log.Fatal("graceful shutdown timed out.. forcing exit.")
+			}
+		}()
+
+		err = server.Shutdown(shutdownCtx)
+
+		if err != nil {
+			log.Fatalln(err)
+		}
+
+		log.Println("server shutdown completed")
+
+		serverStopCtx()
+	}()
+
+	log.Printf("starting HTTP server at :%d", envDecoder.Port)
+
+	err = server.ListenAndServe()
+	if err != nil && err != http.ErrServerClosed {
+		log.Fatalf("error starting HTTP server: %v", err)
+	}
+
+	// Wait for server context to be stopped
+	<-serverCtx.Done()
+
+	d.Exit()
+}
+
+func httpService() http.Handler {
+	log.Println("setting up HTTP router and adding middleware")
+
+	r := chi.NewRouter()
+	r.Use(middleware.Logger)
+	r.Use(middleware.Recoverer)
+	r.Use(middleware.Heartbeat("/ping"))
+	r.Use(middleware.AllowContentType("application/json"))
+
+	log.Println("setting up HTTP POST endpoint to enqueue jobs")
+
+	r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
+		job := getJob(chi.URLParam(r, "id"))
+
+		if job == nil {
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+
+		jobQueue <- job
+		w.WriteHeader(http.StatusCreated)
+	})
+
+	return r
+}
+
+func getJob(id string) worker.Job {
+	if id == "helm-revisions-count-tracker" {
+		newJob, err := jobs.NewHelmRevisionsCountTracker(time.Now().UTC(), &jobs.HelmRevisionsCountTrackerOpts{
+			DBConf:             &envDecoder.DBConf,
+			DOClientID:         envDecoder.DOClientID,
+			DOClientSecret:     envDecoder.DOClientSecret,
+			DOScopes:           []string{"read", "write"},
+			ServerURL:          envDecoder.ServerURL,
+			AWSAccessKeyID:     envDecoder.AWSAccessKeyID,
+			AWSSecretAccessKey: envDecoder.AWSSecretAccessKey,
+			AWSRegion:          envDecoder.AWSRegion,
+			S3BucketName:       envDecoder.S3BucketName,
+			EncryptionKey:      envDecoder.EncryptionKey,
+		})
+
+		if err != nil {
+			log.Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
+			return nil
+		}
+
+		return newJob
+	}
+
+	return nil
+}