AjayTripathy 6 лет назад
Родитель
Сommit
0e1e264ba1
2 измененных файлов с 101 добавлено и 2 удалено
  1. 99 0
      costmodel/sql.go
  2. 2 2
      main.go

+ 99 - 0
costmodel/sql.go

@@ -16,6 +16,34 @@ import (
 const remotePW = "REMOTE_WRITE_PASSWORD"
 const sqlAddress = "SQL_ADDRESS"
 
+func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
+	pvs := make(map[string]*costAnalyzerCloud.PV)
+	query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='pv_hourly_cost')  AND value != 'NaN' AND value != 0
+	GROUP BY volumename,name,clusterid;`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var (
+			name       string
+			avg        float64
+			volumename string
+			clusterid  string
+		)
+		if err := rows.Scan(&name, &avg, &volumename, &clusterid); err != nil {
+			return nil, err
+		}
+		pvs[volumename] = &costAnalyzerCloud.PV{
+			Cost: fmt.Sprintf("%f", avg),
+		}
+	}
+	return pvs, nil
+}
+
 func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
 
 	nodes := make(map[string]*costAnalyzerCloud.Node)
@@ -274,5 +302,76 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 		}
 	}
 
+	volumes, err := getPVCosts(db)
+	if err != nil {
+		klog.Infof("Error fetching pv data from sql: %s. Skipping PVData", err.Error())
+	} else {
+		query = `SELECT time_bucket($1, time) AS bucket, name, avg(value), labels->>'persistentvolumeclaim' AS claim, labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'persistentvolume' AS volumename, labels->>'cluster_id' AS clusterid
+		FROM metrics
+		WHERE (name='pod_pvc_allocation') AND
+			time > $2 AND time < $3 AND value != 'NaN'
+		GROUP BY claim,pod,bucket,namespace,volumename,clusterid,name
+		ORDER BY pod,bucket;`
+
+		rows, err = db.Query(query, window, start, end)
+		if err != nil {
+			return nil, err
+		}
+		pvcData := make(map[string]*PersistentVolumeClaimData)
+		for rows.Next() {
+			var (
+				bucket     string
+				name       string
+				sum        float64
+				claim      string
+				pod        string
+				namespace  string
+				volumename sql.NullString
+				clusterid  string
+			)
+			if err := rows.Scan(&bucket, &name, &sum, &claim, &pod, &namespace, &volumename, &clusterid); err != nil {
+				return nil, err
+			}
+			layout := "2006-01-02T15:04:05Z"
+			t, err := time.Parse(layout, bucket)
+			if err != nil {
+				return nil, err
+			}
+			allocationVector := &Vector{
+				Timestamp: float64(t.Unix()),
+				Value:     sum,
+			}
+			if pvcd, ok := pvcData[claim]; ok {
+				pvcd.Values = append(pvcd.Values, allocationVector)
+			} else {
+				vname := ""
+				if volumename.Valid {
+					vname = volumename.String
+				}
+				d := &PersistentVolumeClaimData{
+					Namespace:  namespace,
+					VolumeName: vname,
+					Claim:      claim,
+				}
+				if volume, ok := volumes[vname]; ok {
+					volume.Size = fmt.Sprintf("%f", sum) // Just assume the claim is the whole volume for now
+					d.Volume = volume
+				}
+				d.Values = append(d.Values, allocationVector)
+				pvcData[claim] = d
+				for _, cd := range model { // TODO: make this not doubly nested
+					if cd.PodName == pod && cd.Namespace == namespace {
+						if len(cd.PVCData) > 0 {
+							cd.PVCData = append(cd.PVCData, d)
+						} else {
+							cd.PVCData = []*PersistentVolumeClaimData{d}
+						}
+						break // break so we only assign to the first
+					}
+				}
+			}
+		}
+	}
+
 	return model, nil
 }

+ 2 - 2
main.go

@@ -515,7 +515,7 @@ func (a *Accesses) recordPrices() {
 						if pvc.Volume != nil {
 							pvCost, _ := strconv.ParseFloat(pvc.Volume.Cost, 64)
 							a.PersistentVolumePriceRecorder.WithLabelValues(pvc.VolumeName, pvc.VolumeName).Set(pvCost)
-							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim).Set(pvc.Values[0].Value)
+							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
 						}
 					}
 				}
@@ -711,7 +711,7 @@ func main() {
 	PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "pod_pvc_allocation",
 		Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
-	}, []string{"namespace", "pod", "persistentvolumeclaim"})
+	}, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
 
 	ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "container_uptime_seconds",