2
0
Эх сурвалжийг харах

gzip wal files (#3185)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 11 сар өмнө
parent
commit
97cb9e756b

+ 6 - 4
modules/collector-source/pkg/metric/repository.go

@@ -53,16 +53,18 @@ func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricSto
 
 // Update calls Update on the collectors for each resolution
 func (r *MetricRepository) Update(
-	updates []Update,
-	timestamp time.Time,
+	updateSet *UpdateSet,
 ) {
 	r.lock.Lock()
 	defer r.lock.Unlock()
+	if updateSet == nil {
+		return
+	}
 
-	for _, update := range updates {
+	for _, update := range updateSet.Updates {
 		// Call update on the collectors for each resolution
 		for _, resCollector := range r.resolutionStores {
-			resCollector.update(update.Name, update.Labels, update.Value, timestamp, update.AdditionalInfo)
+			resCollector.update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
 		}
 	}
 }

+ 3 - 2
modules/collector-source/pkg/metric/update.go

@@ -5,7 +5,8 @@ import (
 )
 
 type UpdateSet struct {
-	Updates []Update `json:"updates"`
+	Timestamp time.Time `json:"timestamp"`
+	Updates   []Update  `json:"updates"`
 }
 
 type Update struct {
@@ -16,5 +17,5 @@ type Update struct {
 }
 
 type Updater interface {
-	Update([]Update, time.Time)
+	Update(*UpdateSet)
 }

+ 66 - 18
modules/collector-source/pkg/metric/walinator.go

@@ -1,7 +1,10 @@
 package metric
 
 import (
+	"bytes"
+	"compress/gzip"
 	"fmt"
+	"io"
 	"path"
 	"sort"
 	"strings"
@@ -12,6 +15,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/storage"
 	"github.com/opencost/opencost/core/pkg/util/json"
+	"github.com/opencost/opencost/core/pkg/util/worker"
 	"github.com/opencost/opencost/modules/collector-source/pkg/util"
 )
 
@@ -47,7 +51,7 @@ func NewWalinator(
 	if err != nil {
 		return nil, fmt.Errorf("failed to create path formatter for scrape controller: %s", err.Error())
 	}
-	encoder := exporter.NewJSONEncoder[UpdateSet]()
+	encoder := exporter.NewGZipEncoder(exporter.NewJSONEncoder[UpdateSet]())
 	exp := exporter.NewEventStorageExporter(
 		pathFormatter,
 		encoder,
@@ -64,12 +68,15 @@ func NewWalinator(
 }
 
 func (w *Walinator) Start() {
+	w.clean()
 	w.restore()
 
 	// Start cleaning function
 	go func() {
-		time.Sleep(w.limitResolution.Next().Sub(time.Now().UTC()))
-		w.clean()
+		for {
+			time.Sleep(w.limitResolution.Next().Sub(time.Now().UTC()))
+			w.clean()
+		}
 	}()
 }
 
@@ -80,37 +87,78 @@ func (w *Walinator) restore() {
 		log.Errorf("failed to retrieve updates files: %s", err.Error())
 	}
 	limit := w.limitResolution.Limit()
-	for _, fi := range fileInfos {
+
+	workerFn := func(fi fileInfo) *UpdateSet {
 		if fi.timestamp.Before(limit) {
-			continue
+			return nil
 		}
 
 		b, err := w.storage.Read(fi.name)
 		if err != nil {
 			log.Errorf("failed to load file contents for '%s': %s", fi.name, err.Error())
-			continue
+			return nil
 		}
-		updateSet := UpdateSet{}
-		err = json.Unmarshal(b, &updateSet)
+
+		updateSet, err := deserializeUpdateSet(fi.ext, b)
 		if err != nil {
-			log.Errorf("failed to unmarshal file %s: %s", fi.name, err.Error())
-			continue
+			log.Errorf("failed to deserialize file contents for '%s': %s", fi.name, err.Error())
+			return nil
+		}
+
+		if updateSet.Timestamp.IsZero() {
+			updateSet.Timestamp = fi.timestamp
+		}
+
+		return updateSet
+	}
+
+	processFn := func(updateSet *UpdateSet) {
+		w.repo.Update(updateSet)
+	}
+	worker.ConcurrentOrderedProcessWith(worker.OptimalWorkerCount(), workerFn, fileInfos, processFn)
+}
+
+func deserializeUpdateSet(ext string, b []byte) (*UpdateSet, error) {
+	extSplit := strings.Split(ext, ".")
+	lastElem := extSplit[len(extSplit)-1]
+	switch lastElem {
+	case "json":
+		updateSet := &UpdateSet{}
+		err := json.Unmarshal(b, updateSet)
+		if err != nil {
+			return nil, fmt.Errorf("failed to unmarshal json: %w", err)
+		}
+		return updateSet, nil
+	case "gz":
+		buf := bytes.NewBuffer(b)
+		reader, err := gzip.NewReader(buf)
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress gzip: %w", err)
+		}
-		w.repo.Update(updateSet.Updates, fi.timestamp)
+		defer reader.Close()
+		decompressed, err := io.ReadAll(reader)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read decompressed gzip: %w", err)
+		}
+
+		return deserializeUpdateSet(strings.TrimSuffix(ext, ".gz"), decompressed)
 	}
+	return nil, fmt.Errorf("unrecognized extension: '%s'", ext)
 }
 
 // Update calls update on the repo and then exports the update to storage
 func (w *Walinator) Update(
-	updates []Update,
-	timestamp time.Time,
+	updateSet *UpdateSet,
 ) {
+	if updateSet == nil {
+		return
+	}
+
 	// run update
-	w.repo.Update(updates, timestamp)
+	w.repo.Update(updateSet)
 
-	err := w.exporter.Export(timestamp, &UpdateSet{
-		Updates: updates,
-	})
+	err := w.exporter.Export(updateSet.Timestamp, updateSet)
 	if err != nil {
 		log.Errorf("failed to export update results: %s", err.Error())
 	}
@@ -126,7 +174,7 @@ func (w *Walinator) getFileInfos() ([]fileInfo, error) {
 	var fileInfos []fileInfo
 	for _, file := range files {
 		fileName := path.Base(file.Name)
-		fileNameComponents := strings.Split(fileName, ".")
+		fileNameComponents := strings.SplitN(fileName, ".", 2)
 		if len(fileNameComponents) != 2 {
 			log.Errorf("file has invalid name: %s", fileName)
 			continue

+ 143 - 52
modules/collector-source/pkg/metric/walinator_test.go

@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/storage"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric/aggregator"
@@ -62,20 +63,21 @@ func TestWalinator_Update(t *testing.T) {
 		resolutions,
 		repo,
 	)
-	inputUpdateSet1 := UpdateSet{
-		Updates: []Update{
-			{
-				Name: TestMetric,
-				Labels: map[string]string{
-					"test": "test",
-				},
-				Value:          1,
-				AdditionalInfo: nil,
+	inputUpdates1 := []Update{
+		{
+			Name: TestMetric,
+			Labels: map[string]string{
+				"test": "test",
 			},
+			Value:          1,
+			AdditionalInfo: nil,
 		},
 	}
 
-	wal.Update(inputUpdateSet1.Updates, time1)
+	wal.Update(&UpdateSet{
+		Timestamp: time1,
+		Updates:   inputUpdates1,
+	})
 
 	// check that the repo has a collector
 	if len(repo.resolutionStores["1d"].collectors) != 1 {
@@ -110,48 +112,51 @@ func TestWalinator_restore(t *testing.T) {
 		resolutions,
 		repo,
 	)
-	inputUpdateSet1 := UpdateSet{
-		Updates: []Update{
-			{
-				Name: TestMetric,
-				Labels: map[string]string{
-					"test": "test",
-				},
-				Value:          1,
-				AdditionalInfo: nil,
+	inputUpdates1 := []Update{
+		{
+			Name: TestMetric,
+			Labels: map[string]string{
+				"test": "test",
 			},
+			Value:          1,
+			AdditionalInfo: nil,
 		},
 	}
 
-	inputUpdateSet2 := UpdateSet{
-		Updates: []Update{
-			{
-				Name: TestMetric,
-				Labels: map[string]string{
-					"test": "test",
-				},
-				Value:          2,
-				AdditionalInfo: nil,
+	inputUpdates2 := []Update{
+		{
+			Name: TestMetric,
+			Labels: map[string]string{
+				"test": "test",
 			},
+			Value:          2,
+			AdditionalInfo: nil,
 		},
 	}
 
-	inputUpdateSet3 := UpdateSet{
-		Updates: []Update{
-			{
-				Name: TestMetric,
-				Labels: map[string]string{
-					"test": "test",
-				},
-				Value:          3,
-				AdditionalInfo: nil,
+	inputUpdates3 := []Update{
+		{
+			Name: TestMetric,
+			Labels: map[string]string{
+				"test": "test",
 			},
+			Value:          3,
+			AdditionalInfo: nil,
 		},
 	}
 
-	wal.Update(inputUpdateSet1.Updates, time1)
-	wal.Update(inputUpdateSet2.Updates, time2)
-	wal.Update(inputUpdateSet3.Updates, time3)
+	wal.Update(&UpdateSet{
+		Timestamp: time1,
+		Updates:   inputUpdates1,
+	})
+	wal.Update(&UpdateSet{
+		Timestamp: time2,
+		Updates:   inputUpdates2,
+	})
+	wal.Update(&UpdateSet{
+		Timestamp: time3,
+		Updates:   inputUpdates3,
+	})
 
 	repo2 := NewMetricRepository(
 		resolutions,
@@ -219,22 +224,29 @@ func TestWalinator_clean(t *testing.T) {
 		resolutions,
 		repo,
 	)
-	inputUpdateSet1 := UpdateSet{
-		Updates: []Update{
-			{
-				Name: TestMetric,
-				Labels: map[string]string{
-					"test": "test",
-				},
-				Value:          1,
-				AdditionalInfo: nil,
+	inputUpdates1 := []Update{
+		{
+			Name: TestMetric,
+			Labels: map[string]string{
+				"test": "test",
 			},
+			Value:          1,
+			AdditionalInfo: nil,
 		},
 	}
 
-	wal.Update(inputUpdateSet1.Updates, time1)
-	wal.Update(inputUpdateSet1.Updates, time2)
-	wal.Update(inputUpdateSet1.Updates, time3)
+	wal.Update(&UpdateSet{
+		Timestamp: time1,
+		Updates:   inputUpdates1,
+	})
+	wal.Update(&UpdateSet{
+		Timestamp: time2,
+		Updates:   inputUpdates1,
+	})
+	wal.Update(&UpdateSet{
+		Timestamp: time3,
+		Updates:   inputUpdates1,
+	})
 
 	files, err := wal.getFileInfos()
 	if err != nil {
@@ -254,3 +266,82 @@ func TestWalinator_clean(t *testing.T) {
 		t.Errorf("incorrect number of files after clean: wanted %d, got %d", 2, len(files))
 	}
 }
+
+func Test_deserializeUpdateSet(t *testing.T) {
+
+	inputUpdateSet1 := &UpdateSet{
+		Updates: []Update{
+			{
+				Name: TestMetric,
+				Labels: map[string]string{
+					"test": "test",
+				},
+				Value:          1,
+				AdditionalInfo: nil,
+			},
+		},
+	}
+
+	jsonEncoder := exporter.NewJSONEncoder[UpdateSet]()
+	gZipJsonEncoder := exporter.NewGZipEncoder(exporter.NewJSONEncoder[UpdateSet]())
+
+	invalidBytes := []byte("invalid")
+	jsonBytes1, _ := jsonEncoder.Encode(inputUpdateSet1)
+	gZipJsonBytes1, _ := gZipJsonEncoder.Encode(inputUpdateSet1)
+
+	tests := map[string]struct {
+		ext     string
+		b       []byte
+		want    *UpdateSet
+		wantErr bool
+	}{
+		"json with invalid": {
+			ext:     "json",
+			b:       invalidBytes,
+			want:    nil,
+			wantErr: true,
+		},
+		"json with json": {
+			ext:     "json",
+			b:       jsonBytes1,
+			want:    inputUpdateSet1,
+			wantErr: false,
+		},
+		"json with gzipjson": {
+			ext:     "json",
+			b:       gZipJsonBytes1,
+			want:    nil,
+			wantErr: true,
+		},
+		"json.gz with invalid": {
+			ext:     "json.gz",
+			b:       invalidBytes,
+			want:    nil,
+			wantErr: true,
+		},
+		"json.gz with json": {
+			ext:     "json.gz",
+			b:       jsonBytes1,
+			want:    nil,
+			wantErr: true,
+		},
+		"json.gz with gzipjson": {
+			ext:     "json.gz",
+			b:       gZipJsonBytes1,
+			want:    inputUpdateSet1,
+			wantErr: false,
+		},
+	}
+	for name, tt := range tests {
+		t.Run(name, func(t *testing.T) {
+			got, err := deserializeUpdateSet(tt.ext, tt.b)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("deserializeUpdateSet() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("deserializeUpdateSet() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 4 - 1
modules/collector-source/pkg/scrape/scrapecontroller.go

@@ -101,5 +101,8 @@ func (sc *ScrapeController) Scrape(timestamp time.Time) {
 	scrapeResults := concurrentScrape(scrapeFuncs...)
 
 	// once all results are returned run updates all at once with the same timestamp
-	sc.updater.Update(scrapeResults, timestamp)
+	sc.updater.Update(&metric.UpdateSet{
+		Timestamp: timestamp,
+		Updates:   scrapeResults,
+	})
 }