Przeglądaj źródła

finish rough in of ingestion logic

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
Alex Meijer 2 lat temu
rodzic
commit
757ce70711
2 zmienionych plików z 256 dodań i 420 usunięć
  1. 245 407
      pkg/customcost/ingestor.go
  2. 11 13
      pkg/customcost/pipelineservice.go

+ 245 - 407
pkg/customcost/ingestor.go

@@ -1,19 +1,21 @@
 package customcost
 
 import (
-	"encoding/json"
 	"fmt"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
-	"github.com/davecgh/go-spew/spew"
 	"github.com/hashicorp/go-plugin"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/model"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
+	"github.com/opencost/opencost/core/pkg/util/stringutil"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/env"
+	"github.com/opencost/opencost/pkg/errors"
 )
 
 // IngestorStatus includes diagnostic values for a given Ingestor
@@ -22,7 +24,7 @@ type IngestorStatus struct {
 	LastRun  time.Time
 	NextRun  time.Time
 	Runs     int
-	Coverage opencost.Window
+	Coverage map[string]opencost.Window
 }
 
 // CustomCost IngestorConfig is a configuration struct for an Ingestor
@@ -54,116 +56,166 @@ type CustomCostIngestor struct {
 	lastRun      time.Time
 	runs         int
 	creationTime time.Time
-	coverage     opencost.Window
+	coverage     map[string]opencost.Window
 	coverageLock sync.Mutex
 	isRunning    atomic.Bool
 	isStopping   atomic.Bool
 	exitBuildCh  chan string
 	exitRunCh    chan string
 	plugins      map[string]*plugin.ClientProtocol
+	resolution   time.Duration
 }
 
 // NewIngestor is an initializer for ingestor
-func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.ClientProtocol) (*CustomCostIngestor, error) {
+func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.ClientProtocol, res time.Duration) (*CustomCostIngestor, error) {
 	if repo == nil {
 		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
 	}
 	if ingestorConfig == nil {
-		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: integration connot be nil")
+		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: config connot be nil")
 	}
+	key := ""
+
+	for name := range plugins {
+		key += "," + name
+	}
+
+	key = strings.TrimPrefix(key, ",")
 
 	now := time.Now().UTC()
-	midnight := opencost.RoundForward(now, timeutil.Day)
+
 	return &CustomCostIngestor{
+		key:          key,
 		config:       ingestorConfig,
 		repo:         repo,
 		creationTime: now,
 		lastRun:      now,
-		coverage:     opencost.NewClosedWindow(midnight, midnight),
+		coverage:     map[string]opencost.Window{},
 		plugins:      plugins,
+		resolution:   res,
 	}, nil
 }
 
 func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
-	windows, err := opencost.GetWindows(start, end, timeutil.Day)
-	if err != nil {
-		log.Errorf("CloudCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
-		return
+	var targets []opencost.Window
+	if ing.resolution == time.Hour {
+		oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration)
+		if !oldestDailyDate.After(start) {
+			windows, err := opencost.GetWindows(start, end, timeutil.Day)
+			if err != nil {
+				log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
+				return
+			}
+			targets = windows
+		}
+	} else {
+
+		oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration)
+		if !oldestHourlyDate.After(start) {
+			windows, err := opencost.GetWindows(start, end, time.Hour)
+			if err != nil {
+				log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
+				return
+			}
+			targets = windows
+		}
 	}
 
-	for _, window := range windows {
-		has, err2 := ing.repo.Has(*window.Start(), ing.key)
-		if err2 != nil {
-			log.Errorf("CloudCost[%s]: ingestor: error when loading window: %s", ing.key, err2.Error())
+	for _, window := range targets {
+		allPluginsHave := true
+		for domain := range ing.plugins {
+			has, err2 := ing.repo.Has(*window.Start(), domain)
+			if err2 != nil {
+				log.Errorf("CustomCost[%s]: ingestor: error when loading window for plugin %s: %s", ing.key, domain, err2.Error())
+			}
+			if !has {
+				allPluginsHave = false
+				break
+			}
 		}
-		if !has {
+		if !allPluginsHave {
 			ing.BuildWindow(start, end)
-			return
+		} else {
+			for domain := range ing.plugins {
+				ing.expandCoverage(window, domain)
+			}
+			log.Debugf("CustomCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
 		}
-		ing.expandCoverage(window)
-		log.Debugf("CloudCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
 	}
 
 }
 
 func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
-	log.Infof("ingestor: building window %s", opencost.NewWindow(&start, &end))
-
-	// // build customCostRequest
-	// // make RPC call via plugin
-
-	// // loop through each customCostResponse, adding
-	// for _, ccs := range ccsr.CloudCostSets {
-	// 	log.Debugf("BuildWindow[%s]: GetCloudCost: writing cloud costs for window %s: %d", ccs.Integration, ccs.Window, len(ccs.CloudCosts))
-	// 	err2 := ing.repo.Put(ccs)
-	// 	if err2 != nil {
-	// 		log.Errorf("CloudCost[%s]: ingestor: failed to save Cloud Cost Set with window %s: %s", ing.key, ccs.GetWindow().String(), err2.Error())
-	// 	}
-	// 	ing.expandCoverage(ccs.Window)
-	// }
-}
 
-func (ing *CustomCostIngestor) Start(rebuild bool) {
+	for domain := range ing.plugins {
+		ing.buildSingleDomain(start, end, domain)
+	}
+}
 
-	// // If already running, log that and return.
-	// if !ing.isRunning.CompareAndSwap(false, true) {
-	// 	log.Infof("CloudCost: ingestor: is already running")
-	// 	return
-	// }
+func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain string) {
+	target := opencost.NewWindow(&start, &end)
+	req := model.CustomCostRequest{
+		TargetWindow: &target,
+		Resolution:   ing.resolution,
+	}
+	log.Infof("ingestor: building window %s for plugin %s", opencost.NewWindow(&start, &end), domain)
+	// make RPC call via plugin
+	pluginClient, found := ing.plugins[domain]
+	if !found {
+		log.Errorf("could not find plugin client for plugin %s. Did you initialize the plugin correctly?", domain)
+		return
+	}
+	pluginClientDeref := *pluginClient
+	// Request the plugin
+	raw, err := pluginClientDeref.Dispense("CustomCostSource")
+	if err != nil {
+		log.Errorf("error creating new plugin client for plugin %s: %v", domain, err)
+		return
+	}
 
-	// ing.runID = stringutil.RandSeq(5)
+	custCostSrc := raw.(ocplugin.CustomCostSource)
 
-	// ing.exitBuildCh = make(chan string)
-	// ing.exitRunCh = make(chan string)
+	custCostResps := custCostSrc.GetCustomCosts(req)
+	// loop through each customCostResponse, adding to repo
+	for _, ccr := range custCostResps {
 
-	// // Build the store once, advancing backward in time from the earliest
-	// // point of coverage.
-	// go ing.build(rebuild)
+		// check for errors in response
+		if len(ccr.Errors) > 0 {
+			for _, errResp := range ccr.Errors {
+				log.Errorf("error in getting custom costs for plugin %s: %v", domain, errResp)
+			}
+			log.Errorf("not adding any costs for window %v on plugin %s", req.TargetWindow, domain)
+			continue
+		}
+		log.Debugf("BuildWindow[%s]: GetCustomCost: writing custom costs for window %s: %d", domain, ccr.Window, len(ccr.Costs))
 
-	// go ing.run()
+		err2 := ing.repo.Put(&ccr)
+		if err2 != nil {
+			log.Errorf("CustomCost[%s]: ingestor: failed to save Custom Cost Set with window %s: %s", domain, ccr.GetWindow().String(), err2.Error())
+		}
 
-	// TEMPORARY - load the repo with dummy data
-	var resps []*model.CustomCostResponse
-	err := json.Unmarshal([]byte(ddData), &resps)
-	if err != nil {
-		panic(err)
+		ing.expandCoverage(ccr.Window, domain)
 	}
+}
 
-	for _, resp := range resps {
-		err = ing.repo.Put(resp)
-		if err != nil {
-			panic(err)
-		}
-	}
-	//2024-02-27T01:00:00
-	target := time.Date(2024, 2, 27, 1, 0, 0, 0, time.UTC)
-	stored, err := ing.repo.Get(target, "datadog")
-	if err != nil {
-		panic(err)
+func (ing *CustomCostIngestor) Start(rebuild bool) {
+
+	// If already running, log that and return.
+	if !ing.isRunning.CompareAndSwap(false, true) {
+		log.Infof("CustomCost: ingestor: is already running")
+		return
 	}
 
-	log.Debug("got stored object: ")
-	spew.Dump(stored)
+	ing.runID = stringutil.RandSeq(5)
+
+	ing.exitBuildCh = make(chan string)
+	ing.exitRunCh = make(chan string)
+
+	// Build the store once, advancing backward in time from the earliest
+	// point of coverage.
+	go ing.build(rebuild)
+
+	go ing.run()
 
 }
 
@@ -216,361 +268,147 @@ func (ing *CustomCostIngestor) Status() IngestorStatus {
 }
 
 func (ing *CustomCostIngestor) build(rebuild bool) {
-	// defer errors.HandlePanic()
-
-	// // Profile the full Duration of the build time
-	// buildStart := time.Now()
-
-	// // Build as far back as the configures build Duration
-	// limit := opencost.RoundBack(time.Now().UTC().Add(-ing.config.Duration), ing.config.Resolution)
-
-	// queryWindowStr := timeutil.FormatStoreResolution(ing.config.QueryWindow)
-	// log.Infof("CloudCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, limit.String(), queryWindowStr)
-
-	// // Start with a window of the configured Duration and ending on the given
-	// // start time. Build windows repeating until the window reaches the
-	// // given limit time
-
-	// // Round end times back to nearest Resolution points in the past,
-	// // querying for exactly one interval
-	// e := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution)
-	// s := e.Add(-ing.config.QueryWindow)
-
-	// // Continue until limit is reached
-	// for limit.Before(e) {
-	// 	// If exit instruction is received, log and return
-	// 	select {
-	// 	case <-ing.exitBuildCh:
-	// 		log.Debugf("CloudCost[%s]: ingestor: build[%s]: exiting", ing.key, ing.runID)
-	// 		return
-	// 	default:
-	// 	}
-
-	// 	// Profile the current build step
-	// 	stepStart := time.Now()
-
-	// 	// if rebuild is not specified then check for existing coverage on window
-	// 	if rebuild {
-	// 		ing.BuildWindow(s, e)
-	// 	} else {
-	// 		ing.LoadWindow(s, e)
-	// 	}
-
-	// 	log.Infof("CloudCost[%s]: ingestor: build[%s]:  %s in %v", ing.key, ing.runID, opencost.NewClosedWindow(s, e), time.Since(stepStart))
-
-	// 	// Shift to next QueryWindow
-	// 	s = s.Add(-ing.config.QueryWindow)
-	// 	if s.Before(limit) {
-	// 		s = limit
-	// 	}
-	// 	e = e.Add(-ing.config.QueryWindow)
-	// }
-
-	// log.Infof(fmt.Sprintf("CloudCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
-
-	// // In order to be able to Stop, we have to wait on an exit message
-	// // here
-	// <-ing.exitBuildCh
+	defer errors.HandlePanic()
+
+	// Profile the full Duration of the build time
+	buildStart := time.Now()
+
+	targetDur := ing.config.DailyDuration
+	if ing.resolution == time.Hour {
+		targetDur = ing.config.HourlyDuration
+	}
+	// Build as far back as the configures build Duration
+	limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
+
+	queryWindowStr := timeutil.FormatStoreResolution(ing.config.QueryWindow)
+	log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, limit.String(), queryWindowStr)
+
+	// Start with a window of the configured Duration and ending on the given
+	// start time. Build windows repeating until the window reaches the
+	// given limit time
+
+	// Round end times back to nearest Resolution points in the past,
+	// querying for exactly one interval
+	e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
+	s := e.Add(-ing.config.QueryWindow)
+
+	// Continue until limit is reached
+	for limit.Before(e) {
+		// If exit instruction is received, log and return
+		select {
+		case <-ing.exitBuildCh:
+			log.Debugf("CustomCost[%s][%s]: ingestor: build[%s]: exiting", ing.key, ing.resolution, ing.runID)
+			return
+		default:
+		}
+
+		// Profile the current build step
+		stepStart := time.Now()
+
+		// if rebuild is not specified then check for existing coverage on window
+		if rebuild {
+			ing.BuildWindow(s, e)
+		} else {
+			ing.LoadWindow(s, e)
+		}
+
+		log.Infof("CustomCost[%s]: ingestor: build[%s]:  %s in %v", ing.key, ing.runID, opencost.NewClosedWindow(s, e), time.Since(stepStart))
+
+		// Shift to next QueryWindow
+		s = s.Add(-ing.config.QueryWindow)
+		if s.Before(limit) {
+			s = limit
+		}
+		e = e.Add(-ing.config.QueryWindow)
+	}
+
+	log.Infof(fmt.Sprintf("CusomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
+
+	// In order to be able to Stop, we have to wait on an exit message
+	// here
+	<-ing.exitBuildCh
 
 }
 
 func (ing *CustomCostIngestor) Rebuild(domain string) error {
+	targetDur := ing.config.DailyDuration
+	if ing.resolution == time.Hour {
+		targetDur = ing.config.HourlyDuration
+	}
+	// Build as far back as the configures build Duration
+	limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
+	e := time.Now().UTC()
+
+	ing.buildSingleDomain(limit, e, domain)
 	return nil
 }
+
 func (ing *CustomCostIngestor) run() {
-	// defer errors.HandlePanic()
-
-	// ticker := timeutil.NewJobTicker()
-	// defer ticker.Close()
-	// ticker.TickIn(0)
-
-	// for {
-	// 	// If an exit instruction is received, break the run loop
-	// 	select {
-	// 	case <-ing.exitRunCh:
-	// 		log.Debugf("CloudCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
-	// 		return
-	// 	case <-ticker.Ch:
-	// 		// Wait for next tick
-	// 	}
-
-	// 	// Start from the last covered time, minus the RunWindow
-	// 	start := ing.lastRun
-	// 	start = start.Add(-ing.config.RunWindow)
-
-	// 	// Every Nth (determined by the MonthToDateRunInterval) run should be a month to date run. Where the start is
-	// 	// truncated to the beginning of its current month this can mean that early in a new month we will build all of
-	// 	// last month and the first few days of the current month.
-	// 	if ing.runs%ing.config.MonthToDateRunInterval == 0 {
-	// 		start = time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
-	// 		log.Infof("CloudCost[%s]: ingestor: Run[%s]: running month-to-date update starting at %s", ing.key, ing.runID, start.String())
-	// 	}
-
-	// 	// Round start time back to the nearest Resolution point in the past from the
-	// 	// last update to the QueryWindow
-	// 	s := opencost.RoundBack(start.UTC(), ing.config.Resolution)
-	// 	e := s.Add(ing.config.QueryWindow)
-
-	// 	// Start with a window of the configured Duration and starting on the given
-	// 	// start time. Do the following, repeating until the window reaches the
-	// 	// current time:
-	// 	// 1. Instruct builder to build window
-	// 	// 2. Move window forward one Resolution
-	// 	for time.Now().After(s) {
-	// 		profStart := time.Now()
-	// 		ing.BuildWindow(s, e)
-
-	// 		log.Debugf("CloudCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
-
-	// 		s = s.Add(ing.config.QueryWindow)
-	// 		e = e.Add(ing.config.QueryWindow)
-	// 		// prevent builds into the future
-	// 		if e.After(time.Now().UTC()) {
-	// 			e = opencost.RoundForward(time.Now().UTC(), ing.config.Resolution)
-	// 		}
-
-	// 	}
-	// 	ing.lastRun = time.Now().UTC()
-
-	// 	limit := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution).Add(-ing.config.Duration)
-	// 	err := ing.repo.Expire(limit)
-	// 	if err != nil {
-	// 		log.Errorf("CloudCost: Ingestor: failed to expire Data: %s", err)
-	// 	}
-
-	// 	ing.coverageLock.Lock()
-	// 	ing.coverage = ing.coverage.ContractStart(limit)
-	// 	ing.coverageLock.Unlock()
-
-	// 	ing.runs++
-
-	// 	ticker.TickIn(ing.config.RefreshRate)
-	// }
+	defer errors.HandlePanic()
+
+	ticker := timeutil.NewJobTicker()
+	defer ticker.Close()
+	ticker.TickIn(0)
+
+	for {
+		// If an exit instruction is received, break the run loop
+		select {
+		case <-ing.exitRunCh:
+			log.Debugf("CustomCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
+			return
+		case <-ticker.Ch:
+			// Wait for next tick
+		}
+
+		// Start from the last covered time, minus the RunWindow
+		start := ing.lastRun
+		start = start.Add(-ing.config.RunWindow)
+
+		// Round start time back to the nearest Resolution point in the past from the
+		// last update to the QueryWindow
+		s := opencost.RoundBack(start.UTC(), ing.resolution)
+		e := s.Add(ing.config.QueryWindow)
+
+		// Start with a window of the configured Duration and starting on the given
+		// start time. Do the following, repeating until the window reaches the
+		// current time:
+		// 1. Instruct builder to build window
+		// 2. Move window forward one Resolution
+		for time.Now().After(s) {
+			profStart := time.Now()
+			ing.BuildWindow(s, e)
+
+			log.Debugf("CUstomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
+
+			s = s.Add(ing.config.QueryWindow)
+			e = e.Add(ing.config.QueryWindow)
+			// prevent builds into the future
+			if e.After(time.Now().UTC()) {
+				e = opencost.RoundForward(time.Now().UTC(), ing.resolution)
+			}
+
+		}
+		ing.lastRun = time.Now().UTC()
+
+		ing.runs++
+
+		ticker.TickIn(ing.config.RefreshRate)
+	}
 }
 
-func (ing *CustomCostIngestor) expandCoverage(window opencost.Window) {
+func (ing *CustomCostIngestor) expandCoverage(window opencost.Window, plugin string) {
 	if window.IsOpen() {
 		return
 	}
 	ing.coverageLock.Lock()
 	defer ing.coverageLock.Unlock()
 
-	coverage := ing.coverage.ExpandStart(*window.Start())
-	coverage = coverage.ExpandEnd(*window.End())
+	if _, hasCoverage := ing.coverage[plugin]; !hasCoverage {
+		ing.coverage[plugin] = window.Clone()
+	} else {
+		// expand existing coverage
+		ing.coverage[plugin] = ing.coverage[plugin].ExpandStart(*window.Start())
+		ing.coverage[plugin] = ing.coverage[plugin].ExpandEnd(*window.End())
+	}
 
-	ing.coverage = coverage
 }
-
-// temporary mock data
-const ddData = `
-[
-    {
-        "Metadata": {
-            "api_client_version": "v2"
-        },
-        "Costsource": "observability",
-        "Domain": "datadog",
-        "Version": "v1",
-        "Currency": "USD",
-        "Window": {
-            "start": "2024-02-27T00:00:00Z",
-            "end": "2024-02-27T01:00:00Z"
-        },
-        "Costs": [
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "agent_host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/agent_host_count",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 236,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 236,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 219,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count_excl_agent",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count_excl_agent",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 219,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "4bba680574ac970cfba52a5edc5b2d44541319b365fbcc45023b51fbe2572373",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/host_count",
-                "Window": {
-                    "start": "2024-02-27T00:00:00Z",
-                    "end": "2024-02-27T01:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            }
-        ],
-        "Errors": null
-    },
-    {
-        "Metadata": {
-            "api_client_version": "v2"
-        },
-        "Costsource": "observability",
-        "Domain": "datadog",
-        "Version": "v1",
-        "Currency": "USD",
-        "Window": {
-            "start": "2024-02-27T01:00:00Z",
-            "end": "2024-02-27T02:00:00Z"
-        },
-        "Costs": [
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "agent_host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/agent_host_count",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 235,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 235,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "Centralize your monitoring of systems and services (Per Container)",
-                "ListCost": 218,
-                "ListUnitPrice": 1,
-                "ResourceName": "container_count_excl_agent",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/container_count_excl_agent",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 218,
-                "UsageUnit": "Containers",
-                "ExtendedAttributes": null
-            },
-            {
-                "Metadata": null,
-                "Zone": "us",
-                "BilledCost": 0,
-                "AccountName": "Kubecost",
-                "ChargeCategory": "usage",
-                "Description": "350+ integrations, alerting, custom metrics \u0026 unlimited user accounts",
-                "ListCost": 90,
-                "ListUnitPrice": 18,
-                "ResourceName": "host_count",
-                "ResourceType": "infra_hosts",
-                "Id": "448c8561d845b42adb1d52ebc88b3c44385372e54bf117544442d25887e3c338",
-                "ProviderId": "42c0ac62-8d80-11ed-96f3-da7ad0900005/host_count",
-                "Window": {
-                    "start": "2024-02-27T01:00:00Z",
-                    "end": "2024-02-27T02:00:00Z"
-                },
-                "Labels": {},
-                "UsageQty": 5,
-                "UsageUnit": "Infra Hosts",
-                "ExtendedAttributes": null
-            }
-        ],
-        "Errors": null
-    }
-]
-`

+ 11 - 13
pkg/customcost/pipelineservice.go

@@ -14,6 +14,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/log"
 	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
 	proto "github.com/opencost/opencost/core/pkg/protocol"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/core/pkg/version"
 	"github.com/opencost/opencost/pkg/env"
 )
@@ -106,21 +107,20 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 // NewPipelineService is a constructor for a PipelineService
 func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
 
-	var registeredPlugins map[string]*plugin.ClientProtocol
-	var err error //getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
+	registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
 	if err != nil {
 		log.Errorf("error getting registered plugins: %v", err)
 		return nil, fmt.Errorf("error getting registered plugins: %v", err)
 	}
 
-	hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins)
+	hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins, time.Hour)
 	if err != nil {
 		return nil, err
 	}
 
 	hourlyIngestor.Start(false)
 
-	dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins)
+	dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins, timeutil.Day)
 	if err != nil {
 		return nil, err
 	}
@@ -148,15 +148,15 @@ func (dp *PipelineService) Status() Status {
 
 	// These are the statuses
 	return Status{
-		Coverage:    ingstatus.Coverage.String(),
+		Coverage:    fmt.Sprintf("%v", ingstatus.Coverage),
 		RefreshRate: refreshRate.String(),
 	}
 
 }
 
-// GetCloudCostRebuildHandler creates a handler from a http request which initiates a rebuild of cloud cost pipeline, if an
-// integrationKey is provided then it only rebuilds the specified billing integration
-func (s *PipelineService) GetCloudCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+// GetCustomCostRebuildHandler creates a handler from a http request which initiates a rebuild of custom cost pipeline, if a
+// domain is provided then it only rebuilds the specified billing domain
+func (s *PipelineService) GetCustomCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	// If pipeline Service is nil, always return 501
 	if s == nil {
 		return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
@@ -175,7 +175,7 @@ func (s *PipelineService) GetCloudCostRebuildHandler() func(w http.ResponseWrite
 		commit := r.URL.Query().Get("commit") == "true" || r.URL.Query().Get("commit") == "1"
 
 		if !commit {
-			protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Cloud Cost rebuild")
+			protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Custom Cost rebuild")
 			return
 		}
 
@@ -194,17 +194,15 @@ func (s *PipelineService) GetCloudCostRebuildHandler() func(w http.ResponseWrite
 			return
 		}
 		protocol.WriteData(w, fmt.Sprintf("Rebuilding Custom Cost For Domain %s", domain))
-		return
-
 	}
 }
 
-// GetCloudCostStatusHandler creates a handler from a http request which returns a list of the billing integration status
+// GetCustomCostStatusHandler creates a handler from a http request which returns the custom cost ingestor status
 func (s *PipelineService) GetCustomCostStatusHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	// If Reporting Service is nil, always return 501
 	if s == nil {
 		return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
-			http.Error(w, "Reporting Service is nil", http.StatusNotImplemented)
+			http.Error(w, "Custom cost pipeline Service is nil", http.StatusNotImplemented)
 		}
 	}
 	if s.hourlyIngestor == nil || s.dailyIngestor == nil {