worker.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package worker
  2. import (
  3. "log"
  4. "time"
  5. "github.com/google/uuid"
  6. )
  7. // Job is an interface which should be implemented by an individual
  8. // worker process in order to be enqueued in the worker pool
  9. type Job interface {
  10. // The unique string ID of a job
  11. ID() string
  12. // The time in UTC when a job was enqueued to the worker pool queue
  13. EnqueueTime() time.Time
  14. // The main logic and control of a job
  15. Run() error
  16. // To set external data if a job needs it
  17. SetData([]byte)
  18. }
  19. // Worker handles a single job or worker process
  20. type Worker struct {
  21. exitChan chan bool
  22. uuid uuid.UUID
  23. WorkerPool chan chan Job
  24. JobChannel chan Job
  25. }
  26. // NewWorker creates a new instance of Worker with the given
  27. // RFC 4122 UUID and a global worker pool
  28. func NewWorker(uuid uuid.UUID, workerPool chan chan Job) *Worker {
  29. return &Worker{
  30. exitChan: make(chan bool),
  31. uuid: uuid,
  32. WorkerPool: workerPool,
  33. JobChannel: make(chan Job),
  34. }
  35. }
  36. // Start spawns a goroutine to add itself to the global worker pool
  37. // and listens for incoming jobs as they come, in random order
  38. func (w *Worker) Start() {
  39. go func() {
  40. for {
  41. w.WorkerPool <- w.JobChannel
  42. select {
  43. case job := <-w.JobChannel:
  44. log.Printf("attempting to run job ID '%s' via worker '%s'", job.ID(), w.uuid.String())
  45. if err := job.Run(); err != nil {
  46. log.Printf("error running job %s: %s", job.ID(), err.Error())
  47. }
  48. case <-w.exitChan:
  49. log.Printf("quitting worker with UUID: %v", w.uuid)
  50. return
  51. }
  52. }
  53. }()
  54. }
  55. // Stop instructs the worker to stop listening for incoming jobs
  56. func (w *Worker) Stop() {
  57. w.exitChan <- true
  58. }