datasource.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package collector
  2. import (
  3. "context"
  4. "time"
  5. "github.com/julienschmidt/httprouter"
  6. "github.com/opencost/opencost/core/pkg/clustercache"
  7. "github.com/opencost/opencost/core/pkg/clusters"
  8. "github.com/opencost/opencost/core/pkg/diagnostics"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/nodestats"
  11. "github.com/opencost/opencost/core/pkg/source"
  12. "github.com/opencost/opencost/core/pkg/storage"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  15. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  16. )
  17. type collectorDataSource struct {
  18. metricsQuerier *collectorMetricsQuerier
  19. clusterMap clusters.ClusterMap
  20. clusterInfo clusters.ClusterInfoProvider
  21. config CollectorConfig
  22. diagnosticsModule *metric.DiagnosticsModule
  23. }
  24. func NewDefaultCollectorDataSource(
  25. clusterUID string,
  26. store storage.Storage,
  27. clusterInfoProvider clusters.ClusterInfoProvider,
  28. clusterCache clustercache.ClusterCache,
  29. statSummaryClient nodestats.StatSummaryClient,
  30. ) source.OpenCostDataSource {
  31. config := NewOpenCostCollectorConfigFromEnv(clusterUID)
  32. return NewCollectorDataSource(
  33. config,
  34. store,
  35. clusterInfoProvider,
  36. clusterCache,
  37. statSummaryClient,
  38. )
  39. }
  40. func NewCollectorDataSource(
  41. config CollectorConfig,
  42. store storage.Storage,
  43. clusterInfoProvider clusters.ClusterInfoProvider,
  44. clusterCache clustercache.ClusterCache,
  45. statSummaryClient nodestats.StatSummaryClient,
  46. ) source.OpenCostDataSource {
  47. var resolutions []*util.Resolution
  48. for _, resconf := range config.Resolutions {
  49. resolution, err := util.NewResolution(resconf)
  50. if err != nil {
  51. log.Errorf("failed to create resolution %s", err.Error())
  52. continue
  53. }
  54. resolutions = append(resolutions, resolution)
  55. }
  56. repo := metric.NewMetricRepository(
  57. resolutions,
  58. NewOpenCostMetricStore,
  59. )
  60. var updater metric.Updater
  61. updater = repo
  62. if store != nil {
  63. wal, err := metric.NewWalinator(
  64. config.ClusterName,
  65. config.ApplicationName,
  66. store,
  67. resolutions,
  68. updater,
  69. )
  70. if err != nil {
  71. log.Errorf("failed to initialize the walinator: %s", err.Error())
  72. } else {
  73. wal.Start()
  74. updater = wal
  75. }
  76. }
  77. diagnosticsModule := metric.NewDiagnosticsModule()
  78. scrapeController := scrape.NewScrapeController(
  79. config.ClusterUID,
  80. config.ScrapeInterval,
  81. config.NetworkPort,
  82. updater,
  83. clusterInfoProvider,
  84. clusterCache,
  85. statSummaryClient,
  86. )
  87. scrapeController.Start()
  88. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  89. // cluster info provider
  90. clusterInfo := clusterInfoProvider
  91. clusterMap := newCollectorClusterMap(clusterInfo)
  92. return &collectorDataSource{
  93. config: config,
  94. metricsQuerier: metricQuerier,
  95. clusterInfo: clusterInfo,
  96. clusterMap: clusterMap,
  97. diagnosticsModule: diagnosticsModule,
  98. }
  99. }
  100. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  101. return
  102. }
  103. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  104. const CollectorDiagnosticCategory = "collector"
  105. diagnosticDefinitions := c.diagnosticsModule.DiagnosticsDefinitions()
  106. for _, dd := range diagnosticDefinitions {
  107. err := diagService.Register(dd.MetricName, dd.Description, CollectorDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
  108. details, err := c.diagnosticsModule.DiagnosticsDetails(dd.ID)
  109. if err != nil {
  110. return nil, err
  111. }
  112. return details, nil
  113. })
  114. if err != nil {
  115. log.Warnf("Failed to register collector diagnostic %s: %s", dd.ID, err.Error())
  116. }
  117. }
  118. }
  119. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  120. return c.metricsQuerier
  121. }
  122. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  123. return c.clusterMap
  124. }
  125. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  126. return c.clusterInfo
  127. }
  128. // BatchDuration collector data source queries do not need to be broken up
  129. func (c *collectorDataSource) BatchDuration() time.Duration {
  130. var maxDuration time.Duration = 1<<63 - 1
  131. return maxDuration
  132. }
  133. func (c *collectorDataSource) Resolution() time.Duration {
  134. interval, _ := util.NewInterval(c.config.ScrapeInterval)
  135. current := interval.Truncate(time.Now().UTC())
  136. next := interval.Add(current, 1)
  137. return next.Sub(current)
  138. }