Parcourir la source

Prometheus Parsing and CostModel Cleanup

Matt Bolt il y a 5 ans
Parent
commit
f2e3d710db

+ 4 - 28
pkg/costmodel/cluster.go

@@ -3,11 +3,9 @@ package costmodel
 import (
 	"fmt"
 	"os"
-	"sync"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/cloud"
-	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
@@ -38,31 +36,6 @@ const (
 	queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
 )
 
-// TODO move this to a package-accessible helper
-type PromQueryContext struct {
-	Client         prometheus.Client
-	ErrorCollector *errors.ErrorCollector
-	WaitGroup      *sync.WaitGroup
-}
-
-// TODO move this to a package-accessible helper function once dependencies are able to
-// be extricated from costmodel package (PromQueryResult -> util.Vector). Otherwise, circular deps.
-func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
-	if ctx.WaitGroup != nil {
-		defer ctx.WaitGroup.Done()
-	}
-
-	defer errors.HandlePanic()
-
-	raw, promErr := Query(ctx.Client, query)
-	ctx.ErrorCollector.Report(promErr)
-
-	results, parseErr := NewQueryResults(raw)
-	ctx.ErrorCollector.Report(parseErr)
-
-	resultCh <- results
-}
-
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
 // are broken down by cores, memory, and storage.
 type ClusterCosts struct {
@@ -412,11 +385,14 @@ type Totals struct {
 }
 
 func resultToTotals(qr interface{}) ([][]string, error) {
-	results, err := NewQueryResults(qr)
+	// TODO: Provide an actual query instead of resultToTotals
+	qResults, err := prom.NewQueryResults("resultToTotals", qr)
 	if err != nil {
 		return nil, err
 	}
 
+	results := qResults.Results
+
 	if len(results) == 0 {
 		return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
 	}

+ 212 - 0
pkg/costmodel/containerkeys.go

@@ -0,0 +1,212 @@
+package costmodel
+
+import (
+	"errors"
+	"strings"
+
+	"github.com/kubecost/cost-model/pkg/log"
+	"k8s.io/api/core/v1"
+)
+
+var (
+	// NewKeyTupleErr is returned by NewKeyTuple when the key does not contain exactly two commas (three components).
+	NewKeyTupleErr = errors.New("NewKeyTuple() Provided key not containing exactly 3 components.")
+
+	// Static errors returned by NewContainerMetricFromKey and NewContainerMetricFromPrometheus.
+	InvalidKeyErr error = errors.New("Not a valid key") // key does not split into exactly 5 components
+	NoContainerErr error = errors.New("Prometheus vector does not have container name") // "container_name" entry is missing
+	NoContainerNameErr error = errors.New("Prometheus vector does not have string container name") // "container_name" is not a string
+	NoPodErr error = errors.New("Prometheus vector does not have pod name") // "pod_name" entry is missing
+	NoPodNameErr error = errors.New("Prometheus vector does not have string pod name") // "pod_name" is not a string
+	NoNamespaceErr error = errors.New("Prometheus vector does not have namespace") // "namespace" entry is missing
+	NoNamespaceNameErr error = errors.New("Prometheus vector does not have string namespace") // "namespace" is not a string
+	NoNodeNameErr error = errors.New("Prometheus vector does not have string node") // "node" is present but not a string
+	NoClusterIDErr error = errors.New("Prometheus vector does not have string cluster_id") // "cluster_id" is present but not a string
+)
+
+//--------------------------------------------------------------------------
+//  KeyTuple
+//--------------------------------------------------------------------------
+
+// KeyTuple is a utility which parses Namespace, Key, and ClusterID from a
+// comma-delimited string of the form "namespace,key,clusterID".
+type KeyTuple struct {
+	key    string // the full comma-delimited string, stored as provided
+	kIndex int // index in key of the first character after the first comma (start of the Key component)
+	cIndex int // index in key of the first character after the second comma (start of the ClusterID component)
+}
+
+// Namespace returns the namespace from the string key: everything before the first comma.
+func (kt *KeyTuple) Namespace() string {
+	return kt.key[0 : kt.kIndex-1] // kIndex-1 is the position of the first comma when built by NewKeyTuple
+}
+
+// Key returns the identifier from the string key: the text between the first and second commas.
+func (kt *KeyTuple) Key() string {
+	return kt.key[kt.kIndex : kt.cIndex-1] // cIndex-1 is the position of the second comma when built by NewKeyTuple
+}
+
+// ClusterID returns the cluster identifier from the string key: everything after the second comma.
+func (kt *KeyTuple) ClusterID() string {
+	return kt.key[kt.cIndex:]
+}
+
+// NewKeyTuple creates a new KeyTuple instance by determining the exact indices of each tuple entry, so that
+// each component can later be sliced out of key on request. It returns NewKeyTupleErr unless key has exactly two commas.
+func NewKeyTuple(key string) (*KeyTuple, error) {
+	kIndex := strings.IndexRune(key, ',')
+	if kIndex < 0 {
+		return nil, NewKeyTupleErr
+	}
+	kIndex += 1 // step past the first comma to the start of the second component
+
+	subIndex := strings.IndexRune(key[kIndex:], ',') // relative to kIndex, not to the start of key
+	if subIndex < 0 {
+		return nil, NewKeyTupleErr
+	}
+	cIndex := kIndex + subIndex + 1 // absolute index of the first character after the second comma
+
+	if strings.ContainsRune(key[cIndex:], ',') { // a third comma means more than 3 components
+		return nil, NewKeyTupleErr
+	}
+
+	return &KeyTuple{
+		key:    key,
+		kIndex: kIndex,
+		cIndex: cIndex,
+	}, nil
+}
+
+//--------------------------------------------------------------------------
+//  ContainerMetric
+//--------------------------------------------------------------------------
+
+// ContainerMetric contains a set of identifiers specific to a kubernetes container, including
+// a string key (see Key) built from those identifiers.
+type ContainerMetric struct {
+	Namespace     string
+	PodName       string
+	ContainerName string
+	NodeName      string
+	ClusterID     string
+	key           string // comma-delimited "namespace,pod,container,node,clusterID" string set by the constructors
+}
+
+// Key returns the string key stored on the ContainerMetric, which can be used as the key of a map[string]interface{}.
+func (c *ContainerMetric) Key() string {
+	return c.key
+}
+
+// containerMetricKey creates a string key: a comma-delimited list of the provided
+// parameters in the order namespace, pod name, container name, node name, cluster ID.
+func containerMetricKey(ns, podName, containerName, nodeName, clusterID string) string {
+	return ns + "," + podName + "," + containerName + "," + nodeName + "," + clusterID // NOTE(review): presumably no component contains a comma; nothing here escapes one
+}
+
+// NewContainerMetricFromKey creates a new ContainerMetric instance using a provided comma-delimited
+// string key. It returns InvalidKeyErr unless the key splits into exactly 5 components.
+func NewContainerMetricFromKey(key string) (*ContainerMetric, error) {
+	s := strings.Split(key, ",")
+	if len(s) == 5 {
+		return &ContainerMetric{
+			Namespace:     s[0],
+			PodName:       s[1],
+			ContainerName: s[2],
+			NodeName:      s[3],
+			ClusterID:     s[4],
+			key:           key, // the provided key is stored as-is rather than rebuilt from the parts
+		}, nil
+	}
+	return nil, InvalidKeyErr
+}
+
+// NewContainerMetricFromValues creates a new ContainerMetric instance from the provided strings and derives its key from them.
+func NewContainerMetricFromValues(ns, podName, containerName, nodeName, clusterId string) *ContainerMetric {
+	return &ContainerMetric{
+		Namespace:     ns,
+		PodName:       podName,
+		ContainerName: containerName,
+		NodeName:      nodeName,
+		ClusterID:     clusterId,
+		key:           containerMetricKey(ns, podName, containerName, nodeName, clusterId),
+	}
+}
+
+// NewContainerMetricsFromPod creates a slice with one ContainerMetric per entry in pod.Spec.Containers, using the
+// pod's name, namespace, and Spec.NodeName plus the provided clusterID. The returned error is always nil.
+func NewContainerMetricsFromPod(pod *v1.Pod, clusterID string) ([]*ContainerMetric, error) {
+	podName := pod.GetObjectMeta().GetName()
+	ns := pod.GetObjectMeta().GetNamespace()
+	node := pod.Spec.NodeName
+
+	var cs []*ContainerMetric // remains nil when the pod has no containers
+	for _, container := range pod.Spec.Containers {
+		containerName := container.Name
+		cs = append(cs, &ContainerMetric{
+			Namespace:     ns,
+			PodName:       podName,
+			ContainerName: containerName,
+			NodeName:      node,
+			ClusterID:     clusterID,
+			key:           containerMetricKey(ns, podName, containerName, node, clusterID),
+		})
+	}
+	return cs, nil
+}
+
+// NewContainerMetricFromPrometheus accepts the metrics map from a QueryResult and returns a new ContainerMetric instance.
+// "container_name", "pod_name", and "namespace" must be present strings; a missing "node" becomes "" and a missing "cluster_id" becomes defaultClusterID.
+func NewContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClusterID string) (*ContainerMetric, error) {
+	// TODO: Can we use *prom.QueryResult.GetString() here?
+	cName, ok := metrics["container_name"]
+	if !ok {
+		return nil, NoContainerErr
+	}
+	containerName, ok := cName.(string)
+	if !ok {
+		return nil, NoContainerNameErr
+	}
+	pName, ok := metrics["pod_name"]
+	if !ok {
+		return nil, NoPodErr
+	}
+	podName, ok := pName.(string)
+	if !ok {
+		return nil, NoPodNameErr
+	}
+	ns, ok := metrics["namespace"]
+	if !ok {
+		return nil, NoNamespaceErr
+	}
+	namespace, ok := ns.(string)
+	if !ok {
+		return nil, NoNamespaceNameErr
+	}
+	node, ok := metrics["node"]
+	if !ok {
+		log.Debugf("Prometheus vector does not have node name")
+		node = "" // a missing node is tolerated: fall back to an empty node name
+	}
+	nodeName, ok := node.(string)
+	if !ok {
+		return nil, NoNodeNameErr
+	}
+	cid, ok := metrics["cluster_id"]
+	if !ok {
+		log.Debugf("Prometheus vector does not have cluster id")
+		cid = defaultClusterID // a missing cluster id is tolerated: fall back to the caller's default
+	}
+	clusterID, ok := cid.(string)
+	if !ok {
+		return nil, NoClusterIDErr
+	}
+
+	return &ContainerMetric{
+		ContainerName: containerName,
+		PodName:       podName,
+		Namespace:     namespace,
+		NodeName:      nodeName,
+		ClusterID:     clusterID,
+		key:           containerMetricKey(namespace, podName, containerName, nodeName, clusterID),
+	}, nil
+}

+ 27 - 173
pkg/costmodel/costmodel.go

@@ -577,7 +577,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		if pod.Status.Phase != v1.PodRunning {
 			continue
 		}
-		cs, err := newContainerMetricsFromPod(*pod, clusterID)
+		cs, err := NewContainerMetricsFromPod(pod, clusterID)
 		if err != nil {
 			return nil, err
 		}
@@ -661,7 +661,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				containerName := container.Name
 
 				// recreate the key and look up data for this container
-				newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
+				newKey := NewContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
 
 				RAMReqV, ok := RAMReqMap[newKey]
 				if !ok {
@@ -847,7 +847,7 @@ func findUnmountedPVCostData(unmountedPVs map[string][]*PersistentVolumeClaimDat
 		// Should be a unique "Unmounted" cost data type
 		name := "unmounted-pvs"
 
-		metric := newContainerMetricFromValues(ns, name, name, "", clusterID)
+		metric := NewContainerMetricFromValues(ns, name, name, "", clusterID)
 		key := metric.Key()
 
 		if costData, ok := costs[key]; !ok {
@@ -2588,12 +2588,14 @@ type PersistentVolumeClaimData struct {
 
 func getCost(qr interface{}) (map[string][]*util.Vector, error) {
 	toReturn := make(map[string][]*util.Vector)
-	result, err := NewQueryResults(qr)
+
+	// TODO: Pass actual query instead of getCost
+	result, err := prom.NewQueryResults("getCost", qr)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		instance, err := val.GetString("instance")
 		if err != nil {
 			return toReturn, err
@@ -2668,11 +2670,14 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 
 //todo: don't cast, implement unmarshaler interface
 func getNormalization(qr interface{}) (float64, error) {
-	queryResults, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of getNormalization
+	qResults, err := prom.NewQueryResults("getNormalization", qr)
 	if err != nil {
 		return 0, err
 	}
 
+	queryResults := qResults.Results
+
 	if len(queryResults) > 0 {
 		values := queryResults[0].Values
 
@@ -2686,11 +2691,14 @@ func getNormalization(qr interface{}) (float64, error) {
 
 //todo: don't cast, implement unmarshaler interface
 func getNormalizations(qr interface{}) ([]*util.Vector, error) {
-	queryResults, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of getNormalizations
+	qResults, err := prom.NewQueryResults("getNormalizations", qr)
 	if err != nil {
 		return nil, err
 	}
 
+	queryResults := qResults.Results
+
 	if len(queryResults) > 0 {
 		vectors := []*util.Vector{}
 		for _, value := range queryResults {
@@ -2701,172 +2709,16 @@ func getNormalizations(qr interface{}) ([]*util.Vector, error) {
 	return nil, fmt.Errorf("normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running")
 }
 
-type ContainerMetric struct {
-	Namespace     string
-	PodName       string
-	ContainerName string
-	NodeName      string
-	ClusterID     string
-	key           string
-}
-
-func (c *ContainerMetric) Key() string {
-	return c.key
-}
-
-func containerMetricKey(ns, podName, containerName, nodeName, clusterID string) string {
-	return ns + "," + podName + "," + containerName + "," + nodeName + "," + clusterID
-}
-
-func NewContainerMetricFromKey(key string) (*ContainerMetric, error) {
-	s := strings.Split(key, ",")
-	if len(s) == 5 {
-		return &ContainerMetric{
-			Namespace:     s[0],
-			PodName:       s[1],
-			ContainerName: s[2],
-			NodeName:      s[3],
-			ClusterID:     s[4],
-			key:           key,
-		}, nil
-	}
-	return nil, fmt.Errorf("Not a valid key")
-}
-
-func newContainerMetricFromValues(ns string, podName string, containerName string, nodeName string, clusterId string) *ContainerMetric {
-	return &ContainerMetric{
-		Namespace:     ns,
-		PodName:       podName,
-		ContainerName: containerName,
-		NodeName:      nodeName,
-		ClusterID:     clusterId,
-		key:           containerMetricKey(ns, podName, containerName, nodeName, clusterId),
-	}
-}
-
-func newContainerMetricsFromPod(pod v1.Pod, clusterID string) ([]*ContainerMetric, error) {
-	podName := pod.GetObjectMeta().GetName()
-	ns := pod.GetObjectMeta().GetNamespace()
-	node := pod.Spec.NodeName
-	var cs []*ContainerMetric
-	for _, container := range pod.Spec.Containers {
-		containerName := container.Name
-		cs = append(cs, &ContainerMetric{
-			Namespace:     ns,
-			PodName:       podName,
-			ContainerName: containerName,
-			NodeName:      node,
-			ClusterID:     clusterID,
-			key:           containerMetricKey(ns, podName, containerName, node, clusterID),
-		})
-	}
-	return cs, nil
-}
-
-func newContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClusterID string) (*ContainerMetric, error) {
-	cName, ok := metrics["container_name"]
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have container name")
-	}
-	containerName, ok := cName.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string container name")
-	}
-	pName, ok := metrics["pod_name"]
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have pod name")
-	}
-	podName, ok := pName.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string pod name")
-	}
-	ns, ok := metrics["namespace"]
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have namespace")
-	}
-	namespace, ok := ns.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string namespace")
-	}
-	node, ok := metrics["node"]
-	if !ok {
-		klog.V(4).Info("Prometheus vector does not have node name")
-		node = ""
-	}
-	nodeName, ok := node.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string node")
-	}
-	cid, ok := metrics["cluster_id"]
-	if !ok {
-		klog.V(4).Info("Prometheus vector does not have cluster id")
-		cid = defaultClusterID
-	}
-	clusterID, ok := cid.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
-	}
-	return &ContainerMetric{
-		ContainerName: containerName,
-		PodName:       podName,
-		Namespace:     namespace,
-		NodeName:      nodeName,
-		ClusterID:     clusterID,
-		key:           containerMetricKey(namespace, podName, containerName, nodeName, clusterID),
-	}, nil
-}
-
-type KeyTuple struct {
-	key    string
-	kIndex int
-	cIndex int
-}
-
-func (kt *KeyTuple) Namespace() string {
-	return kt.key[0 : kt.kIndex-1]
-}
-
-func (kt *KeyTuple) Key() string {
-	return kt.key[kt.kIndex : kt.cIndex-1]
-}
-
-func (kt *KeyTuple) ClusterID() string {
-	return kt.key[kt.cIndex:]
-}
-
-func NewKeyTuple(key string) (*KeyTuple, error) {
-	kIndex := strings.IndexRune(key, ',')
-	if kIndex < 0 {
-		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
-	}
-	kIndex += 1
-
-	subIndex := strings.IndexRune(key[kIndex:], ',')
-	if subIndex < 0 {
-		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
-	}
-	cIndex := kIndex + subIndex + 1
-
-	if strings.ContainsRune(key[cIndex:], ',') {
-		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
-	}
-
-	return &KeyTuple{
-		key:    key,
-		kIndex: kIndex,
-		cIndex: cIndex,
-	}, nil
-}
-
 func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
-	result, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of ContainerMetricVector
+	result, err := prom.NewQueryResults("ContainerMetricVector", qr)
 	if err != nil {
 		return nil, err
 	}
 
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result {
-		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+	for _, val := range result.Results {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}
@@ -2882,14 +2734,15 @@ func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue
 }
 
 func GetContainerMetricVectors(qr interface{}, defaultClusterID string) (map[string][]*util.Vector, error) {
-	result, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of ContainerMetricVectors
+	result, err := prom.NewQueryResults("ContainerMetricVectors", qr)
 	if err != nil {
 		return nil, err
 	}
 
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result {
-		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+	for _, val := range result.Results {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}
@@ -2899,14 +2752,15 @@ func GetContainerMetricVectors(qr interface{}, defaultClusterID string) (map[str
 }
 
 func GetNormalizedContainerMetricVectors(qr interface{}, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
-	result, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of NormalizedContainerMetricVectors
+	result, err := prom.NewQueryResults("NormalizedContainerMetricVectors", qr)
 	if err != nil {
 		return nil, err
 	}
 
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result {
-		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+	for _, val := range result.Results {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}

+ 7 - 4
pkg/costmodel/networkcosts.go

@@ -2,8 +2,9 @@ package costmodel
 
 import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
-	"k8s.io/klog"
 )
 
 // NetworkUsageVNetworkUsageDataector contains the network usage values for egress network traffic
@@ -138,12 +139,14 @@ func GetNetworkCost(usage *NetworkUsageData, cloud costAnalyzerCloud.Provider) (
 
 func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
 	ncdmap := make(map[string]*NetworkUsageVector)
-	result, err := NewQueryResults(qr)
+
+	// TODO: Pass actual query instead of NetworkUsage
+	result, err := prom.NewQueryResults("NetworkUsage", qr)
 	if err != nil {
 		return nil, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		podName, err := val.GetString("pod_name")
 		if err != nil {
 			return nil, err
@@ -156,7 +159,7 @@ func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*Netwo
 
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
-			klog.V(4).Info("Prometheus vector does not have cluster id")
+			log.Debugf("Prometheus vector does not have cluster id")
 			clusterID = defaultClusterID
 		}
 

+ 40 - 193
pkg/costmodel/promparsers.go

@@ -2,182 +2,22 @@ package costmodel
 
 import (
 	"fmt"
-	"math"
-	"strconv"
-	"strings"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
-	"github.com/kubecost/cost-model/pkg/util"
-	"k8s.io/klog"
 )
 
-// PromQueryResult contains a single result from a prometheus query
-type PromQueryResult struct {
-	Metric map[string]interface{}
-	Values []*util.Vector
-}
-
-func (pqr *PromQueryResult) GetString(field string) (string, error) {
-	f, ok := pqr.Metric[field]
-	if !ok {
-		return "", fmt.Errorf("%s field does not exist in data result vector", field)
-	}
-
-	strField, ok := f.(string)
-	if !ok {
-		return "", fmt.Errorf("%s field is improperly formatted", field)
-	}
-
-	return strField, nil
-}
-
-func (pqr *PromQueryResult) GetLabels() map[string]string {
-	result := make(map[string]string)
-
-	// Find All keys with prefix label_, remove prefix, add to labels
-	for k, v := range pqr.Metric {
-		if !strings.HasPrefix(k, "label_") {
-			continue
-		}
-
-		label := k[6:]
-		value, ok := v.(string)
-		if !ok {
-			klog.V(3).Infof("Failed to parse label value for label: %s", label)
-			continue
-		}
-
-		result[label] = value
-	}
-
-	return result
-}
-
-// NewQueryResults accepts the raw prometheus query result and returns an array of
-// PromQueryResult objects
-func NewQueryResults(queryResult interface{}) ([]*PromQueryResult, error) {
-	var result []*PromQueryResult
-	if queryResult == nil {
-		return nil, prom.NewCommError("nil queryResult")
-	}
-	data, ok := queryResult.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(queryResult)
-		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-
-	// Deep Check for proper formatting
-	d, ok := data.(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
-	}
-	resultData, ok := d["result"]
-	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
-	}
-	resultsData, ok := resultData.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
-	}
-
-	// Scan Results
-	for _, val := range resultsData {
-		resultInterface, ok := val.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Result is improperly formatted")
-		}
-
-		metricInterface, ok := resultInterface["metric"]
-		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
-		}
-
-		// Wrap execution of this lazily in case the data is not used
-		labels := func() string { return labelsForMetric(metricMap) }
-
-		// Determine if the result is a ranged data set or single value
-		_, isRange := resultInterface["values"]
-
-		var vectors []*util.Vector
-		if !isRange {
-			dataPoint, ok := resultInterface["value"]
-			if !ok {
-				return nil, fmt.Errorf("Value field does not exist in data result vector")
-			}
-
-			v, err := parseDataPoint(dataPoint, labels)
-			if err != nil {
-				return nil, err
-			}
-			vectors = append(vectors, v)
-		} else {
-			values, ok := resultInterface["values"].([]interface{})
-			if !ok {
-				return nil, fmt.Errorf("Values field is improperly formatted")
-			}
-
-			for _, value := range values {
-				v, err := parseDataPoint(value, labels)
-				if err != nil {
-					return nil, err
-				}
-
-				vectors = append(vectors, v)
-			}
-		}
-
-		result = append(result, &PromQueryResult{
-			Metric: metricMap,
-			Values: vectors,
-		})
-	}
-
-	return result, nil
-}
-
-func parseDataPoint(dataPoint interface{}, labels func() string) (*util.Vector, error) {
-	value, ok := dataPoint.([]interface{})
-	if !ok || len(value) != 2 {
-		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-	}
-
-	strVal := value[1].(string)
-	v, err := strconv.ParseFloat(strVal, 64)
-	if err != nil {
-		return nil, err
-	}
-
-	// Test for +Inf and -Inf (sign: 0), Test for NaN
-	if math.IsInf(v, 0) {
-		klog.V(1).Infof("[Warning] Found Inf value parsing vector data point for metric: %s", labels())
-		v = 0.0
-	} else if math.IsNaN(v) {
-		klog.V(1).Infof("[Warning] Found NaN value parsing vector data point for metric: %s", labels())
-		v = 0.0
-	}
-
-	return &util.Vector{
-		Timestamp: math.Round(value[0].(float64)/10) * 10,
-		Value:     v,
-	}, nil
-}
-
 func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string]*PersistentVolumeClaimData)
-	result, err := NewQueryResults(qr)
+
+	// TODO: Pass actual query instead of PVInfo
+	result, err := prom.NewQueryResults("PVInfo", qr)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -195,14 +35,14 @@ func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentV
 
 		volumeName, err := val.GetString("volumename")
 		if err != nil {
-			klog.V(4).Infof("[Warning] Unfulfilled claim %s: volumename field does not exist in data result vector", pvcName)
+			log.Debugf("Unfulfilled claim %s: volumename field does not exist in data result vector", pvcName)
 			volumeName = ""
 		}
 
 		pvClass, err := val.GetString("storageclass")
 		if err != nil {
 			// TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
-			klog.V(2).Infof("[Warning] Storage Class not found for claim \"%s/%s\".", ns, pvcName)
+			log.Warningf("Storage Class not found for claim \"%s/%s\".", ns, pvcName)
 			pvClass = ""
 		}
 
@@ -222,12 +62,14 @@ func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentV
 
 func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string][]*PersistentVolumeClaimData)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PVAllocationMetrics
+	result, err := prom.NewQueryResults("PVAllocationMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -250,7 +92,7 @@ func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (m
 
 		pvName, err := val.GetString("persistentvolume")
 		if err != nil {
-			klog.Infof("persistentvolume field does not exist for pv %s", pvcName) // This is possible for an unfulfilled claim
+			log.Warningf("persistentvolume field does not exist for pv %s", pvcName) // This is possible for an unfulfilled claim
 			continue
 		}
 
@@ -272,12 +114,14 @@ func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (m
 
 func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
 	toReturn := make(map[string]*costAnalyzerCloud.PV)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PVCostMetrics
+	result, err := prom.NewQueryResults("PVCostMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -299,12 +143,14 @@ func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[str
 
 func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of NamespaceLabelsMetrics
+	result, err := prom.NewQueryResults("NamespaceLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Namespace and ClusterID for key generation purposes
 		ns, err := val.GetString("namespace")
 		if err != nil {
@@ -330,12 +176,14 @@ func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string)
 
 func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PodLabelsMetrics
+	result, err := prom.NewQueryResults("PodLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Pod, Namespace and ClusterID for key generation purposes
 		pod, err := val.GetString("pod")
 		if err != nil {
@@ -368,12 +216,14 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 
 func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of StatefulsetMatchLabelsMetrics
+	result, err := prom.NewQueryResults("StatefulsetMatchLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Statefulset, Namespace and ClusterID for key generation purposes
 		ss, err := val.GetString("statefulSet")
 		if err != nil {
@@ -399,11 +249,13 @@ func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID
 
 func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PodDaemonsetsWithMetrics
+	result, err := prom.NewQueryResults("PodDaemonsetsWithMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
-	for _, val := range result {
+	for _, val := range result.Results {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -433,12 +285,14 @@ func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID strin
 
 func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of DeploymentMatchLabelsMetrics
+	result, err := prom.NewQueryResults("DeploymentMatchLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Deployment, Namespace and ClusterID for key generation purposes
 		deployment, err := val.GetString("deployment")
 		if err != nil {
@@ -464,12 +318,14 @@ func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID s
 
 func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of ServiceSelectorLabelsMetrics
+	result, err := prom.NewQueryResults("ServiceSelectorLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Service, Namespace and ClusterID for key generation purposes
 		service, err := val.GetString("service")
 		if err != nil {
@@ -492,12 +348,3 @@ func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID s
 
 	return toReturn, nil
 }
-
-func labelsForMetric(metricMap map[string]interface{}) string {
-	var pairs []string
-	for k, v := range metricMap {
-		pairs = append(pairs, fmt.Sprintf("%s: %+v", k, v))
-	}
-
-	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
-}

+ 2 - 2
pkg/costmodel/sql.go

@@ -140,7 +140,7 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 			return nil, err
 		}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance, clusterid)
+		k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
 		key := k.Key()
 		allocationVector := &util.Vector{
 			Timestamp: float64(t.Unix()),
@@ -211,7 +211,7 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 			return nil, err
 		}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance, clusterid)
+		k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
 		key := k.Key()
 		allocationVector := &util.Vector{
 			Timestamp: float64(t.Unix()),

+ 1 - 1
pkg/prom/query.go

@@ -66,7 +66,7 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 		raw, promErr := ctx.query(query)
 		ctx.ErrorCollector.Report(promErr)
 
-		results, parseErr := NewQueryResults(raw)
+		results, parseErr := NewQueryResults(query, raw)
 		ctx.ErrorCollector.Report(parseErr)
 
 		resCh <- results

+ 80 - 30
pkg/prom/result.go

@@ -1,23 +1,25 @@
 package prom
 
 import (
+	"errors"
 	"fmt"
 	"math"
 	"strconv"
 	"strings"
 
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
-	"k8s.io/klog"
 )
 
 // QueryResultsChan is a channel of query results
-type QueryResultsChan chan []*QueryResult
+type QueryResultsChan chan *QueryResults
 
 // Await returns query results, blocking until they are made available, and
 // deferring the closure of the underlying channel
 func (qrc QueryResultsChan) Await() []*QueryResult {
 	defer close(qrc)
-	return <-qrc
+	results := <-qrc
+	return results.Results
 }
 
 // QueryResult contains a single result from a prometheus query. It's common
@@ -27,13 +29,38 @@ type QueryResult struct {
 	Values []*util.Vector
 }
 
+// QueryResults contains all of the query results and the source query string.
+type QueryResults struct {
+	Query   string
+	Results []*QueryResult
+}
+
+var (
+	// Static Warnings for data point parsing
+	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
+	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
+
+	// Static Errors for query result parsing
+	QueryResultNilErr          error = NewCommError("nil queryResult")
+	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
+	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus repsonse")
+	ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
+	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
+	ResultFormatErr            error = errors.New("Result is improperly formatted")
+	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
+	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
+	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
+	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
+	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
+)
+
 // NewQueryResults accepts the raw prometheus query result and returns an array of
 // QueryResult objects
-func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
-	var result []*QueryResult
+func NewQueryResults(query string, queryResult interface{}) (*QueryResults, error) {
 	if queryResult == nil {
-		return nil, NewCommError("nil queryResult")
+		return nil, QueryResultNilErr
 	}
+
 	data, ok := queryResult.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(queryResult)
@@ -46,35 +73,39 @@ func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
 	// Deep Check for proper formatting
 	d, ok := data.(map[string]interface{})
 	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
+		return nil, DataFieldFormatErr
 	}
 	resultData, ok := d["result"]
 	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
+		return nil, ResultFieldDoesNotExistErr
 	}
 	resultsData, ok := resultData.([]interface{})
 	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
+		return nil, ResultFieldFormatErr
 	}
 
-	// Scan Results
+	// Result vectors from the query
+	var results []*QueryResult
+
+	// Parse raw results into QueryResults
 	for _, val := range resultsData {
 		resultInterface, ok := val.(map[string]interface{})
 		if !ok {
-			return nil, fmt.Errorf("Result is improperly formatted")
+			return nil, ResultFormatErr
 		}
 
 		metricInterface, ok := resultInterface["metric"]
 		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
+			return nil, MetricFieldDoesNotExistErr
 		}
 		metricMap, ok := metricInterface.(map[string]interface{})
 		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
+			return nil, MetricFieldFormatErr
 		}
 
-		// Wrap execution of this lazily in case the data is not used
-		labels := func() string { return labelsForMetric(metricMap) }
+		// Define label string for values to ensure that we only run labelsForMetric once
+		// if we receive multiple warnings.
+		var labelString string = ""
 
 		// Determine if the result is a ranged data set or single value
 		_, isRange := resultInterface["values"]
@@ -83,13 +114,18 @@ func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
 		if !isRange {
 			dataPoint, ok := resultInterface["value"]
 			if !ok {
-				return nil, fmt.Errorf("Value field does not exist in data result vector")
+				return nil, ValueFieldDoesNotExistErr
 			}
 
-			v, err := parseDataPoint(dataPoint, labels)
+			// Append new data point, log warnings
+			v, warn, err := parseDataPoint(dataPoint)
 			if err != nil {
 				return nil, err
 			}
+			if warn != nil {
+				log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
+			}
+
 			vectors = append(vectors, v)
 		} else {
 			values, ok := resultInterface["values"].([]interface{})
@@ -97,35 +133,45 @@ func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
 				return nil, fmt.Errorf("Values field is improperly formatted")
 			}
 
+			// Append new data points, log warnings
 			for _, value := range values {
-				v, err := parseDataPoint(value, labels)
+				v, warn, err := parseDataPoint(value)
 				if err != nil {
 					return nil, err
 				}
+				if warn != nil {
+					if labelString == "" {
+						labelString = labelsForMetric(metricMap)
+					}
+					log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
+				}
 
 				vectors = append(vectors, v)
 			}
 		}
 
-		result = append(result, &QueryResult{
+		results = append(results, &QueryResult{
 			Metric: metricMap,
 			Values: vectors,
 		})
 	}
 
-	return result, nil
+	return &QueryResults{
+		Query:   query,
+		Results: results,
+	}, nil
 }
 
 // GetString returns the requested field, or an error if it does not exist
 func (qr *QueryResult) GetString(field string) (string, error) {
 	f, ok := qr.Metric[field]
 	if !ok {
-		return "", fmt.Errorf("%s field does not exist in data result vector", field)
+		return "", fmt.Errorf("'%s' field does not exist in data result vector", field)
 	}
 
 	strField, ok := f.(string)
 	if !ok {
-		return "", fmt.Errorf("%s field is improperly formatted", field)
+		return "", fmt.Errorf("'%s' field is improperly formatted", field)
 	}
 
 	return strField, nil
@@ -144,7 +190,7 @@ func (qr *QueryResult) GetLabels() map[string]string {
 		label := k[6:]
 		value, ok := v.(string)
 		if !ok {
-			klog.V(3).Infof("Failed to parse label value for label: %s", label)
+			log.Warningf("Failed to parse label value for label: '%s'", label)
 			continue
 		}
 
@@ -154,31 +200,35 @@ func (qr *QueryResult) GetLabels() map[string]string {
 	return result
 }
 
-func parseDataPoint(dataPoint interface{}, labels func() string) (*util.Vector, error) {
+// parseDataPoint parses a data point from raw prometheus query results and returns
+// a new Vector instance containing the parsed data along with any warnings or errors.
+func parseDataPoint(dataPoint interface{}) (*util.Vector, warning, error) {
+	var w warning = nil
+
 	value, ok := dataPoint.([]interface{})
 	if !ok || len(value) != 2 {
-		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		return nil, w, DataPointFormatErr
 	}
 
 	strVal := value[1].(string)
 	v, err := strconv.ParseFloat(strVal, 64)
 	if err != nil {
-		return nil, err
+		return nil, w, err
 	}
 
 	// Test for +Inf and -Inf (sign: 0), Test for NaN
 	if math.IsInf(v, 0) {
-		klog.V(1).Infof("[Warning] Found Inf value parsing vector data point for metric: %s", labels())
+		w = InfWarning
 		v = 0.0
 	} else if math.IsNaN(v) {
-		klog.V(1).Infof("[Warning] Found NaN value parsing vector data point for metric: %s", labels())
+		w = NaNWarning
 		v = 0.0
 	}
 
 	return &util.Vector{
 		Timestamp: math.Round(value[0].(float64)/10) * 10,
 		Value:     v,
-	}, nil
+	}, w, nil
 }
 
 func labelsForMetric(metricMap map[string]interface{}) string {
@@ -193,7 +243,7 @@ func labelsForMetric(metricMap map[string]interface{}) string {
 func wrapPrometheusError(qr interface{}) (string, error) {
 	e, ok := qr.(map[string]interface{})["error"]
 	if !ok {
-		return "", fmt.Errorf("Unexpected response from Prometheus")
+		return "", PromUnexpectedResponseErr
 	}
 	eStr, ok := e.(string)
 	return eStr, nil

+ 26 - 0
pkg/prom/warning.go

@@ -0,0 +1,26 @@
+package prom
+
+// warning represents an unexpected result that occurs but doesn't halt processing
+type warning interface {
+	Message() string
+}
+
+// defaultWarning is a simple implementation for warning
+type defaultWarning struct {
+	message string
+}
+
+// Message returns the message for the warning
+func (dw *defaultWarning) Message() string {
+	return dw.message
+}
+
+// String returns the warning message, implementing fmt.Stringer.
+func (dw *defaultWarning) String() string {
+	return dw.message
+}
+
+// newWarning creates a warning for the prom package. NOTE: We can make this less prom-centric if desirable.
+func newWarning(msg string) warning {
+	return &defaultWarning{msg}
+}