瀏覽代碼

Worker pkg Update (#2879)

* Update the worker APIs with a collector and a runner

Signed-off-by: Matt Bolt <mbolt35@gmail.com>

* update go.mod to 1.22

Signed-off-by: Matt Bolt <mbolt35@gmail.com>

* Update codedocs appropriately

Signed-off-by: Matt Bolt <mbolt35@gmail.com>

* Update codedocs appropriately

Signed-off-by: Matt Bolt <mbolt35@gmail.com>

---------

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt 1 年之前
父節點
當前提交
6a1ad9a366
共有 3 個文件被更改,包括 298 次插入7 次删除
  1. 1 1
      core/go.mod
  2. 160 5
      core/pkg/util/worker/worker.go
  3. 137 1
      core/pkg/util/worker/worker_test.go

+ 1 - 1
core/go.mod

@@ -1,6 +1,6 @@
 module github.com/opencost/opencost/core
 
-go 1.21.0
+go 1.22.0
 
 require (
 	github.com/davecgh/go-spew v1.1.1

+ 160 - 5
core/pkg/util/worker/worker.go

@@ -9,6 +9,9 @@ import (
 	"github.com/opencost/opencost/core/pkg/collections"
 )
 
+// Runner is a function type that takes a single input of type T and returns nothing.
+// It is the callback shape accepted by ConcurrentRun and ConcurrentRunWith.
+type Runner[T any] func(T)
+
 // Worker is a transformation function from input type T to output type U.
 type Worker[T any, U any] func(T) U
 
@@ -54,7 +57,7 @@ type queuedWorkerPool[T any, U any] struct {
 type ordered[T any, U any] struct {
 	workPool WorkerPool[T, U]
 	results  []U
-	wg       *sync.WaitGroup
+	wg       sync.WaitGroup
 	count    int
 }
 
@@ -129,7 +132,6 @@ func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T,
 	return &ordered[T, U]{
 		workPool: pool,
 		results:  make([]U, size),
-		wg:       new(sync.WaitGroup),
 		count:    0,
 	}
 }
@@ -166,6 +168,96 @@ func (ow *ordered[T, U]) Wait() []U {
 	return ow.results
 }
 
+// noResultGroup is a WorkGroup implementation which pushes inputs to a worker pool
+// to be executed concurrently and only tracks their completion. This group does not
+// collect results.
+type noResultGroup[T any] struct {
+	// workPool runs the pushed inputs; its output type is struct{} since nothing is retained.
+	workPool WorkerPool[T, struct{}]
+	// wg counts accepted inputs whose completion has not been received yet.
+	wg       sync.WaitGroup
+}
+
+// NewNoResultGroup builds a WorkGroup that runs pushed inputs on the provided pool without
+// retaining any output. Because nothing is collected, the pool's output type must be struct{}.
+func NewNoResultGroup[T any](pool WorkerPool[T, struct{}]) WorkGroup[T, struct{}] {
+	group := new(noResultGroup[T])
+	group.workPool = pool
+	return group
+}
+
+// Push adds a new input to the work group. The input is submitted to the worker pool
+// together with a per-input channel, and a goroutine is started which waits for a single
+// receive on that channel. If the pool's Run call returns an error, that error is returned
+// and the input is not tracked by the group.
+func (ow *noResultGroup[T]) Push(input T) error {
+	// presumably the pool sends on onComplete once the worker finishes — confirm against WorkerPool.Run
+	onComplete := make(chan struct{})
+	err := ow.workPool.Run(input, onComplete)
+	if err != nil {
+		return err
+	}
+
+	// only inputs accepted by the pool are counted, so Wait does not block on a rejected input
+	ow.wg.Add(1)
+
+	go func() {
+		// deferred calls run in reverse order: the wait group is released first, then the channel is closed
+		defer close(onComplete)
+		defer ow.wg.Done()
+
+		// block until a value arrives; the value itself is discarded
+		<-onComplete
+	}()
+
+	return nil
+}
+
+// Wait blocks until every accepted input has signaled completion. Since this group does
+// not collect results, the returned slice is always empty.
+func (ow *noResultGroup[T]) Wait() []struct{} {
+	ow.wg.Wait()
+	return []struct{}{}
+}
+
+// collector is a WorkGroup implementation which collects non-nil results into the results slice
+// and ignores any nil results.
+type collector[T any, U any] struct {
+	// workPool runs the pushed inputs and produces pointer results.
+	workPool   WorkerPool[T, *U]
+	// resultLock guards appends to results from the per-input goroutines.
+	resultLock sync.Mutex
+	// results holds the non-nil outputs received so far.
+	results    []*U
+	// wg counts accepted inputs whose result has not been received yet.
+	wg         sync.WaitGroup
+}
+
+// NewCollectionGroup builds a WorkGroup that runs pushed inputs on the provided pool and gathers
+// every non-nil output into its result slice. Nil is used as the "no result" marker, which is why
+// the pool's output type must be a pointer.
+func NewCollectionGroup[T any, U any](pool WorkerPool[T, *U]) WorkGroup[T, *U] {
+	group := new(collector[T, U])
+	group.workPool = pool
+	return group
+}
+
+// Push adds a new input to the work group. The input is submitted to the worker pool
+// together with a per-input channel, and a goroutine is started which receives one value
+// from that channel and stores it when it is non-nil. If the pool's Run call returns an
+// error, that error is returned and the input is not tracked by the group.
+func (ow *collector[T, U]) Push(input T) error {
+	// presumably the pool sends the worker's output on onComplete — confirm against WorkerPool.Run
+	onComplete := make(chan *U)
+	err := ow.workPool.Run(input, onComplete)
+	if err != nil {
+		return err
+	}
+
+	// only inputs accepted by the pool are counted, so Wait does not block on a rejected input
+	ow.wg.Add(1)
+
+	go func() {
+		// deferred calls run in reverse order: the channel is closed first, then the wait group is released
+		defer ow.wg.Done()
+		defer close(onComplete)
+
+		result := <-onComplete
+		// nil results are dropped; appends are serialized since each input has its own goroutine
+		if result != nil {
+			ow.resultLock.Lock()
+			ow.results = append(ow.results, result)
+			ow.resultLock.Unlock()
+		}
+	}()
+
+	return nil
+}
+
+// Wait blocks until the result of every accepted input has been received, then returns the
+// collected non-nil results. Results are appended as they are received, so their order is
+// not tied to the order in which inputs were pushed.
+func (ow *collector[T, U]) Wait() []*U {
+	ow.wg.Wait()
+	return ow.results
+}
+
 // these constraints protect against the possibility of unexpected output from runtime.NumCPU()
 const (
 	defaultMinWorkers = 4
@@ -190,10 +282,21 @@ func OptimalWorkerCountInRange(min int, max int) int {
 	return cores
 }
 
-// ConcurrentDo runs a pool of workers which concurrently call the provided worker func on each input to get ordered
-// output corresponding to the inputs
+// ConcurrentDo runs a pool of N workers which concurrently call the provided worker func on each
+// input to get ordered output corresponding to the inputs. The total number of workers is determined
+// by the total number of CPUs available, bound to a range from 4-16.
 func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
-	workerPool := NewWorkerPool(OptimalWorkerCount(), worker)
+	return ConcurrentDoWith(OptimalWorkerCount(), worker, inputs)
+}
+
+// ConcurrentDoWith runs a pool of workers of the specified size which concurrently call the provided worker func
+// on each input to get ordered output corresponding to the inputs. Size inputs < 1 will automatically be set to 1.
+func ConcurrentDoWith[T any, U any](size int, worker Worker[T, U], inputs []T) []U {
+	if size < 1 {
+		size = 1
+	}
+
+	workerPool := NewWorkerPool(size, worker)
 	defer workerPool.Shutdown()
 
 	workGroup := NewOrderedGroup(workerPool, len(inputs))
@@ -203,3 +306,55 @@ func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U {
 
 	return workGroup.Wait()
 }
+
+// ConcurrentCollect calls the provided worker func concurrently on each input and returns a
+// slice holding only the non-nil outputs. The pool size is chosen by OptimalWorkerCount();
+// use ConcurrentCollectWith to pick the size explicitly.
+func ConcurrentCollect[T any, U any](workerFunc Worker[T, *U], inputs []T) []*U {
+	poolSize := OptimalWorkerCount()
+	return ConcurrentCollectWith(poolSize, workerFunc, inputs)
+}
+
+// ConcurrentCollectWith calls the provided worker func concurrently on each input, using a pool
+// with the requested number of workers, and returns a slice holding only the non-nil outputs.
+// A size below 1 is treated as 1. The pool is shut down before the function returns.
+func ConcurrentCollectWith[T any, U any](size int, workerFunc Worker[T, *U], inputs []T) []*U {
+	// clamp non-positive sizes to a single worker
+	if size <= 0 {
+		size = 1
+	}
+
+	pool := NewWorkerPool(size, workerFunc)
+	defer pool.Shutdown()
+
+	group := NewCollectionGroup(pool)
+	for i := range inputs {
+		group.Push(inputs[i])
+	}
+
+	collected := group.Wait()
+	return collected
+}
+
+// ConcurrentRun calls the provided runner func concurrently on each input and returns once
+// ConcurrentRunWith has finished. The pool size is chosen by OptimalWorkerCount(); use
+// ConcurrentRunWith to pick the size explicitly.
+func ConcurrentRun[T any](runner Runner[T], inputs []T) {
+	poolSize := OptimalWorkerCount()
+	ConcurrentRunWith(poolSize, runner, inputs)
+}
+
+// ConcurrentRunWith runs a pool of runners of the specified size which concurrently call the provided
+// runner func on each input, and returns once the work group's Wait has returned. Size inputs < 1
+// will automatically be set to 1. The pool is shut down before the function returns.
+func ConcurrentRunWith[T any](size int, runner Runner[T], inputs []T) {
+	if size < 1 {
+		size = 1
+	}
+
+	// adapt the runner to the worker shape expected by the pool; struct{} is a placeholder output
+	workerPool := NewWorkerPool(size, func(input T) struct{} {
+		runner(input)
+		return struct{}{}
+	})
+	// release the pool once all inputs have run, as ConcurrentDoWith and ConcurrentCollectWith do
+	defer workerPool.Shutdown()
+
+	workGroup := NewNoResultGroup(workerPool)
+	for _, input := range inputs {
+		workGroup.Push(input)
+	}
+
+	workGroup.Wait()
+}

+ 137 - 1
core/pkg/util/worker/worker_test.go

@@ -74,7 +74,6 @@ func TestWorkerPoolExactWorkers(t *testing.T) {
 	case <-time.After(5 * time.Second):
 		t.Errorf("Failed to Complete Run for %d jobs in 5s\n", workers)
 	}
-
 }
 
 func TestOrderedWorkGroup(t *testing.T) {
@@ -116,6 +115,38 @@ func TestOrderedWorkGroup(t *testing.T) {
 	// above assertion
 }
 
+// TestConcurrentRun verifies that ConcurrentRunWith invokes the runner once per input
+// and that all invocations finish within the time limit.
+func TestConcurrentRun(t *testing.T) {
+	const tasks = 50
+
+	var pending sync.WaitGroup
+	pending.Add(tasks)
+
+	// runner logs start/finish around a simulated unit of work and marks
+	// one task as done; it produces no output
+	runner := func(id int) {
+		defer pending.Done()
+
+		t.Logf("Starting Work: %d\n", id)
+		delay := time.Duration(rand.Intn(250)+250) * time.Millisecond
+		time.Sleep(delay)
+		t.Logf("Finished Work: %d\n", id)
+	}
+
+	// pre-build inputs 1..tasks
+	inputs := make([]int, 0, tasks)
+	for n := 1; n <= tasks; n++ {
+		inputs = append(inputs, n)
+	}
+
+	// run every input through a pool of 10 runners
+	ConcurrentRunWith(10, runner, inputs)
+
+	// every runner invocation must have called Done by now, or shortly after
+	select {
+	case <-waitChannelFor(&pending):
+	case <-time.After(5 * time.Second):
+		t.Errorf("Failed to Complete Run for %d jobs in 5s\n", tasks)
+	}
+}
+
 func TestConcurrentDoOrdered(t *testing.T) {
 	// Perform a similar test to the above ordered test, but use the helper func with pre-built inputs
 	const tasks = 50
@@ -147,3 +178,108 @@ func TestConcurrentDoOrdered(t *testing.T) {
 	// the result collection handles the ordering in the group, which is what we want to ensure in the
 	// above assertion
 }
+
+// TestConcurrentCollect verifies that ConcurrentCollect keeps only the non-nil worker
+// outputs, with no duplicates.
+func TestConcurrentCollect(t *testing.T) {
+	type A struct {
+		Value int
+	}
+
+	type B struct {
+		Value int
+	}
+
+	// even inputs yield a result, odd inputs yield nil, so half of the tasks are expected back
+	const tasks = 100
+	const expectedResults = 50
+
+	inputs := make([]*A, 0, tasks)
+	for v := 0; v < tasks; v++ {
+		inputs = append(inputs, &A{Value: v})
+	}
+
+	evensOnly := func(in *A) *B {
+		pause := time.Duration(rand.Intn(150)+100) * time.Millisecond
+		time.Sleep(pause)
+
+		if in.Value%2 != 0 {
+			return nil
+		}
+
+		return &B{Value: in.Value}
+	}
+
+	collected := ConcurrentCollect(evensOnly, inputs)
+
+	if got := len(collected); got != expectedResults {
+		t.Errorf("Expected 50 results, got %d", got)
+	}
+
+	// each collected value must be unique and even
+	visited := make(map[int]bool)
+	for _, item := range collected {
+		value := item.Value
+		if visited[value] {
+			t.Errorf("Duplicate result: %d", value)
+		}
+		visited[value] = true
+
+		if value%2 != 0 {
+			t.Errorf("Found odd value: %d", value)
+		}
+	}
+}
+
+// TestConcurrentDoWithLessThanOne verifies that ConcurrentDoWith still processes every
+// input, in order, when given a pool size below 1.
+func TestConcurrentDoWithLessThanOne(t *testing.T) {
+	const tasks = 4
+
+	var wg sync.WaitGroup
+	wg.Add(tasks)
+
+	now := time.Now()
+
+	doIt := func(i int) int {
+		defer wg.Done()
+		time.Sleep(250 * time.Millisecond)
+		return i
+	}
+
+	results := ConcurrentDoWith(-1, doIt, []int{1, 2, 3, 4})
+
+	select {
+	case <-waitChannelFor(&wg):
+	case <-time.After(2 * time.Second):
+		t.Errorf("Failed to Complete Run for %d jobs in 2s\n", tasks)
+	}
+
+	if time.Since(now) > 1500*time.Millisecond {
+		t.Errorf("Expected to complete in 1.5s, took %dms", time.Since(now).Milliseconds())
+	}
+
+	// guard the indexing below so a short result slice fails the test instead of panicking
+	if len(results) != tasks {
+		t.Fatalf("Expected %d results, got %d", tasks, len(results))
+	}
+
+	for i := 1; i <= tasks; i++ {
+		if results[i-1] != i {
+			// report the element that was actually compared
+			t.Errorf("Expected %d, got %d", i, results[i-1])
+		}
+	}
+}
+
+// TestConcurrentRunWithLessThanOne verifies that ConcurrentRunWith still runs every
+// input, within the time limit, when given a pool size below 1.
+func TestConcurrentRunWithLessThanOne(t *testing.T) {
+	const tasks = 4
+
+	var pending sync.WaitGroup
+	pending.Add(tasks)
+
+	started := time.Now()
+
+	// each invocation sleeps for a fixed interval and marks one task as done
+	runner := func(_ int) {
+		defer pending.Done()
+		time.Sleep(250 * time.Millisecond)
+	}
+
+	inputs := []int{1, 2, 3, 4}
+	ConcurrentRunWith(-1, runner, inputs)
+
+	select {
+	case <-waitChannelFor(&pending):
+	case <-time.After(2 * time.Second):
+		t.Errorf("Failed to Complete Run for %d jobs in 2s\n", tasks)
+	}
+
+	if time.Since(started) > 1500*time.Millisecond {
+		t.Errorf("Expected to complete in 1.5s, took %dms", time.Since(started).Milliseconds())
+	}
+}