dispatcher.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package worker
  import (
  	"fmt"
  	"log"

  	"github.com/google/uuid"
  )
  6. // Dispatcher is responsible to maintain a global worker pool
  7. // and to dispatch jobs to the underlying workers, in random order
  8. type Dispatcher struct {
  9. maxWorkers int
  10. exitChan chan bool
  11. WorkerPool chan chan Job
  12. }
  13. // NewDispatcher creates a new instance of Dispatcher with
  14. // the given number of workers that should be in the worker pool
  15. func NewDispatcher(maxWorkers int) *Dispatcher {
  16. pool := make(chan chan Job, maxWorkers)
  17. return &Dispatcher{
  18. maxWorkers: maxWorkers,
  19. exitChan: make(chan bool),
  20. WorkerPool: pool,
  21. }
  22. }
  23. // Run creates workers in the worker pool with the given
  24. // job queue and starts the workers
  25. func (d *Dispatcher) Run(jobQueue chan Job) error {
  26. go func() {
  27. var workers []*Worker
  28. for i := 0; i < d.maxWorkers; i += 1 {
  29. uuid, err := uuid.NewUUID()
  30. if err != nil {
  31. // FIXME: should let the parent thread know of this error
  32. log.Printf("error creating UUID for worker: %v", err)
  33. return
  34. }
  35. worker := NewWorker(uuid, d.WorkerPool)
  36. workers = append(workers, worker)
  37. log.Printf("starting worker with UUID: %v", uuid)
  38. worker.Start()
  39. }
  40. for {
  41. select {
  42. case job := <-jobQueue:
  43. go func() {
  44. workerJobChan := <-d.WorkerPool
  45. workerJobChan <- job
  46. }()
  47. case <-d.exitChan:
  48. for _, w := range workers {
  49. w.Stop()
  50. }
  51. return
  52. }
  53. }
  54. }()
  55. return nil
  56. }
  57. // Exit instructs the dispatcher to quit processing any more jobs
  58. func (d *Dispatcher) Exit() {
  59. d.exitChan <- true
  60. }