Sfoglia il codice sorgente

Added worker package with unit tests and examples for language docs, with an in-depth explanation of each component and its possible use cases.

Matt Bolt 4 anni fa
parent
commit
f2c55fe5a2

+ 147 - 0
pkg/util/worker/example_worker_test.go

@@ -0,0 +1,147 @@
+package worker_test
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/util/worker"
+)
+
+// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
+// waits 1 second, then returns the result.
// slowAddTenToFloat simulates "work": it pauses for 1 second, then returns the
// input integer plus ten as a float64.
func slowAddTenToFloat(i int) float64 {
	time.Sleep(time.Second)
	return float64(i) + 10.0
}
+
+func Example_concurrentWorkers() {
+	// Assuming we have a list of ints we want to pass to slowAddTenToFloat(),
+	// rather than serially calling the function on each input (requiring a wait
+	// of 1 second between calls), we'll want to execute each in a goroutine. Let's
+	// say we had 100 inputs, we may not want to create that many go routines, so
+	// instead, we can create a pool of goroutines that work on our inputs as fast as
+	// possible.
+
+	// Create a worker pool using 50 goroutines:
+	workerPool := worker.NewWorkerPool(50, slowAddTenToFloat)
+
+	// went want to shutdown the workerPool at the end of it's use to ensure we don't
+	// leak go routines
+	defer workerPool.Shutdown()
+
+	// Loop over 100 inputs and run slowAddTenToFloat
+	for i := 0; i < 100; i++ {
+		// Run accepts a receive channel for each input, but it is not required
+		// to demonstrate receiving, we'll receive the results when the input
+		// is 50:
+		if i == 50 {
+			receive := make(chan float64)
+			workerPool.Run(i, receive)
+
+			// since we don't want to slow down the input loop, let's receive the
+			// result in a separate go routine
+			go func(input int, rec chan float64) {
+				defer close(rec)
+				result := <-rec
+				fmt.Printf("Receive Result: %.2f for Input: %d\n", result, input)
+			}(i, receive)
+		} else {
+
+		}
+	}
+
+	// 100 inputs with 50 go routines should take 2 seconds, so let's wait a bit longer than that:
+	time.Sleep((2 * time.Second) + (500 * time.Millisecond))
+
+	// Output:
+	// Receive Result: 60.00 for Input: 50
+}
+
func Example_concurrentOrdered() {
	// Expanding on the previous idea, let's assume that we want to receive the result for
	// every input. That would normally require some specialized synchronization and boilerplate,
	// but the worker package contains an ordered group type for exactly this functionality

	// This time, let's create a worker pool and let the worker package choose the number of
	// workers based on the number of available CPUs
	workerCount := worker.OptimalWorkerCount()
	workerPool := worker.NewWorkerPool(workerCount, slowAddTenToFloat)

	// Shutdown the worker pool when complete
	defer workerPool.Shutdown()

	// now we can create our ordered group type and pass in the worker pool, and since we know our
	// number of inputs (let's choose 12 this time), we can pass that to the group as well.
	const numInputs = 12
	orderedGroup := worker.NewOrderedGroup(workerPool, numInputs)

	// loop over our inputs and pass them to the group
	for i := 0; i < numInputs; i++ {
		// ordered group has a strict size constraint (set in the NewOrderedGroup func), and will
		// error if the number of inputs pushed exceeds that size constraint
		err := orderedGroup.Push(i)
		if err != nil {
			panic(err)
		}
	}

	// now we can simply call Wait() to receive the results
	results := orderedGroup.Wait()

	// Note that the order of the results is consistent with the order in which they were pushed
	for idx, result := range results {
		fmt.Printf("Received Result: %.2f for Input: %d\n", result, idx)
	}

	// Output:
	// Received Result: 10.00 for Input: 0
	// Received Result: 11.00 for Input: 1
	// Received Result: 12.00 for Input: 2
	// Received Result: 13.00 for Input: 3
	// Received Result: 14.00 for Input: 4
	// Received Result: 15.00 for Input: 5
	// Received Result: 16.00 for Input: 6
	// Received Result: 17.00 for Input: 7
	// Received Result: 18.00 for Input: 8
	// Received Result: 19.00 for Input: 9
	// Received Result: 20.00 for Input: 10
	// Received Result: 21.00 for Input: 11
}
+
func Example_concurrentOrderedSimple() {
	// This last example highlights a simplified version of the previous example. While
	// the ordered example provides tuning knobs for total goroutines and allows pushing
	// data dynamically, it can be quite verbose and difficult to read at times. The worker
	// package also provides a utility function that simplifies the ordered concurrent
	// processing into a worker function and a slice of inputs

	// Let's create our inputs 0-11 like in the previous example
	const numInputs = 12
	inputs := make([]int, numInputs)
	for i := 0; i < numInputs; i++ {
		inputs[i] = i
	}

	// Now, we can just call ConcurrentDo with the inputs and worker func:
	results := worker.ConcurrentDo(slowAddTenToFloat, inputs)

	// Note that the order of the results is consistent with the order of inputs
	for i := 0; i < numInputs; i++ {
		fmt.Printf("Received Result: %.2f for Input: %d\n", results[i], inputs[i])
	}

	// Output:
	// Received Result: 10.00 for Input: 0
	// Received Result: 11.00 for Input: 1
	// Received Result: 12.00 for Input: 2
	// Received Result: 13.00 for Input: 3
	// Received Result: 14.00 for Input: 4
	// Received Result: 15.00 for Input: 5
	// Received Result: 16.00 for Input: 6
	// Received Result: 17.00 for Input: 7
	// Received Result: 18.00 for Input: 8
	// Received Result: 19.00 for Input: 9
	// Received Result: 20.00 for Input: 10
	// Received Result: 21.00 for Input: 11
}

+ 206 - 0
pkg/util/worker/worker.go

@@ -0,0 +1,206 @@
+package worker
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+
+	"github.com/kubecost/cost-model/pkg/collections"
+	"github.com/kubecost/cost-model/pkg/util/atomic"
+)
+
// Worker is a transformation function from input type T to output type U.
type Worker[T any, U any] func(T) U

// WorkerPool is a pool of go routines executing a Worker on supplied inputs via
// the Run function.
type WorkerPool[T any, U any] interface {
	// Run executes a Worker in the pool on the provided input and onComplete receive channel
	// to get the results. An error is returned if the pool is shutdown, or is in the process
	// of shutting down.
	Run(input T, onComplete chan<- U) error

	// Shutdown stops all of the workers (if running).
	Shutdown()
}

// WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and
// collect the results in a single slice.
type WorkGroup[T any, U any] interface {
	// Push adds a new input to the work group.
	Push(T) error

	// Wait waits for all pending worker tasks to complete, then returns all the results.
	Wait() []U
}

// entry is an internal helper type for pushing payloads to the worker queue
type entry[T any, U any] struct {
	payload    T        // the input handed to the Worker func
	onComplete chan<- U // optional channel the result is sent on (may be nil)
	close      bool     // sentinel flag: when true, the receiving worker goroutine exits
}

// queuedWorkerPool is a blocking queue based implementation of a WorkerPool
type queuedWorkerPool[T any, U any] struct {
	queue      collections.BlockingQueue[entry[T, U]] // pending work shared by all workers
	work       Worker[T, U]                           // transformation applied to each payload
	workers    int                                    // number of worker goroutines started
	isShutdown *atomic.AtomicBool                     // set once Shutdown() begins; Run() rejects new work after
}

// ordered is a WorkGroup implementation which enforces ordering based on when
// inputs were pushed onto the group.
type ordered[T any, U any] struct {
	workPool WorkerPool[T, U] // pool used to execute pushed inputs
	results  []U              // result slots, indexed by push order
	wg       *sync.WaitGroup  // tracks outstanding results
	count    int              // number of inputs pushed so far
}
+
+// NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
+// func used to transform inputs to outputs.
+func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
+	owq := &queuedWorkerPool[T, U]{
+		workers:    workers,
+		work:       work,
+		queue:      collections.NewBlockingQueue[entry[T, U]](),
+		isShutdown: atomic.NewAtomicBool(false),
+	}
+
+	// startup the designated workers
+	for i := 0; i < workers; i++ {
+		go owq.worker()
+	}
+
+	return owq
+}
+
+// Run executes a Worker in the pool on the provided input and onComplete receive chanel
+// to get the results. An error is returned if the pool is shutdown, or is in the process
+// of shutting down.
+func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
+	if wq.isShutdown.Get() {
+		return fmt.Errorf("WorkerPoolShutdown")
+	}
+
+	wq.queue.Enqueue(entry[T, U]{
+		payload:    input,
+		onComplete: onComplete,
+		close:      false,
+	})
+
+	return nil
+}
+
+// Shutdown stops all of the workers (if running).
+func (wq *queuedWorkerPool[T, U]) Shutdown() {
+	if !wq.isShutdown.CompareAndSet(false, true) {
+		return
+	}
+
+	for i := 0; i < wq.workers; i++ {
+		wq.queue.Enqueue(entry[T, U]{
+			close: true,
+		})
+	}
+}
+
+func (wq *queuedWorkerPool[T, U]) worker() {
+	for {
+		next := wq.queue.Dequeue()
+
+		// shutdown the worker on sentinel value
+		if next.close {
+			return
+		}
+
+		result := wq.work(next.payload)
+
+		// signal on complete if applicable
+		if next.onComplete != nil {
+			next.onComplete <- result
+		}
+	}
+}
+
+// NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
+// they are pushed. Ordered groups do not support concurrent Push() calls.
+func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
+	return &ordered[T, U]{
+		workPool: pool,
+		results:  make([]U, size),
+		wg:       new(sync.WaitGroup),
+		count:    0,
+	}
+}
+
+// Push adds a new input to the work group.
+func (ow *ordered[T, U]) Push(input T) error {
+	current := ow.count
+	if current >= len(ow.results) {
+		return fmt.Errorf("MaxCapacity")
+	}
+
+	onComplete := make(chan U)
+	err := ow.workPool.Run(input, onComplete)
+	if err != nil {
+		return err
+	}
+
+	ow.count++
+	ow.wg.Add(1)
+
+	go func(index int) {
+		defer close(onComplete)
+
+		ow.results[index] = <-onComplete
+		ow.wg.Done()
+	}(int(current))
+
+	return nil
+}
+
+// Wait waits for all pending worker tasks to complete, then returns all the results.
+func (ow *ordered[T, U]) Wait() []U {
+	ow.wg.Wait()
+	return ow.results
+}
+
// these constraints protect against the possibility of unexpected output from runtime.NumCPU()
const (
	// defaultMinWorkers is the lower bound applied by OptimalWorkerCount.
	defaultMinWorkers = 4
	// defaultMaxWorkers is the upper bound applied by OptimalWorkerCount.
	defaultMaxWorkers = 16
)

// OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU(),
// clamped to the default [defaultMinWorkers, defaultMaxWorkers] range.
func OptimalWorkerCount() int {
	return OptimalWorkerCountInRange(defaultMinWorkers, defaultMaxWorkers)
}
+
// OptimalWorkerCountInRange returns runtime.NumCPU() constrained to the provided
// min and max range (inclusive).
func OptimalWorkerCountInRange(min int, max int) int {
	cores := runtime.NumCPU()
	switch {
	case cores < min:
		return min
	case cores > max:
		return max
	default:
		return cores
	}
}
+
+// ConcurrentDo runs a pool of workers which concurrently call the provided worker func on each input to get ordered
+// output corresponding to the inputs
+func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
+	workerPool := NewWorkerPool(OptimalWorkerCount(), worker)
+	defer workerPool.Shutdown()
+
+	workGroup := NewOrderedGroup(workerPool, len(inputs))
+	for _, input := range inputs {
+		workGroup.Push(input)
+	}
+
+	return workGroup.Wait()
+}

+ 149 - 0
pkg/util/worker/worker_test.go

@@ -0,0 +1,149 @@
+package worker
+
+import (
+	"math/rand"
+	"runtime"
+	"sync"
+	"testing"
+	"time"
+)
+
// void is a zero-size placeholder used where a value type is required but no
// data needs to be carried (e.g. channel signaling in these tests).
type void struct{}

// none is the singleton void value.
var none = void{}
+
+func waitChannelFor(wg *sync.WaitGroup) <-chan void {
+	ch := make(chan void)
+	go func() {
+		defer close(ch)
+
+		wg.Wait()
+		ch <- none
+	}()
+	return ch
+}
+
+func TestWorkerPoolShutdown(t *testing.T) {
+	const workers = 3
+
+	// running goroutines
+	routines := runtime.NumGoroutine()
+	t.Logf("Go Routines Before: %d\n", routines)
+
+	wp := NewWorkerPool(workers, func(any) any { return nil })
+	t.Logf("Go Routines After: %d\n", runtime.NumGoroutine())
+
+	wp.Shutdown()
+	time.Sleep(time.Second)
+	if runtime.NumGoroutine() != routines {
+		t.Errorf("Go routines after shutdown: %d != Go routines at start of test: %d\n", runtime.NumGoroutine(), routines)
+	}
+}
+
+func TestWorkerPoolExactWorkers(t *testing.T) {
+	const workers = 3
+
+	// worker func logs start/finish for simulated work
+	work := func(i int) void {
+		t.Logf("Starting Work: %d\n", i)
+		time.Sleep(2 * time.Second)
+		t.Logf("Finished Work: %d\n", i)
+		return none
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(workers)
+
+	pool := NewWorkerPool(workers, work)
+	for i := 0; i < workers; i++ {
+		onComplete := make(chan void)
+
+		go func() {
+			defer close(onComplete)
+
+			<-onComplete
+			wg.Done()
+		}()
+
+		// run work on worker pool
+		pool.Run(i+1, onComplete)
+	}
+
+	select {
+	case <-waitChannelFor(&wg):
+	case <-time.After(5 * time.Second):
+		t.Errorf("Failed to Complete Run for %d jobs in 5s\n", workers)
+	}
+
+}
+
+func TestOrderedWorkGroup(t *testing.T) {
+	const workers = 5
+	const tasks = 10
+
+	// worker func logs start/finish for simulated work, returns input value
+	// for testing resulting group output
+	work := func(i int) int {
+		t.Logf("Starting Work: %d\n", i)
+		time.Sleep(2 * time.Second)
+		t.Logf("Finished Work: %d\n", i)
+		return i
+	}
+
+	pool := NewWorkerPool(workers, work)
+	ordered := NewOrderedGroup(pool, tasks)
+	input := make([]int, tasks)
+
+	// we create more tasks than workers to test queueing
+	for i := 0; i < tasks; i++ {
+		input[i] = i + 1
+		err := ordered.Push(input[i])
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// get results and verify they match the recorded inputs
+	results := ordered.Wait()
+	for i := 0; i < tasks; i++ {
+		if results[i] != input[i] {
+			t.Errorf("Expected Results[%d](%d) to equal Input[%d](%d)\n", i, results[i], i, input[i])
+		}
+	}
+
+	// The typical test run will show different tasks starting and stopping out of order (expected),
+	// the result collection handles the ordering in the group, which is what we want to ensure in the
+	// above assertion
+}
+
+func TestConcurrentDoOrdered(t *testing.T) {
+	// Perform a similar test to the above ordered test, but use the helper func with pre-built inputs
+	const tasks = 50
+
+	// worker func logs start/finish for simulated work, returns input value
+	// for testing resulting group output
+	work := func(i int) int {
+		t.Logf("Starting Work: %d\n", i)
+		time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
+		t.Logf("Finished Work: %d\n", i)
+		return i
+	}
+
+	// pre-build inputs
+	input := make([]int, tasks)
+	for i := 0; i < tasks; i++ {
+		input[i] = i + 1
+	}
+
+	// get results and verify they match the recorded inputs
+	results := ConcurrentDo(work, input)
+	for i := 0; i < tasks; i++ {
+		if results[i] != input[i] {
+			t.Errorf("Expected Results[%d](%d) to equal Input[%d](%d)\n", i, results[i], i, input[i])
+		}
+	}
+
+	// The typical test run will show different tasks starting and stopping out of order (expected),
+	// the result collection handles the ordering in the group, which is what we want to ensure in the
+	// above assertion
+}