| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- package worker_test
- import (
- "fmt"
- "time"
- "github.com/opencost/opencost/pkg/util/worker"
- )
- // slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
- // waits 1 second, then returns the result.
- func slowAddTenToFloat(i int) float64 {
- result := float64(i + 10)
- time.Sleep(time.Second)
- return result
- }
- func Example_concurrentWorkers() {
- // Assuming we have a list of ints we want to pass to slowAddTenToFloat(),
- // rather than serially calling the function on each input (requiring a wait
- // of 1 second between calls), we'll want to execute each in a goroutine. Let's
- // say we had 100 inputs, we may not want to create that many go routines, so
- // instead, we can create a pool of goroutines that work on our inputs as fast as
- // possible.
- // Create a worker pool using 50 goroutines:
- workerPool := worker.NewWorkerPool(50, slowAddTenToFloat)
- // we want to shutdown the workerPool at the end of it's use to ensure we don't
- // leak go routines
- defer workerPool.Shutdown()
- // Loop over 100 inputs and run slowAddTenToFloat
- for i := 0; i < 100; i++ {
- // Run accepts a receive channel for each input, but it is not required.
- // To demonstrate receiving, we'll receive the results when the input
- // is 50:
- if i == 50 {
- receive := make(chan float64)
- workerPool.Run(i, receive)
- // since we don't want to slow down the input loop, let's receive the
- // result in a separate go routine
- go func(input int, rec chan float64) {
- defer close(rec)
- result := <-rec
- fmt.Printf("Receive Result: %.2f for Input: %d\n", result, input)
- }(i, receive)
- } else {
- // pass nil if receiving the result isn't necessary
- workerPool.Run(i, nil)
- }
- }
- // 100 inputs with 50 go routines should take 2 seconds, so let's wait a bit longer than that
- time.Sleep((2 * time.Second) + (500 * time.Millisecond))
- // Output:
- // Receive Result: 60.00 for Input: 50
- }
- func Example_concurrentOrdered() {
- // Expanding on the previous idea, let's assume that we want to receive the result for
- // every input. That would normally require some specialized synchronization and boilerplate,
- // but the worker package contains a ordered group type for exactly this functionality
- // This time, let's create a worker pool and use the MAXGOPROCS value to determine the number
- // of workers
- workerCount := worker.OptimalWorkerCount()
- workerPool := worker.NewWorkerPool(workerCount, slowAddTenToFloat)
- // Shutdown the worker pool when complete
- defer workerPool.Shutdown()
- // now we can create our ordered group type and pass in the worker pool, and since we know our
- // number of inputs (let's choose 12 this time), we can pass that to the group as well.
- const numInputs = 12
- orderedGroup := worker.NewOrderedGroup(workerPool, numInputs)
- // loop over our inputs and pass them to the group
- for i := 0; i < numInputs; i++ {
- // ordered group has a strict size constraint (set in the NewOrderedGroup func), and will
- // error if the number of inputs pushed exceeds that size constraint
- err := orderedGroup.Push(i)
- if err != nil {
- panic(err)
- }
- }
- // now we can simply call Wait() to receive the results
- results := orderedGroup.Wait()
- // Note that the order of the results is consistent with the order in which they were pushed
- for idx, result := range results {
- fmt.Printf("Received Result: %.2f for Input: %d\n", result, idx)
- }
- // Output:
- // Received Result: 10.00 for Input: 0
- // Received Result: 11.00 for Input: 1
- // Received Result: 12.00 for Input: 2
- // Received Result: 13.00 for Input: 3
- // Received Result: 14.00 for Input: 4
- // Received Result: 15.00 for Input: 5
- // Received Result: 16.00 for Input: 6
- // Received Result: 17.00 for Input: 7
- // Received Result: 18.00 for Input: 8
- // Received Result: 19.00 for Input: 9
- // Received Result: 20.00 for Input: 10
- // Received Result: 21.00 for Input: 11
- }
- func Example_concurrentOrderedSimple() {
- // This last example highlights a simplified version of the previous example. While
- // the ordered example provides tuning knobs for total goroutines and allows pushing
- // data dynamically, it can be quite verbose and difficult to read at times. The worker
- // package also provides a utility function that simplifies the ordered concurrent
- // processing into a worker function and a slice of inputs
- // Let's create our inputs 0-12 like in the previous example
- const numInputs = 12
- inputs := make([]int, numInputs)
- for i := 0; i < numInputs; i++ {
- inputs[i] = i
- }
- // Now, we can just call ConcurrentDo with the inputs and worker func:
- results := worker.ConcurrentDo(slowAddTenToFloat, inputs)
- // Note that the order of the results is consistent with the order of inputs
- for i := 0; i < numInputs; i++ {
- fmt.Printf("Received Result: %.2f for Input: %d\n", results[i], inputs[i])
- }
- // Output:
- // Received Result: 10.00 for Input: 0
- // Received Result: 11.00 for Input: 1
- // Received Result: 12.00 for Input: 2
- // Received Result: 13.00 for Input: 3
- // Received Result: 14.00 for Input: 4
- // Received Result: 15.00 for Input: 5
- // Received Result: 16.00 for Input: 6
- // Received Result: 17.00 for Input: 7
- // Received Result: 18.00 for Input: 8
- // Received Result: 19.00 for Input: 9
- // Received Result: 20.00 for Input: 10
- // Received Result: 21.00 for Input: 11
- }
|