Niko Kovacevic 5 лет назад
Родитель
Сommit
e17599a3bb
5 измененных файлов с 186 добавлено и 293 удалено
  1. 57 253
      pkg/costmodel/costmodel.go
  2. 2 2
      pkg/costmodel/networkcosts.go
  3. 3 1
      pkg/costmodel/promparsers.go
  4. 94 7
      pkg/prom/query.go
  5. 30 30
      pkg/prom/result.go

+ 57 - 253
pkg/costmodel/costmodel.go

@@ -1,11 +1,8 @@
 package costmodel
 
 import (
-	"context"
-	"encoding/json"
 	"fmt"
 	"math"
-	"net/http"
 	"strconv"
 	"strings"
 	"sync"
@@ -37,8 +34,6 @@ const (
 
 	apiPrefix         = "/api/v1"
 	epAlertManagers   = apiPrefix + "/alertmanagers"
-	epQuery           = apiPrefix + "/query"
-	epQueryRange      = apiPrefix + "/query_range"
 	epLabelValues     = apiPrefix + "/label/:name/values"
 	epSeries          = apiPrefix + "/series"
 	epTargets         = apiPrefix + "/targets"
@@ -325,193 +320,81 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, window, "")
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, window, "")
 	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, window, "")
-	normalization := fmt.Sprintf(normalizationStr, window, offset)
+	queryNormalization := fmt.Sprintf(normalizationStr, window, offset)
 
 	// Cluster ID is specific to the source cluster
 	clusterID := env.GetClusterID()
 
-	var wg sync.WaitGroup
-	wg.Add(11)
-
-	var ec errors.ErrorCollector
-	var resultRAMRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultRAMRequests, promErr = Query(cli, queryRAMRequests)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("RAMRequests: %s", promErr))
-		}
-	}()
-
-	var resultRAMUsage interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultRAMUsage, promErr = Query(cli, queryRAMUsage)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("RAMUsage: %s", promErr))
-		}
-	}()
-	var resultCPURequests interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultCPURequests, promErr = Query(cli, queryCPURequests)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("CPURequests: %s", promErr))
-		}
-	}()
-	var resultCPUUsage interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultCPUUsage, promErr = Query(cli, queryCPUUsage)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("CPUUsage: %s", promErr))
-		}
-	}()
-	var resultGPURequests interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultGPURequests, promErr = Query(cli, queryGPURequests)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("GPURequests: %s", promErr))
-		}
-	}()
-	var resultPVRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultPVRequests, promErr = Query(cli, queryPVRequests)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("PVRequests: %s", promErr))
-		}
-	}()
-	var resultNetZoneRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultNetZoneRequests, promErr = Query(cli, queryNetZoneRequests)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("NetZoneRequests: %s", promErr))
-		}
-	}()
-	var resultNetRegionRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultNetRegionRequests, promErr = Query(cli, queryNetRegionRequests)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("NetRegionRequests: %s", promErr))
-		}
-	}()
-	var resultNetInternetRequests interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		resultNetInternetRequests, promErr = Query(cli, queryNetInternetRequests)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("NetInternetRequests: %s", promErr))
-		}
-	}()
-	var normalizationResult interface{}
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
-
-		var promErr error
-		normalizationResult, promErr = Query(cli, normalization)
-
-		if promErr != nil {
-			ec.Report(fmt.Errorf("normalization: %s", promErr))
-		}
-	}()
-
-	podDeploymentsMapping := make(map[string]map[string][]string)
-	podServicesMapping := make(map[string]map[string][]string)
-	namespaceLabelsMapping := make(map[string]map[string]string)
+	// Submit all Prometheus queries asynchronously
+	ctx := prom.NewContext(cli)
+	resChRAMRequests := ctx.Query(queryRAMRequests)
+	resChRAMUsage := ctx.Query(queryRAMUsage)
+	resChCPURequests := ctx.Query(queryCPURequests)
+	resChCPUUsage := ctx.Query(queryCPUUsage)
+	resChGPURequests := ctx.Query(queryGPURequests)
+	resChPVRequests := ctx.Query(queryPVRequests)
+	resChNetZoneRequests := ctx.Query(queryNetZoneRequests)
+	resChNetRegionRequests := ctx.Query(queryNetRegionRequests)
+	resChNetInternetRequests := ctx.Query(queryNetInternetRequests)
+	resChNormalization := ctx.Query(queryNormalization)
+
+	// Pull pod information from k8s API
 	podlist := cm.Cache.GetAllPods()
-	var k8sErr error
-	go func() {
-		defer wg.Done()
-		defer errors.HandlePanic()
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist, clusterID)
-		if k8sErr != nil {
-			return
-		}
-
-		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist, clusterID)
-		if k8sErr != nil {
-			return
-		}
-
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache, clusterID)
-		if k8sErr != nil {
-			return
-		}
-	}()
+	podDeploymentsMapping, err := getPodDeployments(cm.Cache, podlist, clusterID)
+	if err != nil {
+		return nil, err
+	}
 
-	wg.Wait()
+	podServicesMapping, err := getPodServices(cm.Cache, podlist, clusterID)
+	if err != nil {
+		return nil, err
+	}
 
-	defer measureTime(time.Now(), profileThreshold, "ComputeCostData: Processing Query Data")
+	namespaceLabelsMapping, err := getNamespaceLabels(cm.Cache, clusterID)
+	if err != nil {
+		return nil, err
+	}
 
-	if ec.IsError() {
-		for _, promErr := range ec.Errors() {
+	// Process Prometheus query results. Handle errors using ctx.Errors.
+	resRAMRequests, _ := resChRAMRequests.Await()
+	resRAMUsage, _ := resChRAMUsage.Await()
+	resCPURequests, _ := resChCPURequests.Await()
+	resCPUUsage, _ := resChCPUUsage.Await()
+	resGPURequests, _ := resChGPURequests.Await()
+	resPVRequests, _ := resChPVRequests.Await()
+	resNetZoneRequests, _ := resChNetZoneRequests.Await()
+	resNetRegionRequests, _ := resChNetRegionRequests.Await()
+	resNetInternetRequests, _ := resChNetInternetRequests.Await()
+	resNormalization, _ := resChNormalization.Await()
+
+	if ctx.HasErrors() {
+		for _, promErr := range ctx.Errors() {
 			log.Errorf("ComputeCostData: Prometheus error: %s", promErr.Error())
 		}
+
 		// TODO: Categorize fatal prometheus query failures
 		// return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
 	}
-	if k8sErr != nil {
-		return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
-	}
 
-	normalizationValue, err := getNormalization(normalizationResult)
+	defer measureTime(time.Now(), profileThreshold, "ComputeCostData: Processing Query Data")
+
+	normalizationValue, err := getNormalization(resNormalization)
 	if err != nil {
-		return nil, fmt.Errorf("Error parsing normalization values from %s: %s", normalization, err.Error())
+		return nil, fmt.Errorf("Error parsing normalization values from %s: %s", queryNormalization, err.Error())
 	}
 
 	nodes, err := cm.GetNodeCost(cp)
 	if err != nil {
-		klog.V(1).Infof("[Warning] no Node cost model available: " + err.Error())
+		log.Warningf("GetNodeCost: no node cost model available: " + err.Error())
 		return nil, err
 	}
 
 	// Unmounted PVs represent the PVs that are not mounted or tied to a volume on a container
 	unmountedPVs := make(map[string][]*PersistentVolumeClaimData)
-	pvClaimMapping, err := GetPVInfo(resultPVRequests, clusterID)
+	pvClaimMapping, err := GetPVInfo(resPVRequests, clusterID)
 	if err != nil {
-		klog.Infof("[Warning] Unable to get PV Data: %s", err.Error())
+		log.Warningf("GetPVInfo: unable to get PV data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
 		err = addPVData(cm.Cache, pvClaimMapping, cp)
@@ -524,7 +407,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		}
 	}
 
-	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID)
+	networkUsageMap, err := GetNetworkUsageData(resNetZoneRequests, resNetRegionRequests, resNetInternetRequests, clusterID)
 	if err != nil {
 		klog.V(1).Infof("[Warning] Unable to get Network Cost Data: %s", err.Error())
 		networkUsageMap = make(map[string]*NetworkUsageData)
@@ -533,7 +416,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	containerNameCost := make(map[string]*CostData)
 	containers := make(map[string]bool)
 
-	RAMReqMap, err := GetContainerMetricVector(resultRAMRequests, true, normalizationValue, clusterID)
+	RAMReqMap, err := GetContainerMetricVector(resRAMRequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
@@ -541,28 +424,28 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		containers[key] = true
 	}
 
-	RAMUsedMap, err := GetContainerMetricVector(resultRAMUsage, true, normalizationValue, clusterID)
+	RAMUsedMap, err := GetContainerMetricVector(resRAMUsage, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range RAMUsedMap {
 		containers[key] = true
 	}
-	CPUReqMap, err := GetContainerMetricVector(resultCPURequests, true, normalizationValue, clusterID)
+	CPUReqMap, err := GetContainerMetricVector(resCPURequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range CPUReqMap {
 		containers[key] = true
 	}
-	GPUReqMap, err := GetContainerMetricVector(resultGPURequests, true, normalizationValue, clusterID)
+	GPUReqMap, err := GetContainerMetricVector(resGPURequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := GetContainerMetricVector(resultCPUUsage, false, 0, clusterID) // No need to normalize here, as this comes from a counter
+	CPUUsedMap, err := GetContainerMetricVector(resCPUUsage, false, 0, clusterID) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
 	}
@@ -2630,87 +2513,8 @@ func getCost(qr interface{}) (map[string][]*util.Vector, error) {
 	return toReturn, nil
 }
 
-func QueryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
-	u := cli.URL(epQueryRange, nil)
-	q := u.Query()
-	q.Set("query", query)
-	q.Set("start", start.Format(time.RFC3339Nano))
-	q.Set("end", end.Format(time.RFC3339Nano))
-	q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
-	u.RawQuery = q.Encode()
-
-	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
-	if err != nil {
-		return nil, err
-	}
-
-	resp, body, warnings, err := cli.Do(context.Background(), req)
-	for _, w := range warnings {
-		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
-	}
-	if err != nil {
-		if resp == nil {
-			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
-		}
-
-		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
-	}
-
-	// Unsuccessful Status Code, log body and status
-	statusCode := resp.StatusCode
-	statusText := http.StatusText(statusCode)
-	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-		return nil, fmt.Errorf("%d (%s) Headers: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
-	}
-
-	var toReturn interface{}
-	err = json.Unmarshal(body, &toReturn)
-	if err != nil {
-		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
-	}
-	return toReturn, nil
-}
-
-func Query(cli prometheusClient.Client, query string) (interface{}, error) {
-	u := cli.URL(epQuery, nil)
-	q := u.Query()
-	q.Set("query", query)
-	u.RawQuery = q.Encode()
-
-	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
-	if err != nil {
-		return nil, err
-	}
-
-	resp, body, warnings, err := cli.Do(context.Background(), req)
-	for _, w := range warnings {
-		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
-	}
-	if err != nil {
-		if resp == nil {
-			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
-		}
-
-		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
-	}
-
-	// Unsuccessful Status Code, log body and status
-	statusCode := resp.StatusCode
-	statusText := http.StatusText(statusCode)
-	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-		return nil, fmt.Errorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
-	}
-
-	var toReturn interface{}
-	err = json.Unmarshal(body, &toReturn)
-	if err != nil {
-		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
-	}
-	return toReturn, nil
-}
-
 //todo: don't cast, implement unmarshaler interface
-func getNormalization(qr interface{}) (float64, error) {
+func getNormalization(qrs []*prom.QueryResult) (float64, error) {
 	// TODO: Pass actual query instead of getNormalization
 	qResults, err := prom.NewQueryResults("getNormalization", qr)
 	if err != nil {
@@ -2750,7 +2554,7 @@ func getNormalizations(qr interface{}) ([]*util.Vector, error) {
 	return nil, fmt.Errorf("normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running")
 }
 
-func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
+func GetContainerMetricVector(qrs []*prom.QueryResult, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
 	// TODO: Pass actual query instead of ContainerMetricVector
 	result, err := prom.NewQueryResults("ContainerMetricVector", qr)
 	if err != nil {

+ 2 - 2
pkg/costmodel/networkcosts.go

@@ -27,7 +27,7 @@ type NetworkUsageVector struct {
 
 // GetNetworkUsageData performs a join of the the results of zone, region, and internet usage queries to return a single
 // map containing network costs for each namespace+pod
-func GetNetworkUsageData(zr interface{}, rr interface{}, ir interface{}, defaultClusterID string) (map[string]*NetworkUsageData, error) {
+func GetNetworkUsageData(zr []*prom.QueryResult, rr []*prom.QueryResult, ir []*prom.QueryResult, defaultClusterID string) (map[string]*NetworkUsageData, error) {
 	zoneNetworkMap, err := getNetworkUsage(zr, defaultClusterID)
 	if err != nil {
 		return nil, err
@@ -137,7 +137,7 @@ func GetNetworkCost(usage *NetworkUsageData, cloud costAnalyzerCloud.Provider) (
 	return results, nil
 }
 
-func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
+func getNetworkUsage(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
 	ncdmap := make(map[string]*NetworkUsageVector)
 
 	// TODO: Pass actual query instead of NetworkUsage

+ 3 - 1
pkg/costmodel/promparsers.go

@@ -8,7 +8,9 @@ import (
 	"github.com/kubecost/cost-model/pkg/prom"
 )
 
-func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
+// TODO niko/prom move parsing functions from costmodel.go
+
+func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string]*PersistentVolumeClaimData)
 
 	// TODO: Pass actual query instead of PVInfo

+ 94 - 7
pkg/prom/query.go

@@ -5,6 +5,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"strconv"
+	"time"
 
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -39,7 +41,31 @@ func (ctx *Context) Errors() []error {
 	return ctx.ErrorCollector.Errors()
 }
 
-// TODO SetMaxConcurrency
+// HasErrors returns true if the ErrorCollector has errors
+func (ctx *Context) HasErrors() bool {
+	return ctx.ErrorCollector.IsError()
+}
+
+// Query returns a QueryResultsChan, then runs the given query and sends the
+// results on the provided channel. Receiver is responsible for closing the
+// channel, preferably using the Read method.
+func (ctx *Context) Query(query string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
+
+	go func(ctx *Context, resCh QueryResultsChan) {
+		defer errors.HandlePanic()
+
+		raw, promErr := ctx.query(query)
+		ctx.ErrorCollector.Report(promErr)
+
+		results, parseErr := NewQueryResults(query, raw)
+		ctx.ErrorCollector.Report(parseErr)
+
+		resCh <- results
+	}(ctx, resCh)
+
+	return resCh
+}
 
 // QueryAll returns one QueryResultsChan for each query provided, then runs
 // each query concurrently and returns results on each channel, respectively,
@@ -55,16 +81,59 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
-// Query returns a QueryResultsChan, then runs the given query and sends the
-// results on the provided channel. Receiver is responsible for closing the
-// channel, preferably using the Read method.
-func (ctx *Context) Query(query string) QueryResultsChan {
+func (ctx *Context) QuerySync(query string) (*QueryResults, error) {
+	raw, err := ctx.query(query)
+	if err != nil {
+		return nil, err
+	}
+
+	results, err := NewQueryResults(query, raw)
+	if err != nil {
+		return nil, err
+	}
+
+	return results, nil
+}
+
+func (ctx *Context) query(query string) (interface{}, error) {
+	u := ctx.Client.URL(epQuery, nil)
+	q := u.Query()
+	q.Set("query", query)
+	u.RawQuery = q.Encode()
+
+	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
+	for _, w := range warnings {
+		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
+	}
+	if err != nil {
+		if resp == nil {
+			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		}
+
+		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+	}
+
+	var toReturn interface{}
+	err = json.Unmarshal(body, &toReturn)
+	if err != nil {
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+	}
+
+	return toReturn, nil
+}
+
+func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
 	go func(ctx *Context, resCh QueryResultsChan) {
 		defer errors.HandlePanic()
 
-		raw, promErr := ctx.query(query)
+		raw, promErr := ctx.queryRange(query, start, end, step)
 		ctx.ErrorCollector.Report(promErr)
 
 		results, parseErr := NewQueryResults(query, raw)
@@ -76,10 +145,27 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 	return resCh
 }
 
-func (ctx *Context) query(query string) (interface{}, error) {
+func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) (*QueryResults, error) {
+	raw, err := ctx.queryRange(query, start, end, step)
+	if err != nil {
+		return nil, err
+	}
+
+	results, err := NewQueryResults(query, raw)
+	if err != nil {
+		return nil, err
+	}
+
+	return results, nil
+}
+
+func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)
+	q.Set("start", start.Format(time.RFC3339Nano))
+	q.Set("end", end.Format(time.RFC3339Nano))
+	q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
 	u.RawQuery = q.Encode()
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
@@ -111,5 +197,6 @@ func (ctx *Context) query(query string) (interface{}, error) {
 	if err != nil {
 		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
+
 	return toReturn, nil
 }

+ 30 - 30
pkg/prom/result.go

@@ -11,36 +11,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/util"
 )
 
-// QueryResultsChan is a channel of query results
-type QueryResultsChan chan *QueryResults
-
-// Await returns query results, blocking until they are made available, and
-// deferring the closure of the underlying channel
-func (qrc QueryResultsChan) Await() []*QueryResult {
-	defer close(qrc)
-	results := <-qrc
-
-	// Possible that the returned results are nil
-	if results == nil {
-		return nil
-	}
-
-	return results.Results
-}
-
-// QueryResult contains a single result from a prometheus query. It's common
-// to refer to query results as a slice of QueryResult
-type QueryResult struct {
-	Metric map[string]interface{}
-	Values []*util.Vector
-}
-
-// QueryResults contains all of the query results and the source query string.
-type QueryResults struct {
-	Query   string
-	Results []*QueryResult
-}
-
 var (
 	// Static Warnings for data point parsing
 	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
@@ -60,6 +30,36 @@ var (
 	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
 )
 
+// QueryResultsChan is a channel of query results
+type QueryResultsChan chan *QueryResults
+
+// Await returns query results, blocking until they are made available, and
+// deferring the closure of the underlying channel
+func (qrc QueryResultsChan) Await() ([]*QueryResult, error) {
+	defer close(qrc)
+
+	results := <-qrc
+	if results.Error != nil {
+		return nil, results.Error
+	}
+
+	return results.Results, nil
+}
+
+// QueryResults contains all of the query results and the source query string.
+type QueryResults struct {
+	Query   string
+	Error   error
+	Results []*QueryResult
+}
+
+// QueryResult contains a single result from a prometheus query. It's common
+// to refer to query results as a slice of QueryResult
+type QueryResult struct {
+	Metric map[string]interface{}
+	Values []*util.Vector
+}
+
 // NewQueryResults accepts the raw prometheus query result and returns an array of
 // QueryResult objects
 func NewQueryResults(query string, queryResult interface{}) (*QueryResults, error) {