ingestor_test.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package customcost
  2. import (
  3. "os/exec"
  4. "runtime"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/hashicorp/go-plugin"
  9. )
  10. func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
  11. cmd := exec.Command("sleep", "60")
  12. client := plugin.NewClient(&plugin.ClientConfig{
  13. HandshakeConfig: plugin.HandshakeConfig{
  14. ProtocolVersion: 1,
  15. MagicCookieKey: "test",
  16. MagicCookieValue: "test",
  17. },
  18. Cmd: cmd,
  19. StartTimeout: 2 * time.Second,
  20. })
  21. // Start the process (handshake will fail but process runs)
  22. _, _ = client.Client()
  23. ingestor := &CustomCostIngestor{
  24. plugins: map[string]*plugin.Client{
  25. "test-plugin": client,
  26. },
  27. }
  28. ingestor.Stop()
  29. if !client.Exited() {
  30. t.Error("Expected plugin client process to be terminated after Stop()")
  31. }
  32. }
  33. func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
  34. clients := make(map[string]*plugin.Client)
  35. for _, name := range []string{"plugin-a", "plugin-b", "plugin-c"} {
  36. cmd := exec.Command("sleep", "60")
  37. client := plugin.NewClient(&plugin.ClientConfig{
  38. HandshakeConfig: plugin.HandshakeConfig{
  39. ProtocolVersion: 1,
  40. MagicCookieKey: "test",
  41. MagicCookieValue: name,
  42. },
  43. Cmd: cmd,
  44. StartTimeout: 2 * time.Second,
  45. })
  46. _, _ = client.Client()
  47. clients[name] = client
  48. }
  49. ingestor := &CustomCostIngestor{plugins: clients}
  50. ingestor.Stop()
  51. for name, client := range clients {
  52. if !client.Exited() {
  53. t.Errorf("Expected plugin %s to be terminated after Stop()", name)
  54. }
  55. }
  56. }
  57. func TestIngestor_Stop_EmptyPluginsMap(t *testing.T) {
  58. ingestor := &CustomCostIngestor{
  59. plugins: map[string]*plugin.Client{},
  60. }
  61. ingestor.Stop() // covers lock path with 0 iterations
  62. }
  63. func TestIngestor_Stop_NilPluginsMap(t *testing.T) {
  64. ingestor := &CustomCostIngestor{}
  65. ingestor.Stop() // should not panic
  66. }
  67. func TestIngestor_Stop_AlreadyStopping(t *testing.T) {
  68. ingestor := &CustomCostIngestor{
  69. plugins: map[string]*plugin.Client{},
  70. }
  71. ingestor.isStopping.Store(true) // atomic.Bool must use Store()!
  72. ingestor.Stop() // should return immediately
  73. }
  74. func TestIngestor_Stop_ConcurrentCalls(t *testing.T) {
  75. ingestor := &CustomCostIngestor{
  76. plugins: map[string]*plugin.Client{},
  77. }
  78. var wg sync.WaitGroup
  79. for i := 0; i < 10; i++ {
  80. wg.Add(1)
  81. go func() {
  82. defer wg.Done()
  83. ingestor.Stop()
  84. }()
  85. }
  86. done := make(chan struct{})
  87. go func() {
  88. wg.Wait()
  89. close(done)
  90. }()
  91. select {
  92. case <-done:
  93. // success
  94. case <-time.After(5 * time.Second):
  95. t.Fatal("Concurrent Stop() calls deadlocked")
  96. }
  97. }
  98. func TestIngestor_Stop_WithStartedIngestor(t *testing.T) {
  99. repo := NewMemoryRepository()
  100. config := &CustomCostIngestorConfig{
  101. DailyDuration: 7 * 24 * time.Hour,
  102. HourlyDuration: 16 * time.Hour,
  103. DailyQueryWindow: 24 * time.Hour,
  104. HourlyQueryWindow: time.Hour,
  105. }
  106. ingestor, err := NewCustomCostIngestor(config, repo, map[string]*plugin.Client{}, time.Hour)
  107. if err != nil {
  108. t.Fatalf("Failed to create ingestor: %v", err)
  109. }
  110. ingestor.Start(false)
  111. time.Sleep(100 * time.Millisecond)
  112. done := make(chan struct{})
  113. go func() {
  114. ingestor.Stop()
  115. close(done)
  116. }()
  117. select {
  118. case <-done:
  119. // success
  120. case <-time.After(5 * time.Second):
  121. t.Fatal("Stop() on started ingestor timed out")
  122. }
  123. if ingestor.isRunning.Load() {
  124. t.Error("Expected isRunning to be false after Stop()")
  125. }
  126. if ingestor.isStopping.Load() {
  127. t.Error("Expected isStopping to be false after Stop()")
  128. }
  129. }
  130. // TestIngestor_BuildWindow_WithPlugin covers pluginsLock paths inside buildSingleDomain.
  131. // Using a command that exits immediately causes client.Client() to fail fast, exercising
  132. // the RLock/RUnlock calls and the error-return path without hanging.
  133. func TestIngestor_BuildWindow_WithPlugin(t *testing.T) {
  134. if runtime.GOOS == "windows" {
  135. t.Skip("requires Unix false command")
  136. }
  137. cmd := exec.Command("false") // exits immediately with failure
  138. client := plugin.NewClient(&plugin.ClientConfig{
  139. HandshakeConfig: plugin.HandshakeConfig{
  140. ProtocolVersion: 1,
  141. MagicCookieKey: "test",
  142. MagicCookieValue: "test",
  143. },
  144. Cmd: cmd,
  145. StartTimeout: 2 * time.Second,
  146. })
  147. t.Cleanup(func() { client.Kill() })
  148. repo := NewMemoryRepository()
  149. config := &CustomCostIngestorConfig{
  150. DailyDuration: 24 * time.Hour,
  151. HourlyDuration: time.Hour,
  152. DailyQueryWindow: 24 * time.Hour,
  153. HourlyQueryWindow: time.Hour,
  154. }
  155. ingestor, err := NewCustomCostIngestor(config, repo, map[string]*plugin.Client{"test-plugin": client}, 24*time.Hour)
  156. if err != nil {
  157. t.Fatalf("Failed to create ingestor: %v", err)
  158. }
  159. now := time.Now().UTC()
  160. // BuildWindow iterates the plugins map, exercising pluginsLock in both
  161. // BuildWindow and buildSingleDomain; client.Client() fails fast (false exits)
  162. ingestor.BuildWindow(now.Add(-time.Hour), now)
  163. }