package customcost import ( "fmt" "io" "net/http" "os" "runtime" "strings" "testing" "time" "github.com/opencost/opencost/core/pkg/log" "github.com/opencost/opencost/core/pkg/util/timeutil" ) func TestPipelineService(t *testing.T) { // establish temporary test assets dir dir := t.TempDir() err := os.MkdirAll(dir+"/config", 0777) if err != nil { t.Fatalf("error creating temp config dir: %v", err) } err = os.MkdirAll(dir+"/executable", 0777) if err != nil { t.Fatalf("error creating temp exec dir: %v", err) } // write DD secrets to config files // write config file to temp dir writeDDConfig(dir+"/config", t) // set env vars for plugin config and executable err = os.Setenv("PLUGIN_CONFIG_DIR", dir+"/config") if err != nil { t.Fatalf("error setting config dir env var: %v", err) } err = os.Setenv("PLUGIN_EXECUTABLE_DIR", dir+"/executable") if err != nil { t.Fatalf("error setting config dir env var: %v", err) } // download amd plugin, store in tmp executable dir downloadLatestPluginExec(dir+"/executable", t) // set up repos hourlyRepo := NewMemoryRepository() dailyRepo := NewMemoryRepository() // set up ingestor config config := DefaultIngestorConfiguration() config.DailyDuration = 7 * timeutil.Day config.HourlyDuration = 16 * time.Hour pipeline, err := NewPipelineService(hourlyRepo, dailyRepo, config) if err != nil { t.Fatalf("error starting pipeline: %v", err) } // wait until coverage is complete, then stop ingestor ingestionComplete := false maxLoops := 10 loopCount := 0 for !ingestionComplete && loopCount < maxLoops { status := pipeline.Status() log.Debugf("got status: %v", status) coverageHourly, foundHourly := status.CoverageHourly["datadog"] coverageDaily, foundDaily := status.CoverageDaily["datadog"] if foundHourly && foundDaily { // check coverage minTime := time.Now().UTC().Add(-6 * timeutil.Day) maxTime := time.Now().UTC().Add(-3 * time.Hour) if coverageDaily.Start().Before(minTime) && coverageHourly.End().After(maxTime) { log.Infof("good coverage, breaking out of loop") ingestionComplete = true break } else { log.Infof("coverage not within range. Looking for coverage %v to %v, but current coverage is %v/%v", minTime, maxTime, coverageDaily, coverageHourly) } } else { log.Debugf("no coverage info ready yet for datadog") } log.Infof("sleeping 10s...") time.Sleep(10 * time.Second) loopCount++ } if !ingestionComplete { t.Fatal("ingestor never completed within allocated time") } pipeline.hourlyIngestor.Stop() pipeline.dailyIngestor.Stop() // inspect data from yesterday targetTime := time.Now().UTC().Add(-1 * timeutil.Day).Truncate(timeutil.Day) log.Infof("querying for data with window start: %v", targetTime) // check for presence of hosts in DD response ddCosts, err := dailyRepo.Get(targetTime, "datadog") if err != nil { t.Fatalf("error getting results for targetTime") } foundInfraHosts := false for _, cost := range ddCosts.Costs { if cost.ResourceType == "infra_hosts" { foundInfraHosts = true } } if !foundInfraHosts { t.Fatal("expecting infra_hosts costs in daily response") } // query data from 4 hours ago (hourly) targetTime = time.Now().UTC().Add(-13 * time.Hour).Truncate(time.Hour) log.Infof("querying for data with window start: %v", targetTime) // check for presence of hosts in DD response ddCosts, err = hourlyRepo.Get(targetTime, "datadog") if err != nil { t.Fatalf("error getting results for targetTime") } foundInfraHosts = false for _, cost := range ddCosts.Costs { if cost.ResourceType == "infra_hosts" { foundInfraHosts = true } } if !foundInfraHosts { t.Fatal("expecting infra_hosts costs in hourly response") } } func downloadLatestPluginExec(dirName string, t *testing.T) { ddPluginURL := "https://github.com/opencost/opencost-plugins/releases/download/v0.0.3/datadog.ocplugin." + runtime.GOOS + "." + runtime.GOARCH out, err := os.OpenFile(dirName+"/datadog.ocplugin."+runtime.GOOS+"."+runtime.GOARCH, 0755|os.O_CREATE, 0755) if err != nil { t.Fatalf("error creating executable file: %v", err) } resp, err := http.Get(ddPluginURL) if err != nil { t.Fatalf("error downloading: %v", err) } defer resp.Body.Close() defer out.Close() _, err = io.Copy(out, resp.Body) if err != nil { t.Fatalf("error copying: %v", err) } } func TestGetRegisteredPlugins_UnreadableConfigDir(t *testing.T) { testCases := []struct { name string setupFunc func() (configDir string, execDir string, cleanup func()) expectError bool errorContains string }{ { name: "non-existent config directory", setupFunc: func() (string, string, func()) { execDir := t.TempDir() return "/non/existent/path", execDir, func() {} }, expectError: true, errorContains: "failed to read plugin config directory", }, { name: "config directory is a file not a directory", setupFunc: func() (string, string, func()) { tmpDir := t.TempDir() execDir := t.TempDir() // Create a file instead of a directory configFile := tmpDir + "/not-a-directory" err := os.WriteFile(configFile, []byte("test"), 0644) if err != nil { t.Fatalf("failed to create test file: %v", err) } return configFile, execDir, func() {} }, expectError: true, errorContains: "failed to read plugin config directory", }, { name: "empty valid config directory", setupFunc: func() (string, string, func()) { configDir := t.TempDir() execDir := t.TempDir() return configDir, execDir, func() {} }, expectError: false, errorContains: "", }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { configDir, execDir, cleanup := tc.setupFunc() defer cleanup() _, err := getRegisteredPlugins(configDir, execDir) if tc.expectError { if err == nil { t.Errorf("expected error but got nil") } else if tc.errorContains != "" && !strings.Contains(err.Error(), tc.errorContains) { t.Errorf("expected error to contain %q, but got: %v", tc.errorContains, err) } } else { if err != nil { t.Errorf("expected no error but got: %v", err) } } }) } } func writeDDConfig(pluginConfigDir string, t *testing.T) { // read necessary env vars. If any are missing, log warning and skip test ddSite := os.Getenv("DD_SITE") ddApiKey := os.Getenv("DD_API_KEY") ddAppKey := os.Getenv("DD_APPLICATION_KEY") if ddSite == "" { log.Warnf("DD_SITE undefined, this needs to have the URL of your DD instance, skipping test") t.Skip() return } if ddApiKey == "" { log.Warnf("DD_API_KEY undefined, skipping test") t.Skip() return } if ddAppKey == "" { log.Warnf("DD_APPLICATION_KEY undefined, skipping test") t.Skip() return } // write out config to temp file using contents of env vars ddConf := fmt.Sprintf(`{"datadog_site": "%s", "datadog_api_key": "%s", "datadog_app_key": "%s"}`, ddSite, ddApiKey, ddAppKey) // set up custom cost request file, err := os.CreateTemp(pluginConfigDir, "datadog_config.json") if err != nil { t.Fatalf("could not create temp config dir: %v", err) } err = os.WriteFile(file.Name(), []byte(ddConf), 0777) if err != nil { t.Fatalf("could not write file: %v", err) } } // TestPipelineService_Stop_Nil ensures nil PipelineService is safe func TestPipelineService_Stop_Nil(t *testing.T) { var ps *PipelineService ps.Stop() t.Log("Nil PipelineService handled safely") } // TestPipelineService_Stop_WithNilIngestors ensures nil ingestors are handled func TestPipelineService_Stop_WithNilIngestors(t *testing.T) { ps := &PipelineService{ hourlyIngestor: nil, dailyIngestor: nil, domains: []string{}, } ps.Stop() t.Log("Nil ingestors handled safely") } // TestPipelineService_Stop_PartialNilIngestors ensures partial nil is handled func TestPipelineService_Stop_PartialNilIngestors(t *testing.T) { hourly := &CustomCostIngestor{ key: "hourly", plugins: make(map[string]pluginConnector), } ps := &PipelineService{ hourlyIngestor: hourly, dailyIngestor: nil, domains: []string{}, } ps.Stop() t.Log("Partial nil ingestors handled safely") } // TestPipelineService_Stop_ShutdownLogging verifies logging during shutdown func TestPipelineService_Stop_ShutdownLogging(t *testing.T) { ps := &PipelineService{ hourlyIngestor: &CustomCostIngestor{ key: "hourly", plugins: make(map[string]pluginConnector), }, dailyIngestor: &CustomCostIngestor{ key: "daily", plugins: make(map[string]pluginConnector), }, domains: []string{}, } ps.Stop() time.Sleep(50 * time.Millisecond) t.Log("Pipeline service logged shutdown progress") } func TestPipelineService_Stop_NilReceiver(t *testing.T) { var ps *PipelineService ps.Stop() // should not panic on nil receiver } func TestPipelineService_Stop_NilIngestors(t *testing.T) { ps := &PipelineService{} ps.Stop() // should not panic when ingestors are nil } func TestPipelineService_Stop_WithIngestors(t *testing.T) { hourly := &CustomCostIngestor{plugins: map[string]pluginConnector{}} daily := &CustomCostIngestor{plugins: map[string]pluginConnector{}} ps := &PipelineService{ hourlyIngestor: hourly, dailyIngestor: daily, } ps.Stop() } func TestPipelineService_Stop_OnlyHourlyIngestor(t *testing.T) { ps := &PipelineService{ hourlyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}}, } ps.Stop() // should not panic when dailyIngestor is nil } func TestPipelineService_Stop_OnlyDailyIngestor(t *testing.T) { ps := &PipelineService{ dailyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}}, } ps.Stop() // should not panic when hourlyIngestor is nil }