exporter_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package exporter
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "testing"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  8. "github.com/opencost/opencost/core/pkg/exporter/validator"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "github.com/opencost/opencost/core/pkg/pipelines"
  11. "github.com/opencost/opencost/core/pkg/storage"
  12. "github.com/opencost/opencost/core/pkg/util/json"
  13. )
  14. const (
  15. TestClusterId = "test-cluster"
  16. TestEventName = "test-event-path"
  17. )
  18. type TestData struct {
  19. Message string `json:"message"`
  20. }
  21. func TestStorageExporters(t *testing.T) {
  22. t.Run("test event storage exporter", func(t *testing.T) {
  23. store := storage.NewMemoryStorage()
  24. p, err := pathing.NewEventStoragePathFormatter("root", TestClusterId, TestEventName)
  25. if err != nil {
  26. t.Fatalf("failed to create path formatter: %v", err)
  27. }
  28. encoder := NewJSONEncoder[TestData]()
  29. export := NewEventStorageExporter(p, encoder, store)
  30. ts := time.Now().UTC().Truncate(time.Minute)
  31. export.Export(ts, &TestData{
  32. Message: "TestMessage-1",
  33. })
  34. expectedPath := p.ToFullPath("", ts, "json")
  35. t.Logf("expected path: %s", expectedPath)
  36. data, err := store.Read(expectedPath)
  37. if err != nil {
  38. t.Fatalf("failed to read data from store: %v", err)
  39. }
  40. if len(data) == 0 {
  41. t.Fatalf("expected data to be non-empty, got empty")
  42. }
  43. t.Logf("Data: %s", string(data))
  44. var td *TestData = new(TestData)
  45. if err := json.Unmarshal(data, td); err != nil {
  46. t.Fatalf("failed to unmarshal data: %v", err)
  47. }
  48. if td.Message != "TestMessage-1" {
  49. t.Fatalf("expected message to be 'TestMessage-1', got '%s'", td.Message)
  50. }
  51. })
  52. t.Run("test compute storage exporter", func(t *testing.T) {
  53. res := 24 * time.Hour
  54. store := storage.NewMemoryStorage()
  55. p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
  56. if err != nil {
  57. t.Fatalf("failed to create path formatter: %v", err)
  58. }
  59. encoder := NewBingenEncoder[opencost.AllocationSet]()
  60. export := NewComputeStorageExporter(
  61. p,
  62. encoder,
  63. store,
  64. validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
  65. false,
  66. )
  67. start := time.Now().UTC().Truncate(res)
  68. end := start.Add(res)
  69. toExport := opencost.GenerateMockAllocationSet(start)
  70. err = export.Export(opencost.NewClosedWindow(start, end), toExport)
  71. if err != nil {
  72. t.Fatalf("failed to export data: %v", err)
  73. }
  74. expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "")
  75. data, err := store.Read(expectedPath)
  76. if err != nil {
  77. t.Fatalf("failed to read data from store: %v", err)
  78. }
  79. if len(data) == 0 {
  80. t.Fatalf("expected data to be non-empty, got empty")
  81. }
  82. var as *opencost.AllocationSet = new(opencost.AllocationSet)
  83. err = as.UnmarshalBinary(data)
  84. if err != nil {
  85. t.Fatalf("failed to unmarshal data: %v", err)
  86. }
  87. if as.IsEmpty() {
  88. t.Fatalf("expected allocation set to be non-empty, got empty")
  89. }
  90. })
  91. t.Run("test streaming compute storage exporter", func(t *testing.T) {
  92. res := 24 * time.Hour
  93. store := storage.NewMemoryStorage()
  94. p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
  95. if err != nil {
  96. t.Fatalf("failed to create path formatter: %v", err)
  97. }
  98. encoder := NewBingenEncoder[opencost.AllocationSet]()
  99. export := NewComputeStorageExporter[opencost.AllocationSet](
  100. p,
  101. encoder,
  102. store,
  103. validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
  104. true,
  105. )
  106. start := time.Now().UTC().Truncate(res)
  107. end := start.Add(res)
  108. toExport := opencost.GenerateMockAllocationSet(start)
  109. err = export.Export(opencost.NewClosedWindow(start, end), toExport)
  110. if err != nil {
  111. t.Fatalf("failed to export data: %v", err)
  112. }
  113. expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "")
  114. data, err := store.Read(expectedPath)
  115. if err != nil {
  116. t.Fatalf("failed to read data from store: %v", err)
  117. }
  118. if len(data) == 0 {
  119. t.Fatalf("expected data to be non-empty, got empty")
  120. }
  121. var as *opencost.AllocationSet = new(opencost.AllocationSet)
  122. err = as.UnmarshalBinary(data)
  123. if err != nil {
  124. t.Fatalf("failed to unmarshal data: %v", err)
  125. }
  126. if as.IsEmpty() {
  127. t.Fatalf("expected allocation set to be non-empty, got empty")
  128. }
  129. })
  130. t.Run("test compressed streaming compute storage exporter", func(t *testing.T) {
  131. res := 24 * time.Hour
  132. store := storage.NewMemoryStorage()
  133. p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
  134. if err != nil {
  135. t.Fatalf("failed to create path formatter: %v", err)
  136. }
  137. encoder := NewGZipEncoderWithLevel(NewBingenEncoder[opencost.AllocationSet](), gzip.BestSpeed)
  138. export := NewComputeStorageExporter(
  139. p,
  140. encoder,
  141. store,
  142. validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
  143. true,
  144. )
  145. start := time.Now().UTC().Truncate(res)
  146. end := start.Add(res)
  147. toExport := opencost.GenerateMockAllocationSet(start)
  148. err = export.Export(opencost.NewClosedWindow(start, end), toExport)
  149. if err != nil {
  150. t.Fatalf("failed to export data: %v", err)
  151. }
  152. expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "gz")
  153. t.Logf("Reading from path: %s\n", expectedPath)
  154. data, err := store.Read(expectedPath)
  155. if err != nil {
  156. t.Fatalf("failed to read data from store: %v", err)
  157. }
  158. if len(data) == 0 {
  159. t.Fatalf("expected data to be non-empty, got empty")
  160. }
  161. reader, err := gzip.NewReader(bytes.NewReader(data))
  162. if err != nil {
  163. t.Fatalf("failed to create gzip reader")
  164. }
  165. defer reader.Close()
  166. var as *opencost.AllocationSet = new(opencost.AllocationSet)
  167. err = as.UnmarshalBinaryFromReader(reader)
  168. if err != nil {
  169. t.Fatalf("failed to unmarshal data: %v", err)
  170. }
  171. if as.IsEmpty() {
  172. t.Fatalf("expected allocation set to be non-empty, got empty")
  173. }
  174. })
  175. }