| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package collector
- import (
- "time"
- "github.com/julienschmidt/httprouter"
- "github.com/opencost/opencost/core/pkg/clustercache"
- "github.com/opencost/opencost/core/pkg/clusters"
- "github.com/opencost/opencost/core/pkg/diagnostics"
- "github.com/opencost/opencost/core/pkg/nodestats"
- "github.com/opencost/opencost/core/pkg/source"
- "github.com/opencost/opencost/core/pkg/storage"
- "github.com/opencost/opencost/modules/collector-source/pkg/metric"
- "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
- "github.com/opencost/opencost/modules/collector-source/pkg/util"
- )
- type collectorDataSource struct {
- metricsQuerier *collectorMetricsQuerier
- clusterMap clusters.ClusterMap
- clusterInfo clusters.ClusterInfoProvider
- config CollectorConfig
- }
- func NewDefaultCollectorDataSource(
- store storage.Storage,
- clusterInfoProvider clusters.ClusterInfoProvider,
- clusterCache clustercache.ClusterCache,
- statSummaryClient nodestats.StatSummaryClient,
- ) source.OpenCostDataSource {
- config := NewOpenCostCollectorConfigFromEnv()
- return NewCollectorDataSource(
- config,
- store,
- clusterInfoProvider,
- clusterCache,
- statSummaryClient,
- )
- }
- func NewCollectorDataSource(
- config CollectorConfig,
- store storage.Storage,
- clusterInfoProvider clusters.ClusterInfoProvider,
- clusterCache clustercache.ClusterCache,
- statSummaryClient nodestats.StatSummaryClient,
- ) source.OpenCostDataSource {
- repo := metric.NewMetricRepository(
- config.ClusterID,
- config.Resolutions,
- store,
- NewOpenCostMetricStore,
- )
- scrapeController := scrape.NewScrapeController(
- config.ScrapeInterval,
- config.NetworkPort,
- repo,
- clusterCache,
- statSummaryClient,
- )
- scrapeController.Start()
- metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
- // cluster info provider
- clusterInfo := clusterInfoProvider
- clusterMap := newCollectorClusterMap(clusterInfo)
- return &collectorDataSource{
- metricsQuerier: metricQuerier,
- clusterInfo: clusterInfo,
- clusterMap: clusterMap,
- }
- }
- func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
- return
- }
- func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
- return
- }
- func (c *collectorDataSource) Metrics() source.MetricsQuerier {
- return c.metricsQuerier
- }
- func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
- return c.clusterMap
- }
- func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
- return c.clusterInfo
- }
- // BatchDuration collector data source queries do not need to be broken up
- func (c *collectorDataSource) BatchDuration() time.Duration {
- var maxDuration time.Duration = 1<<63 - 1
- return maxDuration
- }
- func (c *collectorDataSource) Resolution() time.Duration {
- interval, _ := util.NewInterval(c.config.ScrapeInterval)
- current := interval.Truncate(time.Now().UTC())
- next := interval.Add(current, 1)
- return next.Sub(current)
- }
|