Просмотр исходного кода

Add utils. Complete and rename new ClusterCosts function.

Niko Kovacevic 6 лет назад
Родитель
Сommit
cd9fb4b49e
3 измененных файлов с 325 добавлено и 119 удалено
  1. 199 119
      costmodel/cluster.go
  2. 39 0
      util/errors.go
  3. 87 0
      util/time.go

+ 199 - 119
costmodel/cluster.go

@@ -6,9 +6,9 @@ import (
 	"sync"
 	"time"
 
-	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
-	prometheusClient "github.com/prometheus/client_golang/api"
-
+	"github.com/kubecost/cost-model/cloud"
+	"github.com/kubecost/cost-model/util"
+	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
 
@@ -34,6 +34,197 @@ const (
 	  ) by (cluster_id) %s`
 )
 
+// PromQueryContext bundles the Prometheus client, error collector, and wait group handed to AsyncPromQuery. TODO move this to a package-accessible helper
+type PromQueryContext struct {
+	client prometheus.Client    // client the query is issued against
+	ec     *util.ErrorCollector // receives query and parse errors via Report
+	wg     *sync.WaitGroup      // optional; AsyncPromQuery calls Done on it when non-nil
+}
+
+// AsyncPromQuery runs query against ctx.client, reports query and parse errors to ctx.ec, and sends the parsed results on resultCh.
+// TODO move this to a package-accessible helper function once dependencies can be extricated from the costmodel package (PromQueryResult -> Vector); otherwise, circular deps.
+func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
+	if ctx.wg != nil {
+		defer ctx.wg.Done() // mark this query as finished when the function returns
+	}
+
+	raw, promErr := Query(ctx.client, query)
+	ctx.ec.Report(promErr) // Report ignores nil errors, so no check is needed here
+
+	results, parseErr := NewQueryResults(raw) // NOTE(review): raw is parsed even when the query failed; confirm NewQueryResults tolerates that
+	ctx.ec.Report(parseErr)
+
+	resultCh <- results // blocks until received unless resultCh is buffered
+}
+
+// ClusterCosts represents cumulative and monthly cluster costs over a given duration. Costs
+// are broken down by cores, memory, and storage.
+type ClusterCosts struct {
+	Start             *time.Time `json:"startTime"` // start of the time range the costs cover
+	End               *time.Time `json:"endTime"`   // end of the time range the costs cover
+	CPUCumulative     float64    `json:"cpuCumulativeCost"`
+	CPUMonthly        float64    `json:"cpuMonthlyCost"`
+	RAMCumulative     float64    `json:"ramCumulativeCost"`
+	RAMMonthly        float64    `json:"ramMonthlyCost"`
+	StorageCumulative float64    `json:"storageCumulativeCost"`
+	StorageMonthly    float64    `json:"storageMonthlyCost"`
+	TotalCumulative   float64    `json:"totalCost"`        // sum of the CPU, RAM, and storage cumulative costs
+	TotalMonthly      float64    `json:"totalMonthlyCost"` // sum of the CPU, RAM, and storage monthly costs
+}
+
+// NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
+// the associated monthly rate data, and returns the ClusterCosts.
+//
+// window and offset are Prometheus-style duration strings that util.ParseTimeRange
+// resolves to a start and end time. An error is returned if they cannot be parsed,
+// or if the resulting range does not span a positive amount of time, because the
+// monthly rate is derived by dividing by the number of hours in the range.
+func NewClusterCostsFromCumulative(cpu, ram, storage float64, window, offset string) (*ClusterCosts, error) {
+	start, end, err := util.ParseTimeRange(window, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	hours := end.Sub(*start).Hours()
+	if hours <= 0 {
+		// Dividing by a zero-length range would yield Inf or NaN monthly rates.
+		return nil, fmt.Errorf("error computing monthly costs: window %s does not span a positive amount of time", window)
+	}
+
+	// Scale the hourly rate with the same hours-per-month constant that
+	// NewClusterCostsFromMonthly divides by, so converting in either
+	// direction gives consistent results.
+	cc := &ClusterCosts{
+		Start:             start,
+		End:               end,
+		CPUCumulative:     cpu,
+		RAMCumulative:     ram,
+		StorageCumulative: storage,
+		TotalCumulative:   cpu + ram + storage,
+		CPUMonthly:        cpu / hours * util.HoursPerMonth,
+		RAMMonthly:        ram / hours * util.HoursPerMonth,
+		StorageMonthly:    storage / hours * util.HoursPerMonth,
+	}
+	cc.TotalMonthly = cc.CPUMonthly + cc.RAMMonthly + cc.StorageMonthly
+
+	return cc, nil
+}
+
+// NewClusterCostsFromMonthly takes monthly-rate cost data over a given time range, computes
+// the associated cumulative cost data, and returns the ClusterCosts.
+func NewClusterCostsFromMonthly(cpuMonthly, ramMonthly, storageMonthly float64, window, offset string) (*ClusterCosts, error) {
+	start, end, err := util.ParseTimeRange(window, offset) // resolve window and offset to concrete start and end times
+	if err != nil {
+		return nil, err
+	}
+
+	hours := end.Sub(*start).Hours() // length of the range in hours
+
+	cc := &ClusterCosts{
+		Start:             start,
+		End:               end,
+		CPUMonthly:        cpuMonthly,
+		RAMMonthly:        ramMonthly,
+		StorageMonthly:    storageMonthly,
+		TotalMonthly:      cpuMonthly + ramMonthly + storageMonthly,
+		CPUCumulative:     cpuMonthly / util.HoursPerMonth * hours, // monthly rate -> hourly rate -> cost over the range
+		RAMCumulative:     ramMonthly / util.HoursPerMonth * hours,
+		StorageCumulative: storageMonthly / util.HoursPerMonth * hours,
+	}
+	cc.TotalCumulative = cc.CPUCumulative + cc.RAMCumulative + cc.StorageCumulative
+
+	return cc, nil
+}
+
+// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
+//
+// window and offset are Prometheus-style duration strings; offset may be empty.
+// Three queries (CPU, RAM, and persistent-volume storage) are issued concurrently
+// against client and their results are summed per cluster_id. Results that carry
+// no cluster_id are attributed to the ID read from the environment variable named
+// by clusterIDKey. provider is currently unused.
+//
+// The returned map is keyed by cluster ID. An error is returned if any query or
+// result parse reports an error, or if window and offset cannot be converted
+// into a time range.
+func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
+	const fmtQueryTotalCPU = `sum(
+		sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1h]%s)) by (node, cluster_id) *
+		avg(avg_over_time(node_cpu_hourly_cost[%s:1h]%s)) by (node, cluster_id)
+	)`
+
+	const fmtQueryTotalRAM = `sum(
+		sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1h]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
+		avg(avg_over_time(node_ram_hourly_cost[%s:1h]%s)) by (node, cluster_id)
+	)`
+
+	const fmtQueryTotalStorage = `sum(
+		sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1h]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
+		avg(avg_over_time(pv_hourly_cost[%s:1h]%s)) by (persistentvolume, cluster_id)
+	)`
+
+	// TODO local storage
+
+	// TODO norm for interpolating missed scrapes?
+
+	// Keep the raw offset for time-range parsing below; only the query text
+	// needs the "offset" keyword prepended.
+	fmtOffset := ""
+	if offset != "" {
+		fmtOffset = fmt.Sprintf("offset %s", offset)
+	}
+
+	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
+	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
+	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset)
+	numQueries := 3
+
+	klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
+	klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
+	klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
+
+	// Submit queries to Prometheus asynchronously
+	var ec util.ErrorCollector
+	var wg sync.WaitGroup
+	ctx := PromQueryContext{client, &ec, &wg}
+	ctx.wg.Add(numQueries)
+
+	// Each channel is buffered by one so a query goroutine can deliver its
+	// result and finish before this function starts receiving.
+	chTotalCPU := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
+
+	chTotalRAM := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
+
+	chTotalStorage := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
+
+	// After queries complete, retrieve results
+	wg.Wait()
+
+	// Fail rather than report zeroed or partial costs when any query or
+	// parse step reported an error.
+	if ec.IsError() {
+		return nil, fmt.Errorf("error computing cluster costs: %v", ec.Errors())
+	}
+
+	resultsTotalCPU := <-chTotalCPU
+	close(chTotalCPU)
+
+	resultsTotalRAM := <-chTotalRAM
+	close(chTotalRAM)
+
+	resultsTotalStorage := <-chTotalStorage
+	close(chTotalStorage)
+
+	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
+	costData := make(map[string]map[string]float64)
+	defaultClusterID := os.Getenv(clusterIDKey)
+
+	// Helper function to iterate over Prom query results, parsing the raw values into
+	// the intermediate costData structure.
+	setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string) {
+		for _, result := range results {
+			// A lookup failure is deliberately ignored: a result without a
+			// cluster_id falls back to the default cluster ID.
+			clusterID, _ := result.GetString("cluster_id")
+			if clusterID == "" {
+				clusterID = defaultClusterID
+			}
+			if _, ok := costData[clusterID]; !ok {
+				costData[clusterID] = map[string]float64{}
+			}
+			if len(result.Values) > 0 {
+				costData[clusterID][name] += result.Values[0].Value
+				costData[clusterID]["total"] += result.Values[0].Value
+			}
+		}
+	}
+	setCostsFromResults(costData, resultsTotalCPU, "cpu")
+	setCostsFromResults(costData, resultsTotalRAM, "ram")
+	setCostsFromResults(costData, resultsTotalStorage, "storage")
+
+	// Convert intermediate structure to Costs instances
+	costsByCluster := map[string]*ClusterCosts{}
+	for id, cd := range costData {
+		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["ram"], cd["storage"], window, offset)
+		if err != nil {
+			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
+			return nil, err
+		}
+		costsByCluster[id] = costs
+	}
+
+	return costsByCluster, nil
+}
+
 type Totals struct {
 	TotalCost   [][]string `json:"totalcost"`
 	CPUCost     [][]string `json:"cpucost"`
@@ -104,7 +295,7 @@ func resultToTotal(qr interface{}) (map[string][][]string, error) {
 }
 
 // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
-func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, window, offset string) (map[string]*Totals, error) {
+func ClusterCostsForAllClusters(cli prometheus.Client, cloud cloud.Provider, window, offset string) (map[string]*Totals, error) {
 	if offset != "" {
 		offset = fmt.Sprintf("offset %s", offset)
 	}
@@ -177,120 +368,9 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
 	return toReturn, nil
 }
 
-// TODO move this to a package-accessible helper struct
-type PromQueryContext struct {
-	client prometheusClient.Client
-	ec     *ErrorCollector
-	wg     *sync.WaitGroup
-}
-
-// TODO move this to a package-accessible helper function
-func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
-	if ctx.wg != nil {
-		defer ctx.wg.Done()
-	}
-
-	raw, promErr := Query(ctx.client, query)
-	ctx.ec.Report(promErr)
-
-	results, parseErr := NewQueryResults(raw)
-	ctx.ec.Report(parseErr)
-
-	resultCh <- results
-}
-
-type ClusterCosts struct {
-	CPU     float64 `json:"cpu"`
-	RAM     float64 `json:"ram"`
-	Storage float64 `json:"storage"`
-	Total   float64 `json:"total"`
-}
-
-// CumulativeClusterCostsForAllClusters gives the cumulative cluster costs summed over a window of time for all clusters.
-func CumulativeClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, window, offset string) (map[string]ClusterCosts, error) {
-	const fmtQueryTotalCPU = `sum(
-		sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1h]%s)) by (node, cluster_id) *
-		avg(avg_over_time(node_cpu_hourly_cost[%s:1h]%s)) by (node, cluster_id)
-	)`
-
-	const fmtQueryTotalRAM = `sum(
-		sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1h]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
-		avg(avg_over_time(node_ram_hourly_cost[%s:1h]%s)) by (node, cluster_id)
-	)`
-
-	const fmtQueryTotalStorage = `sum(
-		sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1h]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
-		avg(avg_over_time(pv_hourly_cost[%s:1h]%s)) by (persistentvolume, cluster_id)
-	)`
-
-	// TODO local storage
-
-	if offset != "" {
-		offset = fmt.Sprintf("offset %s", offset)
-	}
-
-	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, offset, window, offset)
-	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, offset, window, offset)
-	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, offset, window, offset)
-	numQueries := 3
-
-	var ec ErrorCollector
-	var wg sync.WaitGroup
-	wg.Add(numQueries)
-	ctx := PromQueryContext{cli, &ec, &wg}
-
-	klog.Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
-	chTotalCPU := make(chan []*PromQueryResult)
-	go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
-
-	klog.Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
-	chTotalRAM := make(chan []*PromQueryResult)
-	go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
-
-	klog.Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
-	chTotalStorage := make(chan []*PromQueryResult)
-	go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
-
-	costsPerCluster := make(map[string]ClusterCosts)
-
-	// coreTotal, err := resultToTotal(resultClusterCores)
-	// if err != nil {
-	// 	return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
-	// }
-	// for clusterID, total := range coreTotal {
-	// 	if _, ok := toReturn[clusterID]; !ok {
-	// 		toReturn[clusterID] = &Totals{}
-	// 	}
-	// 	toReturn[clusterID].CPUCost = total
-	// }
-
-	// ramTotal, err := resultToTotal(resultClusterRAM)
-	// if err != nil {
-	// 	return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
-	// }
-	// for clusterID, total := range ramTotal {
-	// 	if _, ok := toReturn[clusterID]; !ok {
-	// 		toReturn[clusterID] = &Totals{}
-	// 	}
-	// 	toReturn[clusterID].MemCost = total
-	// }
-
-	// storageTotal, err := resultToTotal(resultStorage)
-	// if err != nil {
-	// 	return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
-	// }
-	// for clusterID, total := range storageTotal {
-	// 	if _, ok := toReturn[clusterID]; !ok {
-	// 		toReturn[clusterID] = &Totals{}
-	// 	}
-	// 	toReturn[clusterID].StorageCost = total
-	// }
-
-	return costsPerCluster, nil
-}
-
-// ComputeClusterCosts gives the current full cluster costs averaged over a window of time.
-func ComputeClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
+// AverageClusterTotals gives the current full cluster costs averaged over a window of time.
+// Used to be ComputeClusterCosts, but has been deprecated for that use.
+func AverageClusterTotals(cli prometheus.Client, cloud cloud.Provider, windowString, offset string) (*Totals, error) {
 	// turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templates
 	if offset != "" {
 		offset = fmt.Sprintf("offset %s", offset)
@@ -359,7 +439,7 @@ func ComputeClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Pr
 }
 
 // ClusterCostsOverTime gives the full cluster costs over time
-func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
+func ClusterCostsOverTime(cli prometheus.Client, cloud cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
 
 	localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
 	if err != nil {

+ 39 - 0
util/errors.go

@@ -0,0 +1,39 @@
+package util
+
+import "sync"
+
+// ErrorCollector is an error collection helper; its mutex guards the collected errors.
+type ErrorCollector struct {
+	m      sync.Mutex
+	errors []error
+}
+
+// Report adds an error to the collector. Ignores if the error is nil.
+func (ec *ErrorCollector) Report(e error) {
+	if e == nil {
+		return
+	}
+
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	ec.errors = append(ec.errors, e)
+}
+
+// IsError reports whether or not the collector caught errors.
+func (ec *ErrorCollector) IsError() bool {
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	return len(ec.errors) > 0
+}
+
+// Errors returns a copy of the errors caught by the collector.
+func (ec *ErrorCollector) Errors() []error {
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	errs := make([]error, len(ec.errors))
+	copy(errs, ec.errors) // copy so callers cannot modify the collector's own slice
+	return errs
+}

+ 87 - 0
util/time.go

@@ -0,0 +1,87 @@
+package util
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+)
+
+const (
+	HoursPerDay   = 24.0
+	HoursPerMonth = 730.0 // NOTE(review): HoursPerDay * DaysPerMonth is 730.08, not 730.0; confirm which value is intended
+	DaysPerMonth  = 30.42
+)
+
+// ParseDuration converts a Prometheus-style duration string into a Duration.
+//
+// The string must be a base-10 integer followed by exactly one unit character:
+// s (seconds), m (minutes), h (hours), or d (days, treated as 24 hours). An
+// error is returned for any other input, including the empty string.
+func ParseDuration(duration string) (*time.Duration, error) {
+	// Guard before slicing: the index expressions below would panic on an
+	// empty string.
+	if duration == "" {
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
+
+	// The final character selects the unit.
+	unitStr := duration[len(duration)-1:]
+	var unit time.Duration
+	switch unitStr {
+	case "s":
+		unit = time.Second
+	case "m":
+		unit = time.Minute
+	case "h":
+		unit = time.Hour
+	case "d":
+		unit = 24.0 * time.Hour
+	default:
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
+
+	// Everything before the unit must be an integer amount.
+	amountStr := duration[:len(duration)-1]
+	amount, err := strconv.ParseInt(amountStr, 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
+
+	dur := time.Duration(amount) * unit
+	return &dur, nil
+}
+
+// ParseTimeRange returns a start and end time, respectively, which are converted from
+// a duration and offset, defined as strings with Prometheus-style syntax.
+//
+// The end time is the current time, shifted back by offset when offset is
+// non-empty. The start time is the end time shifted back by duration. An error
+// is returned if duration is empty or if either string cannot be parsed.
+func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
+	// endTime defaults to the current time, unless an offset is explicitly declared,
+	// in which case it shifts endTime back by given duration
+	endTime := time.Now()
+	if offset != "" {
+		o, err := ParseDuration(offset)
+		if err != nil {
+			return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
+		}
+		endTime = endTime.Add(-1 * *o)
+	}
+
+	// Reject an empty duration here rather than relying on the helpers below
+	// to cope with it.
+	if duration == "" {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): duration must not be empty", duration)
+	}
+
+	// if duration is defined in terms of days, convert to hours
+	// e.g. convert "2d" to "48h"
+	durationNorm, err := normalizeTimeParam(duration)
+	if err != nil {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
+	}
+
+	// convert the normalized duration into a start time relative to endTime
+	dur, err := time.ParseDuration(durationNorm)
+	if err != nil {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", durationNorm, err)
+	}
+	startTime := endTime.Add(-1 * dur)
+
+	return &startTime, &endTime, nil
+}
+
+// normalizeTimeParam rewrites a duration expressed in whole days (e.g. "2d")
+// as the equivalent number of hours (e.g. "48h"). Any other non-empty
+// parameter is returned unchanged. An error is returned if param is empty or
+// if the day count is not a base-10 integer.
+func normalizeTimeParam(param string) (string, error) {
+	// Guard before slicing: indexing the last character of an empty string
+	// would panic.
+	if param == "" {
+		return "", fmt.Errorf("time parameter must not be empty")
+	}
+
+	// convert days to hours
+	if param[len(param)-1:] == "d" {
+		count := param[:len(param)-1]
+		val, err := strconv.ParseInt(count, 10, 64)
+		if err != nil {
+			return "", err
+		}
+		val = val * 24
+		param = fmt.Sprintf("%dh", val)
+	}
+
+	return param, nil
+}