Sfoglia il codice sorgente

Cleanup. Move existing code to utility methods.

Matt Bolt 6 anni fa
parent
commit
236e75d49f
3 ha cambiato i file con 102 aggiunte e 413 eliminazioni
  1. 43 349
      costmodel/costmodel.go
  2. 50 0
      costmodel/promparsers.go
  3. 9 64
      costmodel/vector.go

+ 43 - 349
costmodel/costmodel.go

@@ -7,7 +7,6 @@ import (
 	"math"
 	"net/http"
 	"os"
-	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -410,7 +409,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		return nil, err
 	}
 
-	pvClaimMapping, err := getPVInfoVector(resultPVRequests, clusterID)
+	pvClaimMapping, err := GetPVInfo(resultPVRequests, clusterID)
 	if err != nil {
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
@@ -845,66 +844,20 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 }
 
 func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
-	if req == nil || len(req) == 0 {
-		for _, usedV := range used {
-			if usedV.Timestamp == 0 {
-				continue
-			}
-			usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
-		}
-		return used
-	}
-	if used == nil || len(used) == 0 {
-		for _, reqV := range req {
-			if reqV.Timestamp == 0 {
-				continue
-			}
-			reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
-		}
-		return req
-	}
-	var allocation []*Vector
-
-	var timestamps []float64
-	reqMap := make(map[float64]float64)
-	for _, reqV := range req {
-		if reqV.Timestamp == 0 {
-			continue
-		}
-		reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
-		reqMap[reqV.Timestamp] = reqV.Value
-		timestamps = append(timestamps, reqV.Timestamp)
-	}
-	usedMap := make(map[float64]float64)
-	for _, usedV := range used {
-		if usedV.Timestamp == 0 {
-			continue
-		}
-		usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
-		usedMap[usedV.Timestamp] = usedV.Value
-		if _, ok := reqMap[usedV.Timestamp]; !ok { // no need to double add, since we'll range over sorted timestamps and check.
-			timestamps = append(timestamps, usedV.Timestamp)
+	// The result of the normalize operation will be a new []*Vector to replace the requests
+	allocationOp := func(r *Vector, x *float64, y *float64) bool {
+		if x != nil && y != nil {
+			r.Value = math.Max(*x, *y)
+		} else if x != nil {
+			r.Value = *x
+		} else if y != nil {
+			r.Value = *y
 		}
-	}
 
-	sort.Float64s(timestamps)
-	for _, t := range timestamps {
-		rv, okR := reqMap[t]
-		uv, okU := usedMap[t]
-		allocationVector := &Vector{
-			Timestamp: t,
-		}
-		if okR && okU {
-			allocationVector.Value = math.Max(rv, uv)
-		} else if okR {
-			allocationVector.Value = rv
-		} else if okU {
-			allocationVector.Value = uv
-		}
-		allocation = append(allocation, allocationVector)
+		return true
 	}
 
-	return allocation
+	return ApplyVectorOp(req, used, allocationOp)
 }
 
 func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
@@ -1559,7 +1512,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		return nil, err
 	}
 
-	pvClaimMapping, err := getPVInfoVectors(resultPVRequests, clusterID)
+	pvClaimMapping, err := GetPVInfo(resultPVRequests, clusterID)
 	if err != nil {
 		// Just log for compatibility with KSM less than 1.6
 		klog.Infof("Unable to get PV Data: %s", err.Error())
@@ -2126,253 +2079,21 @@ type PersistentVolumeClaimData struct {
 
 func getCost(qr interface{}) (map[string][]*Vector, error) {
 	toReturn := make(map[string][]*Vector)
-	for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
-		}
-		instance, ok := metricMap["instance"]
-		if !ok {
-			return nil, fmt.Errorf("Instance field does not exist in data result vector")
-		}
-		instanceStr, ok := instance.(string)
-		if !ok {
-			return nil, fmt.Errorf("Instance is improperly formatted")
-		}
-		dataPoint, ok := val.(map[string]interface{})["value"]
-		if !ok {
-			return nil, fmt.Errorf("Value field does not exist in data result vector")
-		}
-		value, ok := dataPoint.([]interface{})
-		if !ok || len(value) != 2 {
-			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-		}
-		var vectors []*Vector
-		strVal := value[1].(string)
-		v, _ := strconv.ParseFloat(strVal, 64)
-
-		vectors = append(vectors, &Vector{
-			Timestamp: value[0].(float64),
-			Value:     v,
-		})
-		toReturn[instanceStr] = vectors
+	result, err := NewQueryResults(qr)
+	if err != nil {
+		return toReturn, err
 	}
 
-	return toReturn, nil
-}
-
-func getPVInfoVectors(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
-	pvmap := make(map[string]*PersistentVolumeClaimData)
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
+	for _, val := range result {
+		instance, err := val.GetString("instance")
 		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-	d, ok := data.(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
-	}
-	result, ok := d["result"]
-	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
-	}
-	results, ok := result.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
-	}
-	for _, val := range results {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
-		}
-		pvclaim, ok := metricMap["persistentvolumeclaim"]
-		if !ok {
-			return nil, fmt.Errorf("Claim field does not exist in data result vector")
-		}
-		pvclaimStr, ok := pvclaim.(string)
-		if !ok {
-			return nil, fmt.Errorf("Claim field improperly formatted")
-		}
-		pvnamespace, ok := metricMap["namespace"]
-		if !ok {
-			return nil, fmt.Errorf("Namespace field does not exist in data result vector")
-		}
-		pvnamespaceStr, ok := pvnamespace.(string)
-		if !ok {
-			return nil, fmt.Errorf("Namespace field improperly formatted")
-		}
-		pv, ok := metricMap["volumename"]
-		if !ok {
-			klog.V(3).Infof("Warning: Unfulfilled claim %s: volumename field does not exist in data result vector", pvclaimStr)
-			pv = ""
-		}
-		pvStr, ok := pv.(string)
-		if !ok {
-			return nil, fmt.Errorf("Volumename field improperly formatted")
-		}
-		pvclass, ok := metricMap["storageclass"]
-		if !ok { // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
-			klog.V(2).Infof("Storage Class not found for claim \"%s/%s\".", pvnamespaceStr, pvclaimStr)
-			pvclass = ""
-		}
-		pvclassStr, ok := pvclass.(string)
-		if !ok {
-			return nil, fmt.Errorf("StorageClass field improperly formatted")
-		}
-		cid, ok := metricMap["cluster_id"]
-		if !ok {
-			klog.V(4).Info("Prometheus vector does not have cluster id")
-			cid = defaultClusterID
-		}
-		clusterID, ok := cid.(string)
-		if !ok {
-			return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
-		}
-
-		values, ok := val.(map[string]interface{})["values"].([]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Values field is improperly formatted")
+			return toReturn, err
 		}
-		var vectors []*Vector
-		for _, value := range values {
-			dataPoint, ok := value.([]interface{})
-			if !ok || len(dataPoint) != 2 {
-				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-			}
 
-			strVal := dataPoint[1].(string)
-			v, _ := strconv.ParseFloat(strVal, 64)
-			vectors = append(vectors, &Vector{
-				Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
-				Value:     v,
-			})
-		}
-		key := pvnamespaceStr + "," + pvclaimStr + "," + clusterID
-		pvmap[key] = &PersistentVolumeClaimData{
-			Class:      pvclassStr,
-			Claim:      pvclaimStr,
-			Namespace:  pvnamespaceStr,
-			ClusterID:  clusterID,
-			VolumeName: pvStr,
-			Values:     vectors,
-		}
+		toReturn[instance] = val.Values
 	}
-	return pvmap, nil
-}
 
-func getPVInfoVector(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
-	pvmap := make(map[string]*PersistentVolumeClaimData)
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-	d, ok := data.(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
-	}
-	result, ok := d["result"]
-	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
-	}
-	results, ok := result.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
-	}
-	for _, val := range results {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
-		}
-		pvclaim, ok := metricMap["persistentvolumeclaim"]
-		if !ok {
-			return nil, fmt.Errorf("Claim field does not exist in data result vector")
-		}
-		pvclaimStr, ok := pvclaim.(string)
-		if !ok {
-			return nil, fmt.Errorf("Claim field improperly formatted")
-		}
-		pvnamespace, ok := metricMap["namespace"]
-		if !ok {
-			return nil, fmt.Errorf("Namespace field does not exist in data result vector")
-		}
-		pvnamespaceStr, ok := pvnamespace.(string)
-		if !ok {
-			return nil, fmt.Errorf("Namespace field improperly formatted")
-		}
-		pv, ok := metricMap["volumename"]
-		if !ok {
-			klog.V(3).Infof("Warning: Unfulfilled claim %s: volumename field does not exist in data result vector", pvclaimStr)
-			pv = ""
-		}
-		pvStr, ok := pv.(string)
-		if !ok {
-			return nil, fmt.Errorf("Volumename field improperly formatted")
-		}
-		pvclass, ok := metricMap["storageclass"]
-		if !ok { // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
-			klog.V(2).Infof("Storage Class not found for claim \"%s/%s\".", pvnamespaceStr, pvclaimStr)
-			pvclass = ""
-		}
-		pvclassStr, ok := pvclass.(string)
-		if !ok {
-			return nil, fmt.Errorf("StorageClass field improperly formatted")
-		}
-		cid, ok := metricMap["cluster_id"]
-		if !ok {
-			klog.V(4).Info("Prometheus vector does not have cluster id")
-			cid = defaultClusterID
-		}
-		clusterID, ok := cid.(string)
-		if !ok {
-			return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
-		}
-		dataPoint, ok := val.(map[string]interface{})["value"]
-		if !ok {
-			return nil, fmt.Errorf("Value field does not exist in data result vector")
-		}
-		value, ok := dataPoint.([]interface{})
-		if !ok || len(value) != 2 {
-			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-		}
-		var vectors []*Vector
-		strVal := value[1].(string)
-		v, _ := strconv.ParseFloat(strVal, 64)
-
-		vectors = append(vectors, &Vector{
-			Timestamp: value[0].(float64),
-			Value:     v,
-		})
-
-		key := pvnamespaceStr + "," + pvclaimStr + "," + clusterID
-		pvmap[key] = &PersistentVolumeClaimData{
-			Class:      pvclassStr,
-			Claim:      pvclaimStr,
-			Namespace:  pvnamespaceStr,
-			ClusterID:  clusterID,
-			VolumeName: pvStr,
-			Values:     vectors,
-		}
-	}
-	return pvmap, nil
+	return toReturn, nil
 }
 
 func QueryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
@@ -2435,65 +2156,38 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 }
 
 //todo: don't cast, implement unmarshaler interface
-func getNormalizations(qr interface{}) ([]*Vector, error) {
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-	results, ok := data.(map[string]interface{})["result"].([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Result field not found in normalization response, aborting")
+func getNormalization(qr interface{}) (float64, error) {
+	queryResults, err := NewQueryResults(qr)
+	if err != nil {
+		return 0, err
 	}
-	if len(results) > 0 {
-		vectors := []*Vector{}
-		for i := range results {
-			values, ok := results[i].(map[string]interface{})["values"].([]interface{})
-			for _, d := range values {
-				dataPoint := d.([]interface{})
-				if !ok || len(dataPoint) != 2 {
-					return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-				}
-				strVal := dataPoint[1].(string)
-				v, _ := strconv.ParseFloat(strVal, 64)
-				vectors = append(vectors, &Vector{
-					Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
-					Value:     v,
-				})
-			}
+
+	if len(queryResults) > 0 {
+		values := queryResults[0].Values
+
+		if len(values) > 0 {
+			return values[0].Value, nil
 		}
-		return vectors, nil
+		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
 	}
-	return nil, fmt.Errorf("normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running")
+	return 0, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
 }
 
 //todo: don't cast, implement unmarshaler interface
-func getNormalization(qr interface{}) (float64, error) {
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return 0, err
-		}
-		return 0, fmt.Errorf(e)
-	}
-	results, ok := data.(map[string]interface{})["result"].([]interface{})
-	if !ok {
-		return 0, fmt.Errorf("Result field not found in normalization response, aborting")
+func getNormalizations(qr interface{}) ([]*Vector, error) {
+	queryResults, err := NewQueryResults(qr)
+	if err != nil {
+		return nil, err
 	}
-	if len(results) > 0 {
-		dataPoint := results[0].(map[string]interface{})["value"].([]interface{})
-		if len(dataPoint) == 2 {
-			strNorm := dataPoint[1].(string)
-			val, _ := strconv.ParseFloat(strNorm, 64)
-			return val, nil
+
+	if len(queryResults) > 0 {
+		vectors := []*Vector{}
+		for _, value := range queryResults {
+			vectors = append(vectors, value.Values...)
 		}
-		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		return vectors, nil
 	}
-	return 0, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
+	return nil, fmt.Errorf("normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running")
 }
 
 type ContainerMetric struct {

+ 50 - 0
costmodel/promparsers.go

@@ -154,6 +154,56 @@ func parseDataPoint(dataPoint interface{}) (*Vector, error) {
 	}, nil
 }
 
+func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
+	toReturn := make(map[string]*PersistentVolumeClaimData)
+	result, err := NewQueryResults(qr)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		pvcName, err := val.GetString("persistentvolumeclaim")
+		if err != nil {
+			return toReturn, err
+		}
+
+		volumeName, err := val.GetString("volumename")
+		if err != nil {
+			klog.V(3).Infof("Warning: Unfulfilled claim %s: volumename field does not exist in data result vector", pvcName)
+			volumeName = ""
+		}
+
+		pvClass, err := val.GetString("storageclass")
+		if err != nil {
+			// TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
+			klog.V(2).Infof("Storage Class not found for claim \"%s/%s\".", ns, pvcName)
+			pvClass = ""
+		}
+
+		key := fmt.Sprintf("%s,%s,%s", ns, pvcName, clusterID)
+		toReturn[key] = &PersistentVolumeClaimData{
+			Class:      pvClass,
+			Claim:      pvcName,
+			Namespace:  ns,
+			ClusterID:  clusterID,
+			VolumeName: volumeName,
+			Values:     val.Values,
+		}
+	}
+
+	return toReturn, nil
+}
+
 func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string][]*PersistentVolumeClaimData)
 	result, err := NewQueryResults(queryResult)

+ 9 - 64
costmodel/vector.go

@@ -108,72 +108,17 @@ func VectorValue(v float64, ok bool) *float64 {
 // which has had its timestamps rounded and its values divided by the values
 // of the Vectors of yvs, such that yvs is the "unit" Vector slice.
 func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
-	// round all non-zero timestamps to the nearest 10 second mark
-	for _, yv := range yvs {
-		if yv.Timestamp != 0 {
-			yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
-		}
-	}
-	for _, xv := range xvs {
-		if xv.Timestamp != 0 {
-			xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
+	normalizeOp := func(result *Vector, x *float64, y *float64) bool {
+		if x != nil && y != nil && *y != 0 {
+			result.Value = *x / *y
+		} else if x != nil {
+			result.Value = *x
+		} else if y != nil {
+			result.Value = 0
 		}
-	}
 
-	// if xvs is empty, return yvs
-	if xvs == nil || len(xvs) == 0 {
-		return yvs
-	}
-
-	// if yvs is empty, return xvs
-	if yvs == nil || len(yvs) == 0 {
-		return xvs
-	}
-
-	// sum stores the sum of the vector slices xvs and yvs
-	var sum []*Vector
-
-	// timestamps stores all timestamps present in both vector slices
-	// without duplicates
-	var timestamps []float64
-
-	// turn each vector slice into a map of timestamp-to-value so that
-	// values at equal timestamps can be lined-up and summed
-	xMap := make(map[float64]float64)
-	for _, xv := range xvs {
-		if xv.Timestamp == 0 {
-			continue
-		}
-		xMap[xv.Timestamp] = xv.Value
-		timestamps = append(timestamps, xv.Timestamp)
-	}
-	yMap := make(map[float64]float64)
-	for _, yv := range yvs {
-		if yv.Timestamp == 0 {
-			continue
-		}
-		yMap[yv.Timestamp] = yv.Value
-		if _, ok := xMap[yv.Timestamp]; !ok {
-			// no need to double add, since we'll range over sorted timestamps and check.
-			timestamps = append(timestamps, yv.Timestamp)
-		}
-	}
-
-	// iterate over each timestamp to produce a final normalized vector slice
-	sort.Float64s(timestamps)
-	for _, t := range timestamps {
-		x, okX := xMap[t]
-		y, okY := yMap[t]
-		sv := &Vector{Timestamp: t}
-		if okX && okY && y != 0 {
-			sv.Value = x / y
-		} else if okX {
-			sv.Value = x
-		} else if okY {
-			sv.Value = 0
-		}
-		sum = append(sum, sv)
+		return true
 	}
 
-	return sum
+	return ApplyVectorOp(xvs, yvs, normalizeOp)
 }