| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- package customcost
- import (
- "fmt"
- "io"
- "net/http"
- "os"
- "runtime"
- "strings"
- "testing"
- "time"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- )
- func TestPipelineService(t *testing.T) {
- // establish temporary test assets dir
- dir := t.TempDir()
- err := os.MkdirAll(dir+"/config", 0777)
- if err != nil {
- t.Fatalf("error creating temp config dir: %v", err)
- }
- err = os.MkdirAll(dir+"/executable", 0777)
- if err != nil {
- t.Fatalf("error creating temp exec dir: %v", err)
- }
- // write DD secrets to config files
- // write config file to temp dir
- writeDDConfig(dir+"/config", t)
- // set env vars for plugin config and executable
- err = os.Setenv("PLUGIN_CONFIG_DIR", dir+"/config")
- if err != nil {
- t.Fatalf("error setting config dir env var: %v", err)
- }
- err = os.Setenv("PLUGIN_EXECUTABLE_DIR", dir+"/executable")
- if err != nil {
- t.Fatalf("error setting config dir env var: %v", err)
- }
- // download amd plugin, store in tmp executable dir
- downloadLatestPluginExec(dir+"/executable", t)
- // set up repos
- hourlyRepo := NewMemoryRepository()
- dailyRepo := NewMemoryRepository()
- // set up ingestor config
- config := DefaultIngestorConfiguration()
- config.DailyDuration = 7 * timeutil.Day
- config.HourlyDuration = 16 * time.Hour
- pipeline, err := NewPipelineService(hourlyRepo, dailyRepo, config)
- if err != nil {
- t.Fatalf("error starting pipeline: %v", err)
- }
- // wait until coverage is complete, then stop ingestor
- ingestionComplete := false
- maxLoops := 10
- loopCount := 0
- for !ingestionComplete && loopCount < maxLoops {
- status := pipeline.Status()
- log.Debugf("got status: %v", status)
- coverageHourly, foundHourly := status.CoverageHourly["datadog"]
- coverageDaily, foundDaily := status.CoverageDaily["datadog"]
- if foundHourly && foundDaily {
- // check coverage
- minTime := time.Now().UTC().Add(-6 * timeutil.Day)
- maxTime := time.Now().UTC().Add(-3 * time.Hour)
- if coverageDaily.Start().Before(minTime) && coverageHourly.End().After(maxTime) {
- log.Infof("good coverage, breaking out of loop")
- ingestionComplete = true
- break
- } else {
- log.Infof("coverage not within range. Looking for coverage %v to %v, but current coverage is %v/%v", minTime, maxTime, coverageDaily, coverageHourly)
- }
- } else {
- log.Debugf("no coverage info ready yet for datadog")
- }
- log.Infof("sleeping 10s...")
- time.Sleep(10 * time.Second)
- loopCount++
- }
- if !ingestionComplete {
- t.Fatal("ingestor never completed within allocated time")
- }
- pipeline.hourlyIngestor.Stop()
- pipeline.dailyIngestor.Stop()
- // inspect data from yesterday
- targetTime := time.Now().UTC().Add(-1 * timeutil.Day).Truncate(timeutil.Day)
- log.Infof("querying for data with window start: %v", targetTime)
- // check for presence of hosts in DD response
- ddCosts, err := dailyRepo.Get(targetTime, "datadog")
- if err != nil {
- t.Fatalf("error getting results for targetTime")
- }
- foundInfraHosts := false
- for _, cost := range ddCosts.Costs {
- if cost.ResourceType == "infra_hosts" {
- foundInfraHosts = true
- }
- }
- if !foundInfraHosts {
- t.Fatal("expecting infra_hosts costs in daily response")
- }
- // query data from 4 hours ago (hourly)
- targetTime = time.Now().UTC().Add(-13 * time.Hour).Truncate(time.Hour)
- log.Infof("querying for data with window start: %v", targetTime)
- // check for presence of hosts in DD response
- ddCosts, err = hourlyRepo.Get(targetTime, "datadog")
- if err != nil {
- t.Fatalf("error getting results for targetTime")
- }
- foundInfraHosts = false
- for _, cost := range ddCosts.Costs {
- if cost.ResourceType == "infra_hosts" {
- foundInfraHosts = true
- }
- }
- if !foundInfraHosts {
- t.Fatal("expecting infra_hosts costs in hourly response")
- }
- }
- func downloadLatestPluginExec(dirName string, t *testing.T) {
- ddPluginURL := "https://github.com/opencost/opencost-plugins/releases/download/v0.0.3/datadog.ocplugin." + runtime.GOOS + "." + runtime.GOARCH
- out, err := os.OpenFile(dirName+"/datadog.ocplugin."+runtime.GOOS+"."+runtime.GOARCH, 0755|os.O_CREATE, 0755)
- if err != nil {
- t.Fatalf("error creating executable file: %v", err)
- }
- resp, err := http.Get(ddPluginURL)
- if err != nil {
- t.Fatalf("error downloading: %v", err)
- }
- defer resp.Body.Close()
- defer out.Close()
- _, err = io.Copy(out, resp.Body)
- if err != nil {
- t.Fatalf("error copying: %v", err)
- }
- }
- func TestGetRegisteredPlugins_UnreadableConfigDir(t *testing.T) {
- testCases := []struct {
- name string
- setupFunc func() (configDir string, execDir string, cleanup func())
- expectError bool
- errorContains string
- }{
- {
- name: "non-existent config directory",
- setupFunc: func() (string, string, func()) {
- execDir := t.TempDir()
- return "/non/existent/path", execDir, func() {}
- },
- expectError: true,
- errorContains: "failed to read plugin config directory",
- },
- {
- name: "config directory is a file not a directory",
- setupFunc: func() (string, string, func()) {
- tmpDir := t.TempDir()
- execDir := t.TempDir()
- // Create a file instead of a directory
- configFile := tmpDir + "/not-a-directory"
- err := os.WriteFile(configFile, []byte("test"), 0644)
- if err != nil {
- t.Fatalf("failed to create test file: %v", err)
- }
- return configFile, execDir, func() {}
- },
- expectError: true,
- errorContains: "failed to read plugin config directory",
- },
- {
- name: "empty valid config directory",
- setupFunc: func() (string, string, func()) {
- configDir := t.TempDir()
- execDir := t.TempDir()
- return configDir, execDir, func() {}
- },
- expectError: false,
- errorContains: "",
- },
- }
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- configDir, execDir, cleanup := tc.setupFunc()
- defer cleanup()
- _, err := getRegisteredPlugins(configDir, execDir)
- if tc.expectError {
- if err == nil {
- t.Errorf("expected error but got nil")
- } else if tc.errorContains != "" && !strings.Contains(err.Error(), tc.errorContains) {
- t.Errorf("expected error to contain %q, but got: %v", tc.errorContains, err)
- }
- } else {
- if err != nil {
- t.Errorf("expected no error but got: %v", err)
- }
- }
- })
- }
- }
- func writeDDConfig(pluginConfigDir string, t *testing.T) {
- // read necessary env vars. If any are missing, log warning and skip test
- ddSite := os.Getenv("DD_SITE")
- ddApiKey := os.Getenv("DD_API_KEY")
- ddAppKey := os.Getenv("DD_APPLICATION_KEY")
- if ddSite == "" {
- log.Warnf("DD_SITE undefined, this needs to have the URL of your DD instance, skipping test")
- t.Skip()
- return
- }
- if ddApiKey == "" {
- log.Warnf("DD_API_KEY undefined, skipping test")
- t.Skip()
- return
- }
- if ddAppKey == "" {
- log.Warnf("DD_APPLICATION_KEY undefined, skipping test")
- t.Skip()
- return
- }
- // write out config to temp file using contents of env vars
- ddConf := fmt.Sprintf(`{"datadog_site": "%s", "datadog_api_key": "%s", "datadog_app_key": "%s"}`, ddSite, ddApiKey, ddAppKey)
- // set up custom cost request
- file, err := os.CreateTemp(pluginConfigDir, "datadog_config.json")
- if err != nil {
- t.Fatalf("could not create temp config dir: %v", err)
- }
- err = os.WriteFile(file.Name(), []byte(ddConf), 0777)
- if err != nil {
- t.Fatalf("could not write file: %v", err)
- }
- }
- // TestPipelineService_Stop_Nil ensures nil PipelineService is safe
- func TestPipelineService_Stop_Nil(t *testing.T) {
- var ps *PipelineService
- ps.Stop()
- t.Log("Nil PipelineService handled safely")
- }
- // TestPipelineService_Stop_WithNilIngestors ensures nil ingestors are handled
- func TestPipelineService_Stop_WithNilIngestors(t *testing.T) {
- ps := &PipelineService{
- hourlyIngestor: nil,
- dailyIngestor: nil,
- domains: []string{},
- }
- ps.Stop()
- t.Log("Nil ingestors handled safely")
- }
- // TestPipelineService_Stop_PartialNilIngestors ensures partial nil is handled
- func TestPipelineService_Stop_PartialNilIngestors(t *testing.T) {
- hourly := &CustomCostIngestor{
- key: "hourly",
- plugins: make(map[string]pluginConnector),
- }
- ps := &PipelineService{
- hourlyIngestor: hourly,
- dailyIngestor: nil,
- domains: []string{},
- }
- ps.Stop()
- t.Log("Partial nil ingestors handled safely")
- }
- // TestPipelineService_Stop_ShutdownLogging verifies logging during shutdown
- func TestPipelineService_Stop_ShutdownLogging(t *testing.T) {
- ps := &PipelineService{
- hourlyIngestor: &CustomCostIngestor{
- key: "hourly",
- plugins: make(map[string]pluginConnector),
- },
- dailyIngestor: &CustomCostIngestor{
- key: "daily",
- plugins: make(map[string]pluginConnector),
- },
- domains: []string{},
- }
- ps.Stop()
- time.Sleep(50 * time.Millisecond)
- t.Log("Pipeline service logged shutdown progress")
- }
- func TestPipelineService_Stop_NilReceiver(t *testing.T) {
- var ps *PipelineService
- ps.Stop() // should not panic on nil receiver
- }
- func TestPipelineService_Stop_NilIngestors(t *testing.T) {
- ps := &PipelineService{}
- ps.Stop() // should not panic when ingestors are nil
- }
- func TestPipelineService_Stop_WithIngestors(t *testing.T) {
- hourly := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
- daily := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
- ps := &PipelineService{
- hourlyIngestor: hourly,
- dailyIngestor: daily,
- }
- ps.Stop()
- }
- func TestPipelineService_Stop_OnlyHourlyIngestor(t *testing.T) {
- ps := &PipelineService{
- hourlyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
- }
- ps.Stop() // should not panic when dailyIngestor is nil
- }
- func TestPipelineService_Stop_OnlyDailyIngestor(t *testing.T) {
- ps := &PipelineService{
- dailyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
- }
- ps.Stop() // should not panic when hourlyIngestor is nil
- }
|