Quellcode durchsuchen

graceful shutdown of server

Mohammed Nafees vor 3 Jahren
Ursprung
Commit
cd5ea41354

+ 49 - 17
internal/worker/dispatcher.go

@@ -1,10 +1,17 @@
 package worker
 
-import "context"
+import (
+	"fmt"
+	"log"
+
+	"github.com/google/uuid"
+)
 
 type Dispatcher struct {
-	ctx        context.Context
 	maxWorkers int
+	exitChan   chan bool
+	workers    []*Worker
+
 	WorkerPool chan chan Job
 }
 
@@ -12,32 +19,57 @@ func NewDispatcher(maxWorkers int) *Dispatcher {
 	pool := make(chan chan Job, maxWorkers)
 	return &Dispatcher{
 		maxWorkers: maxWorkers,
+		exitChan:   make(chan bool),
+		workers:    make([]*Worker, 0, maxWorkers), // len 0, cap maxWorkers: Run appends, so a non-zero length would leave nil entries that panic in Stop
+
 		WorkerPool: pool,
 	}
 }
 
-func (d *Dispatcher) Run(ctx context.Context, jobQueue chan Job) {
+func (d *Dispatcher) Run(jobQueue chan Job) error {
 	for i := 0; i < d.maxWorkers; i += 1 {
-		worker := NewWorker(ctx, d.WorkerPool)
+		uuid, err := uuid.NewUUID()
+
+		if err != nil {
+			return fmt.Errorf("error creating UUID for worker: %w", err)
+		}
+
+		worker := NewWorker(uuid, d.WorkerPool)
+		d.workers = append(d.workers, worker)
+
+		log.Default().Printf("starting worker with UUID: %v", uuid)
+
 		worker.Start()
 	}
 
-	d.ctx = ctx
+	d.dispatch(jobQueue)
+
+	return nil
+}
 
-	go d.dispatch(jobQueue)
+func (d *Dispatcher) Exit() {
+	d.exitChan <- true
 }
 
 func (d *Dispatcher) dispatch(jobQueue chan Job) {
-	for {
-		select {
-		case job := <-jobQueue:
-			go func(job Job) {
-				jobChannel := <-d.WorkerPool
-
-				jobChannel <- job
-			}(job)
-		case <-d.ctx.Done():
-			return
+	go func(workers []*Worker) {
+		for {
+			select {
+			case job := <-jobQueue:
+				go func(job Job) {
+					jobChannel := <-d.WorkerPool
+
+					jobChannel <- job
+				}(job)
+			case <-d.exitChan:
+				for _, w := range workers {
+					w.Stop()
+				}
+
+				fmt.Println("exiting dispatcher")
+
+				return
+			}
 		}
-	}
+	}(d.workers)
 }

+ 21 - 7
internal/worker/worker.go

@@ -1,30 +1,38 @@
 package worker
 
 import (
-	"context"
 	"log"
+	"time"
+
+	"github.com/google/uuid"
 )
 
 type Job interface {
 	ID() string
+	EnqueueTime() time.Time
 	Run() error
+	SetData([]byte)
 }
 
 type Worker struct {
+	exitChan chan bool
+	uuid     uuid.UUID
+
 	WorkerPool chan chan Job
 	JobChannel chan Job
-	ctx        context.Context
 }
 
-func NewWorker(ctx context.Context, workerPool chan chan Job) Worker {
-	return Worker{
+func NewWorker(uuid uuid.UUID, workerPool chan chan Job) *Worker {
+	return &Worker{
+		exitChan: make(chan bool),
+		uuid:     uuid,
+
 		WorkerPool: workerPool,
 		JobChannel: make(chan Job),
-		ctx:        ctx,
 	}
 }
 
-func (w Worker) Start() {
+func (w *Worker) Start() {
 	go func() {
 		for {
 			w.WorkerPool <- w.JobChannel
@@ -34,9 +42,15 @@ func (w Worker) Start() {
 				if err := job.Run(); err != nil {
 					log.Default().Printf("error running job %s: %s", job.ID(), err.Error())
 				}
-			case <-w.ctx.Done():
+			case <-w.exitChan:
+				log.Default().Printf("quitting worker with UUID: %v", w.uuid)
+
 				return
 			}
 		}
 	}()
 }
+
+func (w *Worker) Stop() {
+	w.exitChan <- true
+}

+ 28 - 0
workers/doc.go

@@ -0,0 +1,28 @@
+/*
+
+                            === Porter Worker Pool and Job Queue System ===
+
+This software is intended to be deployed alongside the main Porter server and dashboard and act as a background
+worker pool for certain jobs that the Porter server should be running as separate processes / goroutines periodically
+or at-will, depending on the task at hand.
+
+TERMINOLOGY
+
+  - The terms `worker pool`, `pool`, and `Go application` are used interchangeably to denote this application.
+  - Jobs should have their unique string identifiers, denoted as IDs for short.
+
+ARCHITECTURE
+
+  - The worker pool is a Go application that takes in environment variables `MAX_WORKERS` and `MAX_QUEUE` to
+    denote the maximum number of workers and maximum number of jobs in the queue, respectively.
+  - The worker pool has specific jobs that it can execute, written separately with their own logic flow.
+  - The individual jobs need to have a unique string identifier.
+  - The jobs should be registered at startup time with their respective unique identifiers for the worker pool
+    to correctly relay execution information to the correct job.
+  - The worker pool has an exposed HTTP POST endpoint to enqueue jobs with their IDs. Depending on the kind of job,
+    a job can expect to receive a body of JSON data in the HTTP request.
+  - By exposing an HTTP endpoint, the worker pool can be called to enqueue jobs using crontab and other sources.
+
+*/
+
+package main

+ 0 - 11
workers/jobs/helm_revision_tracker.go

@@ -1,11 +0,0 @@
-package jobs
-
-type HelmReleaseTracker struct{}
-
-func (t *HelmReleaseTracker) ID() string {
-	return "helm-release-tracker"
-}
-
-func (t *HelmReleaseTracker) Run() error {
-	return nil
-}

+ 27 - 0
workers/jobs/helm_revisions_count_tracker.go

@@ -0,0 +1,27 @@
+package jobs
+
+import "time"
+
+type helmRevisionsCountTracker struct {
+	enqueueTime time.Time
+}
+
+func NewHelmRevisionsCountTracker(enqueueTime time.Time) *helmRevisionsCountTracker {
+	return &helmRevisionsCountTracker{
+		enqueueTime: enqueueTime,
+	}
+}
+
+func (t *helmRevisionsCountTracker) ID() string {
+	return "helm-revisions-count-tracker"
+}
+
+func (t *helmRevisionsCountTracker) EnqueueTime() time.Time {
+	return t.enqueueTime
+}
+
+func (t *helmRevisionsCountTracker) Run() error {
+	return nil
+}
+
+func (t *helmRevisionsCountTracker) SetData([]byte) {}

+ 70 - 8
workers/main.go

@@ -5,7 +5,10 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"os/signal"
 	"strconv"
+	"syscall"
+	"time"
 
 	"github.com/go-chi/chi"
 	"github.com/go-chi/chi/middleware"
@@ -16,26 +19,83 @@ import (
 var (
 	MaxWorkers = os.Getenv("MAX_WORKERS")
 	MaxQueue   = os.Getenv("MAX_QUEUE")
+
+	jobQueue chan worker.Job
 )
 
 func main() {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	workerCount, err := strconv.Atoi(MaxWorkers)
 	if err != nil {
 		log.Default().Fatalln("invalid MAX_WORKERS value")
 	}
 
+	log.Default().Printf("setting max worker count to: %d\n", workerCount)
+
 	queueCount, err := strconv.Atoi(MaxQueue)
 	if err != nil {
 		log.Default().Fatalln("invalid MAX_QUEUE value")
 	}
 
-	jobQueue := make(chan worker.Job, queueCount)
+	log.Default().Printf("setting max job queue count to: %d\n", queueCount)
 
+	jobQueue = make(chan worker.Job, queueCount)
 	d := worker.NewDispatcher(workerCount)
-	d.Run(ctx, jobQueue)
+
+	log.Default().Println("starting worker dispatcher")
+
+	err = d.Run(jobQueue)
+
+	if err != nil {
+		log.Default().Fatalln(err)
+	}
+
+	server := &http.Server{Addr: ":3000", Handler: httpService()}
+
+	serverCtx, serverStopCtx := context.WithCancel(context.Background())
+
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	go func() {
+		<-sig
+
+		log.Default().Println("shutting down server")
+
+		shutdownCtx, shutdownCtxCancel := context.WithTimeout(serverCtx, 30*time.Second)
+		defer shutdownCtxCancel()
+
+		go func() {
+			<-shutdownCtx.Done()
+			if shutdownCtx.Err() == context.DeadlineExceeded {
+				log.Fatal("graceful shutdown timed out.. forcing exit.")
+			}
+		}()
+
+		err = server.Shutdown(shutdownCtx)
+
+		if err != nil {
+			log.Fatalln(err)
+		}
+
+		log.Default().Println("server shutdown completed")
+
+		serverStopCtx()
+	}()
+
+	log.Default().Println("starting HTTP server at :3000")
+
+	err = server.ListenAndServe()
+	if err != nil && err != http.ErrServerClosed {
+		log.Default().Fatalf("error starting HTTP server: %v", err)
+	}
+
+	// Wait for server context to be stopped
+	<-serverCtx.Done()
+
+	d.Exit()
+}
+
+func httpService() http.Handler {
+	log.Default().Println("setting up HTTP router and adding middleware")
 
 	r := chi.NewRouter()
 	r.Use(middleware.Logger)
@@ -43,6 +103,8 @@ func main() {
 	r.Use(middleware.Heartbeat("/ping"))
 	r.Use(middleware.AllowContentType("application/json"))
 
+	log.Default().Println("setting up HTTP POST endpoint to enqueue jobs")
+
 	r.Post("/enqueue/{id}", func(w http.ResponseWriter, r *http.Request) {
 		job := getJob(chi.URLParam(r, "id"))
 
@@ -55,12 +117,12 @@ func main() {
 		w.WriteHeader(http.StatusCreated)
 	})
 
-	http.ListenAndServe(":3000", r)
+	return r
 }
 
 func getJob(id string) worker.Job {
-	if id == "helm-release-tracker" {
-		return &jobs.HelmReleaseTracker{}
+	if id == "helm-revisions-count-tracker" {
+		return jobs.NewHelmRevisionsCountTracker(time.Now().UTC())
 	}
 
 	return nil