repository.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. package metric
  2. import (
  3. "fmt"
  4. "path"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/opencost/opencost/core/pkg/exporter"
  10. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/storage"
  13. "github.com/opencost/opencost/core/pkg/util/json"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  15. )
// ControllerEventName is the event identifier NewMetricRepository passes to
// pathing.NewEventStoragePathFormatter when building the storage paths used for exported UpdateSets.
const ControllerEventName = "controller"
// RepositoryConfig currently declares no fields.
// NOTE(review): nothing visible in this file references it — confirm whether it is still needed.
type RepositoryConfig struct {
}
// MetricRepository is a MetricUpdater which applies calls to update to all resolutions being tracked. It holds the
// MetricStore instances for each resolution.
type MetricRepository struct {
	// lock serializes GetCollector, Update and Coverage.
	lock sync.Mutex
	// resolutionStores maps a resolution's Interval() string to the stores kept for that resolution.
	resolutionStores map[string]*resolutionStores
	// exporter, when non-nil, receives every UpdateSet passed to Update. It stays nil when no storage
	// backend was supplied or the storage path formatter could not be created.
	exporter exporter.EventExporter[UpdateSet]
}
  26. func NewMetricRepository(
  27. clusterID string,
  28. resolutions []util.ResolutionConfiguration,
  29. store storage.Storage,
  30. storeFactory MetricStoreFactory,
  31. ) *MetricRepository {
  32. resoluationCollectors := make(map[string]*resolutionStores)
  33. for _, resconf := range resolutions {
  34. resolution, err := util.NewResolution(resconf)
  35. if err != nil {
  36. log.Errorf("failed to create resolution %s", err.Error())
  37. }
  38. resCollector, err := newResolutionStores(resolution, storeFactory)
  39. if err != nil {
  40. log.Errorf("NewMetricRepository: failed to init resolution metric: %s", err.Error())
  41. continue
  42. }
  43. resoluationCollectors[resolution.Interval()] = resCollector
  44. }
  45. repo := &MetricRepository{
  46. resolutionStores: resoluationCollectors,
  47. }
  48. if store != nil {
  49. pathFormatter, err := pathing.NewEventStoragePathFormatter("", clusterID, ControllerEventName)
  50. if err != nil {
  51. log.Errorf("filed to create path formatter for scrape controller: %s", err.Error())
  52. return repo
  53. }
  54. encoder := exporter.NewJSONEncoder[UpdateSet]()
  55. repo.exporter = exporter.NewEventStorageExporter(
  56. pathFormatter,
  57. encoder,
  58. store,
  59. )
  60. // attempt to restore state from files
  61. // get path of saved files
  62. dirPath := path.Dir(pathFormatter.ToFullPath("", time.Time{}, ""))
  63. files, err := store.List(dirPath)
  64. if err != nil {
  65. log.Errorf("failed to list files in scrape controller: %s", err.Error())
  66. }
  67. // find oldest limit
  68. limit := time.Now().UTC()
  69. for _, resStore := range repo.resolutionStores {
  70. if limit.After(resStore.resolution.Limit()) {
  71. limit = resStore.resolution.Limit()
  72. }
  73. }
  74. // find files that are within limit
  75. var filesToRun []string
  76. for _, file := range files {
  77. fileName := path.Base(file.Name)
  78. timeString := strings.TrimSuffix(fileName, "."+encoder.FileExt())
  79. timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
  80. if err != nil {
  81. log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
  82. continue
  83. }
  84. if timestamp.After(limit) {
  85. filesToRun = append(filesToRun, pathFormatter.ToFullPath("", timestamp, encoder.FileExt()))
  86. }
  87. }
  88. // sort files
  89. sort.Strings(filesToRun)
  90. // open files and run updates
  91. for _, fileName := range filesToRun {
  92. b, err := store.Read(fileName)
  93. if err != nil {
  94. log.Errorf("failed to load file contents for '%s': %s", fileName, err.Error())
  95. continue
  96. }
  97. updateSet := UpdateSet{}
  98. err = json.Unmarshal(b, &updateSet)
  99. if err != nil {
  100. log.Errorf("failed to unmarshal file %s: %s", fileName, err.Error())
  101. continue
  102. }
  103. filePrefix := path.Base(fileName)
  104. timeString := strings.TrimSuffix(filePrefix, "."+encoder.FileExt())
  105. timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
  106. repo.Update(updateSet.Updates, timestamp)
  107. }
  108. }
  109. return repo
  110. }
  111. func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricStore, error) {
  112. r.lock.Lock()
  113. defer r.lock.Unlock()
  114. resCollector, ok := r.resolutionStores[interval]
  115. if !ok {
  116. return nil, fmt.Errorf("failed to find resolution for key %s", interval)
  117. }
  118. return resCollector.getCollector(t)
  119. }
  120. // Update calls Update on the collectors for each resolution
  121. func (r *MetricRepository) Update(
  122. updates []Update,
  123. timestamp time.Time,
  124. ) {
  125. r.lock.Lock()
  126. defer r.lock.Unlock()
  127. for _, update := range updates {
  128. // Call update on the collectors for each resolution
  129. for _, resCollector := range r.resolutionStores {
  130. resCollector.update(update.Name, update.Labels, update.Value, timestamp, update.AdditionalInfo)
  131. }
  132. }
  133. if r.exporter != nil {
  134. err := r.exporter.Export(timestamp, &UpdateSet{
  135. Updates: updates,
  136. })
  137. if err != nil {
  138. log.Errorf("failed to export update results: %s", err.Error())
  139. }
  140. }
  141. }
// UpdateSet is the batch of updates that MetricRepository.Update hands to the exporter and that
// NewMetricRepository decodes from stored files when restoring state.
type UpdateSet struct {
	Updates []Update `json:"updates"`
}
// Update carries the arguments for a single MetricStore update: MetricRepository.Update forwards
// Name, Labels, Value and AdditionalInfo, together with the batch timestamp, to every resolution.
type Update struct {
	Name           string            `json:"name"`
	Labels         map[string]string `json:"labels"`
	Value          float64           `json:"value"`
	AdditionalInfo map[string]string `json:"additionalInfo"`
}
  151. func (r *MetricRepository) Coverage() map[string][]time.Time {
  152. r.lock.Lock()
  153. defer r.lock.Unlock()
  154. result := make(map[string][]time.Time)
  155. for resKey, resCollector := range r.resolutionStores {
  156. var windowStarts []time.Time
  157. for _, key := range resCollector.getKeys() {
  158. windowStarts = append(windowStarts, time.Unix(key, 0).UTC())
  159. }
  160. result[resKey] = windowStarts
  161. }
  162. return result
  163. }
// resolutionStores is a grouping of a resolution and the instances of MetricStore that it is used to manage
type resolutionStores struct {
	// lock guards collectors.
	lock sync.Mutex
	// resolution supplies the window for a timestamp (Get), the retention limit (Limit) and the
	// time of the next cleanup (Next).
	resolution *util.Resolution
	// collectors is keyed by resolution.Get(timestamp).UnixMilli().
	collectors map[int64]MetricStore
	// factory creates the MetricStore for a key that has no store yet.
	factory func() MetricStore
}
  171. func newResolutionStores(resolution *util.Resolution, factory MetricStoreFactory) (*resolutionStores, error) {
  172. resCol := &resolutionStores{
  173. resolution: resolution,
  174. collectors: map[int64]MetricStore{},
  175. factory: factory,
  176. }
  177. // Start loop which will remove expired MetricStore
  178. go func() {
  179. for {
  180. time.Sleep(resCol.resolution.Next().Sub(time.Now().UTC()))
  181. resCol.clean()
  182. }
  183. }()
  184. return resCol, nil
  185. }
  186. func (r *resolutionStores) clean() {
  187. r.lock.Lock()
  188. defer r.lock.Unlock()
  189. limitKey := r.resolution.Limit().UnixMilli()
  190. for key := range r.collectors {
  191. if key < limitKey {
  192. delete(r.collectors, key)
  193. }
  194. }
  195. }
  196. func (r *resolutionStores) update(
  197. metricName string,
  198. labels map[string]string,
  199. value float64,
  200. timestamp time.Time,
  201. additionalInformation map[string]string,
  202. ) {
  203. r.lock.Lock()
  204. defer r.lock.Unlock()
  205. limit := r.resolution.Limit()
  206. if timestamp.Before(limit) {
  207. log.Debugf(
  208. "failed to call update on resolution '%s' because Timestamp '%s' is before the limit '%s",
  209. r.resolution.Interval(),
  210. timestamp.Format(time.RFC3339),
  211. limit.Format(time.RFC3339),
  212. )
  213. return
  214. }
  215. key := r.resolution.Get(timestamp).UnixMilli()
  216. collector, ok := r.collectors[key]
  217. if !ok {
  218. collector = r.factory()
  219. r.collectors[key] = collector
  220. }
  221. collector.Update(metricName, labels, value, timestamp, additionalInformation)
  222. }
  223. func (r *resolutionStores) getCollector(t time.Time) (MetricStore, error) {
  224. r.lock.Lock()
  225. defer r.lock.Unlock()
  226. if t.Before(r.resolution.Limit()) {
  227. return nil, fmt.Errorf(
  228. "request for metric at time '%s' for resolution '%s' is past limit of '%s'",
  229. t.Format(time.RFC3339),
  230. r.resolution.Interval(),
  231. r.resolution.Limit().Format(time.RFC3339),
  232. )
  233. }
  234. key := r.resolution.Get(t).UnixMilli()
  235. collector, ok := r.collectors[key]
  236. if !ok {
  237. return nil, fmt.Errorf("failed to find MetricCollector for interval '%s' for time '%s'", r.resolution.Interval(), t.Format(time.RFC3339))
  238. }
  239. return collector, nil
  240. }
  241. func (r *resolutionStores) getKeys() []int64 {
  242. r.lock.Lock()
  243. defer r.lock.Unlock()
  244. var keys []int64
  245. for key := range r.collectors {
  246. keys = append(keys, key)
  247. }
  248. return keys
  249. }