datasource.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package collector
  2. import (
  3. "time"
  4. "github.com/julienschmidt/httprouter"
  5. "github.com/opencost/opencost/core/pkg/clustercache"
  6. "github.com/opencost/opencost/core/pkg/clusters"
  7. "github.com/opencost/opencost/core/pkg/diagnostics"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/nodestats"
  10. "github.com/opencost/opencost/core/pkg/source"
  11. "github.com/opencost/opencost/core/pkg/storage"
  12. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  15. )
  16. type collectorDataSource struct {
  17. metricsQuerier *collectorMetricsQuerier
  18. clusterMap clusters.ClusterMap
  19. clusterInfo clusters.ClusterInfoProvider
  20. config CollectorConfig
  21. }
  22. func NewDefaultCollectorDataSource(
  23. store storage.Storage,
  24. clusterInfoProvider clusters.ClusterInfoProvider,
  25. clusterCache clustercache.ClusterCache,
  26. statSummaryClient nodestats.StatSummaryClient,
  27. ) source.OpenCostDataSource {
  28. config := NewOpenCostCollectorConfigFromEnv()
  29. return NewCollectorDataSource(
  30. config,
  31. store,
  32. clusterInfoProvider,
  33. clusterCache,
  34. statSummaryClient,
  35. )
  36. }
  37. func NewCollectorDataSource(
  38. config CollectorConfig,
  39. store storage.Storage,
  40. clusterInfoProvider clusters.ClusterInfoProvider,
  41. clusterCache clustercache.ClusterCache,
  42. statSummaryClient nodestats.StatSummaryClient,
  43. ) source.OpenCostDataSource {
  44. var resolutions []*util.Resolution
  45. for _, resconf := range config.Resolutions {
  46. resolution, err := util.NewResolution(resconf)
  47. if err != nil {
  48. log.Errorf("failed to create resolution %s", err.Error())
  49. continue
  50. }
  51. resolutions = append(resolutions, resolution)
  52. }
  53. repo := metric.NewMetricRepository(
  54. resolutions,
  55. NewOpenCostMetricStore,
  56. )
  57. var updater metric.Updater
  58. updater = repo
  59. if store != nil {
  60. wal, err := metric.NewWalinator(
  61. config.ClusterID,
  62. store,
  63. resolutions,
  64. repo,
  65. )
  66. if err != nil {
  67. log.Errorf("failed to initialize the walinator: %s", err.Error())
  68. } else {
  69. wal.Start()
  70. updater = wal
  71. }
  72. }
  73. scrapeController := scrape.NewScrapeController(
  74. config.ScrapeInterval,
  75. config.NetworkPort,
  76. updater,
  77. clusterCache,
  78. statSummaryClient,
  79. )
  80. scrapeController.Start()
  81. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  82. // cluster info provider
  83. clusterInfo := clusterInfoProvider
  84. clusterMap := newCollectorClusterMap(clusterInfo)
  85. return &collectorDataSource{
  86. config: config,
  87. metricsQuerier: metricQuerier,
  88. clusterInfo: clusterInfo,
  89. clusterMap: clusterMap,
  90. }
  91. }
  92. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  93. return
  94. }
  95. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  96. return
  97. }
  98. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  99. return c.metricsQuerier
  100. }
  101. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  102. return c.clusterMap
  103. }
  104. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  105. return c.clusterInfo
  106. }
  107. // BatchDuration collector data source queries do not need to be broken up
  108. func (c *collectorDataSource) BatchDuration() time.Duration {
  109. var maxDuration time.Duration = 1<<63 - 1
  110. return maxDuration
  111. }
  112. func (c *collectorDataSource) Resolution() time.Duration {
  113. interval, _ := util.NewInterval(c.config.ScrapeInterval)
  114. current := interval.Truncate(time.Now().UTC())
  115. next := interval.Add(current, 1)
  116. return next.Sub(current)
  117. }