Przeglądaj źródła

Merge pull request #519 from kubecost/develop

Merge develop into master 1.62.0
Ajay Tripathy 5 lat temu
rodzic
commit
f6c6042a75

+ 1 - 2
go.mod

@@ -9,7 +9,6 @@ require (
 	github.com/Azure/go-autorest v11.3.2+incompatible
 	github.com/aws/aws-sdk-go v1.28.9
 	github.com/dimchansky/utfbom v1.1.0 // indirect
-	github.com/etcd-io/bbolt v1.3.3
 	github.com/getsentry/sentry-go v0.6.1
 	github.com/google/martian v2.1.0+incompatible // indirect
 	github.com/google/uuid v1.1.1
@@ -23,7 +22,7 @@ require (
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
 	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
-	go.etcd.io/bbolt v1.3.3 // indirect
+	go.etcd.io/bbolt v1.3.5
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
 	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	google.golang.org/api v0.4.0

+ 5 - 1
go.sum

@@ -72,7 +72,6 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
 github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
 github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
-github.com/etcd-io/bbolt v1.3.3 h1:gSJmxrs37LgTqR/oyJBWok6k6SvXEUerFTbltIhXkBM=
 github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
 github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550 h1:mV9jbLoSW/8m4VK16ZkHTozJa8sesK5u5kTMFysTYac=
 github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -375,6 +374,8 @@ github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZ
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
 go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
+go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
 go.opencensus.io v0.19.1/go.mod h1:gug0GbSHa8Pafr0d2urOSgoXHZ6x/RUlaiT0d9pqb4A=
 go.opencensus.io v0.19.2 h1:ZZpq6xI6kv/LuE/5s5UQvBU5vMjvRnPb8PvJrIntAnc=
 go.opencensus.io v0.19.2/go.mod h1:NO/8qkisMZLZ1FCsKNqtJPwc8/TaclWyY0B6wcYNg9M=
@@ -466,6 +467,8 @@ golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
 golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
+golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -553,6 +556,7 @@ k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 h1:uV4S5IB5g4Nvi+TBVNf3e9
 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6 h1:tGU1C/vMoUV2ZakSH6wQq2shk9KiFtjoH2vDDHlhpA4=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6/go.mod h1:nL6pwRT8NgfF8TT68DBI8uEePRt89cSvoXUVqbkWHq4=
+k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5 h1:BwY2C//EoWktJi74O6R2REBonrhsfhRI0qfVwOjOPp8=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5/go.mod h1:bIEHXHbykaOlj+pgLllzLJ2RPGdzkjtqdk0Il07KPEM=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g=

+ 33 - 10
pkg/cloud/awsprovider.go

@@ -105,6 +105,8 @@ type AWS struct {
 	DownloadPricingDataLock     sync.RWMutex
 	Config                      *ProviderConfig
 	ServiceAccountChecks        map[string]*ServiceAccountCheck
+	clusterManagementPrice      float64
+	clusterProvisioner          string
 	*CustomProvider
 }
 
@@ -511,6 +513,10 @@ func (aws *AWS) isPreemptible(key string) bool {
 	return false
 }
 
+func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
+	return aws.clusterProvisioner, aws.clusterManagementPrice, nil
+}
+
 // DownloadPricingData fetches data from the AWS Pricing API
 func (aws *AWS) DownloadPricingData() error {
 	aws.DownloadPricingDataLock.Lock()
@@ -545,6 +551,13 @@ func (aws *AWS) DownloadPricingData() error {
 
 	inputkeys := make(map[string]bool)
 	for _, n := range nodeList {
+		if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
+			aws.clusterManagementPrice = 0.10
+			aws.clusterProvisioner = "EKS"
+		} else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
+			aws.clusterProvisioner = "KOPS"
+		}
+
 		labels := n.GetObjectMeta().GetLabels()
 		key := aws.GetKey(labels, n)
 		inputkeys[key.Features()] = true
@@ -990,16 +1003,9 @@ func (aws *AWS) NodePricing(k Key) (*Node, error) {
 			}, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
 		}
 		return aws.createNode(terms, usageType, k)
-	} else { // Fall back to base pricing if we can't find the key.
-		klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
-		return &Node{
-			Cost:             aws.BaseCPUPrice,
-			BaseCPUPrice:     aws.BaseCPUPrice,
-			BaseRAMPrice:     aws.BaseRAMPrice,
-			BaseGPUPrice:     aws.BaseGPUPrice,
-			UsageType:        usageType,
-			UsesBaseCPUPrice: true,
-		}, nil
+	} else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
+		return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
+
 	}
 }
 
@@ -1019,6 +1025,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m["provider"] = "AWS"
 		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
+		m["provisioner"] = awsProvider.clusterProvisioner
 		return m, nil
 	}
 	makeStructure := func(clusterName string) (map[string]string, error) {
@@ -2395,3 +2402,19 @@ func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: checks,
 	}
 }
+
+func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
+	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
+}
+
+func (aws *AWS) ParseID(id string) string {
+	// It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
+	rx := regexp.MustCompile("aws://[^/]*/[^/]*/([^/]+)")
+	match := rx.FindStringSubmatch(id)
+	if len(match) < 2 {
+		log.Infof("awsprovider.ParseID: failed to parse %s", id)
+		return id
+	}
+
+	return match[1]
+}

+ 11 - 0
pkg/cloud/azureprovider.go

@@ -780,3 +780,14 @@ func (az *Azure) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: []*ServiceAccountCheck{},
 	}
 }
+func (*Azure) ClusterManagementPricing() (string, float64, error) {
+	return "", 0.0, nil
+}
+
+func (az *Azure) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
+	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
+}
+
+func (az *Azure) ParseID(id string) string {
+	return id
+}

+ 12 - 0
pkg/cloud/csvprovider.go

@@ -294,3 +294,15 @@ func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: []*ServiceAccountCheck{},
 	}
 }
+
+func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
+	return "", 0.0, nil
+}
+
+func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
+	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
+}
+
+func (c *CSVProvider) ParseID(id string) string {
+	return id
+}

+ 12 - 0
pkg/cloud/customprovider.go

@@ -38,6 +38,10 @@ type customProviderKey struct {
 	Labels         map[string]string
 }
 
+func (*CustomProvider) ClusterManagementPricing() (string, float64, error) {
+	return "", 0.0, nil
+}
+
 func (*CustomProvider) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 	return ""
 }
@@ -268,3 +272,11 @@ func (cp *CustomProvider) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: []*ServiceAccountCheck{},
 	}
 }
+
+func (cp *CustomProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
+	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
+}
+
+func (cp *CustomProvider) ParseID(id string) string {
+	return id
+}

+ 45 - 0
pkg/cloud/gcpprovider.go

@@ -21,6 +21,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 
 	"golang.org/x/oauth2"
@@ -56,6 +57,8 @@ type GCP struct {
 	Config                  *ProviderConfig
 	serviceKeyProvided      bool
 	ValidPricingKeys        map[string]bool
+	clusterManagementPrice  float64
+	clusterProvisioner      string
 	*CustomProvider
 }
 
@@ -487,11 +490,16 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = "GCP"
+	m["provisioner"] = gcp.clusterProvisioner
 	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 	return m, nil
 }
 
+func (gcp *GCP) ClusterManagementPricing() (string, float64, error) {
+	return gcp.clusterProvisioner, gcp.clusterManagementPrice, nil
+}
+
 func (*GCP) GetAddresses() ([]byte, error) {
 	// metadata API setup
 	metadataClient := metadata.NewClient(&http.Client{Transport: userAgentTransport{
@@ -944,6 +952,11 @@ func (gcp *GCP) DownloadPricingData() error {
 
 	for _, n := range nodeList {
 		labels := n.GetObjectMeta().GetLabels()
+		if _, ok := labels["cloud.google.com/gke-nodepool"]; ok { // The node is part of a GKE nodepool, so you're paying a cluster management cost
+			gcp.clusterManagementPrice = 0.10
+			gcp.clusterProvisioner = "GKE"
+		}
+
 		key := gcp.GetKey(labels, n)
 		inputkeys[key.Features()] = key
 	}
@@ -1385,3 +1398,35 @@ func (gcp *GCP) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: []*ServiceAccountCheck{},
 	}
 }
+
+func (gcp *GCP) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
+	class := strings.Split(instanceType, "-")[0]
+	return 1.0 - ((1.0 - sustainedUseDiscount(class, defaultDiscount, isPreemptible)) * (1.0 - negotiatedDiscount))
+}
+
+func sustainedUseDiscount(class string, defaultDiscount float64, isPreemptible bool) float64 {
+	if isPreemptible {
+		return 0.0
+	}
+	discount := defaultDiscount
+	switch class {
+	case "e2", "f1", "g1":
+		discount = 0.0
+	case "n2":
+		discount = 0.2
+	}
+	return discount
+}
+
+func (gcp *GCP) ParseID(id string) string {
+	// gce://guestbook-227502/us-central1-a/gke-niko-n1-standard-2-wljla-8df8e58a-hfy7
+	//  => gke-niko-n1-standard-2-wljla-8df8e58a-hfy7
+	rx := regexp.MustCompile("gce://[^/]*/[^/]*/([^/]+)")
+	match := rx.FindStringSubmatch(id)
+	if len(match) < 2 {
+		log.Infof("gcpprovider.ParseID: failed to parse %s", id)
+		return id
+	}
+
+	return match[1]
+}

+ 3 - 0
pkg/cloud/provider.go

@@ -188,6 +188,9 @@ type Provider interface {
 	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ServiceAccountStatus() *ServiceAccountStatus
+	ClusterManagementPricing() (string, float64, error)
+	CombinedDiscountForNode(string, bool, float64, float64) float64
+	ParseID(string) string
 }
 
 // ClusterName returns the name defined in cluster info, defaulting to the

+ 1 - 1
pkg/clustermanager/boltdbstorage.go

@@ -1,7 +1,7 @@
 package clustermanager
 
 import (
-	bolt "github.com/etcd-io/bbolt"
+	bolt "go.etcd.io/bbolt"
 	_ "k8s.io/klog"
 )
 

+ 99 - 72
pkg/costmodel/cluster.go

@@ -123,7 +123,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
 	// but more expensive queries, and vice versa.
-	minsPerResolution := 1
+	minsPerResolution := 5
 
 	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
 	// value, converts it to a cumulative value; i.e.
@@ -144,10 +144,10 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
 	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
 
-	resPVCost := resChPVCost.Await()
-	resPVSize := resChPVSize.Await()
-	resLocalStorageCost := resChLocalStorageCost.Await()
-	resLocalStorageBytes := resChLocalStorageBytes.Await()
+	resPVCost, _ := resChPVCost.Await()
+	resPVSize, _ := resChPVSize.Await()
+	resLocalStorageCost, _ := resChLocalStorageCost.Await()
+	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -176,7 +176,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 				Name:    name,
 			}
 		}
-		diskMap[key].Cost = cost
+		diskMap[key].Cost += cost
 	}
 
 	for _, result := range resPVSize {
@@ -227,7 +227,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 				Local:   true,
 			}
 		}
-		diskMap[key].Cost = cost
+		diskMap[key].Cost += cost
 	}
 
 	for _, result := range resLocalStorageBytes {
@@ -283,7 +283,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
 	// but more expensive queries, and vice versa.
-	minsPerResolution := 1
+	minsPerResolution := 5
 
 	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
 	// value, converts it to a cumulative value; i.e.
@@ -291,11 +291,11 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
 
 	ctx := prom.NewContext(client)
-	queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 
 	resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
@@ -305,12 +305,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
 	resChNodeLabels := ctx.Query(queryNodeLabels)
 
-	resNodeCPUCost := resChNodeCPUCost.Await()
-	resNodeCPUCores := resChNodeCPUCores.Await()
-	resNodeGPUCost := resChNodeGPUCost.Await()
-	resNodeRAMCost := resChNodeRAMCost.Await()
-	resNodeRAMBytes := resChNodeRAMBytes.Await()
-	resNodeLabels := resChNodeLabels.Await()
+	resNodeCPUCost, _ := resChNodeCPUCost.Await()
+	resNodeCPUCores, _ := resChNodeCPUCores.Await()
+	resNodeGPUCost, _ := resChNodeGPUCost.Await()
+	resNodeRAMCost, _ := resChNodeRAMCost.Await()
+	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
+	resNodeLabels, _ := resChNodeLabels.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -329,22 +329,21 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 			continue
 		}
 
-		nodeType, err := result.GetString("instance_type")
-		if err != nil {
-			log.Warningf("ClusterNodes: CPU cost data missing node type")
-		}
+		nodeType, _ := result.GetString("instance_type")
+		providerID, _ := result.GetString("provider_id")
 
 		cpuCost := result.Values[0].Value
 
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:  cluster,
-				Name:     name,
-				NodeType: nodeType,
+				Cluster:    cluster,
+				Name:       name,
+				NodeType:   nodeType,
+				ProviderID: cp.ParseID(providerID),
 			}
 		}
-		nodeMap[key].CPUCost = cpuCost
+		nodeMap[key].CPUCost += cpuCost
 		nodeMap[key].NodeType = nodeType
 	}
 
@@ -384,22 +383,21 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 			continue
 		}
 
-		nodeType, err := result.GetString("instance_type")
-		if err != nil {
-			log.Warningf("ClusterNodes: RAM cost data missing node type")
-		}
+		nodeType, _ := result.GetString("instance_type")
+		providerID, _ := result.GetString("provider_id")
 
 		ramCost := result.Values[0].Value
 
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:  cluster,
-				Name:     name,
-				NodeType: nodeType,
+				Cluster:    cluster,
+				Name:       name,
+				NodeType:   nodeType,
+				ProviderID: cp.ParseID(providerID),
 			}
 		}
-		nodeMap[key].RAMCost = ramCost
+		nodeMap[key].RAMCost += ramCost
 		nodeMap[key].NodeType = nodeType
 	}
 
@@ -439,19 +437,24 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 			continue
 		}
 
+		nodeType, _ := result.GetString("instance_type")
+		providerID, _ := result.GetString("provider_id")
+
 		gpuCost := result.Values[0].Value
 
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:    cluster,
+				Name:       name,
+				NodeType:   nodeType,
+				ProviderID: cp.ParseID(providerID),
 			}
 		}
-		nodeMap[key].GPUCost = gpuCost
+		nodeMap[key].GPUCost += gpuCost
 	}
 
-	// node_labels label_cloud_google_com_gke_preemptible
+	// Determine preemptibility with node labels
 	for _, result := range resNodeLabels {
 		nodeName, err := result.GetString("node")
 		if err != nil {
@@ -460,11 +463,17 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 
 		// GCP preemptible label
 		pre, _ := result.GetString("label_cloud_google_com_gke_preemptible")
-		if node, ok := nodeMap[nodeName]; pre == "true" && ok {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+		key := fmt.Sprintf("%s/%s", cluster, nodeName)
+		if node, ok := nodeMap[key]; pre == "true" && ok {
 			node.Preemptible = true
 		}
 
 		// TODO AWS preemptible
+
 		// TODO Azure preemptible
 	}
 
@@ -472,24 +481,20 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	if err != nil {
 		return nil, []error{err}
 	}
+
 	discount, err := ParsePercentString(c.Discount)
 	if err != nil {
 		return nil, []error{err}
 	}
+
 	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
 	if err != nil {
 		return nil, []error{err}
 	}
 
 	for _, node := range nodeMap {
-		if !node.Preemptible {
-			// TODO determine discount(s) based on:
-			// - custom settings
-			// - node RI data
-			// - provider-specific rules, e.g.
-			//   cp.GetDiscount(instanceType string) float64
-			node.Discount = (1.0 - (1.0-discount)*(1.0-negotiatedDiscount))
-		}
+		// TODO take RI into account
+		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
 	}
 
 	return nodeMap, nil
@@ -624,10 +629,19 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		resChs = append(resChs, bdResChs...)
 	}
 
+	resDataCount, _ := resChs[0].Await()
+	resTotalGPU, _ := resChs[1].Await()
+	resTotalCPU, _ := resChs[2].Await()
+	resTotalRAM, _ := resChs[3].Await()
+	resTotalStorage, _ := resChs[4].Await()
+	if ctx.HasErrors() {
+		return nil, ctx.Errors()[0]
+	}
+
 	defaultClusterID := env.GetClusterID()
 
 	dataMinsByCluster := map[string]float64{}
-	for _, result := range resChs[0].Await() {
+	for _, result := range resDataCount {
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -676,20 +690,31 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		}
 	}
 	// Apply both sustained use and custom discounts to RAM and CPU
-	setCostsFromResults(costData, resChs[2].Await(), "cpu", discount, customDiscount)
-	setCostsFromResults(costData, resChs[3].Await(), "ram", discount, customDiscount)
+	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
+	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
 	// Apply only custom discount to GPU and storage
-	setCostsFromResults(costData, resChs[1].Await(), "gpu", 0.0, customDiscount)
-	setCostsFromResults(costData, resChs[4].Await(), "storage", 0.0, customDiscount)
+	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
+	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
 	if queryTotalLocalStorage != "" {
-		setCostsFromResults(costData, resChs[5].Await(), "localstorage", 0.0, customDiscount)
+		resTotalLocalStorage, err := resChs[5].Await()
+		if err != nil {
+			return nil, err
+		}
+		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
 	}
 
 	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	pvUsedCostMap := map[string]float64{}
 	if withBreakdown {
-		for _, result := range resChs[6].Await() {
+		resCPUModePct, _ := resChs[6].Await()
+		resRAMSystemPct, _ := resChs[7].Await()
+		resRAMUserPct, _ := resChs[8].Await()
+		if ctx.HasErrors() {
+			return nil, ctx.Errors()[0]
+		}
+
+		for _, result := range resCPUModePct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -717,7 +742,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			}
 		}
 
-		for _, result := range resChs[7].Await() {
+		for _, result := range resRAMSystemPct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -728,7 +753,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			ramBD := ramBreakdownMap[clusterID]
 			ramBD.System += result.Values[0].Value
 		}
-		for _, result := range resChs[8].Await() {
+		for _, result := range resRAMUserPct {
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 				clusterID = defaultClusterID
@@ -748,7 +773,11 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		}
 
 		if queryUsedLocalStorage != "" {
-			for _, result := range resChs[9].Await() {
+			resUsedLocalStorage, err := resChs[9].Await()
+			if err != nil {
+				return nil, err
+			}
+			for _, result := range resUsedLocalStorage {
 				clusterID, _ := result.GetString("cluster_id")
 				if clusterID == "" {
 					clusterID = defaultClusterID
@@ -804,20 +833,12 @@ type Totals struct {
 	StorageCost [][]string `json:"storageCost"`
 }
 
-func resultToTotals(qr interface{}) ([][]string, error) {
-	// TODO: Provide an actual query instead of resultToTotals
-	qResults, err := prom.NewQueryResults("resultToTotals", qr)
-	if err != nil {
-		return nil, err
-	}
-
-	results := qResults.Results
-
-	if len(results) == 0 {
+func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
+	if len(qrs) == 0 {
 		return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
 	}
 
-	result := results[0]
+	result := qrs[0]
 	totals := [][]string{}
 	for _, value := range result.Values {
 		d0 := fmt.Sprintf("%f", value.Timestamp)
@@ -866,22 +887,28 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
-	resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
+	ctx := prom.NewContext(cli)
+	resChClusterCores := ctx.QueryRange(qCores, start, end, window)
+	resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
+	resChStorage := ctx.QueryRange(qStorage, start, end, window)
+	resChTotal := ctx.QueryRange(qTotal, start, end, window)
+
+	resultClusterCores, err := resChClusterCores.Await()
 	if err != nil {
 		return nil, err
 	}
 
-	resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
+	resultClusterRAM, err := resChClusterRAM.Await()
 	if err != nil {
 		return nil, err
 	}
 
-	resultStorage, err := QueryRange(cli, qStorage, start, end, window)
+	resultStorage, err := resChStorage.Await()
 	if err != nil {
 		return nil, err
 	}
 
-	resultTotal, err := QueryRange(cli, qTotal, start, end, window)
+	resultTotal, err := resChTotal.Await()
 	if err != nil {
 		return nil, err
 	}
@@ -910,7 +937,7 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 		// If that fails, return an error because something is actually wrong.
 		qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
 
-		resultNodes, err := QueryRange(cli, qNodes, start, end, window)
+		resultNodes, err := ctx.QueryRangeSync(qNodes, start, end, window)
 		if err != nil {
 			return nil, err
 		}

Plik diff jest za duży
+ 238 - 677
pkg/costmodel/costmodel.go


+ 441 - 68
pkg/costmodel/metrics.go

@@ -1,54 +1,70 @@
 package costmodel
 
 import (
+	"math"
 	"regexp"
 	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
 
+	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+
+	"k8s.io/klog"
 )
 
 var (
 	invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
 )
 
-func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string) {
-	labelKeys := make([]string, 0, len(labels))
-	for k := range labels {
-		labelKeys = append(labelKeys, k)
-	}
-	sort.Strings(labelKeys)
-
-	labelValues := make([]string, 0, len(labels))
-	for i, k := range labelKeys {
-		labelKeys[i] = "label_" + SanitizeLabelName(k)
-		labelValues = append(labelValues, labels[k])
-	}
-	return labelKeys, labelValues
-}
-
-func SanitizeLabelName(s string) string {
-	return invalidLabelCharRE.ReplaceAllString(s, "_")
-}
+//--------------------------------------------------------------------------
+//  StatefulsetCollector
+//--------------------------------------------------------------------------
 
+// StatefulsetCollector is a prometheus collector that generates StatefulsetMetrics
 type StatefulsetCollector struct {
 	KubeClientSet kubernetes.Interface
 }
 
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
 func (sc StatefulsetCollector) Describe(ch chan<- *prometheus.Desc) {
 	ch <- prometheus.NewDesc("statefulSet_match_labels", "statfulSet match labels", []string{}, nil)
 }
 
-type DeploymentCollector struct {
-	KubeClientSet kubernetes.Interface
+// Collect is called by the Prometheus registry when collecting metrics.
+func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, _ := sc.KubeClientSet.AppsV1().StatefulSets("").List(metav1.ListOptions{})
+	for _, statefulset := range ds.Items {
+		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
+		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
+		ch <- m
+	}
 }
 
-func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
-	ch <- prometheus.NewInvalidDesc(nil)
+//--------------------------------------------------------------------------
+//  StatefulsetMetric
+//--------------------------------------------------------------------------
+
+// StatefulsetMetric is a prometheus.Metric used to encode statefulset match labels
+type StatefulsetMetric struct {
+	fqName          string
+	help            string
+	labelNames      []string
+	labelValues     []string
+	statefulsetName string
+	namespace       string
 }
 
+// Creates a new StatefulsetMetric, implementation of prometheus.Metric
 func newStatefulsetMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) StatefulsetMetric {
 	return StatefulsetMetric{
 		fqName:          fqname,
@@ -60,20 +76,15 @@ func newStatefulsetMetric(name, namespace, fqname string, labelNames []string, l
 	}
 }
 
-type StatefulsetMetric struct {
-	fqName          string
-	help            string
-	labelNames      []string
-	labelValues     []string
-	statefulsetName string
-	namespace       string
-}
-
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
 func (s StatefulsetMetric) Desc() *prometheus.Desc {
 	l := prometheus.Labels{"statefulSet": s.statefulsetName, "namespace": s.namespace}
 	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
 }
 
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
 func (s StatefulsetMetric) Write(m *dto.Metric) error {
 	h := float64(1)
 	m.Gauge = &dto.Gauge{
@@ -100,15 +111,46 @@ func (s StatefulsetMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
-func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
-	ds, _ := sc.KubeClientSet.AppsV1().StatefulSets("").List(metav1.ListOptions{})
-	for _, statefulset := range ds.Items {
-		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
-		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
+//--------------------------------------------------------------------------
+//  DeploymentCollector
+//--------------------------------------------------------------------------
+
+// DeploymentCollector is a prometheus collector that generates DeploymentMetrics
+type DeploymentCollector struct {
+	KubeClientSet kubernetes.Interface
+}
+
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
+func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("deployment_match_labels", "deployment match labels", []string{}, nil)
+}
+
+// Collect is called by the Prometheus registry when collecting metrics. It
+// lists Deployments using an empty namespace argument and sends one
+// DeploymentMetric per Deployment, built from its selector match labels.
+// Nothing is sent when the list call fails.
+func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, err := sc.KubeClientSet.AppsV1().Deployments("").List(metav1.ListOptions{})
+	if err != nil {
+		klog.V(1).Infof("Failed to list deployments: %s", err.Error())
+		return
+	}
+	if ds == nil {
+		// Guard against a client that returns no list and no error.
+		return
+	}
+	for _, deployment := range ds.Items {
+		// Selector is dereferenced below; treat a missing selector as having
+		// no match labels instead of panicking.
+		var matchLabels map[string]string
+		if deployment.Spec.Selector != nil {
+			matchLabels = deployment.Spec.Selector.MatchLabels
+		}
+		labels, values := kubeLabelsToPrometheusLabels(matchLabels)
+		m := newDeploymentMetric(deployment.GetName(), deployment.GetNamespace(), "deployment_match_labels", labels, values)
 		ch <- m
 	}
 }
 
+//--------------------------------------------------------------------------
+//  DeploymentMetric
+//--------------------------------------------------------------------------
+
+// DeploymentMetric is a prometheus.Metric used to encode deployment match labels
+type DeploymentMetric struct {
+	fqName         string
+	help           string
+	labelNames     []string
+	labelValues    []string
+	deploymentName string
+	namespace      string
+}
+
+// Creates a new DeploymentMetric, implementation of prometheus.Metric
 func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
 	return DeploymentMetric{
 		fqName:         fqname,
@@ -120,20 +162,15 @@ func newDeploymentMetric(name, namespace, fqname string, labelNames []string, la
 	}
 }
 
-type DeploymentMetric struct {
-	fqName         string
-	help           string
-	labelNames     []string
-	labelValues    []string
-	deploymentName string
-	namespace      string
-}
-
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
 func (s DeploymentMetric) Desc() *prometheus.Desc {
 	l := prometheus.Labels{"deployment": s.deploymentName, "namespace": s.namespace}
 	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
 }
 
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
 func (s DeploymentMetric) Write(m *dto.Metric) error {
 	h := float64(1)
 	m.Gauge = &dto.Gauge{
@@ -160,34 +197,36 @@ func (s DeploymentMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
-func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
-	ds, _ := sc.KubeClientSet.AppsV1().Deployments("").List(metav1.ListOptions{})
-	for _, deployment := range ds.Items {
-		labels, values := kubeLabelsToPrometheusLabels(deployment.Spec.Selector.MatchLabels)
-		m := newDeploymentMetric(deployment.GetName(), deployment.GetNamespace(), "deployment_match_labels", labels, values)
-		ch <- m
-	}
-}
+//--------------------------------------------------------------------------
+//  ServiceCollector
+//--------------------------------------------------------------------------
 
+// ServiceCollector is a prometheus collector that generates ServiceMetrics
 type ServiceCollector struct {
 	KubeClientSet kubernetes.Interface
 }
 
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
 func (sc ServiceCollector) Describe(ch chan<- *prometheus.Desc) {
-	return
+	ch <- prometheus.NewDesc("service_selector_labels", "service selector labels", []string{}, nil)
 }
 
-func newServiceMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) ServiceMetric {
-	return ServiceMetric{
-		fqName:      fqname,
-		labelNames:  labelNames,
-		labelValues: labelvalues,
-		help:        "service_selector_labels Service Selector Labels",
-		serviceName: name,
-		namespace:   namespace,
+// Collect is called by the Prometheus registry when collecting metrics. It
+// lists Services using an empty namespace argument and sends one
+// ServiceMetric per Service, built from its selector labels. Nothing is sent
+// when the list call fails.
+func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
+	svcs, err := sc.KubeClientSet.CoreV1().Services("").List(metav1.ListOptions{})
+	if err != nil {
+		klog.V(1).Infof("Failed to list services: %s", err.Error())
+		return
+	}
+	if svcs == nil {
+		// Guard against a client that returns no list and no error.
+		return
+	}
+	for _, svc := range svcs.Items {
+		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
+		m := newServiceMetric(svc.GetName(), svc.GetNamespace(), "service_selector_labels", labels, values)
+		ch <- m
+	}
 }
 
+//--------------------------------------------------------------------------
+//  ServiceMetric
+//--------------------------------------------------------------------------
+
+// ServiceMetric is a prometheus.Metric used to encode service selector labels
 type ServiceMetric struct {
 	fqName      string
 	help        string
@@ -197,11 +236,27 @@ type ServiceMetric struct {
 	namespace   string
 }
 
+// Creates a new ServiceMetric, implementation of prometheus.Metric
+func newServiceMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) ServiceMetric {
+	return ServiceMetric{
+		fqName:      fqname,
+		labelNames:  labelNames,
+		labelValues: labelvalues,
+		help:        "service_selector_labels Service Selector Labels",
+		serviceName: name,
+		namespace:   namespace,
+	}
+}
+
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
 func (s ServiceMetric) Desc() *prometheus.Desc {
 	l := prometheus.Labels{"service": s.serviceName, "namespace": s.namespace}
 	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
 }
 
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
 func (s ServiceMetric) Write(m *dto.Metric) error {
 	h := float64(1)
 	m.Gauge = &dto.Gauge{
@@ -228,11 +283,329 @@ func (s ServiceMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
-func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
-	svcs, _ := sc.KubeClientSet.CoreV1().Services("").List(metav1.ListOptions{})
-	for _, svc := range svcs.Items {
-		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
-		m := newServiceMetric(svc.GetName(), svc.GetNamespace(), "service_selector_labels", labels, values)
-		ch <- m
+//--------------------------------------------------------------------------
+//  Package Functions
+//--------------------------------------------------------------------------
+
+var (
+	recordingLock     sync.Mutex
+	recordingStopping bool
+	recordingStop     chan bool
+)
+
+// checkOrCreateRecordingChan reports whether it created the metric recording
+// stop channel. When recordingStop is already set, nothing is created and
+// false is returned; otherwise a new buffered channel is stored in
+// recordingStop and true is returned. recordingLock is held for the whole call.
+func checkOrCreateRecordingChan() bool {
+	recordingLock.Lock()
+	defer recordingLock.Unlock()
+
+	created := recordingStop == nil
+	if created {
+		recordingStop = make(chan bool, 1)
 	}
+
+	return created
+}
+
+// IsCostModelMetricRecordingRunning returns true while the recording stop
+// channel exists, i.e. between a successful start and the recording loop
+// clearing the channel on exit.
+func IsCostModelMetricRecordingRunning() bool {
+	recordingLock.Lock()
+	running := recordingStop != nil
+	recordingLock.Unlock()
+
+	return running
+}
+
+// StartCostModelMetricRecording starts the go routine that emits metrics used to determine
+// cluster costs. It returns false, without starting anything, when a recording
+// stop channel already exists; otherwise it launches the loop and returns true.
+//
+// Each pass of the loop records cluster management, network, node, container
+// allocation and persistent volume prices, removes series whose label sets were
+// not refreshed during the pass, and then waits one minute or until the stop
+// channel is signalled.
+func StartCostModelMetricRecording(a *Accesses) bool {
+	// Check to see if we're already recording
+	// This function will create the stop recording channel and return true
+	// if it doesn't exist.
+	if !checkOrCreateRecordingChan() {
+		log.Errorf("Attempted to start cost model metric recording when it's already running.")
+		return false
+	}
+
+	go func() {
+		// NOTE(review): if the loop panics, recordingStop is never reset to nil,
+		// so a later start attempt would be rejected as "already running".
+		defer errors.HandlePanic()
+
+		// Label sets emitted so far, keyed by their joined label values. An entry
+		// is set to true when emitted and reset to false at the end of a pass;
+		// an entry still false at the end of a pass has its series deleted.
+		containerSeen := make(map[string]bool)
+		nodeSeen := make(map[string]bool)
+		pvSeen := make(map[string]bool)
+		pvcSeen := make(map[string]bool)
+
+		// NOTE(review): keys are joined and split on ",", so a label value that
+		// itself contains a comma will not round-trip to the same label list.
+		getKeyFromLabelStrings := func(labels ...string) string {
+			return strings.Join(labels, ",")
+		}
+		getLabelStringsFromKey := func(key string) []string {
+			return strings.Split(key, ",")
+		}
+
+		// The region label of the first cached node is the fallback region for
+		// persistent volumes that carry no region label of their own.
+		var defaultRegion string = ""
+		nodeList := a.Model.Cache.GetAllNodes()
+		if len(nodeList) > 0 {
+			defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
+		}
+
+		for {
+			klog.V(4).Info("Recording prices...")
+			podlist := a.Model.Cache.GetAllPods()
+			podStatus := make(map[string]v1.PodPhase)
+			for _, pod := range podlist {
+				podStatus[pod.Name] = pod.Status.Phase
+			}
+
+			cfg, _ := a.Cloud.GetConfig()
+
+			provisioner, clusterManagementCost, err := a.Cloud.ClusterManagementPricing()
+			if err != nil {
+				klog.V(1).Infof("Error getting cluster management cost %s", err.Error())
+			}
+			a.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
+
+			// Record network pricing at global scope
+			networkCosts, err := a.Cloud.NetworkPricing()
+			if err != nil {
+				klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
+			} else {
+				a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
+				a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
+				a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
+			}
+
+			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
+			if err != nil {
+				klog.V(1).Info("Error in price recording: " + err.Error())
+				// zero the for loop so the time.Sleep will still work
+				data = map[string]*CostData{}
+			}
+
+			nodes, err := a.Model.GetNodeCost(a.Cloud)
+			if err != nil {
+				// Surface the failure; the loop below simply emits whatever
+				// nodes were returned.
+				klog.V(1).Infof("Error getting node cost: %s", err.Error())
+			}
+			for nodeName, node := range nodes {
+				// Emit costs, guarding against NaN inputs for custom pricing.
+				cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
+				if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
+					cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
+					if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
+						cpuCost = 0
+					}
+				}
+				cpu, _ := strconv.ParseFloat(node.VCPU, 64)
+				if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
+					cpu = 1 // Assume 1 CPU
+				}
+				ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
+				if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
+					ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
+					if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
+						ramCost = 0
+					}
+				}
+				ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
+				if math.IsNaN(ram) || math.IsInf(ram, 0) {
+					ram = 0
+				}
+				gpu, _ := strconv.ParseFloat(node.GPU, 64)
+				if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
+					gpu = 0
+				}
+				gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
+				if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
+					gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
+					if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
+						gpuCost = 0
+					}
+				}
+				nodeType := node.InstanceType
+				nodeRegion := node.Region
+
+				// RAM bytes are divided by 1024 three times before being
+				// multiplied by the RAM cost.
+				totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
+
+				a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
+				a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
+				a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
+				a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
+				labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
+				nodeSeen[labelKey] = true
+			}
+
+			for _, costs := range data {
+				nodeName := costs.NodeName
+
+				namespace := costs.Namespace
+				podName := costs.PodName
+				containerName := costs.Name
+
+				if costs.PVCData != nil {
+					for _, pvc := range costs.PVCData {
+						// Claims without samples are skipped so that indexing
+						// Values[0] cannot panic.
+						if pvc.Volume != nil && len(pvc.Values) > 0 {
+							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
+							labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
+							pvcSeen[labelKey] = true
+						}
+					}
+				}
+
+				if len(costs.RAMAllocation) > 0 {
+					a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
+				}
+				if len(costs.CPUAllocation) > 0 {
+					a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
+				}
+				if len(costs.GPUReq) > 0 {
+					// allocation here is set to the request because shared GPU usage not yet supported.
+					a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
+				}
+				labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
+				if podStatus[podName] == v1.PodRunning { // Only report data for current pods
+					containerSeen[labelKey] = true
+				} else {
+					containerSeen[labelKey] = false
+				}
+
+				// NOTE(review): the storage class map and the persistent volume
+				// pricing below do not depend on costs, yet they run once per
+				// container entry and not at all when data is empty. Confirm
+				// whether this block was meant to sit outside this loop.
+				storageClasses := a.Model.Cache.GetAllStorageClasses()
+				storageClassMap := make(map[string]map[string]string)
+				for _, storageClass := range storageClasses {
+					params := storageClass.Parameters
+					storageClassMap[storageClass.ObjectMeta.Name] = params
+					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
+						storageClassMap["default"] = params
+						storageClassMap[""] = params
+					}
+				}
+
+				pvs := a.Model.Cache.GetAllPersistentVolumes()
+				for _, pv := range pvs {
+					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
+					if !ok {
+						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
+					}
+					var region string
+					if r, ok := pv.Labels[v1.LabelZoneRegion]; ok {
+						region = r
+					} else {
+						region = defaultRegion
+					}
+					cacPv := &costAnalyzerCloud.PV{
+						Class:      pv.Spec.StorageClassName,
+						Region:     region,
+						Parameters: parameters,
+					}
+					GetPVCost(cacPv, pv, a.Cloud, region)
+					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
+					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
+					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
+					pvSeen[labelKey] = true
+				}
+			}
+
+			// Sweep: delete series for label sets not refreshed during this pass
+			// and reset the refreshed ones to false for the next pass.
+			for labelString, seen := range nodeSeen {
+				if !seen {
+					klog.Infof("Removing %s from nodes", labelString)
+					labels := getLabelStringsFromKey(labelString)
+					ok := a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from totalprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
+					}
+					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from cpuprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
+					}
+					ok = a.GPUPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from gpuprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
+					}
+					ok = a.RAMPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from ramprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
+					}
+					delete(nodeSeen, labelString)
+				} else {
+					nodeSeen[labelString] = false
+				}
+			}
+			for labelString, seen := range containerSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.RAMAllocationRecorder.DeleteLabelValues(labels...)
+					a.CPUAllocationRecorder.DeleteLabelValues(labels...)
+					a.GPUAllocationRecorder.DeleteLabelValues(labels...)
+					delete(containerSeen, labelString)
+				} else {
+					containerSeen[labelString] = false
+				}
+			}
+			for labelString, seen := range pvSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
+					delete(pvSeen, labelString)
+				} else {
+					pvSeen[labelString] = false
+				}
+			}
+			for labelString, seen := range pvcSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.PVAllocationRecorder.DeleteLabelValues(labels...)
+					delete(pvcSeen, labelString)
+				} else {
+					pvcSeen[labelString] = false
+				}
+			}
+
+			// Wait a minute before the next pass, or exit when the stop channel
+			// is signalled, clearing the package state so recording can be
+			// started again.
+			select {
+			case <-time.After(time.Minute):
+			case <-recordingStop:
+				recordingLock.Lock()
+				recordingStopping = false
+				recordingStop = nil
+				recordingLock.Unlock()
+				return
+			}
+		}
+	}()
+
+	return true
+}
+
+// StopCostModelMetricRecording signals the metrics emission loop to exit by
+// closing the recording stop channel. It does nothing when no stop channel
+// exists or when a stop has already been requested.
+func StopCostModelMetricRecording() {
+	recordingLock.Lock()
+	defer recordingLock.Unlock()
+
+	// No recording loop to stop, or a stop is already pending.
+	if recordingStopping || recordingStop == nil {
+		return
+	}
+
+	recordingStopping = true
+	close(recordingStop)
+}
+
+// kubeLabelsToPrometheusLabels converts kubernetes labels into two parallel
+// slices: the label names, each sanitized and prefixed with "label_", and the
+// matching label values. Entries are ordered by the original label key.
+func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string) {
+	sortedKeys := make([]string, 0, len(labels))
+	for key := range labels {
+		sortedKeys = append(sortedKeys, key)
+	}
+	sort.Strings(sortedKeys)
+
+	names := make([]string, len(sortedKeys))
+	values := make([]string, len(sortedKeys))
+	for idx, key := range sortedKeys {
+		names[idx] = "label_" + SanitizeLabelName(key)
+		values[idx] = labels[key]
+	}
+	return names, values
+}
+
+// SanitizeLabelName replaces all illegal prometheus label characters in s,
+// i.e. every match of invalidLabelCharRE, with _ and returns the result.
+func SanitizeLabelName(s string) string {
+	return invalidLabelCharRE.ReplaceAllString(s, "_")
 }

+ 3 - 9
pkg/costmodel/networkcosts.go

@@ -27,7 +27,7 @@ type NetworkUsageVector struct {
 
 // GetNetworkUsageData performs a join of the results of zone, region, and internet usage queries to return a single
 // map containing network costs for each namespace+pod
-func GetNetworkUsageData(zr interface{}, rr interface{}, ir interface{}, defaultClusterID string) (map[string]*NetworkUsageData, error) {
+func GetNetworkUsageData(zr []*prom.QueryResult, rr []*prom.QueryResult, ir []*prom.QueryResult, defaultClusterID string) (map[string]*NetworkUsageData, error) {
 	zoneNetworkMap, err := getNetworkUsage(zr, defaultClusterID)
 	if err != nil {
 		return nil, err
@@ -137,16 +137,10 @@ func GetNetworkCost(usage *NetworkUsageData, cloud costAnalyzerCloud.Provider) (
 	return results, nil
 }
 
-func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
+func getNetworkUsage(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
 	ncdmap := make(map[string]*NetworkUsageVector)
 
-	// TODO: Pass actual query instead of NetworkUsage
-	result, err := prom.NewQueryResults("NetworkUsage", qr)
-	if err != nil {
-		return nil, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		podName, err := val.GetString("pod_name")
 		if err != nil {
 			return nil, err

+ 123 - 78
pkg/costmodel/promparsers.go

@@ -1,23 +1,21 @@
 package costmodel
 
 import (
+	"errors"
 	"fmt"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/util"
 )
 
-func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
-	toReturn := make(map[string]*PersistentVolumeClaimData)
+// TODO niko/prom move parsing functions from costmodel.go
 
-	// TODO: Pass actual query instead of PVInfo
-	result, err := prom.NewQueryResults("PVInfo", qr)
-	if err != nil {
-		return toReturn, err
-	}
+func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
+	toReturn := make(map[string]*PersistentVolumeClaimData)
 
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -60,16 +58,10 @@ func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentV
 	return toReturn, nil
 }
 
-func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
+func GetPVAllocationMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string][]*PersistentVolumeClaimData)
 
-	// TODO: Pass actual query instead of PVAllocationMetrics
-	result, err := prom.NewQueryResults("PVAllocationMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -112,16 +104,10 @@ func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (m
 	return toReturn, nil
 }
 
-func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
+func GetPVCostMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
 	toReturn := make(map[string]*costAnalyzerCloud.PV)
 
-	// TODO: Pass actual query instead of PVCostMetrics
-	result, err := prom.NewQueryResults("PVCostMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -141,16 +127,10 @@ func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[str
 	return toReturn, nil
 }
 
-func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetNamespaceLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of NamespaceLabelsMetrics
-	result, err := prom.NewQueryResults("NamespaceLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Namespace and ClusterID for key generation purposes
 		ns, err := val.GetString("namespace")
 		if err != nil {
@@ -174,16 +154,10 @@ func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string)
 	return toReturn, nil
 }
 
-func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetPodLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of PodLabelsMetrics
-	result, err := prom.NewQueryResults("PodLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Pod, Namespace and ClusterID for key generation purposes
 		pod, err := val.GetString("pod")
 		if err != nil {
@@ -214,16 +188,10 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 	return toReturn, nil
 }
 
-func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetStatefulsetMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of StatefulsetMatchLabelsMetrics
-	result, err := prom.NewQueryResults("StatefulsetMatchLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Statefulset, Namespace and ClusterID for key generation purposes
 		ss, err := val.GetString("statefulSet")
 		if err != nil {
@@ -247,15 +215,10 @@ func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID
 	return toReturn, nil
 }
 
-func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
+func GetPodDaemonsetsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
 
-	// TODO: Pass actual query instead of PodDaemonsetsWithMetrics
-	result, err := prom.NewQueryResults("PodDaemonsetsWithMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -283,15 +246,10 @@ func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID strin
 	return toReturn, nil
 }
 
-func GetPodJobsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
+func GetPodJobsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
 
-	// TODO: Pass actual query instead of PodJobsWithMetrics
-	result, err := prom.NewQueryResults("PodJobsWithMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -319,16 +277,10 @@ func GetPodJobsWithMetrics(queryResult interface{}, defaultClusterID string) (ma
 	return toReturn, nil
 }
 
-func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetDeploymentMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of DeploymentMatchLabelsMetrics
-	result, err := prom.NewQueryResults("DeploymentMatchLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Deployment, Namespace and ClusterID for key generation purposes
 		deployment, err := val.GetString("deployment")
 		if err != nil {
@@ -352,16 +304,10 @@ func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID s
 	return toReturn, nil
 }
 
-func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+func GetServiceSelectorLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 
-	// TODO: Pass actual query instead of ServiceSelectorLabelsMetrics
-	result, err := prom.NewQueryResults("ServiceSelectorLabelsMetrics", queryResult)
-	if err != nil {
-		return toReturn, err
-	}
-
-	for _, val := range result.Results {
+	for _, val := range qrs {
 		// We want Service, Namespace and ClusterID for key generation purposes
 		service, err := val.GetString("service")
 		if err != nil {
@@ -384,3 +330,102 @@ func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID s
 
 	return toReturn, nil
 }
+
+// GetContainerMetricVector maps each query result to its container metric key
+// and returns the result's vectors under that key. When normalize is true and
+// normalizationValue is non-zero, every vector value is divided by
+// normalizationValue in place before being stored. The first result whose
+// metric cannot be parsed aborts the call with a nil map and that error.
+func GetContainerMetricVector(qrs []*prom.QueryResult, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
+	vectorsByKey := make(map[string][]*util.Vector)
+	for _, qr := range qrs {
+		metric, err := NewContainerMetricFromPrometheus(qr.Metric, defaultClusterID)
+		if err != nil {
+			return nil, err
+		}
+
+		if normalize && normalizationValue != 0 {
+			for _, vec := range qr.Values {
+				vec.Value /= normalizationValue
+			}
+		}
+
+		vectorsByKey[metric.Key()] = qr.Values
+	}
+
+	return vectorsByKey, nil
+}
+
+// GetContainerMetricVectors maps each query result to its container metric key
+// and returns the result's vectors, unmodified, under that key. The first
+// result whose metric cannot be parsed aborts the call with a nil map and
+// that error.
+func GetContainerMetricVectors(qrs []*prom.QueryResult, defaultClusterID string) (map[string][]*util.Vector, error) {
+	vectorsByKey := make(map[string][]*util.Vector)
+
+	for _, qr := range qrs {
+		metric, err := NewContainerMetricFromPrometheus(qr.Metric, defaultClusterID)
+		if err != nil {
+			return nil, err
+		}
+
+		key := metric.Key()
+		vectorsByKey[key] = qr.Values
+	}
+
+	return vectorsByKey, nil
+}
+
+// GetNormalizedContainerMetricVectors maps each query result to its container
+// metric key and stores, under that key, the result's vectors after passing
+// them with normalizationValues through util.NormalizeVectorByVector. The
+// first result whose metric cannot be parsed aborts the call with a nil map
+// and that error.
+func GetNormalizedContainerMetricVectors(qrs []*prom.QueryResult, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
+	vectorsByKey := make(map[string][]*util.Vector)
+
+	for _, qr := range qrs {
+		metric, err := NewContainerMetricFromPrometheus(qr.Metric, defaultClusterID)
+		if err != nil {
+			return nil, err
+		}
+
+		normalized := util.NormalizeVectorByVector(qr.Values, normalizationValues)
+		vectorsByKey[metric.Key()] = normalized
+	}
+
+	return vectorsByKey, nil
+}
+
+// getCost indexes the vectors of each query result by the result's "instance"
+// string. When a result has no readable "instance" value, the entries gathered
+// so far are returned together with the error.
+func getCost(qrs []*prom.QueryResult) (map[string][]*util.Vector, error) {
+	costsByInstance := make(map[string][]*util.Vector)
+
+	for _, qr := range qrs {
+		name, err := qr.GetString("instance")
+		if err != nil {
+			return costsByInstance, err
+		}
+		costsByInstance[name] = qr.Values
+	}
+
+	return costsByInstance, nil
+}
+
+// getNormalization returns the value of the first vector of the first query
+// result. It returns 0.0 and prom.NoDataErr when there are no results or the
+// first result has no vectors.
+//
+// TODO niko/prom retain message:
+// normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
+func getNormalization(qrs []*prom.QueryResult) (float64, error) {
+	if len(qrs) == 0 || len(qrs[0].Values) == 0 {
+		return 0.0, prom.NoDataErr
+	}
+
+	first := qrs[0].Values[0]
+	return first.Value, nil
+}
+
+// getNormalizations returns all vectors of the first query result. It returns
+// nil and prom.NoDataErr when there are no results.
+//
+// TODO niko/prom retain message:
+// normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
+func getNormalizations(qrs []*prom.QueryResult) ([]*util.Vector, error) {
+	if len(qrs) > 0 {
+		return qrs[0].Values, nil
+	}
+
+	return nil, prom.NoDataErr
+}
+
+// parsePodLabels indexes the labels of each query result by the result's "pod"
+// string. When several results share a pod, the labels of the last one win.
+// If a result has no readable "pod" value, the entries gathered so far are
+// returned together with a "missing pod field" error.
+func parsePodLabels(qrs []*prom.QueryResult) (map[string]map[string]string, error) {
+	podLabels := map[string]map[string]string{}
+
+	for _, result := range qrs {
+		pod, err := result.GetString("pod")
+		if err != nil {
+			return podLabels, errors.New("missing pod field")
+		}
+
+		// A plain assignment covers both the new-pod and existing-pod cases;
+		// no placeholder map is needed first.
+		podLabels[pod] = result.GetLabels()
+	}
+
+	return podLabels, nil
+}

+ 8 - 232
pkg/costmodel/router.go

@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"math"
 	"net"
 	"net/http"
 	"reflect"
@@ -73,6 +72,7 @@ type Accesses struct {
 	CPUAllocationRecorder         *prometheus.GaugeVec
 	GPUAllocationRecorder         *prometheus.GaugeVec
 	PVAllocationRecorder          *prometheus.GaugeVec
+	ClusterManagementCostRecorder *prometheus.GaugeVec
 	NetworkZoneEgressRecorder     prometheus.Gauge
 	NetworkRegionEgressRecorder   prometheus.Gauge
 	NetworkInternetEgressRecorder prometheus.Gauge
@@ -657,236 +657,6 @@ func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request,
 	w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
 }
 
-func (a *Accesses) recordPrices() {
-	go func() {
-		defer errors.HandlePanic()
-
-		containerSeen := make(map[string]bool)
-		nodeSeen := make(map[string]bool)
-		pvSeen := make(map[string]bool)
-		pvcSeen := make(map[string]bool)
-
-		getKeyFromLabelStrings := func(labels ...string) string {
-			return strings.Join(labels, ",")
-		}
-		getLabelStringsFromKey := func(key string) []string {
-			return strings.Split(key, ",")
-		}
-
-		var defaultRegion string = ""
-		nodeList := a.Model.Cache.GetAllNodes()
-		if len(nodeList) > 0 {
-			defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
-		}
-
-		for {
-			klog.V(4).Info("Recording prices...")
-			podlist := a.Model.Cache.GetAllPods()
-			podStatus := make(map[string]v1.PodPhase)
-			for _, pod := range podlist {
-				podStatus[pod.Name] = pod.Status.Phase
-			}
-
-			cfg, _ := a.Cloud.GetConfig()
-
-			// Record network pricing at global scope
-			networkCosts, err := a.Cloud.NetworkPricing()
-			if err != nil {
-				klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
-			} else {
-				a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
-				a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
-				a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
-			}
-
-			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
-			if err != nil {
-				klog.V(1).Info("Error in price recording: " + err.Error())
-				// zero the for loop so the time.Sleep will still work
-				data = map[string]*CostData{}
-			}
-
-			nodes, err := a.Model.GetNodeCost(a.Cloud)
-			for nodeName, node := range nodes {
-				// Emit costs, guarding against NaN inputs for custom pricing.
-				cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
-				if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
-					cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
-					if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
-						cpuCost = 0
-					}
-				}
-				cpu, _ := strconv.ParseFloat(node.VCPU, 64)
-				if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
-					cpu = 1 // Assume 1 CPU
-				}
-				ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
-				if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
-					ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
-					if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
-						ramCost = 0
-					}
-				}
-				ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
-				if math.IsNaN(ram) || math.IsInf(ram, 0) {
-					ram = 0
-				}
-				gpu, _ := strconv.ParseFloat(node.GPU, 64)
-				if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
-					gpu = 0
-				}
-				gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
-				if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
-					gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
-					if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
-						gpuCost = 0
-					}
-				}
-				nodeType := node.InstanceType
-				nodeRegion := node.Region
-
-				totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
-
-				a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
-				a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
-				a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
-				a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
-				labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
-				nodeSeen[labelKey] = true
-			}
-
-			for _, costs := range data {
-				nodeName := costs.NodeName
-
-				namespace := costs.Namespace
-				podName := costs.PodName
-				containerName := costs.Name
-
-				if costs.PVCData != nil {
-					for _, pvc := range costs.PVCData {
-						if pvc.Volume != nil {
-							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
-							labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
-							pvcSeen[labelKey] = true
-						}
-					}
-				}
-
-				if len(costs.RAMAllocation) > 0 {
-					a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
-				}
-				if len(costs.CPUAllocation) > 0 {
-					a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
-				}
-				if len(costs.GPUReq) > 0 {
-					// allocation here is set to the request because shared GPU usage not yet supported.
-					a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
-				}
-				labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
-				if podStatus[podName] == v1.PodRunning { // Only report data for current pods
-					containerSeen[labelKey] = true
-				} else {
-					containerSeen[labelKey] = false
-				}
-
-				storageClasses := a.Model.Cache.GetAllStorageClasses()
-				storageClassMap := make(map[string]map[string]string)
-				for _, storageClass := range storageClasses {
-					params := storageClass.Parameters
-					storageClassMap[storageClass.ObjectMeta.Name] = params
-					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
-						storageClassMap["default"] = params
-						storageClassMap[""] = params
-					}
-				}
-
-				pvs := a.Model.Cache.GetAllPersistentVolumes()
-				for _, pv := range pvs {
-					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
-					if !ok {
-						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
-					}
-					var region string
-					if r, ok := pv.Labels[v1.LabelZoneRegion]; ok {
-						region = r
-					} else {
-						region = defaultRegion
-					}
-					cacPv := &costAnalyzerCloud.PV{
-						Class:      pv.Spec.StorageClassName,
-						Region:     region,
-						Parameters: parameters,
-					}
-					GetPVCost(cacPv, pv, a.Cloud, region)
-					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
-					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
-					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
-					pvSeen[labelKey] = true
-				}
-			}
-			for labelString, seen := range nodeSeen {
-				if !seen {
-					klog.Infof("Removing %s from nodes", labelString)
-					labels := getLabelStringsFromKey(labelString)
-					ok := a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from totalprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
-					}
-					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from cpuprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
-					}
-					ok = a.GPUPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from gpuprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
-					}
-					ok = a.RAMPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from ramprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
-					}
-					delete(nodeSeen, labelString)
-				}
-				nodeSeen[labelString] = false
-			}
-			for labelString, seen := range containerSeen {
-				if !seen {
-					labels := getLabelStringsFromKey(labelString)
-					a.RAMAllocationRecorder.DeleteLabelValues(labels...)
-					a.CPUAllocationRecorder.DeleteLabelValues(labels...)
-					a.GPUAllocationRecorder.DeleteLabelValues(labels...)
-					delete(containerSeen, labelString)
-				}
-				containerSeen[labelString] = false
-			}
-			for labelString, seen := range pvSeen {
-				if !seen {
-					labels := getLabelStringsFromKey(labelString)
-					a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
-					delete(pvSeen, labelString)
-				}
-				pvSeen[labelString] = false
-			}
-			for labelString, seen := range pvcSeen {
-				if !seen {
-					labels := getLabelStringsFromKey(labelString)
-					a.PVAllocationRecorder.DeleteLabelValues(labels...)
-					delete(pvcSeen, labelString)
-				}
-				pvcSeen[labelString] = false
-			}
-			time.Sleep(time.Minute)
-		}
-	}()
-}
-
 // Creates a new ClusterManager instance using a boltdb storage. If that fails,
 // then we fall back to a memory-only storage.
 func newClusterManager() *cm.ClusterManager {
@@ -1129,6 +899,10 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Name: "kubecost_network_internet_egress_cost",
 		Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
 	})
+	ClusterManagementCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "kubecost_cluster_management_cost",
+		Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
+	}, []string{"provisioner_name"})
 
 	prometheus.MustRegister(cpuGv)
 	prometheus.MustRegister(ramGv)
@@ -1140,6 +914,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	prometheus.MustRegister(PVAllocation)
 	prometheus.MustRegister(GPUAllocation)
 	prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
+	prometheus.MustRegister(ClusterManagementCostRecorder)
 	prometheus.MustRegister(ServiceCollector{
 		KubeClientSet: kubeClientset,
 	})
@@ -1170,6 +945,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
+		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
 		Model:                         NewCostModel(k8sCache),
 		OutOfClusterCache:             outOfClusterCache,
 	}
@@ -1228,7 +1004,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())
 	}
 
-	A.recordPrices()
+	StartCostModelMetricRecording(&A)
 
 	managerEndpoints := cm.NewClusterManagerEndpoints(A.ClusterManager)
 

+ 115 - 12
pkg/prom/query.go

@@ -5,6 +5,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"net/url"
+	"strconv"
+	"time"
 
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -13,8 +16,9 @@ import (
 )
 
 const (
-	apiPrefix = "/api/v1"
-	epQuery   = apiPrefix + "/query"
+	apiPrefix    = "/api/v1"
+	epQuery      = apiPrefix + "/query"
+	epQueryRange = apiPrefix + "/query_range"
 )
 
 // Context wraps a Prometheus client and provides methods for querying and
@@ -39,7 +43,33 @@ func (ctx *Context) Errors() []error {
 	return ctx.ErrorCollector.Errors()
 }
 
-// TODO SetMaxConcurrency
+// HasErrors reports whether the Context's ErrorCollector currently indicates
+// an error state.
+func (ctx *Context) HasErrors() bool {
+	hasErr := ctx.ErrorCollector.IsError()
+	return hasErr
+}
+
+// Query runs the given query in its own goroutine and returns the channel on
+// which the resulting QueryResults will be delivered. The outcome of the
+// request, and any parsing error recorded on the results, are reported to the
+// Context's ErrorCollector. The receiver is responsible for closing the
+// channel, preferably by calling Await on it.
+func (ctx *Context) Query(query string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
+
+	go func() {
+		defer errors.HandlePanic()
+
+		raw, promErr := ctx.query(query)
+		ctx.ErrorCollector.Report(promErr)
+
+		results := NewQueryResults(query, raw)
+		if parseErr := results.Error; parseErr != nil {
+			ctx.ErrorCollector.Report(parseErr)
+		}
+
+		resCh <- results
+	}()
+
+	return resCh
+}
 
 // QueryAll returns one QueryResultsChan for each query provided, then runs
 // each query concurrently and returns results on each channel, respectively,
@@ -55,20 +85,70 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
-// Query returns a QueryResultsChan, then runs the given query and sends the
-// results on the provided channel. Receiver is responsible for closing the
-// channel, preferably using the Read method.
-func (ctx *Context) Query(query string) QueryResultsChan {
+// QuerySync runs the given query on the calling goroutine and returns the
+// parsed results. It returns the request error if the query fails, or the
+// error recorded on the parsed QueryResults otherwise. Unlike Query, it does
+// not report anything to the ErrorCollector.
+func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
+	raw, err := ctx.query(query)
+	if err != nil {
+		return nil, err
+	}
+
+	parsed := NewQueryResults(query, raw)
+	if parsed.Error == nil {
+		return parsed.Results, nil
+	}
+
+	return nil, parsed.Error
+}
+
+// QueryURL returns the URL the client builds for the Prometheus query
+// endpoint (epQuery), without any query parameters set by this package.
+func (ctx *Context) QueryURL() *url.URL {
+	endpoint := ctx.Client.URL(epQuery, nil)
+	return endpoint
+}
+
+// query sends the given query to the Prometheus query endpoint as a POST
+// request, with the query encoded in the URL, and returns the JSON-decoded
+// response body. Warnings returned by the client are logged at verbosity 3.
+// Request failures and JSON decoding failures are returned as errors that
+// include the query text.
+func (ctx *Context) query(query string) (interface{}, error) {
+	endpoint := ctx.Client.URL(epQuery, nil)
+	params := endpoint.Query()
+	params.Set("query", query)
+	endpoint.RawQuery = params.Encode()
+
+	req, err := http.NewRequest(http.MethodPost, endpoint.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
+	for _, warning := range warnings {
+		klog.V(3).Infof("Warning '%s' fetching query '%s'", warning, query)
+	}
+	if err != nil {
+		// The status code can only be included when a response was received.
+		if resp != nil {
+			return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		}
+
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+	}
+
+	var decoded interface{}
+	if err := json.Unmarshal(body, &decoded); err != nil {
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+	}
+
+	return decoded, nil
+}
+
+func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
 	go func(ctx *Context, resCh QueryResultsChan) {
 		defer errors.HandlePanic()
 
-		raw, promErr := ctx.query(query)
+		raw, promErr := ctx.queryRange(query, start, end, step)
 		ctx.ErrorCollector.Report(promErr)
 
-		results, parseErr := NewQueryResults(query, raw)
-		ctx.ErrorCollector.Report(parseErr)
+		results := NewQueryResults(query, raw)
+		if results.Error != nil {
+			ctx.ErrorCollector.Report(results.Error)
+		}
 
 		resCh <- results
 	}(ctx, resCh)
@@ -76,10 +156,32 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 	return resCh
 }
 
-func (ctx *Context) query(query string) (interface{}, error) {
-	u := ctx.Client.URL(epQuery, nil)
+// QueryRangeSync runs the given range query on the calling goroutine over
+// [start, end] with the given step and returns the parsed results. It returns
+// the request error if the query fails, or the error recorded on the parsed
+// QueryResults otherwise.
+func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) ([]*QueryResult, error) {
+	raw, err := ctx.queryRange(query, start, end, step)
+	if err != nil {
+		return nil, err
+	}
+
+	parsed := NewQueryResults(query, raw)
+	if parsed.Error == nil {
+		return parsed.Results, nil
+	}
+
+	return nil, parsed.Error
+}
+
+// QueryRangeURL returns the URL the client builds for the Prometheus
+// query_range endpoint (epQueryRange), without any query parameters set by
+// this package.
+func (ctx *Context) QueryRangeURL() *url.URL {
+	endpoint := ctx.Client.URL(epQueryRange, nil)
+	return endpoint
+}
+
+func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
+	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()
 	q.Set("query", query)
+	q.Set("start", start.Format(time.RFC3339Nano))
+	q.Set("end", end.Format(time.RFC3339Nano))
+	q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
 	u.RawQuery = q.Encode()
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
@@ -111,5 +213,6 @@ func (ctx *Context) query(query string) (interface{}, error) {
 	if err != nil {
 		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
+
 	return toReturn, nil
 }

+ 63 - 49
pkg/prom/result.go

@@ -11,83 +11,92 @@ import (
 	"github.com/kubecost/cost-model/pkg/util"
 )
 
+// Static warnings and errors shared by the query result parsing code. They
+// are declared once so callers can compare against them by identity.
+var (
+	// Static Warnings for data point parsing
+	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
+	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
+
+	// Static Errors for query result parsing
+	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus response")
+	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
+	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
+	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
+	NoDataErr                  error = errors.New("No data")
+	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
+	QueryResultNilErr          error = NewCommError("nil queryResult")
+	ResultFieldDoesNotExistErr error = errors.New("Result field does not exist in prometheus response")
+	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
+	ResultFormatErr            error = errors.New("Result is improperly formatted")
+	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
+	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
+)
+
 // QueryResultsChan is a channel of query results
 type QueryResultsChan chan *QueryResults
 
 // Await returns query results, blocking until they are made available, and
 // deferring the closure of the underlying channel
-func (qrc QueryResultsChan) Await() []*QueryResult {
+func (qrc QueryResultsChan) Await() ([]*QueryResult, error) {
 	defer close(qrc)
-	results := <-qrc
 
-	// Possible that the returned results are nil
-	if results == nil {
-		return nil
+	results := <-qrc
+	if results.Error != nil {
+		return nil, results.Error
 	}
 
-	return results.Results
-}
-
-// QueryResult contains a single result from a prometheus query. It's common
-// to refer to query results as a slice of QueryResult
-type QueryResult struct {
-	Metric map[string]interface{}
-	Values []*util.Vector
+	return results.Results, nil
 }
 
 // QueryResults contains all of the query results and the source query string.
 type QueryResults struct {
 	Query   string
+	Error   error
 	Results []*QueryResult
 }
 
-var (
-	// Static Warnings for data point parsing
-	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
-	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
-
-	// Static Errors for query result parsing
-	QueryResultNilErr          error = NewCommError("nil queryResult")
-	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
-	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus repsonse")
-	ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
-	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
-	ResultFormatErr            error = errors.New("Result is improperly formatted")
-	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
-	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
-	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
-	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
-	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
-)
+// QueryResult contains a single result from a prometheus query. It's common
+// to refer to query results as a slice of QueryResult
+type QueryResult struct {
+	// Metric is the result's metric map; presumably the "metric" object of the
+	// response entry, keyed by label name (confirm in NewQueryResults).
+	Metric map[string]interface{}
+	// Values holds the result's data points; presumably one entry for an
+	// instant query and one per sample for a range query (confirm in
+	// NewQueryResults).
+	Values []*util.Vector
+}
 
 // NewQueryResults accepts the raw prometheus query result and returns an array of
 // QueryResult objects
-func NewQueryResults(query string, queryResult interface{}) (*QueryResults, error) {
+func NewQueryResults(query string, queryResult interface{}) *QueryResults {
+	qrs := &QueryResults{Query: query}
+
 	if queryResult == nil {
-		return nil, QueryResultNilErr
+		qrs.Error = QueryResultNilErr
+		return qrs
 	}
 
 	data, ok := queryResult.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(queryResult)
 		if err != nil {
-			return nil, err
+			qrs.Error = err
+			return qrs
 		}
-		return nil, fmt.Errorf(e)
+		qrs.Error = fmt.Errorf(e)
+		return qrs
 	}
 
 	// Deep Check for proper formatting
 	d, ok := data.(map[string]interface{})
 	if !ok {
-		return nil, DataFieldFormatErr
+		qrs.Error = DataFieldFormatErr
+		return qrs
 	}
 	resultData, ok := d["result"]
 	if !ok {
-		return nil, ResultFieldDoesNotExistErr
+		qrs.Error = ResultFieldDoesNotExistErr
+		return qrs
 	}
 	resultsData, ok := resultData.([]interface{})
 	if !ok {
-		return nil, ResultFieldFormatErr
+		qrs.Error = ResultFieldFormatErr
+		return qrs
 	}
 
 	// Result vectors from the query
@@ -97,16 +106,19 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 	for _, val := range resultsData {
 		resultInterface, ok := val.(map[string]interface{})
 		if !ok {
-			return nil, ResultFormatErr
+			qrs.Error = ResultFormatErr
+			return qrs
 		}
 
 		metricInterface, ok := resultInterface["metric"]
 		if !ok {
-			return nil, MetricFieldDoesNotExistErr
+			qrs.Error = MetricFieldDoesNotExistErr
+			return qrs
 		}
 		metricMap, ok := metricInterface.(map[string]interface{})
 		if !ok {
-			return nil, MetricFieldFormatErr
+			qrs.Error = MetricFieldFormatErr
+			return qrs
 		}
 
 		// Define label string for values to ensure that we only run labelsForMetric once
@@ -120,13 +132,15 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 		if !isRange {
 			dataPoint, ok := resultInterface["value"]
 			if !ok {
-				return nil, ValueFieldDoesNotExistErr
+				qrs.Error = ValueFieldDoesNotExistErr
+				return qrs
 			}
 
 			// Append new data point, log warnings
 			v, warn, err := parseDataPoint(dataPoint)
 			if err != nil {
-				return nil, err
+				qrs.Error = err
+				return qrs
 			}
 			if warn != nil {
 				log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
@@ -136,14 +150,16 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 		} else {
 			values, ok := resultInterface["values"].([]interface{})
 			if !ok {
-				return nil, fmt.Errorf("Values field is improperly formatted")
+				qrs.Error = fmt.Errorf("Values field is improperly formatted")
+				return qrs
 			}
 
 			// Append new data points, log warnings
 			for _, value := range values {
 				v, warn, err := parseDataPoint(value)
 				if err != nil {
-					return nil, err
+					qrs.Error = err
+					return qrs
 				}
 				if warn != nil {
 					if labelString == "" {
@@ -162,10 +178,8 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 		})
 	}
 
-	return &QueryResults{
-		Query:   query,
-		Results: results,
-	}, nil
+	qrs.Results = results
+	return qrs
 }
 
 // GetString returns the requested field, or an error if it does not exist

Niektóre pliki nie zostały wyświetlone z powodu dużej ilości zmienionych plików