controllers.go 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package exporter
  2. import (
  3. "time"
  4. export "github.com/opencost/opencost/core/pkg/exporter"
  5. "github.com/opencost/opencost/core/pkg/opencost"
  6. "github.com/opencost/opencost/core/pkg/storage"
  7. "github.com/opencost/opencost/pkg/costmodel"
  8. "github.com/opencost/opencost/pkg/exporter/allocation"
  9. "github.com/opencost/opencost/pkg/exporter/asset"
  10. "github.com/opencost/opencost/pkg/exporter/networkinsight"
  11. )
  12. type PipelineExportControllers struct {
  13. AllocationExportController *export.ComputeExportControllerGroup[opencost.AllocationSet]
  14. AssetExportController *export.ComputeExportControllerGroup[opencost.AssetSet]
  15. NetworkInsightExportController *export.ComputeExportControllerGroup[opencost.NetworkInsightSet]
  16. }
  17. func NewPipelineExportControllers(clusterId string, store storage.Storage, cm *costmodel.CostModel) *PipelineExportControllers {
  18. mins := int(cm.DataSource.Resolution().Minutes())
  19. if mins <= 0 {
  20. mins = 1
  21. }
  22. // minimum source/query resolution
  23. sourceResolution := time.Duration(mins) * time.Minute
  24. // allocation sources and exporters
  25. allocSource := allocation.NewAllocationComputeSource(cm)
  26. hourlyAllocExporter := NewAllocationStorageExporter(clusterId, time.Hour, store)
  27. dailyAllocExporter := NewAllocationStorageExporter(clusterId, 24*time.Hour, store)
  28. hourlyAllocController := export.NewComputeExportController(allocSource, hourlyAllocExporter, sourceResolution)
  29. dailyAllocController := export.NewComputeExportController(allocSource, dailyAllocExporter, sourceResolution)
  30. allocController := export.NewComputeExportControllerGroup(hourlyAllocController, dailyAllocController)
  31. // asset sources and exporters
  32. assetSource := asset.NewAssetsComputeSource(cm)
  33. hourlyAssetExporter := NewAssetsStorageExporter(clusterId, time.Hour, store)
  34. dailyAssetExporter := NewAssetsStorageExporter(clusterId, 24*time.Hour, store)
  35. hourlyAssetController := export.NewComputeExportController(assetSource, hourlyAssetExporter, sourceResolution)
  36. dailyAssetController := export.NewComputeExportController(assetSource, dailyAssetExporter, sourceResolution)
  37. assetController := export.NewComputeExportControllerGroup(hourlyAssetController, dailyAssetController)
  38. // network insights sources and exporters
  39. networkInsightSource := networkinsight.NewNetworkInsightsComputeSource(cm)
  40. hourlyNetworkInsightExporter := NewNetworkInsightStorageExporter(clusterId, time.Hour, store)
  41. dailyNetworkInsightExporter := NewNetworkInsightStorageExporter(clusterId, 24*time.Hour, store)
  42. hourlyNetworkInsightController := export.NewComputeExportController(networkInsightSource, hourlyNetworkInsightExporter, sourceResolution)
  43. dailyNetworkInsightController := export.NewComputeExportController(networkInsightSource, dailyNetworkInsightExporter, sourceResolution)
  44. networkInsightController := export.NewComputeExportControllerGroup(hourlyNetworkInsightController, dailyNetworkInsightController)
  45. return &PipelineExportControllers{
  46. AllocationExportController: allocController,
  47. AssetExportController: assetController,
  48. NetworkInsightExportController: networkInsightController,
  49. }
  50. }
  51. func (pec *PipelineExportControllers) Start(interval time.Duration) {
  52. pec.AllocationExportController.Start(interval)
  53. pec.AssetExportController.Start(interval)
  54. pec.NetworkInsightExportController.Start(interval)
  55. }
  56. func (pec *PipelineExportControllers) Stop() {
  57. pec.AllocationExportController.Stop()
  58. pec.AssetExportController.Stop()
  59. pec.NetworkInsightExportController.Stop()
  60. }