Explorar el Código

WIP CostModel.ComputeAllocation queries

Niko Kovacevic hace 5 años
padre
commit
bf3fed72b3
Se han modificado 1 ficheros con 92 adiciones y 48 borrados
  1. 92 48
      pkg/costmodel/allocation.go

+ 92 - 48
pkg/costmodel/allocation.go

@@ -125,12 +125,12 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	`, durStr, offStr)
 	resChPVCAllocation := ctx.Query(queryPVCAllocation)
 
-	queryPVBytesRequested := fmt.Sprintf(`
+	queryPVCBytesRequested := fmt.Sprintf(`
 		avg(
 			avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)
 		) by (persistentvolumeclaim, namespace, cluster_id)
 	`, durStr, offStr)
-	resChPVBytesRequested := ctx.Query(queryPVBytesRequested)
+	resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
 
 	queryPVCostPerGiBHour := fmt.Sprintf(`
 		avg(
@@ -199,7 +199,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
 	resGPUsRequested, _ := resChGPUsRequested.Await()
 	resPVCAllocation, _ := resChPVCAllocation.Await()
-	resPVBytesRequested, _ := resChPVBytesRequested.Await()
+	resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
 	resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
 	resPVCInfo, _ := resChPVCInfo.Await()
 
@@ -225,17 +225,21 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 			continue
 		}
 
-		values, err := res.GetStrings("cluster_id", "kubernetes_node", "namespace", "pod", "container")
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		labels, err := res.GetStrings("kubernetes_node", "namespace", "pod", "container")
 		if err != nil {
 			log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
 			continue
 		}
 
-		cluster := values["cluster_id"]
-		node := values["kubernetes_node"]
-		namespace := values["namespace"]
-		pod := values["pod"]
-		container := values["container"]
+		node := labels["kubernetes_node"]
+		namespace := labels["namespace"]
+		pod := labels["pod"]
+		container := labels["container"]
 
 		containerKey := newContainerKey(cluster, namespace, pod, container)
 		podKey := newPodKey(cluster, namespace, pod)
@@ -293,7 +297,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 
 	for _, res := range resCPUCoresAllocated {
 		// TODO niko/cdmr do we need node here?
-		key, err := resultContainerKey(res, "cluster", "namespace", "pod", "container")
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
 		if err != nil {
 			log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing field: %s", err)
 			continue
@@ -312,7 +316,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 
 	for _, res := range resRAMBytesAllocated {
 		// TODO niko/cdmr do we need node here?
-		key, err := resultContainerKey(res, "cluster", "namespace", "pod", "container")
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
 		if err != nil {
 			log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing field: %s", err)
 			continue
@@ -331,7 +335,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 
 	for _, res := range resGPUsRequested {
 		// TODO niko/cdmr do we need node here?
-		key, err := resultContainerKey(res, "cluster", "namespace", "pod", "container")
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
 		if err != nil {
 			log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing field: %s", err)
 			continue
@@ -350,45 +354,49 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	// TODO niko/cdmr comment
 	pvMap := map[pvKey]*PV{}
 
-	for _, res := range resPVBytesRequested {
+	for _, res := range resPVCostPerGiBHour {
 		// TODO niko/cdmr double-check "persistentvolume" vs "volumename"
 		key, err := resultPVKey(res, "cluster_id", "persistentvolume")
 		if err != nil {
-			log.Warningf("CostModel.ComputeAllocation: PV bytes requested query result missing field: %s", err)
+			log.Warningf("CostModel.ComputeAllocation: PV cost per byte*hr query result missing field: %s", err)
 			continue
 		}
 
-		// TODO niko/cdmr double-check "persistentvolume" vs "volumename"
-		name, err := res.GetString("persistentvolume")
-		if err != nil {
-			log.Warningf("CostModel.ComputeAllocation: PV bytes requested query result missing field: %s", err)
+		if _, ok := pvMap[key]; !ok {
+			log.Warningf("CostModel.ComputeAllocation: PV cost per byte*hr for unidentified PV: %s", key)
 			continue
 		}
 
-		pvMap[key] = &PV{
-			Bytes: res.Values[0].Value,
-			Name:  name,
-		}
+		pvMap[key].CostPerGiBHour = res.Values[0].Value
 	}
 
-	for _, res := range resPVCostPerGiBHour {
-		// TODO niko/cdmr double-check "persistentvolume" vs "volumename"
-		key, err := resultPVKey(res, "cluster_id", "persistentvolume")
+	// TODO niko/cdmr comment
+	// pvcMap := map[pvcKey]*PVC{}
+	for _, res := range resPVCBytesRequested {
+		key, err := resultPVCKey(res, "cluster_id", "persistentvolumeclaim")
 		if err != nil {
-			log.Warningf("CostModel.ComputeAllocation: PV cost per byte*hr query result missing field: %s", err)
-			continue
-		}
-
-		if _, ok := pvMap[key]; !ok {
-			log.Warningf("CostModel.ComputeAllocation: PV cost per byte*hr for unidentified PV: %s", key)
+			log.Warningf("CostModel.ComputeAllocation: PV bytes requested query result missing field: %s", err)
 			continue
 		}
 
-		pvMap[key].CostPerGiBHour = res.Values[0].Value
+		// TODO niko/cdmr double-check "persistentvolume" vs "volumename"
+		// pvc, err := res.GetString("persistentvolumeclaim")
+		// if err != nil {
+		// 	log.Warningf("CostModel.ComputeAllocation: PV bytes requested query result missing field: %s", err)
+		// 	continue
+		// }
+
+		log.Infof("CostModel.ComputeAllocation: PVC: %s %fGiB", key, res.Values[0].Value/1024/1024/1024)
+
+		// TODO niko/cdmr
+		// pvcMap[key] = &PVC{
+		// 	Bytes: res.Values[0].Value,
+		// 	Name:  pvc,
+		// }
 	}
 
 	// TODO niko/cdmr comment
-	pvcMap := map[podKey][]*PVC{}
+	podPVCMap := map[podKey][]*PVC{}
 
 	for _, res := range resPVCAllocation {
 		values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace", "cluster_id")
@@ -411,11 +419,11 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 			continue
 		}
 
-		if _, ok := pvcMap[podKey]; !ok {
-			pvcMap[podKey] = []*PVC{}
+		if _, ok := podPVCMap[podKey]; !ok {
+			podPVCMap[podKey] = []*PVC{}
 		}
 
-		pvcMap[podKey] = append(pvcMap[podKey], &PVC{
+		podPVCMap[podKey] = append(podPVCMap[podKey], &PVC{
 			Bytes:  res.Values[0].Value,
 			Count:  podCount[podKey],
 			Name:   name,
@@ -451,8 +459,10 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 		log.Infof("CostModel.ComputeAllocation: PV: %v", pv)
 	}
 
-	for _, pvc := range pvcMap {
-		log.Infof("CostModel.ComputeAllocation: PVC: %v", pvc)
+	for pod, pvcs := range podPVCMap {
+		for _, pvc := range pvcs {
+			log.Infof("CostModel.ComputeAllocation: Pod %s: PVC: %v", pod, pvc)
+		}
 	}
 
 	for _, alloc := range allocMap {
@@ -463,7 +473,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 		pod, _ := alloc.Properties.GetPod()
 		podKey := newPodKey(cluster, namespace, pod)
 
-		if pvcs, ok := pvcMap[podKey]; ok {
+		if pvcs, ok := podPVCMap[podKey]; ok {
 			for _, pvc := range pvcs {
 				// TODO niko/cdmr this isn't quite right... use PVC info query?
 				hrs := alloc.Minutes() / 60.0
@@ -591,23 +601,57 @@ func resultPodKey(res *prom.QueryResult, clusterLabel, namespaceLabel, podLabel
 	return key, nil
 }
 
+type pvcKey struct {
+	Cluster               string
+	PersistentVolumeClaim string
+}
+
+func (k pvcKey) String() string {
+	return fmt.Sprintf("%s/%s", k.Cluster, k.PersistentVolumeClaim)
+}
+
+func newPVCKey(cluster, persistentVolumeClaim string) pvcKey {
+	return pvcKey{
+		Cluster:               cluster,
+		PersistentVolumeClaim: persistentVolumeClaim,
+	}
+}
+
+func resultPVCKey(res *prom.QueryResult, clusterLabel, pvcLabel string) (pvcKey, error) {
+	key := pvcKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	pvc, err := res.GetString(pvcLabel)
+	if err != nil {
+		return key, err
+	}
+	key.PersistentVolumeClaim = pvc
+
+	return key, nil
+}
+
 type pvKey struct {
-	Cluster string
-	Name    string
+	Cluster          string
+	PersistentVolume string
 }
 
 func (k pvKey) String() string {
-	return fmt.Sprintf("%s/%s", k.Cluster, k.Name)
+	return fmt.Sprintf("%s/%s", k.Cluster, k.PersistentVolume)
 }
 
-func newPVKey(cluster, name string) pvKey {
+func newPVKey(cluster, persistentVolume string) pvKey {
 	return pvKey{
-		Cluster: cluster,
-		Name:    name,
+		Cluster:          cluster,
+		PersistentVolume: persistentVolume,
 	}
 }
 
-func resultPVKey(res *prom.QueryResult, clusterLabel, nameLabel string) (pvKey, error) {
+func resultPVKey(res *prom.QueryResult, clusterLabel, persistentVolumeLabel string) (pvKey, error) {
 	key := pvKey{}
 
 	cluster, err := res.GetString(clusterLabel)
@@ -616,11 +660,11 @@ func resultPVKey(res *prom.QueryResult, clusterLabel, nameLabel string) (pvKey,
 	}
 	key.Cluster = cluster
 
-	name, err := res.GetString(nameLabel)
+	persistentVolume, err := res.GetString(persistentVolumeLabel)
 	if err != nil {
 		return key, err
 	}
-	key.Name = name
+	key.PersistentVolume = persistentVolume
 
 	return key, nil
 }