ingestor_test.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package customcost
  2. import (
  3. "fmt"
  4. "os/exec"
  5. "runtime"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/hashicorp/go-plugin"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. )
  12. func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
  13. cmd := exec.Command("sleep", "60")
  14. client := plugin.NewClient(&plugin.ClientConfig{
  15. HandshakeConfig: plugin.HandshakeConfig{
  16. ProtocolVersion: 1,
  17. MagicCookieKey: "test",
  18. MagicCookieValue: "test",
  19. },
  20. Cmd: cmd,
  21. StartTimeout: 2 * time.Second,
  22. })
  23. // Start the process (handshake will fail but process runs)
  24. _, _ = client.Client()
  25. ingestor := &CustomCostIngestor{
  26. plugins: map[string]pluginConnector{
  27. "test-plugin": client,
  28. },
  29. }
  30. ingestor.Stop()
  31. if !client.Exited() {
  32. t.Error("Expected plugin client process to be terminated after Stop()")
  33. }
  34. }
  35. func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
  36. connectors := make(map[string]pluginConnector)
  37. clients := make(map[string]*plugin.Client)
  38. for _, name := range []string{"plugin-a", "plugin-b", "plugin-c"} {
  39. cmd := exec.Command("sleep", "60")
  40. client := plugin.NewClient(&plugin.ClientConfig{
  41. HandshakeConfig: plugin.HandshakeConfig{
  42. ProtocolVersion: 1,
  43. MagicCookieKey: "test",
  44. MagicCookieValue: name,
  45. },
  46. Cmd: cmd,
  47. StartTimeout: 2 * time.Second,
  48. })
  49. _, _ = client.Client()
  50. connectors[name] = client
  51. clients[name] = client
  52. }
  53. ingestor := &CustomCostIngestor{plugins: connectors}
  54. ingestor.Stop()
  55. for name, client := range clients {
  56. if !client.Exited() {
  57. t.Errorf("Expected plugin %s to be terminated after Stop()", name)
  58. }
  59. }
  60. }
  61. func TestIngestor_Stop_EmptyPluginsMap(t *testing.T) {
  62. ingestor := &CustomCostIngestor{
  63. plugins: map[string]pluginConnector{},
  64. }
  65. ingestor.Stop() // covers lock path with 0 iterations
  66. }
  67. func TestIngestor_Stop_NilPluginsMap(t *testing.T) {
  68. ingestor := &CustomCostIngestor{}
  69. ingestor.Stop() // should not panic
  70. }
  71. func TestIngestor_Stop_AlreadyStopping(t *testing.T) {
  72. ingestor := &CustomCostIngestor{
  73. plugins: map[string]pluginConnector{},
  74. }
  75. ingestor.isStopping.Store(true) // atomic.Bool must use Store()!
  76. ingestor.Stop() // should return immediately
  77. }
  78. func TestIngestor_Stop_ConcurrentCalls(t *testing.T) {
  79. ingestor := &CustomCostIngestor{
  80. plugins: map[string]pluginConnector{},
  81. }
  82. var wg sync.WaitGroup
  83. for i := 0; i < 10; i++ {
  84. wg.Add(1)
  85. go func() {
  86. defer wg.Done()
  87. ingestor.Stop()
  88. }()
  89. }
  90. done := make(chan struct{})
  91. go func() {
  92. wg.Wait()
  93. close(done)
  94. }()
  95. select {
  96. case <-done:
  97. // success
  98. case <-time.After(5 * time.Second):
  99. t.Fatal("Concurrent Stop() calls deadlocked")
  100. }
  101. }
  102. func TestIngestor_Stop_WithStartedIngestor(t *testing.T) {
  103. repo := NewMemoryRepository()
  104. config := &CustomCostIngestorConfig{
  105. DailyDuration: 7 * 24 * time.Hour,
  106. HourlyDuration: 16 * time.Hour,
  107. DailyQueryWindow: 24 * time.Hour,
  108. HourlyQueryWindow: time.Hour,
  109. }
  110. ingestor, err := NewCustomCostIngestor(config, repo, map[string]*plugin.Client{}, time.Hour)
  111. if err != nil {
  112. t.Fatalf("Failed to create ingestor: %v", err)
  113. }
  114. ingestor.Start(false)
  115. time.Sleep(100 * time.Millisecond)
  116. done := make(chan struct{})
  117. go func() {
  118. ingestor.Stop()
  119. close(done)
  120. }()
  121. select {
  122. case <-done:
  123. // success
  124. case <-time.After(5 * time.Second):
  125. t.Fatal("Stop() on started ingestor timed out")
  126. }
  127. if ingestor.isRunning.Load() {
  128. t.Error("Expected isRunning to be false after Stop()")
  129. }
  130. if ingestor.isStopping.Load() {
  131. t.Error("Expected isStopping to be false after Stop()")
  132. }
  133. }
  134. // TestIngestor_BuildWindow_WithPlugin covers pluginsLock paths inside buildSingleDomain.
  135. // Using a command that exits immediately causes client.Client() to fail fast, exercising
  136. // the RLock/RUnlock calls and the error-return path without hanging.
  137. func TestIngestor_BuildWindow_WithPlugin(t *testing.T) {
  138. if runtime.GOOS == "windows" {
  139. t.Skip("requires Unix false command")
  140. }
  141. cmd := exec.Command("false") // exits immediately with failure
  142. client := plugin.NewClient(&plugin.ClientConfig{
  143. HandshakeConfig: plugin.HandshakeConfig{
  144. ProtocolVersion: 1,
  145. MagicCookieKey: "test",
  146. MagicCookieValue: "test",
  147. },
  148. Cmd: cmd,
  149. StartTimeout: 2 * time.Second,
  150. })
  151. t.Cleanup(func() { client.Kill() })
  152. repo := NewMemoryRepository()
  153. config := &CustomCostIngestorConfig{
  154. DailyDuration: 24 * time.Hour,
  155. HourlyDuration: time.Hour,
  156. DailyQueryWindow: 24 * time.Hour,
  157. HourlyQueryWindow: time.Hour,
  158. }
  159. ingestor, err := NewCustomCostIngestor(config, repo, map[string]*plugin.Client{"test-plugin": client}, 24*time.Hour)
  160. if err != nil {
  161. t.Fatalf("Failed to create ingestor: %v", err)
  162. }
  163. now := time.Now().UTC()
  164. // BuildWindow iterates the plugins map, exercising pluginsLock in both
  165. // BuildWindow and buildSingleDomain; client.Client() fails fast (false exits)
  166. ingestor.BuildWindow(now.Add(-time.Hour), now)
  167. }
  // mockClientProtocol is a test double for plugin.ClientProtocol. Dispense hands
  // back whatever was configured on the struct; Ping and Close always succeed.
  type mockClientProtocol struct {
  	dispenseResult any   // value returned by Dispense
  	dispenseErr    error // error returned by Dispense
  }

  // Dispense ignores the requested plugin name and returns the configured
  // result and error as-is.
  func (p *mockClientProtocol) Dispense(_ string) (any, error) {
  	return p.dispenseResult, p.dispenseErr
  }

  // Ping always reports success.
  func (p *mockClientProtocol) Ping() error {
  	return nil
  }

  // Close always reports success.
  func (p *mockClientProtocol) Close() error {
  	return nil
  }
  178. // mockPluginConnector implements pluginConnector for testing.
  179. type mockPluginConnector struct {
  180. protocol plugin.ClientProtocol
  181. clientErr error
  182. killed bool
  183. }
  184. func (m *mockPluginConnector) Client() (plugin.ClientProtocol, error) {
  185. if m.clientErr != nil {
  186. return nil, m.clientErr
  187. }
  188. return m.protocol, nil
  189. }
  190. func (m *mockPluginConnector) Kill() { m.killed = true }
  191. func TestBuildSingleDomain_InvalidPluginType_NoPanic(t *testing.T) {
  192. mock := &mockPluginConnector{
  193. protocol: &mockClientProtocol{
  194. dispenseResult: "not a CustomCostSource", // wrong type
  195. },
  196. }
  197. repo := NewMemoryRepository()
  198. ingestor := &CustomCostIngestor{
  199. plugins: map[string]pluginConnector{"bad-plugin": mock},
  200. resolution: time.Hour,
  201. repo: repo,
  202. coverage: map[string]opencost.Window{},
  203. }
  204. now := time.Now().UTC()
  205. // Before the fix this would panic; now it should log an error and return.
  206. ingestor.BuildWindow(now.Add(-time.Hour), now)
  207. }
  208. func TestBuildSingleDomain_DispenseError(t *testing.T) {
  209. mock := &mockPluginConnector{
  210. protocol: &mockClientProtocol{
  211. dispenseErr: fmt.Errorf("dispense failed"),
  212. },
  213. }
  214. repo := NewMemoryRepository()
  215. ingestor := &CustomCostIngestor{
  216. plugins: map[string]pluginConnector{"err-plugin": mock},
  217. resolution: time.Hour,
  218. repo: repo,
  219. coverage: map[string]opencost.Window{},
  220. }
  221. now := time.Now().UTC()
  222. // Should handle the error gracefully without panic.
  223. ingestor.BuildWindow(now.Add(-time.Hour), now)
  224. }
  225. // TestIngestor_Status_ReturnsCopyOfCoverage deterministically proves Status()
  226. // hands back a copy, not the live map: mutating the returned Coverage must not
  227. // leak into the ingestor's internal state. Unlike the concurrent test below,
  228. // this fails without needing the race detector.
  229. func TestIngestor_Status_ReturnsCopyOfCoverage(t *testing.T) {
  230. ingestor := &CustomCostIngestor{
  231. coverage: map[string]opencost.Window{},
  232. }
  233. start := time.Now().UTC()
  234. end := start.Add(time.Hour)
  235. ingestor.expandCoverage(opencost.NewWindow(&start, &end), "plugin-a")
  236. status := ingestor.Status()
  237. if len(status.Coverage) != 1 {
  238. t.Fatalf("expected 1 coverage entry, got %d", len(status.Coverage))
  239. }
  240. // Mutating the returned map must not affect the ingestor.
  241. status.Coverage["plugin-b"] = opencost.NewWindow(&start, &end)
  242. delete(status.Coverage, "plugin-a")
  243. again := ingestor.Status()
  244. if _, ok := again.Coverage["plugin-a"]; !ok {
  245. t.Error("plugin-a should remain in the ingestor's coverage; Status() leaked a live reference")
  246. }
  247. if _, ok := again.Coverage["plugin-b"]; ok {
  248. t.Error("plugin-b leaked into the ingestor's coverage; Status() returned a live reference")
  249. }
  250. }
  251. // TestIngestor_Status_ConcurrentWithExpandCoverage guards against a data race:
  252. // Status() returns the coverage map by reference while expandCoverage() writes
  253. // to it under coverageLock. The /customCost/status handler serializes the
  254. // returned map (which iterates it), racing the writer and crashing the process
  255. // with "concurrent map iteration and map write". Run with -race to detect it.
  256. func TestIngestor_Status_ConcurrentWithExpandCoverage(t *testing.T) {
  257. ingestor := &CustomCostIngestor{
  258. coverage: map[string]opencost.Window{},
  259. }
  260. start := time.Now().UTC()
  261. end := start.Add(time.Hour)
  262. window := opencost.NewWindow(&start, &end)
  263. var wg sync.WaitGroup
  264. wg.Add(2)
  265. // writer: continuously expands coverage under the lock
  266. go func() {
  267. defer wg.Done()
  268. for i := 0; i < 2000; i++ {
  269. ingestor.expandCoverage(window, fmt.Sprintf("plugin-%d", i%16))
  270. }
  271. }()
  272. // reader: reads Status() and iterates the returned map, mimicking the JSON
  273. // serialization the /customCost/status handler performs on the response
  274. go func() {
  275. defer wg.Done()
  276. for i := 0; i < 2000; i++ {
  277. for range ingestor.Status().Coverage {
  278. }
  279. }
  280. }()
  281. wg.Wait()
  282. // The writer touched 16 distinct plugins, so coverage should hold 16 entries
  283. // once the race-free reads settle.
  284. if got := len(ingestor.Status().Coverage); got != 16 {
  285. t.Fatalf("expected 16 coverage entries after concurrent writes, got %d", got)
  286. }
  287. }
  288. func TestBuildSingleDomain_ClientError(t *testing.T) {
  289. mock := &mockPluginConnector{
  290. clientErr: fmt.Errorf("connection failed"),
  291. }
  292. repo := NewMemoryRepository()
  293. ingestor := &CustomCostIngestor{
  294. plugins: map[string]pluginConnector{"fail-plugin": mock},
  295. resolution: time.Hour,
  296. repo: repo,
  297. coverage: map[string]opencost.Window{},
  298. }
  299. now := time.Now().UTC()
  300. // Should handle the error gracefully without panic.
  301. ingestor.BuildWindow(now.Add(-time.Hour), now)
  302. }