scrapecontroller.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package scrape
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/util/atomic"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  9. "k8s.io/client-go/kubernetes"
  10. )
  11. // ScrapeController initializes and holds the scrapers in addition to running the loop that triggers scrapes
  12. type ScrapeController struct {
  13. scrapeInterval time.Duration
  14. runState atomic.AtomicRunState
  15. scrapers []Scraper
  16. }
  17. func NewScrapeController(
  18. scrapeInterval time.Duration,
  19. releaseName string,
  20. networkPort int,
  21. updater metric.MetricUpdater,
  22. clusterCache clustercache.ClusterCache,
  23. k8s kubernetes.Interface,
  24. statSummaryClient util.StatSummaryClient,
  25. ) *ScrapeController {
  26. var scrapers []Scraper
  27. clusterCacheScraper := newClusterCacheScraper(clusterCache, updater)
  28. scrapers = append(scrapers, clusterCacheScraper)
  29. opencostScraper := newOpenCostScraper(updater)
  30. scrapers = append(scrapers, opencostScraper)
  31. statSummaryScraper := newStatSummaryScraper(statSummaryClient, updater)
  32. scrapers = append(scrapers, statSummaryScraper)
  33. networkScraper := newNetworkScraper(releaseName, networkPort, clusterCache, updater)
  34. scrapers = append(scrapers, networkScraper)
  35. dcgmScraper := newDCGMScrapper(clusterCache, updater)
  36. scrapers = append(scrapers, dcgmScraper)
  37. sc := &ScrapeController{
  38. scrapeInterval: scrapeInterval,
  39. scrapers: scrapers,
  40. }
  41. return sc
  42. }
  43. func (sc *ScrapeController) Start() {
  44. // Before we attempt to start, we must ensure we are not in a stopping state
  45. sc.runState.WaitForReset()
  46. // This will atomically check the current state to ensure we can run, then advances the state.
  47. // If the state is already started, it will return false.
  48. if !sc.runState.Start() {
  49. log.Info("metric already running")
  50. return
  51. }
  52. go func() {
  53. ticker := time.NewTicker(sc.scrapeInterval)
  54. for {
  55. for _, scraper := range sc.scrapers {
  56. scraper.Scrape()
  57. }
  58. select {
  59. case <-sc.runState.OnStop():
  60. sc.runState.Reset()
  61. ticker.Stop()
  62. return // exit go routine
  63. case <-ticker.C:
  64. }
  65. }
  66. }()
  67. }
  68. func (sc *ScrapeController) Stop() {
  69. sc.runState.Stop()
  70. }