blockingqueue.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package collections
  2. import (
  3. "sync"
  4. )
  5. //--------------------------------------------------------------------------
  6. // BlockingQueue
  7. //--------------------------------------------------------------------------
  8. // BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
  9. // This data structure should use a pool of worker goroutines to await work.
  10. type BlockingQueue interface {
  11. // Enqueue pushes an item onto the queue
  12. Enqueue(item interface{})
  13. // Dequeue removes the first item from the queue and returns it.
  14. Dequeue() interface{}
  15. // TryDequeue attempts to remove the first item from the queue and return it. This
  16. // method does not block, and instead, returns true if the item was available and false
  17. // otherwise
  18. TryDequeue() (interface{}, bool)
  19. // Each blocks modification and allows iteration of the queue.
  20. Each(f func(int, interface{}))
  21. // Length returns the length of the queue
  22. Length() int
  23. // IsEmpty returns true if the queue is empty
  24. IsEmpty() bool
  25. }
  26. // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
  27. type blockingSliceQueue struct {
  28. q []interface{}
  29. l *sync.Mutex
  30. nonEmpty *sync.Cond
  31. }
  32. // NewBlockingQueue returns a new BlockingQueue implementation
  33. func NewBlockingQueue() BlockingQueue {
  34. l := new(sync.Mutex)
  35. return &blockingSliceQueue{
  36. q: []interface{}{},
  37. l: l,
  38. nonEmpty: sync.NewCond(l),
  39. }
  40. }
  41. // Enqueue pushes an item onto the queue
  42. func (q *blockingSliceQueue) Enqueue(item interface{}) {
  43. q.l.Lock()
  44. defer q.l.Unlock()
  45. q.q = append(q.q, item)
  46. q.nonEmpty.Broadcast()
  47. }
  48. // Dequeue removes the first item from the queue and returns it.
  49. func (q *blockingSliceQueue) Dequeue() interface{} {
  50. q.l.Lock()
  51. defer q.l.Unlock()
  52. // need to tight loop here to ensure only one thread wins and
  53. // others wait again
  54. for len(q.q) == 0 {
  55. q.nonEmpty.Wait()
  56. }
  57. e := q.q[0]
  58. // nil 0 index to prevent leak
  59. q.q[0] = nil
  60. q.q = q.q[1:]
  61. return e
  62. }
  63. // TryDequeue attempts to remove the first item from the queue and return it. This
  64. // method does not block, and instead, returns true if the item was available and false
  65. // otherwise
  66. func (q *blockingSliceQueue) TryDequeue() (interface{}, bool) {
  67. q.l.Lock()
  68. defer q.l.Unlock()
  69. if len(q.q) == 0 {
  70. return nil, false
  71. }
  72. e := q.q[0]
  73. // nil 0 index to prevent leak
  74. q.q[0] = nil
  75. q.q = q.q[1:]
  76. return e, true
  77. }
  78. // Each blocks modification and allows iteration of the queue.
  79. func (q *blockingSliceQueue) Each(f func(int, interface{})) {
  80. q.l.Lock()
  81. defer q.l.Unlock()
  82. for i, entry := range q.q {
  83. f(i, entry)
  84. }
  85. }
  86. // Length returns the length of the queue
  87. func (q *blockingSliceQueue) Length() int {
  88. q.l.Lock()
  89. defer q.l.Unlock()
  90. return len(q.q)
  91. }
  92. // IsEmpty returns true if the queue is empty
  93. func (q *blockingSliceQueue) IsEmpty() bool {
  94. return q.Length() == 0
  95. }