example_worker_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package worker_test
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/pkg/util/worker"
  6. )
  7. // slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
  8. // waits 1 second, then returns the result.
  9. func slowAddTenToFloat(i int) float64 {
  10. result := float64(i + 10)
  11. time.Sleep(time.Second)
  12. return result
  13. }
  14. func Example_concurrentWorkers() {
  15. // Assuming we have a list of ints we want to pass to slowAddTenToFloat(),
  16. // rather than serially calling the function on each input (requiring a wait
  17. // of 1 second between calls), we'll want to execute each in a goroutine. Let's
  18. // say we had 100 inputs, we may not want to create that many go routines, so
  19. // instead, we can create a pool of goroutines that work on our inputs as fast as
  20. // possible.
  21. // Create a worker pool using 50 goroutines:
  22. workerPool := worker.NewWorkerPool(50, slowAddTenToFloat)
  23. // we want to shutdown the workerPool at the end of it's use to ensure we don't
  24. // leak go routines
  25. defer workerPool.Shutdown()
  26. // Loop over 100 inputs and run slowAddTenToFloat
  27. for i := 0; i < 100; i++ {
  28. // Run accepts a receive channel for each input, but it is not required.
  29. // To demonstrate receiving, we'll receive the results when the input
  30. // is 50:
  31. if i == 50 {
  32. receive := make(chan float64)
  33. workerPool.Run(i, receive)
  34. // since we don't want to slow down the input loop, let's receive the
  35. // result in a separate go routine
  36. go func(input int, rec chan float64) {
  37. defer close(rec)
  38. result := <-rec
  39. fmt.Printf("Receive Result: %.2f for Input: %d\n", result, input)
  40. }(i, receive)
  41. } else {
  42. // pass nil if receiving the result isn't necessary
  43. workerPool.Run(i, nil)
  44. }
  45. }
  46. // 100 inputs with 50 go routines should take 2 seconds, so let's wait a bit longer than that
  47. time.Sleep((2 * time.Second) + (500 * time.Millisecond))
  48. // Output:
  49. // Receive Result: 60.00 for Input: 50
  50. }
  51. func Example_concurrentOrdered() {
  52. // Expanding on the previous idea, let's assume that we want to receive the result for
  53. // every input. That would normally require some specialized synchronization and boilerplate,
  54. // but the worker package contains a ordered group type for exactly this functionality
  55. // This time, let's create a worker pool and use the MAXGOPROCS value to determine the number
  56. // of workers
  57. workerCount := worker.OptimalWorkerCount()
  58. workerPool := worker.NewWorkerPool(workerCount, slowAddTenToFloat)
  59. // Shutdown the worker pool when complete
  60. defer workerPool.Shutdown()
  61. // now we can create our ordered group type and pass in the worker pool, and since we know our
  62. // number of inputs (let's choose 12 this time), we can pass that to the group as well.
  63. const numInputs = 12
  64. orderedGroup := worker.NewOrderedGroup(workerPool, numInputs)
  65. // loop over our inputs and pass them to the group
  66. for i := 0; i < numInputs; i++ {
  67. // ordered group has a strict size constraint (set in the NewOrderedGroup func), and will
  68. // error if the number of inputs pushed exceeds that size constraint
  69. err := orderedGroup.Push(i)
  70. if err != nil {
  71. panic(err)
  72. }
  73. }
  74. // now we can simply call Wait() to receive the results
  75. results := orderedGroup.Wait()
  76. // Note that the order of the results is consistent with the order in which they were pushed
  77. for idx, result := range results {
  78. fmt.Printf("Received Result: %.2f for Input: %d\n", result, idx)
  79. }
  80. // Output:
  81. // Received Result: 10.00 for Input: 0
  82. // Received Result: 11.00 for Input: 1
  83. // Received Result: 12.00 for Input: 2
  84. // Received Result: 13.00 for Input: 3
  85. // Received Result: 14.00 for Input: 4
  86. // Received Result: 15.00 for Input: 5
  87. // Received Result: 16.00 for Input: 6
  88. // Received Result: 17.00 for Input: 7
  89. // Received Result: 18.00 for Input: 8
  90. // Received Result: 19.00 for Input: 9
  91. // Received Result: 20.00 for Input: 10
  92. // Received Result: 21.00 for Input: 11
  93. }
  94. func Example_concurrentOrderedSimple() {
  95. // This last example highlights a simplified version of the previous example. While
  96. // the ordered example provides tuning knobs for total goroutines and allows pushing
  97. // data dynamically, it can be quite verbose and difficult to read at times. The worker
  98. // package also provides a utility function that simplifies the ordered concurrent
  99. // processing into a worker function and a slice of inputs
  100. // Let's create our inputs 0-12 like in the previous example
  101. const numInputs = 12
  102. inputs := make([]int, numInputs)
  103. for i := 0; i < numInputs; i++ {
  104. inputs[i] = i
  105. }
  106. // Now, we can just call ConcurrentDo with the inputs and worker func:
  107. results := worker.ConcurrentDo(slowAddTenToFloat, inputs)
  108. // Note that the order of the results is consistent with the order of inputs
  109. for i := 0; i < numInputs; i++ {
  110. fmt.Printf("Received Result: %.2f for Input: %d\n", results[i], inputs[i])
  111. }
  112. // Output:
  113. // Received Result: 10.00 for Input: 0
  114. // Received Result: 11.00 for Input: 1
  115. // Received Result: 12.00 for Input: 2
  116. // Received Result: 13.00 for Input: 3
  117. // Received Result: 14.00 for Input: 4
  118. // Received Result: 15.00 for Input: 5
  119. // Received Result: 16.00 for Input: 6
  120. // Received Result: 17.00 for Input: 7
  121. // Received Result: 18.00 for Input: 8
  122. // Received Result: 19.00 for Input: 9
  123. // Received Result: 20.00 for Input: 10
  124. // Received Result: 21.00 for Input: 11
  125. }