Browse Source

Merge pull request #1181 from kubecost/bolt/work-pool

Concurrent Worker Package
Matt Bolt 4 years ago
parent
commit
f4a3440233

+ 28 - 22
pkg/collections/blockingqueue.go

@@ -10,20 +10,20 @@ import (
 
 // BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
 // This data structure should use a pool of worker goroutines to await work.
-type BlockingQueue interface {
+type BlockingQueue[T any] interface {
 	// Enqueue pushes an item onto the queue
-	Enqueue(item interface{})
+	Enqueue(item T)
 
 	// Dequeue removes the first item from the queue and returns it.
-	Dequeue() interface{}
+	Dequeue() T
 
 	// TryDequeue attempts to remove the first item from the queue and return it. This
 	// method does not block, and instead, returns true if the item was available and false
 	// otherwise
-	TryDequeue() (interface{}, bool)
+	TryDequeue() (T, bool)
 
 	// Each blocks modification and allows iteration of the queue.
-	Each(f func(int, interface{}))
+	Each(f func(int, T))
 
 	// Length returns the length of the queue
 	Length() int
@@ -36,25 +36,25 @@ type BlockingQueue interface {
 }
 
 // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
-type blockingSliceQueue struct {
-	q        []interface{}
+type blockingSliceQueue[T any] struct {
+	q        []T
 	l        *sync.Mutex
 	nonEmpty *sync.Cond
 }
 
 // NewBlockingQueue returns a new BlockingQueue implementation
-func NewBlockingQueue() BlockingQueue {
+func NewBlockingQueue[T any]() BlockingQueue[T] {
 	l := new(sync.Mutex)
 
-	return &blockingSliceQueue{
-		q:        []interface{}{},
+	return &blockingSliceQueue[T]{
+		q:        []T{},
 		l:        l,
 		nonEmpty: sync.NewCond(l),
 	}
 }
 
 // Enqueue pushes an item onto the queue
-func (q *blockingSliceQueue) Enqueue(item interface{}) {
+func (q *blockingSliceQueue[T]) Enqueue(item T) {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -63,7 +63,7 @@ func (q *blockingSliceQueue) Enqueue(item interface{}) {
 }
 
 // Dequeue removes the first item from the queue and returns it.
-func (q *blockingSliceQueue) Dequeue() interface{} {
+func (q *blockingSliceQueue[T]) Dequeue() T {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -76,7 +76,7 @@ func (q *blockingSliceQueue) Dequeue() interface{} {
 	e := q.q[0]
 
 	// nil 0 index to prevent leak
-	q.q[0] = nil
+	q.q[0] = defaultValue[T]()
 	q.q = q.q[1:]
 	return e
 }
@@ -84,24 +84,24 @@ func (q *blockingSliceQueue) Dequeue() interface{} {
 // TryDequeue attempts to remove the first item from the queue and return it. This
 // method does not block, and instead, returns true if the item was available and false
 // otherwise
-func (q *blockingSliceQueue) TryDequeue() (interface{}, bool) {
+func (q *blockingSliceQueue[T]) TryDequeue() (T, bool) {
 	q.l.Lock()
 	defer q.l.Unlock()
 
 	if len(q.q) == 0 {
-		return nil, false
+		return defaultValue[T](), false
 	}
 
 	e := q.q[0]
 
-	// nil 0 index to prevent leak
-	q.q[0] = nil
+	// default 0 index to prevent leak
+	q.q[0] = defaultValue[T]()
 	q.q = q.q[1:]
 	return e, true
 }
 
 // Each blocks modification and allows iteration of the queue.
-func (q *blockingSliceQueue) Each(f func(int, interface{})) {
+func (q *blockingSliceQueue[T]) Each(f func(int, T)) {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -111,7 +111,7 @@ func (q *blockingSliceQueue) Each(f func(int, interface{})) {
 }
 
 // Length returns the length of the queue
-func (q *blockingSliceQueue) Length() int {
+func (q *blockingSliceQueue[T]) Length() int {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -119,12 +119,12 @@ func (q *blockingSliceQueue) Length() int {
 }
 
 // IsEmpty returns true if the queue is empty
-func (q *blockingSliceQueue) IsEmpty() bool {
+func (q *blockingSliceQueue[T]) IsEmpty() bool {
 	return q.Length() == 0
 }
 
 // Clear empties the queue
-func (q *blockingSliceQueue) Clear() {
+func (q *blockingSliceQueue[T]) Clear() {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -132,5 +132,11 @@ func (q *blockingSliceQueue) Clear() {
 	// avoid capacity ballooning, but does feel like an implementation
 	// specific detail -- we can revisit if there are other relevant
 	// use-cases
-	q.q = []interface{}{}
+	q.q = []T{}
+}
+
+// defaultValue returns the language default for the T type
+func defaultValue[T any]() T {
+	var t T
+	return t
 }

+ 23 - 0
pkg/kubecost/common.go

@@ -0,0 +1,23 @@
+package kubecost
+
+// Pair is a generic struct containing a pair of instances, one of each type similar to std::pair
+type Pair[T any, U any] struct {
+	First  T
+	Second U
+}
+
+// Creates a new pair struct containing the provided parameters. This is useful for creating types
+// capable of representing common paired types (result, error), (result, bool), etc...
+func NewPair[T any, U any](first T, second U) Pair[T, U] {
+	return Pair[T, U]{
+		First:  first,
+		Second: second,
+	}
+}
+
+// DefaultValue[T] returns the default value for any generic type. This is helpful for generic
+// types where a type parameter can be a value type or pointer.
+func DefaultValue[T any]() T {
+	var t T
+	return t
+}

+ 6 - 8
pkg/prom/diagnostics.go

@@ -37,14 +37,12 @@ func GetPrometheusQueueState(client prometheus.Client) (*PrometheusQueueState, e
 	outbound := rlpc.TotalOutboundRequests()
 
 	requests := []*QueuedPromRequest{}
-	rlpc.queue.Each(func(_ int, entry interface{}) {
-		if req, ok := entry.(*workRequest); ok {
-			requests = append(requests, &QueuedPromRequest{
-				Context:   req.contextName,
-				Query:     req.query,
-				QueueTime: time.Since(req.start).Milliseconds(),
-			})
-		}
+	rlpc.queue.Each(func(_ int, req *workRequest) {
+		requests = append(requests, &QueuedPromRequest{
+			Context:   req.contextName,
+			Query:     req.query,
+			QueueTime: time.Since(req.start).Milliseconds(),
+		})
 	})
 
 	return &PrometheusQueueState{

+ 67 - 70
pkg/prom/prom.go

@@ -114,7 +114,7 @@ type RateLimitedPrometheusClient struct {
 	id             string
 	client         prometheus.Client
 	auth           *ClientAuth
-	queue          collections.BlockingQueue
+	queue          collections.BlockingQueue[*workRequest]
 	decorator      QueryParamsDecorator
 	rateLimitRetry *RateLimitRetryOpts
 	outbound       *atomic.AtomicInt32
@@ -139,7 +139,7 @@ func NewRateLimitedClient(
 	rateLimitRetryOpts *RateLimitRetryOpts,
 	queryLogFile string) (prometheus.Client, error) {
 
-	queue := collections.NewBlockingQueue()
+	queue := collections.NewBlockingQueue[*workRequest]()
 	outbound := atomic.NewAtomicInt32(0)
 
 	var logger *golog.Logger
@@ -235,83 +235,80 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 
 	for {
 		// blocks until there is an item available
-		item := rlpc.queue.Dequeue()
-
-		// Ensure the dequeued item was a workRequest
-		if we, ok := item.(*workRequest); ok {
-			// if we need to shut down all workers, we'll need to submit sentinel values
-			// that will force the worker to return
-			if we.closer {
-				return
-			}
+		we := rlpc.queue.Dequeue()
 
-			ctx := we.ctx
-			req := we.req
+		// if we need to shut down all workers, we'll need to submit sentinel values
+		// that will force the worker to return
+		if we.closer {
+			return
+		}
 
-			// decorate the raw query parameters
-			if rlpc.decorator != nil {
-				req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
-			}
+		ctx := we.ctx
+		req := we.req
 
-			// measure time in queue
-			timeInQueue := time.Since(we.start)
-
-			// Increment outbound counter
-			rlpc.outbound.Increment()
-
-			// Execute Request
-			roundTripStart := time.Now()
-			res, body, warnings, err := rlpc.client.Do(ctx, req)
-
-			// If retries on rate limited response is enabled:
-			// * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
-			// * Attempt to parse a Retry-After from response headers (common on 429)
-			// * If we couldn't determine how long to wait for a retry, use 1 second by default
-			if res != nil && retryRateLimit {
-				var status []*RateLimitResponseStatus
-				var retries int = retryOpts.MaxRetries
-				var defaultWait time.Duration = retryOpts.DefaultRetryWait
-
-				for httputil.IsRateLimited(res, body) && retries > 0 {
-					// calculate amount of time to wait before retry, in the event the default wait is used,
-					// an exponential backoff is applied based on the number of times we've retried.
-					retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
-					retries--
-
-					status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
-					log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
-
-					// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
-					// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
-					// don't want to block for 10 days.
-					if retryAfter > MaxRetryAfterDuration {
-						retryAfter = MaxRetryAfterDuration
-					}
-
-					// execute wait and retry
-					time.Sleep(retryAfter)
-					res, body, warnings, err = rlpc.client.Do(ctx, req)
-				}
+		// decorate the raw query parameters
+		if rlpc.decorator != nil {
+			req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
+		}
 
-				// if we've broken out of our retry loop and the resp is still rate limited,
-				// then let's generate a meaningful error to pass back
-				if retries == 0 && httputil.IsRateLimited(res, body) {
-					err = &RateLimitedResponseError{RateLimitStatus: status}
+		// measure time in queue
+		timeInQueue := time.Since(we.start)
+
+		// Increment outbound counter
+		rlpc.outbound.Increment()
+
+		// Execute Request
+		roundTripStart := time.Now()
+		res, body, warnings, err := rlpc.client.Do(ctx, req)
+
+		// If retries on rate limited response is enabled:
+		// * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
+		// * Attempt to parse a Retry-After from response headers (common on 429)
+		// * If we couldn't determine how long to wait for a retry, use 1 second by default
+		if res != nil && retryRateLimit {
+			var status []*RateLimitResponseStatus
+			var retries int = retryOpts.MaxRetries
+			var defaultWait time.Duration = retryOpts.DefaultRetryWait
+
+			for httputil.IsRateLimited(res, body) && retries > 0 {
+				// calculate amount of time to wait before retry, in the event the default wait is used,
+				// an exponential backoff is applied based on the number of times we've retried.
+				retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
+				retries--
+
+				status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
+				log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
+
+				// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
+				// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
+				// don't want to block for 10 days.
+				if retryAfter > MaxRetryAfterDuration {
+					retryAfter = MaxRetryAfterDuration
 				}
-			}
 
-			// Decrement outbound counter
-			rlpc.outbound.Decrement()
-			LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
+				// execute wait and retry
+				time.Sleep(retryAfter)
+				res, body, warnings, err = rlpc.client.Do(ctx, req)
+			}
 
-			// Pass back response data over channel to caller
-			we.respChan <- &workResponse{
-				res:      res,
-				body:     body,
-				warnings: warnings,
-				err:      err,
+			// if we've broken out of our retry loop and the resp is still rate limited,
+			// then let's generate a meaningful error to pass back
+			if retries == 0 && httputil.IsRateLimited(res, body) {
+				err = &RateLimitedResponseError{RateLimitStatus: status}
 			}
 		}
+
+		// Decrement outbound counter
+		rlpc.outbound.Decrement()
+		LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
+
+		// Pass back response data over channel to caller
+		we.respChan <- &workResponse{
+			res:      res,
+			body:     body,
+			warnings: warnings,
+			err:      err,
+		}
 	}
 }
 

+ 42 - 0
pkg/util/atomic/atomicbool.go

@@ -0,0 +1,42 @@
+package atomic
+
+import (
+	"sync/atomic"
+)
+
+// AtomicBool alias leverages a 32-bit integer CAS
+type AtomicBool int32
+
+// NewAtomicBool creates an AtomicBool with given default value
+func NewAtomicBool(value bool) *AtomicBool {
+	ab := new(AtomicBool)
+	ab.Set(value)
+	return ab
+}
+
+// Loads the bool value atomically
+func (ab *AtomicBool) Get() bool {
+	return atomic.LoadInt32((*int32)(ab)) != 0
+}
+
+// Sets the bool value atomically
+func (ab *AtomicBool) Set(value bool) {
+	if value {
+		atomic.StoreInt32((*int32)(ab), 1)
+	} else {
+		atomic.StoreInt32((*int32)(ab), 0)
+	}
+}
+
+// CompareAndSet sets value to new if current is equal to the current value. If the new value is
+// set, this function returns true.
+func (ab *AtomicBool) CompareAndSet(current, new bool) bool {
+	var o, n int32
+	if current {
+		o = 1
+	}
+	if new {
+		n = 1
+	}
+	return atomic.CompareAndSwapInt32((*int32)(ab), o, n)
+}

+ 148 - 0
pkg/util/worker/example_worker_test.go

@@ -0,0 +1,148 @@
+package worker_test
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/util/worker"
+)
+
+// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
+// waits 1 second, then returns the result.
+func slowAddTenToFloat(i int) float64 {
+	result := float64(i + 10)
+	time.Sleep(time.Second)
+	return result
+}
+
+func Example_concurrentWorkers() {
+	// Assuming we have a list of ints we want to pass to slowAddTenToFloat(),
+	// rather than serially calling the function on each input (requiring a wait
+	// of 1 second between calls), we'll want to execute each in a goroutine. Let's
+	// say we had 100 inputs, we may not want to create that many go routines, so
+	// instead, we can create a pool of goroutines that work on our inputs as fast as
+	// possible.
+
+	// Create a worker pool using 50 goroutines:
+	workerPool := worker.NewWorkerPool(50, slowAddTenToFloat)
+
+	// we want to shutdown the workerPool at the end of it's use to ensure we don't
+	// leak go routines
+	defer workerPool.Shutdown()
+
+	// Loop over 100 inputs and run slowAddTenToFloat
+	for i := 0; i < 100; i++ {
+		// Run accepts a receive channel for each input, but it is not required.
+		// To demonstrate receiving, we'll receive the results when the input
+		// is 50:
+		if i == 50 {
+			receive := make(chan float64)
+			workerPool.Run(i, receive)
+
+			// since we don't want to slow down the input loop, let's receive the
+			// result in a separate go routine
+			go func(input int, rec chan float64) {
+				defer close(rec)
+				result := <-rec
+				fmt.Printf("Receive Result: %.2f for Input: %d\n", result, input)
+			}(i, receive)
+		} else {
+			// pass nil if receiving the result isn't necessary
+			workerPool.Run(i, nil)
+		}
+	}
+
+	// 100 inputs with 50 go routines should take 2 seconds, so let's wait a bit longer than that
+	time.Sleep((2 * time.Second) + (500 * time.Millisecond))
+
+	// Output:
+	// Receive Result: 60.00 for Input: 50
+}
+
+func Example_concurrentOrdered() {
+	// Expanding on the previous idea, let's assume that we want to receive the result for
+	// every input. That would normally require some specialized synchronization and boilerplate,
+	// but the worker package contains a ordered group type for exactly this functionality
+
+	// This time, let's create a worker pool and use the MAXGOPROCS value to determine the number
+	// of workers
+	workerCount := worker.OptimalWorkerCount()
+	workerPool := worker.NewWorkerPool(workerCount, slowAddTenToFloat)
+
+	// Shutdown the worker pool when complete
+	defer workerPool.Shutdown()
+
+	// now we can create our ordered group type and pass in the worker pool, and since we know our
+	// number of inputs (let's choose 12 this time), we can pass that to the group as well.
+	const numInputs = 12
+	orderedGroup := worker.NewOrderedGroup(workerPool, numInputs)
+
+	// loop over our inputs and pass them to the group
+	for i := 0; i < numInputs; i++ {
+		// ordered group has a strict size constraint (set in the NewOrderedGroup func), and will
+		// error if the number of inputs pushed exceeds that size constraint
+		err := orderedGroup.Push(i)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	// now we can simply call Wait() to receive the results
+	results := orderedGroup.Wait()
+
+	// Note that the order of the results is consistent with the order in which they were pushed
+	for idx, result := range results {
+		fmt.Printf("Received Result: %.2f for Input: %d\n", result, idx)
+	}
+
+	// Output:
+	// Received Result: 10.00 for Input: 0
+	// Received Result: 11.00 for Input: 1
+	// Received Result: 12.00 for Input: 2
+	// Received Result: 13.00 for Input: 3
+	// Received Result: 14.00 for Input: 4
+	// Received Result: 15.00 for Input: 5
+	// Received Result: 16.00 for Input: 6
+	// Received Result: 17.00 for Input: 7
+	// Received Result: 18.00 for Input: 8
+	// Received Result: 19.00 for Input: 9
+	// Received Result: 20.00 for Input: 10
+	// Received Result: 21.00 for Input: 11
+}
+
+func Example_concurrentOrderedSimple() {
+	// This last example highlights a simplified version of the previous example. While
+	// the ordered example provides tuning knobs for total goroutines and allows pushing
+	// data dynamically, it can be quite verbose and difficult to read at times. The worker
+	// package also provides a utility function that simplifies the ordered concurrent
+	// processing into a worker function and a slice of inputs
+
+	// Let's create our inputs 0-12 like in the previous example
+	const numInputs = 12
+	inputs := make([]int, numInputs)
+	for i := 0; i < numInputs; i++ {
+		inputs[i] = i
+	}
+
+	// Now, we can just call ConcurrentDo with the inputs and worker func:
+	results := worker.ConcurrentDo(slowAddTenToFloat, inputs)
+
+	// Note that the order of the results is consistent with the order of inputs
+	for i := 0; i < numInputs; i++ {
+		fmt.Printf("Received Result: %.2f for Input: %d\n", results[i], inputs[i])
+	}
+
+	// Output:
+	// Received Result: 10.00 for Input: 0
+	// Received Result: 11.00 for Input: 1
+	// Received Result: 12.00 for Input: 2
+	// Received Result: 13.00 for Input: 3
+	// Received Result: 14.00 for Input: 4
+	// Received Result: 15.00 for Input: 5
+	// Received Result: 16.00 for Input: 6
+	// Received Result: 17.00 for Input: 7
+	// Received Result: 18.00 for Input: 8
+	// Received Result: 19.00 for Input: 9
+	// Received Result: 20.00 for Input: 10
+	// Received Result: 21.00 for Input: 11
+}

+ 206 - 0
pkg/util/worker/worker.go

@@ -0,0 +1,206 @@
+package worker
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+
+	"github.com/kubecost/cost-model/pkg/collections"
+	"github.com/kubecost/cost-model/pkg/util/atomic"
+)
+
+// Worker is a transformation function from input type T to output type U.
+type Worker[T any, U any] func(T) U
+
+// WorkerPool is a pool of go routines executing a Worker on supplied inputs via
+// the Run function.
+type WorkerPool[T any, U any] interface {
+	// Run executes a Worker in the pool on the provided input and onComplete receive chanel
+	// to get the results. An error is returned if the pool is shutdown, or is in the process
+	// of shutting down.
+	Run(input T, onComplete chan<- U) error
+
+	// Shutdown stops all of the workers (if running).
+	Shutdown()
+}
+
+// WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and
+// collect the results in a single slice.
+type WorkGroup[T any, U any] interface {
+	// Push adds a new input to the work group.
+	Push(T) error
+
+	// Wait waits for all pending worker tasks to complete, then returns all the results.
+	Wait() []U
+}
+
+// entry is an internal helper type for pushing payloads to the worker queue
+type entry[T any, U any] struct {
+	payload    T
+	onComplete chan<- U
+	close      bool
+}
+
+// queuedWorkerPool is a blocking queue based implementation of a WorkerPool
+type queuedWorkerPool[T any, U any] struct {
+	queue      collections.BlockingQueue[entry[T, U]]
+	work       Worker[T, U]
+	workers    int
+	isShutdown *atomic.AtomicBool
+}
+
+// ordered is a WorkGroup implementation which enforces ordering based on when
+// inputs were pushed onto the group.
+type ordered[T any, U any] struct {
+	workPool WorkerPool[T, U]
+	results  []U
+	wg       *sync.WaitGroup
+	count    int
+}
+
+// NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
+// func used to transform inputs to outputs.
+func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
+	owq := &queuedWorkerPool[T, U]{
+		workers:    workers,
+		work:       work,
+		queue:      collections.NewBlockingQueue[entry[T, U]](),
+		isShutdown: atomic.NewAtomicBool(false),
+	}
+
+	// startup the designated workers
+	for i := 0; i < workers; i++ {
+		go owq.worker()
+	}
+
+	return owq
+}
+
+// Run executes a Worker in the pool on the provided input and onComplete receive chanel
+// to get the results. An error is returned if the pool is shutdown, or is in the process
+// of shutting down.
+func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
+	if wq.isShutdown.Get() {
+		return fmt.Errorf("WorkerPoolShutdown")
+	}
+
+	wq.queue.Enqueue(entry[T, U]{
+		payload:    input,
+		onComplete: onComplete,
+		close:      false,
+	})
+
+	return nil
+}
+
+// Shutdown stops all of the workers (if running).
+func (wq *queuedWorkerPool[T, U]) Shutdown() {
+	if !wq.isShutdown.CompareAndSet(false, true) {
+		return
+	}
+
+	for i := 0; i < wq.workers; i++ {
+		wq.queue.Enqueue(entry[T, U]{
+			close: true,
+		})
+	}
+}
+
+func (wq *queuedWorkerPool[T, U]) worker() {
+	for {
+		next := wq.queue.Dequeue()
+
+		// shutdown the worker on sentinel value
+		if next.close {
+			return
+		}
+
+		result := wq.work(next.payload)
+
+		// signal on complete if applicable
+		if next.onComplete != nil {
+			next.onComplete <- result
+		}
+	}
+}
+
+// NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
+// they are pushed. Ordered groups do not support concurrent Push() calls.
+func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
+	return &ordered[T, U]{
+		workPool: pool,
+		results:  make([]U, size),
+		wg:       new(sync.WaitGroup),
+		count:    0,
+	}
+}
+
+// Push adds a new input to the work group.
+func (ow *ordered[T, U]) Push(input T) error {
+	current := ow.count
+	if current >= len(ow.results) {
+		return fmt.Errorf("MaxCapacity")
+	}
+
+	onComplete := make(chan U)
+	err := ow.workPool.Run(input, onComplete)
+	if err != nil {
+		return err
+	}
+
+	ow.count++
+	ow.wg.Add(1)
+
+	go func(index int) {
+		defer close(onComplete)
+
+		ow.results[index] = <-onComplete
+		ow.wg.Done()
+	}(int(current))
+
+	return nil
+}
+
+// Wait waits for all pending worker tasks to complete, then returns all the results.
+func (ow *ordered[T, U]) Wait() []U {
+	ow.wg.Wait()
+	return ow.results
+}
+
+// these constraints protect against the possibility of unexpected output from runtime.NumCPU()
+const (
+	defaultMinWorkers = 4
+	defaultMaxWorkers = 16
+)
+
+// OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU()
+func OptimalWorkerCount() int {
+	return OptimalWorkerCountInRange(defaultMinWorkers, defaultMaxWorkers)
+}
+
+// OptimalWorkerCount will return runtime.NumCPU() constrained to the provided min and max
+// range
+func OptimalWorkerCountInRange(min int, max int) int {
+	cores := runtime.NumCPU()
+	if cores < min {
+		return min
+	}
+	if cores > max {
+		return max
+	}
+	return cores
+}
+
+// ConcurrentDo runs a pool of workers which concurrently call the provided worker func on each input to get ordered
+// output corresponding to the inputs
+func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
+	workerPool := NewWorkerPool(OptimalWorkerCount(), worker)
+	defer workerPool.Shutdown()
+
+	workGroup := NewOrderedGroup(workerPool, len(inputs))
+	for _, input := range inputs {
+		workGroup.Push(input)
+	}
+
+	return workGroup.Wait()
+}

+ 149 - 0
pkg/util/worker/worker_test.go

@@ -0,0 +1,149 @@
+package worker
+
+import (
+	"math/rand"
+	"runtime"
+	"sync"
+	"testing"
+	"time"
+)
+
+type void struct{}
+
+var none = void{}
+
+func waitChannelFor(wg *sync.WaitGroup) <-chan void {
+	ch := make(chan void)
+	go func() {
+		defer close(ch)
+
+		wg.Wait()
+		ch <- none
+	}()
+	return ch
+}
+
+func TestWorkerPoolShutdown(t *testing.T) {
+	const workers = 3
+
+	// running goroutines
+	routines := runtime.NumGoroutine()
+	t.Logf("Go Routines Before: %d\n", routines)
+
+	wp := NewWorkerPool(workers, func(any) any { return nil })
+	t.Logf("Go Routines After: %d\n", runtime.NumGoroutine())
+
+	wp.Shutdown()
+	time.Sleep(time.Second)
+	if runtime.NumGoroutine() != routines {
+		t.Errorf("Go routines after shutdown: %d != Go routines at start of test: %d\n", runtime.NumGoroutine(), routines)
+	}
+}
+
+func TestWorkerPoolExactWorkers(t *testing.T) {
+	const workers = 3
+
+	// worker func logs start/finish for simulated work
+	work := func(i int) void {
+		t.Logf("Starting Work: %d\n", i)
+		time.Sleep(2 * time.Second)
+		t.Logf("Finished Work: %d\n", i)
+		return none
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(workers)
+
+	pool := NewWorkerPool(workers, work)
+	for i := 0; i < workers; i++ {
+		onComplete := make(chan void)
+
+		go func() {
+			defer close(onComplete)
+
+			<-onComplete
+			wg.Done()
+		}()
+
+		// run work on worker pool
+		pool.Run(i+1, onComplete)
+	}
+
+	select {
+	case <-waitChannelFor(&wg):
+	case <-time.After(5 * time.Second):
+		t.Errorf("Failed to Complete Run for %d jobs in 5s\n", workers)
+	}
+
+}
+
+func TestOrderedWorkGroup(t *testing.T) {
+	const workers = 5
+	const tasks = 10
+
+	// worker func logs start/finish for simulated work, returns input value
+	// for testing resulting group output
+	work := func(i int) int {
+		t.Logf("Starting Work: %d\n", i)
+		time.Sleep(2 * time.Second)
+		t.Logf("Finished Work: %d\n", i)
+		return i
+	}
+
+	pool := NewWorkerPool(workers, work)
+	ordered := NewOrderedGroup(pool, tasks)
+	input := make([]int, tasks)
+
+	// we create more tasks than workers to test queueing
+	for i := 0; i < tasks; i++ {
+		input[i] = i + 1
+		err := ordered.Push(input[i])
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// get results and verify they match the recorded inputs
+	results := ordered.Wait()
+	for i := 0; i < tasks; i++ {
+		if results[i] != input[i] {
+			t.Errorf("Expected Results[%d](%d) to equal Input[%d](%d)\n", i, results[i], i, input[i])
+		}
+	}
+
+	// The typical test run will show different tasks starting and stopping out of order (expected),
+	// the result collection handles the ordering in the group, which is what we want to ensure in the
+	// above assertion
+}
+
+func TestConcurrentDoOrdered(t *testing.T) {
+	// Perform a similar test to the above ordered test, but use the helper func with pre-built inputs
+	const tasks = 50
+
+	// worker func logs start/finish for simulated work, returns input value
+	// for testing resulting group output
+	work := func(i int) int {
+		t.Logf("Starting Work: %d\n", i)
+		time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
+		t.Logf("Finished Work: %d\n", i)
+		return i
+	}
+
+	// pre-build inputs
+	input := make([]int, tasks)
+	for i := 0; i < tasks; i++ {
+		input[i] = i + 1
+	}
+
+	// get results and verify they match the recorded inputs
+	results := ConcurrentDo(work, input)
+	for i := 0; i < tasks; i++ {
+		if results[i] != input[i] {
+			t.Errorf("Expected Results[%d](%d) to equal Input[%d](%d)\n", i, results[i], i, input[i])
+		}
+	}
+
+	// The typical test run will show different tasks starting and stopping out of order (expected),
+	// the result collection handles the ordering in the group, which is what we want to ensure in the
+	// above assertion
+}