| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package scrape
- import (
- "fmt"
- "time"
- "github.com/opencost/opencost/core/pkg/clustercache"
- "github.com/opencost/opencost/core/pkg/clusters"
- coreenv "github.com/opencost/opencost/core/pkg/env"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/nodestats"
- "github.com/opencost/opencost/core/pkg/util/atomic"
- "github.com/opencost/opencost/modules/collector-source/pkg/metric"
- "github.com/opencost/opencost/modules/collector-source/pkg/util"
- )
- // ScrapeController initializes and holds the scrapers in addition to running the loop that triggers scrapes
- type ScrapeController struct {
- scrapeInterval util.Interval
- runState atomic.AtomicRunState
- scrapers []Scraper
- updater metric.Updater
- }
- // getDefaultMetricFilter builds a MetricFilter from environment variable configuration.
- // Metrics whose corresponding env var is false (the default) are added to the deny set.
- func getDefaultMetricFilter() MetricFilter {
- f := MetricFilter{}
- deny := func(name string) { f[name] = struct{}{} }
- if !coreenv.IsEmitPodAnnotationsMetric() {
- deny(metric.KubePodAnnotations)
- }
- if !coreenv.IsEmitNamespaceAnnotationsMetric() {
- deny(metric.KubeNamespaceAnnotations)
- }
- if !coreenv.IsEmitDeploymentLabelsMetric() {
- deny(metric.DeploymentLabels)
- }
- if !coreenv.IsEmitDeploymentAnnotationsMetric() {
- deny(metric.DeploymentAnnotations)
- }
- if !coreenv.IsEmitStatefulSetLabelsMetric() {
- deny(metric.StatefulSetLabels)
- }
- if !coreenv.IsEmitStatefulSetAnnotationsMetric() {
- deny(metric.StatefulSetAnnotations)
- }
- if !coreenv.IsEmitDaemonSetLabelsMetric() {
- deny(metric.DaemonSetLabels)
- }
- if !coreenv.IsEmitDaemonSetAnnotationsMetric() {
- deny(metric.DaemonSetAnnotations)
- }
- if !coreenv.IsEmitJobLabelsMetric() {
- deny(metric.JobLabels)
- }
- if !coreenv.IsEmitJobAnnotationsMetric() {
- deny(metric.JobAnnotations)
- }
- if !coreenv.IsEmitCronJobLabelsMetric() {
- deny(metric.CronJobLabels)
- }
- if !coreenv.IsEmitCronJobAnnotationsMetric() {
- deny(metric.CronJobAnnotations)
- }
- if !coreenv.IsEmitReplicaSetLabelsMetric() {
- deny(metric.ReplicaSetLabels)
- }
- if !coreenv.IsEmitReplicaSetAnnotationsMetric() {
- deny(metric.ReplicaSetAnnotations)
- }
- return f
- }
- func NewScrapeController(
- clusterUID string,
- scrapeInterval string,
- networkPort int,
- updater metric.Updater,
- clusterInfoProvider clusters.ClusterInfoProvider,
- clusterCache clustercache.ClusterCache,
- statSummaryClient nodestats.StatSummaryClient,
- ) *ScrapeController {
- // Start with env-driven defaults, then layer in any caller-supplied entries.
- filter := getDefaultMetricFilter()
- var scrapers []Scraper
- clusterInfoScrapper := withFilter(newClusterInfoScrapper(clusterUID, clusterInfoProvider), filter)
- scrapers = append(scrapers, clusterInfoScrapper)
- clusterCacheScraper := withFilter(newClusterCacheScraper(clusterCache), filter)
- scrapers = append(scrapers, clusterCacheScraper)
- opencostScraper := withFilter(newOpenCostScraper(), filter)
- scrapers = append(scrapers, opencostScraper)
- statSummaryScraper := withFilter(newStatSummaryScraper(statSummaryClient, clusterCache), filter)
- scrapers = append(scrapers, statSummaryScraper)
- networkScraper := withFilter(newNetworkScraper(networkPort, clusterCache), filter)
- scrapers = append(scrapers, networkScraper)
- dcgmScraper := withFilter(newDCGMScrapper(clusterCache), filter)
- scrapers = append(scrapers, dcgmScraper)
- si, err := util.NewInterval(scrapeInterval)
- if err != nil {
- panic(fmt.Errorf("scrapecontroller failed to create scrape interval: %w", err))
- }
- sc := &ScrapeController{
- scrapeInterval: si,
- scrapers: scrapers,
- updater: updater,
- }
- return sc
- }
- func (sc *ScrapeController) Start() {
- // Before we attempt to start, we must ensure we are not in a stopping state
- sc.runState.WaitForReset()
- // This will atomically check the current state to ensure we can run, then advances the state.
- // If the state is already started, it will return false.
- if !sc.runState.Start() {
- log.Info("metric already running")
- return
- }
- go func() {
- nextScrape := time.Now().UTC()
- timer := time.NewTimer(time.Duration(0))
- for {
- select {
- case <-sc.runState.OnStop():
- sc.runState.Reset()
- timer.Stop()
- return // exit go routine
- case <-timer.C:
- sc.Scrape(nextScrape)
- nextScrape = sc.scrapeInterval.Add(sc.scrapeInterval.Truncate(time.Now().UTC()), 1)
- timer.Reset(time.Until(nextScrape))
- }
- }
- }()
- }
- func (sc *ScrapeController) Stop() {
- sc.runState.Stop()
- }
- func (sc *ScrapeController) Scrape(timestamp time.Time) {
- // Run scrapes concurrently to minimize time from call to data collection
- var scrapeFuncs []ScrapeFunc
- for i := range sc.scrapers {
- scraper := sc.scrapers[i]
- scrapeFuncs = append(scrapeFuncs, scraper.Scrape)
- }
- scrapeResults := concurrentScrape(scrapeFuncs...)
- // once all results are returned run updates all at once with the same timestamp
- sc.updater.Update(&metric.UpdateSet{
- Timestamp: timestamp,
- Updates: scrapeResults,
- })
- }
|