dispatcher.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package worker
  2. import (
  3. "context"
  4. "log"
  5. "github.com/google/uuid"
  6. )
  7. // Dispatcher is responsible to maintain a global worker pool
  8. // and to dispatch jobs to the underlying workers, in random order
  9. type Dispatcher struct {
  10. maxWorkers int
  11. exitChan chan bool
  12. WorkerPool chan chan Job
  13. }
  14. // NewDispatcher creates a new instance of Dispatcher with
  15. // the given number of workers that should be in the worker pool
  16. func NewDispatcher(maxWorkers int) *Dispatcher {
  17. pool := make(chan chan Job, maxWorkers)
  18. return &Dispatcher{
  19. maxWorkers: maxWorkers,
  20. exitChan: make(chan bool),
  21. WorkerPool: pool,
  22. }
  23. }
  24. // Run creates workers in the worker pool with the given
  25. // job queue and starts the workers
  26. func (d *Dispatcher) Run(ctx context.Context, jobQueue chan Job) error {
  27. go func() {
  28. var workers []*Worker
  29. for i := 0; i < d.maxWorkers; i += 1 {
  30. uuid, err := uuid.NewUUID()
  31. if err != nil {
  32. // FIXME: should let the parent thread know of this error
  33. log.Printf("error creating UUID for worker: %v", err)
  34. return
  35. }
  36. worker := NewWorker(uuid, d.WorkerPool)
  37. workers = append(workers, worker)
  38. log.Printf("starting worker with UUID: %v", uuid)
  39. worker.Start(ctx)
  40. }
  41. for {
  42. select {
  43. case job := <-jobQueue:
  44. go func() {
  45. workerJobChan := <-d.WorkerPool
  46. workerJobChan <- job
  47. }()
  48. case <-d.exitChan:
  49. for _, w := range workers {
  50. w.Stop()
  51. }
  52. return
  53. }
  54. }
  55. }()
  56. return nil
  57. }
  58. // Exit instructs the dispatcher to quit processing any more jobs
  59. func (d *Dispatcher) Exit() {
  60. d.exitChan <- true
  61. }