exporters.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package exporter
  2. import (
  3. "fmt"
  4. "time"
  5. export "github.com/opencost/opencost/core/pkg/exporter"
  6. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  7. "github.com/opencost/opencost/core/pkg/exporter/validator"
  8. "github.com/opencost/opencost/core/pkg/pipelines"
  9. "github.com/opencost/opencost/core/pkg/storage"
  10. "github.com/opencost/opencost/core/pkg/util/timeutil"
  11. "github.com/opencost/opencost/core/pkg/util/typeutil"
  12. )
  13. // NewComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
  14. // by window for a specific pipeline.
  15. func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
  16. clusterName string,
  17. resolution time.Duration,
  18. store storage.Storage,
  19. ) (export.ComputeExporter[T], error) {
  20. pipelineName := pipelines.NameFor[T]()
  21. if pipelineName == "" {
  22. return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
  23. }
  24. pathing, err := pathing.NewDefaultStoragePathFormatter(clusterName, pipelineName, &resolution)
  25. if err != nil {
  26. return nil, fmt.Errorf("failed to create path formatter: %w", err)
  27. }
  28. var encoder export.Encoder[T]
  29. encoder = export.NewBingenEncoder[T, U]()
  30. return export.NewComputeStorageExporter(
  31. pathing,
  32. encoder,
  33. store,
  34. validator.NewSetValidator[T, S](resolution),
  35. ), nil
  36. }
  37. // NewComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
  38. // using the provided source, storage, resolution, and source resolution.
  39. func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
  40. clusterName string,
  41. store storage.Storage,
  42. source export.ComputeSource[T],
  43. resolution time.Duration,
  44. ) (*export.ComputeExportController[T], error) {
  45. exporter, err := NewComputePipelineExporter[T, U, S](clusterName, resolution, store)
  46. if err != nil {
  47. return nil, fmt.Errorf("failed to create compute exporter: %w", err)
  48. }
  49. return export.NewComputeExportController(source, exporter, resolution), nil
  50. }
  51. // NewKubeModelComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
  52. // by window for a specific pipeline.
  53. func NewKubeModelComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
  54. appName string,
  55. clusterId string,
  56. resolution time.Duration,
  57. store storage.Storage,
  58. ) (export.ComputeExporter[T], error) {
  59. pipelineName := pipelines.NameFor[T]()
  60. if pipelineName == "" {
  61. return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
  62. }
  63. res := timeutil.FormatStoreResolution(resolution)
  64. pathing, err := pathing.NewKubeModelStoragePathFormatter(appName, clusterId, res)
  65. if err != nil {
  66. return nil, fmt.Errorf("failed to create path formatter: %w", err)
  67. }
  68. encoder := export.NewBingenFileEncoder[T, U]()
  69. return export.NewComputeStorageExporter(
  70. pathing,
  71. encoder,
  72. store,
  73. validator.NewSetValidator[T, S](resolution),
  74. ), nil
  75. }
  76. // NewKubeModelComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
  77. // using the provided source, storage, resolution, and source resolution.
  78. func NewKubeModelComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
  79. appName string,
  80. clusterId string,
  81. store storage.Storage,
  82. source export.ComputeSource[T],
  83. resolution time.Duration,
  84. ) (*export.ComputeExportController[T], error) {
  85. exporter, err := NewKubeModelComputePipelineExporter[T, U, S](appName, clusterId, resolution, store)
  86. if err != nil {
  87. return nil, fmt.Errorf("failed to create compute exporter: %w", err)
  88. }
  89. return export.NewComputeExportController(source, exporter, resolution), nil
  90. }