worker.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package worker
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. "github.com/kubecost/opencost/pkg/collections"
  7. "github.com/kubecost/opencost/pkg/util/atomic"
  8. )
  9. // Worker is a transformation function from input type T to output type U.
  10. type Worker[T any, U any] func(T) U
  11. // WorkerPool is a pool of go routines executing a Worker on supplied inputs via
  12. // the Run function.
  13. type WorkerPool[T any, U any] interface {
  14. // Run executes a Worker in the pool on the provided input and onComplete receive chanel
  15. // to get the results. An error is returned if the pool is shutdown, or is in the process
  16. // of shutting down.
  17. Run(input T, onComplete chan<- U) error
  18. // Shutdown stops all of the workers (if running).
  19. Shutdown()
  20. }
  21. // WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and
  22. // collect the results in a single slice.
  23. type WorkGroup[T any, U any] interface {
  24. // Push adds a new input to the work group.
  25. Push(T) error
  26. // Wait waits for all pending worker tasks to complete, then returns all the results.
  27. Wait() []U
  28. }
  29. // entry is an internal helper type for pushing payloads to the worker queue
  30. type entry[T any, U any] struct {
  31. payload T
  32. onComplete chan<- U
  33. close bool
  34. }
  35. // queuedWorkerPool is a blocking queue based implementation of a WorkerPool
  36. type queuedWorkerPool[T any, U any] struct {
  37. queue collections.BlockingQueue[entry[T, U]]
  38. work Worker[T, U]
  39. workers int
  40. isShutdown *atomic.AtomicBool
  41. }
  42. // ordered is a WorkGroup implementation which enforces ordering based on when
  43. // inputs were pushed onto the group.
  44. type ordered[T any, U any] struct {
  45. workPool WorkerPool[T, U]
  46. results []U
  47. wg *sync.WaitGroup
  48. count int
  49. }
  50. // NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
  51. // func used to transform inputs to outputs.
  52. func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
  53. owq := &queuedWorkerPool[T, U]{
  54. workers: workers,
  55. work: work,
  56. queue: collections.NewBlockingQueue[entry[T, U]](),
  57. isShutdown: atomic.NewAtomicBool(false),
  58. }
  59. // startup the designated workers
  60. for i := 0; i < workers; i++ {
  61. go owq.worker()
  62. }
  63. return owq
  64. }
  65. // Run executes a Worker in the pool on the provided input and onComplete receive chanel
  66. // to get the results. An error is returned if the pool is shutdown, or is in the process
  67. // of shutting down.
  68. func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
  69. if wq.isShutdown.Get() {
  70. return fmt.Errorf("WorkerPoolShutdown")
  71. }
  72. wq.queue.Enqueue(entry[T, U]{
  73. payload: input,
  74. onComplete: onComplete,
  75. close: false,
  76. })
  77. return nil
  78. }
  79. // Shutdown stops all of the workers (if running).
  80. func (wq *queuedWorkerPool[T, U]) Shutdown() {
  81. if !wq.isShutdown.CompareAndSet(false, true) {
  82. return
  83. }
  84. for i := 0; i < wq.workers; i++ {
  85. wq.queue.Enqueue(entry[T, U]{
  86. close: true,
  87. })
  88. }
  89. }
  90. func (wq *queuedWorkerPool[T, U]) worker() {
  91. for {
  92. next := wq.queue.Dequeue()
  93. // shutdown the worker on sentinel value
  94. if next.close {
  95. return
  96. }
  97. result := wq.work(next.payload)
  98. // signal on complete if applicable
  99. if next.onComplete != nil {
  100. next.onComplete <- result
  101. }
  102. }
  103. }
  104. // NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
  105. // they are pushed. Ordered groups do not support concurrent Push() calls.
  106. func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
  107. return &ordered[T, U]{
  108. workPool: pool,
  109. results: make([]U, size),
  110. wg: new(sync.WaitGroup),
  111. count: 0,
  112. }
  113. }
  114. // Push adds a new input to the work group.
  115. func (ow *ordered[T, U]) Push(input T) error {
  116. current := ow.count
  117. if current >= len(ow.results) {
  118. return fmt.Errorf("MaxCapacity")
  119. }
  120. onComplete := make(chan U)
  121. err := ow.workPool.Run(input, onComplete)
  122. if err != nil {
  123. return err
  124. }
  125. ow.count++
  126. ow.wg.Add(1)
  127. go func(index int) {
  128. defer close(onComplete)
  129. ow.results[index] = <-onComplete
  130. ow.wg.Done()
  131. }(int(current))
  132. return nil
  133. }
  134. // Wait waits for all pending worker tasks to complete, then returns all the results.
  135. func (ow *ordered[T, U]) Wait() []U {
  136. ow.wg.Wait()
  137. return ow.results
  138. }
  139. // these constraints protect against the possibility of unexpected output from runtime.NumCPU()
  140. const (
  141. defaultMinWorkers = 4
  142. defaultMaxWorkers = 16
  143. )
  144. // OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU()
  145. func OptimalWorkerCount() int {
  146. return OptimalWorkerCountInRange(defaultMinWorkers, defaultMaxWorkers)
  147. }
  148. // OptimalWorkerCount will return runtime.NumCPU() constrained to the provided min and max
  149. // range
  150. func OptimalWorkerCountInRange(min int, max int) int {
  151. cores := runtime.NumCPU()
  152. if cores < min {
  153. return min
  154. }
  155. if cores > max {
  156. return max
  157. }
  158. return cores
  159. }
  160. // ConcurrentDo runs a pool of workers which concurrently call the provided worker func on each input to get ordered
  161. // output corresponding to the inputs
  162. func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
  163. workerPool := NewWorkerPool(OptimalWorkerCount(), worker)
  164. defer workerPool.Shutdown()
  165. workGroup := NewOrderedGroup(workerPool, len(inputs))
  166. for _, input := range inputs {
  167. workGroup.Push(input)
  168. }
  169. return workGroup.Wait()
  170. }