Jelajahi Sumber

basic worker pool and job queue

Mohammed Nafees 3 tahun lalu
induk
melakukan
fbd9b8da7c

+ 43 - 0
internal/worker/dispatcher.go

@@ -0,0 +1,43 @@
+package worker
+
+import "context"
+
+type Dispatcher struct {
+	ctx        context.Context
+	maxWorkers int
+	WorkerPool chan chan Job
+}
+
+func NewDispatcher(maxWorkers int) *Dispatcher {
+	pool := make(chan chan Job, maxWorkers)
+	return &Dispatcher{
+		maxWorkers: maxWorkers,
+		WorkerPool: pool,
+	}
+}
+
+func (d *Dispatcher) Run(ctx context.Context, jobQueue chan Job) {
+	for i := 0; i < d.maxWorkers; i += 1 {
+		worker := NewWorker(ctx, d.WorkerPool)
+		worker.Start()
+	}
+
+	d.ctx = ctx
+
+	go d.dispatch(jobQueue)
+}
+
+func (d *Dispatcher) dispatch(jobQueue chan Job) {
+	for {
+		select {
+		case job := <-jobQueue:
+			go func(job Job) {
+				jobChannel := <-d.WorkerPool
+
+				jobChannel <- job
+			}(job)
+		case <-d.ctx.Done():
+			return
+		}
+	}
+}

+ 44 - 0
internal/worker/worker.go

@@ -0,0 +1,44 @@
+package worker
+
+import (
+	"context"
+	"log"
+
+	"github.com/google/uuid"
+)
+
+type Job interface {
+	ID() uuid.UUID
+	Run() error
+}
+
+type Worker struct {
+	WorkerPool chan chan Job
+	JobChannel chan Job
+	ctx        context.Context
+}
+
+func NewWorker(ctx context.Context, workerPool chan chan Job) Worker {
+	return Worker{
+		WorkerPool: workerPool,
+		JobChannel: make(chan Job),
+		ctx:        ctx,
+	}
+}
+
+func (w Worker) Start() {
+	go func() {
+		for {
+			w.WorkerPool <- w.JobChannel
+
+			select {
+			case job := <-w.JobChannel:
+				if err := job.Run(); err != nil {
+					log.Default().Printf("error running job %s: %s", job.ID(), err.Error())
+				}
+			case <-w.ctx.Done():
+				return
+			}
+		}
+	}()
+}

+ 0 - 16
services/helm_release/helm_release_tracker.go

@@ -1,16 +0,0 @@
-package helmrelease
-
-import (
-	"github.com/porter-dev/porter/internal/repository"
-	"golang.org/x/oauth2"
-	"gorm.io/gorm"
-)
-
-type HelmReleaseTracker struct {
-	db               *gorm.DB
-	repo             repository.Repository
-	doConf           *oauth2.Config
-	whitelistedUsers map[uint]uint
-}
-
-const stepSize = 100

+ 40 - 0
workers/main.go

@@ -0,0 +1,40 @@
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"strconv"
+
+	"github.com/porter-dev/porter/internal/worker"
+)
+
+var (
+	MaxWorkers = os.Getenv("MAX_WORKERS")
+	MaxQueue   = os.Getenv("MAX_QUEUE")
+)
+
+func main() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	workerCount, err := strconv.Atoi(MaxWorkers)
+	if err != nil {
+		log.Default().Fatalln("invalid MAX_WORKERS value")
+	}
+
+	queueCount, err := strconv.Atoi(MaxQueue)
+	if err != nil {
+		log.Default().Fatalln("invalid MAX_QUEUE value")
+	}
+
+	jobQueue := make(chan worker.Job, queueCount)
+
+	d := worker.NewDispatcher(workerCount)
+	d.Run(ctx, jobQueue)
+
+	exitChannel := make(chan bool)
+
+	log.Default().Println("use Ctrl+C to exit")
+	<-exitChannel
+}