Просмотр исходного кода

Fix tests; clean up ClusterDisks and ClusterLoadBalancers; add RFC3339 to legal time parameters for Prometheus and Thanos querying

Niko Kovacevic 4 лет назад
Родитель
Сommit
e705b0448c
3 измененных файлов с 52 добавлено и 33 удалено
  1. 16 18
      pkg/costmodel/cluster.go
  2. 8 5
      pkg/costmodel/cluster_helpers_test.go
  3. 28 10
      pkg/costmodel/router.go

+ 16 - 18
pkg/costmodel/cluster.go

@@ -118,7 +118,12 @@ type Disk struct {
 	Breakdown  *ClusterCostsBreakdown
 }
 
-func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[string]*Disk, error) {
+type DiskIdentifier struct {
+	Cluster string
+	Name    string
+}
+
+func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
 	// Query for the duration between start and end
 	durStr := timeutil.DurationString(end.Sub(start))
 
@@ -168,7 +173,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 		return nil, ctx.ErrorCollection()
 	}
 
-	diskMap := map[string]*Disk{}
+	diskMap := map[DiskIdentifier]*Disk{}
 
 	pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
 
@@ -185,7 +190,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 		}
 
 		cost := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -210,7 +215,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 		}
 
 		cost := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -235,7 +240,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 		}
 
 		bytes := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -263,7 +268,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 			continue
 		}
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
 			continue
@@ -539,9 +544,6 @@ func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[L
 	queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
 	queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
 
-	log.Infof("[Prom] t=%d q=%s", t.Unix(), queryLBCost)
-	log.Infof("[Prom] t=%d q=%s", t.Unix(), queryActiveMins)
-
 	resChLBCost := ctx.QueryAtTime(queryLBCost, t)
 	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
 
@@ -1105,7 +1107,7 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 	}, nil
 }
 
-func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
+func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
 	for _, result := range resActiveMins {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
@@ -1122,7 +1124,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 			continue
 		}
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -1156,7 +1158,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 		// TODO niko/assets storage class
 
 		bytes := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -1188,9 +1190,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 		// TODO niko/assets storage class
 
 		var cost float64
-
 		if customPricingEnabled && customPricingConfig != nil {
-
 			customPVCostStr := customPricingConfig.Storage
 
 			customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
@@ -1199,14 +1199,11 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 			}
 
 			cost = customPVCost
-
 		} else {
-
 			cost = result.Values[0].Value
-
 		}
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -1214,6 +1211,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 				Breakdown: &ClusterCostsBreakdown{},
 			}
 		}
+		fmt.Printf("price: %.5f & minutes: %.5f & cost: %.5f\n", cost, diskMap[key].Minutes, diskMap[key].Cost)
 		diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
 		providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
 		if providerID != "" {

+ 8 - 5
pkg/costmodel/cluster_helpers_test.go

@@ -5,9 +5,8 @@ import (
 	"testing"
 	"time"
 
-	"github.com/kubecost/cost-model/pkg/config"
-
 	"github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
 
@@ -927,7 +926,11 @@ func TestAssetCustompricing(t *testing.T) {
 			Values: []*util.Vector{
 				&util.Vector{
 					Timestamp: 0,
-					Value:     60.0,
+					Value:     1.0,
+				},
+				&util.Vector{
+					Timestamp: 3600.0,
+					Value:     1.0,
 				},
 			},
 		},
@@ -996,10 +999,10 @@ func TestAssetCustompricing(t *testing.T) {
 			ramResult := ramMap[nodeKey]
 			gpuResult := gpuMap[nodeKey]
 
-			diskMap := map[string]*Disk{}
+			diskMap := map[DiskIdentifier]*Disk{}
 			pvCosts(diskMap, time.Hour, pvMinsPromResult, pvSizePromResult, pvCostPromResult, testProvider)
 
-			diskResult := diskMap["cluster1/pvc1"].Cost
+			diskResult := diskMap[DiskIdentifier{"cluster1", "pvc1"}].Cost
 
 			if !util.IsApproximately(cpuResult, testCase.expectedPricing["CPU"]) {
 				t.Errorf("CPU custom pricing error in %s. Got %v but expected %v", testCase.name, cpuResult, testCase.expectedPricing["CPU"])

+ 28 - 10
pkg/costmodel/router.go

@@ -692,15 +692,24 @@ func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ htt
 		return
 	}
 
-	// TODO test to make sure "time" does not get set, if not given
+	// Attempt to parse time as either a unix timestamp or as an RFC3339 value
+	var timeVal time.Time
+	timeStr := qp.Get("time", "")
+	if len(timeStr) > 0 {
+		if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
+			timeVal = time.Unix(t, 0)
+		} else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
+			timeVal = t
+		}
 
-	var ts time.Time
-	if qp.GetInt64("time", 0) > 0 {
-		ts = time.Unix(qp.GetInt64("time", 0), 0)
+		// If time is given, but not parse-able, return an error
+		if timeVal.IsZero() {
+			http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
+		}
 	}
 
 	ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
-	body, err := ctx.RawQuery(query, ts)
+	body, err := ctx.RawQuery(query, timeVal)
 	if err != nil {
 		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
 		return
@@ -752,15 +761,24 @@ func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprou
 		return
 	}
 
-	// TODO test to make sure "time" does not get set, if not given
+	// Attempt to parse time as either a unix timestamp or as an RFC3339 value
+	var timeVal time.Time
+	timeStr := qp.Get("time", "")
+	if len(timeStr) > 0 {
+		if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
+			timeVal = time.Unix(t, 0)
+		} else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
+			timeVal = t
+		}
 
-	var ts time.Time
-	if qp.GetInt64("time", 0) > 0 {
-		ts = time.Unix(qp.GetInt64("time", 0), 0)
+		// If time is given, but not parse-able, return an error
+		if timeVal.IsZero() {
+			http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
+		}
 	}
 
 	ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
-	body, err := ctx.RawQuery(query, ts)
+	body, err := ctx.RawQuery(query, timeVal)
 	if err != nil {
 		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
 		return