Просмотр исходного кода

ClusterDisks: add activeMins to local disks

Niko Kovacevic 5 лет назад
Родитель
Сommit
7f28386c05
1 измененных файлов с 71 добавлено и 4 удалено
  1. 71 4
      pkg/costmodel/cluster.go

+ 71 - 4
pkg/costmodel/cluster.go

@@ -138,21 +138,25 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	ctx := prom.NewContext(client)
 	ctx := prom.NewContext(client)
 	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+
 	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-
-	// TODO niko/assets how to do "active minutes" here?
-	// queryActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 
 	resChPVCost := ctx.Query(queryPVCost)
 	resChPVCost := ctx.Query(queryPVCost)
 	resChPVSize := ctx.Query(queryPVSize)
 	resChPVSize := ctx.Query(queryPVSize)
+	resChActiveMins := ctx.Query(queryActiveMins)
 	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
 	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
 	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
 	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
+	resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
 
 
 	resPVCost, _ := resChPVCost.Await()
 	resPVCost, _ := resChPVCost.Await()
 	resPVSize, _ := resChPVSize.Await()
 	resPVSize, _ := resChPVSize.Await()
+	resActiveMins, _ := resChActiveMins.Await()
 	resLocalStorageCost, _ := resChLocalStorageCost.Await()
 	resLocalStorageCost, _ := resChLocalStorageCost.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
+	resLocalActiveMins, _ := resChLocalActiveMins.Await()
 	if ctx.ErrorCollector.IsError() {
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 		return nil, ctx.Errors()
 	}
 	}
@@ -261,6 +265,70 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		diskMap[key].Bytes = bytes
 		diskMap[key].Bytes = bytes
 	}
 	}
 
 
+	for _, result := range resActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("persistentvolume")
+		if err != nil {
+			log.Warningf("ClusterDisks: active mins missing instance")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			log.Warningf("ClusterDisks: active mins for unidentified disk")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		diskMap[key].Start = s
+		diskMap[key].Minutes = mins
+	}
+
+	for _, result := range resLocalActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterDisks: local active mins data missing instance")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			log.Warningf("ClusterDisks: local active mins for unidentified disk")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		diskMap[key].Start = s
+		diskMap[key].Minutes = mins
+	}
+
 	return diskMap, nil
 	return diskMap, nil
 }
 }
 
 
@@ -597,7 +665,6 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 
 
-		log.Infof("ClusterNodes: %s: %f", key, mins)
 		nodeMap[key].Start = s
 		nodeMap[key].Start = s
 		nodeMap[key].Minutes = mins
 		nodeMap[key].Minutes = mins
 	}
 	}