2
0

scrapecontroller.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package scrape
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/clustercache"
  6. "github.com/opencost/opencost/core/pkg/clusters"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/nodestats"
  9. "github.com/opencost/opencost/core/pkg/util/atomic"
  10. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  12. )
// ScrapeController initializes and holds the scrapers in addition to running the loop that triggers scrapes
type ScrapeController struct {
	scrapeInterval util.Interval         // cadence of the scrape loop; parsed from config in NewScrapeController
	runState       atomic.AtomicRunState // start/stop lifecycle guard for the scrape goroutine
	scrapers       []Scraper             // all registered scrapers; each is run concurrently per scrape
	updater        metric.Updater        // sink that receives the combined results of every scrape
}
  20. func NewScrapeController(
  21. clusterUID string,
  22. scrapeInterval string,
  23. networkPort int,
  24. updater metric.Updater,
  25. clusterInfoProvider clusters.ClusterInfoProvider,
  26. clusterCache clustercache.ClusterCache,
  27. statSummaryClient nodestats.StatSummaryClient,
  28. ) *ScrapeController {
  29. var scrapers []Scraper
  30. clusterInfoScrapper := newClusterInfoScrapper(clusterUID, clusterInfoProvider)
  31. scrapers = append(scrapers, clusterInfoScrapper)
  32. clusterCacheScraper := newClusterCacheScraper(clusterCache)
  33. scrapers = append(scrapers, clusterCacheScraper)
  34. opencostScraper := newOpenCostScraper()
  35. scrapers = append(scrapers, opencostScraper)
  36. statSummaryScraper := newStatSummaryScraper(statSummaryClient)
  37. scrapers = append(scrapers, statSummaryScraper)
  38. networkScraper := newNetworkScraper(networkPort, clusterCache)
  39. scrapers = append(scrapers, networkScraper)
  40. dcgmScraper := newDCGMScrapper(clusterCache)
  41. scrapers = append(scrapers, dcgmScraper)
  42. si, err := util.NewInterval(scrapeInterval)
  43. if err != nil {
  44. panic(fmt.Errorf("scrapecontroller failed to create scrape interval: %w", err))
  45. }
  46. sc := &ScrapeController{
  47. scrapeInterval: si,
  48. scrapers: scrapers,
  49. updater: updater,
  50. }
  51. return sc
  52. }
  53. func (sc *ScrapeController) Start() {
  54. // Before we attempt to start, we must ensure we are not in a stopping state
  55. sc.runState.WaitForReset()
  56. // This will atomically check the current state to ensure we can run, then advances the state.
  57. // If the state is already started, it will return false.
  58. if !sc.runState.Start() {
  59. log.Info("metric already running")
  60. return
  61. }
  62. go func() {
  63. nextScrape := time.Now().UTC()
  64. timer := time.NewTimer(time.Duration(0))
  65. for {
  66. select {
  67. case <-sc.runState.OnStop():
  68. sc.runState.Reset()
  69. timer.Stop()
  70. return // exit go routine
  71. case <-timer.C:
  72. sc.Scrape(nextScrape)
  73. nextScrape = sc.scrapeInterval.Add(sc.scrapeInterval.Truncate(time.Now().UTC()), 1)
  74. timer.Reset(time.Until(nextScrape))
  75. }
  76. }
  77. }()
  78. }
// Stop signals the scrape loop goroutine (started by Start) to terminate by
// advancing the run state; the goroutine resets the state on exit so a later
// Start can run again.
func (sc *ScrapeController) Stop() {
	sc.runState.Stop()
}
  82. func (sc *ScrapeController) Scrape(timestamp time.Time) {
  83. // Run scrapes concurrently to minimize time from call to data collection
  84. var scrapeFuncs []ScrapeFunc
  85. for i := range sc.scrapers {
  86. scraper := sc.scrapers[i]
  87. scrapeFuncs = append(scrapeFuncs, scraper.Scrape)
  88. }
  89. scrapeResults := concurrentScrape(scrapeFuncs...)
  90. // once all results are returned run updates all at once with the same timestamp
  91. sc.updater.Update(&metric.UpdateSet{
  92. Timestamp: timestamp,
  93. Updates: scrapeResults,
  94. })
  95. }