datasource.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package collector
  2. import (
  3. "context"
  4. "time"
  5. "github.com/julienschmidt/httprouter"
  6. "github.com/opencost/opencost/core/pkg/clustercache"
  7. "github.com/opencost/opencost/core/pkg/clusters"
  8. "github.com/opencost/opencost/core/pkg/diagnostics"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/nodestats"
  11. "github.com/opencost/opencost/core/pkg/source"
  12. "github.com/opencost/opencost/core/pkg/storage"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  15. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  16. )
  17. type collectorDataSource struct {
  18. metricsQuerier *collectorMetricsQuerier
  19. clusterMap clusters.ClusterMap
  20. clusterInfo clusters.ClusterInfoProvider
  21. config CollectorConfig
  22. diagnosticsModule *metric.DiagnosticsModule
  23. }
  24. func NewDefaultCollectorDataSource(
  25. store storage.Storage,
  26. clusterInfoProvider clusters.ClusterInfoProvider,
  27. clusterCache clustercache.ClusterCache,
  28. statSummaryClient nodestats.StatSummaryClient,
  29. ) source.OpenCostDataSource {
  30. config := NewOpenCostCollectorConfigFromEnv()
  31. return NewCollectorDataSource(
  32. config,
  33. store,
  34. clusterInfoProvider,
  35. clusterCache,
  36. statSummaryClient,
  37. )
  38. }
  39. func NewCollectorDataSource(
  40. config CollectorConfig,
  41. store storage.Storage,
  42. clusterInfoProvider clusters.ClusterInfoProvider,
  43. clusterCache clustercache.ClusterCache,
  44. statSummaryClient nodestats.StatSummaryClient,
  45. ) source.OpenCostDataSource {
  46. var resolutions []*util.Resolution
  47. for _, resconf := range config.Resolutions {
  48. resolution, err := util.NewResolution(resconf)
  49. if err != nil {
  50. log.Errorf("failed to create resolution %s", err.Error())
  51. continue
  52. }
  53. resolutions = append(resolutions, resolution)
  54. }
  55. repo := metric.NewMetricRepository(
  56. resolutions,
  57. NewOpenCostMetricStore,
  58. )
  59. var updater metric.Updater
  60. updater = repo
  61. if store != nil {
  62. wal, err := metric.NewWalinator(
  63. config.ClusterID,
  64. store,
  65. resolutions,
  66. updater,
  67. )
  68. if err != nil {
  69. log.Errorf("failed to initialize the walinator: %s", err.Error())
  70. } else {
  71. wal.Start()
  72. updater = wal
  73. }
  74. }
  75. diagnosticsModule := metric.NewDiagnosticsModule(updater)
  76. updater = diagnosticsModule
  77. scrapeController := scrape.NewScrapeController(
  78. config.ScrapeInterval,
  79. config.NetworkPort,
  80. updater,
  81. clusterCache,
  82. statSummaryClient,
  83. )
  84. scrapeController.Start()
  85. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  86. // cluster info provider
  87. clusterInfo := clusterInfoProvider
  88. clusterMap := newCollectorClusterMap(clusterInfo)
  89. return &collectorDataSource{
  90. config: config,
  91. metricsQuerier: metricQuerier,
  92. clusterInfo: clusterInfo,
  93. clusterMap: clusterMap,
  94. diagnosticsModule: diagnosticsModule,
  95. }
  96. }
  97. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  98. return
  99. }
  100. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  101. const CollectorDiagnosticCategory = "collector"
  102. diagnosticDefinitions := c.diagnosticsModule.DiagnosticsDefinitions()
  103. for _, dd := range diagnosticDefinitions {
  104. err := diagService.Register(dd.ID, dd.Description, CollectorDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
  105. details, err := c.diagnosticsModule.DiagnosticsDetails(dd.ID)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return details, nil
  110. })
  111. if err != nil {
  112. log.Warnf("Failed to register collector diagnostic %s: %s", dd.ID, err.Error())
  113. }
  114. }
  115. }
  116. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  117. return c.metricsQuerier
  118. }
  119. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  120. return c.clusterMap
  121. }
  122. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  123. return c.clusterInfo
  124. }
  125. // BatchDuration collector data source queries do not need to be broken up
  126. func (c *collectorDataSource) BatchDuration() time.Duration {
  127. var maxDuration time.Duration = 1<<63 - 1
  128. return maxDuration
  129. }
  130. func (c *collectorDataSource) Resolution() time.Duration {
  131. interval, _ := util.NewInterval(c.config.ScrapeInterval)
  132. current := interval.Truncate(time.Now().UTC())
  133. next := interval.Add(current, 1)
  134. return next.Sub(current)
  135. }