walinator.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package metric
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "path"
  8. "sort"
  9. "strings"
  10. "time"
  11. "github.com/opencost/opencost/core/pkg/exporter"
  12. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/opencost/opencost/core/pkg/storage"
  15. "github.com/opencost/opencost/core/pkg/util/json"
  16. "github.com/opencost/opencost/core/pkg/util/worker"
  17. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  18. )
  19. const ControllerEventName = "controller"
  20. type fileInfo struct {
  21. name string
  22. timestamp time.Time
  23. ext string
  24. }
  25. type Walinator struct {
  26. storage storage.Storage
  27. paths pathing.StoragePathFormatter[time.Time]
  28. exporter exporter.EventExporter[UpdateSet]
  29. limitResolution *util.Resolution
  30. updater Updater
  31. }
  32. func NewWalinator(
  33. clusterID string,
  34. store storage.Storage,
  35. resolutions []*util.Resolution,
  36. updater Updater,
  37. ) (*Walinator, error) {
  38. var limitResolution *util.Resolution
  39. for _, resolution := range resolutions {
  40. if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
  41. limitResolution = resolution
  42. }
  43. }
  44. pathFormatter, err := pathing.NewEventStoragePathFormatter("", clusterID, ControllerEventName)
  45. if err != nil {
  46. return nil, fmt.Errorf("filed to create path formatter for scrape controller: %s", err.Error())
  47. }
  48. encoder := exporter.NewGZipEncoder(exporter.NewJSONEncoder[UpdateSet]())
  49. exp := exporter.NewEventStorageExporter(
  50. pathFormatter,
  51. encoder,
  52. store,
  53. )
  54. return &Walinator{
  55. storage: store,
  56. paths: pathFormatter,
  57. exporter: exp,
  58. limitResolution: limitResolution,
  59. updater: updater,
  60. }, nil
  61. }
  62. func (w *Walinator) Start() {
  63. w.clean()
  64. w.restore()
  65. // Start cleaning function
  66. go func() {
  67. for {
  68. time.Sleep(w.limitResolution.Next().Sub(time.Now().UTC()))
  69. w.clean()
  70. }
  71. }()
  72. }
  73. // restore applies updates from wal files to restore the state of the previous updater(repo)
  74. func (w *Walinator) restore() {
  75. fileInfos, err := w.getFileInfos()
  76. if err != nil {
  77. log.Errorf("failed to retrieve updates files: %s", err.Error())
  78. }
  79. limit := w.limitResolution.Limit()
  80. workerFn := func(fi fileInfo) *UpdateSet {
  81. if fi.timestamp.Before(limit) {
  82. return nil
  83. }
  84. b, err := w.storage.Read(fi.name)
  85. if err != nil {
  86. log.Errorf("failed to load file contents for '%s': %s", fi.name, err.Error())
  87. return nil
  88. }
  89. updateSet, err := deserializeUpdateSet(fi.ext, b)
  90. if err != nil {
  91. log.Errorf("failed to deserialize file contents for '%s': %s", fi.name, err.Error())
  92. return nil
  93. }
  94. if updateSet.Timestamp.IsZero() {
  95. updateSet.Timestamp = fi.timestamp
  96. }
  97. return updateSet
  98. }
  99. processFn := func(updateSet *UpdateSet) {
  100. w.updater.Update(updateSet)
  101. }
  102. worker.ConcurrentOrderedProcessWith(worker.OptimalWorkerCount(), workerFn, fileInfos, processFn)
  103. }
  104. func deserializeUpdateSet(ext string, b []byte) (*UpdateSet, error) {
  105. extSplit := strings.Split(ext, ".")
  106. lastElem := extSplit[len(extSplit)-1]
  107. switch lastElem {
  108. case "json":
  109. updateSet := &UpdateSet{}
  110. err := json.Unmarshal(b, updateSet)
  111. if err != nil {
  112. return nil, fmt.Errorf("failed to unmarshal json: %w", err)
  113. }
  114. return updateSet, nil
  115. case "gz":
  116. buf := bytes.NewBuffer(b)
  117. reader, err := gzip.NewReader(buf)
  118. if err != nil {
  119. return nil, fmt.Errorf("failed to decompress gzip: %w", err)
  120. }
  121. defer reader.Close()
  122. decompressed, err := io.ReadAll(reader)
  123. if err != nil {
  124. return nil, fmt.Errorf("failed to read decompressed gzip: %w", err)
  125. }
  126. return deserializeUpdateSet(strings.TrimSuffix(ext, ".gz"), decompressed)
  127. }
  128. return nil, fmt.Errorf("unrecognized extension: '%s'", ext)
  129. }
  130. // Update calls update on the previous updater(repo) and then exports the update to storage
  131. func (w *Walinator) Update(
  132. updateSet *UpdateSet,
  133. ) {
  134. if updateSet == nil {
  135. return
  136. }
  137. // run update
  138. w.updater.Update(updateSet)
  139. err := w.exporter.Export(updateSet.Timestamp, updateSet)
  140. if err != nil {
  141. log.Errorf("failed to export update results: %s", err.Error())
  142. }
  143. }
  144. // getFileInfos returns a sorted slice of fileInfo
  145. func (w *Walinator) getFileInfos() ([]fileInfo, error) {
  146. dirPath := w.paths.Dir()
  147. files, err := w.storage.List(dirPath)
  148. if err != nil {
  149. return nil, fmt.Errorf("failed to list files in scrape controller: %w", err)
  150. }
  151. var fileInfos []fileInfo
  152. for _, file := range files {
  153. fileName := path.Base(file.Name)
  154. fileNameComponents := strings.SplitN(fileName, ".", 2)
  155. if len(fileNameComponents) != 2 {
  156. log.Errorf("file has invalid name: %s", fileName)
  157. continue
  158. }
  159. timeString := fileNameComponents[0]
  160. timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
  161. if err != nil {
  162. log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
  163. continue
  164. }
  165. ext := fileNameComponents[1]
  166. fileInfos = append(fileInfos, fileInfo{
  167. name: w.paths.ToFullPath("", timestamp, ext),
  168. timestamp: timestamp,
  169. ext: ext,
  170. })
  171. }
  172. sort.Slice(fileInfos, func(i, j int) bool {
  173. return fileInfos[i].timestamp.Before(fileInfos[j].timestamp)
  174. })
  175. return fileInfos, nil
  176. }
  177. // clean removes files that are older than the limit resolution from the storage
  178. func (w *Walinator) clean() {
  179. fileInfos, err := w.getFileInfos()
  180. if err != nil {
  181. log.Errorf("failed to retrieve file info for cleaning: %s", err.Error())
  182. }
  183. limit := w.limitResolution.Limit()
  184. for _, fi := range fileInfos {
  185. if !limit.After(fi.timestamp) {
  186. continue
  187. }
  188. err = w.storage.Remove(fi.name)
  189. if err != nil {
  190. log.Errorf("failed to remove file '%s': %s", fi.name, err.Error())
  191. }
  192. }
  193. }