Explorar o código

Fix Memory Leak on Scrape Target Parse (#3593)

Matt Bolt hai 2 meses
pai
achega
822f5f9d56

+ 5 - 0
modules/collector-source/pkg/scrape/targetscraper.go

@@ -1,6 +1,7 @@
 package scrape
 
 import (
+	"io"
 	"sync"
 
 	"github.com/kubecost/events"
@@ -40,6 +41,7 @@ func (s *TargetScraper) Scrape() []metric.Update {
 	var scrapeFuncs []ScrapeFunc
 	for i := range targets {
 		target := targets[i]
+
 		fn := func() []metric.Update {
 			var scrapeResults []metric.Update
 			f, err := target.Load()
@@ -51,6 +53,9 @@ func (s *TargetScraper) Scrape() []metric.Update {
 				log.Errorf("failed to scrape target: %s", err.Error())
 				return scrapeResults
 			}
+			if closer, ok := f.(io.ReadCloser); ok {
+				defer closer.Close()
+			}
 			results, err := parser.Parse(f)
 			if err != nil {
 				errLock.Lock()

+ 76 - 9
modules/collector-source/pkg/scrape/targetscraper_test.go

@@ -1,7 +1,11 @@
 package scrape
 
 import (
+	"fmt"
+	"io"
 	"reflect"
+	"strings"
+	"sync/atomic"
 	"testing"
 
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
@@ -100,6 +104,53 @@ DCGM_FI_PROF_GR_ENGINE_ACTIVE{gpu="0",UUID="GPU-1",pci_bus_id="00000000:00:0A.0"
 DCGM_FI_DEV_DEC_UTIL{gpu="0",UUID="GPU-1",pci_bus_id="00000000:00:0A.0",device="nvidia0",modelName="Tesla T4",Hostname="localhost"} 0 
 `
 
+type CloseableStringReader struct {
+	*strings.Reader
+
+	closed *atomic.Bool
+}
+
+func newCloseableStringReader(reader *strings.Reader, closed *atomic.Bool) *CloseableStringReader {
+	return &CloseableStringReader{
+		Reader: reader,
+		closed: closed,
+	}
+}
+
+func (csr *CloseableStringReader) Close() error {
+	if csr.closed != nil {
+		csr.closed.Store(true)
+	}
+
+	return nil
+}
+
+type CloseableStringTarget struct {
+	sTarget *target.StringTarget
+	closed  *atomic.Bool
+}
+
+func newCloseableStringTarget(sTarget *target.StringTarget, closed *atomic.Bool) *CloseableStringTarget {
+	return &CloseableStringTarget{
+		sTarget: sTarget,
+		closed:  closed,
+	}
+}
+
+func (cst *CloseableStringTarget) Load() (io.Reader, error) {
+	reader, err := cst.sTarget.Load()
+	if err != nil {
+		return nil, err
+	}
+
+	sReader, ok := reader.(*strings.Reader)
+	if !ok {
+		return nil, fmt.Errorf("Reader was not a string reader")
+	}
+
+	return newCloseableStringReader(sReader, cst.closed), nil
+}
+
 func TestTargetScraper_Scrape(t *testing.T) {
 	tests := []struct {
 		name                 string
@@ -479,18 +530,34 @@ func TestTargetScraper_Scrape(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			scraper := tt.targetScraperFactory(target.NewDefaultTargetProvider(target.NewStringTarget(tt.scrapeText)))
-			scrapeResults := scraper.Scrape()
+			closed := new(atomic.Bool)
 
-			if len(scrapeResults) != len(tt.expected) {
-				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(scrapeResults))
-			}
+			for i := range 2 {
+				var sTarget target.ScrapeTarget
+				if i == 0 {
+					sTarget = target.NewStringTarget(tt.scrapeText)
+				} else {
+					sTarget = newCloseableStringTarget(target.NewStringTarget(tt.scrapeText), closed)
+				}
 
-			for i, expected := range tt.expected {
-				got := scrapeResults[i]
-				if !reflect.DeepEqual(expected, got) {
-					t.Errorf("Result did not match expected at index %d: got %v, want %v", i, got, expected)
+				scraper := tt.targetScraperFactory(target.NewDefaultTargetProvider(sTarget))
+				scrapeResults := scraper.Scrape()
+
+				if len(scrapeResults) != len(tt.expected) {
+					t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(scrapeResults))
 				}
+
+				for i, expected := range tt.expected {
+					got := scrapeResults[i]
+					if !reflect.DeepEqual(expected, got) {
+						t.Errorf("Result did not match expected at index %d: got %v, want %v", i, got, expected)
+					}
+				}
+			}
+
+			if !closed.Load() {
+				t.Errorf("Closeable target did not call Close on Scrape()")
+				t.Fail()
 			}
 		})
 	}