scrapecontroller.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package scrape
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/clustercache"
  6. "github.com/opencost/opencost/core/pkg/clusters"
  7. coreenv "github.com/opencost/opencost/core/pkg/env"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/nodestats"
  10. "github.com/opencost/opencost/core/pkg/util/atomic"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  12. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  13. )
// ScrapeController initializes and holds the scrapers in addition to running the loop that triggers scrapes
type ScrapeController struct {
	// scrapeInterval determines when the next scrape is scheduled after each run of the loop in Start.
	scrapeInterval util.Interval
	// runState guards Start/Stop transitions and signals the scrape loop to exit.
	runState atomic.AtomicRunState
	// scrapers are invoked together on every call to Scrape.
	scrapers []Scraper
	// updater receives one UpdateSet per Scrape containing every scraper's results.
	updater metric.Updater
}
  21. // getDefaultMetricFilter builds a MetricFilter from environment variable configuration.
  22. // Metrics whose corresponding env var is false (the default) are added to the deny set.
  23. func getDefaultMetricFilter() MetricFilter {
  24. f := MetricFilter{}
  25. deny := func(name string) { f[name] = struct{}{} }
  26. if !coreenv.IsEmitPodAnnotationsMetric() {
  27. deny(metric.KubePodAnnotations)
  28. }
  29. if !coreenv.IsEmitNamespaceAnnotationsMetric() {
  30. deny(metric.KubeNamespaceAnnotations)
  31. }
  32. if !coreenv.IsEmitDeploymentLabelsMetric() {
  33. deny(metric.DeploymentLabels)
  34. }
  35. if !coreenv.IsEmitDeploymentAnnotationsMetric() {
  36. deny(metric.DeploymentAnnotations)
  37. }
  38. if !coreenv.IsEmitStatefulSetLabelsMetric() {
  39. deny(metric.StatefulSetLabels)
  40. }
  41. if !coreenv.IsEmitStatefulSetAnnotationsMetric() {
  42. deny(metric.StatefulSetAnnotations)
  43. }
  44. if !coreenv.IsEmitDaemonSetLabelsMetric() {
  45. deny(metric.DaemonSetLabels)
  46. }
  47. if !coreenv.IsEmitDaemonSetAnnotationsMetric() {
  48. deny(metric.DaemonSetAnnotations)
  49. }
  50. if !coreenv.IsEmitJobLabelsMetric() {
  51. deny(metric.JobLabels)
  52. }
  53. if !coreenv.IsEmitJobAnnotationsMetric() {
  54. deny(metric.JobAnnotations)
  55. }
  56. if !coreenv.IsEmitCronJobLabelsMetric() {
  57. deny(metric.CronJobLabels)
  58. }
  59. if !coreenv.IsEmitCronJobAnnotationsMetric() {
  60. deny(metric.CronJobAnnotations)
  61. }
  62. if !coreenv.IsEmitReplicaSetLabelsMetric() {
  63. deny(metric.ReplicaSetLabels)
  64. }
  65. if !coreenv.IsEmitReplicaSetAnnotationsMetric() {
  66. deny(metric.ReplicaSetAnnotations)
  67. }
  68. return f
  69. }
  70. func NewScrapeController(
  71. clusterUID string,
  72. scrapeInterval string,
  73. networkPort int,
  74. updater metric.Updater,
  75. clusterInfoProvider clusters.ClusterInfoProvider,
  76. clusterCache clustercache.ClusterCache,
  77. statSummaryClient nodestats.StatSummaryClient,
  78. ) *ScrapeController {
  79. // Start with env-driven defaults, then layer in any caller-supplied entries.
  80. filter := getDefaultMetricFilter()
  81. var scrapers []Scraper
  82. clusterInfoScrapper := withFilter(newClusterInfoScrapper(clusterUID, clusterInfoProvider), filter)
  83. scrapers = append(scrapers, clusterInfoScrapper)
  84. clusterCacheScraper := withFilter(newClusterCacheScraper(clusterCache), filter)
  85. scrapers = append(scrapers, clusterCacheScraper)
  86. opencostScraper := withFilter(newOpenCostScraper(), filter)
  87. scrapers = append(scrapers, opencostScraper)
  88. statSummaryScraper := withFilter(newStatSummaryScraper(statSummaryClient, clusterCache), filter)
  89. scrapers = append(scrapers, statSummaryScraper)
  90. networkScraper := withFilter(newNetworkScraper(networkPort, clusterCache), filter)
  91. scrapers = append(scrapers, networkScraper)
  92. dcgmScraper := withFilter(newDCGMScrapper(clusterCache), filter)
  93. scrapers = append(scrapers, dcgmScraper)
  94. si, err := util.NewInterval(scrapeInterval)
  95. if err != nil {
  96. panic(fmt.Errorf("scrapecontroller failed to create scrape interval: %w", err))
  97. }
  98. sc := &ScrapeController{
  99. scrapeInterval: si,
  100. scrapers: scrapers,
  101. updater: updater,
  102. }
  103. return sc
  104. }
  105. func (sc *ScrapeController) Start() {
  106. // Before we attempt to start, we must ensure we are not in a stopping state
  107. sc.runState.WaitForReset()
  108. // This will atomically check the current state to ensure we can run, then advances the state.
  109. // If the state is already started, it will return false.
  110. if !sc.runState.Start() {
  111. log.Info("metric already running")
  112. return
  113. }
  114. go func() {
  115. nextScrape := time.Now().UTC()
  116. timer := time.NewTimer(time.Duration(0))
  117. for {
  118. select {
  119. case <-sc.runState.OnStop():
  120. sc.runState.Reset()
  121. timer.Stop()
  122. return // exit go routine
  123. case <-timer.C:
  124. sc.Scrape(nextScrape)
  125. nextScrape = sc.scrapeInterval.Add(sc.scrapeInterval.Truncate(time.Now().UTC()), 1)
  126. timer.Reset(time.Until(nextScrape))
  127. }
  128. }
  129. }()
  130. }
  131. func (sc *ScrapeController) Stop() {
  132. sc.runState.Stop()
  133. }
  134. func (sc *ScrapeController) Scrape(timestamp time.Time) {
  135. // Run scrapes concurrently to minimize time from call to data collection
  136. var scrapeFuncs []ScrapeFunc
  137. for i := range sc.scrapers {
  138. scraper := sc.scrapers[i]
  139. scrapeFuncs = append(scrapeFuncs, scraper.Scrape)
  140. }
  141. scrapeResults := concurrentScrape(scrapeFuncs...)
  142. // once all results are returned run updates all at once with the same timestamp
  143. sc.updater.Update(&metric.UpdateSet{
  144. Timestamp: timestamp,
  145. Updates: scrapeResults,
  146. })
  147. }