exporter.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package exporter
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "time"
  7. coreenv "github.com/opencost/opencost/core/pkg/env"
  8. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  9. "github.com/opencost/opencost/core/pkg/exporter/validator"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/storage"
  13. )
  14. // Exporter[T] is a generic interface for exporting T instances to a specific storage destination.
  15. type Exporter[TimeUnit any, T any] interface {
  16. // Export performs the export operation for the provided data.
  17. Export(time TimeUnit, data *T) error
  18. }
  19. // EventExporter[T] is an alias type of an Exporter[time.Time, T] that writes data that is timestamped.
  20. type EventExporter[T any] Exporter[time.Time, T]
  21. // ComputeExporter[T] is an alias type of an Exporter[opencost.Window, T] that writes data for a specific window.
  22. type ComputeExporter[T any] Exporter[opencost.Window, T]
  23. // EventStorageExporter[T] is an implementation of an Exporter[T] that writes data to a storage backend using
  24. // the `github.com/opencost/opencost/core/pkg/storage` package, a pathing strategy, and an encoder.
  25. type EventStorageExporter[T any] struct {
  26. paths pathing.StoragePathFormatter[time.Time]
  27. encoder Encoder[T]
  28. storage storage.Storage
  29. }
  30. // NewEventStorageExporter creates a new StorageExporter instance, which is responsible for exporting data to a storage backend.
  31. // It uses a pathing strategy to determine the storage location, an encoder to convert the data to binary format, and
  32. // a storage backend to write the data.
  33. func NewEventStorageExporter[T any](
  34. paths pathing.StoragePathFormatter[time.Time],
  35. encoder Encoder[T],
  36. storage storage.Storage,
  37. ) EventExporter[T] {
  38. return &EventStorageExporter[T]{
  39. paths: paths,
  40. encoder: encoder,
  41. storage: storage,
  42. }
  43. }
  44. // Export performs the export operation for the provided data. It encodes the data using the encoder and writes it to
  45. // the storage backend using the pathing strategy.
  46. func (se *EventStorageExporter[T]) Export(t time.Time, data *T) error {
  47. path := se.paths.ToFullPath("", t, se.encoder.FileExt())
  48. bin, err := se.encoder.Encode(data)
  49. if err != nil {
  50. return fmt.Errorf("failed to encode data: %w", err)
  51. }
  52. log.Debugf("writing new binary data to storage %s", path)
  53. err = se.storage.Write(path, bin)
  54. if err != nil {
  55. return fmt.Errorf("failed to write binary data to file '%s': %w", path, err)
  56. }
  57. return nil
  58. }
  59. // ComputeStorageExporter[T] is an implementation of ComputeExporter[T] that writes data to a storage backend using
  60. // `github.com/opencost/opencost/core/pkg/storage`, a pathing strategy, and an encoder.
  61. type ComputeStorageExporter[T any] struct {
  62. resolution time.Duration
  63. paths pathing.StoragePathFormatter[opencost.Window]
  64. encoder Encoder[T]
  65. storage storage.Storage
  66. validator validator.ExportValidator[T]
  67. }
  68. // NewComputeStorageExporter creates a new ComputeStorageExporter instance, which is responsible for exporting
  69. // data for a specific window to a storage backend. It uses a pathing strategy to determine the storage location,
  70. // an encoder to convert the data to binary format, and a validator to check the data before export. The pipeline
  71. // name and resolution are also provided to help identify the data being exported.
  72. func NewComputeStorageExporter[T any](
  73. paths pathing.StoragePathFormatter[opencost.Window],
  74. encoder Encoder[T],
  75. storage storage.Storage,
  76. validator validator.ExportValidator[T],
  77. ) ComputeExporter[T] {
  78. return &ComputeStorageExporter[T]{
  79. paths: paths,
  80. encoder: encoder,
  81. storage: storage,
  82. validator: validator,
  83. }
  84. }
  85. // Export performs validation on the provided window and data, determines if it should overwrite existing data,
  86. // and stores the data in the location specified by the pathing formatter.
  87. func (se *ComputeStorageExporter[T]) Export(window opencost.Window, data *T) error {
  88. if se.validator != nil {
  89. err := se.validator.Validate(window, data)
  90. if err != nil {
  91. return fmt.Errorf("failed to validate data: %w", err)
  92. }
  93. }
  94. path := se.paths.ToFullPath("", window, se.encoder.FileExt())
  95. currentExists, err := se.storage.Exists(path)
  96. if err != nil {
  97. return fmt.Errorf("unable to check for existing data from storage path: %w", err)
  98. }
  99. if currentExists && se.validator != nil && !se.validator.IsOverwrite(data) {
  100. log.Debugf("retaining existing data in storage at path: %s", path)
  101. return nil
  102. }
  103. bin, err := se.encoder.Encode(data)
  104. if err != nil {
  105. return fmt.Errorf("failed to encode data: %w", err)
  106. }
  107. compressionEnabled := coreenv.IsCompressionEnabled()
  108. if compressionEnabled {
  109. log.Debugf("compressing data with gzip, compression enabled")
  110. log.Debugf("original data size: %d bytes", len(bin))
  111. var buf bytes.Buffer
  112. gzWriter := gzip.NewWriter(&buf)
  113. defer gzWriter.Close()
  114. _, err := gzWriter.Write(bin)
  115. if err != nil {
  116. return fmt.Errorf("failed to write compressed data: %w", err)
  117. }
  118. bin = buf.Bytes()
  119. log.Debugf("compressed data size: %d bytes", len(bin))
  120. }
  121. log.Debugf("writing new binary data to storage %s", path)
  122. err = se.storage.Write(path, bin)
  123. if err != nil {
  124. return fmt.Errorf("failed to write binary data to file '%s': %w", path, err)
  125. }
  126. return nil
  127. }