walinator.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package metric
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "path"
  8. "sort"
  9. "strings"
  10. "time"
  11. "github.com/opencost/opencost/core/pkg/exporter"
  12. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/opencost/opencost/core/pkg/storage"
  15. "github.com/opencost/opencost/core/pkg/util/json"
  16. "github.com/opencost/opencost/core/pkg/util/worker"
  17. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  18. )
  // CollectorEventName is the event name handed to the storage path formatter
  // when the Walinator builds the location of its update files.
  const CollectorEventName = "collector"
  // fileInfo describes one update file discovered in storage.
  type fileInfo struct {
  	// name is the full storage path used to read or remove the file.
  	name string
  	// timestamp is the time parsed from the part of the file name before the
  	// first '.'.
  	timestamp time.Time
  	// ext is everything after the first '.' in the file name; it may hold
  	// several dot-separated elements (for example "json.gz").
  	ext string
  }
  25. type Walinator struct {
  26. storage storage.Storage
  27. paths pathing.StoragePathFormatter[time.Time]
  28. exporter exporter.EventExporter[UpdateSet]
  29. limitResolution *util.Resolution
  30. updater Updater
  31. }
  32. func NewWalinator(
  33. clusterID string,
  34. applicationName string,
  35. store storage.Storage,
  36. resolutions []*util.Resolution,
  37. updater Updater,
  38. ) (*Walinator, error) {
  39. var limitResolution *util.Resolution
  40. for _, resolution := range resolutions {
  41. if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
  42. limitResolution = resolution
  43. }
  44. }
  45. pathFormatter, err := pathing.NewEventStoragePathFormatter(applicationName, clusterID, CollectorEventName)
  46. if err != nil {
  47. return nil, fmt.Errorf("failed to create path formatter for scrape controller: %s", err.Error())
  48. }
  49. encoder := exporter.NewBingenFileEncoder[UpdateSet]()
  50. exp := exporter.NewEventStorageExporter(
  51. pathFormatter,
  52. encoder,
  53. store,
  54. )
  55. return &Walinator{
  56. storage: store,
  57. paths: pathFormatter,
  58. exporter: exp,
  59. limitResolution: limitResolution,
  60. updater: updater,
  61. }, nil
  62. }
  63. func (w *Walinator) Start() {
  64. w.clean()
  65. w.restore()
  66. // Start cleaning function
  67. go func() {
  68. for {
  69. time.Sleep(w.limitResolution.Next().Sub(time.Now().UTC()))
  70. w.clean()
  71. }
  72. }()
  73. }
  74. // restore applies updates from wal files to restore the state of the previous updater(repo)
  75. func (w *Walinator) restore() {
  76. fileInfos, err := w.getFileInfos()
  77. if err != nil {
  78. log.Errorf("failed to retrieve updates files: %s", err.Error())
  79. }
  80. limit := w.limitResolution.Limit()
  81. workerFn := func(fi fileInfo) *UpdateSet {
  82. if fi.timestamp.Before(limit) {
  83. return nil
  84. }
  85. b, err := w.storage.Read(fi.name)
  86. if err != nil {
  87. log.Errorf("failed to load file contents for '%s': %s", fi.name, err.Error())
  88. return nil
  89. }
  90. updateSet, err := deserializeUpdateSet(fi.ext, b)
  91. if err != nil {
  92. log.Errorf("failed to deserialize file contents for '%s': %s", fi.name, err.Error())
  93. return nil
  94. }
  95. if updateSet.Timestamp.IsZero() {
  96. updateSet.Timestamp = fi.timestamp
  97. }
  98. return updateSet
  99. }
  100. processFn := func(updateSet *UpdateSet) {
  101. w.updater.Update(updateSet)
  102. }
  103. worker.ConcurrentOrderedProcessWith(worker.OptimalWorkerCount(), workerFn, fileInfos, processFn)
  104. }
  105. func deserializeUpdateSet(ext string, b []byte) (*UpdateSet, error) {
  106. extSplit := strings.Split(ext, ".")
  107. lastElem := extSplit[len(extSplit)-1]
  108. switch lastElem {
  109. case "json":
  110. updateSet := &UpdateSet{}
  111. err := json.Unmarshal(b, updateSet)
  112. if err != nil {
  113. return nil, fmt.Errorf("failed to unmarshal json: %w", err)
  114. }
  115. return updateSet, nil
  116. case "gz":
  117. buf := bytes.NewBuffer(b)
  118. reader, err := gzip.NewReader(buf)
  119. if err != nil {
  120. return nil, fmt.Errorf("failed to decompress gzip: %w", err)
  121. }
  122. defer reader.Close()
  123. decompressed, err := io.ReadAll(reader)
  124. if err != nil {
  125. return nil, fmt.Errorf("failed to read decompressed gzip: %w", err)
  126. }
  127. return deserializeUpdateSet(strings.TrimSuffix(ext, ".gz"), decompressed)
  128. case "bingen":
  129. updateSet := new(UpdateSet)
  130. err := updateSet.UnmarshalBinary(b)
  131. if err != nil {
  132. return nil, fmt.Errorf("failed to unmarshal bingen: %w", err)
  133. }
  134. return updateSet, nil
  135. }
  136. return nil, fmt.Errorf("unrecognized extension: '%s'", ext)
  137. }
  138. // Update calls update on the previous updater(repo) and then exports the update to storage
  139. func (w *Walinator) Update(
  140. updateSet *UpdateSet,
  141. ) {
  142. if updateSet == nil {
  143. return
  144. }
  145. // run update
  146. w.updater.Update(updateSet)
  147. err := w.exporter.Export(updateSet.Timestamp, updateSet)
  148. if err != nil {
  149. log.Errorf("failed to export update results: %s", err.Error())
  150. }
  151. }
  152. // getFileInfos returns a sorted slice of fileInfo
  153. func (w *Walinator) getFileInfos() ([]fileInfo, error) {
  154. dirPath := w.paths.Dir()
  155. files, err := w.storage.List(dirPath)
  156. if err != nil {
  157. return nil, fmt.Errorf("failed to list files in scrape controller: %w", err)
  158. }
  159. var fileInfos []fileInfo
  160. for _, file := range files {
  161. fileName := path.Base(file.Name)
  162. fileNameComponents := strings.SplitN(fileName, ".", 2)
  163. if len(fileNameComponents) != 2 {
  164. log.Errorf("file has invalid name: %s", fileName)
  165. continue
  166. }
  167. timeString := fileNameComponents[0]
  168. timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
  169. if err != nil {
  170. log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
  171. continue
  172. }
  173. ext := fileNameComponents[1]
  174. fileInfos = append(fileInfos, fileInfo{
  175. name: w.paths.ToFullPath("", timestamp, ext),
  176. timestamp: timestamp,
  177. ext: ext,
  178. })
  179. }
  180. sort.Slice(fileInfos, func(i, j int) bool {
  181. return fileInfos[i].timestamp.Before(fileInfos[j].timestamp)
  182. })
  183. return fileInfos, nil
  184. }
  185. // clean removes files that are older than the limit resolution from the storage
  186. func (w *Walinator) clean() {
  187. fileInfos, err := w.getFileInfos()
  188. if err != nil {
  189. log.Errorf("failed to retrieve file info for cleaning: %s", err.Error())
  190. }
  191. limit := w.limitResolution.Limit()
  192. for _, fi := range fileInfos {
  193. if !limit.After(fi.timestamp) {
  194. continue
  195. }
  196. err = w.storage.Remove(fi.name)
  197. if err != nil {
  198. log.Errorf("failed to remove file '%s': %s", fi.name, err.Error())
  199. }
  200. }
  201. }