blockingqueue.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package util
  2. import (
  3. "sync"
  4. )
  5. //--------------------------------------------------------------------------
  6. // BlockingQueue
  7. //--------------------------------------------------------------------------
  // BlockingQueue is a first-in, first-out queue whose Dequeue blocks while the
  // queue is empty. It is meant to be shared by a pool of worker goroutines that
  // call Dequeue to wait for work to arrive.
  type BlockingQueue interface {
  	// Enqueue adds item to the back of the queue.
  	Enqueue(item interface{})

  	// Dequeue removes the item at the front of the queue and returns it.
  	Dequeue() interface{}

  	// Length reports the number of items currently in the queue.
  	Length() int

  	// IsEmpty reports whether the queue currently holds no items.
  	IsEmpty() bool
  }
  20. // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
  21. type blockingSliceQueue struct {
  22. q []interface{}
  23. l *sync.Mutex
  24. nonEmpty *sync.Cond
  25. }
  26. // NewBlockingQueue returns a new BlockingQueue implementation
  27. func NewBlockingQueue() BlockingQueue {
  28. l := new(sync.Mutex)
  29. return &blockingSliceQueue{
  30. q: []interface{}{},
  31. l: l,
  32. nonEmpty: sync.NewCond(l),
  33. }
  34. }
  35. // Enqueue pushes an item onto the queue
  36. func (q *blockingSliceQueue) Enqueue(item interface{}) {
  37. q.l.Lock()
  38. defer q.l.Unlock()
  39. q.q = append(q.q, item)
  40. q.nonEmpty.Broadcast()
  41. }
  42. // Dequeue removes the first item from the queue and returns it.
  43. func (q *blockingSliceQueue) Dequeue() interface{} {
  44. q.l.Lock()
  45. defer q.l.Unlock()
  46. // need to tight loop here to ensure only one thread wins and
  47. // others wait again
  48. for len(q.q) == 0 {
  49. q.nonEmpty.Wait()
  50. }
  51. e := q.q[0]
  52. // nil 0 index to prevent leak
  53. q.q[0] = nil
  54. q.q = q.q[1:]
  55. return e
  56. }
  57. // Length returns the length of the queue
  58. func (q *blockingSliceQueue) Length() int {
  59. q.l.Lock()
  60. defer q.l.Unlock()
  61. return len(q.q)
  62. }
  63. // IsEmpty returns true if the queue is empty
  64. func (q *blockingSliceQueue) IsEmpty() bool {
  65. return q.Length() == 0
  66. }