Mohammed Nafees il y a 3 ans
Parent
commit
3fed850b33
5 fichiers modifiés avec 71 ajouts et 25 suppressions
  1. 1 0
      go.mod
  2. 1 0
      go.sum
  3. 22 25
      internal/worker/dispatcher.go
  4. 22 0
      internal/worker/dispatcher_test.go
  5. 25 0
      internal/worker/worker_test.go

+ 1 - 0
go.mod

@@ -112,6 +112,7 @@ require (
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
 	github.com/xanzy/go-gitlab v0.68.0 // indirect
+	go.uber.org/goleak v1.1.12 // indirect
 )
 
 require (

+ 1 - 0
go.sum

@@ -2099,6 +2099,7 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
+go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
 go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=

+ 22 - 25
internal/worker/dispatcher.go

@@ -1,7 +1,6 @@
 package worker
 
 import (
-	"fmt"
 	"log"
 
 	"github.com/google/uuid"
@@ -12,7 +11,6 @@ import (
 type Dispatcher struct {
 	maxWorkers int
 	exitChan   chan bool
-	workers    []*Worker
 
 	WorkerPool chan chan Job
 }
@@ -24,7 +22,6 @@ func NewDispatcher(maxWorkers int) *Dispatcher {
 	return &Dispatcher{
 		maxWorkers: maxWorkers,
 		exitChan:   make(chan bool),
-		workers:    make([]*Worker, maxWorkers),
 
 		WorkerPool: pool,
 	}
@@ -33,33 +30,26 @@ func NewDispatcher(maxWorkers int) *Dispatcher {
 // Run creates workers in the worker pool with the given
 // job queue and starts the workers
 func (d *Dispatcher) Run(jobQueue chan Job) error {
-	for i := 0; i < d.maxWorkers; i += 1 {
-		uuid, err := uuid.NewUUID()
+	go func() {
+		var workers []*Worker
 
-		if err != nil {
-			return fmt.Errorf("error creating UUID for worker: %w", err)
-		}
-
-		worker := NewWorker(uuid, d.WorkerPool)
-		d.workers = append(d.workers, worker)
-
-		log.Printf("starting worker with UUID: %v", uuid)
+		for i := 0; i < d.maxWorkers; i += 1 {
+			uuid, err := uuid.NewUUID()
 
-		worker.Start()
-	}
+			if err != nil {
+				// FIXME: should let the parent thread know of this error
+				log.Printf("error creating UUID for worker: %v", err)
+				return
+			}
 
-	d.dispatch(jobQueue)
+			worker := NewWorker(uuid, d.WorkerPool)
+			workers = append(workers, worker)
 
-	return nil
-}
+			log.Printf("starting worker with UUID: %v", uuid)
 
-// Exit instructs the dispatcher to quit processing any more jobs
-func (d *Dispatcher) Exit() {
-	d.exitChan <- true
-}
+			worker.Start()
+		}
 
-func (d *Dispatcher) dispatch(jobQueue chan Job) {
-	go func(workers []*Worker) {
 		for {
 			select {
 			case job := <-jobQueue:
@@ -75,5 +65,12 @@ func (d *Dispatcher) dispatch(jobQueue chan Job) {
 				return
 			}
 		}
-	}(d.workers)
+	}()
+
+	return nil
+}
+
+// Exit instructs the dispatcher to quit processing any more jobs
+func (d *Dispatcher) Exit() {
+	d.exitChan <- true
 }

+ 22 - 0
internal/worker/dispatcher_test.go

@@ -0,0 +1,22 @@
+package worker
+
+import (
+	"testing"
+
+	"go.uber.org/goleak"
+)
+
+func TestDispatcher(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	jobChan := make(chan Job)
+
+	d := NewDispatcher(10)
+	err := d.Run(jobChan)
+
+	if err != nil {
+		panic(err)
+	}
+
+	d.Exit()
+}

+ 25 - 0
internal/worker/worker_test.go

@@ -0,0 +1,25 @@
+package worker
+
+import (
+	"testing"
+
+	"github.com/google/uuid"
+	"go.uber.org/goleak"
+)
+
+func TestWorker(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	uuid, err := uuid.NewUUID()
+
+	if err != nil {
+		panic(err)
+	}
+
+	workerPool := make(chan chan Job, 10)
+
+	w := NewWorker(uuid, workerPool)
+
+	w.Start()
+	w.Stop()
+}