// scrapecontroller.go
  1. package scrape
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/clustercache"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/util/atomic"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  9. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  10. )
// ScrapeController initializes and holds the scrapers in addition to running the loop that triggers scrapes
type ScrapeController struct {
	// scrapeInterval is the cadence of the scrape loop; scrape timestamps
	// are aligned to boundaries of this interval (see Start).
	scrapeInterval util.Interval
	// runState coordinates the start/stop lifecycle of the scrape goroutine.
	runState atomic.AtomicRunState
	// scrapers is the fixed set of scrapers run concurrently on each tick.
	scrapers []Scraper
	// repo receives all scrape results, batched under a single timestamp.
	repo *metric.MetricRepository
}
  18. func NewScrapeController(
  19. scrapeInterval string,
  20. networkPort int,
  21. repo *metric.MetricRepository,
  22. clusterCache clustercache.ClusterCache,
  23. statSummaryClient util.StatSummaryClient,
  24. ) *ScrapeController {
  25. var scrapers []Scraper
  26. clusterCacheScraper := newClusterCacheScraper(clusterCache)
  27. scrapers = append(scrapers, clusterCacheScraper)
  28. opencostScraper := newOpenCostScraper()
  29. scrapers = append(scrapers, opencostScraper)
  30. statSummaryScraper := newStatSummaryScraper(statSummaryClient)
  31. scrapers = append(scrapers, statSummaryScraper)
  32. networkScraper := newNetworkScraper(networkPort, clusterCache)
  33. scrapers = append(scrapers, networkScraper)
  34. dcgmScraper := newDCGMScrapper(clusterCache)
  35. scrapers = append(scrapers, dcgmScraper)
  36. si, err := util.NewInterval(scrapeInterval)
  37. if err != nil {
  38. panic(fmt.Errorf("scrapecontroller failed to create scrape interval: %w", err))
  39. }
  40. sc := &ScrapeController{
  41. scrapeInterval: si,
  42. scrapers: scrapers,
  43. repo: repo,
  44. }
  45. return sc
  46. }
// Start launches the scrape loop in a background goroutine. It is safe to
// call repeatedly: a second call while the loop is already running is a
// no-op. The goroutine exits when Stop is called, resetting the run state
// so a subsequent Start can run again.
func (sc *ScrapeController) Start() {
	// Before we attempt to start, we must ensure we are not in a stopping state
	sc.runState.WaitForReset()

	// This will atomically check the current state to ensure we can run, then advances the state.
	// If the state is already started, it will return false.
	if !sc.runState.Start() {
		log.Info("metric already running")
		return
	}

	go func() {
		// The zero-duration timer makes the first scrape fire immediately,
		// stamped with "now"; subsequent scrapes are aligned to interval
		// boundaries (see below).
		nextScrape := time.Now().UTC()
		timer := time.NewTimer(time.Duration(0))
		for {
			select {
			case <-sc.runState.OnStop():
				// Reset before returning so a later Start's WaitForReset
				// does not block indefinitely.
				sc.runState.Reset()
				timer.Stop()
				return // exit go routine
			case <-timer.C:
				sc.Scrape(nextScrape)
				// Schedule the next scrape on the next interval boundary:
				// truncate "now" down to the current boundary, then add one
				// interval. (Exact boundary semantics come from util.Interval.)
				nextScrape = sc.scrapeInterval.Add(sc.scrapeInterval.Truncate(time.Now().UTC()), 1)
				timer.Reset(time.Until(nextScrape))
			}
		}
	}()
}
// Stop signals the scrape loop goroutine (started by Start) to exit.
// The goroutine resets the run state on its way out, allowing Start to be
// called again afterwards.
func (sc *ScrapeController) Stop() {
	sc.runState.Stop()
}
  76. func (sc *ScrapeController) Scrape(timestamp time.Time) {
  77. // Run scrapes concurrently to minimize time from call to data collection
  78. var scrapeFuncs []ScrapeFunc
  79. for i := range sc.scrapers {
  80. scraper := sc.scrapers[i]
  81. scrapeFuncs = append(scrapeFuncs, scraper.Scrape)
  82. }
  83. scrapeResults := concurrentScrape(scrapeFuncs...)
  84. // once all results are returned run updates all at once with the same timestamp
  85. sc.repo.Update(scrapeResults, timestamp)
  86. }