Prechádzať zdrojové kódy

Merge pull request #493 from kubecost/niko/prom

Prometheus query refactor
Niko Kovacevic 5 rokov pred
rodič
commit
6d61342ef3

+ 58 - 36
pkg/costmodel/cluster.go

@@ -144,10 +144,10 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
 	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
 
-	resPVCost := resChPVCost.Await()
-	resPVSize := resChPVSize.Await()
-	resLocalStorageCost := resChLocalStorageCost.Await()
-	resLocalStorageBytes := resChLocalStorageBytes.Await()
+	resPVCost, _ := resChPVCost.Await()
+	resPVSize, _ := resChPVSize.Await()
+	resLocalStorageCost, _ := resChLocalStorageCost.Await()
+	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -305,12 +305,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
 	resChNodeLabels := ctx.Query(queryNodeLabels)
 
-	resNodeCPUCost := resChNodeCPUCost.Await()
-	resNodeCPUCores := resChNodeCPUCores.Await()
-	resNodeGPUCost := resChNodeGPUCost.Await()
-	resNodeRAMCost := resChNodeRAMCost.Await()
-	resNodeRAMBytes := resChNodeRAMBytes.Await()
-	resNodeLabels := resChNodeLabels.Await()
+	resNodeCPUCost, _ := resChNodeCPUCost.Await()
+	resNodeCPUCores, _ := resChNodeCPUCores.Await()
+	resNodeGPUCost, _ := resChNodeGPUCost.Await()
+	resNodeRAMCost, _ := resChNodeRAMCost.Await()
+	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
+	resNodeLabels, _ := resChNodeLabels.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -624,10 +624,19 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		resChs = append(resChs, bdResChs...)
 	}
 
+	resDataCount, _ := resChs[0].Await()
+	resTotalGPU, _ := resChs[1].Await()
+	resTotalCPU, _ := resChs[2].Await()
+	resTotalRAM, _ := resChs[3].Await()
+	resTotalStorage, _ := resChs[4].Await()
+	if ctx.HasErrors() {
+		return nil, ctx.Errors()[0]
+	}
+
 	defaultClusterID := env.GetClusterID()
 
 	dataMinsByCluster := map[string]float64{}
-	for _, result := range resChs[0].Await() {
+	for _, result := range resDataCount {
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -676,20 +685,31 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		}
 	}
 	// Apply both sustained use and custom discounts to RAM and CPU
-	setCostsFromResults(costData, resChs[2].Await(), "cpu", discount, customDiscount)
-	setCostsFromResults(costData, resChs[3].Await(), "ram", discount, customDiscount)
+	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
+	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
 	// Apply only custom discount to GPU and storage
-	setCostsFromResults(costData, resChs[1].Await(), "gpu", 0.0, customDiscount)
-	setCostsFromResults(costData, resChs[4].Await(), "storage", 0.0, customDiscount)
+	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
+	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
 	if queryTotalLocalStorage != "" {
-		setCostsFromResults(costData, resChs[5].Await(), "localstorage", 0.0, customDiscount)
+		resTotalLocalStorage, err := resChs[5].Await()
+		if err != nil {
+			return nil, err
+		}
+		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
 	}
 
 	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	pvUsedCostMap := map[string]float64{}
 	if withBreakdown {
-		for _, result := range resChs[6].Await() {
+		resCPUModePct, _ := resChs[6].Await()
+		resRAMSystemPct, _ := resChs[7].Await()
+		resRAMUserPct, _ := resChs[8].Await()
+		if ctx.HasErrors() {
+			return nil, ctx.Errors()[0]
+		}
+
+		for _, result := range resCPUModePct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -717,7 +737,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			}
 		}
 
-		for _, result := range resChs[7].Await() {
+		for _, result := range resRAMSystemPct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -728,7 +748,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			ramBD := ramBreakdownMap[clusterID]
 			ramBD.System += result.Values[0].Value
 		}
-		for _, result := range resChs[8].Await() {
+		for _, result := range resRAMUserPct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -748,7 +768,11 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		}
 
 		if queryUsedLocalStorage != "" {
-			for _, result := range resChs[9].Await() {
+			resUsedLocalStorage, err := resChs[9].Await()
+			if err != nil {
+				return nil, err
+			}
+			for _, result := range resUsedLocalStorage {
 				clusterID, _ := result.GetString("cluster_id")
 				if clusterID == "" {
 					clusterID = defaultClusterID
@@ -804,20 +828,12 @@ type Totals struct {
 	StorageCost [][]string `json:"storageCost"`
 }
 
-func resultToTotals(qr interface{}) ([][]string, error) {
-	// TODO: Provide an actual query instead of resultToTotals
-	qResults, err := prom.NewQueryResults("resultToTotals", qr)
-	if err != nil {
-		return nil, err
-	}
-
-	results := qResults.Results
-
-	if len(results) == 0 {
+func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
+	if len(qrs) == 0 {
 		return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
 	}
 
-	result := results[0]
+	result := qrs[0]
 	totals := [][]string{}
 	for _, value := range result.Values {
 		d0 := fmt.Sprintf("%f", value.Timestamp)
@@ -866,22 +882,28 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
-	resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
+	ctx := prom.NewContext(cli)
+	resChClusterCores := ctx.QueryRange(qCores, start, end, window)
+	resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
+	resChStorage := ctx.QueryRange(qStorage, start, end, window)
+	resChTotal := ctx.QueryRange(qTotal, start, end, window)
+
+	resultClusterCores, err := resChClusterCores.Await()
 	if err != nil {
 		return nil, err
 	}
 
-	resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
+	resultClusterRAM, err := resChClusterRAM.Await()
 	if err != nil {
 		return nil, err
 	}
 
-	resultStorage, err := QueryRange(cli, qStorage, start, end, window)
+	resultStorage, err := resChStorage.Await()
 	if err != nil {
 		return nil, err
 	}
 
-	resultTotal, err := QueryRange(cli, qTotal, start, end, window)
+	resultTotal, err := resChTotal.Await()
 	if err != nil {
 		return nil, err
 	}
@@ -910,7 +932,7 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 		// If that fails, return an error because something is actually wrong.
 		qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
 
-		resultNodes, err := QueryRange(cli, qNodes, start, end, window)
+		resultNodes, err := ctx.QueryRangeSync(qNodes, start, end, window)
 		if err != nil {
 			return nil, err
 		}

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 171 - 629
pkg/costmodel/costmodel.go


+ 3 - 9
pkg/costmodel/networkcosts.go

@@ -27,7 +27,7 @@ type NetworkUsageVector struct {
 
 // GetNetworkUsageData performs a join of the the results of zone, region, and internet usage queries to return a single
 // map containing network costs for each namespace+pod
-func GetNetworkUsageData(zr interface{}, rr interface{}, ir interface{}, defaultClusterID string) (map[string]*NetworkUsageData, error) {
+func GetNetworkUsageData(zr []*prom.QueryResult, rr []*prom.QueryResult, ir []*prom.QueryResult, defaultClusterID string) (map[string]*NetworkUsageData, error) {
 	zoneNetworkMap, err := getNetworkUsage(zr, defaultClusterID)
 	if err != nil {
 		return nil, err
@@ -137,16 +137,10 @@ func GetNetworkCost(usage *NetworkUsageData, cloud costAnalyzerCloud.Provider) (
 	return results, nil
 }
 
-func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
+func getNetworkUsage(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
 	ncdmap := make(map[string]*NetworkUsageVector)
 
-	// TODO: Pass actual query instead of NetworkUsage
-	result, err := prom.NewQueryResults("NetworkUsage", qr)
-	if err != nil {
-		return nil, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		podName, err := val.GetString("pod_name")
 		if err != nil {
 			return nil, err

+ 123 - 78
pkg/costmodel/promparsers.go

@@ -1,23 +1,21 @@
 package costmodel
 
 import (
+	"errors"
 	"fmt"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/util"
 )
 
-func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
-	toReturn := make(map[string]*PersistentVolumeClaimData)
+// TODO niko/prom move parsing functions from costmodel.go
 
-	// TODO: Pass actual query instead of PVInfo
-	result, err := prom.NewQueryResults("PVInfo", qr)
-	if err != nil {
-		return toReturn, err
-	}
+func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
+	toReturn := make(map[string]*PersistentVolumeClaimData)
 
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -60,16 +58,10 @@ func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentV
 	return toReturn, nil
 }
 
-func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
+func GetPVAllocationMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string][]*PersistentVolumeClaimData)
 
-	// TODO: Pass actual query instead of PVAllocationMetrics
-	result, err := prom.NewQueryResults("PVAllocationMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -112,16 +104,10 @@ func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (m
 	return toReturn, nil
 }
 
-func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
+func GetPVCostMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
 	toReturn := make(map[string]*costAnalyzerCloud.PV)
 
-	// TODO: Pass actual query instead of PVCostMetrics
-	result, err := prom.NewQueryResults("PVCostMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -141,16 +127,10 @@ func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[str
 	return toReturn, nil
 }
 
-func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetNamespaceLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of NamespaceLabelsMetrics
-	result, err := prom.NewQueryResults("NamespaceLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Namespace and ClusterID for key generation purposes
 		ns, err := val.GetString("namespace")
 		if err != nil {
@@ -174,16 +154,10 @@ func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string)
 	return toReturn, nil
 }
 
-func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetPodLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of PodLabelsMetrics
-	result, err := prom.NewQueryResults("PodLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Pod, Namespace and ClusterID for key generation purposes
 		pod, err := val.GetString("pod")
 		if err != nil {
@@ -214,16 +188,10 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 	return toReturn, nil
 }
 
-func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetStatefulsetMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of StatefulsetMatchLabelsMetrics
-	result, err := prom.NewQueryResults("StatefulsetMatchLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Statefulset, Namespace and ClusterID for key generation purposes
 		ss, err := val.GetString("statefulSet")
 		if err != nil {
@@ -247,15 +215,10 @@ func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID
 	return toReturn, nil
 }
 
-func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
+func GetPodDaemonsetsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
 
-	// TODO: Pass actual query instead of PodDaemonsetsWithMetrics
-	result, err := prom.NewQueryResults("PodDaemonsetsWithMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -283,15 +246,10 @@ func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID strin
 	return toReturn, nil
 }
 
-func GetPodJobsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
+func GetPodJobsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
 
-	// TODO: Pass actual query instead of PodJobsWithMetrics
-	result, err := prom.NewQueryResults("PodJobsWithMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -319,16 +277,10 @@ func GetPodJobsWithMetrics(queryResult interface{}, defaultClusterID string) (ma
 	return toReturn, nil
 }
 
-func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetDeploymentMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of DeploymentMatchLabelsMetrics
-	result, err := prom.NewQueryResults("DeploymentMatchLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Deployment, Namespace and ClusterID for key generation purposes
 		deployment, err := val.GetString("deployment")
 		if err != nil {
@@ -352,16 +304,10 @@ func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID s
 	return toReturn, nil
 }
 
-func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetServiceSelectorLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of ServiceSelectorLabelsMetrics
-	result, err := prom.NewQueryResults("ServiceSelectorLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Service, Namespace and ClusterID for key generation purposes
 		service, err := val.GetString("service")
 		if err != nil {
@@ -384,3 +330,102 @@ func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID s
 
 	return toReturn, nil
 }
+
+func GetContainerMetricVector(qrs []*prom.QueryResult, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
+	containerData := make(map[string][]*util.Vector)
+	for _, val := range qrs {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+		if err != nil {
+			return nil, err
+		}
+
+		if normalize && normalizationValue != 0 {
+			for _, v := range val.Values {
+				v.Value = v.Value / normalizationValue
+			}
+		}
+		containerData[containerMetric.Key()] = val.Values
+	}
+	return containerData, nil
+}
+
+func GetContainerMetricVectors(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*util.Vector, error) {
+	containerData := make(map[string][]*util.Vector)
+	for _, val := range qrs {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+		if err != nil {
+			return nil, err
+		}
+		containerData[containerMetric.Key()] = val.Values
+	}
+	return containerData, nil
+}
+
+func GetNormalizedContainerMetricVectors(qrs []*prom.QueryResult, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
+	containerData := make(map[string][]*util.Vector)
+	for _, val := range qrs {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+		if err != nil {
+			return nil, err
+		}
+		containerData[containerMetric.Key()] = util.NormalizeVectorByVector(val.Values, normalizationValues)
+	}
+	return containerData, nil
+}
+
+func getCost(qrs []*prom.QueryResult) (map[string][]*util.Vector, error) {
+	toReturn := make(map[string][]*util.Vector)
+
+	for _, val := range qrs {
+		instance, err := val.GetString("instance")
+		if err != nil {
+			return toReturn, err
+		}
+
+		toReturn[instance] = val.Values
+	}
+
+	return toReturn, nil
+}
+
+// TODO niko/prom retain message:
+// normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
+func getNormalization(qrs []*prom.QueryResult) (float64, error) {
+	if len(qrs) == 0 {
+		return 0.0, prom.NoDataErr
+	}
+	if len(qrs[0].Values) == 0 {
+		return 0.0, prom.NoDataErr
+	}
+	return qrs[0].Values[0].Value, nil
+}
+
+// TODO niko/prom retain message:
+// normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
+func getNormalizations(qrs []*prom.QueryResult) ([]*util.Vector, error) {
+	if len(qrs) == 0 {
+		return nil, prom.NoDataErr
+	}
+
+	return qrs[0].Values, nil
+}
+
+func parsePodLabels(qrs []*prom.QueryResult) (map[string]map[string]string, error) {
+	podLabels := map[string]map[string]string{}
+
+	for _, result := range qrs {
+		pod, err := result.GetString("pod")
+		if err != nil {
+			return podLabels, errors.New("missing pod field")
+		}
+
+		if _, ok := podLabels[pod]; ok {
+			podLabels[pod] = result.GetLabels()
+		} else {
+			podLabels[pod] = map[string]string{}
+			podLabels[pod] = result.GetLabels()
+		}
+	}
+
+	return podLabels, nil
+}

+ 109 - 10
pkg/prom/query.go

@@ -5,6 +5,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"net/url"
+	"strconv"
+	"time"
 
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -13,8 +16,9 @@ import (
 )
 
 const (
-	apiPrefix = "/api/v1"
-	epQuery   = apiPrefix + "/query"
+	apiPrefix    = "/api/v1"
+	epQuery      = apiPrefix + "/query"
+	epQueryRange = apiPrefix + "/query_range"
 )
 
 // Context wraps a Prometheus client and provides methods for querying and
@@ -39,7 +43,31 @@ func (ctx *Context) Errors() []error {
 	return ctx.ErrorCollector.Errors()
 }
 
-// TODO SetMaxConcurrency
+// HasErrors returns true if the ErrorCollector has errors
+func (ctx *Context) HasErrors() bool {
+	return ctx.ErrorCollector.IsError()
+}
+
+// Query returns a QueryResultsChan, then runs the given query and sends the
+// results on the provided channel. Receiver is responsible for closing the
+// channel, preferably using the Read method.
+func (ctx *Context) Query(query string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
+
+	go func(ctx *Context, resCh QueryResultsChan) {
+		defer errors.HandlePanic()
+
+		raw, promErr := ctx.query(query)
+		ctx.ErrorCollector.Report(promErr)
+
+		results, parseErr := NewQueryResults(query, raw)
+		ctx.ErrorCollector.Report(parseErr)
+
+		resCh <- results
+	}(ctx, resCh)
+
+	return resCh
+}
 
 // QueryAll returns one QueryResultsChan for each query provided, then runs
 // each query concurrently and returns results on each channel, respectively,
@@ -55,16 +83,64 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
-// Query returns a QueryResultsChan, then runs the given query and sends the
-// results on the provided channel. Receiver is responsible for closing the
-// channel, preferably using the Read method.
-func (ctx *Context) Query(query string) QueryResultsChan {
+func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
+	raw, err := ctx.query(query)
+	if err != nil {
+		return nil, err
+	}
+
+	results, err := NewQueryResults(query, raw)
+	if err != nil {
+		return nil, err
+	}
+
+	return results.Results, nil
+}
+
+// QueryURL returns the URL used to query Prometheus
+func (ctx *Context) QueryURL() *url.URL {
+	return ctx.Client.URL(epQuery, nil)
+}
+
+func (ctx *Context) query(query string) (interface{}, error) {
+	u := ctx.Client.URL(epQuery, nil)
+	q := u.Query()
+	q.Set("query", query)
+	u.RawQuery = q.Encode()
+
+	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
+	for _, w := range warnings {
+		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
+	}
+	if err != nil {
+		if resp == nil {
+			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		}
+
+		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+	}
+
+	var toReturn interface{}
+	err = json.Unmarshal(body, &toReturn)
+	if err != nil {
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+	}
+
+	return toReturn, nil
+}
+
+func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
 	go func(ctx *Context, resCh QueryResultsChan) {
 		defer errors.HandlePanic()
 
-		raw, promErr := ctx.query(query)
+		raw, promErr := ctx.queryRange(query, start, end, step)
 		ctx.ErrorCollector.Report(promErr)
 
 		results, parseErr := NewQueryResults(query, raw)
@@ -76,10 +152,32 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 	return resCh
 }
 
-func (ctx *Context) query(query string) (interface{}, error) {
-	u := ctx.Client.URL(epQuery, nil)
+func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) ([]*QueryResult, error) {
+	raw, err := ctx.queryRange(query, start, end, step)
+	if err != nil {
+		return nil, err
+	}
+
+	results, err := NewQueryResults(query, raw)
+	if err != nil {
+		return nil, err
+	}
+
+	return results.Results, nil
+}
+
+// QueryRangeURL returns the URL used to query_range Prometheus
+func (ctx *Context) QueryRangeURL() *url.URL {
+	return ctx.Client.URL(epQueryRange, nil)
+}
+
+func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
+	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()
 	q.Set("query", query)
+	q.Set("start", start.Format(time.RFC3339Nano))
+	q.Set("end", end.Format(time.RFC3339Nano))
+	q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
 	u.RawQuery = q.Encode()
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
@@ -111,5 +209,6 @@ func (ctx *Context) query(query string) (interface{}, error) {
 	if err != nil {
 		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
+
 	return toReturn, nil
 }

+ 32 - 31
pkg/prom/result.go

@@ -11,54 +11,55 @@ import (
 	"github.com/kubecost/cost-model/pkg/util"
 )
 
+var (
+	// Static Warnings for data point parsing
+	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
+	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
+
+	// Static Errors for query result parsing
+	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus repsonse")
+	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
+	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
+	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
+	NoDataErr                  error = errors.New("No data")
+	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
+	QueryResultNilErr          error = NewCommError("nil queryResult")
+	ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
+	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
+	ResultFormatErr            error = errors.New("Result is improperly formatted")
+	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
+	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
+)
+
 // QueryResultsChan is a channel of query results
 type QueryResultsChan chan *QueryResults
 
 // Await returns query results, blocking until they are made available, and
 // deferring the closure of the underlying channel
-func (qrc QueryResultsChan) Await() []*QueryResult {
+func (qrc QueryResultsChan) Await() ([]*QueryResult, error) {
 	defer close(qrc)
-	results := <-qrc
 
-	// Possible that the returned results are nil
-	if results == nil {
-		return nil
+	results := <-qrc
+	if results.Error != nil {
+		return nil, results.Error
 	}
 
-	return results.Results
-}
-
-// QueryResult contains a single result from a prometheus query. It's common
-// to refer to query results as a slice of QueryResult
-type QueryResult struct {
-	Metric map[string]interface{}
-	Values []*util.Vector
+	return results.Results, nil
 }
 
 // QueryResults contains all of the query results and the source query string.
 type QueryResults struct {
 	Query   string
+	Error   error
 	Results []*QueryResult
 }
 
-var (
-	// Static Warnings for data point parsing
-	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
-	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
-
-	// Static Errors for query result parsing
-	QueryResultNilErr          error = NewCommError("nil queryResult")
-	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
-	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus repsonse")
-	ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
-	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
-	ResultFormatErr            error = errors.New("Result is improperly formatted")
-	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
-	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
-	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
-	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
-	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
-)
+// QueryResult contains a single result from a prometheus query. It's common
+// to refer to query results as a slice of QueryResult
+type QueryResult struct {
+	Metric map[string]interface{}
+	Values []*util.Vector
+}
 
 // NewQueryResults accepts the raw prometheus query result and returns an array of
 // QueryResult objects

Niektoré súbory nie sú zobrazené, pretože je v týchto rozdielových dátach zmenené mnoho súborov