  1. package collector
  2. import (
  3. "time"
  4. "github.com/julienschmidt/httprouter"
  5. "github.com/opencost/opencost/core/pkg/clustercache"
  6. "github.com/opencost/opencost/core/pkg/clusters"
  7. "github.com/opencost/opencost/core/pkg/diagnostics"
  8. "github.com/opencost/opencost/core/pkg/nodestats"
  9. "github.com/opencost/opencost/core/pkg/source"
  10. "github.com/opencost/opencost/core/pkg/storage"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  12. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  14. )
// collectorDataSource is an OpenCostDataSource implementation backed by the
// in-cluster metrics collector (metric repository + scrape controller) rather
// than an external metrics backend.
type collectorDataSource struct {
	metricsQuerier *collectorMetricsQuerier     // answers metric queries from the metric repository
	clusterMap     clusters.ClusterMap          // cluster ID -> cluster metadata mapping
	clusterInfo    clusters.ClusterInfoProvider // provider of local cluster information
	config         CollectorConfig              // NOTE(review): never assigned by NewCollectorDataSource, yet Resolution() reads config.ScrapeInterval — looks like a bug, confirm
}
  21. func NewDefaultCollectorDataSource(
  22. store storage.Storage,
  23. clusterInfoProvider clusters.ClusterInfoProvider,
  24. clusterCache clustercache.ClusterCache,
  25. statSummaryClient nodestats.StatSummaryClient,
  26. ) source.OpenCostDataSource {
  27. config := NewOpenCostCollectorConfigFromEnv()
  28. return NewCollectorDataSource(
  29. config,
  30. store,
  31. clusterInfoProvider,
  32. clusterCache,
  33. statSummaryClient,
  34. )
  35. }
  36. func NewCollectorDataSource(
  37. config CollectorConfig,
  38. store storage.Storage,
  39. clusterInfoProvider clusters.ClusterInfoProvider,
  40. clusterCache clustercache.ClusterCache,
  41. statSummaryClient nodestats.StatSummaryClient,
  42. ) source.OpenCostDataSource {
  43. repo := metric.NewMetricRepository(
  44. config.ClusterID,
  45. config.Resolutions,
  46. store,
  47. NewOpenCostMetricStore,
  48. )
  49. scrapeController := scrape.NewScrapeController(
  50. config.ScrapeInterval,
  51. config.NetworkPort,
  52. repo,
  53. clusterCache,
  54. statSummaryClient,
  55. )
  56. scrapeController.Start()
  57. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  58. // cluster info provider
  59. clusterInfo := clusterInfoProvider
  60. clusterMap := newCollectorClusterMap(clusterInfo)
  61. return &collectorDataSource{
  62. metricsQuerier: metricQuerier,
  63. clusterInfo: clusterInfo,
  64. clusterMap: clusterMap,
  65. }
  66. }
  67. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  68. return
  69. }
  70. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  71. return
  72. }
  73. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  74. return c.metricsQuerier
  75. }
  76. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  77. return c.clusterMap
  78. }
  79. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  80. return c.clusterInfo
  81. }
  82. // BatchDuration collector data source queries do not need to be broken up
  83. func (c *collectorDataSource) BatchDuration() time.Duration {
  84. var maxDuration time.Duration = 1<<63 - 1
  85. return maxDuration
  86. }
  87. func (c *collectorDataSource) Resolution() time.Duration {
  88. interval, _ := util.NewInterval(c.config.ScrapeInterval)
  89. current := interval.Truncate(time.Now().UTC())
  90. next := interval.Add(current, 1)
  91. return next.Sub(current)
  92. }