| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- package metric
- import (
- "bytes"
- "compress/gzip"
- "fmt"
- "io"
- "path"
- "sort"
- "strings"
- "time"
- "github.com/opencost/opencost/core/pkg/exporter"
- "github.com/opencost/opencost/core/pkg/exporter/pathing"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/storage"
- "github.com/opencost/opencost/core/pkg/util/json"
- "github.com/opencost/opencost/core/pkg/util/worker"
- "github.com/opencost/opencost/modules/collector-source/pkg/util"
- )
// CollectorEventName is the event name used when building WAL storage paths
// for collector update files.
const CollectorEventName = "collector"
// fileInfo describes a single WAL file discovered in storage: its full
// storage path, the timestamp parsed from its file name, and its extension
// (everything after the first '.'), which encodes the serialization format,
// e.g. "json", "bingen", or a ".gz"-suffixed variant.
type fileInfo struct {
	name      string    // full storage path of the file
	timestamp time.Time // timestamp parsed from the file name
	ext       string    // extension after the first '.', e.g. "bingen.gz"
}
// Walinator is a write-ahead-log wrapper around an Updater. It persists every
// UpdateSet it receives to storage so state can be restored after a restart,
// and periodically removes WAL files that fall outside the retention limit.
type Walinator struct {
	storage         storage.Storage                         // backing store for WAL files
	paths           pathing.StoragePathFormatter[time.Time] // builds timestamped storage paths
	exporter        exporter.EventExporter[UpdateSet]       // serializes and writes UpdateSets
	limitResolution *util.Resolution                        // resolution with the earliest limit; governs retention
	updater         Updater                                 // downstream consumer of updates
}
- func NewWalinator(
- clusterID string,
- applicationName string,
- store storage.Storage,
- resolutions []*util.Resolution,
- updater Updater,
- ) (*Walinator, error) {
- var limitResolution *util.Resolution
- for _, resolution := range resolutions {
- if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
- limitResolution = resolution
- }
- }
- pathFormatter, err := pathing.NewEventStoragePathFormatter(applicationName, clusterID, CollectorEventName)
- if err != nil {
- return nil, fmt.Errorf("failed to create path formatter for scrape controller: %s", err.Error())
- }
- encoder := exporter.NewBingenFileEncoder[UpdateSet]()
- exp := exporter.NewEventStorageExporter(
- pathFormatter,
- encoder,
- store,
- )
- return &Walinator{
- storage: store,
- paths: pathFormatter,
- exporter: exp,
- limitResolution: limitResolution,
- updater: updater,
- }, nil
- }
- func (w *Walinator) Start() {
- w.clean()
- w.restore()
- // Start cleaning function
- go func() {
- for {
- time.Sleep(w.limitResolution.Next().Sub(time.Now().UTC()))
- w.clean()
- }
- }()
- }
- // restore applies updates from wal files to restore the state of the previous updater(repo)
- func (w *Walinator) restore() {
- fileInfos, err := w.getFileInfos()
- if err != nil {
- log.Errorf("failed to retrieve updates files: %s", err.Error())
- }
- limit := w.limitResolution.Limit()
- workerFn := func(fi fileInfo) *UpdateSet {
- if fi.timestamp.Before(limit) {
- return nil
- }
- b, err := w.storage.Read(fi.name)
- if err != nil {
- log.Errorf("failed to load file contents for '%s': %s", fi.name, err.Error())
- return nil
- }
- updateSet, err := deserializeUpdateSet(fi.ext, b)
- if err != nil {
- log.Errorf("failed to deserialize file contents for '%s': %s", fi.name, err.Error())
- return nil
- }
- if updateSet.Timestamp.IsZero() {
- updateSet.Timestamp = fi.timestamp
- }
- return updateSet
- }
- processFn := func(updateSet *UpdateSet) {
- w.updater.Update(updateSet)
- }
- worker.ConcurrentOrderedProcessWith(worker.OptimalWorkerCount(), workerFn, fileInfos, processFn)
- }
- func deserializeUpdateSet(ext string, b []byte) (*UpdateSet, error) {
- extSplit := strings.Split(ext, ".")
- lastElem := extSplit[len(extSplit)-1]
- switch lastElem {
- case "json":
- updateSet := &UpdateSet{}
- err := json.Unmarshal(b, updateSet)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal json: %w", err)
- }
- return updateSet, nil
- case "gz":
- buf := bytes.NewBuffer(b)
- reader, err := gzip.NewReader(buf)
- if err != nil {
- return nil, fmt.Errorf("failed to decompress gzip: %w", err)
- }
- defer reader.Close()
- decompressed, err := io.ReadAll(reader)
- if err != nil {
- return nil, fmt.Errorf("failed to read decompressed gzip: %w", err)
- }
- return deserializeUpdateSet(strings.TrimSuffix(ext, ".gz"), decompressed)
- case "bingen":
- updateSet := new(UpdateSet)
- err := updateSet.UnmarshalBinary(b)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal bingen: %w", err)
- }
- return updateSet, nil
- }
- return nil, fmt.Errorf("unrecognized extension: '%s'", ext)
- }
- // Update calls update on the previous updater(repo) and then exports the update to storage
- func (w *Walinator) Update(
- updateSet *UpdateSet,
- ) {
- if updateSet == nil {
- return
- }
- // run update
- w.updater.Update(updateSet)
- err := w.exporter.Export(updateSet.Timestamp, updateSet)
- if err != nil {
- log.Errorf("failed to export update results: %s", err.Error())
- }
- }
- // getFileInfos returns a sorted slice of fileInfo
- func (w *Walinator) getFileInfos() ([]fileInfo, error) {
- dirPath := w.paths.Dir()
- files, err := w.storage.List(dirPath)
- if err != nil {
- return nil, fmt.Errorf("failed to list files in scrape controller: %w", err)
- }
- var fileInfos []fileInfo
- for _, file := range files {
- fileName := path.Base(file.Name)
- fileNameComponents := strings.SplitN(fileName, ".", 2)
- if len(fileNameComponents) != 2 {
- log.Errorf("file has invalid name: %s", fileName)
- continue
- }
- timeString := fileNameComponents[0]
- timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
- if err != nil {
- log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
- continue
- }
- ext := fileNameComponents[1]
- fileInfos = append(fileInfos, fileInfo{
- name: w.paths.ToFullPath("", timestamp, ext),
- timestamp: timestamp,
- ext: ext,
- })
- }
- sort.Slice(fileInfos, func(i, j int) bool {
- return fileInfos[i].timestamp.Before(fileInfos[j].timestamp)
- })
- return fileInfos, nil
- }
- // clean removes files that are older than the limit resolution from the storage
- func (w *Walinator) clean() {
- fileInfos, err := w.getFileInfos()
- if err != nil {
- log.Errorf("failed to retrieve file info for cleaning: %s", err.Error())
- }
- limit := w.limitResolution.Limit()
- for _, fi := range fileInfos {
- if !limit.After(fi.timestamp) {
- continue
- }
- err = w.storage.Remove(fi.name)
- if err != nil {
- log.Errorf("failed to remove file '%s': %s", fi.name, err.Error())
- }
- }
- }
|