blockingqueue.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package util
  2. import (
  3. "sync"
  4. )
  5. //--------------------------------------------------------------------------
  6. // BlockingQueue
  7. //--------------------------------------------------------------------------
  8. // BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
  9. // This data structure should use a pool of worker goroutines to await work.
  10. type BlockingQueue interface {
  11. // Enqueue pushes an item onto the queue
  12. Enqueue(item interface{})
  13. // Dequeue removes the first item from the queue and returns it.
  14. Dequeue() interface{}
  15. // Each blocks modification and allows iteration of the queue.
  16. Each(f func(int, interface{}))
  17. // Length returns the length of the queue
  18. Length() int
  19. // IsEmpty returns true if the queue is empty
  20. IsEmpty() bool
  21. }
  22. // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
  23. type blockingSliceQueue struct {
  24. q []interface{}
  25. l *sync.Mutex
  26. nonEmpty *sync.Cond
  27. }
  28. // NewBlockingQueue returns a new BlockingQueue implementation
  29. func NewBlockingQueue() BlockingQueue {
  30. l := new(sync.Mutex)
  31. return &blockingSliceQueue{
  32. q: []interface{}{},
  33. l: l,
  34. nonEmpty: sync.NewCond(l),
  35. }
  36. }
  37. // Enqueue pushes an item onto the queue
  38. func (q *blockingSliceQueue) Enqueue(item interface{}) {
  39. q.l.Lock()
  40. defer q.l.Unlock()
  41. q.q = append(q.q, item)
  42. q.nonEmpty.Broadcast()
  43. }
  44. // Dequeue removes the first item from the queue and returns it.
  45. func (q *blockingSliceQueue) Dequeue() interface{} {
  46. q.l.Lock()
  47. defer q.l.Unlock()
  48. // need to tight loop here to ensure only one thread wins and
  49. // others wait again
  50. for len(q.q) == 0 {
  51. q.nonEmpty.Wait()
  52. }
  53. e := q.q[0]
  54. // nil 0 index to prevent leak
  55. q.q[0] = nil
  56. q.q = q.q[1:]
  57. return e
  58. }
  59. // Each blocks modification and allows iteration of the queue.
  60. func (q *blockingSliceQueue) Each(f func(int, interface{})) {
  61. q.l.Lock()
  62. defer q.l.Unlock()
  63. for i, entry := range q.q {
  64. f(i, entry)
  65. }
  66. }
  67. // Length returns the length of the queue
  68. func (q *blockingSliceQueue) Length() int {
  69. q.l.Lock()
  70. defer q.l.Unlock()
  71. return len(q.q)
  72. }
  73. // IsEmpty returns true if the queue is empty
  74. func (q *blockingSliceQueue) IsEmpty() bool {
  75. return q.Length() == 0
  76. }