datasource.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package collector
  2. import (
  3. "os"
  4. "time"
  5. "github.com/julienschmidt/httprouter"
  6. "github.com/opencost/opencost/core/pkg/clustercache"
  7. "github.com/opencost/opencost/core/pkg/clusters"
  8. "github.com/opencost/opencost/core/pkg/diagnostics"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/source"
  11. "github.com/opencost/opencost/core/pkg/storage"
  12. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  15. )
// collectorDataSource bundles the components handed out by the collector-backed
// data source: a metrics querier, cluster map/info providers, and the collector
// configuration it was built from.
type collectorDataSource struct {
	// metricsQuerier is the value returned by Metrics().
	metricsQuerier *collectorMetricsQuerier
	// clusterMap is the value returned by ClusterMap().
	clusterMap clusters.ClusterMap
	// clusterInfo is the value returned by ClusterInfo().
	clusterInfo clusters.ClusterInfoProvider
	// config is read by Resolution(), which uses config.ScrapeInterval.
	config CollectorConfig
}
  22. func NewDefaultCollectorDataSource(
  23. clusterInfoProvider clusters.ClusterInfoProvider,
  24. clusterCache clustercache.ClusterCache,
  25. statSummaryClient util.StatSummaryClient,
  26. ) source.OpenCostDataSource {
  27. config := NewOpenCostCollectorConfigFromEnv()
  28. return NewCollectorDataSource(
  29. config,
  30. clusterInfoProvider,
  31. clusterCache,
  32. statSummaryClient,
  33. )
  34. }
  35. func NewCollectorDataSource(
  36. config CollectorConfig,
  37. clusterInfoProvider clusters.ClusterInfoProvider,
  38. clusterCache clustercache.ClusterCache,
  39. statSummaryClient util.StatSummaryClient,
  40. ) source.OpenCostDataSource {
  41. var store storage.Storage
  42. if config.BucketConfigFile != "" {
  43. bucketConfig, err := os.ReadFile(config.BucketConfigFile)
  44. if err != nil {
  45. log.Errorf("Failed to initialize bucket output storage, please check your configuration and bucket security settings: %s", err)
  46. } else {
  47. store, err = storage.NewBucketStorage(bucketConfig)
  48. if err != nil {
  49. log.Errorf("Failed to create bucket storage, please check your configuration and bucket security settings: %s", err)
  50. }
  51. }
  52. }
  53. repo := metric.NewMetricRepository(
  54. config.ClusterID,
  55. config.Resolutions,
  56. store,
  57. NewOpenCostMetricStore,
  58. )
  59. scrapeController := scrape.NewScrapeController(
  60. config.ScrapeInterval,
  61. config.NetworkPort,
  62. repo,
  63. clusterCache,
  64. statSummaryClient,
  65. )
  66. scrapeController.Start()
  67. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  68. // cluster info provider
  69. clusterInfo := clusterInfoProvider
  70. clusterMap := newCollectorClusterMap(clusterInfo)
  71. return &collectorDataSource{
  72. metricsQuerier: metricQuerier,
  73. clusterInfo: clusterInfo,
  74. clusterMap: clusterMap,
  75. }
  76. }
  77. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  78. return
  79. }
  80. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  81. return
  82. }
  83. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  84. return c.metricsQuerier
  85. }
  86. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  87. return c.clusterMap
  88. }
  89. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  90. return c.clusterInfo
  91. }
  92. // BatchDuration collector data source queries do not need to be broken up
  93. func (c *collectorDataSource) BatchDuration() time.Duration {
  94. var maxDuration time.Duration = 1<<63 - 1
  95. return maxDuration
  96. }
  97. func (c *collectorDataSource) Resolution() time.Duration {
  98. interval, _ := util.NewInterval(c.config.ScrapeInterval)
  99. current := interval.Truncate(time.Now().UTC())
  100. next := interval.Add(current, 1)
  101. return next.Sub(current)
  102. }