Ver Fonte

Merge pull request #166 from kubecost/AjayTripathy-sql

add multiclusterlabels
Ajay Tripathy há 6 anos atrás
pai
commit
87a44fd220
4 ficheiros alterados com 106 adições e 20 exclusões
  1. 15 15
      costmodel/aggregations.go
  2. 8 0
      costmodel/costmodel.go
  3. 55 2
      costmodel/sql.go
  4. 28 3
      main.go

+ 15 - 15
costmodel/aggregations.go

@@ -26,25 +26,25 @@ type Aggregation struct {
 	TotalCost          float64   `json:"totalCost"`
 }
 
-func AggregateCostModel(costData map[string]*CostData, aggregationField string, aggregationSubField string) map[string]*Aggregation {
+func AggregateCostModel(costData map[string]*CostData, discount float64, aggregationField string, aggregationSubField string) map[string]*Aggregation {
 	aggregations := make(map[string]*Aggregation)
 	for _, costDatum := range costData {
 		if aggregationField == "cluster" {
-			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations)
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations, discount)
 		} else if aggregationField == "namespace" {
-			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations)
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations, discount)
 		} else if aggregationField == "service" {
 			if len(costDatum.Services) > 0 {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations)
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations, discount)
 			}
 		} else if aggregationField == "deployment" {
 			if len(costDatum.Deployments) > 0 {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations)
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations, discount)
 			}
 		} else if aggregationField == "label" {
 			if costDatum.Labels != nil {
 				if subfieldName, ok := costDatum.Labels[aggregationSubField]; ok {
-					aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations)
+					aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations, discount)
 				}
 			}
 		}
@@ -59,7 +59,7 @@ func AggregateCostModel(costData map[string]*CostData, aggregationField string,
 	return aggregations
 }
 
-func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation) {
+func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation, discount float64) {
 	if _, ok := aggregations[key]; !ok {
 		agg := &Aggregation{}
 		agg.Aggregator = aggregator
@@ -68,15 +68,15 @@ func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubFiel
 		agg.Cluster = costDatum.ClusterID
 		aggregations[key] = agg
 	}
-	mergeVectors(costDatum, aggregations[key])
+	mergeVectors(costDatum, aggregations[key], discount)
 }
 
-func mergeVectors(costDatum *CostData, aggregation *Aggregation) {
+func mergeVectors(costDatum *CostData, aggregation *Aggregation, discount float64) {
 	aggregation.CPUAllocation = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocation)
 	aggregation.RAMAllocation = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocation)
 	aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
 
-	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum)
+	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount)
 	aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
 	aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
 	aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
@@ -85,13 +85,13 @@ func mergeVectors(costDatum *CostData, aggregation *Aggregation) {
 	}
 }
 
-func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
+func getPriceVectors(costDatum *CostData, discount float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
 	cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
 	for _, val := range costDatum.CPUAllocation {
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.VCPUCost, 64)
 		cpuv = append(cpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * cost,
+			Value:     val.Value * cost * (1 - discount),
 		})
 	}
 	ramv := make([]*Vector, 0, len(costDatum.RAMAllocation))
@@ -99,7 +99,7 @@ func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.RAMCost, 64)
 		ramv = append(ramv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     (val.Value / 1024 / 1024 / 1024) * cost,
+			Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
 		})
 	}
 	gpuv := make([]*Vector, 0, len(costDatum.GPUReq))
@@ -107,7 +107,7 @@ func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.GPUCost, 64)
 		gpuv = append(gpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * cost,
+			Value:     val.Value * cost * (1 - discount),
 		})
 	}
 	pvvs := make([][]*Vector, 0, len(costDatum.PVCData))
@@ -118,7 +118,7 @@ func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]
 			for _, val := range pvcData.Values {
 				pvv = append(pvv, &Vector{
 					Timestamp: math.Round(val.Timestamp/10) * 10,
-					Value:     (val.Value / 1024 / 1024 / 1024) * cost,
+					Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
 				})
 			}
 			pvvs = append(pvvs, pvv)

+ 8 - 0
costmodel/costmodel.go

@@ -41,6 +41,7 @@ const (
 	epConfig          = apiPrefix + "/status/config"
 	epFlags           = apiPrefix + "/status/flags"
 	remoteEnabled     = "REMOTE_WRITE_ENABLED"
+	CLUSTER_ID        = "CLUSTER_ID"
 )
 
 type CostModel struct {
@@ -291,6 +292,8 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
 	normalization := fmt.Sprintf(normalizationStr, window, offset)
 
+	clustID := os.Getenv(CLUSTER_ID)
+
 	var wg sync.WaitGroup
 	wg.Add(8)
 
@@ -551,6 +554,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					PVCData:         pvReq,
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
+					ClusterID:       clustID,
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -619,6 +623,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
+				ClusterID:       clustID,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -1132,6 +1137,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
+	clustID := os.Getenv(CLUSTER_ID)
 	remoteEnabled := os.Getenv(remoteEnabled)
 	if remoteEnabled == "true" {
 		remoteLayout := "2006-01-02T15:04:05Z"
@@ -1402,6 +1408,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 					PVCData:         pvReq,
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
+					ClusterID:       clustID,
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -1468,6 +1475,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
+				ClusterID:       clustID,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)

+ 55 - 2
costmodel/sql.go

@@ -2,10 +2,13 @@ package costmodel
 
 import (
 	"database/sql"
+	"encoding/json"
 	"fmt"
 	"os"
 	"time"
 
+	"k8s.io/klog"
+
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	_ "github.com/lib/pq"
 )
@@ -159,8 +162,6 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 	if err != nil {
 		return nil, err
 	}
-	defer rows.Close()
-
 	for rows.Next() {
 		var (
 			bucket    string
@@ -221,5 +222,57 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 			}
 		}
 	}
+	query = `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
+	rows, err = db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	cols, err := rows.Columns()
+	if err != nil {
+		return nil, err
+	}
+	rawResult := make([][]byte, len(cols))
+	result := make([]string, len(cols))
+	dest := make([]interface{}, len(cols)) // A temporary interface{} slice
+	for i, _ := range rawResult {
+		dest[i] = &rawResult[i] // Put pointers to each string in the interface slice
+	}
+	nsToLabels := make(map[string]map[string]string)
+	for rows.Next() {
+		err = rows.Scan(dest...)
+		if err != nil {
+			return nil, err
+		}
+
+		for i, raw := range rawResult {
+			if raw == nil {
+				result[i] = "\\N"
+			} else {
+				result[i] = string(raw)
+			}
+		}
+
+		klog.Infof("%#v\n", result)
+		var dat map[string]string
+		err := json.Unmarshal([]byte(result[4]), &dat)
+		if err != nil {
+			return nil, err
+		}
+
+		ns, ok := dat["namespace"]
+		if !ok {
+			return nil, fmt.Errorf("No namespace found")
+		}
+		nsToLabels[ns] = dat
+	}
+
+	for _, cd := range model {
+		ns := cd.Namespace
+		if labels, ok := nsToLabels[ns]; ok {
+			cd.NamespaceLabels = labels
+			cd.Labels = labels // TODO: override with podlabels
+		}
+	}
+
 	return model, nil
 }

+ 28 - 3
main.go

@@ -145,7 +145,16 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 
 	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
 	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		c, err := a.Cloud.GetConfig()
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+
+		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -234,8 +243,16 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		w.Write(wrapData(nil, err))
 		return
 	}
+	c, err := a.Cloud.GetConfig()
+	if err != nil {
+		w.Write(wrapData(nil, err))
+	}
+	discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+	}
 	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, discount*0.01, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	}
 }
@@ -257,7 +274,15 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		w.Write(wrapData(nil, err))
 	}
 	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		c, err := a.Cloud.GetConfig()
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {