exporter_test.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package exporter
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "testing"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  8. "github.com/opencost/opencost/core/pkg/exporter/validator"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "github.com/opencost/opencost/core/pkg/pipelines"
  11. "github.com/opencost/opencost/core/pkg/storage"
  12. "github.com/opencost/opencost/core/pkg/util/json"
  13. )
  14. const (
  15. TestAppName = "test-app"
  16. TestClusterID = "test-cluster-id"
  17. TestClusterName = "test-cluster"
  18. TestEventName = "test-event-path"
  19. )
  20. type TestData struct {
  21. Message string `json:"message"`
  22. }
  23. func TestStorageExporters(t *testing.T) {
  24. t.Run("test event storage exporter", func(t *testing.T) {
  25. store := storage.NewMemoryStorage()
  26. p, err := pathing.NewEventStoragePathFormatter("root", TestClusterName, TestEventName)
  27. if err != nil {
  28. t.Fatalf("failed to create path formatter: %v", err)
  29. }
  30. encoder := NewJSONEncoder[TestData]()
  31. export := NewEventStorageExporter(p, encoder, store)
  32. ts := time.Now().UTC().Truncate(time.Minute)
  33. export.Export(ts, &TestData{
  34. Message: "TestMessage-1",
  35. })
  36. expectedPath := p.ToFullPath("", ts, "json")
  37. t.Logf("expected path: %s", expectedPath)
  38. data, err := store.Read(expectedPath)
  39. if err != nil {
  40. t.Fatalf("failed to read data from store: %v", err)
  41. }
  42. if len(data) == 0 {
  43. t.Fatalf("expected data to be non-empty, got empty")
  44. }
  45. t.Logf("Data: %s", string(data))
  46. var td *TestData = new(TestData)
  47. if err := json.Unmarshal(data, td); err != nil {
  48. t.Fatalf("failed to unmarshal data: %v", err)
  49. }
  50. if td.Message != "TestMessage-1" {
  51. t.Fatalf("expected message to be 'TestMessage-1', got '%s'", td.Message)
  52. }
  53. })
  54. t.Run("test compute storage exporter", func(t *testing.T) {
  55. res := 24 * time.Hour
  56. store := storage.NewMemoryStorage()
  57. p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, &res)
  58. if err != nil {
  59. t.Fatalf("failed to create path formatter: %v", err)
  60. }
  61. encoder := NewBingenEncoder[opencost.AllocationSet]()
  62. export := NewComputeStorageExporter(
  63. p,
  64. encoder,
  65. store,
  66. validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
  67. false,
  68. )
  69. start := time.Now().UTC().Truncate(res)
  70. end := start.Add(res)
  71. toExport := opencost.GenerateMockAllocationSet(start)
  72. err = export.Export(opencost.NewClosedWindow(start, end), toExport)
  73. if err != nil {
  74. t.Fatalf("failed to export data: %v", err)
  75. }
  76. expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "")
  77. data, err := store.Read(expectedPath)
  78. if err != nil {
  79. t.Fatalf("failed to read data from store: %v", err)
  80. }
  81. if len(data) == 0 {
  82. t.Fatalf("expected data to be non-empty, got empty")
  83. }
  84. var as *opencost.AllocationSet = new(opencost.AllocationSet)
  85. err = as.UnmarshalBinary(data)
  86. if err != nil {
  87. t.Fatalf("failed to unmarshal data: %v", err)
  88. }
  89. if as.IsEmpty() {
  90. t.Fatalf("expected allocation set to be non-empty, got empty")
  91. }
  92. })
  93. t.Run("test streaming compute storage exporter", func(t *testing.T) {
  94. res := 24 * time.Hour
  95. store := storage.NewMemoryStorage()
  96. p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, &res)
  97. if err != nil {
  98. t.Fatalf("failed to create path formatter: %v", err)
  99. }
  100. encoder := NewBingenEncoder[opencost.AllocationSet]()
  101. export := NewComputeStorageExporter[opencost.AllocationSet](
  102. p,
  103. encoder,
  104. store,
  105. validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
  106. true,
  107. )
  108. start := time.Now().UTC().Truncate(res)
  109. end := start.Add(res)
  110. toExport := opencost.GenerateMockAllocationSet(start)
  111. err = export.Export(opencost.NewClosedWindow(start, end), toExport)
  112. if err != nil {
  113. t.Fatalf("failed to export data: %v", err)
  114. }
  115. expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "")
  116. data, err := store.Read(expectedPath)
  117. if err != nil {
  118. t.Fatalf("failed to read data from store: %v", err)
  119. }
  120. if len(data) == 0 {
  121. t.Fatalf("expected data to be non-empty, got empty")
  122. }
  123. var as *opencost.AllocationSet = new(opencost.AllocationSet)
  124. err = as.UnmarshalBinary(data)
  125. if err != nil {
  126. t.Fatalf("failed to unmarshal data: %v", err)
  127. }
  128. if as.IsEmpty() {
  129. t.Fatalf("expected allocation set to be non-empty, got empty")
  130. }
  131. })
  132. t.Run("test compressed streaming compute storage exporter", func(t *testing.T) {
  133. res := 24 * time.Hour
  134. store := storage.NewMemoryStorage()
  135. p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, &res)
  136. if err != nil {
  137. t.Fatalf("failed to create path formatter: %v", err)
  138. }
  139. encoder := NewGZipEncoderWithLevel(NewBingenEncoder[opencost.AllocationSet](), gzip.BestSpeed)
  140. export := NewComputeStorageExporter(
  141. p,
  142. encoder,
  143. store,
  144. validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
  145. true,
  146. )
  147. start := time.Now().UTC().Truncate(res)
  148. end := start.Add(res)
  149. toExport := opencost.GenerateMockAllocationSet(start)
  150. err = export.Export(opencost.NewClosedWindow(start, end), toExport)
  151. if err != nil {
  152. t.Fatalf("failed to export data: %v", err)
  153. }
  154. expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "gz")
  155. t.Logf("Reading from path: %s\n", expectedPath)
  156. data, err := store.Read(expectedPath)
  157. if err != nil {
  158. t.Fatalf("failed to read data from store: %v", err)
  159. }
  160. if len(data) == 0 {
  161. t.Fatalf("expected data to be non-empty, got empty")
  162. }
  163. reader, err := gzip.NewReader(bytes.NewReader(data))
  164. if err != nil {
  165. t.Fatalf("failed to create gzip reader")
  166. }
  167. defer reader.Close()
  168. var as *opencost.AllocationSet = new(opencost.AllocationSet)
  169. err = as.UnmarshalBinaryFromReader(reader)
  170. if err != nil {
  171. t.Fatalf("failed to unmarshal data: %v", err)
  172. }
  173. if as.IsEmpty() {
  174. t.Fatalf("expected allocation set to be non-empty, got empty")
  175. }
  176. })
  177. }