| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package worker
- import (
- "fmt"
- "runtime"
- "sync"
- "sync/atomic"
- "github.com/opencost/opencost/core/pkg/collections"
- )
// Worker transforms a single input of type T into an output of type U. It is
// the unit of work a WorkerPool executes for every submitted input.
type Worker[T, U any] func(T) U
// WorkerPool runs a Worker over submitted inputs using a pool of goroutines.
// Inputs are handed to the pool with Run; the pool is stopped with Shutdown.
type WorkerPool[T, U any] interface {
	// Run schedules input to be processed by one of the pool's workers. The
	// result is sent on onComplete, so the caller receives it from that
	// channel. An error is returned if the pool has been shut down or is in
	// the process of shutting down.
	Run(input T, onComplete chan<- U) error

	// Shutdown stops all of the pool's workers (if they are running).
	Shutdown()
}
// WorkGroup collects a batch of inputs, runs them through a WorkerPool, and
// gathers every result into a single slice.
type WorkGroup[T, U any] interface {
	// Push adds one more input to the group.
	Push(T) error

	// Wait blocks until every pending task in the group has finished and then
	// returns the collected results.
	Wait() []U
}
// entry is one item on a queuedWorkerPool's queue. It is either a payload to
// run through the pool's Worker (with an optional channel to report the result
// on) or, when close is set, a sentinel telling the worker that dequeues it to
// exit.
type entry[T, U any] struct {
	payload    T        // input handed to the Worker
	onComplete chan<- U // receives the Worker's result; nil means "discard"
	close      bool     // sentinel flag: the dequeuing worker should stop
}
- // queuedWorkerPool is a blocking queue based implementation of a WorkerPool
- type queuedWorkerPool[T any, U any] struct {
- queue collections.BlockingQueue[entry[T, U]]
- work Worker[T, U]
- workers int
- isShutdown atomic.Bool
- }
- // ordered is a WorkGroup implementation which enforces ordering based on when
- // inputs were pushed onto the group.
- type ordered[T any, U any] struct {
- workPool WorkerPool[T, U]
- results []U
- wg *sync.WaitGroup
- count int
- }
- // NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
- // func used to transform inputs to outputs.
- func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
- owq := &queuedWorkerPool[T, U]{
- workers: workers,
- work: work,
- queue: collections.NewBlockingQueue[entry[T, U]](),
- }
- // startup the designated workers
- for i := 0; i < workers; i++ {
- go owq.worker()
- }
- return owq
- }
- // Run executes a Worker in the pool on the provided input and onComplete receive chanel
- // to get the results. An error is returned if the pool is shutdown, or is in the process
- // of shutting down.
- func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
- if wq.isShutdown.Load() {
- return fmt.Errorf("WorkerPoolShutdown")
- }
- wq.queue.Enqueue(entry[T, U]{
- payload: input,
- onComplete: onComplete,
- close: false,
- })
- return nil
- }
- // Shutdown stops all of the workers (if running).
- func (wq *queuedWorkerPool[T, U]) Shutdown() {
- if !wq.isShutdown.CompareAndSwap(false, true) {
- return
- }
- for i := 0; i < wq.workers; i++ {
- wq.queue.Enqueue(entry[T, U]{
- close: true,
- })
- }
- }
- func (wq *queuedWorkerPool[T, U]) worker() {
- for {
- next := wq.queue.Dequeue()
- // shutdown the worker on sentinel value
- if next.close {
- return
- }
- result := wq.work(next.payload)
- // signal on complete if applicable
- if next.onComplete != nil {
- next.onComplete <- result
- }
- }
- }
- // NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
- // they are pushed. Ordered groups do not support concurrent Push() calls.
- func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
- return &ordered[T, U]{
- workPool: pool,
- results: make([]U, size),
- wg: new(sync.WaitGroup),
- count: 0,
- }
- }
- // Push adds a new input to the work group.
- func (ow *ordered[T, U]) Push(input T) error {
- current := ow.count
- if current >= len(ow.results) {
- return fmt.Errorf("MaxCapacity")
- }
- onComplete := make(chan U)
- err := ow.workPool.Run(input, onComplete)
- if err != nil {
- return err
- }
- ow.count++
- ow.wg.Add(1)
- go func(index int) {
- defer close(onComplete)
- ow.results[index] = <-onComplete
- ow.wg.Done()
- }(int(current))
- return nil
- }
- // Wait waits for all pending worker tasks to complete, then returns all the results.
- func (ow *ordered[T, U]) Wait() []U {
- ow.wg.Wait()
- return ow.results
- }
// Bounds that OptimalWorkerCount applies to runtime.NumCPU(). They guard
// against an unexpectedly small or large value being reported.
const (
	// defaultMinWorkers is the lowest worker count OptimalWorkerCount returns.
	defaultMinWorkers = 4
	// defaultMaxWorkers is the highest worker count OptimalWorkerCount returns.
	defaultMaxWorkers = 16
)
- // OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU()
- func OptimalWorkerCount() int {
- return OptimalWorkerCountInRange(defaultMinWorkers, defaultMaxWorkers)
- }
// OptimalWorkerCountInRange returns runtime.NumCPU() clamped to the inclusive
// range [minWorkers, maxWorkers].
//
// If the bounds are given in reverse order (minWorkers > maxWorkers), they are
// swapped. This keeps the result inside the range the two values describe and
// makes it independent of argument order. The parameters are deliberately not
// named min/max, so they do not shadow the Go 1.21 builtins.
func OptimalWorkerCountInRange(minWorkers int, maxWorkers int) int {
	if minWorkers > maxWorkers {
		minWorkers, maxWorkers = maxWorkers, minWorkers
	}

	cores := runtime.NumCPU()
	switch {
	case cores < minWorkers:
		return minWorkers
	case cores > maxWorkers:
		return maxWorkers
	default:
		return cores
	}
}
- // ConcurrentDo runs a pool of workers which concurrently call the provided worker func on each input to get ordered
- // output corresponding to the inputs
- func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
- workerPool := NewWorkerPool(OptimalWorkerCount(), worker)
- defer workerPool.Shutdown()
- workGroup := NewOrderedGroup(workerPool, len(inputs))
- for _, input := range inputs {
- workGroup.Push(input)
- }
- return workGroup.Wait()
- }
|