walinator.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package metric
  2. import (
  3. "fmt"
  4. "path"
  5. "sort"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/exporter"
  9. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/storage"
  12. "github.com/opencost/opencost/core/pkg/util/json"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  14. )
  15. const ControllerEventName = "controller"
  16. type fileInfo struct {
  17. name string
  18. timestamp time.Time
  19. ext string
  20. }
  21. type Walinator struct {
  22. storage storage.Storage
  23. paths pathing.StoragePathFormatter[time.Time]
  24. exporter exporter.EventExporter[UpdateSet]
  25. limitResolution *util.Resolution
  26. repo *MetricRepository
  27. }
  28. func NewWalinator(
  29. clusterID string,
  30. store storage.Storage,
  31. resolutions []*util.Resolution,
  32. repo *MetricRepository,
  33. ) (*Walinator, error) {
  34. var limitResolution *util.Resolution
  35. for _, resolution := range resolutions {
  36. if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
  37. limitResolution = resolution
  38. }
  39. }
  40. pathFormatter, err := pathing.NewEventStoragePathFormatter("", clusterID, ControllerEventName)
  41. if err != nil {
  42. return nil, fmt.Errorf("filed to create path formatter for scrape controller: %s", err.Error())
  43. }
  44. encoder := exporter.NewJSONEncoder[UpdateSet]()
  45. exp := exporter.NewEventStorageExporter(
  46. pathFormatter,
  47. encoder,
  48. store,
  49. )
  50. return &Walinator{
  51. storage: store,
  52. paths: pathFormatter,
  53. exporter: exp,
  54. limitResolution: limitResolution,
  55. repo: repo,
  56. }, nil
  57. }
  58. func (w *Walinator) Start() {
  59. w.restore()
  60. // Start cleaning function
  61. go func() {
  62. time.Sleep(w.limitResolution.Next().Sub(time.Now().UTC()))
  63. w.clean()
  64. }()
  65. }
  66. // restore applies updates from wal files to restore the state of the repo
  67. func (w *Walinator) restore() {
  68. fileInfos, err := w.getFileInfos()
  69. if err != nil {
  70. log.Errorf("failed to retrieve updates files: %s", err.Error())
  71. }
  72. limit := w.limitResolution.Limit()
  73. for _, fi := range fileInfos {
  74. if fi.timestamp.Before(limit) {
  75. continue
  76. }
  77. b, err := w.storage.Read(fi.name)
  78. if err != nil {
  79. log.Errorf("failed to load file contents for '%s': %s", fi.name, err.Error())
  80. continue
  81. }
  82. updateSet := UpdateSet{}
  83. err = json.Unmarshal(b, &updateSet)
  84. if err != nil {
  85. log.Errorf("failed to unmarshal file %s: %s", fi.name, err.Error())
  86. continue
  87. }
  88. w.repo.Update(updateSet.Updates, fi.timestamp)
  89. }
  90. }
  91. // Update calls update on the repo and then exports the update to storage
  92. func (w *Walinator) Update(
  93. updates []Update,
  94. timestamp time.Time,
  95. ) {
  96. // run update
  97. w.repo.Update(updates, timestamp)
  98. err := w.exporter.Export(timestamp, &UpdateSet{
  99. Updates: updates,
  100. })
  101. if err != nil {
  102. log.Errorf("failed to export update results: %s", err.Error())
  103. }
  104. }
  105. // getFileInfos returns a sorted slice of fileInfo
  106. func (w *Walinator) getFileInfos() ([]fileInfo, error) {
  107. dirPath := w.paths.Dir()
  108. files, err := w.storage.List(dirPath)
  109. if err != nil {
  110. return nil, fmt.Errorf("failed to list files in scrape controller: %w", err)
  111. }
  112. var fileInfos []fileInfo
  113. for _, file := range files {
  114. fileName := path.Base(file.Name)
  115. fileNameComponents := strings.Split(fileName, ".")
  116. if len(fileNameComponents) != 2 {
  117. log.Errorf("file has invalid name: %s", fileName)
  118. continue
  119. }
  120. timeString := fileNameComponents[0]
  121. timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
  122. if err != nil {
  123. log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
  124. continue
  125. }
  126. ext := fileNameComponents[1]
  127. fileInfos = append(fileInfos, fileInfo{
  128. name: w.paths.ToFullPath("", timestamp, ext),
  129. timestamp: timestamp,
  130. ext: ext,
  131. })
  132. }
  133. sort.Slice(fileInfos, func(i, j int) bool {
  134. return fileInfos[i].timestamp.Before(fileInfos[j].timestamp)
  135. })
  136. return fileInfos, nil
  137. }
  138. // clean removes files that are older than the limit resolution from the storage
  139. func (w *Walinator) clean() {
  140. fileInfos, err := w.getFileInfos()
  141. if err != nil {
  142. log.Errorf("failed to retrieve file info for cleaning: %s", err.Error())
  143. }
  144. limit := w.limitResolution.Limit()
  145. for _, fi := range fileInfos {
  146. if !limit.After(fi.timestamp) {
  147. continue
  148. }
  149. err = w.storage.Remove(fi.name)
  150. if err != nil {
  151. log.Errorf("failed to remove file '%s': %s", fi.name, err.Error())
  152. }
  153. }
  154. }