datasource.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package collector
  2. import (
  3. "context"
  4. "time"
  5. "github.com/julienschmidt/httprouter"
  6. "github.com/opencost/opencost/core/pkg/clustercache"
  7. "github.com/opencost/opencost/core/pkg/clusters"
  8. "github.com/opencost/opencost/core/pkg/diagnostics"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/nodestats"
  11. "github.com/opencost/opencost/core/pkg/source"
  12. "github.com/opencost/opencost/core/pkg/storage"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/metric/synthetic"
  15. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  16. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  17. )
  18. type collectorDataSource struct {
  19. metricsQuerier *collectorMetricsQuerier
  20. clusterMap clusters.ClusterMap
  21. clusterInfo clusters.ClusterInfoProvider
  22. config CollectorConfig
  23. diagnosticsModule *metric.DiagnosticsModule
  24. }
  25. func NewDefaultCollectorDataSource(
  26. clusterUID string,
  27. store storage.Storage,
  28. clusterInfoProvider clusters.ClusterInfoProvider,
  29. clusterCache clustercache.ClusterCache,
  30. statSummaryClient nodestats.StatSummaryClient,
  31. ) source.OpenCostDataSource {
  32. config := NewOpenCostCollectorConfigFromEnv(clusterUID)
  33. return NewCollectorDataSource(
  34. config,
  35. store,
  36. clusterInfoProvider,
  37. clusterCache,
  38. statSummaryClient,
  39. )
  40. }
  41. func NewCollectorDataSource(
  42. config CollectorConfig,
  43. store storage.Storage,
  44. clusterInfoProvider clusters.ClusterInfoProvider,
  45. clusterCache clustercache.ClusterCache,
  46. statSummaryClient nodestats.StatSummaryClient,
  47. ) source.OpenCostDataSource {
  48. var resolutions []*util.Resolution
  49. for _, resconf := range config.Resolutions {
  50. resolution, err := util.NewResolution(resconf)
  51. if err != nil {
  52. log.Errorf("failed to create resolution %s", err.Error())
  53. continue
  54. }
  55. resolutions = append(resolutions, resolution)
  56. }
  57. repo := metric.NewMetricRepository(
  58. resolutions,
  59. NewOpenCostMetricStore,
  60. )
  61. var updater metric.Updater
  62. updater = repo
  63. if store != nil {
  64. wal, err := metric.NewWalinator(
  65. config.ClusterName,
  66. config.ApplicationName,
  67. store,
  68. resolutions,
  69. updater,
  70. )
  71. if err != nil {
  72. log.Errorf("failed to initialize the walinator: %s", err.Error())
  73. } else {
  74. wal.Start()
  75. updater = wal
  76. }
  77. }
  78. // synthesizer collects specific metric types and generates new metrics to pass
  79. // along with the original metrics into the updater
  80. metricSynthesizer := synthetic.NewMetricSynthesizers(
  81. updater,
  82. synthetic.NewContainerMemoryAllocationSynthesizer(),
  83. synthetic.NewContainerCpuAllocationSynthesizer(),
  84. )
  85. updater = metricSynthesizer
  86. diagnosticsModule := metric.NewDiagnosticsModule()
  87. scrapeController := scrape.NewScrapeController(
  88. config.ClusterUID,
  89. config.ScrapeInterval,
  90. config.NetworkPort,
  91. updater,
  92. clusterInfoProvider,
  93. clusterCache,
  94. statSummaryClient,
  95. )
  96. scrapeController.Start()
  97. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  98. // cluster info provider
  99. clusterInfo := clusterInfoProvider
  100. clusterMap := newCollectorClusterMap(clusterInfo)
  101. return &collectorDataSource{
  102. config: config,
  103. metricsQuerier: metricQuerier,
  104. clusterInfo: clusterInfo,
  105. clusterMap: clusterMap,
  106. diagnosticsModule: diagnosticsModule,
  107. }
  108. }
  109. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  110. }
  111. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  112. const CollectorDiagnosticCategory = "collector"
  113. diagnosticDefinitions := c.diagnosticsModule.DiagnosticsDefinitions()
  114. for _, dd := range diagnosticDefinitions {
  115. err := diagService.Register(dd.MetricName, dd.Description, CollectorDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
  116. details, err := c.diagnosticsModule.DiagnosticsDetails(dd.ID)
  117. if err != nil {
  118. return nil, err
  119. }
  120. return details, nil
  121. })
  122. if err != nil {
  123. log.Warnf("Failed to register collector diagnostic %s: %s", dd.ID, err.Error())
  124. }
  125. }
  126. }
  127. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  128. return c.metricsQuerier
  129. }
  130. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  131. return c.clusterMap
  132. }
  133. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  134. return c.clusterInfo
  135. }
  136. // BatchDuration collector data source queries do not need to be broken up
  137. func (c *collectorDataSource) BatchDuration() time.Duration {
  138. var maxDuration time.Duration = 1<<63 - 1
  139. return maxDuration
  140. }
  141. func (c *collectorDataSource) Resolution() time.Duration {
  142. interval, _ := util.NewInterval(c.config.ScrapeInterval)
  143. current := interval.Truncate(time.Now().UTC())
  144. next := interval.Add(current, 1)
  145. return next.Sub(current)
  146. }