datasource.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package collector
  2. import (
  3. "time"
  4. "github.com/julienschmidt/httprouter"
  5. "github.com/opencost/opencost/core/pkg/clustercache"
  6. "github.com/opencost/opencost/core/pkg/clusters"
  7. "github.com/opencost/opencost/core/pkg/diagnostics"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  10. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  12. "k8s.io/client-go/kubernetes"
  13. )
  14. type collectorDataSource struct {
  15. metricsQuerier *collectorMetricsQuerier
  16. clusterMap clusters.ClusterMap
  17. clusterInfo clusters.ClusterInfoProvider
  18. config CollectorConfig
  19. }
  20. func NewDefaultCollectorDataSource(
  21. clusterInfoProvider clusters.ClusterInfoProvider,
  22. clusterCache clustercache.ClusterCache,
  23. k8s kubernetes.Interface,
  24. statSummaryClient util.StatSummaryClient,
  25. ) source.OpenCostDataSource {
  26. config := NewOpenCostCollectorConfigFromEnv()
  27. return NewCollectorDataSource(
  28. config,
  29. clusterInfoProvider,
  30. clusterCache,
  31. k8s,
  32. statSummaryClient,
  33. )
  34. }
  35. func NewCollectorDataSource(
  36. config CollectorConfig,
  37. clusterInfoProvider clusters.ClusterInfoProvider,
  38. clusterCache clustercache.ClusterCache,
  39. k8s kubernetes.Interface,
  40. statSummaryClient util.StatSummaryClient,
  41. ) source.OpenCostDataSource {
  42. var storeFactory metric.MetricStoreFactory
  43. storeFactory = NewOpenCostMetricStore
  44. repo := metric.NewMetricRepository(metric.RepositoryConfig{
  45. Resolutions: config.Resolutions,
  46. }, storeFactory)
  47. scrapeController := scrape.NewScrapeController(
  48. config.ScrapeInterval,
  49. config.ReleaseName,
  50. config.NetworkPort,
  51. repo,
  52. clusterCache,
  53. k8s,
  54. statSummaryClient,
  55. )
  56. scrapeController.Start()
  57. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  58. // cluster info provider
  59. clusterInfo := clusterInfoProvider
  60. clusterMap := newCollectorClusterMap(clusterInfo)
  61. return &collectorDataSource{
  62. metricsQuerier: metricQuerier,
  63. clusterInfo: clusterInfo,
  64. clusterMap: clusterMap,
  65. }
  66. }
  67. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  68. return
  69. }
  70. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  71. return
  72. }
  73. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  74. return c.metricsQuerier
  75. }
  76. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  77. return c.clusterMap
  78. }
  79. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  80. return c.clusterInfo
  81. }
  82. // BatchDuration collector data source queries do not need to be broken up
  83. func (c *collectorDataSource) BatchDuration() time.Duration {
  84. var maxDuration time.Duration = 1<<63 - 1
  85. return maxDuration
  86. }
  87. func (c *collectorDataSource) Resolution() time.Duration {
  88. return c.config.ScrapeInterval
  89. }