package collector import ( "time" "github.com/julienschmidt/httprouter" "github.com/opencost/opencost/core/pkg/clustercache" "github.com/opencost/opencost/core/pkg/clusters" "github.com/opencost/opencost/core/pkg/diagnostics" "github.com/opencost/opencost/core/pkg/nodestats" "github.com/opencost/opencost/core/pkg/source" "github.com/opencost/opencost/core/pkg/storage" "github.com/opencost/opencost/modules/collector-source/pkg/metric" "github.com/opencost/opencost/modules/collector-source/pkg/scrape" "github.com/opencost/opencost/modules/collector-source/pkg/util" ) type collectorDataSource struct { metricsQuerier *collectorMetricsQuerier clusterMap clusters.ClusterMap clusterInfo clusters.ClusterInfoProvider config CollectorConfig } func NewDefaultCollectorDataSource( store storage.Storage, clusterInfoProvider clusters.ClusterInfoProvider, clusterCache clustercache.ClusterCache, statSummaryClient nodestats.StatSummaryClient, ) source.OpenCostDataSource { config := NewOpenCostCollectorConfigFromEnv() return NewCollectorDataSource( config, store, clusterInfoProvider, clusterCache, statSummaryClient, ) } func NewCollectorDataSource( config CollectorConfig, store storage.Storage, clusterInfoProvider clusters.ClusterInfoProvider, clusterCache clustercache.ClusterCache, statSummaryClient nodestats.StatSummaryClient, ) source.OpenCostDataSource { repo := metric.NewMetricRepository( config.ClusterID, config.Resolutions, store, NewOpenCostMetricStore, ) scrapeController := scrape.NewScrapeController( config.ScrapeInterval, config.NetworkPort, repo, clusterCache, statSummaryClient, ) scrapeController.Start() metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions) // cluster info provider clusterInfo := clusterInfoProvider clusterMap := newCollectorClusterMap(clusterInfo) return &collectorDataSource{ metricsQuerier: metricQuerier, clusterInfo: clusterInfo, clusterMap: clusterMap, } } func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) { return } func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) { return } func (c *collectorDataSource) Metrics() source.MetricsQuerier { return c.metricsQuerier } func (c *collectorDataSource) ClusterMap() clusters.ClusterMap { return c.clusterMap } func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider { return c.clusterInfo } // BatchDuration collector data source queries do not need to be broken up func (c *collectorDataSource) BatchDuration() time.Duration { var maxDuration time.Duration = 1<<63 - 1 return maxDuration } func (c *collectorDataSource) Resolution() time.Duration { interval, _ := util.NewInterval(c.config.ScrapeInterval) current := interval.Truncate(time.Now().UTC()) next := interval.Add(current, 1) return next.Sub(current) }