فهرست منبع

add aggregation endpoint

AjayTripathy 6 سال پیش
والد
کامیت
392f12829b
6 فایل‌های تغییر یافته به همراه 484 افزوده شده و 227 حذف شده
  1. 167 193
      costmodel/aggregations.go
  2. 4 4
      costmodel/cluster.go
  3. 22 22
      costmodel/costmodel.go
  4. 225 0
      costmodel/sql.go
  5. 1 0
      go.sum
  6. 65 8
      main.go

+ 167 - 193
costmodel/aggregations.go

@@ -1,225 +1,199 @@
 package costmodel
 
 import (
-	"database/sql"
-	"fmt"
-	"os"
-	"time"
-
-	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
-	_ "github.com/lib/pq"
+	"math"
+	"sort"
+	"strconv"
 )
 
-const remotePW = "REMOTE_WRITE_PASSWORD"
-const sqlAddress = "SQL_ADDRESS"
-
-func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
-
-	nodes := make(map[string]*costAnalyzerCloud.Node)
+// Aggregation is the merged cost data of every CostData entry that maps to the
+// same aggregation key.
+//
+// Aggregator and AggregatorSubField echo the field (and sub-field) the caller
+// aggregated on, Environment is the key value itself, and Cluster is the
+// ClusterID of the first CostData that created the entry.
+//
+// The *Allocation and *CostVector slices are working time series that are
+// excluded from the JSON output (`json:"-"`). The scalar *Cost fields are the
+// sums of the matching cost vectors; TotalCost is CPU + RAM + GPU + PV cost.
+// NOTE(review): nothing visible in this file sets NetworkCost — confirm whether
+// it is populated elsewhere.
+type Aggregation struct {
+	Aggregator         string    `json:"aggregator"`
+	AggregatorSubField string    `json:"aggregatorSubField"`
+	Environment        string    `json:"environment"`
+	Cluster            string    `json:"cluster"`
+	CPUAllocation      []*Vector `json:"-"`
+	CPUCostVector      []*Vector `json:"-"`
+	RAMAllocation      []*Vector `json:"-"`
+	RAMCostVector      []*Vector `json:"-"`
+	PVCostVector       []*Vector `json:"-"`
+	GPUAllocation      []*Vector `json:"-"`
+	GPUCostVector      []*Vector `json:"-"`
+	CPUCost            float64   `json:"cpuCost"`
+	RAMCost            float64   `json:"ramCost"`
+	GPUCost            float64   `json:"gpuCost"`
+	PVCost             float64   `json:"pvCost"`
+	NetworkCost        float64   `json:"networkCost"`
+	TotalCost          float64   `json:"totalCost"`
+}
 
-	query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
-	FROM metrics
-	WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost')  AND value != 'NaN' AND value != 0
-	GROUP BY instance,name,clusterid`
-	rows, err := db.Query(query)
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-	for rows.Next() {
-		var (
-			name      string
-			avg       float64
-			instance  string
-			clusterid string
-		)
-		if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
-			return nil, err
-		}
-		if data, ok := nodes[instance]; ok {
-			if name == "node_cpu_hourly_cost" {
-				data.VCPUCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_ram_hourly_cost" {
-				data.RAMCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_gpu_hourly_cost" {
-				data.GPUCost = fmt.Sprintf("%f", avg)
+// AggregateCostModel groups costData by aggregationField and returns one
+// Aggregation per distinct key.
+//
+// Supported values of aggregationField are "cluster", "namespace", "service",
+// "deployment" and "label"; any other value yields an empty map. For "service"
+// and "deployment" only the first entry of the respective list is used as the
+// key, and cost data without any service/deployment is skipped. For "label",
+// aggregationSubField names the label whose value becomes the key; cost data
+// lacking that label is skipped.
+func AggregateCostModel(costData map[string]*CostData, aggregationField string, aggregationSubField string) map[string]*Aggregation {
+	aggregations := make(map[string]*Aggregation)
+	for _, costDatum := range costData {
+		if aggregationField == "cluster" {
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations)
+		} else if aggregationField == "namespace" {
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations)
+		} else if aggregationField == "service" {
+			if len(costDatum.Services) > 0 {
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations)
+			}
+		} else if aggregationField == "deployment" {
+			if len(costDatum.Deployments) > 0 {
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations)
 			}
-		} else {
-			nodes[instance] = &costAnalyzerCloud.Node{}
-			data := nodes[instance]
-			if name == "node_cpu_hourly_cost" {
-				data.VCPUCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_ram_hourly_cost" {
-				data.RAMCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_gpu_hourly_cost" {
-				data.GPUCost = fmt.Sprintf("%f", avg)
+		} else if aggregationField == "label" {
+			if costDatum.Labels != nil {
+				if subfieldName, ok := costDatum.Labels[aggregationSubField]; ok {
+					aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations)
+				}
 			}
 		}
+	}
+	// Collapse each merged cost time series into a scalar total. NetworkCost is
+	// not computed here and is not part of TotalCost.
+	for _, agg := range aggregations {
+		agg.CPUCost = totalVector(agg.CPUCostVector)
+		agg.RAMCost = totalVector(agg.RAMCostVector)
+		agg.GPUCost = totalVector(agg.GPUCostVector)
+		agg.PVCost = totalVector(agg.PVCostVector)
+		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost
+	}
+	return aggregations
+}
 
+// aggregationHelper folds costDatum into the Aggregation stored under key,
+// creating that Aggregation first when the key has not been seen yet. A newly
+// created entry records the aggregator field names, uses key as its
+// Environment and takes its Cluster from costDatum.
+func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation) {
+	entry, exists := aggregations[key]
+	if !exists {
+		entry = &Aggregation{
+			Aggregator:         aggregator,
+			AggregatorSubField: aggregatorSubField,
+			Environment:        key,
+			Cluster:            costDatum.ClusterID,
+		}
+		aggregations[key] = entry
 	}
+	mergeVectors(costDatum, entry)
+}
+
+// mergeVectors adds the allocation and cost time series of costDatum to the
+// running totals held in aggregation.
+//
+// CPU and RAM allocations come straight from costDatum; the GPU allocation is
+// taken from costDatum.GPUReq. Cost series are derived by getPriceVectors, and
+// every per-volume PV cost series is folded into the single PVCostVector.
+// Because addVectors rounds timestamps in place, the vectors referenced by
+// costDatum may be modified by this call.
+func mergeVectors(costDatum *CostData, aggregation *Aggregation) {
+	aggregation.CPUAllocation = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocation)
+	aggregation.RAMAllocation = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocation)
+	aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
 
-	return nodes, nil
+	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum)
+	aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
+	aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
+	aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
+	for _, vectorList := range pvvs {
+		aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
+	}
 }
 
-func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
-	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
-	db, err := sql.Open("postgres", connStr)
-	defer db.Close()
-	if err != nil {
-		return nil, err
+// getPriceVectors converts the allocation time series of costDatum into cost
+// time series and returns, in order: CPU costs, RAM costs, GPU costs, and one
+// cost series per persistent volume claim that has a non-nil Volume.
+//
+// Each value is the allocation multiplied by the matching price parsed from
+// costDatum.NodeData (or from the claim's Volume.Cost). RAM and PV values are
+// divided by 1024 three times before pricing — presumably bytes to GiB; confirm
+// against the units of the price fields. Timestamps are rounded to the nearest
+// multiple of 10.
+//
+// Errors from strconv.ParseFloat are discarded, so a malformed price is not
+// reported to the caller. costDatum.NodeData is dereferenced without a nil
+// check whenever the corresponding allocation series is non-empty.
+func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
+	cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
+	for _, val := range costDatum.CPUAllocation {
+		cost, _ := strconv.ParseFloat(costDatum.NodeData.VCPUCost, 64)
+		cpuv = append(cpuv, &Vector{
+			Timestamp: math.Round(val.Timestamp/10) * 10,
+			Value:     val.Value * cost,
+		})
 	}
-	nodes, err := getNodeCosts(db)
-	if err != nil {
-		return nil, err
+	ramv := make([]*Vector, 0, len(costDatum.RAMAllocation))
+	for _, val := range costDatum.RAMAllocation {
+		cost, _ := strconv.ParseFloat(costDatum.NodeData.RAMCost, 64)
+		ramv = append(ramv, &Vector{
+			Timestamp: math.Round(val.Timestamp/10) * 10,
+			Value:     (val.Value / 1024 / 1024 / 1024) * cost,
+		})
 	}
-	model := make(map[string]*CostData)
-	query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
-	FROM metrics
-	WHERE (name='container_cpu_allocation') AND
-	  time > $2 AND time < $3 AND value != 'NaN'
-	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
-	ORDER BY container,bucket;
-	`
-	rows, err := db.Query(query, window, start, end)
-	if err != nil {
-		return nil, err
+	gpuv := make([]*Vector, 0, len(costDatum.GPUReq))
+	for _, val := range costDatum.GPUReq {
+		cost, _ := strconv.ParseFloat(costDatum.NodeData.GPUCost, 64)
+		gpuv = append(gpuv, &Vector{
+			Timestamp: math.Round(val.Timestamp/10) * 10,
+			Value:     val.Value * cost,
+		})
 	}
-	defer rows.Close()
-
-	for rows.Next() {
-		var (
-			bucket    string
-			name      string
-			sum       float64
-			container string
-			pod       string
-			namespace string
-			instance  string
-			clusterid string
-		)
-		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
-			return nil, err
-		}
-		layout := "2006-01-02T15:04:05Z"
-		t, err := time.Parse(layout, bucket)
-		if err != nil {
-			return nil, err
+	pvvs := make([][]*Vector, 0, len(costDatum.PVCData))
+	for _, pvcData := range costDatum.PVCData {
+		pvv := make([]*Vector, 0, len(pvcData.Values))
+		// Claims without a Volume carry no price and contribute no series.
+		if pvcData.Volume != nil {
+			cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
+			for _, val := range pvcData.Values {
+				pvv = append(pvv, &Vector{
+					Timestamp: math.Round(val.Timestamp/10) * 10,
+					Value:     (val.Value / 1024 / 1024 / 1024) * cost,
+				})
+			}
+			pvvs = append(pvvs, pvv)
 		}
+	}
+	return cpuv, ramv, gpuv, pvvs
+}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance)
-		key := k.Key()
-		allocationVector := &Vector{
-			Timestamp: float64(t.Unix()),
-			Value:     sum,
-		}
-		if data, ok := model[key]; ok {
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
-			}
-		} else {
-			node, ok := nodes[instance]
-			if !ok {
-				return nil, fmt.Errorf("No node found")
-			}
-			model[key] = &CostData{
-				Name:          container,
-				PodName:       pod,
-				NodeName:      instance,
-				NodeData:      node,
-				CPUAllocation: []*Vector{},
-				RAMAllocation: []*Vector{},
-				GPUReq:        []*Vector{},
-				Namespace:     namespace,
-				ClusterID:     clusterid,
-			}
-			data := model[key]
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
+// totalVector returns the sum of the Value fields of vectors, or 0 when the
+// slice is nil or empty.
+func totalVector(vectors []*Vector) float64 {
+	var sum float64
+	for i := range vectors {
+		sum += vectors[i].Value
+	}
+	return sum
+}
+
+// addVectors merges two time series by summing the values that share a
+// timestamp and returns the result sorted by timestamp.
+//
+// Timestamps are rounded to the nearest multiple of 10 before matching, and the
+// rounded value is written back into the input Vectors, so both arguments are
+// mutated. When both inputs are non-empty, entries whose timestamp is 0 are
+// dropped. When one input is nil or empty, the other slice itself is returned
+// (not a copy), still containing any zero-timestamp entries.
+//
+// NOTE(review): if several entries of the same input round to the same
+// timestamp, only the last value is kept in the lookup map while the timestamp
+// is appended once per entry, so the output can contain repeated timestamps —
+// confirm whether callers can supply such input.
+func addVectors(req []*Vector, used []*Vector) []*Vector {
+	if req == nil || len(req) == 0 {
+		for _, usedV := range used {
+			if usedV.Timestamp == 0 {
+				continue
 			}
+			usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
 		}
+		return used
 	}
-	query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
-	FROM metrics
-	WHERE (name='container_memory_allocation_bytes') AND
-		time > $2 AND time < $3 AND value != 'NaN'
-	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
-	ORDER BY container,bucket;
-	`
-	rows, err = db.Query(query, window, start, end)
-	if err != nil {
-		return nil, err
+	if used == nil || len(used) == 0 {
+		for _, reqV := range req {
+			if reqV.Timestamp == 0 {
+				continue
+			}
+			reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
+		}
+		return req
 	}
-	defer rows.Close()
+	var allocation []*Vector
 
-	for rows.Next() {
-		var (
-			bucket    string
-			name      string
-			sum       float64
-			container string
-			pod       string
-			namespace string
-			instance  string
-			clusterid string
-		)
-		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
-			return nil, err
+	var timestamps []float64
+	reqMap := make(map[float64]float64)
+	for _, reqV := range req {
+		if reqV.Timestamp == 0 {
+			continue
+		}
+		reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
+		reqMap[reqV.Timestamp] = reqV.Value
+		timestamps = append(timestamps, reqV.Timestamp)
+	}
+	usedMap := make(map[float64]float64)
+	for _, usedV := range used {
+		if usedV.Timestamp == 0 {
+			continue
 		}
-		layout := "2006-01-02T15:04:05Z"
-		t, err := time.Parse(layout, bucket)
-		if err != nil {
-			return nil, err
+		usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
+		usedMap[usedV.Timestamp] = usedV.Value
+		if _, ok := reqMap[usedV.Timestamp]; !ok { // no need to double add, since we'll range over sorted timestamps and check.
+			timestamps = append(timestamps, usedV.Timestamp)
 		}
+	}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance)
-		key := k.Key()
+	sort.Float64s(timestamps)
+	for _, t := range timestamps {
+		rv, okR := reqMap[t]
+		uv, okU := usedMap[t]
 		allocationVector := &Vector{
-			Timestamp: float64(t.Unix()),
-			Value:     sum,
+			Timestamp: t,
 		}
-		if data, ok := model[key]; ok {
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
-			}
-		} else {
-			node, ok := nodes[instance]
-			if !ok {
-				return nil, fmt.Errorf("No node found")
-			}
-			model[key] = &CostData{
-				Name:          container,
-				PodName:       pod,
-				NodeName:      instance,
-				NodeData:      node,
-				CPUAllocation: []*Vector{},
-				RAMAllocation: []*Vector{},
-				GPUReq:        []*Vector{},
-				Namespace:     namespace,
-				ClusterID:     clusterid,
-			}
-			data := model[key]
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
-			}
+		if okR && okU {
+			allocationVector.Value = rv + uv
+		} else if okR {
+			allocationVector.Value = rv
+		} else if okU {
+			allocationVector.Value = uv
 		}
+		allocation = append(allocation, allocationVector)
 	}
-	return model, nil
+
+	return allocation
 }

+ 4 - 4
costmodel/cluster.go

@@ -125,21 +125,21 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
-	resultClusterCores, err := query(cli, qCores)
+	resultClusterCores, err := Query(cli, qCores)
 	if err != nil {
 		return nil, err
 	}
-	resultClusterRAM, err := query(cli, qRAM)
+	resultClusterRAM, err := Query(cli, qRAM)
 	if err != nil {
 		return nil, err
 	}
 
-	resultStorage, err := query(cli, qStorage)
+	resultStorage, err := Query(cli, qStorage)
 	if err != nil {
 		return nil, err
 	}
 
-	resultTotal, err := query(cli, qTotal)
+	resultTotal, err := Query(cli, qTotal)
 	if err != nil {
 		return nil, err
 	}

+ 22 - 22
costmodel/costmodel.go

@@ -183,7 +183,7 @@ type PrometheusMetadata struct {
 
 // ValidatePrometheus tells the model what data prometheus has on it.
 func ValidatePrometheus(cli prometheusClient.Client) (*PrometheusMetadata, error) {
-	data, err := query(cli, "up")
+	data, err := Query(cli, "up")
 	if err != nil {
 		return &PrometheusMetadata{
 			Running:            false,
@@ -262,11 +262,11 @@ func getUptimeData(qr interface{}) ([]*Vector, bool, error) {
 }
 
 func ComputeUptimes(cli prometheusClient.Client) (map[string]float64, error) {
-	res, err := query(cli, `container_start_time_seconds{container_name != "POD",container_name != ""}`)
+	res, err := Query(cli, `container_start_time_seconds{container_name != "POD",container_name != ""}`)
 	if err != nil {
 		return nil, err
 	}
-	vectors, err := getContainerMetricVector(res, false, 0)
+	vectors, err := GetContainerMetricVector(res, false, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -297,37 +297,37 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	var promErr error
 	var resultRAMRequests interface{}
 	go func() {
-		resultRAMRequests, promErr = query(cli, queryRAMRequests)
+		resultRAMRequests, promErr = Query(cli, queryRAMRequests)
 		defer wg.Done()
 	}()
 	var resultRAMUsage interface{}
 	go func() {
-		resultRAMUsage, promErr = query(cli, queryRAMUsage)
+		resultRAMUsage, promErr = Query(cli, queryRAMUsage)
 		defer wg.Done()
 	}()
 	var resultCPURequests interface{}
 	go func() {
-		resultCPURequests, promErr = query(cli, queryCPURequests)
+		resultCPURequests, promErr = Query(cli, queryCPURequests)
 		defer wg.Done()
 	}()
 	var resultCPUUsage interface{}
 	go func() {
-		resultCPUUsage, promErr = query(cli, queryCPUUsage)
+		resultCPUUsage, promErr = Query(cli, queryCPUUsage)
 		defer wg.Done()
 	}()
 	var resultGPURequests interface{}
 	go func() {
-		resultGPURequests, promErr = query(cli, queryGPURequests)
+		resultGPURequests, promErr = Query(cli, queryGPURequests)
 		defer wg.Done()
 	}()
 	var resultPVRequests interface{}
 	go func() {
-		resultPVRequests, promErr = query(cli, queryPVRequests)
+		resultPVRequests, promErr = Query(cli, queryPVRequests)
 		defer wg.Done()
 	}()
 	var normalizationResult interface{}
 	go func() {
-		normalizationResult, promErr = query(cli, normalization)
+		normalizationResult, promErr = Query(cli, normalization)
 		defer wg.Done()
 	}()
 
@@ -389,7 +389,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	containerNameCost := make(map[string]*CostData)
 	containers := make(map[string]bool)
 
-	RAMReqMap, err := getContainerMetricVector(resultRAMRequests, true, normalizationValue)
+	RAMReqMap, err := GetContainerMetricVector(resultRAMRequests, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
@@ -397,28 +397,28 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		containers[key] = true
 	}
 
-	RAMUsedMap, err := getContainerMetricVector(resultRAMUsage, true, normalizationValue)
+	RAMUsedMap, err := GetContainerMetricVector(resultRAMUsage, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
 	for key := range RAMUsedMap {
 		containers[key] = true
 	}
-	CPUReqMap, err := getContainerMetricVector(resultCPURequests, true, normalizationValue)
+	CPUReqMap, err := GetContainerMetricVector(resultCPURequests, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
 	for key := range CPUReqMap {
 		containers[key] = true
 	}
-	GPUReqMap, err := getContainerMetricVector(resultGPURequests, true, normalizationValue)
+	GPUReqMap, err := GetContainerMetricVector(resultGPURequests, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := getContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
+	CPUUsedMap, err := GetContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
 	}
@@ -653,7 +653,7 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 		l := strings.Join(q, "|")
 		queryHistoricalPodLabels := fmt.Sprintf(`kube_pod_labels{pod=~"%s"}[%s]`, l, window)
 
-		podLabelsResult, err := query(cli, queryHistoricalPodLabels)
+		podLabelsResult, err := Query(cli, queryHistoricalPodLabels)
 		if err != nil {
 			return fmt.Errorf("Error fetching historical pod labels: " + err.Error())
 		}
@@ -729,15 +729,15 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 		queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost{instance=~"%s"}[%s])`, l, window)
 		queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
 
-		cpuCostResult, err := query(cli, queryHistoricalCPUCost)
+		cpuCostResult, err := Query(cli, queryHistoricalCPUCost)
 		if err != nil {
 			return fmt.Errorf("Error fetching cpu cost data: " + err.Error())
 		}
-		ramCostResult, err := query(cli, queryHistoricalRAMCost)
+		ramCostResult, err := Query(cli, queryHistoricalRAMCost)
 		if err != nil {
 			return fmt.Errorf("Error fetching ram cost data: " + err.Error())
 		}
-		gpuCostResult, err := query(cli, queryHistoricalGPUCost)
+		gpuCostResult, err := Query(cli, queryHistoricalGPUCost)
 		if err != nil {
 			return fmt.Errorf("Error fetching gpu cost data: " + err.Error())
 		}
@@ -1176,7 +1176,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	}()
 	var normalizationResult interface{}
 	go func() {
-		normalizationResult, promErr = query(cli, normalization)
+		normalizationResult, promErr = Query(cli, normalization)
 		defer wg.Done()
 	}()
 
@@ -1805,7 +1805,7 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 	return toReturn, err
 }
 
-func query(cli prometheusClient.Client, query string) (interface{}, error) {
+func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 	u := cli.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)
@@ -1946,7 +1946,7 @@ func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*Containe
 	}, nil
 }
 
-func getContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
+func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)

+ 225 - 0
costmodel/sql.go

@@ -0,0 +1,225 @@
+package costmodel
+
+import (
+	"database/sql"
+	"fmt"
+	"os"
+	"time"
+
+	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
+	_ "github.com/lib/pq"
+)
+
+const remotePW = "REMOTE_WRITE_PASSWORD"
+const sqlAddress = "SQL_ADDRESS"
+
+// getNodeCosts reads the average node_cpu_hourly_cost, node_ram_hourly_cost and
+// node_gpu_hourly_cost values from the metrics table and returns them as Node
+// pricing records keyed by the instance label. NaN and zero samples are
+// excluded by the query. Costs are stored as strings formatted with "%f".
+//
+// The result is keyed by instance only; the cluster_id column is selected and
+// scanned but not used, so identically named instances in different clusters
+// share one entry.
+//
+// It returns an error if the query, a row scan, or the row iteration fails.
+func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
+	nodes := make(map[string]*costAnalyzerCloud.Node)
+
+	query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost')  AND value != 'NaN' AND value != 0
+	GROUP BY instance,name,clusterid`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var (
+			name      string
+			avg       float64
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+		node, ok := nodes[instance]
+		if !ok {
+			node = &costAnalyzerCloud.Node{}
+			nodes[instance] = node
+		}
+		cost := fmt.Sprintf("%f", avg)
+		switch name {
+		case "node_cpu_hourly_cost":
+			node.VCPUCost = cost
+		case "node_ram_hourly_cost":
+			node.RAMCost = cost
+		case "node_gpu_hourly_cost":
+			node.GPUCost = cost
+		}
+	}
+	// rows.Next also returns false when iteration fails part-way; without this
+	// check a truncated result would be returned as if it were complete.
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return nodes, nil
+}
+
+// CostDataRangeFromSQL builds per-container CostData from the Postgres metrics
+// store addressed by the SQL_ADDRESS and REMOTE_WRITE_PASSWORD environment
+// variables.
+//
+// window is passed to time_bucket as the bucket width; start and end bound the
+// sample time (both exclusive). The CPU and RAM allocation series are loaded;
+// the GPU allocation metric is not queried. The field and value parameters are
+// currently unused.
+//
+// The returned map is keyed by the container metric key built from namespace,
+// pod, container and instance. An error is returned if the connection, a query
+// or a row cannot be processed, or if a container runs on an instance for which
+// no node pricing was found.
+func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
+	pw := os.Getenv(remotePW)
+	address := os.Getenv(sqlAddress)
+	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
+	db, err := sql.Open("postgres", connStr)
+	if err != nil {
+		return nil, err
+	}
+	// Register Close only once Open has succeeded, so it is never called on the
+	// handle returned alongside an error.
+	defer db.Close()
+
+	nodes, err := getNodeCosts(db)
+	if err != nil {
+		return nil, err
+	}
+
+	model := make(map[string]*CostData)
+	metrics := []string{"container_cpu_allocation", "container_memory_allocation_bytes"}
+	for _, metric := range metrics {
+		if err := loadAllocationMetric(db, metric, window, start, end, nodes, model); err != nil {
+			return nil, err
+		}
+	}
+	return model, nil
+}
+
+// loadAllocationMetric queries the time-bucketed averages of one allocation
+// metric and appends them to the matching CostData entries in model. Entries
+// are created on first sight of a container, with their NodeData looked up in
+// nodes by instance.
+func loadAllocationMetric(db *sql.DB, metric, window, start, end string, nodes map[string]*costAnalyzerCloud.Node, model map[string]*CostData) error {
+	query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name=$4) AND
+	  time > $2 AND time < $3 AND value != 'NaN'
+	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
+	ORDER BY container,bucket;
+	`
+	rows, err := db.Query(query, window, start, end, metric)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	const layout = "2006-01-02T15:04:05Z"
+	for rows.Next() {
+		var (
+			bucket    string
+			name      string
+			avg       float64
+			container string
+			pod       string
+			namespace string
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&bucket, &name, &avg, &container, &pod, &namespace, &instance, &clusterid); err != nil {
+			return err
+		}
+		t, err := time.Parse(layout, bucket)
+		if err != nil {
+			return err
+		}
+
+		k := newContainerMetricFromValues(namespace, pod, container, instance)
+		key := k.Key()
+		data, ok := model[key]
+		if !ok {
+			node, ok := nodes[instance]
+			if !ok {
+				return fmt.Errorf("No node found")
+			}
+			data = &CostData{
+				Name:          container,
+				PodName:       pod,
+				NodeName:      instance,
+				NodeData:      node,
+				CPUAllocation: []*Vector{},
+				RAMAllocation: []*Vector{},
+				GPUReq:        []*Vector{},
+				Namespace:     namespace,
+				ClusterID:     clusterid,
+			}
+			model[key] = data
+		}
+
+		point := &Vector{
+			Timestamp: float64(t.Unix()),
+			Value:     avg,
+		}
+		switch name {
+		case "container_cpu_allocation":
+			data.CPUAllocation = append(data.CPUAllocation, point)
+		case "container_memory_allocation_bytes":
+			data.RAMAllocation = append(data.RAMAllocation, point)
+		case "container_gpu_allocation":
+			data.GPUReq = append(data.GPUReq, point)
+		}
+	}
+	// Surface iteration failures instead of returning a silently truncated series.
+	return rows.Err()
+}

+ 1 - 0
go.sum

@@ -4,6 +4,7 @@ cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 contrib.go.opencensus.io/exporter/ocagent v0.5.0 h1:TKXjQSRS0/cCDrP7KvkgU6SmILtF/yV2TOs/02K/WZQ=
 contrib.go.opencensus.io/exporter/ocagent v0.5.0/go.mod h1:ImxhfLRpxoYiSq891pBrLVhN+qmP8BTVvdH2YLs7Gl0=
+git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
 github.com/Azure/azure-sdk-for-go v24.1.0+incompatible h1:P7GocB7bhkyGbRL1tCy0m9FDqb1V/dqssch3jZieUHk=
 github.com/Azure/azure-sdk-for-go v24.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/go-autorest v11.1.0+incompatible h1:9DfMsQdUMEtg1jKRTjtkNZsvOuZXJOMl4dN1kiQwAc8=

+ 65 - 8
main.go

@@ -136,17 +136,24 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 	offset := r.URL.Query().Get("offset")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
+	aggregation := r.URL.Query().Get("aggregation")
+	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
 	if offset != "" {
 		offset = "offset " + offset
 	}
 
 	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
-	if fields != "" {
-		filteredData := filterFields(fields, data)
-		w.Write(wrapData(filteredData, err))
+	if aggregation != "" {
+		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		w.Write(wrapData(agg, nil))
 	} else {
-		w.Write(wrapData(data, err))
+		if fields != "" {
+			filteredData := filterFields(fields, data)
+			w.Write(wrapData(filteredData, err))
+		} else {
+			w.Write(wrapData(data, err))
+		}
 	}
 }
 
@@ -182,6 +189,45 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 	w.Write(wrapData(data, err))
 }
 
+// AggregateCostModel is the HTTP handler for GET /aggregatedPrices. It computes
+// cost data at a fixed "1h" resolution over a time range and writes the data
+// aggregated by the requested field as JSON.
+//
+// Query parameters:
+//   - window: length of the range, in time.ParseDuration syntax (e.g. "24h"). Required.
+//   - offset: optional duration, same syntax, by which the end of the range is
+//     moved back from now.
+//   - aggregation, aggregationSubfield: passed to costModel.AggregateCostModel.
+//     When aggregation is empty the unaggregated cost data is written instead.
+//   - namespace: passed through to ComputeCostDataRange.
+//
+// A malformed window or offset, or a failure computing the cost data, is
+// reported through wrapData.
+func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	window := r.URL.Query().Get("window")
+	offset := r.URL.Query().Get("offset")
+	aggregation := r.URL.Query().Get("aggregation")
+	namespace := r.URL.Query().Get("namespace")
+	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
+
+	// The layout used below ends in a literal "Z", so the instants must be in
+	// UTC for the formatted strings to denote the intended times.
+	endTime := time.Now().UTC()
+	if offset != "" {
+		o, err := time.ParseDuration(offset)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+
+		endTime = endTime.Add(-1 * o)
+	}
+	d, err := time.ParseDuration(window)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+	startTime := endTime.Add(-1 * d)
+	layout := "2006-01-02T15:04:05.000Z"
+	start := startTime.Format(layout)
+	// end must be derived from endTime; formatting startTime here would make
+	// the queried range zero-width.
+	end := endTime.Format(layout)
+	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+	if aggregation == "" {
+		// Always produce a JSON body: fall back to the raw cost data, as the
+		// other cost-data handlers do when no aggregation is requested.
+		w.Write(wrapData(data, nil))
+		return
+	}
+	agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+	w.Write(wrapData(agg, nil))
+}
+
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -191,13 +237,23 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	window := r.URL.Query().Get("window")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
+	aggregation := r.URL.Query().Get("aggregation")
+	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
 	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace)
-	if fields != "" {
-		filteredData := filterFields(fields, data)
-		w.Write(wrapData(filteredData, err))
+	if err != nil {
+		w.Write(wrapData(nil, err))
+	}
+	if aggregation != "" {
+		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		w.Write(wrapData(agg, nil))
 	} else {
-		w.Write(wrapData(data, err))
+		if fields != "" {
+			filteredData := filterFields(fields, data)
+			w.Write(wrapData(filteredData, err))
+		} else {
+			w.Write(wrapData(data, err))
+		}
 	}
 }
 
@@ -694,6 +750,7 @@ func main() {
 	router.GET("/managementPlatform", a.ManagementPlatform)
 	router.GET("/clusterInfo", a.ClusterInfo)
 	router.GET("/containerUptimes", a.ContainerUptimes)
+	router.GET("/aggregatedPrices", a.AggregateCostModel)
 
 	rootMux := http.NewServeMux()
 	rootMux.Handle("/", router)