Procházet zdrojové kódy

Walinator (#3172)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb před 11 měsíci
rodič
revize
d9d4b29f10

+ 11 - 0
core/pkg/exporter/pathing/bingenpath.go

@@ -55,6 +55,17 @@ func (bsf *BingenStoragePathFormatter) RootDir() string {
 	return bsf.rootDir
 }
 
+// Dir returns the director that files will be placed in
+func (bsf *BingenStoragePathFormatter) Dir() string {
+	return path.Join(
+		bsf.rootDir,
+		bsf.clusterId,
+		baseStorageDir,
+		bsf.pipeline,
+		bsf.resolution,
+	)
+}
+
 // ToFullPath returns the full path to a file name within the storage directory using the format:
 //
 //	<root>/federated/<cluster>/etl/bingen/<pipeline>/<resolution>/<prefix>.<start-epoch>-<end-epoch>

+ 10 - 0
core/pkg/exporter/pathing/eventpath.go

@@ -53,6 +53,16 @@ func (espf *EventStoragePathFormatter) RootDir() string {
 	return espf.rootDir
 }
 
+// Dir  returns the director that files will be placed in
+func (espf *EventStoragePathFormatter) Dir() string {
+	return path.Join(
+		espf.rootDir,
+		espf.clusterId,
+		espf.event,
+		path.Join(espf.subPaths...),
+	)
+}
+
 // ToFullPath returns the full path to a file name within the storage directory using the format:
 //
 //	<root>/federated/<cluster>/<event>/YYYYMMDDHHmm.json

+ 3 - 0
core/pkg/exporter/pathing/pathing.go

@@ -5,6 +5,9 @@ type StoragePathFormatter[T any] interface {
 	// RootDir returns the root directory for the storage path.
 	RootDir() string
 
+	// Dir returns the director where files are placed
+	Dir() string
+
 	// ToFullPath returns the full path to a file name within the storage
 	// directory leveraging a prefix and an incoming T type (generally a daterange or timestamp).
 	ToFullPath(prefix string, in T, fileExt string) string

+ 29 - 4
modules/collector-source/pkg/collector/datasource.go

@@ -7,6 +7,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clusters"
 	"github.com/opencost/opencost/core/pkg/diagnostics"
+	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/nodestats"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/storage"
@@ -45,17 +46,41 @@ func NewCollectorDataSource(
 	clusterCache clustercache.ClusterCache,
 	statSummaryClient nodestats.StatSummaryClient,
 ) source.OpenCostDataSource {
+	var resolutions []*util.Resolution
+	for _, resconf := range config.Resolutions {
+		resolution, err := util.NewResolution(resconf)
+		if err != nil {
+			log.Errorf("failed to create resolution %s", err.Error())
+			continue
+		}
+		resolutions = append(resolutions, resolution)
+	}
+
 	repo := metric.NewMetricRepository(
-		config.ClusterID,
-		config.Resolutions,
-		store,
+		resolutions,
 		NewOpenCostMetricStore,
 	)
+	var updater metric.Updater
+	updater = repo
+	if store != nil {
+		wal, err := metric.NewWalinator(
+			config.ClusterID,
+			store,
+			resolutions,
+			repo,
+		)
+		if err != nil {
+			log.Errorf("failed to initialize the walinator: %s", err.Error())
+		} else {
+			wal.Start()
+			updater = wal
+		}
+	}
 
 	scrapeController := scrape.NewScrapeController(
 		config.ScrapeInterval,
 		config.NetworkPort,
-		repo,
+		updater,
 		clusterCache,
 		statSummaryClient,
 	)

+ 6 - 108
modules/collector-source/pkg/metric/repository.go

@@ -2,44 +2,29 @@ package metric
 
 import (
 	"fmt"
-	"path"
-	"sort"
-	"strings"
 	"sync"
 	"time"
 
-	"github.com/opencost/opencost/core/pkg/exporter"
-	"github.com/opencost/opencost/core/pkg/exporter/pathing"
 	"github.com/opencost/opencost/core/pkg/log"
-	"github.com/opencost/opencost/core/pkg/storage"
-	"github.com/opencost/opencost/core/pkg/util/json"
 	"github.com/opencost/opencost/modules/collector-source/pkg/util"
 )
 
-const ControllerEventName = "controller"
-
-type RepositoryConfig struct {
-}
-
 // MetricRepository is an MetricUpdater which applies calls to update to all resolutions being tracked. It holds the
 // MetricStore instances for each resolution.
 type MetricRepository struct {
 	lock             sync.Mutex
 	resolutionStores map[string]*resolutionStores
-	exporter         exporter.EventExporter[UpdateSet]
 }
 
 func NewMetricRepository(
-	clusterID string,
-	resolutions []util.ResolutionConfiguration,
-	store storage.Storage,
+	resolutions []*util.Resolution,
 	storeFactory MetricStoreFactory,
 ) *MetricRepository {
 	resoluationCollectors := make(map[string]*resolutionStores)
-	for _, resconf := range resolutions {
-		resolution, err := util.NewResolution(resconf)
-		if err != nil {
-			log.Errorf("failed to create resolution %s", err.Error())
+	var limitResolution *util.Resolution
+	for _, resolution := range resolutions {
+		if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
+			limitResolution = resolution
 		}
 		resCollector, err := newResolutionStores(resolution, storeFactory)
 		if err != nil {
@@ -49,76 +34,9 @@ func NewMetricRepository(
 		resoluationCollectors[resolution.Interval()] = resCollector
 	}
 
-	repo := &MetricRepository{
+	return &MetricRepository{
 		resolutionStores: resoluationCollectors,
 	}
-
-	if store != nil {
-		pathFormatter, err := pathing.NewEventStoragePathFormatter("", clusterID, ControllerEventName)
-		if err != nil {
-			log.Errorf("filed to create path formatter for scrape controller: %s", err.Error())
-			return repo
-		}
-		encoder := exporter.NewJSONEncoder[UpdateSet]()
-		repo.exporter = exporter.NewEventStorageExporter(
-			pathFormatter,
-			encoder,
-			store,
-		)
-		// attempt to restore state from files
-		// get path of saved files
-		dirPath := path.Dir(pathFormatter.ToFullPath("", time.Time{}, ""))
-		files, err := store.List(dirPath)
-		if err != nil {
-			log.Errorf("failed to list files in scrape controller: %s", err.Error())
-		}
-		// find oldest limit
-		limit := time.Now().UTC()
-		for _, resStore := range repo.resolutionStores {
-			if limit.After(resStore.resolution.Limit()) {
-				limit = resStore.resolution.Limit()
-			}
-		}
-
-		// find files that are within limit
-		var filesToRun []string
-		for _, file := range files {
-			fileName := path.Base(file.Name)
-			timeString := strings.TrimSuffix(fileName, "."+encoder.FileExt())
-			timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
-			if err != nil {
-				log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
-				continue
-			}
-			if timestamp.After(limit) {
-				filesToRun = append(filesToRun, pathFormatter.ToFullPath("", timestamp, encoder.FileExt()))
-			}
-		}
-
-		// sort files
-		sort.Strings(filesToRun)
-
-		// open files and run updates
-		for _, fileName := range filesToRun {
-			b, err := store.Read(fileName)
-			if err != nil {
-				log.Errorf("failed to load file contents for '%s': %s", fileName, err.Error())
-				continue
-			}
-			updateSet := UpdateSet{}
-			err = json.Unmarshal(b, &updateSet)
-			if err != nil {
-				log.Errorf("failed to unmarshal file %s: %s", fileName, err.Error())
-				continue
-			}
-			filePrefix := path.Base(fileName)
-			timeString := strings.TrimSuffix(filePrefix, "."+encoder.FileExt())
-			timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
-			repo.Update(updateSet.Updates, timestamp)
-		}
-	}
-
-	return repo
 }
 
 func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricStore, error) {
@@ -147,26 +65,6 @@ func (r *MetricRepository) Update(
 			resCollector.update(update.Name, update.Labels, update.Value, timestamp, update.AdditionalInfo)
 		}
 	}
-
-	if r.exporter != nil {
-		err := r.exporter.Export(timestamp, &UpdateSet{
-			Updates: updates,
-		})
-		if err != nil {
-			log.Errorf("failed to export update results: %s", err.Error())
-		}
-	}
-}
-
-type UpdateSet struct {
-	Updates []Update `json:"updates"`
-}
-
-type Update struct {
-	Name           string            `json:"name"`
-	Labels         map[string]string `json:"labels"`
-	Value          float64           `json:"value"`
-	AdditionalInfo map[string]string `json:"additionalInfo"`
 }
 
 func (r *MetricRepository) Coverage() map[string][]time.Time {

+ 20 - 0
modules/collector-source/pkg/metric/update.go

@@ -0,0 +1,20 @@
+package metric
+
+import (
+	"time"
+)
+
+type UpdateSet struct {
+	Updates []Update `json:"updates"`
+}
+
+type Update struct {
+	Name           string            `json:"name"`
+	Labels         map[string]string `json:"labels"`
+	Value          float64           `json:"value"`
+	AdditionalInfo map[string]string `json:"additionalInfo"`
+}
+
+type Updater interface {
+	Update([]Update, time.Time)
+}

+ 169 - 0
modules/collector-source/pkg/metric/walinator.go

@@ -0,0 +1,169 @@
+package metric
+
+import (
+	"fmt"
+	"path"
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/json"
+	"github.com/opencost/opencost/modules/collector-source/pkg/util"
+)
+
+const ControllerEventName = "controller"
+
+type fileInfo struct {
+	name      string
+	timestamp time.Time
+	ext       string
+}
+
+type Walinator struct {
+	storage         storage.Storage
+	paths           pathing.StoragePathFormatter[time.Time]
+	exporter        exporter.EventExporter[UpdateSet]
+	limitResolution *util.Resolution
+	repo            *MetricRepository
+}
+
+func NewWalinator(
+	clusterID string,
+	store storage.Storage,
+	resolutions []*util.Resolution,
+	repo *MetricRepository,
+) (*Walinator, error) {
+	var limitResolution *util.Resolution
+	for _, resolution := range resolutions {
+		if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
+			limitResolution = resolution
+		}
+	}
+	pathFormatter, err := pathing.NewEventStoragePathFormatter("", clusterID, ControllerEventName)
+	if err != nil {
+		return nil, fmt.Errorf("filed to create path formatter for scrape controller: %s", err.Error())
+	}
+	encoder := exporter.NewJSONEncoder[UpdateSet]()
+	exp := exporter.NewEventStorageExporter(
+		pathFormatter,
+		encoder,
+		store,
+	)
+
+	return &Walinator{
+		storage:         store,
+		paths:           pathFormatter,
+		exporter:        exp,
+		limitResolution: limitResolution,
+		repo:            repo,
+	}, nil
+}
+
+func (w *Walinator) Start() {
+	w.restore()
+
+	// Start cleaning function
+	go func() {
+		time.Sleep(w.limitResolution.Next().Sub(time.Now().UTC()))
+		w.clean()
+	}()
+}
+
+// restore applies updates from wal files to restore the state of the repo
+func (w *Walinator) restore() {
+	fileInfos, err := w.getFileInfos()
+	if err != nil {
+		log.Errorf("failed to retrieve updates files: %s", err.Error())
+	}
+	limit := w.limitResolution.Limit()
+	for _, fi := range fileInfos {
+		if fi.timestamp.Before(limit) {
+			continue
+		}
+
+		b, err := w.storage.Read(fi.name)
+		if err != nil {
+			log.Errorf("failed to load file contents for '%s': %s", fi.name, err.Error())
+			continue
+		}
+		updateSet := UpdateSet{}
+		err = json.Unmarshal(b, &updateSet)
+		if err != nil {
+			log.Errorf("failed to unmarshal file %s: %s", fi.name, err.Error())
+			continue
+		}
+		w.repo.Update(updateSet.Updates, fi.timestamp)
+	}
+}
+
+// Update calls update on the repo and then exports the update to storage
+func (w *Walinator) Update(
+	updates []Update,
+	timestamp time.Time,
+) {
+	// run update
+	w.repo.Update(updates, timestamp)
+
+	err := w.exporter.Export(timestamp, &UpdateSet{
+		Updates: updates,
+	})
+	if err != nil {
+		log.Errorf("failed to export update results: %s", err.Error())
+	}
+}
+
+// getFileInfos returns a sorted slice of fileInfo
+func (w *Walinator) getFileInfos() ([]fileInfo, error) {
+	dirPath := w.paths.Dir()
+	files, err := w.storage.List(dirPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to list files in scrape controller: %w", err)
+	}
+	var fileInfos []fileInfo
+	for _, file := range files {
+		fileName := path.Base(file.Name)
+		fileNameComponents := strings.Split(fileName, ".")
+		if len(fileNameComponents) != 2 {
+			log.Errorf("file has invalid name: %s", fileName)
+			continue
+		}
+		timeString := fileNameComponents[0]
+		timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
+		if err != nil {
+			log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
+			continue
+		}
+		ext := fileNameComponents[1]
+		fileInfos = append(fileInfos, fileInfo{
+			name:      w.paths.ToFullPath("", timestamp, ext),
+			timestamp: timestamp,
+			ext:       ext,
+		})
+	}
+	sort.Slice(fileInfos, func(i, j int) bool {
+		return fileInfos[i].timestamp.Before(fileInfos[j].timestamp)
+	})
+	return fileInfos, nil
+}
+
+// clean removes files that are older than the limit resolution from the storage
+func (w *Walinator) clean() {
+	fileInfos, err := w.getFileInfos()
+	if err != nil {
+		log.Errorf("failed to retrieve file info for cleaning: %s", err.Error())
+	}
+	limit := w.limitResolution.Limit()
+	for _, fi := range fileInfos {
+		if !limit.After(fi.timestamp) {
+			continue
+		}
+		err = w.storage.Remove(fi.name)
+		if err != nil {
+			log.Errorf("failed to remove file '%s': %s", fi.name, err.Error())
+		}
+	}
+}

+ 128 - 19
modules/collector-source/pkg/metric/repository_test.go → modules/collector-source/pkg/metric/walinator_test.go

@@ -41,22 +41,75 @@ func testMetricCollector() MetricStore {
 	return memStore
 }
 
-func TestNewMetricRepository_DisasterRecovery(t *testing.T) {
-	time3 := time.Now().UTC().Truncate(timeutil.Day)
-	time2 := time3.Add(-12 * time.Hour)
-	time1 := time3.Add(-timeutil.Day)
+func TestWalinator_Update(t *testing.T) {
+	time2 := time.Now().UTC().Truncate(timeutil.Day)
+	time1 := time2.Add(-timeutil.Day)
 	store := storage.NewMemoryStorage()
+	res1d, _ := util.NewResolution(util.ResolutionConfiguration{
+		Interval:  "1d",
+		Retention: 3,
+	})
+	resolutions := []*util.Resolution{
+		res1d,
+	}
 	repo := NewMetricRepository(
+		resolutions,
+		testMetricCollector,
+	)
+	wal, _ := NewWalinator(
 		"test",
-		[]util.ResolutionConfiguration{
+		store,
+		resolutions,
+		repo,
+	)
+	inputUpdateSet1 := UpdateSet{
+		Updates: []Update{
 			{
-				Interval:  "1d",
-				Retention: 3,
+				Name: TestMetric,
+				Labels: map[string]string{
+					"test": "test",
+				},
+				Value:          1,
+				AdditionalInfo: nil,
 			},
 		},
-		store,
+	}
+
+	wal.Update(inputUpdateSet1.Updates, time1)
+
+	// check that the repo has a collector
+	if len(repo.resolutionStores["1d"].collectors) != 1 {
+		t.Error("call to Update did not update repository correctly")
+	}
+	files, _ := store.List(wal.paths.Dir())
+	// check storage
+	if len(files) != 1 {
+		t.Error("Update did not update storage")
+	}
+}
+
+func TestWalinator_restore(t *testing.T) {
+	time3 := time.Now().UTC().Truncate(timeutil.Day)
+	time2 := time3.Add(-12 * time.Hour)
+	time1 := time3.Add(-timeutil.Day)
+	store := storage.NewMemoryStorage()
+	res1d, _ := util.NewResolution(util.ResolutionConfiguration{
+		Interval:  "1d",
+		Retention: 3,
+	})
+	resolutions := []*util.Resolution{
+		res1d,
+	}
+	repo := NewMetricRepository(
+		resolutions,
 		testMetricCollector,
 	)
+	wal, _ := NewWalinator(
+		"test",
+		store,
+		resolutions,
+		repo,
+	)
 	inputUpdateSet1 := UpdateSet{
 		Updates: []Update{
 			{
@@ -96,22 +149,20 @@ func TestNewMetricRepository_DisasterRecovery(t *testing.T) {
 		},
 	}
 
-	repo.Update(inputUpdateSet1.Updates, time1)
-	repo.Update(inputUpdateSet2.Updates, time2)
-	repo.Update(inputUpdateSet3.Updates, time3)
+	wal.Update(inputUpdateSet1.Updates, time1)
+	wal.Update(inputUpdateSet2.Updates, time2)
+	wal.Update(inputUpdateSet3.Updates, time3)
 
 	repo2 := NewMetricRepository(
-		"test",
-		[]util.ResolutionConfiguration{
-			{
-				Interval:  "1d",
-				Retention: 3,
-			},
-		},
-		store,
+		resolutions,
 		testMetricCollector,
 	)
 
+	// replace the repo in the walinator
+	wal.repo = repo2
+
+	wal.restore()
+
 	collector1, err := repo.GetCollector("1d", time3)
 	if err != nil {
 		t.Fatalf("failed to get collector from repo1: %s", err.Error())
@@ -145,3 +196,61 @@ func TestNewMetricRepository_DisasterRecovery(t *testing.T) {
 		t.Errorf("average query results did not match 1: %v, 2: %v", averageRes1, averageRes2)
 	}
 }
+
+func TestWalinator_clean(t *testing.T) {
+	time3 := time.Now().UTC().Truncate(timeutil.Day)
+	time2 := time3.Add(-timeutil.Day)
+	time1 := time2.Add(-timeutil.Day)
+	store := storage.NewMemoryStorage()
+	res1d, _ := util.NewResolution(util.ResolutionConfiguration{
+		Interval:  "1d",
+		Retention: 2,
+	})
+	resolutions := []*util.Resolution{
+		res1d,
+	}
+	repo := NewMetricRepository(
+		resolutions,
+		testMetricCollector,
+	)
+	wal, _ := NewWalinator(
+		"test",
+		store,
+		resolutions,
+		repo,
+	)
+	inputUpdateSet1 := UpdateSet{
+		Updates: []Update{
+			{
+				Name: TestMetric,
+				Labels: map[string]string{
+					"test": "test",
+				},
+				Value:          1,
+				AdditionalInfo: nil,
+			},
+		},
+	}
+
+	wal.Update(inputUpdateSet1.Updates, time1)
+	wal.Update(inputUpdateSet1.Updates, time2)
+	wal.Update(inputUpdateSet1.Updates, time3)
+
+	files, err := wal.getFileInfos()
+	if err != nil {
+		t.Errorf("failed to retrieve file info: %s", err.Error())
+	}
+	if len(files) != 3 {
+		t.Errorf("incorrect number of files after updates: wanted %d, got %d", 3, len(files))
+	}
+
+	wal.clean()
+
+	files, err = wal.getFileInfos()
+	if err != nil {
+		t.Errorf("failed to retrieve file info: %s", err.Error())
+	}
+	if len(files) != 2 {
+		t.Errorf("incorrect number of files after clean: wanted %d, got %d", 2, len(files))
+	}
+}

+ 4 - 4
modules/collector-source/pkg/scrape/scrapecontroller.go

@@ -17,13 +17,13 @@ type ScrapeController struct {
 	scrapeInterval util.Interval
 	runState       atomic.AtomicRunState
 	scrapers       []Scraper
-	repo           *metric.MetricRepository
+	updater        metric.Updater
 }
 
 func NewScrapeController(
 	scrapeInterval string,
 	networkPort int,
-	repo *metric.MetricRepository,
+	updater metric.Updater,
 	clusterCache clustercache.ClusterCache,
 	statSummaryClient nodestats.StatSummaryClient,
 ) *ScrapeController {
@@ -52,7 +52,7 @@ func NewScrapeController(
 	sc := &ScrapeController{
 		scrapeInterval: si,
 		scrapers:       scrapers,
-		repo:           repo,
+		updater:        updater,
 	}
 
 	return sc
@@ -101,5 +101,5 @@ func (sc *ScrapeController) Scrape(timestamp time.Time) {
 	scrapeResults := concurrentScrape(scrapeFuncs...)
 
 	// once all results are returned run updates all at once with the same timestamp
-	sc.repo.Update(scrapeResults, timestamp)
+	sc.updater.Update(scrapeResults, timestamp)
 }