2
0

blockingqueue.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package collections
  2. import (
  3. "sync"
  4. )
  5. //--------------------------------------------------------------------------
  6. // BlockingQueue
  7. //--------------------------------------------------------------------------
  8. // BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
  9. // This data structure should use a pool of worker goroutines to await work.
  10. type BlockingQueue[T any] interface {
  11. // Enqueue pushes an item onto the queue
  12. Enqueue(item T)
  13. // Dequeue removes the first item from the queue and returns it.
  14. Dequeue() T
  15. // TryDequeue attempts to remove the first item from the queue and return it. This
  16. // method does not block, and instead, returns true if the item was available and false
  17. // otherwise
  18. TryDequeue() (T, bool)
  19. // Each blocks modification and allows iteration of the queue.
  20. Each(f func(int, T))
  21. // Length returns the length of the queue
  22. Length() int
  23. // IsEmpty returns true if the queue is empty
  24. IsEmpty() bool
  25. // Clear empties the queue
  26. Clear()
  27. }
  28. // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
  29. type blockingSliceQueue[T any] struct {
  30. q []T
  31. l *sync.Mutex
  32. nonEmpty *sync.Cond
  33. }
  34. // NewBlockingQueue returns a new BlockingQueue implementation
  35. func NewBlockingQueue[T any]() BlockingQueue[T] {
  36. l := new(sync.Mutex)
  37. return &blockingSliceQueue[T]{
  38. q: []T{},
  39. l: l,
  40. nonEmpty: sync.NewCond(l),
  41. }
  42. }
  43. // Enqueue pushes an item onto the queue
  44. func (q *blockingSliceQueue[T]) Enqueue(item T) {
  45. q.l.Lock()
  46. defer q.l.Unlock()
  47. q.q = append(q.q, item)
  48. q.nonEmpty.Broadcast()
  49. }
  50. // Dequeue removes the first item from the queue and returns it.
  51. func (q *blockingSliceQueue[T]) Dequeue() T {
  52. q.l.Lock()
  53. defer q.l.Unlock()
  54. // need to tight loop here to ensure only one thread wins and
  55. // others wait again
  56. for len(q.q) == 0 {
  57. q.nonEmpty.Wait()
  58. }
  59. e := q.q[0]
  60. // nil 0 index to prevent leak
  61. q.q[0] = defaultValue[T]()
  62. q.q = q.q[1:]
  63. return e
  64. }
  65. // TryDequeue attempts to remove the first item from the queue and return it. This
  66. // method does not block, and instead, returns true if the item was available and false
  67. // otherwise
  68. func (q *blockingSliceQueue[T]) TryDequeue() (T, bool) {
  69. q.l.Lock()
  70. defer q.l.Unlock()
  71. if len(q.q) == 0 {
  72. return defaultValue[T](), false
  73. }
  74. e := q.q[0]
  75. // default 0 index to prevent leak
  76. q.q[0] = defaultValue[T]()
  77. q.q = q.q[1:]
  78. return e, true
  79. }
  80. // Each blocks modification and allows iteration of the queue.
  81. func (q *blockingSliceQueue[T]) Each(f func(int, T)) {
  82. q.l.Lock()
  83. defer q.l.Unlock()
  84. for i, entry := range q.q {
  85. f(i, entry)
  86. }
  87. }
  88. // Length returns the length of the queue
  89. func (q *blockingSliceQueue[T]) Length() int {
  90. q.l.Lock()
  91. defer q.l.Unlock()
  92. return len(q.q)
  93. }
  94. // IsEmpty returns true if the queue is empty
  95. func (q *blockingSliceQueue[T]) IsEmpty() bool {
  96. return q.Length() == 0
  97. }
  98. // Clear empties the queue
  99. func (q *blockingSliceQueue[T]) Clear() {
  100. q.l.Lock()
  101. defer q.l.Unlock()
  102. // seems optimal here to create a new underlying slice/array to
  103. // avoid capacity ballooning, but does feel like an implementation
  104. // specific detail -- we can revisit if there are other relevant
  105. // use-cases
  106. q.q = []T{}
  107. }
  108. // defaultValue returns the language default for the T type
  109. func defaultValue[T any]() T {
  110. var t T
  111. return t
  112. }