controllers.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package exporter
  2. import (
  3. "time"
  4. export "github.com/opencost/opencost/core/pkg/exporter"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/opencost/exporter/allocation"
  9. "github.com/opencost/opencost/core/pkg/opencost/exporter/asset"
  10. km "github.com/opencost/opencost/core/pkg/opencost/exporter/kubemodel"
  11. "github.com/opencost/opencost/core/pkg/opencost/exporter/networkinsight"
  12. "github.com/opencost/opencost/core/pkg/source"
  13. "github.com/opencost/opencost/core/pkg/storage"
  14. "github.com/opencost/opencost/core/pkg/util/timeutil"
  15. )
  16. // ComputePipelineSource is an interface that defines methods for computing all pipeline data.
  17. // For all intents and purposes, this represents costmodel.CostModel. To interface allows tests to
  18. // mock the costmodel.CostModel and return a different source for the pipeline.
  19. type ComputePipelineSource interface {
  20. allocation.AllocationSource
  21. asset.AssetSource
  22. networkinsight.NetworkInsightSource
  23. km.KubeModelSource
  24. GetDataSource() source.OpenCostDataSource
  25. }
  26. // PipelinesExportConfig is a configuration struct that contains the export resolutions for
  27. // allocation, assets, and network insights pipelines.
  28. type PipelinesExportConfig struct {
  29. AppName string
  30. ClusterUID string
  31. ClusterName string
  32. AllocationPiplineResolutions []time.Duration
  33. AssetPipelineResolutons []time.Duration
  34. NetworkInsightPipelineResolutions []time.Duration
  35. KubeModelPipelineResolutions []time.Duration
  36. Streaming bool
  37. Compression ExportCompressionLevel
  38. }
  39. // defaultPipelineExportResolutions returns the default export configuration for the pipeline
  40. // which is set to export hourly and daily.
  41. func defaultPipelineExportResolutions() []time.Duration {
  42. return []time.Duration{
  43. time.Hour,
  44. 24 * time.Hour,
  45. }
  46. }
  47. // NewPipelinesExportConfig returns the default export configuration for all pipelines
  48. // which is set to export hourly and daily for allocations, assets, and network insights.
  49. func NewPipelinesExportConfig(appName, clusterUID, clusterName string) PipelinesExportConfig {
  50. return PipelinesExportConfig{
  51. AppName: appName,
  52. ClusterUID: clusterUID,
  53. ClusterName: clusterName,
  54. AllocationPiplineResolutions: defaultPipelineExportResolutions(),
  55. AssetPipelineResolutons: defaultPipelineExportResolutions(),
  56. NetworkInsightPipelineResolutions: defaultPipelineExportResolutions(),
  57. KubeModelPipelineResolutions: defaultPipelineExportResolutions(),
  58. Streaming: false,
  59. Compression: ExportCompressionLevelNone,
  60. }
  61. }
  62. // PipelineExportControllers is a facade that contains the export controllers for allocations, assets, and network insights.
  63. type PipelineExportControllers struct {
  64. AllocationExportController *export.ComputeExportControllerGroup[opencost.AllocationSet]
  65. AssetExportController *export.ComputeExportControllerGroup[opencost.AssetSet]
  66. NetworkInsightExportController *export.ComputeExportControllerGroup[opencost.NetworkInsightSet]
  67. KubeModelExportController *export.ComputeExportControllerGroup[kubemodel.KubeModelSet]
  68. }
  69. // NewPipelineExportControllers creates a new PipelineExportControllers instance with the given cluster ID, storage implementation, cost model, and configuration.
  70. // Setting the config to nil will use the default hourly and daily export resolutions for each pipeline.
  71. func NewPipelineExportControllers(store storage.Storage, cm ComputePipelineSource, config PipelinesExportConfig) *PipelineExportControllers {
  72. mins := int(cm.GetDataSource().Resolution().Minutes())
  73. if mins <= 0 {
  74. mins = 1
  75. }
  76. // minimum source/query resolution
  77. sourceResolution := time.Duration(mins) * time.Minute
  78. // allocation sources and exporters
  79. allocSource := allocation.NewAllocationComputeSource(cm)
  80. allocExportControllers := []*export.ComputeExportController[opencost.AllocationSet]{}
  81. for _, res := range config.AllocationPiplineResolutions {
  82. if res < sourceResolution {
  83. log.Warnf("Configured allocation pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
  84. continue
  85. }
  86. // Use ClusterName for "clusterId" here to maintain legacy pattern
  87. var allocController *export.ComputeExportController[opencost.AllocationSet]
  88. var err error
  89. if config.Streaming {
  90. allocController, err = NewStreamingComputePipelineExportController(config.ClusterName, store, allocSource, res, config.Compression)
  91. } else {
  92. allocController, err = NewComputePipelineExportController(config.ClusterName, store, allocSource, res)
  93. }
  94. if err != nil {
  95. log.Errorf("Failed to create allocation export controller for resolution: %s - %v", timeutil.DurationString(res), err)
  96. continue
  97. }
  98. allocExportControllers = append(allocExportControllers, allocController)
  99. }
  100. // asset sources and exporters
  101. assetSource := asset.NewAssetsComputeSource(cm)
  102. assetExportControllers := []*export.ComputeExportController[opencost.AssetSet]{}
  103. for _, res := range config.AssetPipelineResolutons {
  104. if res < sourceResolution {
  105. log.Warnf("Configured asset pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
  106. continue
  107. }
  108. // Use ClusterName for "clusterId" here to maintain legacy pattern
  109. var assetController *export.ComputeExportController[opencost.AssetSet]
  110. var err error
  111. if config.Streaming {
  112. assetController, err = NewStreamingComputePipelineExportController(config.ClusterName, store, assetSource, res, config.Compression)
  113. } else {
  114. assetController, err = NewComputePipelineExportController(config.ClusterName, store, assetSource, res)
  115. }
  116. if err != nil {
  117. log.Errorf("Failed to create asset export controller for resolution: %s - %v", timeutil.DurationString(res), err)
  118. continue
  119. }
  120. assetExportControllers = append(assetExportControllers, assetController)
  121. }
  122. // network insights sources and exporters
  123. networkInsightSource := networkinsight.NewNetworkInsightsComputeSource(cm)
  124. networkInsightExportControllers := []*export.ComputeExportController[opencost.NetworkInsightSet]{}
  125. for _, res := range config.NetworkInsightPipelineResolutions {
  126. if res < sourceResolution {
  127. log.Warnf("Configured network insight pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
  128. continue
  129. }
  130. // Use ClusterName for "clusterId" here to maintain legacy pattern
  131. var networkInsightController *export.ComputeExportController[opencost.NetworkInsightSet]
  132. var err error
  133. if config.Streaming {
  134. networkInsightController, err = NewStreamingComputePipelineExportController(config.ClusterName, store, networkInsightSource, res, config.Compression)
  135. } else {
  136. networkInsightController, err = NewComputePipelineExportController(config.ClusterName, store, networkInsightSource, res)
  137. }
  138. if err != nil {
  139. log.Errorf("Failed to create network insight export controller for resolution: %s - %v", timeutil.DurationString(res), err)
  140. continue
  141. }
  142. networkInsightExportControllers = append(networkInsightExportControllers, networkInsightController)
  143. }
  144. // KubeModel sources and exporters
  145. kubeModelSource := km.NewKubeModelComputeSource(cm)
  146. kubeModelExportControllers := []*export.ComputeExportController[kubemodel.KubeModelSet]{}
  147. for _, res := range config.KubeModelPipelineResolutions {
  148. if res < sourceResolution {
  149. log.Warnf("Configured KubeModel pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
  150. continue
  151. }
  152. var kubeModelController *export.ComputeExportController[kubemodel.KubeModelSet]
  153. var err error
  154. if config.Streaming {
  155. kubeModelController, err = NewStreamingKubeModelComputePipelineExportController(config.AppName, config.ClusterUID, store, kubeModelSource, res, config.Compression)
  156. } else {
  157. kubeModelController, err = NewKubeModelComputePipelineExportController(config.AppName, config.ClusterUID, store, kubeModelSource, res)
  158. }
  159. if err != nil {
  160. log.Errorf("Failed to create KubeModel export controller for resolution: %s - %v", timeutil.DurationString(res), err)
  161. continue
  162. }
  163. kubeModelExportControllers = append(kubeModelExportControllers, kubeModelController)
  164. }
  165. return &PipelineExportControllers{
  166. AllocationExportController: export.NewComputeExportControllerGroup(allocExportControllers...),
  167. AssetExportController: export.NewComputeExportControllerGroup(assetExportControllers...),
  168. NetworkInsightExportController: export.NewComputeExportControllerGroup(networkInsightExportControllers...),
  169. KubeModelExportController: export.NewComputeExportControllerGroup(kubeModelExportControllers...),
  170. }
  171. }
  172. func (pec *PipelineExportControllers) Start(interval time.Duration) {
  173. pec.AllocationExportController.Start(interval)
  174. pec.AssetExportController.Start(interval)
  175. pec.NetworkInsightExportController.Start(interval)
  176. pec.KubeModelExportController.Start(interval)
  177. }
  178. func (pec *PipelineExportControllers) Stop() {
  179. pec.AllocationExportController.Stop()
  180. pec.AssetExportController.Stop()
  181. pec.NetworkInsightExportController.Stop()
  182. pec.KubeModelExportController.Stop()
  183. }