Kaynağa Gözat

WIP: prom refactor CostDataRange, ComputeClusterCosts

Niko Kovacevic 5 yıl önce
ebeveyn
işleme
7925fa1205

+ 28 - 10
pkg/costmodel/cluster.go

@@ -216,10 +216,20 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		resChs = append(resChs, bdResChs...)
 	}
 
+	resDataCount, _ := resChs[0].Await()
+	resTotalGPU, _ := resChs[1].Await()
+	resTotalCPU, _ := resChs[2].Await()
+	resTotalRAM, _ := resChs[3].Await()
+	resTotalStorage, _ := resChs[4].Await()
+	resTotalLocalStorage, _ := resChs[5].Await()
+	if ctx.HasErrors() {
+		return nil, ctx.Errors()[0]
+	}
+
 	defaultClusterID := env.GetClusterID()
 
 	dataMinsByCluster := map[string]float64{}
-	for _, result := range resChs[0].Await() {
+	for _, result := range resDataCount {
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -268,18 +278,26 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		}
 	}
 	// Apply both sustained use and custom discounts to RAM and CPU
-	setCostsFromResults(costData, resChs[2].Await(), "cpu", discount, customDiscount)
-	setCostsFromResults(costData, resChs[3].Await(), "ram", discount, customDiscount)
+	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
+	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
 	// Apply only custom discount to GPU and storage
-	setCostsFromResults(costData, resChs[1].Await(), "gpu", 0.0, customDiscount)
-	setCostsFromResults(costData, resChs[4].Await(), "storage", 0.0, customDiscount)
-	setCostsFromResults(costData, resChs[5].Await(), "localstorage", 0.0, customDiscount)
+	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
+	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
+	setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
 
 	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	pvUsedCostMap := map[string]float64{}
 	if withBreakdown {
-		for _, result := range resChs[6].Await() {
+		resCPUModePct, _ := resChs[6].Await()
+		resRAMSystemPct, _ := resChs[7].Await()
+		resRAMUserPct, _ := resChs[8].Await()
+		resUsedLocalStorage, _ := resChs[9].Await()
+		if ctx.HasErrors() {
+			return nil, ctx.Errors()[0]
+		}
+
+		for _, result := range resCPUModePct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -307,7 +325,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			}
 		}
 
-		for _, result := range resChs[7].Await() {
+		for _, result := range resRAMSystemPct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -318,7 +336,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			ramBD := ramBreakdownMap[clusterID]
 			ramBD.System += result.Values[0].Value
 		}
-		for _, result := range resChs[8].Await() {
+		for _, result := range resRAMUserPct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -337,7 +355,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			ramBD.Idle = remaining
 		}
 
-		for _, result := range resChs[9].Await() {
+		for _, result := range resUsedLocalStorage {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID

+ 93 - 467
pkg/costmodel/costmodel.go

@@ -5,13 +5,11 @@ import (
 	"math"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/env"
-	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
@@ -273,57 +271,6 @@ func ValidatePrometheus(cli prometheusClient.Client, isThanos bool) (*Prometheus
 	}, nil
 }
 
-func getUptimeData(qr interface{}) ([]*util.Vector, bool, error) {
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return nil, false, err
-		}
-		return nil, false, fmt.Errorf(e)
-	}
-	r, ok := data.(map[string]interface{})["result"]
-	if !ok {
-		return nil, false, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
-	}
-	results, ok := r.([]interface{})
-	if !ok {
-		return nil, false, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
-	}
-	jobData := []*util.Vector{}
-	kubecostMetrics := false
-	for _, val := range results {
-		// For now, just do this for validation. TODO: This can be parsed to figure out the exact running jobs.
-		metrics, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
-		if !ok {
-			return nil, false, fmt.Errorf("Prometheus vector does not have metric labels")
-		}
-		jobname, ok := metrics["job"]
-		if !ok {
-			return nil, false, fmt.Errorf("up query does not have job names")
-		}
-		if jobname == "kubecost" {
-			kubecostMetrics = true
-		}
-		value, ok := val.(map[string]interface{})["value"]
-		if !ok {
-			return nil, false, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
-		}
-		dataPoint, ok := value.([]interface{})
-		if !ok || len(dataPoint) != 2 {
-			return nil, false, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-		}
-		strVal := dataPoint[1].(string)
-		v, _ := strconv.ParseFloat(strVal, 64)
-		toReturn := &util.Vector{
-			Timestamp: dataPoint[0].(float64),
-			Value:     v,
-		}
-		jobData = append(jobData, toReturn)
-	}
-	return jobData, kubecostMetrics, nil
-}
-
 func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, window, offset, window, offset)
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
@@ -1556,7 +1503,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")
 	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, windowString, "")
-	normalization := fmt.Sprintf(normalizationStr, windowString, "")
+	queryNormalization := fmt.Sprintf(normalizationStr, windowString, "")
 
 	layout := "2006-01-02T15:04:05.000Z"
 
@@ -1587,348 +1534,95 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		return CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
 	}
 
-	numQueries := 22
-
-	var wg sync.WaitGroup
-	wg.Add(numQueries)
-
 	queryProfileStart := time.Now()
-	queryProfileCh := make(chan string, numQueries)
-
-	var ec errors.ErrorCollector
-	var resultRAMRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "RAMRequests", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("RAMRequests: %s", promErr))
-		}
-	}()
-	var resultRAMUsage interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "RAMUsage", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("RAMUsage: %s", promErr))
-		}
-	}()
-	var resultCPURequests interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "CPURequests", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("CPURequests: %s", promErr))
-		}
-	}()
-	var resultCPUUsage interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "CPUUsage", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("CPUUsage: %s", promErr))
-		}
-	}()
-	var resultRAMAllocations interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "RAMAllocations", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultRAMAllocations, promErr = QueryRange(cli, queryRAMAlloc, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("RAMAllocations: %s", promErr))
-		}
-	}()
-	var resultCPUAllocations interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "CPUAllocations", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultCPUAllocations, promErr = QueryRange(cli, queryCPUAlloc, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("CPUAllocations: %s", promErr))
-		}
-	}()
-	var resultGPURequests interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "GPURequests", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("GPURequests: %s", promErr))
-		}
-	}()
-	var resultPVRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "PVRequests", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("PVRequests: %s", promErr))
-		}
-	}()
-	var resultNetZoneRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "NetZoneRequests", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultNetZoneRequests, promErr = QueryRange(cli, queryNetZoneRequests, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("NetZoneRequests: %s", promErr))
-		}
-	}()
-	var resultNetRegionRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "NetRegionRequests", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultNetRegionRequests, promErr = QueryRange(cli, queryNetRegionRequests, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("NetRegionRequests: %s", promErr))
-		}
-	}()
-	var resultNetInternetRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "NetInternetRequests", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("NetInternetRequests: %s", promErr))
-		}
-	}()
-	var pvPodAllocationResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "PVPodAllocation", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		pvPodAllocationResults, promErr = QueryRange(cli, queryPVCAllocation, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("PVPodAllocation: %s", promErr))
-		}
-	}()
-	var pvCostResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "PVCost", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		pvCostResults, promErr = QueryRange(cli, queryPVHourlyCost, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("PVCost: %s", promErr))
-		}
-	}()
-	var nsLabelsResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "NSLabels", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		nsLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryNSLabels, windowString), start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("NSLabels: %s", promErr))
-		}
-	}()
-	var podLabelsResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "PodLabels", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		podLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodLabels, windowString), start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("PodLabels: %s", promErr))
-		}
-	}()
-	var serviceLabelsResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "ServiceLabels", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		serviceLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("ServiceLabels: %s", promErr))
-		}
-	}()
-	var deploymentLabelsResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "DeploymentLabels", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("DeploymentLabels: %s", promErr))
-		}
-	}()
-	var daemonsetResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "Daemonsets", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		daemonsetResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodDaemonsets), start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("Daemonsets: %s", promErr))
-		}
-	}()
-	var jobResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "Jobs", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		jobResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodJobs), start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("Jobs: %s", promErr))
-		}
-	}()
-	var statefulsetLabelsResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "StatefulSetLabels", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		statefulsetLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryStatefulsetLabels, windowString), start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("StatefulSetLabels: %s", promErr))
-		}
-	}()
-	var normalizationResults interface{}
-	go func() {
-		defer wg.Done()
-		defer measureTimeAsync(time.Now(), profileThreshold, "Normalization", queryProfileCh)
-		defer errors.HandlePanic()
-
-		var promErr error
-		normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("Normalization: %s", promErr))
-		}
-	}()
 
-	podDeploymentsMapping := make(map[string]map[string][]string)
-	podStatefulsetsMapping := make(map[string]map[string][]string)
-	podServicesMapping := make(map[string]map[string][]string)
-	namespaceLabelsMapping := make(map[string]map[string]string)
+	// Submit all queries for concurrent evaluation
+	ctx := prom.NewContext(cli)
+	resChRAMRequests := ctx.QueryRange(queryRAMRequests, start, end, window)
+	resChRAMUsage := ctx.QueryRange(queryRAMUsage, start, end, window)
+	resChRAMAlloc := ctx.QueryRange(queryRAMAlloc, start, end, window)
+	resChCPURequests := ctx.QueryRange(queryCPURequests, start, end, window)
+	resChCPUUsage := ctx.QueryRange(queryCPUUsage, start, end, window)
+	resChCPUAlloc := ctx.QueryRange(queryCPUAlloc, start, end, window)
+	resChGPURequests := ctx.QueryRange(queryGPURequests, start, end, window)
+	resChPVRequests := ctx.QueryRange(queryPVRequests, start, end, window)
+	resChPVCAlloc := ctx.QueryRange(queryPVCAllocation, start, end, window)
+	resChPVHourlyCost := ctx.QueryRange(queryPVHourlyCost, start, end, window)
+	resChNetZoneRequests := ctx.QueryRange(queryNetZoneRequests, start, end, window)
+	resChNetRegionRequests := ctx.QueryRange(queryNetRegionRequests, start, end, window)
+	resChNetInternetRequests := ctx.QueryRange(queryNetInternetRequests, start, end, window)
+	resChNSLabels := ctx.QueryRange(fmt.Sprintf(queryNSLabels, windowString), start, end, window)
+	resChPodLabels := ctx.QueryRange(fmt.Sprintf(queryPodLabels, windowString), start, end, window)
+	resChServiceLabels := ctx.QueryRange(fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
+	resChDeploymentLabels := ctx.QueryRange(fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
+	resChStatefulsetLabels := ctx.QueryRange(fmt.Sprintf(queryStatefulsetLabels, windowString), start, end, window)
+	resChJobs := ctx.QueryRange(queryPodJobs, start, end, window)
+	resChDaemonsets := ctx.QueryRange(queryPodDaemonsets, start, end, window)
+	resChNormalization := ctx.QueryRange(queryNormalization, start, end, window)
+
+	// Pull k8s pod, controller, service, and namespace details
 	podlist := cm.Cache.GetAllPods()
-	var k8sErr error
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist, clusterID)
-		if k8sErr != nil {
-			return
-		}
-
-		podStatefulsetsMapping, k8sErr = getPodStatefulsets(cm.Cache, podlist, clusterID)
-		if k8sErr != nil {
-			return
-		}
 
-		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist, clusterID)
-		if k8sErr != nil {
-			return
-		}
+	podDeploymentsMapping, err := getPodDeployments(cm.Cache, podlist, clusterID)
+	if err != nil {
+		return nil, fmt.Errorf("error querying the kubernetes API: %s", err)
+	}
 
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache, clusterID)
-		if k8sErr != nil {
-			return
-		}
-	}()
+	podStatefulsetsMapping, err := getPodStatefulsets(cm.Cache, podlist, clusterID)
+	if err != nil {
+		return nil, fmt.Errorf("error querying the kubernetes API: %s", err)
+	}
 
-	wg.Wait()
+	podServicesMapping, err := getPodServices(cm.Cache, podlist, clusterID)
+	if err != nil {
+		return nil, fmt.Errorf("error querying the kubernetes API: %s", err)
+	}
 
-	// collect all query profiling messages
-	close(queryProfileCh)
-	queryProfileBreakdown := ""
-	for msg := range queryProfileCh {
-		queryProfileBreakdown += "\n - " + msg
+	namespaceLabelsMapping, err := getNamespaceLabels(cm.Cache, clusterID)
+	if err != nil {
+		return nil, fmt.Errorf("error querying the kubernetes API: %s", err)
 	}
-	measureTime(queryProfileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): Prom/k8s Queries: %s", durHrs, queryProfileBreakdown))
 
+	// Process query results. Handle errors afterwards using ctx.Errors.
+	resRAMRequests, _ := resChRAMRequests.Await()
+	resRAMUsage, _ := resChRAMUsage.Await()
+	resRAMAlloc, _ := resChRAMAlloc.Await()
+	resCPURequests, _ := resChCPURequests.Await()
+	resCPUUsage, _ := resChCPUUsage.Await()
+	resCPUAlloc, _ := resChCPUAlloc.Await()
+	resGPURequests, _ := resChGPURequests.Await()
+	resPVRequests, _ := resChPVRequests.Await()
+	resPVCAlloc, _ := resChPVCAlloc.Await()
+	resPVHourlyCost, _ := resChPVHourlyCost.Await()
+	resNetZoneRequests, _ := resChNetZoneRequests.Await()
+	resNetRegionRequests, _ := resChNetRegionRequests.Await()
+	resNetInternetRequests, _ := resChNetInternetRequests.Await()
+	resNSLabels, _ := resChNSLabels.Await()
+	resPodLabels, _ := resChPodLabels.Await()
+	resServiceLabels, _ := resChServiceLabels.Await()
+	resDeploymentLabels, _ := resChDeploymentLabels.Await()
+	resStatefulsetLabels, _ := resChStatefulsetLabels.Await()
+	resDaemonsets, _ := resChDaemonsets.Await()
+	resJobs, _ := resChJobs.Await()
+	resNormalization, _ := resChNormalization.Await()
+
+	measureTime(queryProfileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): Prom/k8s Queries", durHrs))
 	defer measureTime(time.Now(), profileThreshold, fmt.Sprintf("costDataRange(%fh): Processing Query Data", durHrs))
 
-	if ec.IsError() {
-		for _, promErr := range ec.Errors() {
+	if ctx.HasErrors() {
+		for _, promErr := range ctx.Errors() {
 			log.Errorf("CostDataRange: Prometheus error: %s", promErr.Error())
 		}
+
 		// TODO: Categorize fatal prometheus query failures
 		// return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
 	}
-	if k8sErr != nil {
-		return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
-	}
 
 	profileStart := time.Now()
 
-	normalizationValue, err := getNormalizations(normalizationResults)
+	normalizationValue, err := getNormalizations(resNormalization)
 	if err != nil {
-		msg := fmt.Sprintf("error computing normalization %s for start=%s, end=%s, window=%s, res=%f", normalization, start, end, window, resolutionHours*60*60)
+		msg := fmt.Sprintf("error computing normalization for start=%s, end=%s, window=%s, res=%f", start, end, window, resolutionHours*60*60)
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap(msg)
 		}
@@ -1939,7 +1633,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	profileStart = time.Now()
 
-	pvClaimMapping, err := GetPVInfo(resultPVRequests, clusterID)
+	pvClaimMapping, err := GetPVInfo(resPVRequests, clusterID)
 	if err != nil {
 		// Just log for compatibility with KSM less than 1.6
 		klog.Infof("Unable to get PV Data: %s", err.Error())
@@ -1951,13 +1645,13 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		}
 	}
 
-	pvCostMapping, err := GetPVCostMetrics(pvCostResults, clusterID)
+	pvCostMapping, err := GetPVCostMetrics(resPVHourlyCost, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get PV Hourly Cost Data: %s", err.Error())
 	}
 
 	unmountedPVs := make(map[string][]*PersistentVolumeClaimData)
-	pvAllocationMapping, err := GetPVAllocationMetrics(pvPodAllocationResults, clusterID)
+	pvAllocationMapping, err := GetPVAllocationMetrics(resPVCAlloc, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get PV Allocation Cost Data: %s", err.Error())
 	}
@@ -1972,7 +1666,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	profileStart = time.Now()
 
-	nsLabels, err := GetNamespaceLabelsMetrics(nsLabelsResults, clusterID)
+	nsLabels, err := GetNamespaceLabelsMetrics(resNSLabels, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Namespace Labels for Metrics: %s", err.Error())
 	}
@@ -1980,22 +1674,22 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		appendNamespaceLabels(namespaceLabelsMapping, nsLabels)
 	}
 
-	podLabels, err := GetPodLabelsMetrics(podLabelsResults, clusterID)
+	podLabels, err := GetPodLabelsMetrics(resPodLabels, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Pod Labels for Metrics: %s", err.Error())
 	}
 
-	serviceLabels, err := GetServiceSelectorLabelsMetrics(serviceLabelsResults, clusterID)
+	serviceLabels, err := GetServiceSelectorLabelsMetrics(resServiceLabels, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Service Selector Labels for Metrics: %s", err.Error())
 	}
 
-	deploymentLabels, err := GetDeploymentMatchLabelsMetrics(deploymentLabelsResults, clusterID)
+	deploymentLabels, err := GetDeploymentMatchLabelsMetrics(resDeploymentLabels, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
 	}
 
-	statefulsetLabels, err := GetStatefulsetMatchLabelsMetrics(statefulsetLabelsResults, clusterID)
+	statefulsetLabels, err := GetStatefulsetMatchLabelsMetrics(resStatefulsetLabels, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
 	}
@@ -2016,12 +1710,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}
 	appendLabelsList(podDeploymentsMapping, podDeploymentsMetricsMapping)
 
-	podDaemonsets, err := GetPodDaemonsetsWithMetrics(daemonsetResults, clusterID)
+	podDaemonsets, err := GetPodDaemonsetsWithMetrics(resDaemonsets, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Pod Daemonsets for Metrics: %s", err.Error())
 	}
 
-	podJobs, err := GetPodJobsWithMetrics(jobResults, clusterID)
+	podJobs, err := GetPodJobsWithMetrics(resJobs, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Pod Jobs for Metrics: %s", err.Error())
 	}
@@ -2032,7 +1726,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}
 	appendLabelsList(podServicesMapping, podServicesMetricsMapping)
 
-	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID)
+	networkUsageMap, err := GetNetworkUsageData(resNetZoneRequests, resNetRegionRequests, resNetInternetRequests, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Network Cost Data: %s", err.Error())
 		networkUsageMap = make(map[string]*NetworkUsageData)
@@ -2046,7 +1740,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	containers := make(map[string]bool)
 	otherClusterPVRecorded := make(map[string]bool)
 
-	RAMReqMap, err := GetNormalizedContainerMetricVectors(resultRAMRequests, normalizationValue, clusterID)
+	RAMReqMap, err := GetNormalizedContainerMetricVectors(resRAMRequests, normalizationValue, clusterID)
 	if err != nil {
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap("GetNormalizedContainerMetricVectors(RAMRequests)")
@@ -2057,7 +1751,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		containers[key] = true
 	}
 
-	RAMUsedMap, err := GetNormalizedContainerMetricVectors(resultRAMUsage, normalizationValue, clusterID)
+	RAMUsedMap, err := GetNormalizedContainerMetricVectors(resRAMUsage, normalizationValue, clusterID)
 	if err != nil {
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap("GetNormalizedContainerMetricVectors(RAMUsage)")
@@ -2068,7 +1762,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		containers[key] = true
 	}
 
-	CPUReqMap, err := GetNormalizedContainerMetricVectors(resultCPURequests, normalizationValue, clusterID)
+	CPUReqMap, err := GetNormalizedContainerMetricVectors(resCPURequests, normalizationValue, clusterID)
 	if err != nil {
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap("GetNormalizedContainerMetricVectors(CPURequests)")
@@ -2081,7 +1775,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	// No need to normalize here, as this comes from a counter, namely:
 	// rate(container_cpu_usage_seconds_total) which properly accounts for normalized rates
-	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, clusterID)
+	CPUUsedMap, err := GetContainerMetricVectors(resCPUUsage, clusterID)
 	if err != nil {
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap("GetContainerMetricVectors(CPUUsage)")
@@ -2092,7 +1786,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		containers[key] = true
 	}
 
-	RAMAllocMap, err := GetContainerMetricVectors(resultRAMAllocations, clusterID)
+	RAMAllocMap, err := GetContainerMetricVectors(resRAMAlloc, clusterID)
 	if err != nil {
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap("GetContainerMetricVectors(RAMAllocations)")
@@ -2103,7 +1797,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		containers[key] = true
 	}
 
-	CPUAllocMap, err := GetContainerMetricVectors(resultCPUAllocations, clusterID)
+	CPUAllocMap, err := GetContainerMetricVectors(resCPUAlloc, clusterID)
 	if err != nil {
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap("GetContainerMetricVectors(CPUAllocations)")
@@ -2114,7 +1808,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		containers[key] = true
 	}
 
-	GPUReqMap, err := GetNormalizedContainerMetricVectors(resultGPURequests, normalizationValue, clusterID)
+	GPUReqMap, err := GetNormalizedContainerMetricVectors(resGPURequests, normalizationValue, clusterID)
 	if err != nil {
 		if pce, ok := err.(prom.CommError); ok {
 			return nil, pce.Wrap("GetContainerMetricVectors(GPURequests)")
@@ -2478,56 +2172,9 @@ func getCost(qr interface{}) (map[string][]*util.Vector, error) {
 	return toReturn, nil
 }
 
-//todo: don't cast, implement unmarshaler interface
-func getNormalization(qrs []*prom.QueryResult) (float64, error) {
-	// TODO: Pass actual query instead of getNormalization
-	qResults, err := prom.NewQueryResults("getNormalization", qr)
-	if err != nil {
-		return 0, err
-	}
-
-	queryResults := qResults.Results
-
-	if len(queryResults) > 0 {
-		values := queryResults[0].Values
-
-		if len(values) > 0 {
-			return values[0].Value, nil
-		}
-		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-	}
-	return 0, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
-}
-
-//todo: don't cast, implement unmarshaler interface
-func getNormalizations(qr interface{}) ([]*util.Vector, error) {
-	// TODO: Pass actual query instead of getNormalizations
-	qResults, err := prom.NewQueryResults("getNormalizations", qr)
-	if err != nil {
-		return nil, err
-	}
-
-	queryResults := qResults.Results
-
-	if len(queryResults) > 0 {
-		vectors := []*util.Vector{}
-		for _, value := range queryResults {
-			vectors = append(vectors, value.Values...)
-		}
-		return vectors, nil
-	}
-	return nil, fmt.Errorf("normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running")
-}
-
 func GetContainerMetricVector(qrs []*prom.QueryResult, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
-	// TODO: Pass actual query instead of ContainerMetricVector
-	result, err := prom.NewQueryResults("ContainerMetricVector", qr)
-	if err != nil {
-		return nil, err
-	}
-
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
@@ -2543,15 +2190,9 @@ func GetContainerMetricVector(qrs []*prom.QueryResult, normalize bool, normaliza
 	return containerData, nil
 }
 
-func GetContainerMetricVectors(qr interface{}, defaultClusterID string) (map[string][]*util.Vector, error) {
-	// TODO: Pass actual query instead of ContainerMetricVectors
-	result, err := prom.NewQueryResults("ContainerMetricVectors", qr)
-	if err != nil {
-		return nil, err
-	}
-
+func GetContainerMetricVectors(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*util.Vector, error) {
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
@@ -2561,15 +2202,9 @@ func GetContainerMetricVectors(qr interface{}, defaultClusterID string) (map[str
 	return containerData, nil
 }
 
-func GetNormalizedContainerMetricVectors(qr interface{}, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
-	// TODO: Pass actual query instead of NormalizedContainerMetricVectors
-	result, err := prom.NewQueryResults("NormalizedContainerMetricVectors", qr)
-	if err != nil {
-		return nil, err
-	}
-
+func GetNormalizedContainerMetricVectors(qrs []*prom.QueryResult, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
@@ -2579,15 +2214,6 @@ func GetNormalizedContainerMetricVectors(qr interface{}, normalizationValues []*
 	return containerData, nil
 }
 
-func wrapPrometheusError(qr interface{}) (string, error) {
-	e, ok := qr.(map[string]interface{})["error"]
-	if !ok {
-		return "", fmt.Errorf("Unexpected response from Prometheus")
-	}
-	eStr, ok := e.(string)
-	return eStr, nil
-}
-
 func measureTime(start time.Time, threshold time.Duration, name string) {
 	elapsed := time.Since(start)
 	if elapsed > threshold {

+ 1 - 7
pkg/costmodel/networkcosts.go

@@ -140,13 +140,7 @@ func GetNetworkCost(usage *NetworkUsageData, cloud costAnalyzerCloud.Provider) (
 func getNetworkUsage(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
 	ncdmap := make(map[string]*NetworkUsageVector)
 
-	// TODO: Pass actual query instead of NetworkUsage
-	result, err := prom.NewQueryResults("NetworkUsage", qr)
-	if err != nil {
-		return nil, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		podName, err := val.GetString("pod_name")
 		if err != nil {
 			return nil, err

+ 42 - 77
pkg/costmodel/promparsers.go

@@ -7,6 +7,7 @@ import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/util"
 )
 
 // TODO niko/prom move parsing functions from costmodel.go
@@ -14,13 +15,7 @@ import (
 func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string]*PersistentVolumeClaimData)
 
-	// TODO: Pass actual query instead of PVInfo
-	result, err := prom.NewQueryResults("PVInfo", qr)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -63,16 +58,10 @@ func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*Pe
 	return toReturn, nil
 }
 
-func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
+func GetPVAllocationMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string][]*PersistentVolumeClaimData)
 
-	// TODO: Pass actual query instead of PVAllocationMetrics
-	result, err := prom.NewQueryResults("PVAllocationMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -115,16 +104,10 @@ func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (m
 	return toReturn, nil
 }
 
-func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
+func GetPVCostMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
 	toReturn := make(map[string]*costAnalyzerCloud.PV)
 
-	// TODO: Pass actual query instead of PVCostMetrics
-	result, err := prom.NewQueryResults("PVCostMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -144,16 +127,10 @@ func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[str
 	return toReturn, nil
 }
 
-func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetNamespaceLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of NamespaceLabelsMetrics
-	result, err := prom.NewQueryResults("NamespaceLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Namespace and ClusterID for key generation purposes
 		ns, err := val.GetString("namespace")
 		if err != nil {
@@ -177,16 +154,10 @@ func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string)
 	return toReturn, nil
 }
 
-func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetPodLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of PodLabelsMetrics
-	result, err := prom.NewQueryResults("PodLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Pod, Namespace and ClusterID for key generation purposes
 		pod, err := val.GetString("pod")
 		if err != nil {
@@ -217,16 +188,10 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 	return toReturn, nil
 }
 
-func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetStatefulsetMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of StatefulsetMatchLabelsMetrics
-	result, err := prom.NewQueryResults("StatefulsetMatchLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Statefulset, Namespace and ClusterID for key generation purposes
 		ss, err := val.GetString("statefulSet")
 		if err != nil {
@@ -250,15 +215,10 @@ func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID
 	return toReturn, nil
 }
 
-func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
+func GetPodDaemonsetsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
 
-	// TODO: Pass actual query instead of PodDaemonsetsWithMetrics
-	result, err := prom.NewQueryResults("PodDaemonsetsWithMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -286,15 +246,10 @@ func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID strin
 	return toReturn, nil
 }
 
-func GetPodJobsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
+func GetPodJobsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
 
-	// TODO: Pass actual query instead of PodJobsWithMetrics
-	result, err := prom.NewQueryResults("PodJobsWithMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -322,16 +277,10 @@ func GetPodJobsWithMetrics(queryResult interface{}, defaultClusterID string) (ma
 	return toReturn, nil
 }
 
-func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetDeploymentMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of DeploymentMatchLabelsMetrics
-	result, err := prom.NewQueryResults("DeploymentMatchLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Deployment, Namespace and ClusterID for key generation purposes
 		deployment, err := val.GetString("deployment")
 		if err != nil {
@@ -355,16 +304,10 @@ func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID s
 	return toReturn, nil
 }
 
-func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetServiceSelectorLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of ServiceSelectorLabelsMetrics
-	result, err := prom.NewQueryResults("ServiceSelectorLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Service, Namespace and ClusterID for key generation purposes
 		service, err := val.GetString("service")
 		if err != nil {
@@ -388,6 +331,34 @@ func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID s
 	return toReturn, nil
 }
 
+// TODO niko/prom retain message:
+// normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
+func getNormalization(qrs []*prom.QueryResult) (float64, error) {
+	if len(qrs) == 0 {
+		return 0.0, prom.NoDataErr
+	}
+	if len(qrs[0].Values) == 0 {
+		return 0.0, prom.NoDataErr
+	}
+	return qrs[0].Values[0].Value, nil
+}
+
+// TODO niko/prom retain message:
+// normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
+func getNormalizations(qrs []*prom.QueryResult) ([]*util.Vector, error) {
+	if len(qrs) == 0 {
+		return nil, prom.NoDataErr
+	}
+
+	// Match the removed implementation: concatenate the value vectors of
+	// every query result, not just the first one.
+	vectors := []*util.Vector{}
+	for _, qr := range qrs {
+		vectors = append(vectors, qr.Values...)
+	}
+	return vectors, nil
+}
+
 func parsePodLabels(qrs []*prom.QueryResult) (map[string]map[string]string, error) {
 	podLabels := map[string]map[string]string{}
 

+ 6 - 5
pkg/prom/result.go

@@ -17,17 +17,18 @@ var (
 	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
 
 	// Static Errors for query result parsing
-	QueryResultNilErr          error = NewCommError("nil queryResult")
-	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
 	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus repsonse")
+	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
+	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
+	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
+	NoDataErr                  error = errors.New("No data")
+	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
+	QueryResultNilErr          error = NewCommError("nil queryResult")
 	ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
 	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
 	ResultFormatErr            error = errors.New("Result is improperly formatted")
-	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
-	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
 	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
 	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
-	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
 )
 
 // QueryResultsChan is a channel of query results