worker.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. package worker
  2. import (
  3. "fmt"
  4. "iter"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/opencost/opencost/core/pkg/collections"
  9. "github.com/opencost/opencost/core/pkg/util/sliceutil"
  10. )
// Runner is a function type that takes a single input and returns nothing.
// It is the fire-and-forget counterpart of Worker (see ConcurrentRun).
type Runner[T any] func(T)

// Worker is a transformation function from input type T to output type U.
type Worker[T any, U any] func(T) U
// WorkerPool is a pool of go routines executing a Worker on supplied inputs via
// the Run function.
type WorkerPool[T any, U any] interface {
	// Run executes a Worker in the pool on the provided input and onComplete receive channel
	// to get the results. An error is returned if the pool is shutdown, or is in the process
	// of shutting down. A nil onComplete discards the result.
	Run(input T, onComplete chan<- U) error

	// Shutdown stops all of the workers (if running). It is safe to call more than once.
	Shutdown()
}
// WorkProcessor is a group of inputs that leverage a WorkerPool to run inputs through workers and
// process the job results as they complete.
type WorkProcessor[T any, U any] interface {
	// Execute adds all inputs to be run by the worker pool and processed on completion.
	// It blocks until every result has been passed to process, or returns early on error.
	Execute(inputs []T, process func(U)) error
}
// WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and
// collect the results in a single slice.
type WorkGroup[T any, U any] interface {
	// Push adds a new input to the work group. An error is returned if the input
	// could not be submitted (e.g. the underlying pool is shut down).
	Push(T) error

	// Wait waits for all pending worker tasks to complete, then returns all the results.
	Wait() []U
}
// entry is an internal helper type for pushing payloads to the worker queue.
type entry[T any, U any] struct {
	// payload is the input handed to the pool's Worker func.
	payload T
	// onComplete, when non-nil, receives the Worker's result for this payload.
	onComplete chan<- U
	// close is a shutdown sentinel: a worker that dequeues an entry with
	// close set exits its loop (see Shutdown and worker).
	close bool
}
// queuedWorkerPool is a blocking queue based implementation of a WorkerPool.
type queuedWorkerPool[T any, U any] struct {
	// queue delivers work entries (and shutdown sentinels) to the worker goroutines.
	queue collections.BlockingQueue[entry[T, U]]
	// work transforms each input T into an output U.
	work Worker[T, U]
	// workers is the number of goroutines started for this pool; Shutdown
	// enqueues exactly this many sentinels.
	workers int
	// isShutdown is set once by Shutdown; Run rejects new inputs afterwards.
	isShutdown atomic.Bool
}
  52. // NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
  53. // func used to transform inputs to outputs.
  54. func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
  55. owq := &queuedWorkerPool[T, U]{
  56. workers: workers,
  57. work: work,
  58. queue: collections.NewBlockingQueue[entry[T, U]](),
  59. }
  60. // startup the designated workers
  61. for i := 0; i < workers; i++ {
  62. go owq.worker()
  63. }
  64. return owq
  65. }
  66. // Run executes a Worker in the pool on the provided input and onComplete receive chanel
  67. // to get the results. An error is returned if the pool is shutdown, or is in the process
  68. // of shutting down.
  69. func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
  70. if wq.isShutdown.Load() {
  71. return fmt.Errorf("WorkerPoolShutdown")
  72. }
  73. wq.queue.Enqueue(entry[T, U]{
  74. payload: input,
  75. onComplete: onComplete,
  76. close: false,
  77. })
  78. return nil
  79. }
  80. // Shutdown stops all of the workers (if running).
  81. func (wq *queuedWorkerPool[T, U]) Shutdown() {
  82. if !wq.isShutdown.CompareAndSwap(false, true) {
  83. return
  84. }
  85. for i := 0; i < wq.workers; i++ {
  86. wq.queue.Enqueue(entry[T, U]{
  87. close: true,
  88. })
  89. }
  90. }
  91. func (wq *queuedWorkerPool[T, U]) worker() {
  92. for {
  93. next := wq.queue.Dequeue()
  94. // shutdown the worker on sentinel value
  95. if next.close {
  96. return
  97. }
  98. result := wq.work(next.payload)
  99. // signal on complete if applicable
  100. if next.onComplete != nil {
  101. next.onComplete <- result
  102. }
  103. }
  104. }
// ordered is a WorkGroup implementation which enforces ordering based on when
// inputs were pushed onto the group.
type ordered[T any, U any] struct {
	// workPool executes the pushed inputs.
	workPool WorkerPool[T, U]
	// results holds one fixed slot per expected input, indexed by push order.
	results []U
	// wg tracks outstanding results; Wait blocks on it.
	wg sync.WaitGroup
	// count is the number of inputs pushed so far; Push is documented as
	// not safe for concurrent use, so no lock guards it.
	count int
}
  113. // NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
  114. // they are pushed. Ordered groups do not support concurrent Push() calls.
  115. func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
  116. return &ordered[T, U]{
  117. workPool: pool,
  118. results: make([]U, size),
  119. count: 0,
  120. }
  121. }
  122. // Push adds a new input to the work group.
  123. func (ow *ordered[T, U]) Push(input T) error {
  124. current := ow.count
  125. if current >= len(ow.results) {
  126. return fmt.Errorf("MaxCapacity")
  127. }
  128. onComplete := make(chan U)
  129. err := ow.workPool.Run(input, onComplete)
  130. if err != nil {
  131. return err
  132. }
  133. ow.count++
  134. ow.wg.Add(1)
  135. go func(index int) {
  136. defer close(onComplete)
  137. ow.results[index] = <-onComplete
  138. ow.wg.Done()
  139. }(int(current))
  140. return nil
  141. }
  142. // Wait waits for all pending worker tasks to complete, then returns all the results.
  143. func (ow *ordered[T, U]) Wait() []U {
  144. ow.wg.Wait()
  145. return ow.results
  146. }
// orderedProcessor is a WorkProcessor implementation which processes inputs in batches
// in the same order as the slice, serially on the same go routine. Note that the process go routine
// will not be the same as the calling go routine. However, process will be called on the same goroutine.
type orderedProcessor[T any, U any] struct {
	// workPool executes the batch inputs concurrently.
	workPool WorkerPool[T, U]
}
  153. // NewOrderedProcessor creates a new ordered work processor for processing an execution result in the
  154. // order in which the inputs were passed.
  155. func NewOrderedProcessor[T any, U any](pool WorkerPool[T, U]) WorkProcessor[T, U] {
  156. return &orderedProcessor[T, U]{
  157. workPool: pool,
  158. }
  159. }
  160. func (obp *orderedProcessor[T, U]) Execute(inputs []T, process func(U)) error {
  161. if len(inputs) == 0 {
  162. return nil
  163. }
  164. // Run all the inputs in order, creating a channel per input to receive results.
  165. channels := make([]chan U, len(inputs))
  166. for i, input := range inputs {
  167. channels[i] = make(chan U)
  168. err := obp.workPool.Run(input, channels[i])
  169. if err != nil {
  170. return err
  171. }
  172. }
  173. // Create a separate goroutine to process to execute all the results serially
  174. done := make(chan struct{})
  175. go func() {
  176. defer close(done)
  177. for _, ch := range channels {
  178. result := <-ch
  179. process(result)
  180. close(ch)
  181. }
  182. }()
  183. <-done
  184. return nil
  185. }
// noResultGroup is a WorkGroup implementation which arbitrarily pushes inputs to
// a runner pool to be executed concurrently. This group does not collect results.
type noResultGroup[T any] struct {
	// workPool executes the pushed inputs; the struct{} output is discarded.
	workPool WorkerPool[T, struct{}]
	// wg tracks outstanding tasks; Wait blocks on it.
	wg sync.WaitGroup
}
  192. // NewNoResultGroup creates a new WorkGroup implementation for processing a group of inputs concurrently. This
  193. // work group implementation does not collect results, and therefore, requires a worker pool with a struct{} output.
  194. func NewNoResultGroup[T any](pool WorkerPool[T, struct{}]) WorkGroup[T, struct{}] {
  195. return &noResultGroup[T]{
  196. workPool: pool,
  197. }
  198. }
  199. // Push adds a new input to the work group.
  200. func (ow *noResultGroup[T]) Push(input T) error {
  201. onComplete := make(chan struct{})
  202. err := ow.workPool.Run(input, onComplete)
  203. if err != nil {
  204. return err
  205. }
  206. ow.wg.Add(1)
  207. go func() {
  208. defer close(onComplete)
  209. defer ow.wg.Done()
  210. <-onComplete
  211. }()
  212. return nil
  213. }
  214. // Wait waits for all pending worker tasks to complete, then returns all the results.
  215. func (ow *noResultGroup[T]) Wait() []struct{} {
  216. ow.wg.Wait()
  217. return []struct{}{}
  218. }
// collector is a WorkGroup implementation which collects non-nil results into the results slice
// and ignores any nil results.
type collector[T any, U any] struct {
	// workPool executes the pushed inputs; the pointer output allows nil
	// results to be filtered out.
	workPool WorkerPool[T, *U]
	// resultLock guards results, which is appended to from per-input goroutines.
	resultLock sync.Mutex
	// results accumulates non-nil outputs in completion (not push) order.
	results []*U
	// wg tracks outstanding tasks; Wait blocks on it.
	wg sync.WaitGroup
}
  227. // NewCollectionGroup creates a new WorkGroup implementation for processing a group of inputs concurrently. The
  228. // collection group implementation will collect all non-nil results into the output slice. Thus, the worker pool
  229. // parameter requires the output type to be a pointer.
  230. func NewCollectionGroup[T any, U any](pool WorkerPool[T, *U]) WorkGroup[T, *U] {
  231. return &collector[T, U]{
  232. workPool: pool,
  233. }
  234. }
  235. // Push adds a new input to the work group.
  236. func (ow *collector[T, U]) Push(input T) error {
  237. onComplete := make(chan *U)
  238. err := ow.workPool.Run(input, onComplete)
  239. if err != nil {
  240. return err
  241. }
  242. ow.wg.Add(1)
  243. go func() {
  244. defer ow.wg.Done()
  245. defer close(onComplete)
  246. result := <-onComplete
  247. if result != nil {
  248. ow.resultLock.Lock()
  249. ow.results = append(ow.results, result)
  250. ow.resultLock.Unlock()
  251. }
  252. }()
  253. return nil
  254. }
  255. // Wait waits for all pending worker tasks to complete, then returns all the results.
  256. func (ow *collector[T, U]) Wait() []*U {
  257. ow.wg.Wait()
  258. return ow.results
  259. }
// these constraints protect against the possibility of unexpected output from runtime.NumCPU()
const (
	// defaultMinWorkers is the floor applied by OptimalWorkerCount.
	defaultMinWorkers = 4
	// defaultMaxWorkers is the ceiling applied by OptimalWorkerCount.
	defaultMaxWorkers = 16
)
// OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU(),
// clamped to the [defaultMinWorkers, defaultMaxWorkers] range.
func OptimalWorkerCount() int {
	return OptimalWorkerCountInRange(defaultMinWorkers, defaultMaxWorkers)
}
  269. // OptimalWorkerCount will return runtime.NumCPU() constrained to the provided min and max
  270. // range
  271. func OptimalWorkerCountInRange(min int, max int) int {
  272. cores := runtime.NumCPU()
  273. if cores < min {
  274. return min
  275. }
  276. if cores > max {
  277. return max
  278. }
  279. return cores
  280. }
// ConcurrentDo runs a pool of N workers which concurrently call the provided worker func on each
// input to get ordered output corresponding to the inputs. The total number of workers is determined
// by the total number of CPUs available, bound to a range from 4-16.
func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
	return ConcurrentDoWith(OptimalWorkerCount(), worker, inputs)
}
  287. // ConcurrentDoWith runs a pool of workers of the specified size which concurrently call the provided worker func
  288. // on each input to get ordered output corresponding to the inputs. Size inputs < 1 will automatically be set to 1.
  289. func ConcurrentDoWith[T any, U any](size int, worker Worker[T, U], inputs []T) []U {
  290. if size < 1 {
  291. size = 1
  292. }
  293. workerPool := NewWorkerPool(size, worker)
  294. defer workerPool.Shutdown()
  295. workGroup := NewOrderedGroup(workerPool, len(inputs))
  296. for _, input := range inputs {
  297. workGroup.Push(input)
  298. }
  299. return workGroup.Wait()
  300. }
// ConcurrentCollect runs a pool of N workers which concurrently call the provided worker func on each
// input to get a result slice of non-nil outputs. The total number of workers is determined
// by the total number of CPUs available, bound to a range from 4-16.
func ConcurrentCollect[T any, U any](workerFunc Worker[T, *U], inputs []T) []*U {
	return ConcurrentCollectWith(OptimalWorkerCount(), workerFunc, inputs)
}
// ConcurrentCollectWith runs a pool of workers of the specified size which concurrently call the provided worker
// func on each input to get a result slice of non-nil outputs. Size inputs < 1 will automatically be set to 1.
func ConcurrentCollectWith[T any, U any](size int, workerFunc Worker[T, *U], inputs []T) []*U {
	// delegate to the iterator variant by viewing the slice as a sequence
	return ConcurrentIterCollect(size, workerFunc, sliceutil.AsSeq(inputs))
}
  312. // ConcurrentIterCollect runs a pool of workers of the specified size which concurrently call the provided worker
  313. // func on each input to get a result slice of non-nil outputs. Size inputs < 1 will automatically be set to 1.
  314. func ConcurrentIterCollect[T any, U any](size int, workerFunc Worker[T, *U], inputs iter.Seq[T]) []*U {
  315. if size < 1 {
  316. size = 1
  317. }
  318. workerPool := NewWorkerPool(size, workerFunc)
  319. defer workerPool.Shutdown()
  320. workGroup := NewCollectionGroup(workerPool)
  321. for input := range inputs {
  322. workGroup.Push(input)
  323. }
  324. return workGroup.Wait()
  325. }
// ConcurrentRun runs a pool of N workers which concurrently call the provided runner func on each
// input. The total number of workers is determined by the total number of CPUs available, bound to
// a range from 4-16.
func ConcurrentRun[T any](runner Runner[T], inputs []T) {
	ConcurrentRunWith(OptimalWorkerCount(), runner, inputs)
}
// ConcurrentRunWith runs a pool of runners of the specified size which concurrently call the provided runner
// func on each input. Size inputs < 1 will automatically be set to 1.
func ConcurrentRunWith[T any](size int, runner Runner[T], inputs []T) {
	// delegate to the iterator variant by viewing the slice as a sequence
	ConcurrentIterRunWith(size, runner, sliceutil.AsSeq(inputs))
}
  337. // ConcurrentIterRunWith runs a pool of runners of the specified size which concurrently call the provided runner
  338. // func on each input. Size inputs < 1 will automatically be set to 1.
  339. func ConcurrentIterRunWith[T any](size int, runner Runner[T], inputs iter.Seq[T]) {
  340. if size < 1 {
  341. size = 1
  342. }
  343. workerPool := NewWorkerPool(size, func(input T) (void struct{}) {
  344. runner(input)
  345. return
  346. })
  347. defer workerPool.Shutdown()
  348. workGroup := NewNoResultGroup(workerPool)
  349. for input := range inputs {
  350. workGroup.Push(input)
  351. }
  352. workGroup.Wait()
  353. }
// ConcurrentOrderedProcess runs a pool of N workers which concurrently call the provided worker func on each input, then
// calls the process function on each result, as it completes, in the same order as the inputs.
// The total number of workers is determined by the total number of CPUs available, bound to a range from 4-16.
func ConcurrentOrderedProcess[T any, U any](worker Worker[T, U], inputs []T, process func(U)) {
	ConcurrentOrderedProcessWith(OptimalWorkerCount(), worker, inputs, process)
}
  359. // ConcurrentOrderedProcess runs a pool of size workers which concurrently call the provided worker func on each input, then
  360. // calls the process function on each result, as it completes, in the same order as the inputs.
  361. func ConcurrentOrderedProcessWith[T any, U any](size int, worker Worker[T, U], inputs []T, process func(U)) {
  362. if len(inputs) == 0 {
  363. return
  364. }
  365. workerPool := NewWorkerPool(size, worker)
  366. defer workerPool.Shutdown()
  367. // processors block on execute, so no need to explicitly wait
  368. workProcessor := NewOrderedProcessor(workerPool)
  369. workProcessor.Execute(inputs, process)
  370. }