blockingqueue.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package collections
  2. import (
  3. "sync"
  4. )
  5. //--------------------------------------------------------------------------
  6. // BlockingQueue
  7. //--------------------------------------------------------------------------
  8. // BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
  9. // This data structure should use a pool of worker goroutines to await work.
  10. type BlockingQueue interface {
  11. // Enqueue pushes an item onto the queue
  12. Enqueue(item interface{})
  13. // Dequeue removes the first item from the queue and returns it.
  14. Dequeue() interface{}
  15. // TryDequeue attempts to remove the first item from the queue and return it. This
  16. // method does not block, and instead, returns true if the item was available and false
  17. // otherwise
  18. TryDequeue() (interface{}, bool)
  19. // Each blocks modification and allows iteration of the queue.
  20. Each(f func(int, interface{}))
  21. // Length returns the length of the queue
  22. Length() int
  23. // IsEmpty returns true if the queue is empty
  24. IsEmpty() bool
  25. // Clear empties the queue
  26. Clear()
  27. }
  28. // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
  29. type blockingSliceQueue struct {
  30. q []interface{}
  31. l *sync.Mutex
  32. nonEmpty *sync.Cond
  33. }
  34. // NewBlockingQueue returns a new BlockingQueue implementation
  35. func NewBlockingQueue() BlockingQueue {
  36. l := new(sync.Mutex)
  37. return &blockingSliceQueue{
  38. q: []interface{}{},
  39. l: l,
  40. nonEmpty: sync.NewCond(l),
  41. }
  42. }
  43. // Enqueue pushes an item onto the queue
  44. func (q *blockingSliceQueue) Enqueue(item interface{}) {
  45. q.l.Lock()
  46. defer q.l.Unlock()
  47. q.q = append(q.q, item)
  48. q.nonEmpty.Broadcast()
  49. }
  50. // Dequeue removes the first item from the queue and returns it.
  51. func (q *blockingSliceQueue) Dequeue() interface{} {
  52. q.l.Lock()
  53. defer q.l.Unlock()
  54. // need to tight loop here to ensure only one thread wins and
  55. // others wait again
  56. for len(q.q) == 0 {
  57. q.nonEmpty.Wait()
  58. }
  59. e := q.q[0]
  60. // nil 0 index to prevent leak
  61. q.q[0] = nil
  62. q.q = q.q[1:]
  63. return e
  64. }
  65. // TryDequeue attempts to remove the first item from the queue and return it. This
  66. // method does not block, and instead, returns true if the item was available and false
  67. // otherwise
  68. func (q *blockingSliceQueue) TryDequeue() (interface{}, bool) {
  69. q.l.Lock()
  70. defer q.l.Unlock()
  71. if len(q.q) == 0 {
  72. return nil, false
  73. }
  74. e := q.q[0]
  75. // nil 0 index to prevent leak
  76. q.q[0] = nil
  77. q.q = q.q[1:]
  78. return e, true
  79. }
  80. // Each blocks modification and allows iteration of the queue.
  81. func (q *blockingSliceQueue) Each(f func(int, interface{})) {
  82. q.l.Lock()
  83. defer q.l.Unlock()
  84. for i, entry := range q.q {
  85. f(i, entry)
  86. }
  87. }
  88. // Length returns the length of the queue
  89. func (q *blockingSliceQueue) Length() int {
  90. q.l.Lock()
  91. defer q.l.Unlock()
  92. return len(q.q)
  93. }
  94. // IsEmpty returns true if the queue is empty
  95. func (q *blockingSliceQueue) IsEmpty() bool {
  96. return q.Length() == 0
  97. }
  98. // Clear empties the queue
  99. func (q *blockingSliceQueue) Clear() {
  100. q.l.Lock()
  101. defer q.l.Unlock()
  102. // seems optimal here to create a new underlying slice/array to
  103. // avoid capacity ballooning, but does feel like an implementation
  104. // specific detail -- we can revisit if there are other relevant
  105. // use-cases
  106. q.q = []interface{}{}
  107. }