// scrapecontroller.go
  1. package scrape
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/clustercache"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/nodestats"
  8. "github.com/opencost/opencost/core/pkg/util/atomic"
  9. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  10. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  11. )
// ScrapeController initializes and holds the scrapers in addition to running the loop that triggers scrapes
type ScrapeController struct {
	scrapeInterval util.Interval         // spacing between scrape cycles; drives the run loop's timer
	runState       atomic.AtomicRunState // start/stop lifecycle state for the background scrape goroutine
	scrapers       []Scraper             // scrapers executed concurrently on each cycle
	updater        metric.Updater        // receives the combined results of every scrape cycle
}
  19. func NewScrapeController(
  20. scrapeInterval string,
  21. networkPort int,
  22. updater metric.Updater,
  23. clusterCache clustercache.ClusterCache,
  24. statSummaryClient nodestats.StatSummaryClient,
  25. ) *ScrapeController {
  26. var scrapers []Scraper
  27. clusterCacheScraper := newClusterCacheScraper(clusterCache)
  28. scrapers = append(scrapers, clusterCacheScraper)
  29. opencostScraper := newOpenCostScraper()
  30. scrapers = append(scrapers, opencostScraper)
  31. statSummaryScraper := newStatSummaryScraper(statSummaryClient)
  32. scrapers = append(scrapers, statSummaryScraper)
  33. networkScraper := newNetworkScraper(networkPort, clusterCache)
  34. scrapers = append(scrapers, networkScraper)
  35. dcgmScraper := newDCGMScrapper(clusterCache)
  36. scrapers = append(scrapers, dcgmScraper)
  37. si, err := util.NewInterval(scrapeInterval)
  38. if err != nil {
  39. panic(fmt.Errorf("scrapecontroller failed to create scrape interval: %w", err))
  40. }
  41. sc := &ScrapeController{
  42. scrapeInterval: si,
  43. scrapers: scrapers,
  44. updater: updater,
  45. }
  46. return sc
  47. }
  48. func (sc *ScrapeController) Start() {
  49. // Before we attempt to start, we must ensure we are not in a stopping state
  50. sc.runState.WaitForReset()
  51. // This will atomically check the current state to ensure we can run, then advances the state.
  52. // If the state is already started, it will return false.
  53. if !sc.runState.Start() {
  54. log.Info("metric already running")
  55. return
  56. }
  57. go func() {
  58. nextScrape := time.Now().UTC()
  59. timer := time.NewTimer(time.Duration(0))
  60. for {
  61. select {
  62. case <-sc.runState.OnStop():
  63. sc.runState.Reset()
  64. timer.Stop()
  65. return // exit go routine
  66. case <-timer.C:
  67. sc.Scrape(nextScrape)
  68. nextScrape = sc.scrapeInterval.Add(sc.scrapeInterval.Truncate(time.Now().UTC()), 1)
  69. timer.Reset(time.Until(nextScrape))
  70. }
  71. }
  72. }()
  73. }
// Stop signals the background scrape loop (started by Start) to exit.
// The loop itself resets the run state before its goroutine returns.
func (sc *ScrapeController) Stop() {
	sc.runState.Stop()
}
  77. func (sc *ScrapeController) Scrape(timestamp time.Time) {
  78. // Run scrapes concurrently to minimize time from call to data collection
  79. var scrapeFuncs []ScrapeFunc
  80. for i := range sc.scrapers {
  81. scraper := sc.scrapers[i]
  82. scrapeFuncs = append(scrapeFuncs, scraper.Scrape)
  83. }
  84. scrapeResults := concurrentScrape(scrapeFuncs...)
  85. // once all results are returned run updates all at once with the same timestamp
  86. sc.updater.Update(&metric.UpdateSet{
  87. Timestamp: timestamp,
  88. Updates: scrapeResults,
  89. })
  90. }