2
0

worker.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package worker
  2. import (
  3. "context"
  4. "log"
  5. "time"
  6. "github.com/google/uuid"
  7. )
  // Job is the contract a unit of work must satisfy before it can be
  // enqueued in the worker pool and executed by a Worker.
  type Job interface {
  	// ID returns the unique string identifier of the job.
  	ID() string

  	// EnqueueTime returns the time, in UTC, at which the job was placed on
  	// the worker pool queue.
  	EnqueueTime() time.Time

  	// Run carries out the job's main logic. A non-nil error reports that
  	// the job failed.
  	Run(ctx context.Context) error

  	// SetData hands the job any external data it needs.
  	SetData(data []byte)
  }
  20. // Worker handles a single job or worker process
  21. type Worker struct {
  22. exitChan chan bool
  23. uuid uuid.UUID
  24. WorkerPool chan chan Job
  25. JobChannel chan Job
  26. }
  27. // NewWorker creates a new instance of Worker with the given
  28. // RFC 4122 UUID and a global worker pool
  29. func NewWorker(uuid uuid.UUID, workerPool chan chan Job) *Worker {
  30. return &Worker{
  31. exitChan: make(chan bool),
  32. uuid: uuid,
  33. WorkerPool: workerPool,
  34. JobChannel: make(chan Job),
  35. }
  36. }
  37. // Start spawns a goroutine to add itself to the global worker pool
  38. // and listens for incoming jobs as they come, in random order
  39. func (w *Worker) Start(ctx context.Context) {
  40. go func() {
  41. for {
  42. w.WorkerPool <- w.JobChannel
  43. select {
  44. case job := <-w.JobChannel:
  45. log.Printf("attempting to run job ID '%s' via worker '%s'", job.ID(), w.uuid.String())
  46. if err := job.Run(ctx); err != nil {
  47. log.Printf("error running job %s: %s", job.ID(), err.Error())
  48. }
  49. case <-w.exitChan:
  50. log.Printf("quitting worker with UUID: %v", w.uuid)
  51. return
  52. }
  53. }
  54. }()
  55. }
  56. // Stop instructs the worker to stop listening for incoming jobs
  57. func (w *Worker) Stop() {
  58. w.exitChan <- true
  59. }