| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- package collections
- import (
- "sync"
- )
- //--------------------------------------------------------------------------
- // BlockingQueue
- //--------------------------------------------------------------------------
- // BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
- // This data structure should use a pool of worker goroutines to await work.
- type BlockingQueue interface {
- // Enqueue pushes an item onto the queue
- Enqueue(item interface{})
- // Dequeue removes the first item from the queue and returns it.
- Dequeue() interface{}
- // TryDequeue attempts to remove the first item from the queue and return it. This
- // method does not block, and instead, returns true if the item was available and false
- // otherwise
- TryDequeue() (interface{}, bool)
- // Each blocks modification and allows iteration of the queue.
- Each(f func(int, interface{}))
- // Length returns the length of the queue
- Length() int
- // IsEmpty returns true if the queue is empty
- IsEmpty() bool
- // Clear empties the queue
- Clear()
- }
- // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
- type blockingSliceQueue struct {
- q []interface{}
- l *sync.Mutex
- nonEmpty *sync.Cond
- }
- // NewBlockingQueue returns a new BlockingQueue implementation
- func NewBlockingQueue() BlockingQueue {
- l := new(sync.Mutex)
- return &blockingSliceQueue{
- q: []interface{}{},
- l: l,
- nonEmpty: sync.NewCond(l),
- }
- }
- // Enqueue pushes an item onto the queue
- func (q *blockingSliceQueue) Enqueue(item interface{}) {
- q.l.Lock()
- defer q.l.Unlock()
- q.q = append(q.q, item)
- q.nonEmpty.Broadcast()
- }
- // Dequeue removes the first item from the queue and returns it.
- func (q *blockingSliceQueue) Dequeue() interface{} {
- q.l.Lock()
- defer q.l.Unlock()
- // need to tight loop here to ensure only one thread wins and
- // others wait again
- for len(q.q) == 0 {
- q.nonEmpty.Wait()
- }
- e := q.q[0]
- // nil 0 index to prevent leak
- q.q[0] = nil
- q.q = q.q[1:]
- return e
- }
- // TryDequeue attempts to remove the first item from the queue and return it. This
- // method does not block, and instead, returns true if the item was available and false
- // otherwise
- func (q *blockingSliceQueue) TryDequeue() (interface{}, bool) {
- q.l.Lock()
- defer q.l.Unlock()
- if len(q.q) == 0 {
- return nil, false
- }
- e := q.q[0]
- // nil 0 index to prevent leak
- q.q[0] = nil
- q.q = q.q[1:]
- return e, true
- }
- // Each blocks modification and allows iteration of the queue.
- func (q *blockingSliceQueue) Each(f func(int, interface{})) {
- q.l.Lock()
- defer q.l.Unlock()
- for i, entry := range q.q {
- f(i, entry)
- }
- }
- // Length returns the length of the queue
- func (q *blockingSliceQueue) Length() int {
- q.l.Lock()
- defer q.l.Unlock()
- return len(q.q)
- }
- // IsEmpty returns true if the queue is empty
- func (q *blockingSliceQueue) IsEmpty() bool {
- return q.Length() == 0
- }
- // Clear empties the queue
- func (q *blockingSliceQueue) Clear() {
- q.l.Lock()
- defer q.l.Unlock()
- // seems optimal here to create a new underlying slice/array to
- // avoid capacity ballooning, but does feel like an implementation
- // specific detail -- we can revisit if there are other relevant
- // use-cases
- q.q = []interface{}{}
- }
|