| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package util
- import (
- "sync"
- )
//--------------------------------------------------------------------------
//  BlockingQueue
//--------------------------------------------------------------------------

// BlockingQueue is a first-in, first-out queue whose Dequeue blocks while the
// queue is empty. It is intended to be shared by a pool of worker goroutines
// that call Dequeue to await work.
type BlockingQueue interface {
	// Enqueue pushes an item onto the back of the queue.
	Enqueue(item interface{})

	// Dequeue removes the first item from the queue and returns it. If the
	// queue is empty, Dequeue blocks until an item has been enqueued.
	Dequeue() interface{}

	// Length returns the number of items currently in the queue.
	Length() int

	// IsEmpty returns true if the queue currently holds no items.
	IsEmpty() bool
}

// blockingSliceQueue is an implementation of BlockingQueue which uses a slice
// for storage.
type blockingSliceQueue struct {
	q        []interface{} // queued items, oldest at index 0
	l        *sync.Mutex   // guards q; also the Locker behind nonEmpty
	nonEmpty *sync.Cond    // signalled by Enqueue after an item is appended
}

// NewBlockingQueue returns a new, empty BlockingQueue backed by a slice.
func NewBlockingQueue() BlockingQueue {
	l := new(sync.Mutex)
	return &blockingSliceQueue{
		q:        []interface{}{},
		l:        l,
		nonEmpty: sync.NewCond(l),
	}
}

// Enqueue pushes an item onto the back of the queue and wakes one goroutine
// blocked in Dequeue, if there is any.
func (q *blockingSliceQueue) Enqueue(item interface{}) {
	q.l.Lock()
	defer q.l.Unlock()

	q.q = append(q.q, item)

	// Exactly one item was added, so at most one blocked Dequeue can make
	// progress. Signal wakes a single waiter instead of waking all of them
	// only to have all but one go back to sleep.
	q.nonEmpty.Signal()
}

// Dequeue removes the first item from the queue and returns it. It blocks
// while the queue is empty.
func (q *blockingSliceQueue) Dequeue() interface{} {
	q.l.Lock()
	defer q.l.Unlock()

	// Wait releases the lock while sleeping and reacquires it before
	// returning. Another goroutine may have taken the item in between, so the
	// condition has to be re-checked after every wake-up.
	for len(q.q) == 0 {
		q.nonEmpty.Wait()
	}

	e := q.q[0]

	// Clear the slot so the backing array does not keep the item reachable
	// after it has been handed to the caller.
	q.q[0] = nil
	q.q = q.q[1:]

	return e
}

// Length returns the number of items currently in the queue.
func (q *blockingSliceQueue) Length() int {
	q.l.Lock()
	defer q.l.Unlock()

	return len(q.q)
}

// IsEmpty returns true if the queue currently holds no items.
func (q *blockingSliceQueue) IsEmpty() bool {
	return q.Length() == 0
}
|