exporter.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package exporter
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  6. "github.com/opencost/opencost/core/pkg/exporter/validator"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/opencost"
  9. "github.com/opencost/opencost/core/pkg/storage"
  10. )
  11. // Exporter[T] is a generic interface for exporting T instances to a specific storage destination.
  12. type Exporter[TimeUnit any, T any] interface {
  13. // Export performs the export operation for the provided data.
  14. Export(time TimeUnit, data *T) error
  15. }
  16. // EventExporter[T] is an alias type of an Exporter[time.Time, T] that writes data that is timestamped.
  17. type EventExporter[T any] Exporter[time.Time, T]
  18. // ComputeExporter[T] is an alias type of an Exporter[opencost.Window, T] that writes data for a specific window.
  19. type ComputeExporter[T any] Exporter[opencost.Window, T]
// EventStorageExporter[T] is an implementation of an Exporter[time.Time, T] that writes data to a storage backend
// using the `github.com/opencost/opencost/core/pkg/storage` package, a pathing strategy, and an encoder.
type EventStorageExporter[T any] struct {
	// paths builds the destination path for a given timestamp (Export calls ToFullPath on it).
	paths pathing.StoragePathFormatter[time.Time]
	// encoder converts *T into the payload to store and supplies the file extension used in the path.
	encoder Encoder[T]
	// storage is the backend that Export writes the encoded payload to.
	storage storage.Storage
}
  27. // NewEventStorageExporter creates a new StorageExporter instance, which is responsible for exporting data to a storage backend.
  28. // It uses a pathing strategy to determine the storage location, an encoder to convert the data to binary format, and
  29. // a storage backend to write the data.
  30. func NewEventStorageExporter[T any](
  31. paths pathing.StoragePathFormatter[time.Time],
  32. encoder Encoder[T],
  33. storage storage.Storage,
  34. ) EventExporter[T] {
  35. return &EventStorageExporter[T]{
  36. paths: paths,
  37. encoder: encoder,
  38. storage: storage,
  39. }
  40. }
  41. // Export performs the export operation for the provided data. It encodes the data using the encoder and writes it to
  42. // the storage backend using the pathing strategy.
  43. func (se *EventStorageExporter[T]) Export(t time.Time, data *T) error {
  44. path := se.paths.ToFullPath("", t, se.encoder.FileExt())
  45. bin, err := se.encoder.Encode(data)
  46. if err != nil {
  47. return fmt.Errorf("failed to encode data: %w", err)
  48. }
  49. log.Debugf("writing new binary data to storage %s", path)
  50. err = se.storage.Write(path, bin)
  51. if err != nil {
  52. return fmt.Errorf("failed to write binary data to file '%s': %w", path, err)
  53. }
  54. return nil
  55. }
// ComputeStorageExporter[T] is an implementation of ComputeExporter[T] that writes data to a storage backend using
// `github.com/opencost/opencost/core/pkg/storage`, a pathing strategy, and an encoder.
type ComputeStorageExporter[T any] struct {
	// resolution is not assigned by NewComputeStorageExporter nor read by Export in the code visible here.
	// NOTE(review): confirm whether it is used elsewhere in the package.
	resolution time.Duration
	// paths builds the destination path for a given window (Export calls ToFullPath on it).
	paths pathing.StoragePathFormatter[opencost.Window]
	// encoder converts *T into the payload to store and supplies the file extension used in the path.
	encoder Encoder[T]
	// storage is the backend Export checks for existing data and writes the encoded payload to.
	storage storage.Storage
	// validator is optional: when nil, Export skips validation and always writes the data.
	validator validator.ExportValidator[T]
}
  65. // NewComputeStorageExporter creates a new ComputeStorageExporter instance, which is responsible for exporting
  66. // data for a specific window to a storage backend. It uses a pathing strategy to determine the storage location,
  67. // an encoder to convert the data to binary format, and a validator to check the data before export. The pipeline
  68. // name and resolution are also provided to help identify the data being exported.
  69. func NewComputeStorageExporter[T any](
  70. paths pathing.StoragePathFormatter[opencost.Window],
  71. encoder Encoder[T],
  72. storage storage.Storage,
  73. validator validator.ExportValidator[T],
  74. ) ComputeExporter[T] {
  75. return &ComputeStorageExporter[T]{
  76. paths: paths,
  77. encoder: encoder,
  78. storage: storage,
  79. validator: validator,
  80. }
  81. }
  82. // Export performs validation on the provided window and data, determines if it should overwrite existing data,
  83. // and stores the data in the location specified by the pathing formatter.
  84. func (se *ComputeStorageExporter[T]) Export(window opencost.Window, data *T) error {
  85. if se.validator != nil {
  86. err := se.validator.Validate(window, data)
  87. if err != nil {
  88. return fmt.Errorf("failed to validate data: %w", err)
  89. }
  90. }
  91. path := se.paths.ToFullPath("", window, se.encoder.FileExt())
  92. currentExists, err := se.storage.Exists(path)
  93. if err != nil {
  94. return fmt.Errorf("unable to check for existing data from storage path: %w", err)
  95. }
  96. if currentExists && se.validator != nil && !se.validator.IsOverwrite(data) {
  97. log.Debugf("retaining existing data in storage at path: %s", path)
  98. return nil
  99. }
  100. bin, err := se.encoder.Encode(data)
  101. if err != nil {
  102. return fmt.Errorf("failed to encode data: %w", err)
  103. }
  104. log.Debugf("writing new binary data to storage %s", path)
  105. err = se.storage.Write(path, bin)
  106. if err != nil {
  107. return fmt.Errorf("failed to write binary data to file '%s': %w", path, err)
  108. }
  109. return nil
  110. }