worker.go
  1. package worker
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. "sync/atomic"
  7. "github.com/opencost/opencost/core/pkg/collections"
  8. )
// Runner is a function type that takes a single input and returns nothing.
// It is the no-result counterpart of Worker, used by ConcurrentRun.
type Runner[T any] func(T)
// Worker is a transformation function from input type T to output type U.
// Instances are executed by the pool goroutines; a Worker shared across a
// pool runs concurrently and should therefore be safe for concurrent use.
type Worker[T any, U any] func(T) U
// WorkerPool is a pool of goroutines executing a Worker on supplied inputs via
// the Run function.
type WorkerPool[T any, U any] interface {
	// Run executes a Worker in the pool on the provided input and onComplete receive channel
	// to get the results. An error is returned if the pool is shutdown, or is in the process
	// of shutting down.
	Run(input T, onComplete chan<- U) error
	// Shutdown stops all of the workers (if running).
	Shutdown()
}
// WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and
// collect the results in a single slice.
type WorkGroup[T any, U any] interface {
	// Push adds a new input to the work group. An error is returned when the
	// group cannot accept the input (e.g. capacity reached or pool shut down).
	Push(T) error
	// Wait waits for all pending worker tasks to complete, then returns all the results.
	Wait() []U
}
// entry is an internal helper type for pushing payloads to the worker queue.
type entry[T any, U any] struct {
	// payload is the input handed to the Worker func.
	payload T
	// onComplete, when non-nil, receives the Worker's result for this payload.
	onComplete chan<- U
	// close marks a shutdown sentinel: a worker that dequeues it exits.
	close bool
}
// queuedWorkerPool is a blocking queue based implementation of a WorkerPool.
type queuedWorkerPool[T any, U any] struct {
	// queue holds pending entries; workers block on Dequeue when it is empty.
	queue collections.BlockingQueue[entry[T, U]]
	// work is the transformation applied to each payload.
	work Worker[T, U]
	// workers is the number of goroutines started for this pool (and the
	// number of close sentinels Shutdown must enqueue).
	workers int
	// isShutdown is flipped exactly once by Shutdown; Run rejects new work
	// after it is set.
	isShutdown atomic.Bool
}
// ordered is a WorkGroup implementation which enforces ordering based on when
// inputs were pushed onto the group. It does not support concurrent Push calls.
type ordered[T any, U any] struct {
	// workPool executes the pushed inputs.
	workPool WorkerPool[T, U]
	// results is pre-sized; the i-th pushed input writes to results[i].
	results []U
	// wg tracks in-flight result-collection goroutines for Wait.
	wg sync.WaitGroup
	// count is the number of inputs pushed so far (also the next result slot).
	count int
}
  52. // NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
  53. // func used to transform inputs to outputs.
  54. func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
  55. owq := &queuedWorkerPool[T, U]{
  56. workers: workers,
  57. work: work,
  58. queue: collections.NewBlockingQueue[entry[T, U]](),
  59. }
  60. // startup the designated workers
  61. for i := 0; i < workers; i++ {
  62. go owq.worker()
  63. }
  64. return owq
  65. }
  66. // Run executes a Worker in the pool on the provided input and onComplete receive chanel
  67. // to get the results. An error is returned if the pool is shutdown, or is in the process
  68. // of shutting down.
  69. func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
  70. if wq.isShutdown.Load() {
  71. return fmt.Errorf("WorkerPoolShutdown")
  72. }
  73. wq.queue.Enqueue(entry[T, U]{
  74. payload: input,
  75. onComplete: onComplete,
  76. close: false,
  77. })
  78. return nil
  79. }
  80. // Shutdown stops all of the workers (if running).
  81. func (wq *queuedWorkerPool[T, U]) Shutdown() {
  82. if !wq.isShutdown.CompareAndSwap(false, true) {
  83. return
  84. }
  85. for i := 0; i < wq.workers; i++ {
  86. wq.queue.Enqueue(entry[T, U]{
  87. close: true,
  88. })
  89. }
  90. }
  91. func (wq *queuedWorkerPool[T, U]) worker() {
  92. for {
  93. next := wq.queue.Dequeue()
  94. // shutdown the worker on sentinel value
  95. if next.close {
  96. return
  97. }
  98. result := wq.work(next.payload)
  99. // signal on complete if applicable
  100. if next.onComplete != nil {
  101. next.onComplete <- result
  102. }
  103. }
  104. }
  105. // NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
  106. // they are pushed. Ordered groups do not support concurrent Push() calls.
  107. func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
  108. return &ordered[T, U]{
  109. workPool: pool,
  110. results: make([]U, size),
  111. count: 0,
  112. }
  113. }
  114. // Push adds a new input to the work group.
  115. func (ow *ordered[T, U]) Push(input T) error {
  116. current := ow.count
  117. if current >= len(ow.results) {
  118. return fmt.Errorf("MaxCapacity")
  119. }
  120. onComplete := make(chan U)
  121. err := ow.workPool.Run(input, onComplete)
  122. if err != nil {
  123. return err
  124. }
  125. ow.count++
  126. ow.wg.Add(1)
  127. go func(index int) {
  128. defer close(onComplete)
  129. ow.results[index] = <-onComplete
  130. ow.wg.Done()
  131. }(int(current))
  132. return nil
  133. }
// Wait waits for all pending worker tasks to complete, then returns all the results.
// The returned slice has length equal to the size passed to NewOrderedGroup;
// slots whose inputs were never pushed hold zero values.
func (ow *ordered[T, U]) Wait() []U {
	ow.wg.Wait()
	return ow.results
}
// noResultGroup is a WorkGroup implementation which arbitrarily pushes inputs to
// a runner pool to be executed concurrently. This group does not collect results.
type noResultGroup[T any] struct {
	// workPool executes the pushed inputs; the struct{} output is discarded.
	workPool WorkerPool[T, struct{}]
	// wg tracks in-flight completion-signal goroutines for Wait.
	wg sync.WaitGroup
}
  145. // NewNoResultGroup creates a new WorkGroup implementation for processing a group of inputs concurrently. This
  146. // work group implementation does not collect results, and therefore, requires a worker pool with a struct{} output.
  147. func NewNoResultGroup[T any](pool WorkerPool[T, struct{}]) WorkGroup[T, struct{}] {
  148. return &noResultGroup[T]{
  149. workPool: pool,
  150. }
  151. }
  152. // Push adds a new input to the work group.
  153. func (ow *noResultGroup[T]) Push(input T) error {
  154. onComplete := make(chan struct{})
  155. err := ow.workPool.Run(input, onComplete)
  156. if err != nil {
  157. return err
  158. }
  159. ow.wg.Add(1)
  160. go func() {
  161. defer close(onComplete)
  162. defer ow.wg.Done()
  163. <-onComplete
  164. }()
  165. return nil
  166. }
// Wait waits for all pending worker tasks to complete, then returns all the results.
// Since this group collects no results, the returned slice is always empty.
func (ow *noResultGroup[T]) Wait() []struct{} {
	ow.wg.Wait()
	return []struct{}{}
}
// collector is a WorkGroup implementation which collects non-nil results into the results slice
// and ignores any nil results.
type collector[T any, U any] struct {
	// workPool executes the pushed inputs, producing pointer results.
	workPool WorkerPool[T, *U]
	// resultLock guards results against concurrent appends from the
	// per-push collection goroutines.
	resultLock sync.Mutex
	// results accumulates non-nil worker outputs in completion order.
	results []*U
	// wg tracks in-flight collection goroutines for Wait.
	wg sync.WaitGroup
}
  180. // NewCollectionGroup creates a new WorkGroup implementation for processing a group of inputs concurrently. The
  181. // collection group implementation will collect all non-nil results into the output slice. Thus, the worker pool
  182. // parameter requires the output type to be a pointer.
  183. func NewCollectionGroup[T any, U any](pool WorkerPool[T, *U]) WorkGroup[T, *U] {
  184. return &collector[T, U]{
  185. workPool: pool,
  186. }
  187. }
  188. // Push adds a new input to the work group.
  189. func (ow *collector[T, U]) Push(input T) error {
  190. onComplete := make(chan *U)
  191. err := ow.workPool.Run(input, onComplete)
  192. if err != nil {
  193. return err
  194. }
  195. ow.wg.Add(1)
  196. go func() {
  197. defer ow.wg.Done()
  198. defer close(onComplete)
  199. result := <-onComplete
  200. if result != nil {
  201. ow.resultLock.Lock()
  202. ow.results = append(ow.results, result)
  203. ow.resultLock.Unlock()
  204. }
  205. }()
  206. return nil
  207. }
// Wait waits for all pending worker tasks to complete, then returns all the results.
// Results appear in completion order, not push order, and nil results are omitted.
func (ow *collector[T, U]) Wait() []*U {
	ow.wg.Wait()
	return ow.results
}
// these constraints protect against the possibility of unexpected output from runtime.NumCPU()
const (
	// defaultMinWorkers is the floor applied to runtime.NumCPU() by OptimalWorkerCount.
	defaultMinWorkers = 4
	// defaultMaxWorkers is the ceiling applied to runtime.NumCPU() by OptimalWorkerCount.
	defaultMaxWorkers = 16
)
// OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU(),
// clamped to the default [defaultMinWorkers, defaultMaxWorkers] range (4-16).
func OptimalWorkerCount() int {
	return OptimalWorkerCountInRange(defaultMinWorkers, defaultMaxWorkers)
}
  222. // OptimalWorkerCount will return runtime.NumCPU() constrained to the provided min and max
  223. // range
  224. func OptimalWorkerCountInRange(min int, max int) int {
  225. cores := runtime.NumCPU()
  226. if cores < min {
  227. return min
  228. }
  229. if cores > max {
  230. return max
  231. }
  232. return cores
  233. }
// ConcurrentDo runs a pool of N workers which concurrently call the provided worker func on each
// input to get ordered output corresponding to the inputs. The total number of workers is determined
// by the total number of CPUs available, bound to a range from 4-16.
func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
	return ConcurrentDoWith(OptimalWorkerCount(), worker, inputs)
}
  240. // ConcurrentDoWith runs a pool of workers of the specified size which concurrently call the provided worker func
  241. // on each input to get ordered output corresponding to the inputs. Size inputs < 1 will automatically be set to 1.
  242. func ConcurrentDoWith[T any, U any](size int, worker Worker[T, U], inputs []T) []U {
  243. if size < 1 {
  244. size = 1
  245. }
  246. workerPool := NewWorkerPool(size, worker)
  247. defer workerPool.Shutdown()
  248. workGroup := NewOrderedGroup(workerPool, len(inputs))
  249. for _, input := range inputs {
  250. workGroup.Push(input)
  251. }
  252. return workGroup.Wait()
  253. }
// ConcurrentCollect runs a pool of N workers which concurrently call the provided worker func on each
// input to get a result slice of non-nil outputs. The total number of workers is determined
// by the total number of CPUs available, bound to a range from 4-16.
func ConcurrentCollect[T any, U any](workerFunc Worker[T, *U], inputs []T) []*U {
	return ConcurrentCollectWith(OptimalWorkerCount(), workerFunc, inputs)
}
  260. // ConcurrentCollectWith runs a pool of workers of the specified size which concurrently call the provided worker
  261. // func on each input to get a result slice of non-nil outputs. Size inputs < 1 will automatically be set to 1.
  262. func ConcurrentCollectWith[T any, U any](size int, workerFunc Worker[T, *U], inputs []T) []*U {
  263. if size < 1 {
  264. size = 1
  265. }
  266. workerPool := NewWorkerPool(size, workerFunc)
  267. defer workerPool.Shutdown()
  268. workGroup := NewCollectionGroup(workerPool)
  269. for _, input := range inputs {
  270. workGroup.Push(input)
  271. }
  272. return workGroup.Wait()
  273. }
// ConcurrentRun runs a pool of N workers which concurrently call the provided runner func on each
// input. The total number of workers is determined by the total number of CPUs available, bound to
// a range from 4-16.
func ConcurrentRun[T any](runner Runner[T], inputs []T) {
	ConcurrentRunWith(OptimalWorkerCount(), runner, inputs)
}
  280. // ConcurrentRunWith runs a pool of runners of the specified size which concurrently call the provided runner
  281. // func on each input. Size inputs < 1 will automatically be set to 1.
  282. func ConcurrentRunWith[T any](size int, runner Runner[T], inputs []T) {
  283. if size < 1 {
  284. size = 1
  285. }
  286. workerPool := NewWorkerPool(size, func(input T) (void struct{}) {
  287. runner(input)
  288. return
  289. })
  290. workGroup := NewNoResultGroup(workerPool)
  291. for _, input := range inputs {
  292. workGroup.Push(input)
  293. }
  294. workGroup.Wait()
  295. }