Sfoglia il codice sorgente

debugging integration

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
Alex Meijer 2 anni fa
parent
commit
fd4ad1120d

+ 47 - 69
pkg/customcost/ingestor.go

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-plugin"
+
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/model"
 	"github.com/opencost/opencost/core/pkg/opencost"
@@ -31,10 +32,8 @@ type IngestorStatus struct {
 type CustomCostIngestorConfig struct {
 	MonthToDateRunInterval               int
 	HourlyDuration, DailyDuration        time.Duration
-	QueryWindow                          time.Duration
-	RunWindow                            time.Duration
+	DailyQueryWindow, HourlyQueryWindow  time.Duration
 	PluginConfigDir, PluginExecutableDir string
-	RefreshRate                          time.Duration
 }
 
 // DefaultIngestorConfiguration retrieves an CustomCostIngestorConfig from env variables
@@ -43,7 +42,8 @@ func DefaultIngestorConfiguration() CustomCostIngestorConfig {
 		DailyDuration:          timeutil.Day * time.Duration(env.GetDataRetentionDailyResolutionDays()),
 		HourlyDuration:         time.Hour * time.Duration(env.GetDataRetentionHourlyResolutionHours()),
 		MonthToDateRunInterval: env.GetCloudCostMonthToDateInterval(),
-		QueryWindow:            timeutil.Day * time.Duration(env.GetCloudCostQueryWindowDays()),
+		DailyQueryWindow:       timeutil.Day * time.Duration(env.GetCustomCostQueryWindowDays()),
+		HourlyQueryWindow:      time.Hour * time.Duration(env.GetCustomCostQueryWindowHours()),
 		PluginConfigDir:        env.GetPluginConfigDir(),
 		PluginExecutableDir:    env.GetPluginExecutableDir(),
 	}
@@ -63,12 +63,13 @@ type CustomCostIngestor struct {
 	isStopping   atomic.Bool
 	exitBuildCh  chan string
 	exitRunCh    chan string
-	plugins      map[string]*plugin.ClientProtocol
+	plugins      map[string]*plugin.Client
 	resolution   time.Duration
+	refreshRate  time.Duration
 }
 
 // NewIngestor is an initializer for ingestor
-func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.ClientProtocol, res time.Duration) (*CustomCostIngestor, error) {
+func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
 	if repo == nil {
 		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
 	}
@@ -94,13 +95,14 @@ func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Reposi
 		coverage:     map[string]opencost.Window{},
 		plugins:      plugins,
 		resolution:   res,
+		refreshRate:  res,
 	}, nil
 }
 
 func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
 	var targets []opencost.Window
-	if ing.resolution == time.Hour {
-		oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration)
+	if ing.resolution == timeutil.Day {
+		oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration).Truncate(timeutil.Day)
 		if !oldestDailyDate.After(start) {
 			windows, err := opencost.GetWindows(start, end, timeutil.Day)
 			if err != nil {
@@ -111,7 +113,7 @@ func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
 		}
 	} else {
 
-		oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration)
+		oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration).Truncate(time.Hour)
 		if !oldestHourlyDate.After(start) {
 			windows, err := opencost.GetWindows(start, end, time.Hour)
 			if err != nil {
@@ -135,7 +137,7 @@ func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
 			}
 		}
 		if !allPluginsHave {
-			ing.BuildWindow(start, end)
+			ing.BuildWindow(*window.Start(), *window.End())
 		} else {
 			for domain := range ing.plugins {
 				ing.expandCoverage(window, domain)
@@ -166,9 +168,16 @@ func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain st
 		log.Errorf("could not find plugin client for plugin %s. Did you initialize the plugin correctly?", domain)
 		return
 	}
-	pluginClientDeref := *pluginClient
+
+	// connect the client
+	rpcClient, err := pluginClient.Client()
+	if err != nil {
+		log.Errorf("error connecting client for plugin %s: %v", domain, err)
+		return
+	}
+
 	// Request the plugin
-	raw, err := pluginClientDeref.Dispense("CustomCostSource")
+	raw, err := rpcClient.Dispense("CustomCostSource")
 	if err != nil {
 		log.Errorf("error creating new plugin client for plugin %s: %v", domain, err)
 		return
@@ -214,9 +223,9 @@ func (ing *CustomCostIngestor) Start(rebuild bool) {
 
 	// Build the store once, advancing backward in time from the earliest
 	// point of coverage.
-	go ing.build(rebuild)
+	//go ing.build(rebuild)
 
-	go ing.run()
+	ing.run()
 
 }
 
@@ -262,7 +271,7 @@ func (ing *CustomCostIngestor) Status() IngestorStatus {
 	return IngestorStatus{
 		Created:  ing.creationTime,
 		LastRun:  ing.lastRun,
-		NextRun:  ing.lastRun.Add(ing.config.RefreshRate).UTC(),
+		NextRun:  ing.lastRun.Add(ing.refreshRate).UTC(),
 		Runs:     ing.runs,
 		Coverage: ing.coverage,
 	}
@@ -270,60 +279,24 @@ func (ing *CustomCostIngestor) Status() IngestorStatus {
 
 func (ing *CustomCostIngestor) build(rebuild bool) {
 	defer errors.HandlePanic()
-
-	// Profile the full Duration of the build time
-	buildStart := time.Now()
-
-	targetDur := ing.config.DailyDuration
+	e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
+	s := e.Add(-ing.config.DailyDuration)
 	if ing.resolution == time.Hour {
-		targetDur = ing.config.HourlyDuration
+		s = e.Add(-ing.config.HourlyDuration)
 	}
-	// Build as far back as the configures build Duration
-	limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
-
-	queryWindowStr := timeutil.FormatStoreResolution(ing.config.QueryWindow)
-	log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, limit.String(), queryWindowStr)
-
-	// Start with a window of the configured Duration and ending on the given
-	// start time. Build windows repeating until the window reaches the
-	// given limit time
-
-	// Round end times back to nearest Resolution points in the past,
-	// querying for exactly one interval
-	e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
-	s := e.Add(-ing.config.QueryWindow)
-
-	// Continue until limit is reached
-	for limit.Before(e) {
-		// If exit instruction is received, log and return
-		select {
-		case <-ing.exitBuildCh:
-			log.Debugf("CustomCost[%s][%s]: ingestor: build[%s]: exiting", ing.key, ing.resolution, ing.runID)
-			return
-		default:
-		}
-
-		// Profile the current build step
-		stepStart := time.Now()
-
-		// if rebuild is not specified then check for existing coverage on window
-		if rebuild {
-			ing.BuildWindow(s, e)
-		} else {
-			ing.LoadWindow(s, e)
-		}
+	// Profile the full Duration of the build time
+	buildStart := time.Now()
 
-		log.Infof("CustomCost[%s]: ingestor: build[%s]:  %s in %v", ing.key, ing.runID, opencost.NewClosedWindow(s, e), time.Since(stepStart))
+	log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, s, ing.resolution)
 
-		// Shift to next QueryWindow
-		s = s.Add(-ing.config.QueryWindow)
-		if s.Before(limit) {
-			s = limit
-		}
-		e = e.Add(-ing.config.QueryWindow)
+	// if rebuild is not specified then check for existing coverage on window
+	if rebuild {
+		ing.BuildWindow(s, e)
+	} else {
+		ing.LoadWindow(s, e)
 	}
 
-	log.Infof(fmt.Sprintf("CusomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
+	log.Infof(fmt.Sprintf("CustomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
 
 	// In order to be able to Stop, we have to wait on an exit message
 	// here
@@ -363,12 +336,17 @@ func (ing *CustomCostIngestor) run() {
 
 		// Start from the last covered time, minus the RunWindow
 		start := ing.lastRun
-		start = start.Add(-ing.config.RunWindow)
+		start = start.Add(-ing.resolution)
+
+		queryWin := ing.config.DailyQueryWindow
+		if ing.resolution == time.Hour {
+			queryWin = ing.config.HourlyQueryWindow
+		}
 
 		// Round start time back to the nearest Resolution point in the past from the
 		// last update to the QueryWindow
 		s := opencost.RoundBack(start.UTC(), ing.resolution)
-		e := s.Add(ing.config.QueryWindow)
+		e := s.Add(queryWin)
 
 		// Start with a window of the configured Duration and starting on the given
 		// start time. Do the following, repeating until the window reaches the
@@ -379,10 +357,10 @@ func (ing *CustomCostIngestor) run() {
 			profStart := time.Now()
 			ing.BuildWindow(s, e)
 
-			log.Debugf("CUstomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
+			log.Debugf("CustomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
 
-			s = s.Add(ing.config.QueryWindow)
-			e = e.Add(ing.config.QueryWindow)
+			s = s.Add(queryWin)
+			e = e.Add(queryWin)
 			// prevent builds into the future
 			if e.After(time.Now().UTC()) {
 				e = opencost.RoundForward(time.Now().UTC(), ing.resolution)
@@ -393,7 +371,7 @@ func (ing *CustomCostIngestor) run() {
 
 		ing.runs++
 
-		ticker.TickIn(ing.config.RefreshRate)
+		ticker.TickIn(ing.refreshRate)
 	}
 }
 

+ 8 - 12
pkg/customcost/pipelineservice.go

@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
+	"runtime"
 	"strings"
 	"time"
 
@@ -27,7 +28,7 @@ type PipelineService struct {
 	hourlyStore, dailyStore       Repository
 }
 
-func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.ClientProtocol, error) {
+func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.Client, error) {
 
 	pluginNames := map[string]string{}
 	// scan plugin config directory for all file names
@@ -57,7 +58,7 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 	configs := map[string]*plugin.ClientConfig{}
 	// set up the client config
 	for name, config := range pluginNames {
-		if _, err := os.Stat(execDir + "/" + name + ".ocplugin." + version.Architecture); err != nil {
+		if _, err := os.Stat(execDir + "/" + name + ".ocplugin." + runtime.GOOS + "." + version.Architecture); err != nil {
 			msg := fmt.Sprintf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
 			log.Errorf(msg)
 			return nil, fmt.Errorf(msg)
@@ -82,23 +83,18 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 		configs[name] = &plugin.ClientConfig{
 			HandshakeConfig: handshakeConfig,
 			Plugins:         pluginMap,
-			Cmd:             exec.Command(execDir+"/"+name+".ocplugin."+version.Architecture, config),
+			Cmd:             exec.Command(execDir+"/"+name+".ocplugin."+runtime.GOOS+"."+version.Architecture, config),
 			Logger:          logger,
 		}
 	}
 
-	plugins := map[string]*plugin.ClientProtocol{}
+	plugins := map[string]*plugin.Client{}
 
 	for name, config := range configs {
 		client := plugin.NewClient(config)
-		// connect the client
-		rpcClient, err := client.Client()
-		if err != nil {
-			log.Errorf("error connecting client for plugin %s: %v", name, err)
-			return nil, fmt.Errorf("error connecting client for plugin %s: %v", name, err)
-		}
+
 		// add the connected, initialized client to the ma
-		plugins[name] = &rpcClient
+		plugins[name] = client
 	}
 
 	return plugins, nil
@@ -125,7 +121,7 @@ func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostInge
 		return nil, err
 	}
 
-	dailyIngestor.Start(false)
+	//dailyIngestor.Start(false)
 	return &PipelineService{
 		hourlyIngestor: hourlyIngestor,
 		hourlyStore:    hourlyrepo,

+ 11 - 11
pkg/customcost/pipelineservice_test.go

@@ -3,35 +3,35 @@ package customcost
 import (
 	"fmt"
 	"io"
-	"io/fs"
 	"net/http"
 	"os"
+	"runtime"
 	"testing"
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/opencost/opencost/core/pkg/version"
 )
 
-const ddPluginURL = "https://github.com/opencost/opencost-plugins/releases/download/v0.0.1/datadog.ocplugin.amd64"
-
 func TestPipelineService(t *testing.T) {
 	// establish temporary test assets dir
 	dir, err := os.MkdirTemp("", "ocplugin-*")
 	if err != nil {
 		t.Fatalf("error creating temp dir: %v", err)
 	}
+	defer os.Remove(dir)
 
-	err = os.MkdirAll(dir+"/config", fs.FileMode(os.O_RDWR))
+	err = os.MkdirAll(dir+"/config", 0777)
 	if err != nil {
 		t.Fatalf("error creating temp config dir: %v", err)
 	}
 
-	err = os.MkdirAll(dir+"/executable", fs.FileMode(os.O_RDWR))
+	err = os.MkdirAll(dir+"/executable", 0777)
 	if err != nil {
 		t.Fatalf("error creating temp exec dir: %v", err)
 	}
-
+	version.Architecture = runtime.GOARCH
 	// write DD secrets to config files
 	// write config file to temp dir
 	writeDDConfig(dir+"/config", t)
@@ -41,7 +41,7 @@ func TestPipelineService(t *testing.T) {
 	if err != nil {
 		t.Fatalf("error setting config dir env var: %v", err)
 	}
-	err = os.Setenv("PLUGIN_CONFIG_DIR", dir+"/executable")
+	err = os.Setenv("PLUGIN_EXECUTABLE_DIR", dir+"/executable")
 	if err != nil {
 		t.Fatalf("error setting config dir env var: %v", err)
 	}
@@ -117,7 +117,8 @@ func TestPipelineService(t *testing.T) {
 }
 
 func downloadLatestPluginExec(dirName string, t *testing.T) {
-	out, err := os.OpenFile(dirName+"datadog.ocplugin.amd64", 0755, 0755)
+	ddPluginURL := "https://github.com/opencost/opencost-plugins/releases/download/v0.0.2/datadog.ocplugin." + runtime.GOOS + "." + version.Architecture
+	out, err := os.OpenFile(dirName+"/datadog.ocplugin."+runtime.GOOS+"."+version.Architecture, 0755|os.O_CREATE, 0755)
 	if err != nil {
 		t.Fatalf("error creating executable file: %v", err)
 	}
@@ -159,16 +160,15 @@ func writeDDConfig(pluginConfigDir string, t *testing.T) {
 	}
 
 	// write out config to temp file using contents of env vars
-	ddConf := fmt.Sprintf(`{"datadog_site": "%s", "datadog_api_key": "%s", "datadog_app_key": "%s"`, ddSite, ddApiKey, ddAppKey)
+	ddConf := fmt.Sprintf(`{"datadog_site": "%s", "datadog_api_key": "%s", "datadog_app_key": "%s"}`, ddSite, ddApiKey, ddAppKey)
 
 	// set up custom cost request
 	file, err := os.CreateTemp(pluginConfigDir, "datadog_config.json")
 	if err != nil {
 		t.Fatalf("could not create temp config dir: %v", err)
 	}
-	defer os.Remove(file.Name())
 
-	err = os.WriteFile(file.Name(), []byte(ddConf), fs.FileMode(os.O_RDWR))
+	err = os.WriteFile(file.Name(), []byte(ddConf), 0777)
 	if err != nil {
 		t.Fatalf("could not write file: %v", err)
 	}

+ 4 - 0
pkg/env/costmodelenv.go

@@ -677,6 +677,10 @@ func GetCloudCostQueryWindowDays() int64 {
 	return env.GetInt64(CloudCostQueryWindowDaysEnvVar, 7)
 }
 
+func GetCustomCostQueryWindowHours() int64 {
+	return env.GetInt64(CustomCostQueryWindowDaysEnvVar, 1)
+}
+
 func GetCustomCostQueryWindowDays() int64 {
 	return env.GetInt64(CustomCostQueryWindowDaysEnvVar, 7)
 }