exporter.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package exporter
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  6. "github.com/opencost/opencost/core/pkg/exporter/validator"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/opencost"
  9. "github.com/opencost/opencost/core/pkg/storage"
  10. )
  11. // Exporter[T] is a generic interface for exporting T instances to a specific storage destination.
  12. type Exporter[TimeUnit any, T any] interface {
  13. // Export performs the export operation for the provided data.
  14. Export(time TimeUnit, data *T) error
  15. }
  16. // EventExporter[T] is an alias type of an Exporter[time.Time, T] that writes data that is timestamped.
  17. type EventExporter[T any] Exporter[time.Time, T]
  18. // ComputeExporter[T] is an alias type of an Exporter[opencost.Window, T] that writes data for a specific window.
  19. type ComputeExporter[T any] Exporter[opencost.Window, T]
  20. // EventStorageExporter[T] is an implementation of an Exporter[T] that writes data to a storage backend using
  21. // the `github.com/opencost/opencost/core/pkg/storage` package, a pathing strategy, and an encoder.
  22. type EventStorageExporter[T any] struct {
  23. paths pathing.StoragePathFormatter[time.Time]
  24. encoder Encoder[T]
  25. storage storage.Storage
  26. }
  27. // NewEventStorageExporter creates a new StorageExporter instance, which is responsible for exporting data to a storage backend.
  28. // It uses a pathing strategy to determine the storage location, an encoder to convert the data to binary format, and
  29. // a storage backend to write the data.
  30. func NewEventStorageExporter[T any](
  31. paths pathing.StoragePathFormatter[time.Time],
  32. encoder Encoder[T],
  33. storage storage.Storage,
  34. ) EventExporter[T] {
  35. return &EventStorageExporter[T]{
  36. paths: paths,
  37. encoder: encoder,
  38. storage: storage,
  39. }
  40. }
  41. // Export performs the export operation for the provided data. It encodes the data using the encoder and writes it to
  42. // the storage backend using the pathing strategy.
  43. func (se *EventStorageExporter[T]) Export(t time.Time, data *T) error {
  44. path := se.paths.ToFullPath("", t, se.encoder.FileExt())
  45. bin, err := se.encoder.Encode(data)
  46. if err != nil {
  47. return fmt.Errorf("failed to encode data: %w", err)
  48. }
  49. log.Debugf("writing new binary data to storage %s", path)
  50. err = se.storage.Write(path, bin)
  51. if err != nil {
  52. return fmt.Errorf("failed to write binary data to file '%s': %w", path, err)
  53. }
  54. return nil
  55. }
  56. // ComputeStorageExporter[T] is an implementation of ComputeExporter[T] that writes data to a storage backend using
  57. // `github.com/opencost/opencost/core/pkg/storage`, a pathing strategy, and an encoder.
  58. type ComputeStorageExporter[T any] struct {
  59. resolution time.Duration
  60. paths pathing.StoragePathFormatter[opencost.Window]
  61. encoder Encoder[T]
  62. storage storage.Storage
  63. validator validator.ExportValidator[T]
  64. streaming bool
  65. }
  66. // NewComputeStorageExporter creates a new ComputeStorageExporter instance, which is responsible for exporting
  67. // data for a specific window to a storage backend. It uses a pathing strategy to determine the storage location,
  68. // an encoder to convert the data to binary format, and a validator to check the data before export. The pipeline
  69. // name and resolution are also provided to help identify the data being exported.
  70. func NewComputeStorageExporter[T any](
  71. paths pathing.StoragePathFormatter[opencost.Window],
  72. encoder Encoder[T],
  73. storage storage.Storage,
  74. validator validator.ExportValidator[T],
  75. streaming bool,
  76. ) ComputeExporter[T] {
  77. return &ComputeStorageExporter[T]{
  78. paths: paths,
  79. encoder: encoder,
  80. storage: storage,
  81. validator: validator,
  82. streaming: streaming,
  83. }
  84. }
  85. // Export performs validation on the provided window and data, determines if it should overwrite existing data,
  86. // and stores the data in the location specified by the pathing formatter.
  87. func (se *ComputeStorageExporter[T]) Export(window opencost.Window, data *T) error {
  88. if se.validator != nil {
  89. err := se.validator.Validate(window, data)
  90. if err != nil {
  91. return fmt.Errorf("failed to validate data: %w", err)
  92. }
  93. }
  94. path := se.paths.ToFullPath("", window, se.encoder.FileExt())
  95. currentExists, err := se.storage.Exists(path)
  96. if err != nil {
  97. return fmt.Errorf("unable to check for existing data from storage path: %w", err)
  98. }
  99. if currentExists && se.validator != nil && !se.validator.IsOverwrite(data) {
  100. log.Debugf("retaining existing data in storage at path: %s", path)
  101. return nil
  102. }
  103. // stream the data structure to the storage path if we select streaming
  104. if se.streaming {
  105. return se.streamingUpload(path, data)
  106. }
  107. // otherwise, just encode and write the encoded result directly
  108. return se.encodeAndUpload(path, data)
  109. }
  110. func (se *ComputeStorageExporter[T]) encodeAndUpload(path string, data *T) error {
  111. bin, err := se.encoder.Encode(data)
  112. if err != nil {
  113. return fmt.Errorf("failed to encode data: %w", err)
  114. }
  115. log.Debugf("writing new binary data to storage %s", path)
  116. err = se.storage.Write(path, bin)
  117. if err != nil {
  118. return fmt.Errorf("failed to write binary data to file '%s': %w", path, err)
  119. }
  120. return nil
  121. }
  122. func (se *ComputeStorageExporter[T]) streamingUpload(path string, data *T) error {
  123. writer, err := se.storage.WriteStream(path)
  124. if err != nil {
  125. return fmt.Errorf("failed to create streaming storage writer: %w", err)
  126. }
  127. if err = se.encoder.EncodeTo(writer, data); err != nil {
  128. _ = writer.Close()
  129. return fmt.Errorf("failed to stream encoding for exporter: %w", err)
  130. }
  131. if err = writer.Close(); err != nil {
  132. return fmt.Errorf("failed to flush and close writer after write: %w", err)
  133. }
  134. return nil
  135. }