
NodeStats Memory Leak fixes + Concurrent Processor (#3190)

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt, 11 months ago
Parent
Commit f7dcf7676e

+ 17 - 6
core/pkg/nodestats/nodestats.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
 	"os"
 
@@ -73,8 +74,6 @@ func (nssc *NodeStatsSummaryClient) GetNodeData() ([]*stats.Summary, error) {
 			return nil
 		}
 
-		defer resp.Body.Close()
-
 		data, err := nodeResponseToStatSummary(resp)
 		if err != nil {
 			log.Warnf("error converting node data: %s", err)
@@ -199,13 +198,25 @@ func NodeAddress(node *clustercache.Node) (string, int32, error) {
 }
 
 func nodeResponseToStatSummary(resp *http.Response) (*stats.Summary, error) {
+	if resp == nil || resp.Body == nil {
+		return nil, fmt.Errorf("response or response body is nil")
+	}
+
+	defer resp.Body.Close()
+
 	data := &stats.Summary{}
-	err := json.NewDecoder(resp.Body).Decode(&data)
-	if err == nil {
-		return data, nil
+
+	bytes, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("could not read response body: %w", err)
+	}
+
+	err = json.Unmarshal(bytes, data)
+	if err != nil {
+		return nil, fmt.Errorf("could not unmarshal response body: %w", err)
 	}
 
-	return nil, err
+	return data, nil
 }
 
 // loadBearerToken reads the service account token

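For context on the leak: the removed defer in GetNodeData appears to have been registered only after an early-return error path, so failed requests could leak the response body, and json.NewDecoder(...).Decode stops after the first JSON value, which can leave unread bytes on the connection and prevent its reuse. The patch moves ownership of the body into nodeResponseToStatSummary, which reads it to EOF and always closes it. A minimal standalone sketch of that pattern (Summary and decodeSummary are stand-in names, not part of the patch):

package sketch

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// Summary stands in for stats.Summary from the kubelet stats API.
type Summary struct {
	NodeName string `json:"nodeName"`
}

// decodeSummary takes ownership of the response: it reads the body to EOF and
// always closes it, so the transport can return the connection to its idle pool.
func decodeSummary(resp *http.Response) (*Summary, error) {
	if resp == nil || resp.Body == nil {
		return nil, fmt.Errorf("response or response body is nil")
	}
	defer resp.Body.Close()

	raw, err := io.ReadAll(resp.Body) // consume the entire body, not just the first JSON value
	if err != nil {
		return nil, fmt.Errorf("could not read response body: %w", err)
	}

	summary := &Summary{}
	if err := json.Unmarshal(raw, summary); err != nil {
		return nil, fmt.Errorf("could not unmarshal response body: %w", err)
	}
	return summary, nil
}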
+ 5 - 0
core/pkg/nodestats/request.go

@@ -2,6 +2,7 @@ package nodestats
 
 import (
 	"fmt"
+	"io"
 	"math"
 	"net/http"
 	"strconv"
@@ -64,6 +65,10 @@ func (c *NodeHttpClient) makeRequest(method string, URL string, bearerToken stri
 	}
 
 	if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) {
+		if resp.Body != nil {
+			io.Copy(io.Discard, resp.Body)
+			resp.Body.Close()
+		}
 		return nil, fmt.Errorf("invalid response %s", strconv.Itoa(resp.StatusCode))
 	}
 
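The request.go change guards the error path of the same flow: closing a response body without draining it forces net/http to discard the underlying connection rather than return it to the keep-alive pool. Draining to io.Discard first keeps the connection reusable. A sketch of the pattern in isolation (doGet is a hypothetical wrapper, not part of the patch):

package sketch

import (
	"fmt"
	"io"
	"net/http"
)

// doGet returns the response on success; on a non-2xx status it drains and
// closes the body so the keep-alive connection can be reused.
func doGet(client *http.Client, url string) (*http.Response, error) {
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		io.Copy(io.Discard, resp.Body) // drain before closing
		resp.Body.Close()
		return nil, fmt.Errorf("invalid response %d", resp.StatusCode)
	}
	// The caller now owns resp.Body and must close it.
	return resp, nil
}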

+ 84 - 9
core/pkg/util/worker/worker.go

@@ -29,6 +29,13 @@ type WorkerPool[T any, U any] interface {
 	Shutdown()
 }
 
+// WorkProcessor runs a group of inputs through a WorkerPool's workers and
+// processes the job results as they complete.
+type WorkProcessor[T any, U any] interface {
+	// Execute adds all inputs to be run by the worker pool and processed on completion.
+	Execute(inputs []T, process func(U)) error
+}
+
 // WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and
 // collect the results in a single slice.
 type WorkGroup[T any, U any] interface {
@@ -54,15 +61,6 @@ type queuedWorkerPool[T any, U any] struct {
 	isShutdown atomic.Bool
 }
 
-// ordered is a WorkGroup implementation which enforces ordering based on when
-// inputs were pushed onto the group.
-type ordered[T any, U any] struct {
-	workPool WorkerPool[T, U]
-	results  []U
-	wg       sync.WaitGroup
-	count    int
-}
-
 // NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker
 // func used to transform inputs to outputs.
 func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
@@ -128,6 +126,15 @@ func (wq *queuedWorkerPool[T, U]) worker() {
 	}
 }
 
+// ordered is a WorkGroup implementation which enforces ordering based on when
+// inputs were pushed onto the group.
+type ordered[T any, U any] struct {
+	workPool WorkerPool[T, U]
+	results  []U
+	wg       sync.WaitGroup
+	count    int
+}
+
 // NewOrderedGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which
 // they are pushed. Ordered groups do not support concurrent Push() calls.
 func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U] {
@@ -170,6 +177,53 @@ func (ow *ordered[T, U]) Wait() []U {
 	return ow.results
 }
 
+// orderedProcessor is a WorkProcessor implementation which processes results serially
+// in the same order as the input slice. Note that process is invoked on a single dedicated
+// goroutine, which is not the calling goroutine.
+type orderedProcessor[T any, U any] struct {
+	workPool WorkerPool[T, U]
+}
+
+// NewOrderedProcessor creates a new ordered work processor which processes execution results in the
+// order in which the inputs were passed.
+func NewOrderedProcessor[T any, U any](pool WorkerPool[T, U]) WorkProcessor[T, U] {
+	return &orderedProcessor[T, U]{
+		workPool: pool,
+	}
+}
+
+func (obp *orderedProcessor[T, U]) Execute(inputs []T, process func(U)) error {
+	if len(inputs) == 0 {
+		return nil
+	}
+
+	// Run all the inputs in order, creating a channel per input to receive results.
+	channels := make([]chan U, len(inputs))
+	for i, input := range inputs {
+		channels[i] = make(chan U)
+		err := obp.workPool.Run(input, channels[i])
+		if err != nil {
+			return err
+		}
+	}
+
+	// Create a separate goroutine to execute process on all the results serially
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+
+		for _, ch := range channels {
+			result := <-ch
+			process(result)
+
+			close(ch)
+		}
+	}()
+
+	<-done
+	return nil
+}
+
 // noResultGroup is a WorkGroup implementation which arbitrarily pushes inputs to
 // a runner pool to be executed concurrently. This group does not collect results.
 type noResultGroup[T any] struct {
@@ -373,3 +427,24 @@ func ConcurrentIterRunWith[T any](size int, runner Runner[T], inputs iter.Seq[T]
 
 	workGroup.Wait()
 }
+
+// ConcurrentOrderedProcess runs a pool of OptimalWorkerCount() workers which concurrently call the provided worker
+// func on each input, then calls the process function on each result, as it completes, in the same order as the inputs.
+func ConcurrentOrderedProcess[T any, U any](worker Worker[T, U], inputs []T, process func(U)) {
+	ConcurrentOrderedProcessWith(OptimalWorkerCount(), worker, inputs, process)
+}
+
+// ConcurrentOrderedProcessWith runs a pool of size workers which concurrently call the provided worker func on each
+// input, then calls the process function on each result, as it completes, in the same order as the inputs.
+func ConcurrentOrderedProcessWith[T any, U any](size int, worker Worker[T, U], inputs []T, process func(U)) {
+	if len(inputs) == 0 {
+		return
+	}
+
+	workerPool := NewWorkerPool(size, worker)
+	defer workerPool.Shutdown()
+
+	// processors block on Execute, so there is no need to explicitly wait
+	workProcessor := NewOrderedProcessor(workerPool)
+	workProcessor.Execute(inputs, process)
+}

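Taken together, the worker.go additions let a caller fan work out across a pool while consuming results serially in input order. A minimal usage sketch, assuming the API added above (the import path is inferred from the repository layout; the squaring worker is illustrative):

package sketch

import (
	"fmt"

	"github.com/opencost/opencost/core/pkg/util/worker"
)

func printSquaresInOrder() {
	inputs := []int{1, 2, 3, 4, 5}

	// square may run on any of the pool's goroutines, concurrently.
	square := func(n int) int { return n * n }

	// process is invoked serially in input order as results complete:
	// 1, 4, 9, 16, 25, regardless of which worker finishes first.
	worker.ConcurrentOrderedProcess(square, inputs, func(sq int) {
		fmt.Println(sq)
	})
}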
+ 31 - 0
core/pkg/util/worker/worker_test.go

@@ -283,3 +283,34 @@ func TestConcurrentRunWithLessThanOne(t *testing.T) {
 		t.Errorf("Expected to complete in 1.5s, took %dms", time.Since(now).Milliseconds())
 	}
 }
+
+func TestConcurrentOrderedProcess(t *testing.T) {
+	const (
+		tasks   = 10
+		workers = 3
+	)
+
+	workFunc := func(i int) int {
+		t.Logf("Starting Work For: %d\n", i)
+		time.Sleep(time.Duration(rand.Intn(500)+500) * time.Millisecond)
+		t.Logf("Finished Work For: %d\n", i)
+		return i
+	}
+
+	lastProcessed := -1
+	processFunc := func(i int) {
+		if i < lastProcessed {
+			t.Errorf("Expected to process in order, but got %d after %d", i, lastProcessed)
+		}
+		lastProcessed = i
+		t.Logf("Processing Result For: %d\n", i)
+	}
+
+	// create tasks
+	inputs := make([]int, tasks)
+	for i := 0; i < tasks; i++ {
+		inputs[i] = i + 1
+	}
+
+	ConcurrentOrderedProcessWith(workers, workFunc, inputs, processFunc)
+}