|
|
@@ -0,0 +1,175 @@
|
|
|
+package customcost
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "io"
|
|
|
+ "io/fs"
|
|
|
+ "net/http"
|
|
|
+ "os"
|
|
|
+ "testing"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/opencost/opencost/core/pkg/log"
|
|
|
+ "github.com/opencost/opencost/core/pkg/util/timeutil"
|
|
|
+)
|
|
|
+
|
|
|
+const ddPluginURL = "https://github.com/opencost/opencost-plugins/releases/download/v0.0.1/datadog.ocplugin.amd64"
|
|
|
+
|
|
|
+func TestPipelineService(t *testing.T) {
|
|
|
+ // establish temporary test assets dir
|
|
|
+ dir, err := os.MkdirTemp("", "ocplugin-*")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error creating temp dir: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = os.MkdirAll(dir+"/config", fs.FileMode(os.O_RDWR))
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error creating temp config dir: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = os.MkdirAll(dir+"/executable", fs.FileMode(os.O_RDWR))
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error creating temp exec dir: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // write DD secrets to config files
|
|
|
+ // write config file to temp dir
|
|
|
+ writeDDConfig(dir+"/config", t)
|
|
|
+
|
|
|
+ // set env vars for plugin config and executable
|
|
|
+ err = os.Setenv("PLUGIN_CONFIG_DIR", dir+"/config")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error setting config dir env var: %v", err)
|
|
|
+ }
|
|
|
+ err = os.Setenv("PLUGIN_CONFIG_DIR", dir+"/executable")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error setting config dir env var: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // download amd plugin, store in tmp executable dir
|
|
|
+ downloadLatestPluginExec(dir+"/executable", t)
|
|
|
+
|
|
|
+ // set up repos
|
|
|
+ hourlyRepo := NewMemoryRepository()
|
|
|
+ dailyRepo := NewMemoryRepository()
|
|
|
+ // set up ingestor config
|
|
|
+ config := DefaultIngestorConfiguration()
|
|
|
+
|
|
|
+ config.DailyDuration = 7 * timeutil.Day
|
|
|
+ config.HourlyDuration = 12 * time.Hour
|
|
|
+ pipeline, err := NewPipelineService(hourlyRepo, dailyRepo, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error starting pipeline: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // wait until coverage is complete, then stop ingestor
|
|
|
+ ingestionComplete := false
|
|
|
+ maxLoops := 10
|
|
|
+ loopCount := 0
|
|
|
+ for !ingestionComplete && loopCount < maxLoops {
|
|
|
+ status := pipeline.Status()
|
|
|
+ log.Debugf("got status: %v", status)
|
|
|
+ coverage, found := status.Coverage["datadog"]
|
|
|
+ if found {
|
|
|
+ // check coverage
|
|
|
+ minTime := time.Now().UTC().Add(-6 * timeutil.Day)
|
|
|
+ maxTime := time.Now().UTC().Add(-3 * time.Hour)
|
|
|
+ if coverage.Start().Before(minTime) && coverage.End().After(maxTime) {
|
|
|
+ log.Infof("good coverage, breaking out of loop")
|
|
|
+ ingestionComplete = true
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ log.Infof("coverage not within range. Looking for coverage %v to %v, but current coverage is %v", minTime, maxTime, coverage)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.Debugf("no coverage info ready yet for datadog")
|
|
|
+ }
|
|
|
+ log.Infof("sleeping 10s...")
|
|
|
+ time.Sleep(10 * time.Second)
|
|
|
+ loopCount++
|
|
|
+ }
|
|
|
+
|
|
|
+ if !ingestionComplete {
|
|
|
+ t.Fatal("ingestor never completed within allocated time")
|
|
|
+ }
|
|
|
+
|
|
|
+ pipeline.hourlyIngestor.Stop()
|
|
|
+ pipeline.dailyIngestor.Stop()
|
|
|
+
|
|
|
+ // inspect data from yesterday
|
|
|
+ targetTime := time.Now().Add(-1 * timeutil.Day).Truncate(timeutil.Day)
|
|
|
+ log.Infof("querying for data with window start: %v", targetTime)
|
|
|
+ // check for presence of hosts in DD response
|
|
|
+ ddCosts, err := dailyRepo.Get(targetTime, "datadog")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error getting results for targetTime")
|
|
|
+ }
|
|
|
+ foundInfraHosts := false
|
|
|
+ for _, cost := range ddCosts.Costs {
|
|
|
+ if cost.ResourceType == "infra_hosts" {
|
|
|
+ foundInfraHosts = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if !foundInfraHosts {
|
|
|
+ t.Fatal("expecting infra_hosts costs in response")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func downloadLatestPluginExec(dirName string, t *testing.T) {
|
|
|
+ out, err := os.OpenFile(dirName+"datadog.ocplugin.amd64", 0755, 0755)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error creating executable file: %v", err)
|
|
|
+ }
|
|
|
+ resp, err := http.Get(ddPluginURL)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error downloading: %v", err)
|
|
|
+ }
|
|
|
+ defer resp.Body.Close()
|
|
|
+ defer out.Close()
|
|
|
+
|
|
|
+ _, err = io.Copy(out, resp.Body)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("error copying: %v", err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func writeDDConfig(pluginConfigDir string, t *testing.T) {
|
|
|
+ // read necessary env vars. If any are missing, log warning and skip test
|
|
|
+ ddSite := os.Getenv("DD_SITE")
|
|
|
+ ddApiKey := os.Getenv("DD_API_KEY")
|
|
|
+ ddAppKey := os.Getenv("DD_APPLICATION_KEY")
|
|
|
+
|
|
|
+ if ddSite == "" {
|
|
|
+ log.Warnf("DD_SITE undefined, this needs to have the URL of your DD instance, skipping test")
|
|
|
+ t.Skip()
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if ddApiKey == "" {
|
|
|
+ log.Warnf("DD_API_KEY undefined, skipping test")
|
|
|
+ t.Skip()
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if ddAppKey == "" {
|
|
|
+ log.Warnf("DD_APPLICATION_KEY undefined, skipping test")
|
|
|
+ t.Skip()
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // write out config to temp file using contents of env vars
|
|
|
+ ddConf := fmt.Sprintf(`{"datadog_site": "%s", "datadog_api_key": "%s", "datadog_app_key": "%s"`, ddSite, ddApiKey, ddAppKey)
|
|
|
+
|
|
|
+ // set up custom cost request
|
|
|
+ file, err := os.CreateTemp(pluginConfigDir, "datadog_config.json")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("could not create temp config dir: %v", err)
|
|
|
+ }
|
|
|
+ defer os.Remove(file.Name())
|
|
|
+
|
|
|
+ err = os.WriteFile(file.Name(), []byte(ddConf), fs.FileMode(os.O_RDWR))
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("could not write file: %v", err)
|
|
|
+ }
|
|
|
+}
|