2
0

worker.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package worker
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. "sync/atomic"
  7. "github.com/opencost/opencost/pkg/collections"
  8. )
// Worker is a transformation function from input type T to output type U.
// Implementations should be safe to call concurrently, since a pool invokes
// the same Worker from multiple goroutines.
type Worker[T any, U any] func(T) U
// WorkerPool is a pool of go routines executing a Worker on supplied inputs via
// the Run function.
type WorkerPool[T any, U any] interface {
	// Run executes a Worker in the pool on the provided input and onComplete receive channel
	// to get the results. An error is returned if the pool is shutdown, or is in the process
	// of shutting down. A nil onComplete channel discards the result.
	Run(input T, onComplete chan<- U) error

	// Shutdown stops all of the workers (if running). It is safe to call
	// multiple times; only the first call has any effect.
	Shutdown()
}
// WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and
// collect the results in a single slice.
type WorkGroup[T any, U any] interface {
	// Push adds a new input to the work group. An error is returned if the
	// group is at capacity or the underlying pool rejects the work.
	Push(T) error

	// Wait waits for all pending worker tasks to complete, then returns all the results.
	Wait() []U
}
// entry is an internal helper type for pushing payloads to the worker queue.
type entry[T any, U any] struct {
	// payload is the input value handed to the Worker func.
	payload T
	// onComplete, when non-nil, receives the Worker's result.
	onComplete chan<- U
	// close is a sentinel flag: a dequeued entry with close set tells the
	// receiving worker goroutine to exit.
	close bool
}
// queuedWorkerPool is a blocking queue based implementation of a WorkerPool.
type queuedWorkerPool[T any, U any] struct {
	// queue carries work entries (and close sentinels) to the worker goroutines.
	queue collections.BlockingQueue[entry[T, U]]
	// work is the transformation applied to each payload.
	work Worker[T, U]
	// workers is the number of goroutines started for this pool; also the
	// number of close sentinels Shutdown must enqueue.
	workers int
	// isShutdown flips to true exactly once, in Shutdown.
	isShutdown atomic.Bool
}
// ordered is a WorkGroup implementation which enforces ordering based on when
// inputs were pushed onto the group. It does not support concurrent Push calls
// (count is read and incremented without synchronization).
type ordered[T any, U any] struct {
	// workPool executes the pushed inputs.
	workPool WorkerPool[T, U]
	// results holds one slot per pushed input, indexed by push order.
	results []U
	// wg tracks outstanding results not yet written into results.
	wg *sync.WaitGroup
	// count is the number of inputs pushed so far (also the next result index).
	count int
}
  50. // NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
  51. // func used to transform inputs to outputs.
  52. func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
  53. owq := &queuedWorkerPool[T, U]{
  54. workers: workers,
  55. work: work,
  56. queue: collections.NewBlockingQueue[entry[T, U]](),
  57. }
  58. // startup the designated workers
  59. for i := 0; i < workers; i++ {
  60. go owq.worker()
  61. }
  62. return owq
  63. }
  64. // Run executes a Worker in the pool on the provided input and onComplete receive chanel
  65. // to get the results. An error is returned if the pool is shutdown, or is in the process
  66. // of shutting down.
  67. func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
  68. if wq.isShutdown.Load() {
  69. return fmt.Errorf("WorkerPoolShutdown")
  70. }
  71. wq.queue.Enqueue(entry[T, U]{
  72. payload: input,
  73. onComplete: onComplete,
  74. close: false,
  75. })
  76. return nil
  77. }
  78. // Shutdown stops all of the workers (if running).
  79. func (wq *queuedWorkerPool[T, U]) Shutdown() {
  80. if !wq.isShutdown.CompareAndSwap(false, true) {
  81. return
  82. }
  83. for i := 0; i < wq.workers; i++ {
  84. wq.queue.Enqueue(entry[T, U]{
  85. close: true,
  86. })
  87. }
  88. }
  89. func (wq *queuedWorkerPool[T, U]) worker() {
  90. for {
  91. next := wq.queue.Dequeue()
  92. // shutdown the worker on sentinel value
  93. if next.close {
  94. return
  95. }
  96. result := wq.work(next.payload)
  97. // signal on complete if applicable
  98. if next.onComplete != nil {
  99. next.onComplete <- result
  100. }
  101. }
  102. }
  103. // NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
  104. // they are pushed. Ordered groups do not support concurrent Push() calls.
  105. func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
  106. return &ordered[T, U]{
  107. workPool: pool,
  108. results: make([]U, size),
  109. wg: new(sync.WaitGroup),
  110. count: 0,
  111. }
  112. }
  113. // Push adds a new input to the work group.
  114. func (ow *ordered[T, U]) Push(input T) error {
  115. current := ow.count
  116. if current >= len(ow.results) {
  117. return fmt.Errorf("MaxCapacity")
  118. }
  119. onComplete := make(chan U)
  120. err := ow.workPool.Run(input, onComplete)
  121. if err != nil {
  122. return err
  123. }
  124. ow.count++
  125. ow.wg.Add(1)
  126. go func(index int) {
  127. defer close(onComplete)
  128. ow.results[index] = <-onComplete
  129. ow.wg.Done()
  130. }(int(current))
  131. return nil
  132. }
// Wait waits for all pending worker tasks to complete, then returns all the results.
// The slice is indexed by push order and is the group's internal storage, so
// callers should not Push after Wait without expecting races.
func (ow *ordered[T, U]) Wait() []U {
	ow.wg.Wait()
	return ow.results
}
// these constraints protect against the possibility of unexpected output from runtime.NumCPU()
const (
	// defaultMinWorkers is the floor applied by OptimalWorkerCount.
	defaultMinWorkers = 4
	// defaultMaxWorkers is the ceiling applied by OptimalWorkerCount.
	defaultMaxWorkers = 16
)
// OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU(),
// constrained to the default [defaultMinWorkers, defaultMaxWorkers] range.
func OptimalWorkerCount() int {
	return OptimalWorkerCountInRange(defaultMinWorkers, defaultMaxWorkers)
}
  147. // OptimalWorkerCount will return runtime.NumCPU() constrained to the provided min and max
  148. // range
  149. func OptimalWorkerCountInRange(min int, max int) int {
  150. cores := runtime.NumCPU()
  151. if cores < min {
  152. return min
  153. }
  154. if cores > max {
  155. return max
  156. }
  157. return cores
  158. }
  159. // ConcurrentDo runs a pool of workers which concurrently call the provided worker func on each input to get ordered
  160. // output corresponding to the inputs
  161. func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
  162. workerPool := NewWorkerPool(OptimalWorkerCount(), worker)
  163. defer workerPool.Shutdown()
  164. workGroup := NewOrderedGroup(workerPool, len(inputs))
  165. for _, input := range inputs {
  166. workGroup.Push(input)
  167. }
  168. return workGroup.Wait()
  169. }