Explorar el Código

Merge pull request #167 from kubecost/AjayTripathy-pv

add pv cost
Ajay Tripathy hace 6 años
padre
commit
cfc5d656a6
Se han modificado 3 ficheros con 125 adiciones y 9 borrados
  1. 3 3
      cloud/gcpprovider.go
  2. 99 1
      costmodel/sql.go
  3. 23 5
      main.go

+ 3 - 3
cloud/gcpprovider.go

@@ -86,12 +86,12 @@ func (gcp *GCP) GetLocalStorageQuery() (string, error) {
 
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("gcp.json")
-	if c.Discount == "" {
-		c.Discount = "30%"
-	}
 	if err != nil {
 		return nil, err
 	}
+	if c.Discount == "" {
+		c.Discount = "30%"
+	}
 	return c, nil
 }
 

+ 99 - 1
costmodel/sql.go

@@ -16,6 +16,34 @@ import (
 const remotePW = "REMOTE_WRITE_PASSWORD"
 const sqlAddress = "SQL_ADDRESS"
 
+func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
+	pvs := make(map[string]*costAnalyzerCloud.PV)
+	query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='pv_hourly_cost')  AND value != 'NaN' AND value != 0
+	GROUP BY volumename,name,clusterid;`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var (
+			name       string
+			avg        float64
+			volumename string
+			clusterid  string
+		)
+		if err := rows.Scan(&name, &avg, &volumename, &clusterid); err != nil {
+			return nil, err
+		}
+		pvs[volumename] = &costAnalyzerCloud.PV{
+			Cost: fmt.Sprintf("%f", avg),
+		}
+	}
+	return pvs, nil
+}
+
 func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
 
 	nodes := make(map[string]*costAnalyzerCloud.Node)
@@ -252,7 +280,6 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 			}
 		}
 
-		klog.Infof("%#v\n", result)
 		var dat map[string]string
 		err := json.Unmarshal([]byte(result[4]), &dat)
 		if err != nil {
@@ -274,5 +301,76 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 		}
 	}
 
+	volumes, err := getPVCosts(db)
+	if err != nil {
+		klog.Infof("Error fetching pv data from sql: %s. Skipping PVData", err.Error())
+	} else {
+		query = `SELECT time_bucket($1, time) AS bucket, name, avg(value), labels->>'persistentvolumeclaim' AS claim, labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'persistentvolume' AS volumename, labels->>'cluster_id' AS clusterid
+		FROM metrics
+		WHERE (name='pod_pvc_allocation') AND
+			time > $2 AND time < $3 AND value != 'NaN'
+		GROUP BY claim,pod,bucket,namespace,volumename,clusterid,name
+		ORDER BY pod,bucket;`
+
+		rows, err = db.Query(query, window, start, end)
+		if err != nil {
+			return nil, err
+		}
+		pvcData := make(map[string]*PersistentVolumeClaimData)
+		for rows.Next() {
+			var (
+				bucket     string
+				name       string
+				sum        float64
+				claim      string
+				pod        string
+				namespace  string
+				volumename sql.NullString
+				clusterid  string
+			)
+			if err := rows.Scan(&bucket, &name, &sum, &claim, &pod, &namespace, &volumename, &clusterid); err != nil {
+				return nil, err
+			}
+			layout := "2006-01-02T15:04:05Z"
+			t, err := time.Parse(layout, bucket)
+			if err != nil {
+				return nil, err
+			}
+			allocationVector := &Vector{
+				Timestamp: float64(t.Unix()),
+				Value:     sum,
+			}
+			if pvcd, ok := pvcData[claim]; ok {
+				pvcd.Values = append(pvcd.Values, allocationVector)
+			} else {
+				if volumename.Valid {
+					vname := volumename.String
+					d := &PersistentVolumeClaimData{
+						Namespace:  namespace,
+						VolumeName: vname,
+						Claim:      claim,
+					}
+					if volume, ok := volumes[vname]; ok {
+						volume.Size = fmt.Sprintf("%f", sum) // Just assume the claim is the whole volume for now
+						d.Volume = volume
+					}
+					d.Values = append(d.Values, allocationVector)
+					pvcData[claim] = d
+					for _, cd := range model { // TODO: make this not doubly nested
+						if cd.PodName == pod && cd.Namespace == namespace {
+							if len(cd.PVCData) > 0 {
+								cd.PVCData = append(cd.PVCData, d)
+							} else {
+								cd.PVCData = []*PersistentVolumeClaimData{d}
+							}
+							break // break so we only assign to the first
+						}
+					}
+				}
+
+			}
+		}
+	}
+
 	return model, nil
 }

+ 23 - 5
main.go

@@ -53,6 +53,7 @@ type Accesses struct {
 	RAMAllocationRecorder         *prometheus.GaugeVec
 	CPUAllocationRecorder         *prometheus.GaugeVec
 	GPUAllocationRecorder         *prometheus.GaugeVec
+	PVAllocationRecorder          *prometheus.GaugeVec
 	ContainerUptimeRecorder       *prometheus.GaugeVec
 	NetworkZoneEgressRecorder     prometheus.Gauge
 	NetworkRegionEgressRecorder   prometheus.Gauge
@@ -470,6 +471,7 @@ func (a *Accesses) recordPrices() {
 		containerSeen := make(map[string]bool)
 		nodeSeen := make(map[string]bool)
 		pvSeen := make(map[string]bool)
+		pvcSeen := make(map[string]bool)
 
 		getKeyFromLabelStrings := func(labels ...string) string {
 			return strings.Join(labels, ",")
@@ -519,11 +521,16 @@ func (a *Accesses) recordPrices() {
 
 				totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
 
+				namespace := costs.Namespace
+				podName := costs.PodName
+				containerName := costs.Name
+
 				if costs.PVCData != nil {
 					for _, pvc := range costs.PVCData {
 						if pvc.Volume != nil {
-							pvCost, _ := strconv.ParseFloat(pvc.Volume.Cost, 64)
-							a.PersistentVolumePriceRecorder.WithLabelValues(pvc.VolumeName, pvc.VolumeName).Set(pvCost)
+							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
+							labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
+							pvcSeen[labelKey] = true
 						}
 					}
 				}
@@ -535,9 +542,6 @@ func (a *Accesses) recordPrices() {
 				labelKey := getKeyFromLabelStrings(nodeName, nodeName)
 				nodeSeen[labelKey] = true
 
-				namespace := costs.Namespace
-				podName := costs.PodName
-				containerName := costs.Name
 				if len(costs.RAMAllocation) > 0 {
 					a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
 				}
@@ -620,6 +624,14 @@ func (a *Accesses) recordPrices() {
 				}
 				pvSeen[labelString] = false
 			}
+			for labelString, seen := range pvcSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.PVAllocationRecorder.DeleteLabelValues(labels...)
+					delete(pvcSeen, labelString)
+				}
+				pvcSeen[labelString] = false
+			}
 			time.Sleep(time.Minute)
 		}
 	}()
@@ -719,6 +731,10 @@ func main() {
 		Name: "container_gpu_allocation",
 		Help: "container_gpu_allocation GPU used",
 	}, []string{"namespace", "pod", "container", "instance", "node"})
+	PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "pod_pvc_allocation",
+		Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
+	}, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
 
 	ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "container_uptime_seconds",
@@ -746,6 +762,7 @@ func main() {
 	prometheus.MustRegister(RAMAllocation)
 	prometheus.MustRegister(CPUAllocation)
 	prometheus.MustRegister(ContainerUptimeRecorder)
+	prometheus.MustRegister(PVAllocation)
 	prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
 	prometheus.MustRegister(costModel.ServiceCollector{
 		KubeClientSet: kubeClientset,
@@ -765,6 +782,7 @@ func main() {
 		RAMAllocationRecorder:         RAMAllocation,
 		CPUAllocationRecorder:         CPUAllocation,
 		GPUAllocationRecorder:         GPUAllocation,
+		PVAllocationRecorder:          PVAllocation,
 		ContainerUptimeRecorder:       ContainerUptimeRecorder,
 		NetworkZoneEgressRecorder:     NetworkZoneEgressRecorder,
 		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,