2
0

datasource.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package collector
  2. import (
  3. "context"
  4. "time"
  5. "github.com/julienschmidt/httprouter"
  6. "github.com/opencost/opencost/core/pkg/clustercache"
  7. "github.com/opencost/opencost/core/pkg/clusters"
  8. "github.com/opencost/opencost/core/pkg/diagnostics"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/nodestats"
  11. "github.com/opencost/opencost/core/pkg/source"
  12. "github.com/opencost/opencost/core/pkg/storage"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/metric/synthetic"
  15. "github.com/opencost/opencost/modules/collector-source/pkg/scrape"
  16. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  17. )
  18. type collectorDataSource struct {
  19. metricsQuerier *collectorMetricsQuerier
  20. clusterMap clusters.ClusterMap
  21. clusterInfo clusters.ClusterInfoProvider
  22. config CollectorConfig
  23. diagnosticsModule *metric.DiagnosticsModule
  24. }
  25. func NewDefaultCollectorDataSource(
  26. clusterUID string,
  27. store storage.Storage,
  28. clusterInfoProvider clusters.ClusterInfoProvider,
  29. clusterCache clustercache.ClusterCache,
  30. statSummaryClient nodestats.StatSummaryClient,
  31. ) source.OpenCostDataSource {
  32. config := NewOpenCostCollectorConfigFromEnv(clusterUID)
  33. return NewCollectorDataSource(
  34. config,
  35. store,
  36. clusterInfoProvider,
  37. clusterCache,
  38. statSummaryClient,
  39. )
  40. }
  41. func NewCollectorDataSource(
  42. config CollectorConfig,
  43. store storage.Storage,
  44. clusterInfoProvider clusters.ClusterInfoProvider,
  45. clusterCache clustercache.ClusterCache,
  46. statSummaryClient nodestats.StatSummaryClient,
  47. ) source.OpenCostDataSource {
  48. var resolutions []*util.Resolution
  49. for _, resconf := range config.Resolutions {
  50. resolution, err := util.NewResolution(resconf)
  51. if err != nil {
  52. log.Errorf("failed to create resolution %s", err.Error())
  53. continue
  54. }
  55. resolutions = append(resolutions, resolution)
  56. }
  57. repo := metric.NewMetricRepository(
  58. resolutions,
  59. NewOpenCostMetricStore,
  60. )
  61. var updater metric.Updater
  62. updater = repo
  63. if store != nil {
  64. wal, err := metric.NewWalinator(
  65. config.ClusterName,
  66. config.ApplicationName,
  67. store,
  68. resolutions,
  69. updater,
  70. )
  71. if err != nil {
  72. log.Errorf("failed to initialize the walinator: %s", err.Error())
  73. } else {
  74. wal.Start()
  75. updater = wal
  76. }
  77. }
  78. // synthesizer collects specific metric types and generates new metrics to pass
  79. // along with the original metrics into the updater
  80. metricSynthesizer := synthetic.NewMetricSynthesizers(
  81. updater,
  82. synthetic.NewContainerMemoryAllocationSynthesizer(),
  83. synthetic.NewContainerCpuAllocationSynthesizer(),
  84. synthetic.NewGPUMemoryUsedRatioSynthesizer(),
  85. )
  86. updater = metricSynthesizer
  87. diagnosticsModule := metric.NewDiagnosticsModule()
  88. scrapeController := scrape.NewScrapeController(
  89. config.ClusterUID,
  90. config.ScrapeInterval,
  91. config.NetworkPort,
  92. updater,
  93. clusterInfoProvider,
  94. clusterCache,
  95. statSummaryClient,
  96. )
  97. scrapeController.Start()
  98. metricQuerier := newCollectorMetricsQuerier(repo, config.Resolutions)
  99. // cluster info provider
  100. clusterInfo := clusterInfoProvider
  101. clusterMap := newCollectorClusterMap(clusterInfo)
  102. return &collectorDataSource{
  103. config: config,
  104. metricsQuerier: metricQuerier,
  105. clusterInfo: clusterInfo,
  106. clusterMap: clusterMap,
  107. diagnosticsModule: diagnosticsModule,
  108. }
  109. }
  110. func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
  111. }
  112. func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
  113. const CollectorDiagnosticCategory = "collector"
  114. diagnosticDefinitions := c.diagnosticsModule.DiagnosticsDefinitions()
  115. for _, dd := range diagnosticDefinitions {
  116. err := diagService.Register(dd.MetricName, dd.Description, CollectorDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
  117. details, err := c.diagnosticsModule.DiagnosticsDetails(dd.ID)
  118. if err != nil {
  119. return nil, err
  120. }
  121. return details, nil
  122. })
  123. if err != nil {
  124. log.Warnf("Failed to register collector diagnostic %s: %s", dd.ID, err.Error())
  125. }
  126. }
  127. }
  128. func (c *collectorDataSource) Metrics() source.MetricsQuerier {
  129. return c.metricsQuerier
  130. }
  131. func (c *collectorDataSource) ClusterMap() clusters.ClusterMap {
  132. return c.clusterMap
  133. }
  134. func (c *collectorDataSource) ClusterInfo() clusters.ClusterInfoProvider {
  135. return c.clusterInfo
  136. }
  137. // BatchDuration collector data source queries do not need to be broken up
  138. func (c *collectorDataSource) BatchDuration() time.Duration {
  139. var maxDuration time.Duration = 1<<63 - 1
  140. return maxDuration
  141. }
  142. func (c *collectorDataSource) Resolution() time.Duration {
  143. interval, _ := util.NewInterval(c.config.ScrapeInterval)
  144. current := interval.Truncate(time.Now().UTC())
  145. next := interval.Add(current, 1)
  146. return next.Sub(current)
  147. }