pipelineservice_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package customcost
  2. import (
  3. "fmt"
  4. "io"
  5. "net/http"
  6. "os"
  7. "runtime"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/util/timeutil"
  13. )
  14. func TestPipelineService(t *testing.T) {
  15. // establish temporary test assets dir
  16. dir := t.TempDir()
  17. err := os.MkdirAll(dir+"/config", 0777)
  18. if err != nil {
  19. t.Fatalf("error creating temp config dir: %v", err)
  20. }
  21. err = os.MkdirAll(dir+"/executable", 0777)
  22. if err != nil {
  23. t.Fatalf("error creating temp exec dir: %v", err)
  24. }
  25. // write DD secrets to config files
  26. // write config file to temp dir
  27. writeDDConfig(dir+"/config", t)
  28. // set env vars for plugin config and executable
  29. err = os.Setenv("PLUGIN_CONFIG_DIR", dir+"/config")
  30. if err != nil {
  31. t.Fatalf("error setting config dir env var: %v", err)
  32. }
  33. err = os.Setenv("PLUGIN_EXECUTABLE_DIR", dir+"/executable")
  34. if err != nil {
  35. t.Fatalf("error setting config dir env var: %v", err)
  36. }
  37. // download amd plugin, store in tmp executable dir
  38. downloadLatestPluginExec(dir+"/executable", t)
  39. // set up repos
  40. hourlyRepo := NewMemoryRepository()
  41. dailyRepo := NewMemoryRepository()
  42. // set up ingestor config
  43. config := DefaultIngestorConfiguration()
  44. config.DailyDuration = 7 * timeutil.Day
  45. config.HourlyDuration = 16 * time.Hour
  46. pipeline, err := NewPipelineService(hourlyRepo, dailyRepo, config)
  47. if err != nil {
  48. t.Fatalf("error starting pipeline: %v", err)
  49. }
  50. // wait until coverage is complete, then stop ingestor
  51. ingestionComplete := false
  52. maxLoops := 10
  53. loopCount := 0
  54. for !ingestionComplete && loopCount < maxLoops {
  55. status := pipeline.Status()
  56. log.Debugf("got status: %v", status)
  57. coverageHourly, foundHourly := status.CoverageHourly["datadog"]
  58. coverageDaily, foundDaily := status.CoverageDaily["datadog"]
  59. if foundHourly && foundDaily {
  60. // check coverage
  61. minTime := time.Now().UTC().Add(-6 * timeutil.Day)
  62. maxTime := time.Now().UTC().Add(-3 * time.Hour)
  63. if coverageDaily.Start().Before(minTime) && coverageHourly.End().After(maxTime) {
  64. log.Infof("good coverage, breaking out of loop")
  65. ingestionComplete = true
  66. break
  67. } else {
  68. log.Infof("coverage not within range. Looking for coverage %v to %v, but current coverage is %v/%v", minTime, maxTime, coverageDaily, coverageHourly)
  69. }
  70. } else {
  71. log.Debugf("no coverage info ready yet for datadog")
  72. }
  73. log.Infof("sleeping 10s...")
  74. time.Sleep(10 * time.Second)
  75. loopCount++
  76. }
  77. if !ingestionComplete {
  78. t.Fatal("ingestor never completed within allocated time")
  79. }
  80. pipeline.hourlyIngestor.Stop()
  81. pipeline.dailyIngestor.Stop()
  82. // inspect data from yesterday
  83. targetTime := time.Now().UTC().Add(-1 * timeutil.Day).Truncate(timeutil.Day)
  84. log.Infof("querying for data with window start: %v", targetTime)
  85. // check for presence of hosts in DD response
  86. ddCosts, err := dailyRepo.Get(targetTime, "datadog")
  87. if err != nil {
  88. t.Fatalf("error getting results for targetTime")
  89. }
  90. foundInfraHosts := false
  91. for _, cost := range ddCosts.Costs {
  92. if cost.ResourceType == "infra_hosts" {
  93. foundInfraHosts = true
  94. }
  95. }
  96. if !foundInfraHosts {
  97. t.Fatal("expecting infra_hosts costs in daily response")
  98. }
  99. // query data from 4 hours ago (hourly)
  100. targetTime = time.Now().UTC().Add(-13 * time.Hour).Truncate(time.Hour)
  101. log.Infof("querying for data with window start: %v", targetTime)
  102. // check for presence of hosts in DD response
  103. ddCosts, err = hourlyRepo.Get(targetTime, "datadog")
  104. if err != nil {
  105. t.Fatalf("error getting results for targetTime")
  106. }
  107. foundInfraHosts = false
  108. for _, cost := range ddCosts.Costs {
  109. if cost.ResourceType == "infra_hosts" {
  110. foundInfraHosts = true
  111. }
  112. }
  113. if !foundInfraHosts {
  114. t.Fatal("expecting infra_hosts costs in hourly response")
  115. }
  116. }
  // downloadLatestPluginExec downloads the Datadog plugin release binary built
  // for the current runtime.GOOS/runtime.GOARCH and stores it in dirName as an
  // executable file. Any failure aborts the calling test via t.Fatalf.
  //
  // NOTE: despite the name, the release is pinned to v0.0.3 in the URL below.
  func downloadLatestPluginExec(dirName string, t *testing.T) {
  	t.Helper()

  	pluginFile := "datadog.ocplugin." + runtime.GOOS + "." + runtime.GOARCH
  	ddPluginURL := "https://github.com/opencost/opencost-plugins/releases/download/v0.0.3/" + pluginFile

  	// Bound the download so a stalled connection cannot hang the test run.
  	client := &http.Client{Timeout: 5 * time.Minute}

  	// Download before creating the file so a failed request leaves nothing
  	// behind in the executable dir.
  	resp, err := client.Get(ddPluginURL)
  	if err != nil {
  		t.Fatalf("error downloading: %v", err)
  	}
  	defer resp.Body.Close()

  	// Without this check an error page (e.g. a 404 for an unsupported
  	// GOOS/GOARCH) would be written to disk as the plugin executable.
  	if resp.StatusCode != http.StatusOK {
  		t.Fatalf("error downloading %s: unexpected status %s", ddPluginURL, resp.Status)
  	}

  	// The second argument is the open flags and the third the permissions.
  	out, err := os.OpenFile(dirName+"/"+pluginFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
  	if err != nil {
  		t.Fatalf("error creating executable file: %v", err)
  	}

  	if _, err := io.Copy(out, resp.Body); err != nil {
  		out.Close()
  		t.Fatalf("error copying: %v", err)
  	}
  	// Close explicitly so write errors surfaced at close time are reported.
  	if err := out.Close(); err != nil {
  		t.Fatalf("error closing executable file: %v", err)
  	}
  }
  134. func TestGetRegisteredPlugins_UnreadableConfigDir(t *testing.T) {
  135. testCases := []struct {
  136. name string
  137. setupFunc func() (configDir string, execDir string, cleanup func())
  138. expectError bool
  139. errorContains string
  140. }{
  141. {
  142. name: "non-existent config directory",
  143. setupFunc: func() (string, string, func()) {
  144. execDir := t.TempDir()
  145. return "/non/existent/path", execDir, func() {}
  146. },
  147. expectError: true,
  148. errorContains: "failed to read plugin config directory",
  149. },
  150. {
  151. name: "config directory is a file not a directory",
  152. setupFunc: func() (string, string, func()) {
  153. tmpDir := t.TempDir()
  154. execDir := t.TempDir()
  155. // Create a file instead of a directory
  156. configFile := tmpDir + "/not-a-directory"
  157. err := os.WriteFile(configFile, []byte("test"), 0644)
  158. if err != nil {
  159. t.Fatalf("failed to create test file: %v", err)
  160. }
  161. return configFile, execDir, func() {}
  162. },
  163. expectError: true,
  164. errorContains: "failed to read plugin config directory",
  165. },
  166. {
  167. name: "empty valid config directory",
  168. setupFunc: func() (string, string, func()) {
  169. configDir := t.TempDir()
  170. execDir := t.TempDir()
  171. return configDir, execDir, func() {}
  172. },
  173. expectError: false,
  174. errorContains: "",
  175. },
  176. }
  177. for _, tc := range testCases {
  178. t.Run(tc.name, func(t *testing.T) {
  179. configDir, execDir, cleanup := tc.setupFunc()
  180. defer cleanup()
  181. _, err := getRegisteredPlugins(configDir, execDir)
  182. if tc.expectError {
  183. if err == nil {
  184. t.Errorf("expected error but got nil")
  185. } else if tc.errorContains != "" && !strings.Contains(err.Error(), tc.errorContains) {
  186. t.Errorf("expected error to contain %q, but got: %v", tc.errorContains, err)
  187. }
  188. } else {
  189. if err != nil {
  190. t.Errorf("expected no error but got: %v", err)
  191. }
  192. }
  193. })
  194. }
  195. }
  196. func writeDDConfig(pluginConfigDir string, t *testing.T) {
  197. // read necessary env vars. If any are missing, log warning and skip test
  198. ddSite := os.Getenv("DD_SITE")
  199. ddApiKey := os.Getenv("DD_API_KEY")
  200. ddAppKey := os.Getenv("DD_APPLICATION_KEY")
  201. if ddSite == "" {
  202. log.Warnf("DD_SITE undefined, this needs to have the URL of your DD instance, skipping test")
  203. t.Skip()
  204. return
  205. }
  206. if ddApiKey == "" {
  207. log.Warnf("DD_API_KEY undefined, skipping test")
  208. t.Skip()
  209. return
  210. }
  211. if ddAppKey == "" {
  212. log.Warnf("DD_APPLICATION_KEY undefined, skipping test")
  213. t.Skip()
  214. return
  215. }
  216. // write out config to temp file using contents of env vars
  217. ddConf := fmt.Sprintf(`{"datadog_site": "%s", "datadog_api_key": "%s", "datadog_app_key": "%s"}`, ddSite, ddApiKey, ddAppKey)
  218. // set up custom cost request
  219. file, err := os.CreateTemp(pluginConfigDir, "datadog_config.json")
  220. if err != nil {
  221. t.Fatalf("could not create temp config dir: %v", err)
  222. }
  223. err = os.WriteFile(file.Name(), []byte(ddConf), 0777)
  224. if err != nil {
  225. t.Fatalf("could not write file: %v", err)
  226. }
  227. }
  228. // TestPipelineService_Stop_Nil ensures nil PipelineService is safe
  229. func TestPipelineService_Stop_Nil(t *testing.T) {
  230. var ps *PipelineService
  231. ps.Stop()
  232. t.Log("Nil PipelineService handled safely")
  233. }
  234. // TestPipelineService_Stop_WithNilIngestors ensures nil ingestors are handled
  235. func TestPipelineService_Stop_WithNilIngestors(t *testing.T) {
  236. ps := &PipelineService{
  237. hourlyIngestor: nil,
  238. dailyIngestor: nil,
  239. domains: []string{},
  240. }
  241. ps.Stop()
  242. t.Log("Nil ingestors handled safely")
  243. }
  244. // TestPipelineService_Stop_PartialNilIngestors ensures partial nil is handled
  245. func TestPipelineService_Stop_PartialNilIngestors(t *testing.T) {
  246. hourly := &CustomCostIngestor{
  247. key: "hourly",
  248. plugins: make(map[string]pluginConnector),
  249. }
  250. ps := &PipelineService{
  251. hourlyIngestor: hourly,
  252. dailyIngestor: nil,
  253. domains: []string{},
  254. }
  255. ps.Stop()
  256. t.Log("Partial nil ingestors handled safely")
  257. }
  258. // TestPipelineService_Stop_ShutdownLogging verifies logging during shutdown
  259. func TestPipelineService_Stop_ShutdownLogging(t *testing.T) {
  260. ps := &PipelineService{
  261. hourlyIngestor: &CustomCostIngestor{
  262. key: "hourly",
  263. plugins: make(map[string]pluginConnector),
  264. },
  265. dailyIngestor: &CustomCostIngestor{
  266. key: "daily",
  267. plugins: make(map[string]pluginConnector),
  268. },
  269. domains: []string{},
  270. }
  271. ps.Stop()
  272. time.Sleep(50 * time.Millisecond)
  273. t.Log("Pipeline service logged shutdown progress")
  274. }
  275. func TestPipelineService_Stop_NilReceiver(t *testing.T) {
  276. var ps *PipelineService
  277. ps.Stop() // should not panic on nil receiver
  278. }
  279. func TestPipelineService_Stop_NilIngestors(t *testing.T) {
  280. ps := &PipelineService{}
  281. ps.Stop() // should not panic when ingestors are nil
  282. }
  283. func TestPipelineService_Stop_WithIngestors(t *testing.T) {
  284. hourly := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
  285. daily := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
  286. ps := &PipelineService{
  287. hourlyIngestor: hourly,
  288. dailyIngestor: daily,
  289. }
  290. ps.Stop()
  291. }
  292. func TestPipelineService_Stop_OnlyHourlyIngestor(t *testing.T) {
  293. ps := &PipelineService{
  294. hourlyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
  295. }
  296. ps.Stop() // should not panic when dailyIngestor is nil
  297. }
  298. func TestPipelineService_Stop_OnlyDailyIngestor(t *testing.T) {
  299. ps := &PipelineService{
  300. dailyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
  301. }
  302. ps.Stop() // should not panic when hourlyIngestor is nil
  303. }