Przeglądaj źródła

Merge pull request #26 from kubecost/AjayTripathy-fix-normalization-logging

defensive programming. fixed #23
Ajay Tripathy 7 lat temu
rodzic
commit
4bbb46f7de
1 zmienionych plików z 131 dodań i 38 usunięć
  1. 131 38
      costmodel/costmodel.go

+ 131 - 38
costmodel/costmodel.go

@@ -13,7 +13,7 @@ import (
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	prometheusClient "github.com/prometheus/client_golang/api"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
@@ -72,15 +72,39 @@ func ComputeCostData(cli prometheusClient.Client, clientset *kubernetes.Clientse
 	queryGPURequests := `avg(label_replace(label_replace(avg((count_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD"}[` + window + `]) *  avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD"}[` + window + `]))) by (namespace,container,pod) , "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)") ) by (namespace,container_name, pod_name)`
 	queryPVRequests := `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass) + on (persistentvolumeclaim) group_right(storageclass) sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace))`
 	normalization := `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[` + window + `]))`
-	resultRAMRequests, _ := query(cli, queryRAMRequests)
-	resultRAMUsage, _ := query(cli, queryRAMUsage)
-	resultCPURequests, _ := query(cli, queryCPURequests)
-	resultCPUUsage, _ := query(cli, queryCPUUsage)
-	resultGPURequests, _ := query(cli, queryGPURequests)
-	resultPVRequests, _ := query(cli, queryPVRequests)
-	normalizationResult, _ := query(cli, normalization)
+	resultRAMRequests, err := query(cli, queryRAMRequests)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
+	}
+	resultRAMUsage, err := query(cli, queryRAMUsage)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
+	}
+	resultCPURequests, err := query(cli, queryCPURequests)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
+	}
+	resultCPUUsage, err := query(cli, queryCPUUsage)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching CPUUsage requests: " + err.Error())
+	}
+	resultGPURequests, err := query(cli, queryGPURequests)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
+	}
+	resultPVRequests, err := query(cli, queryPVRequests)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
+	}
+	normalizationResult, err := query(cli, normalization)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
+	}
 
-	normalizationValue := getNormalization(normalizationResult)
+	normalizationValue, err := getNormalization(normalizationResult)
+	if err != nil {
+		return nil, err
+	}
 
 	nodes, err := getNodeCost(clientset, cloud)
 	if err != nil {
@@ -103,7 +127,7 @@ func ComputeCostData(cli prometheusClient.Client, clientset *kubernetes.Clientse
 		return nil, err
 	}
 
-	pvClaimMapping := getPVInfoVector(resultPVRequests)
+	pvClaimMapping, err := getPVInfoVector(resultPVRequests)
 	if err != nil {
 		return nil, err
 	}
@@ -381,16 +405,39 @@ func ComputeCostDataRange(cli prometheusClient.Client, clientset *kubernetes.Cli
 		log.Printf("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
-	resultRAMRequests, _ := queryRange(cli, queryRAMRequests, start, end, window)
-	resultRAMUsage, _ := queryRange(cli, queryRAMUsage, start, end, window)
-	resultCPURequests, _ := queryRange(cli, queryCPURequests, start, end, window)
-	resultCPUUsage, _ := queryRange(cli, queryCPUUsage, start, end, window)
-	resultGPURequests, _ := queryRange(cli, queryGPURequests, start, end, window)
-	resultPVRequests, _ := queryRange(cli, queryPVRequests, start, end, window)
-
-	normalizationResult, _ := query(cli, normalization)
+	resultRAMRequests, err := queryRange(cli, queryRAMRequests, start, end, window)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
+	}
+	resultRAMUsage, err := queryRange(cli, queryRAMUsage, start, end, window)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
+	}
+	resultCPURequests, err := queryRange(cli, queryCPURequests, start, end, window)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
+	}
+	resultCPUUsage, err := queryRange(cli, queryCPUUsage, start, end, window)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching CPU usage: " + err.Error())
+	}
+	resultGPURequests, err := queryRange(cli, queryGPURequests, start, end, window)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
+	}
+	resultPVRequests, err := queryRange(cli, queryPVRequests, start, end, window)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
+	}
+	normalizationResult, err := query(cli, normalization)
+	if err != nil {
+		return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
+	}
 
-	normalizationValue := getNormalization(normalizationResult)
+	normalizationValue, err := getNormalization(normalizationResult)
+	if err != nil {
+		return nil, err
+	}
 
 	nodes, err := getNodeCost(clientset, cloud)
 	if err != nil {
@@ -412,7 +459,7 @@ func ComputeCostDataRange(cli prometheusClient.Client, clientset *kubernetes.Cli
 		return nil, err
 	}
 
-	pvClaimMapping := getPVInfoVectors(resultPVRequests)
+	pvClaimMapping, err := getPVInfoVectors(resultPVRequests)
 	if err != nil {
 		return nil, err
 	}
@@ -546,19 +593,35 @@ type PersistentVolumeData struct {
 	Values    []*Vector `json:"values"`
 }
 
-func getPVInfoVectors(qr interface{}) map[string]*PersistentVolumeData {
+func getPVInfoVectors(qr interface{}) (map[string]*PersistentVolumeData, error) {
 	pvmap := make(map[string]*PersistentVolumeData)
 	for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
-		pvclaim := val.(map[string]interface{})["metric"].(map[string]interface{})["persistentvolumeclaim"]
-		pvclass := val.(map[string]interface{})["metric"].(map[string]interface{})["storageclass"]
-		pvnamespace := val.(map[string]interface{})["metric"].(map[string]interface{})["namespace"]
-		values := val.(map[string]interface{})["values"].([]interface{})
+		metricInterface, ok := val.(map[string]interface{})["metric"]
+		if !ok {
+			return nil, fmt.Errorf("Metric field does not exist in data result vector")
+		}
+		metricMap, ok := metricInterface.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Metric field is improperly formatted")
+		}
+		pvclaim := metricMap["persistentvolumeclaim"]
+		pvclass := metricMap["storageclass"]
+		pvnamespace := metricMap["namespace"]
+		values, ok := val.(map[string]interface{})["values"].([]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Values field is improperly formatted")
+		}
 		var vectors []*Vector
 		for _, value := range values {
-			strVal := value.([]interface{})[1].(string)
+			dataPoint, ok := value.([]interface{})
+			if !ok || len(dataPoint) != 2 {
+				return nil, fmt.Errorf("Value field is improperly formatted")
+			}
+
+			strVal := dataPoint[1].(string)
 			v, _ := strconv.ParseFloat(strVal, 64)
 			vectors = append(vectors, &Vector{
-				Timestamp: value.([]interface{})[0].(float64),
+				Timestamp: dataPoint[0].(float64),
 				Value:     v,
 			})
 		}
@@ -570,17 +633,32 @@ func getPVInfoVectors(qr interface{}) map[string]*PersistentVolumeData {
 			Values:    vectors,
 		}
 	}
-	return pvmap
+	return pvmap, nil
 }
 
-func getPVInfoVector(qr interface{}) map[string]*PersistentVolumeData {
+func getPVInfoVector(qr interface{}) (map[string]*PersistentVolumeData, error) {
 	pvmap := make(map[string]*PersistentVolumeData)
 	log.Printf("Interface %v. If the interface is nil, prometheus is not running!", qr)
 	for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
-		pvclaim := val.(map[string]interface{})["metric"].(map[string]interface{})["persistentvolumeclaim"]
-		pvclass := val.(map[string]interface{})["metric"].(map[string]interface{})["storageclass"]
-		pvnamespace := val.(map[string]interface{})["metric"].(map[string]interface{})["namespace"]
-		value := val.(map[string]interface{})["value"].([]interface{})
+		metricInterface, ok := val.(map[string]interface{})["metric"]
+		if !ok {
+			return nil, fmt.Errorf("Metric field does not exist in data result vector")
+		}
+		metricMap, ok := metricInterface.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Metric field is improperly formatted")
+		}
+		pvclaim := metricMap["persistentvolumeclaim"]
+		pvclass := metricMap["storageclass"]
+		pvnamespace := metricMap["namespace"]
+		dataPoint, ok := val.(map[string]interface{})["value"]
+		if !ok {
+			return nil, fmt.Errorf("Value field does not exist in data result vector")
+		}
+		value, ok := dataPoint.([]interface{})
+		if !ok || len(value) != 2 {
+			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		}
 		var vectors []*Vector
 		strVal := value[1].(string)
 		v, _ := strconv.ParseFloat(strVal, 64)
@@ -598,7 +676,7 @@ func getPVInfoVector(qr interface{}) map[string]*PersistentVolumeData {
 			Values:    vectors,
 		}
 	}
-	return pvmap
+	return pvmap, nil
 }
 
 func queryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
@@ -654,10 +732,25 @@ func query(cli prometheusClient.Client, query string) (interface{}, error) {
 }
 
 //todo: don't cast, implement unmarshaler interface
-func getNormalization(qr interface{}) float64 {
-	strNorm := qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{})[0].(map[string]interface{})["value"].([]interface{})[1].(string)
-	val, _ := strconv.ParseFloat(strNorm, 64)
-	return val
+func getNormalization(qr interface{}) (float64, error) {
+	data, ok := qr.(map[string]interface{})["data"]
+	if !ok {
+		return 0, fmt.Errorf("Data field not found in normalization response, aborting")
+	}
+	results, ok := data.(map[string]interface{})["result"].([]interface{})
+	if !ok {
+		return 0, fmt.Errorf("Result field not found in normalization response, aborting")
+	}
+	if len(results) > 0 {
+		dataPoint := results[0].(map[string]interface{})["value"].([]interface{})
+		if len(dataPoint) == 2 {
+			strNorm := dataPoint[1].(string)
+			val, _ := strconv.ParseFloat(strNorm, 64)
+			return val, nil
+		}
+		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+	}
+	return 0, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
 }
 
 //todo: don't cast, implement unmarshaler interface...