controllers.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package exporter
  2. import (
  3. "time"
  4. export "github.com/opencost/opencost/core/pkg/exporter"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/opencost"
  7. "github.com/opencost/opencost/core/pkg/storage"
  8. "github.com/opencost/opencost/pkg/costmodel"
  9. "github.com/opencost/opencost/pkg/exporter/allocation"
  10. "github.com/opencost/opencost/pkg/exporter/asset"
  11. "github.com/opencost/opencost/pkg/exporter/networkinsight"
  12. )
  13. // PipelinesExportConfig is a configuration struct that contains the export resolutions for
  14. // allocation, assets, and network insights pipelines.
  15. type PipelinesExportConfig struct {
  16. AllocationPiplineResolutions []time.Duration
  17. AssetPipelineResolutons []time.Duration
  18. NetworkInsightPipelineResolutions []time.Duration
  19. }
  20. // defaultPipelineExportResolutions returns the default export configuration for the pipeline
  21. // which is set to export hourly and daily.
  22. func defaultPipelineExportResolutions() []time.Duration {
  23. return []time.Duration{
  24. time.Hour,
  25. 24 * time.Hour,
  26. }
  27. }
  28. // DefaultPipelinesExportConfig returns the default export configuration for all pipelines
  29. // which is set to export hourly and daily for allocations, assets, and network insights.
  30. func DefaultPipelinesExportConfig() *PipelinesExportConfig {
  31. return &PipelinesExportConfig{
  32. AllocationPiplineResolutions: defaultPipelineExportResolutions(),
  33. AssetPipelineResolutons: defaultPipelineExportResolutions(),
  34. NetworkInsightPipelineResolutions: defaultPipelineExportResolutions(),
  35. }
  36. }
  37. // PipelineExportControllers is a facade that contains the export controllers for allocations, assets, and network insights.
  38. type PipelineExportControllers struct {
  39. AllocationExportController *export.ComputeExportControllerGroup[opencost.AllocationSet]
  40. AssetExportController *export.ComputeExportControllerGroup[opencost.AssetSet]
  41. NetworkInsightExportController *export.ComputeExportControllerGroup[opencost.NetworkInsightSet]
  42. }
  43. // NewPipelineExportControllers creates a new PipelineExportControllers instance with the given cluster ID, storage implementation, cost model, and configuration.
  44. // Setting the config to nil will use the default hourly and daily export resolutions for each pipeline.
  45. func NewPipelineExportControllers(clusterId string, store storage.Storage, cm *costmodel.CostModel, config *PipelinesExportConfig) *PipelineExportControllers {
  46. if config == nil {
  47. config = DefaultPipelinesExportConfig()
  48. }
  49. mins := int(cm.DataSource.Resolution().Minutes())
  50. if mins <= 0 {
  51. mins = 1
  52. }
  53. // minimum source/query resolution
  54. sourceResolution := time.Duration(mins) * time.Minute
  55. // allocation sources and exporters
  56. allocSource := allocation.NewAllocationComputeSource(cm)
  57. allocExportControllers := []*export.ComputeExportController[opencost.AllocationSet]{}
  58. for _, res := range config.AllocationPiplineResolutions {
  59. if res < sourceResolution {
  60. log.Warnf("Configured allocation pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
  61. continue
  62. }
  63. allocExporter := NewAllocationStorageExporter(clusterId, res, store)
  64. allocController := export.NewComputeExportController(allocSource, allocExporter, sourceResolution)
  65. allocExportControllers = append(allocExportControllers, allocController)
  66. }
  67. // asset sources and exporters
  68. assetSource := asset.NewAssetsComputeSource(cm)
  69. assetExportControllers := []*export.ComputeExportController[opencost.AssetSet]{}
  70. for _, res := range config.AssetPipelineResolutons {
  71. if res < sourceResolution {
  72. log.Warnf("Configured asset pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
  73. continue
  74. }
  75. assetExporter := NewAssetsStorageExporter(clusterId, res, store)
  76. assetController := export.NewComputeExportController(assetSource, assetExporter, sourceResolution)
  77. assetExportControllers = append(assetExportControllers, assetController)
  78. }
  79. // network insights sources and exporters
  80. networkInsightSource := networkinsight.NewNetworkInsightsComputeSource(cm)
  81. networkInsightExportControllers := []*export.ComputeExportController[opencost.NetworkInsightSet]{}
  82. for _, res := range config.NetworkInsightPipelineResolutions {
  83. if res < sourceResolution {
  84. log.Warnf("Configured network insight pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
  85. continue
  86. }
  87. networkInsightExporter := NewNetworkInsightStorageExporter(clusterId, res, store)
  88. networkInsightController := export.NewComputeExportController(networkInsightSource, networkInsightExporter, sourceResolution)
  89. networkInsightExportControllers = append(networkInsightExportControllers, networkInsightController)
  90. }
  91. return &PipelineExportControllers{
  92. AllocationExportController: export.NewComputeExportControllerGroup(allocExportControllers...),
  93. AssetExportController: export.NewComputeExportControllerGroup(assetExportControllers...),
  94. NetworkInsightExportController: export.NewComputeExportControllerGroup(networkInsightExportControllers...),
  95. }
  96. }
// Start calls Start on the allocation, asset, and network insight export controller
// groups, in that order, passing the same interval to each.
func (pec *PipelineExportControllers) Start(interval time.Duration) {
	pec.AllocationExportController.Start(interval)
	pec.AssetExportController.Start(interval)
	pec.NetworkInsightExportController.Start(interval)
}
// Stop calls Stop on the allocation, asset, and network insight export controller
// groups, in that order.
func (pec *PipelineExportControllers) Stop() {
	pec.AllocationExportController.Stop()
	pec.AssetExportController.Stop()
	pec.NetworkInsightExportController.Stop()
}