Mohammed Nafees, 3 years ago
parent
commit
019e013740

+ 11 - 5
internal/worker/dispatcher.go

@@ -7,6 +7,8 @@ import (
 	"github.com/google/uuid"
 )
 
+// Dispatcher is responsible for maintaining a global worker pool
+// and dispatching jobs to the underlying workers, in random order
 type Dispatcher struct {
 	maxWorkers int
 	exitChan   chan bool
@@ -15,6 +17,8 @@ type Dispatcher struct {
 	WorkerPool chan chan Job
 }
 
+// NewDispatcher creates a new instance of Dispatcher with
+// the given number of workers that should be in the worker pool
 func NewDispatcher(maxWorkers int) *Dispatcher {
 	pool := make(chan chan Job, maxWorkers)
 	return &Dispatcher{
@@ -26,6 +30,8 @@ func NewDispatcher(maxWorkers int) *Dispatcher {
 	}
 }
 
+// Run creates workers in the worker pool with the given
+// job queue and starts the workers
 func (d *Dispatcher) Run(jobQueue chan Job) error {
 	for i := 0; i < d.maxWorkers; i += 1 {
 		uuid, err := uuid.NewUUID()
@@ -47,6 +53,7 @@ func (d *Dispatcher) Run(jobQueue chan Job) error {
 	return nil
 }
 
+// Exit instructs the dispatcher to quit processing any more jobs
 func (d *Dispatcher) Exit() {
 	d.exitChan <- true
 }
@@ -56,11 +63,10 @@ func (d *Dispatcher) dispatch(jobQueue chan Job) {
 		for {
 			select {
 			case job := <-jobQueue:
-				go func(job Job) {
-					jobChannel := <-d.WorkerPool
-
-					jobChannel <- job
-				}(job)
+				go func() {
+					workerJobChan := <-d.WorkerPool
+					workerJobChan <- job
+				}()
 			case <-d.exitChan:
 				for _, w := range workers {
 					w.Stop()

+ 17 - 0
internal/worker/worker.go

@@ -7,13 +7,23 @@ import (
 	"github.com/google/uuid"
 )
 
+// Job is an interface which should be implemented by an individual
+// worker process in order to be enqueued in the worker pool
 type Job interface {
+	// The unique string ID of a job
 	ID() string
+
+	// The time in UTC when a job was enqueued to the worker pool queue
 	EnqueueTime() time.Time
+
+	// The main logic and control of a job
 	Run() error
+
+	// To set external data if a job needs it
 	SetData([]byte)
 }
 
+// Worker handles a single job or worker process
 type Worker struct {
 	exitChan chan bool
 	uuid     uuid.UUID
@@ -22,6 +32,8 @@ type Worker struct {
 	JobChannel chan Job
 }
 
+// NewWorker creates a new instance of Worker with the given
+// RFC 4122 UUID and a global worker pool
 func NewWorker(uuid uuid.UUID, workerPool chan chan Job) *Worker {
 	return &Worker{
 		exitChan: make(chan bool),
@@ -32,6 +44,8 @@ func NewWorker(uuid uuid.UUID, workerPool chan chan Job) *Worker {
 	}
 }
 
+// Start spawns a goroutine that adds the worker to the global worker
+// pool and listens for incoming jobs as they come, in random order
 func (w *Worker) Start() {
 	go func() {
 		for {
@@ -39,6 +53,8 @@ func (w *Worker) Start() {
 
 			select {
 			case job := <-w.JobChannel:
+				log.Printf("attempting to run job ID '%s' via worker '%s'", job.ID(), w.uuid.String())
+
 				if err := job.Run(); err != nil {
 					log.Printf("error running job %s: %s", job.ID(), err.Error())
 				}
@@ -51,6 +67,7 @@ func (w *Worker) Start() {
 	}()
 }
 
+// Stop instructs the worker to stop listening for incoming jobs
 func (w *Worker) Stop() {
 	w.exitChan <- true
 }

+ 4 - 2
workers/Dockerfile

@@ -1,4 +1,6 @@
-# Environment to build the worker pool binary
+# This Dockerfile is used for building the worker pool binary itself
+
+# Buildtime environment
 # -------------------------------------------
 FROM golang:1.18-alpine3.16 as build
 WORKDIR /app
@@ -16,7 +18,7 @@ COPY /workers ./workers
 
 RUN go build -ldflags '-w -s' -tags ee -a -o ./bin/worker-pool ./workers
 
-# Deployment environment
+# Runtime environment
 # ----------------------
 FROM alpine:3.16
 WORKDIR /app

+ 5 - 0
workers/Dockerfile.job

@@ -1,3 +1,8 @@
+# This Dockerfile is used for building a base image with curl installed
+# and is intended to be used via a Porter job to call the worker pool
+# HTTP endpoints in order to enqueue different kinds of jobs
+
+# Runtime environment
 FROM alpine:3.16
 WORKDIR /app
 

+ 18 - 0
workers/jobs/helm_revisions_count_tracker.go

@@ -1,5 +1,22 @@
 //go:build ee
 
+/*
+
+                            === Helm Release Revisions Tracker Job ===
+
+This job keeps a track of helm releases and their revisions and deletes older revisions once they are
+backed up to an S3 bucket.
+
+  - The job looks for clusters which have the `monitor_helm_releases` flag set to true.
+  - The clusters are then checked for old helm release revisions.
+  - In a cluster, list of all namespaces is fetched.
+  - For every namespace, the list of releases is fetched.
+  - For every release, its revision history is fetched.
+  - If the number of revisions exceeds 100, then we intend to only keep the most recent 100 revisions.
+  - For this, the older revisions are first backed up to an S3 bucket and then deleted.
+
+*/
+
 package jobs
 
 import (
@@ -45,6 +62,7 @@ type helmRevisionsCountTracker struct {
 	encryptionKey      *[32]byte
 }
 
+// HelmRevisionsCountTrackerOpts holds the options required to run this job
 type HelmRevisionsCountTrackerOpts struct {
 	DBConf             *env.DBConf
 	DOClientID         string

+ 2 - 0
workers/main.go

@@ -24,6 +24,7 @@ var (
 	envDecoder = EnvConf{}
 )
 
+// EnvConf holds the environment variables for this binary
 type EnvConf struct {
 	ServerURL          string `env:"SERVER_URL,default=http://localhost:8080"`
 	DOClientID         string `env:"DO_CLIENT_ID"`
@@ -145,6 +146,7 @@ func getJob(id string) worker.Job {
 
 		if err != nil {
 			log.Printf("error creating job with ID: helm-revisions-count-tracker. Error: %v", err)
+			return nil
 		}
 
 		return newJob