Explorar o código

Merge pull request #2610 from ameijer/atm/plugin-ingestion

Atm/plugin ingestion
Cliff Colvin hai 2 anos
pai
achega
c35bef48fc

+ 240 - 421
pkg/customcost/ingestor.go

@@ -1,38 +1,40 @@
 package customcost
 
 import (
-	"encoding/json"
 	"fmt"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
-	"github.com/davecgh/go-spew/spew"
 	"github.com/hashicorp/go-plugin"
+
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/model"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
+	"github.com/opencost/opencost/core/pkg/util/stringutil"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/env"
+	"github.com/opencost/opencost/pkg/errors"
 )
 
 // IngestorStatus includes diagnostic values for a given Ingestor
 type IngestorStatus struct {
-	Created  time.Time
-	LastRun  time.Time
-	NextRun  time.Time
-	Runs     int
-	Coverage opencost.Window
+	Created     time.Time
+	LastRun     time.Time
+	NextRun     time.Time
+	Runs        int
+	Coverage    map[string]opencost.Window
+	RefreshRate time.Duration
 }
 
 // CustomCost IngestorConfig is a configuration struct for an Ingestor
 type CustomCostIngestorConfig struct {
 	MonthToDateRunInterval               int
 	HourlyDuration, DailyDuration        time.Duration
-	QueryWindow                          time.Duration
-	RunWindow                            time.Duration
+	DailyQueryWindow, HourlyQueryWindow  time.Duration
 	PluginConfigDir, PluginExecutableDir string
-	RefreshRate                          time.Duration
 }
 
 // DefaultIngestorConfiguration retrieves an CustomCostIngestorConfig from env variables
@@ -41,8 +43,10 @@ func DefaultIngestorConfiguration() CustomCostIngestorConfig {
 		DailyDuration:          timeutil.Day * time.Duration(env.GetDataRetentionDailyResolutionDays()),
 		HourlyDuration:         time.Hour * time.Duration(env.GetDataRetentionHourlyResolutionHours()),
 		MonthToDateRunInterval: env.GetCloudCostMonthToDateInterval(),
-		QueryWindow:            timeutil.Day * time.Duration(env.GetCloudCostQueryWindowDays()),
+		DailyQueryWindow:       timeutil.Day * time.Duration(env.GetCustomCostQueryWindowDays()),
+		HourlyQueryWindow:      time.Hour * time.Duration(env.GetCustomCostQueryWindowHours()),
 		PluginConfigDir:        env.GetPluginConfigDir(),
+		PluginExecutableDir:    env.GetPluginExecutableDir(),
 	}
 }
 
@@ -54,116 +58,175 @@ type CustomCostIngestor struct {
 	lastRun      time.Time
 	runs         int
 	creationTime time.Time
-	coverage     opencost.Window
+	coverage     map[string]opencost.Window
 	coverageLock sync.Mutex
 	isRunning    atomic.Bool
 	isStopping   atomic.Bool
 	exitBuildCh  chan string
 	exitRunCh    chan string
-	plugins      map[string]*plugin.ClientProtocol
+	plugins      map[string]*plugin.Client
+	resolution   time.Duration
+	refreshRate  time.Duration
 }
 
 // NewIngestor is an initializer for ingestor
-func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.ClientProtocol) (*CustomCostIngestor, error) {
+func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
 	if repo == nil {
 		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
 	}
 	if ingestorConfig == nil {
-		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: integration connot be nil")
+		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: config connot be nil")
+	}
+	key := ""
+
+	for name := range plugins {
+		key += "," + name
 	}
 
+	key = strings.TrimPrefix(key, ",")
+
 	now := time.Now().UTC()
-	midnight := opencost.RoundForward(now, timeutil.Day)
+
 	return &CustomCostIngestor{
+		key:          key,
 		config:       ingestorConfig,
 		repo:         repo,
 		creationTime: now,
 		lastRun:      now,
-		coverage:     opencost.NewClosedWindow(midnight, midnight),
+		coverage:     map[string]opencost.Window{},
 		plugins:      plugins,
+		resolution:   res,
+		refreshRate:  res,
 	}, nil
 }
 
 func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
-	windows, err := opencost.GetWindows(start, end, timeutil.Day)
-	if err != nil {
-		log.Errorf("CloudCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
-		return
+	var targets []opencost.Window
+	if ing.resolution == timeutil.Day {
+		oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration).Truncate(timeutil.Day)
+		if !oldestDailyDate.After(start) {
+			windows, err := opencost.GetWindows(start, end, timeutil.Day)
+			if err != nil {
+				log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
+				return
+			}
+			targets = windows
+		}
+	} else {
+
+		oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration).Truncate(time.Hour)
+		if !oldestHourlyDate.After(start) {
+			windows, err := opencost.GetWindows(start, end, time.Hour)
+			if err != nil {
+				log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
+				return
+			}
+			targets = windows
+		}
 	}
 
-	for _, window := range windows {
-		has, err2 := ing.repo.Has(*window.Start(), ing.key)
-		if err2 != nil {
-			log.Errorf("CloudCost[%s]: ingestor: error when loading window: %s", ing.key, err2.Error())
+	for _, window := range targets {
+		allPluginsHave := true
+		for domain := range ing.plugins {
+			has, err2 := ing.repo.Has(*window.Start(), domain)
+			if err2 != nil {
+				log.Errorf("CustomCost[%s]: ingestor: error when loading window for plugin %s: %s", ing.key, domain, err2.Error())
+			}
+			if !has {
+				allPluginsHave = false
+				break
+			}
 		}
-		if !has {
-			ing.BuildWindow(start, end)
-			return
+		if !allPluginsHave {
+			ing.BuildWindow(*window.Start(), *window.End())
+		} else {
+			for domain := range ing.plugins {
+				ing.expandCoverage(window, domain)
+			}
+			log.Debugf("CustomCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
 		}
-		ing.expandCoverage(window)
-		log.Debugf("CloudCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
 	}
 
 }
 
 func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
-	log.Infof("ingestor: building window %s", opencost.NewWindow(&start, &end))
-
-	// // build customCostRequest
-	// // make RPC call via plugin
-
-	// // loop through each customCostResponse, adding
-	// for _, ccs := range ccsr.CloudCostSets {
-	// 	log.Debugf("BuildWindow[%s]: GetCloudCost: writing cloud costs for window %s: %d", ccs.Integration, ccs.Window, len(ccs.CloudCosts))
-	// 	err2 := ing.repo.Put(ccs)
-	// 	if err2 != nil {
-	// 		log.Errorf("CloudCost[%s]: ingestor: failed to save Cloud Cost Set with window %s: %s", ing.key, ccs.GetWindow().String(), err2.Error())
-	// 	}
-	// 	ing.expandCoverage(ccs.Window)
-	// }
-}
 
-func (ing *CustomCostIngestor) Start(rebuild bool) {
+	for domain := range ing.plugins {
+		ing.buildSingleDomain(start, end, domain)
+	}
+}
 
-	// // If already running, log that and return.
-	// if !ing.isRunning.CompareAndSwap(false, true) {
-	// 	log.Infof("CloudCost: ingestor: is already running")
-	// 	return
-	// }
+func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain string) {
+	target := opencost.NewWindow(&start, &end)
+	req := model.CustomCostRequest{
+		TargetWindow: &target,
+		Resolution:   ing.resolution,
+	}
+	log.Infof("ingestor: building window %s for plugin %s", opencost.NewWindow(&start, &end), domain)
+	// make RPC call via plugin
+	pluginClient, found := ing.plugins[domain]
+	if !found {
+		log.Errorf("could not find plugin client for plugin %s. Did you initialize the plugin correctly?", domain)
+		return
+	}
 
-	// ing.runID = stringutil.RandSeq(5)
+	// connect the client
+	rpcClient, err := pluginClient.Client()
+	if err != nil {
+		log.Errorf("error connecting client for plugin %s: %v", domain, err)
+		return
+	}
 
-	// ing.exitBuildCh = make(chan string)
-	// ing.exitRunCh = make(chan string)
+	// Request the plugin
+	raw, err := rpcClient.Dispense("CustomCostSource")
+	if err != nil {
+		log.Errorf("error creating new plugin client for plugin %s: %v", domain, err)
+		return
+	}
 
-	// // Build the store once, advancing backward in time from the earliest
-	// // point of coverage.
-	// go ing.build(rebuild)
+	custCostSrc := raw.(ocplugin.CustomCostSource)
 
-	// go ing.run()
+	custCostResps := custCostSrc.GetCustomCosts(req)
+	// loop through each customCostResponse, adding to repo
+	for _, ccr := range custCostResps {
 
-	// TEMPORARY - load the repo with dummy data
-	var resps []*model.CustomCostResponse
-	err := json.Unmarshal([]byte(ddData), &resps)
-	if err != nil {
-		panic(err)
-	}
+		// check for errors in response
+		if len(ccr.Errors) > 0 {
+			for _, errResp := range ccr.Errors {
+				log.Errorf("error in getting custom costs for plugin %s: %v", domain, errResp)
+			}
+			log.Errorf("not adding any costs for window %v on plugin %s", req.TargetWindow, domain)
+			continue
+		}
+		log.Debugf("BuildWindow[%s]: GetCustomCost: writing custom costs for window %s: %d", domain, ccr.Window, len(ccr.Costs))
 
-	for _, resp := range resps {
-		err = ing.repo.Put(resp)
-		if err != nil {
-			panic(err)
+		err2 := ing.repo.Put(&ccr)
+		if err2 != nil {
+			log.Errorf("CustomCost[%s]: ingestor: failed to save Custom Cost Set with window %s: %s", domain, ccr.GetWindow().String(), err2.Error())
 		}
+
+		ing.expandCoverage(ccr.Window, domain)
 	}
-	//2024-02-27T01:00:00
-	target := time.Date(2024, 2, 27, 1, 0, 0, 0, time.UTC)
-	stored, err := ing.repo.Get(target, "datadog")
-	if err != nil {
-		panic(err)
+}
+
+func (ing *CustomCostIngestor) Start(rebuild bool) {
+
+	// If already running, log that and return.
+	if !ing.isRunning.CompareAndSwap(false, true) {
+		log.Infof("CustomCost: ingestor: is already running")
+		return
 	}
 
-	log.Debug("got stored object: ")
-	spew.Dump(stored)
+	ing.runID = stringutil.RandSeq(5)
+
+	ing.exitBuildCh = make(chan string)
+	ing.exitRunCh = make(chan string)
+
+	// Build the store once, advancing backward in time from the earliest
+	// point of coverage.
+	go ing.build(rebuild)
+
+	go ing.run()
 
 }
 
@@ -207,370 +270,126 @@ func (ing *CustomCostIngestor) Stop() {
 // Status returns an IngestorStatus that describes the current state of the ingestor
 func (ing *CustomCostIngestor) Status() IngestorStatus {
 	return IngestorStatus{
-		Created:  ing.creationTime,
-		LastRun:  ing.lastRun,
-		NextRun:  ing.lastRun.Add(ing.config.RefreshRate).UTC(),
-		Runs:     ing.runs,
-		Coverage: ing.coverage,
+		Created:     ing.creationTime,
+		LastRun:     ing.lastRun,
+		NextRun:     ing.lastRun.Add(ing.refreshRate).UTC(),
+		Runs:        ing.runs,
+		Coverage:    ing.coverage,
+		RefreshRate: ing.refreshRate,
 	}
 }
 
 func (ing *CustomCostIngestor) build(rebuild bool) {
-	// defer errors.HandlePanic()
-
-	// // Profile the full Duration of the build time
-	// buildStart := time.Now()
-
-	// // Build as far back as the configures build Duration
-	// limit := opencost.RoundBack(time.Now().UTC().Add(-ing.config.Duration), ing.config.Resolution)
-
-	// queryWindowStr := timeutil.FormatStoreResolution(ing.config.QueryWindow)
-	// log.Infof("CloudCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, limit.String(), queryWindowStr)
-
-	// // Start with a window of the configured Duration and ending on the given
-	// // start time. Build windows repeating until the window reaches the
-	// // given limit time
-
-	// // Round end times back to nearest Resolution points in the past,
-	// // querying for exactly one interval
-	// e := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution)
-	// s := e.Add(-ing.config.QueryWindow)
-
-	// // Continue until limit is reached
-	// for limit.Before(e) {
-	// 	// If exit instruction is received, log and return
-	// 	select {
-	// 	case <-ing.exitBuildCh:
-	// 		log.Debugf("CloudCost[%s]: ingestor: build[%s]: exiting", ing.key, ing.runID)
-	// 		return
-	// 	default:
-	// 	}
-
-	// 	// Profile the current build step
-	// 	stepStart := time.Now()
-
-	// 	// if rebuild is not specified then check for existing coverage on window
-	// 	if rebuild {
-	// 		ing.BuildWindow(s, e)
-	// 	} else {
-	// 		ing.LoadWindow(s, e)
-	// 	}
-
-	// 	log.Infof("CloudCost[%s]: ingestor: build[%s]:  %s in %v", ing.key, ing.runID, opencost.NewClosedWindow(s, e), time.Since(stepStart))
-
-	// 	// Shift to next QueryWindow
-	// 	s = s.Add(-ing.config.QueryWindow)
-	// 	if s.Before(limit) {
-	// 		s = limit
-	// 	}
-	// 	e = e.Add(-ing.config.QueryWindow)
-	// }
-
-	// log.Infof(fmt.Sprintf("CloudCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
-
-	// // In order to be able to Stop, we have to wait on an exit message
-	// // here
-	// <-ing.exitBuildCh
+	defer errors.HandlePanic()
+	e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
+	s := e.Add(-ing.config.DailyDuration)
+	if ing.resolution == time.Hour {
+		s = e.Add(-ing.config.HourlyDuration)
+	}
+	// Profile the full Duration of the build time
+	buildStart := time.Now()
+
+	log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, s, ing.resolution)
+
+	// if rebuild is not specified then check for existing coverage on window
+	if rebuild {
+		ing.BuildWindow(s, e)
+	} else {
+		ing.LoadWindow(s, e)
+	}
+
+	log.Infof(fmt.Sprintf("CustomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
+
+	// In order to be able to Stop, we have to wait on an exit message
+	// here
+	<-ing.exitBuildCh
 
 }
 
 func (ing *CustomCostIngestor) Rebuild(domain string) error {
+	targetDur := ing.config.DailyDuration
+	if ing.resolution == time.Hour {
+		targetDur = ing.config.HourlyDuration
+	}
+	// Build as far back as the configures build Duration
+	limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
+	e := time.Now().UTC()
+
+	ing.buildSingleDomain(limit, e, domain)
 	return nil
 }
+
 func (ing *CustomCostIngestor) run() {
-	// defer errors.HandlePanic()
-
-	// ticker := timeutil.NewJobTicker()
-	// defer ticker.Close()
-	// ticker.TickIn(0)
-
-	// for {
-	// 	// If an exit instruction is received, break the run loop
-	// 	select {
-	// 	case <-ing.exitRunCh:
-	// 		log.Debugf("CloudCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
-	// 		return
-	// 	case <-ticker.Ch:
-	// 		// Wait for next tick
-	// 	}
-
-	// 	// Start from the last covered time, minus the RunWindow
-	// 	start := ing.lastRun
-	// 	start = start.Add(-ing.config.RunWindow)
-
-	// 	// Every Nth (determined by the MonthToDateRunInterval) run should be a month to date run. Where the start is
-	// 	// truncated to the beginning of its current month this can mean that early in a new month we will build all of
-	// 	// last month and the first few days of the current month.
-	// 	if ing.runs%ing.config.MonthToDateRunInterval == 0 {
-	// 		start = time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
-	// 		log.Infof("CloudCost[%s]: ingestor: Run[%s]: running month-to-date update starting at %s", ing.key, ing.runID, start.String())
-	// 	}
-
-	// 	// Round start time back to the nearest Resolution point in the past from the
-	// 	// last update to the QueryWindow
-	// 	s := opencost.RoundBack(start.UTC(), ing.config.Resolution)
-	// 	e := s.Add(ing.config.QueryWindow)
-
-	// 	// Start with a window of the configured Duration and starting on the given
-	// 	// start time. Do the following, repeating until the window reaches the
-	// 	// current time:
-	// 	// 1. Instruct builder to build window
-	// 	// 2. Move window forward one Resolution
-	// 	for time.Now().After(s) {
-	// 		profStart := time.Now()
-	// 		ing.BuildWindow(s, e)
-
-	// 		log.Debugf("CloudCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
-
-	// 		s = s.Add(ing.config.QueryWindow)
-	// 		e = e.Add(ing.config.QueryWindow)
-	// 		// prevent builds into the future
-	// 		if e.After(time.Now().UTC()) {
-	// 			e = opencost.RoundForward(time.Now().UTC(), ing.config.Resolution)
-	// 		}
-
-	// 	}
-	// 	ing.lastRun = time.Now().UTC()
-
-	// 	limit := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution).Add(-ing.config.Duration)
-	// 	err := ing.repo.Expire(limit)
-	// 	if err != nil {
-	// 		log.Errorf("CloudCost: Ingestor: failed to expire Data: %s", err)
-	// 	}
-
-	// 	ing.coverageLock.Lock()
-	// 	ing.coverage = ing.coverage.ContractStart(limit)
-	// 	ing.coverageLock.Unlock()
-
-	// 	ing.runs++
-
-	// 	ticker.TickIn(ing.config.RefreshRate)
-	// }
+	defer errors.HandlePanic()
+
+	ticker := timeutil.NewJobTicker()
+	defer ticker.Close()
+	ticker.TickIn(0)
+
+	for {
+		// If an exit instruction is received, break the run loop
+		select {
+		case <-ing.exitRunCh:
+			log.Debugf("CustomCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
+			return
+		case <-ticker.Ch:
+			// Wait for next tick
+		}
+
+		// Start from the last covered time, minus the RunWindow
+		start := ing.lastRun
+		start = start.Add(-ing.resolution)
+
+		queryWin := ing.config.DailyQueryWindow
+		if ing.resolution == time.Hour {
+			queryWin = ing.config.HourlyQueryWindow
+		}
+
+		// Round start time back to the nearest Resolution point in the past from the
+		// last update to the QueryWindow
+		s := opencost.RoundBack(start.UTC(), ing.resolution)
+		e := s.Add(queryWin)
+
+		// Start with a window of the configured Duration and starting on the given
+		// start time. Do the following, repeating until the window reaches the
+		// current time:
+		// 1. Instruct builder to build window
+		// 2. Move window forward one Resolution
+		for time.Now().After(s) {
+			profStart := time.Now()
+			ing.BuildWindow(s, e)
+
+			log.Debugf("CustomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
+
+			s = s.Add(queryWin)
+			e = e.Add(queryWin)
+			// prevent builds into the future
+			if e.After(time.Now().UTC()) {
+				e = opencost.RoundForward(time.Now().UTC(), ing.resolution)
+			}
+
+		}
+		ing.lastRun = time.Now().UTC()
+
+		ing.runs++
+
+		ticker.TickIn(ing.refreshRate)
+	}
 }
 
-func (ing *CustomCostIngestor) expandCoverage(window opencost.Window) {
+func (ing *CustomCostIngestor) expandCoverage(window opencost.Window, plugin string) {
 	if window.IsOpen() {
 		return
 	}
 	ing.coverageLock.Lock()
 	defer ing.coverageLock.Unlock()
 
-	coverage := ing.coverage.ExpandStart(*window.Start())
-	coverage = coverage.ExpandEnd(*window.End())
+	if _, hasCoverage := ing.coverage[plugin]; !hasCoverage {
+		ing.coverage[plugin] = window.Clone()
+	} else {
+		// expand existing coverage
+		ing.coverage[plugin] = ing.coverage[plugin].ExpandStart(*window.Start())
+		ing.coverage[plugin] = ing.coverage[plugin].ExpandEnd(*window.End())
+	}
 
-	ing.coverage = coverage
 }
-
-// temporary mock data
-const ddData = `
-[
-    {
-        "Metadata": {
-            "api_client_version": "v2"
-        },
-        "Costsource": "observability",
-        "Domain": "datadog",
-        "Version": "v1",
-        "Currency": "USD",
-        "Window": {
-            "start": "2024-02-27T00:00:00Z",
-            "end": "2024-02-27T01:00:00Z"
-        },
-        "Costs": [
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "agent_host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/agent_host_count",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 236,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 236,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 219,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count_excl_agent",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count_excl_agent",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 219,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/host_count",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            }
-        ],
-        "Errors": null
-    },
-    {
-        "Metadata": {
-            "api_client_version": "v2"
-        },
-        "Costsource": "observability",
-        "Domain": "datadog",
-        "Version": "v1",
-        "Currency": "USD",
-        "Window": {
-            "start": "2024-02-27T01:00:00Z",
-            "end": "2024-02-27T02:00:00Z"
-        },
-        "Costs": [
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "agent_host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/agent_host_count",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 235,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 235,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 218,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count_excl_agent",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count_excl_agent",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 218,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/host_count",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            }
-        ],
-        "Errors": null
-    }
-]
-`

+ 27 - 33
pkg/customcost/pipelineservice.go

@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
+	"runtime"
 	"strings"
 	"time"
 
@@ -14,19 +15,21 @@ import (
 	"github.com/opencost/opencost/core/pkg/log"
 	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
 	proto "github.com/opencost/opencost/core/pkg/protocol"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/core/pkg/version"
-	"github.com/opencost/opencost/pkg/env"
 )
 
 var protocol = proto.HTTP()
 
+const execFmt = `%s/%s.ocplugin.%s.%s`
+
 // PipelineService exposes CloudCost pipeline controls and diagnostics endpoints
 type PipelineService struct {
 	hourlyIngestor, dailyIngestor *CustomCostIngestor
 	hourlyStore, dailyStore       Repository
 }
 
-func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.ClientProtocol, error) {
+func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.Client, error) {
 
 	pluginNames := map[string]string{}
 	// scan plugin config directory for all file names
@@ -56,7 +59,7 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 	configs := map[string]*plugin.ClientConfig{}
 	// set up the client config
 	for name, config := range pluginNames {
-		if _, err := os.Stat(execDir + "/" + name + ".ocplugin." + version.Architecture); err != nil {
+		if _, err := os.Stat(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, version.Architecture)); err != nil {
 			msg := fmt.Sprintf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
 			log.Errorf(msg)
 			return nil, fmt.Errorf(msg)
@@ -81,23 +84,18 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 		configs[name] = &plugin.ClientConfig{
 			HandshakeConfig: handshakeConfig,
 			Plugins:         pluginMap,
-			Cmd:             exec.Command(execDir+"/"+name+".ocplugin."+version.Architecture, config),
+			Cmd:             exec.Command(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, version.Architecture), config),
 			Logger:          logger,
 		}
 	}
 
-	plugins := map[string]*plugin.ClientProtocol{}
+	plugins := map[string]*plugin.Client{}
 
 	for name, config := range configs {
 		client := plugin.NewClient(config)
-		// connect the client
-		rpcClient, err := client.Client()
-		if err != nil {
-			log.Errorf("error connecting client for plugin %s: %v", name, err)
-			return nil, fmt.Errorf("error connecting client for plugin %s: %v", name, err)
-		}
+
 		// add the connected, initialized client to the ma
-		plugins[name] = &rpcClient
+		plugins[name] = client
 	}
 
 	return plugins, nil
@@ -106,21 +104,20 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 // NewPipelineService is a constructor for a PipelineService
 func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
 
-	var registeredPlugins map[string]*plugin.ClientProtocol
-	var err error //getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
+	registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
 	if err != nil {
 		log.Errorf("error getting registered plugins: %v", err)
 		return nil, fmt.Errorf("error getting registered plugins: %v", err)
 	}
 
-	hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins)
+	hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins, time.Hour)
 	if err != nil {
 		return nil, err
 	}
 
 	hourlyIngestor.Start(false)
 
-	dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins)
+	dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins, timeutil.Day)
 	if err != nil {
 		return nil, err
 	}
@@ -138,25 +135,24 @@ func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostInge
 func (dp *PipelineService) Status() Status {
 
 	// Pull config status from the config controller
-	ingstatus := dp.hourlyIngestor.Status()
-	dur, err := time.ParseDuration(env.GetCustomCostRefreshRateHours())
-	if err != nil {
-		log.Errorf("error parsing duration %s: %v", env.GetCustomCostRefreshRateHours(), err)
-		return Status{}
-	}
-	refreshRate := time.Hour * dur
+	ingstatusHourly := dp.hourlyIngestor.Status()
+
+	// Pull the matching status from the daily ingestor
+	ingstatusDaily := dp.dailyIngestor.Status()
 
 	// These are the statuses
 	return Status{
-		Coverage:    ingstatus.Coverage.String(),
-		RefreshRate: refreshRate.String(),
+		CoverageDaily:     ingstatusDaily.Coverage,
+		CoverageHourly:    ingstatusHourly.Coverage,
+		RefreshRateHourly: ingstatusHourly.RefreshRate.String(),
+		RefreshRateDaily:  ingstatusDaily.RefreshRate.String(),
 	}
 
 }
 
-// GetCloudCostRebuildHandler creates a handler from a http request which initiates a rebuild of cloud cost pipeline, if an
-// integrationKey is provided then it only rebuilds the specified billing integration
-func (s *PipelineService) GetCloudCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+// GetCustomCostRebuildHandler creates a handler from a http request which initiates a rebuild of custom cost pipeline, if a
+// domain is provided then it only rebuilds the specified billing domain
+func (s *PipelineService) GetCustomCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	// If pipeline Service is nil, always return 501
 	if s == nil {
 		return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
@@ -175,7 +171,7 @@ func (s *PipelineService) GetCloudCostRebuildHandler() func(w http.ResponseWrite
 		commit := r.URL.Query().Get("commit") == "true" || r.URL.Query().Get("commit") == "1"
 
 		if !commit {
-			protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Cloud Cost rebuild")
+			protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Custom Cost rebuild")
 			return
 		}
 
@@ -194,17 +190,15 @@ func (s *PipelineService) GetCloudCostRebuildHandler() func(w http.ResponseWrite
 			return
 		}
 		protocol.WriteData(w, fmt.Sprintf("Rebuilding Custom Cost For Domain %s", domain))
-		return
-
 	}
 }
 
-// GetCloudCostStatusHandler creates a handler from a http request which returns a list of the billing integration status
+// GetCustomCostStatusHandler creates a handler from a http request which returns the custom cost ingestor status
 func (s *PipelineService) GetCustomCostStatusHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	// If Reporting Service is nil, always return 501
 	if s == nil {
 		return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
-			http.Error(w, "Reporting Service is nil", http.StatusNotImplemented)
+			http.Error(w, "Custom cost pipeline Service is nil", http.StatusNotImplemented)
 		}
 	}
 	if s.hourlyIngestor == nil || s.dailyIngestor == nil {

+ 192 - 0
pkg/customcost/pipelineservice_test.go

@@ -0,0 +1,192 @@
+package customcost
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"runtime"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/opencost/opencost/core/pkg/version"
+)
+
+func TestPipelineService(t *testing.T) {
+	// establish temporary test assets dir
+
+	dir := t.TempDir()
+
+	err := os.MkdirAll(dir+"/config", 0777)
+	if err != nil {
+		t.Fatalf("error creating temp config dir: %v", err)
+	}
+
+	err = os.MkdirAll(dir+"/executable", 0777)
+	if err != nil {
+		t.Fatalf("error creating temp exec dir: %v", err)
+	}
+	version.Architecture = runtime.GOARCH
+	// write DD secrets to config files
+	// write config file to temp dir
+	writeDDConfig(dir+"/config", t)
+
+	// set env vars for plugin config and executable
+	err = os.Setenv("PLUGIN_CONFIG_DIR", dir+"/config")
+	if err != nil {
+		t.Fatalf("error setting config dir env var: %v", err)
+	}
+	err = os.Setenv("PLUGIN_EXECUTABLE_DIR", dir+"/executable")
+	if err != nil {
+		t.Fatalf("error setting config dir env var: %v", err)
+	}
+
+	// download amd plugin, store in tmp executable dir
+	downloadLatestPluginExec(dir+"/executable", t)
+
+	// set up repos
+	hourlyRepo := NewMemoryRepository()
+	dailyRepo := NewMemoryRepository()
+	// set up ingestor config
+	config := DefaultIngestorConfiguration()
+
+	config.DailyDuration = 7 * timeutil.Day
+	config.HourlyDuration = 16 * time.Hour
+	pipeline, err := NewPipelineService(hourlyRepo, dailyRepo, config)
+	if err != nil {
+		t.Fatalf("error starting pipeline: %v", err)
+	}
+
+	// wait until coverage is complete, then stop ingestor
+	ingestionComplete := false
+	maxLoops := 10
+	loopCount := 0
+	for !ingestionComplete && loopCount < maxLoops {
+		status := pipeline.Status()
+		log.Debugf("got status: %v", status)
+		coverageHourly, foundHourly := status.CoverageHourly["datadog"]
+		coverageDaily, foundDaily := status.CoverageDaily["datadog"]
+		if foundHourly && foundDaily {
+			// check coverage
+			minTime := time.Now().UTC().Add(-6 * timeutil.Day)
+			maxTime := time.Now().UTC().Add(-3 * time.Hour)
+			if coverageDaily.Start().Before(minTime) && coverageHourly.End().After(maxTime) {
+				log.Infof("good coverage, breaking out of loop")
+				ingestionComplete = true
+				break
+			} else {
+				log.Infof("coverage not within range. Looking for coverage %v to %v, but current coverage is %v/%v", minTime, maxTime, coverageDaily, coverageHourly)
+			}
+		} else {
+			log.Debugf("no coverage info ready yet for datadog")
+		}
+		log.Infof("sleeping 10s...")
+		time.Sleep(10 * time.Second)
+		loopCount++
+	}
+
+	if !ingestionComplete {
+		t.Fatal("ingestor never completed within allocated time")
+	}
+
+	pipeline.hourlyIngestor.Stop()
+	pipeline.dailyIngestor.Stop()
+
+	// inspect data from yesterday
+	targetTime := time.Now().UTC().Add(-1 * timeutil.Day).Truncate(timeutil.Day)
+	log.Infof("querying for data with window start: %v", targetTime)
+	// check for presence of hosts in DD response
+	ddCosts, err := dailyRepo.Get(targetTime, "datadog")
+	if err != nil {
+		t.Fatalf("error getting results for targetTime")
+	}
+	foundInfraHosts := false
+	for _, cost := range ddCosts.Costs {
+		if cost.ResourceType == "infra_hosts" {
+			foundInfraHosts = true
+		}
+	}
+
+	if !foundInfraHosts {
+		t.Fatal("expecting infra_hosts costs in daily response")
+	}
+
+	// query data from 4 hours ago (hourly)
+	targetTime = time.Now().UTC().Add(-13 * time.Hour).Truncate(time.Hour)
+	log.Infof("querying for data with window start: %v", targetTime)
+	// check for presence of hosts in DD response
+	ddCosts, err = hourlyRepo.Get(targetTime, "datadog")
+	if err != nil {
+		t.Fatalf("error getting results for targetTime")
+	}
+	foundInfraHosts = false
+	for _, cost := range ddCosts.Costs {
+		if cost.ResourceType == "infra_hosts" {
+			foundInfraHosts = true
+		}
+	}
+
+	if !foundInfraHosts {
+		t.Fatal("expecting infra_hosts costs in hourly response")
+	}
+}
+
+// downloadLatestPluginExec downloads the v0.0.3 datadog plugin release binary
+// for the current OS and version.Architecture into dirName, naming the file
+// datadog.ocplugin.<GOOS>.<arch>. Any failure aborts the calling test.
+func downloadLatestPluginExec(dirName string, t *testing.T) {
+	t.Helper()
+
+	binName := "datadog.ocplugin." + runtime.GOOS + "." + version.Architecture
+	ddPluginURL := "https://github.com/opencost/opencost-plugins/releases/download/v0.0.3/" + binName
+
+	// fetch first, so a failed download never leaves an empty file behind
+	resp, err := http.Get(ddPluginURL)
+	if err != nil {
+		t.Fatalf("error downloading: %v", err)
+	}
+	defer resp.Body.Close()
+
+	// without this check an error page would be written out as the "executable"
+	if resp.StatusCode != http.StatusOK {
+		t.Fatalf("error downloading %s: unexpected status %s", ddPluginURL, resp.Status)
+	}
+
+	// the second argument is the open flags, the third the permission bits
+	out, err := os.OpenFile(dirName+"/"+binName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
+	if err != nil {
+		t.Fatalf("error creating executable file: %v", err)
+	}
+
+	if _, err = io.Copy(out, resp.Body); err != nil {
+		out.Close()
+		t.Fatalf("error copying: %v", err)
+	}
+
+	// close explicitly and check the error: the binary must be fully written
+	// before anything tries to execute it
+	if err = out.Close(); err != nil {
+		t.Fatalf("error closing executable file: %v", err)
+	}
+}
+
+// writeDDConfig writes a datadog plugin config file into pluginConfigDir,
+// built from the DD_SITE, DD_API_KEY and DD_APPLICATION_KEY env vars. If any
+// of them is unset it logs a warning and skips the calling test.
+func writeDDConfig(pluginConfigDir string, t *testing.T) {
+	t.Helper()
+
+	// read necessary env vars. If any are missing, log warning and skip test.
+	// t.Skip does not return, so no explicit return is needed after it.
+	ddSite := os.Getenv("DD_SITE")
+	ddApiKey := os.Getenv("DD_API_KEY")
+	ddAppKey := os.Getenv("DD_APPLICATION_KEY")
+
+	if ddSite == "" {
+		log.Warnf("DD_SITE undefined, this needs to have the URL of your DD instance, skipping test")
+		t.Skip()
+	}
+
+	if ddApiKey == "" {
+		log.Warnf("DD_API_KEY undefined, skipping test")
+		t.Skip()
+	}
+
+	if ddAppKey == "" {
+		log.Warnf("DD_APPLICATION_KEY undefined, skipping test")
+		t.Skip()
+	}
+
+	// write out config to temp file using contents of env vars
+	ddConf := fmt.Sprintf(`{"datadog_site": "%s", "datadog_api_key": "%s", "datadog_app_key": "%s"}`, ddSite, ddApiKey, ddAppKey)
+
+	// the pattern has no "*", so CreateTemp appends its random string to the
+	// end of "datadog_config.json"
+	file, err := os.CreateTemp(pluginConfigDir, "datadog_config.json")
+	if err != nil {
+		t.Fatalf("could not create temp config file: %v", err)
+	}
+
+	// write through the handle CreateTemp returned and close it, rather than
+	// leaving it open and reopening the file by name
+	if _, err = file.WriteString(ddConf); err != nil {
+		file.Close()
+		t.Fatalf("could not write file: %v", err)
+	}
+	if err = file.Close(); err != nil {
+		t.Fatalf("could not close config file: %v", err)
+	}
+}

+ 16 - 13
pkg/customcost/status.go

@@ -3,22 +3,25 @@ package customcost
 import (
 	"time"
 
+	"github.com/opencost/opencost/core/pkg/opencost"
 	cloudconfig "github.com/opencost/opencost/pkg/cloud"
 )
 
 // Status gives the details and metadata of a CloudCost integration
 type Status struct {
-	Key              string             `json:"key"`
-	Source           string             `json:"source"`
-	Provider         string             `json:"provider"`
-	Active           bool               `json:"active"`
-	Valid            bool               `json:"valid"`
-	LastRun          time.Time          `json:"lastRun"`
-	NextRun          time.Time          `json:"nextRun"`
-	RefreshRate      string             `json:"RefreshRate"`
-	Created          time.Time          `json:"created"`
-	Runs             int                `json:"runs"`
-	Coverage         string             `json:"coverage"`
-	ConnectionStatus string             `json:"connectionStatus"`
-	Config           cloudconfig.Config `json:"config"`
+	Key               string                     `json:"key"`
+	Source            string                     `json:"source"`
+	Provider          string                     `json:"provider"`
+	Active            bool                       `json:"active"`
+	Valid             bool                       `json:"valid"`
+	LastRun           time.Time                  `json:"lastRun"`
+	NextRun           time.Time                  `json:"nextRun"`
+	RefreshRateDaily  string                     `json:"RefreshRateDaily"` // string form of the daily ingestor's RefreshRate
+	RefreshRateHourly string                     `json:"RefreshRateHourly"` // string form of the hourly ingestor's RefreshRate
+	Created           time.Time                  `json:"created"`
+	Runs              int                        `json:"runs"`
+	CoverageHourly    map[string]opencost.Window `json:"coverageHourly"` // hourly ingestor coverage; keys look like plugin/domain names (e.g. "datadog") - confirm
+	CoverageDaily     map[string]opencost.Window `json:"coverageDaily"` // daily ingestor coverage, keyed the same way
+	ConnectionStatus  string                     `json:"connectionStatus"`
+	Config            cloudconfig.Config         `json:"config"`
 }

+ 4 - 0
pkg/env/costmodelenv.go

@@ -677,6 +677,10 @@ func GetCloudCostQueryWindowDays() int64 {
 	return env.GetInt64(CloudCostQueryWindowDaysEnvVar, 7)
 }
 
+// GetCustomCostQueryWindowHours returns the int64 value of the environment
+// variable named by CustomCostQueryWindowDaysEnvVar, with 1 passed as the default.
+//
+// NOTE(review): this reads the *Days* env var, the same one used by
+// GetCustomCostQueryWindowDays, so one setting drives both the hourly and the
+// daily window. This looks like a copy-paste slip - confirm whether a
+// dedicated hours variable was intended.
+func GetCustomCostQueryWindowHours() int64 {
+	return env.GetInt64(CustomCostQueryWindowDaysEnvVar, 1)
+}
+
 // GetCustomCostQueryWindowDays returns the int64 value of the environment
 // variable named by CustomCostQueryWindowDaysEnvVar, with 7 passed as the default.
 func GetCustomCostQueryWindowDays() int64 {
 	return env.GetInt64(CustomCostQueryWindowDaysEnvVar, 7)
 }