// ingestor_test.go (7.1 KB)
  1. package customcost
  2. import (
  3. "fmt"
  4. "os/exec"
  5. "runtime"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/hashicorp/go-plugin"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. )
  12. func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
  13. cmd := exec.Command("sleep", "60")
  14. client := plugin.NewClient(&plugin.ClientConfig{
  15. HandshakeConfig: plugin.HandshakeConfig{
  16. ProtocolVersion: 1,
  17. MagicCookieKey: "test",
  18. MagicCookieValue: "test",
  19. },
  20. Cmd: cmd,
  21. StartTimeout: 2 * time.Second,
  22. })
  23. // Start the process (handshake will fail but process runs)
  24. _, _ = client.Client()
  25. ingestor := &CustomCostIngestor{
  26. plugins: map[string]pluginConnector{
  27. "test-plugin": client,
  28. },
  29. }
  30. ingestor.Stop()
  31. if !client.Exited() {
  32. t.Error("Expected plugin client process to be terminated after Stop()")
  33. }
  34. }
  35. func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
  36. connectors := make(map[string]pluginConnector)
  37. clients := make(map[string]*plugin.Client)
  38. for _, name := range []string{"plugin-a", "plugin-b", "plugin-c"} {
  39. cmd := exec.Command("sleep", "60")
  40. client := plugin.NewClient(&plugin.ClientConfig{
  41. HandshakeConfig: plugin.HandshakeConfig{
  42. ProtocolVersion: 1,
  43. MagicCookieKey: "test",
  44. MagicCookieValue: name,
  45. },
  46. Cmd: cmd,
  47. StartTimeout: 2 * time.Second,
  48. })
  49. _, _ = client.Client()
  50. connectors[name] = client
  51. clients[name] = client
  52. }
  53. ingestor := &CustomCostIngestor{plugins: connectors}
  54. ingestor.Stop()
  55. for name, client := range clients {
  56. if !client.Exited() {
  57. t.Errorf("Expected plugin %s to be terminated after Stop()", name)
  58. }
  59. }
  60. }
  61. func TestIngestor_Stop_EmptyPluginsMap(t *testing.T) {
  62. ingestor := &CustomCostIngestor{
  63. plugins: map[string]pluginConnector{},
  64. }
  65. ingestor.Stop() // covers lock path with 0 iterations
  66. }
  67. func TestIngestor_Stop_NilPluginsMap(t *testing.T) {
  68. ingestor := &CustomCostIngestor{}
  69. ingestor.Stop() // should not panic
  70. }
  71. func TestIngestor_Stop_AlreadyStopping(t *testing.T) {
  72. ingestor := &CustomCostIngestor{
  73. plugins: map[string]pluginConnector{},
  74. }
  75. ingestor.isStopping.Store(true) // atomic.Bool must use Store()!
  76. ingestor.Stop() // should return immediately
  77. }
  78. func TestIngestor_Stop_ConcurrentCalls(t *testing.T) {
  79. ingestor := &CustomCostIngestor{
  80. plugins: map[string]pluginConnector{},
  81. }
  82. var wg sync.WaitGroup
  83. for i := 0; i < 10; i++ {
  84. wg.Add(1)
  85. go func() {
  86. defer wg.Done()
  87. ingestor.Stop()
  88. }()
  89. }
  90. done := make(chan struct{})
  91. go func() {
  92. wg.Wait()
  93. close(done)
  94. }()
  95. select {
  96. case <-done:
  97. // success
  98. case <-time.After(5 * time.Second):
  99. t.Fatal("Concurrent Stop() calls deadlocked")
  100. }
  101. }
  102. func TestIngestor_Stop_WithStartedIngestor(t *testing.T) {
  103. repo := NewMemoryRepository()
  104. config := &CustomCostIngestorConfig{
  105. DailyDuration: 7 * 24 * time.Hour,
  106. HourlyDuration: 16 * time.Hour,
  107. DailyQueryWindow: 24 * time.Hour,
  108. HourlyQueryWindow: time.Hour,
  109. }
  110. ingestor, err := NewCustomCostIngestor(config, repo, map[string]*plugin.Client{}, time.Hour)
  111. if err != nil {
  112. t.Fatalf("Failed to create ingestor: %v", err)
  113. }
  114. ingestor.Start(false)
  115. time.Sleep(100 * time.Millisecond)
  116. done := make(chan struct{})
  117. go func() {
  118. ingestor.Stop()
  119. close(done)
  120. }()
  121. select {
  122. case <-done:
  123. // success
  124. case <-time.After(5 * time.Second):
  125. t.Fatal("Stop() on started ingestor timed out")
  126. }
  127. if ingestor.isRunning.Load() {
  128. t.Error("Expected isRunning to be false after Stop()")
  129. }
  130. if ingestor.isStopping.Load() {
  131. t.Error("Expected isStopping to be false after Stop()")
  132. }
  133. }
  134. // TestIngestor_BuildWindow_WithPlugin covers pluginsLock paths inside buildSingleDomain.
  135. // Using a command that exits immediately causes client.Client() to fail fast, exercising
  136. // the RLock/RUnlock calls and the error-return path without hanging.
  137. func TestIngestor_BuildWindow_WithPlugin(t *testing.T) {
  138. if runtime.GOOS == "windows" {
  139. t.Skip("requires Unix false command")
  140. }
  141. cmd := exec.Command("false") // exits immediately with failure
  142. client := plugin.NewClient(&plugin.ClientConfig{
  143. HandshakeConfig: plugin.HandshakeConfig{
  144. ProtocolVersion: 1,
  145. MagicCookieKey: "test",
  146. MagicCookieValue: "test",
  147. },
  148. Cmd: cmd,
  149. StartTimeout: 2 * time.Second,
  150. })
  151. t.Cleanup(func() { client.Kill() })
  152. repo := NewMemoryRepository()
  153. config := &CustomCostIngestorConfig{
  154. DailyDuration: 24 * time.Hour,
  155. HourlyDuration: time.Hour,
  156. DailyQueryWindow: 24 * time.Hour,
  157. HourlyQueryWindow: time.Hour,
  158. }
  159. ingestor, err := NewCustomCostIngestor(config, repo, map[string]*plugin.Client{"test-plugin": client}, 24*time.Hour)
  160. if err != nil {
  161. t.Fatalf("Failed to create ingestor: %v", err)
  162. }
  163. now := time.Now().UTC()
  164. // BuildWindow iterates the plugins map, exercising pluginsLock in both
  165. // BuildWindow and buildSingleDomain; client.Client() fails fast (false exits)
  166. ingestor.BuildWindow(now.Add(-time.Hour), now)
  167. }
  168. // mockClientProtocol implements plugin.ClientProtocol for testing.
  169. type mockClientProtocol struct {
  170. dispenseResult interface{}
  171. dispenseErr error
  172. }
  173. func (m *mockClientProtocol) Dispense(string) (interface{}, error) {
  174. return m.dispenseResult, m.dispenseErr
  175. }
  176. func (m *mockClientProtocol) Ping() error { return nil }
  177. func (m *mockClientProtocol) Close() error { return nil }
  178. // mockPluginConnector implements pluginConnector for testing.
  179. type mockPluginConnector struct {
  180. protocol plugin.ClientProtocol
  181. clientErr error
  182. killed bool
  183. }
  184. func (m *mockPluginConnector) Client() (plugin.ClientProtocol, error) {
  185. if m.clientErr != nil {
  186. return nil, m.clientErr
  187. }
  188. return m.protocol, nil
  189. }
  190. func (m *mockPluginConnector) Kill() { m.killed = true }
  191. func TestBuildSingleDomain_InvalidPluginType_NoPanic(t *testing.T) {
  192. mock := &mockPluginConnector{
  193. protocol: &mockClientProtocol{
  194. dispenseResult: "not a CustomCostSource", // wrong type
  195. },
  196. }
  197. repo := NewMemoryRepository()
  198. ingestor := &CustomCostIngestor{
  199. plugins: map[string]pluginConnector{"bad-plugin": mock},
  200. resolution: time.Hour,
  201. repo: repo,
  202. coverage: map[string]opencost.Window{},
  203. }
  204. now := time.Now().UTC()
  205. // Before the fix this would panic; now it should log an error and return.
  206. ingestor.BuildWindow(now.Add(-time.Hour), now)
  207. }
  208. func TestBuildSingleDomain_DispenseError(t *testing.T) {
  209. mock := &mockPluginConnector{
  210. protocol: &mockClientProtocol{
  211. dispenseErr: fmt.Errorf("dispense failed"),
  212. },
  213. }
  214. repo := NewMemoryRepository()
  215. ingestor := &CustomCostIngestor{
  216. plugins: map[string]pluginConnector{"err-plugin": mock},
  217. resolution: time.Hour,
  218. repo: repo,
  219. coverage: map[string]opencost.Window{},
  220. }
  221. now := time.Now().UTC()
  222. // Should handle the error gracefully without panic.
  223. ingestor.BuildWindow(now.Add(-time.Hour), now)
  224. }
  225. func TestBuildSingleDomain_ClientError(t *testing.T) {
  226. mock := &mockPluginConnector{
  227. clientErr: fmt.Errorf("connection failed"),
  228. }
  229. repo := NewMemoryRepository()
  230. ingestor := &CustomCostIngestor{
  231. plugins: map[string]pluginConnector{"fail-plugin": mock},
  232. resolution: time.Hour,
  233. repo: repo,
  234. coverage: map[string]opencost.Window{},
  235. }
  236. now := time.Now().UTC()
  237. // Should handle the error gracefully without panic.
  238. ingestor.BuildWindow(now.Add(-time.Hour), now)
  239. }