Niko Kovacevic 5 years ago
Parent
Commit 73b083af1d

+ 1 - 0
.dockerignore

@@ -0,0 +1 @@
+.git

+ 1 - 9
Dockerfile

@@ -11,17 +11,9 @@ RUN go mod download
 COPY . .
 # Build the binary
 RUN set -e ;\
-    GIT_COMMIT=`git rev-parse HEAD` ;\
-    GIT_DIRTY='' ;\
-    # for our purposes, we only care about dirty .go files ;\
-    if test -n "`git status --porcelain --untracked-files=no | grep '\.go'`"; then \
-      GIT_DIRTY='+dirty' ;\
-    fi ;\
     cd cmd/costmodel;\
     CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
-    go build -a -installsuffix cgo \
-        -ldflags "-X main.gitCommit=${GIT_COMMIT}${GIT_DIRTY}" \
-        -o /go/bin/app
+    go build -a -installsuffix cgo -o /go/bin/app
 
 FROM alpine:3.10.2
 RUN apk add --update --no-cache ca-certificates

+ 1 - 2
go.mod

@@ -9,7 +9,6 @@ require (
 	github.com/Azure/go-autorest v11.3.2+incompatible
 	github.com/aws/aws-sdk-go v1.28.9
 	github.com/dimchansky/utfbom v1.1.0 // indirect
-	github.com/etcd-io/bbolt v1.3.3
 	github.com/getsentry/sentry-go v0.6.1
 	github.com/google/martian v2.1.0+incompatible // indirect
 	github.com/google/uuid v1.1.1
@@ -23,7 +22,7 @@ require (
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
 	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
-	go.etcd.io/bbolt v1.3.3 // indirect
+	go.etcd.io/bbolt v1.3.5
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
 	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	google.golang.org/api v0.4.0

+ 5 - 1
go.sum

@@ -72,7 +72,6 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
 github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
 github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
-github.com/etcd-io/bbolt v1.3.3 h1:gSJmxrs37LgTqR/oyJBWok6k6SvXEUerFTbltIhXkBM=
 github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
 github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550 h1:mV9jbLoSW/8m4VK16ZkHTozJa8sesK5u5kTMFysTYac=
 github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -375,6 +374,8 @@ github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZ
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
 go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
+go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
 go.opencensus.io v0.19.1/go.mod h1:gug0GbSHa8Pafr0d2urOSgoXHZ6x/RUlaiT0d9pqb4A=
 go.opencensus.io v0.19.2 h1:ZZpq6xI6kv/LuE/5s5UQvBU5vMjvRnPb8PvJrIntAnc=
 go.opencensus.io v0.19.2/go.mod h1:NO/8qkisMZLZ1FCsKNqtJPwc8/TaclWyY0B6wcYNg9M=
@@ -466,6 +467,8 @@ golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
 golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
+golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -553,6 +556,7 @@ k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 h1:uV4S5IB5g4Nvi+TBVNf3e9
 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6 h1:tGU1C/vMoUV2ZakSH6wQq2shk9KiFtjoH2vDDHlhpA4=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6/go.mod h1:nL6pwRT8NgfF8TT68DBI8uEePRt89cSvoXUVqbkWHq4=
+k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5 h1:BwY2C//EoWktJi74O6R2REBonrhsfhRI0qfVwOjOPp8=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5/go.mod h1:bIEHXHbykaOlj+pgLllzLJ2RPGdzkjtqdk0Il07KPEM=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g=

+ 21 - 0
pkg/cloud/awsprovider.go

@@ -105,6 +105,8 @@ type AWS struct {
 	DownloadPricingDataLock     sync.RWMutex
 	Config                      *ProviderConfig
 	ServiceAccountChecks        map[string]*ServiceAccountCheck
+	clusterManagementPrice      float64
+	clusterProvisioner          string
 	*CustomProvider
 }
 
@@ -511,6 +513,10 @@ func (aws *AWS) isPreemptible(key string) bool {
 	return false
 }
 
+func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
+	return aws.clusterProvisioner, aws.clusterManagementPrice, nil
+}
+
 // DownloadPricingData fetches data from the AWS Pricing API
 func (aws *AWS) DownloadPricingData() error {
 	aws.DownloadPricingDataLock.Lock()
@@ -545,6 +551,13 @@ func (aws *AWS) DownloadPricingData() error {
 
 	inputkeys := make(map[string]bool)
 	for _, n := range nodeList {
+		if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
+			aws.clusterManagementPrice = 0.10
+			aws.clusterProvisioner = "EKS"
+		} else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
+			aws.clusterProvisioner = "KOPS"
+		}
+
 		labels := n.GetObjectMeta().GetLabels()
 		key := aws.GetKey(labels, n)
 		inputkeys[key.Features()] = true
@@ -1012,6 +1025,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m["provider"] = "AWS"
 		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
+		m["provisioner"] = awsProvider.clusterProvisioner
 		return m, nil
 	}
 	makeStructure := func(clusterName string) (map[string]string, error) {
@@ -1756,6 +1770,13 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 	}
 	s := session.Must(session.NewSession(c))
 	svc := athena.New(s)
+	if customPricing.MasterPayerARN != "" {
+		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
+		svc = athena.New(s, &aws.Config{
+			Region:      region,
+			Credentials: creds,
+		})
+	}
 
 	var e athena.StartQueryExecutionInput
 

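A minimal, self-contained sketch (not part of the commit) of the assume-role pattern the new MasterPayerARN branch uses with aws-sdk-go v1: stscreds.NewCredentials obtains temporary credentials for the payer-account role, and the Athena client is rebuilt with those credentials so billing queries run under that role. The role ARN and region below are hypothetical.

package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/athena"
)

func main() {
	// Base session uses the pod's own credentials (instance profile, env vars, ...).
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))

	// stscreds.NewCredentials calls sts:AssumeRole lazily and refreshes the
	// temporary credentials automatically.
	masterPayerARN := "arn:aws:iam::111122223333:role/athena-billing-read" // hypothetical ARN
	creds := stscreds.NewCredentials(sess, masterPayerARN)

	// The Athena client now runs queries as the assumed role, which is what the
	// MasterPayerARN branch in ExternalAllocations sets up.
	svc := athena.New(sess, &aws.Config{Credentials: creds})
	_ = svc
}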
+ 51 - 16
pkg/cloud/azureprovider.go

@@ -28,8 +28,11 @@ import (
 )
 
 const (
-	AzurePremiumStorageClass  = "premium"
-	AzureStandardStorageClass = "standard"
+	AzureFilePremiumStorageClass     = "premium_smb"
+	AzureFileStandardStorageClass    = "standard_smb"
+	AzureDiskPremiumSSDStorageClass  = "premium_ssd"
+	AzureDiskStandardSSDStorageClass = "standard_ssd"
+	AzureDiskStandardStorageClass    = "standard_hdd"
 )
 
 var (
@@ -478,12 +481,16 @@ func (az *Azure) DownloadPricingData() error {
 		if !strings.Contains(meterSubCategory, "Windows") {
 
 			if strings.Contains(meterCategory, "Storage") {
-				if strings.Contains(meterSubCategory, "HDD") || strings.Contains(meterSubCategory, "SSD") {
+				if strings.Contains(meterSubCategory, "HDD") || strings.Contains(meterSubCategory, "SSD") || strings.Contains(meterSubCategory, "Premium Files") {
 					var storageClass string = ""
-					if strings.Contains(meterName, "S4 ") {
-						storageClass = AzureStandardStorageClass
-					} else if strings.Contains(meterName, "P4 ") {
-						storageClass = AzurePremiumStorageClass
+					if strings.Contains(meterName, "P4 ") {
+						storageClass = AzureDiskPremiumSSDStorageClass
+					} else if strings.Contains(meterName, "E4 ") {
+						storageClass = AzureDiskStandardSSDStorageClass
+					} else if strings.Contains(meterName, "S4 ") {
+						storageClass = AzureDiskStandardStorageClass
+					} else if strings.Contains(meterName, "LRS Provisioned") {
+						storageClass = AzureFilePremiumStorageClass
 					}
 
 					if storageClass != "" {
@@ -514,11 +521,6 @@ func (az *Azure) DownloadPricingData() error {
 
 			if strings.Contains(meterCategory, "Virtual Machines") {
 
-				// not available now
-				if strings.Contains(meterSubCategory, "Promo") {
-					continue
-				}
-
 				usageType := ""
 				if !strings.Contains(meterName, "Low Priority") {
 					usageType = "ondemand"
@@ -530,6 +532,9 @@ func (az *Azure) DownloadPricingData() error {
 				name := strings.TrimSuffix(meterName, " Low Priority")
 				instanceType := strings.Split(name, "/")
 				for _, it := range instanceType {
+					if strings.Contains(meterSubCategory, "Promo") {
+						it = it + " Promo"
+					}
 					instanceTypes = append(instanceTypes, strings.Replace(it, " ", "_", 1))
 				}
 
@@ -561,6 +566,22 @@ func (az *Azure) DownloadPricingData() error {
 			}
 		}
 	}
+
+	// There is no easy way of supporting Standard Azure-File, because it's billed per used GB
+	// this will set the price to "0" as a workaround to not spam with `Persistent Volume pricing not found for` error
+	// check https://github.com/kubecost/cost-model/issues/159 for more information (same problem on AWS)
+	zeroPrice := "0.0"
+	for region := range regions {
+		key := region + "," + AzureFileStandardStorageClass
+		klog.V(4).Infof("Adding PV.Key: %s, Cost: %s", key, zeroPrice)
+		allPrices[key] = &AzurePricing{
+			PV: &PV{
+				Cost:   zeroPrice,
+				Region: region,
+			},
+		}
+	}
+
 	az.Pricing = allPrices
 	return nil
 }
@@ -651,10 +672,21 @@ func (key *azurePvKey) GetStorageClass() string {
 
 func (key *azurePvKey) Features() string {
 	storageClass := key.StorageClassParameters["storageaccounttype"]
-	if strings.EqualFold(storageClass, "Premium_LRS") {
-		storageClass = AzurePremiumStorageClass
-	} else if strings.EqualFold(storageClass, "Standard_LRS") {
-		storageClass = AzureStandardStorageClass
+	storageSKU := key.StorageClassParameters["skuName"]
+	if storageClass != "" {
+		if strings.EqualFold(storageClass, "Premium_LRS") {
+			storageClass = AzureDiskPremiumSSDStorageClass
+		} else if strings.EqualFold(storageClass, "StandardSSD_LRS") {
+			storageClass = AzureDiskStandardSSDStorageClass
+		} else if strings.EqualFold(storageClass, "Standard_LRS") {
+			storageClass = AzureDiskStandardStorageClass
+		}
+	} else {
+		if strings.EqualFold(storageSKU, "Premium_LRS") {
+			storageClass = AzureFilePremiumStorageClass
+		} else if strings.EqualFold(storageSKU, "Standard_LRS") {
+			storageClass = AzureFileStandardStorageClass
+		}
 	}
 	if region, ok := key.Labels[v1.LabelZoneRegion]; ok {
 		return region + "," + storageClass
@@ -780,6 +812,9 @@ func (az *Azure) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: []*ServiceAccountCheck{},
 	}
 }
+func (*Azure) ClusterManagementPricing() (string, float64, error) {
+	return "", 0.0, nil
+}
 
 func (az *Azure) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))

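A minimal sketch (not part of the commit) of the pricing-key scheme the reworked Features() produces: managed-disk classes are derived from the storageaccounttype parameter, Azure Files classes from skuName, and the lookup key is always region plus storage class. The pricingKey helper below is illustrative only and covers just two of the five new constants.

package main

import (
	"fmt"
	"strings"
)

// Two of the new storage class constants, copied from the diff for the example.
const (
	AzureDiskPremiumSSDStorageClass = "premium_ssd"
	AzureFileStandardStorageClass   = "standard_smb"
)

// pricingKey is an illustrative helper: managed disks are classified by the
// storageaccounttype parameter, Azure Files by skuName, and the key is
// "<region>,<storage class>".
func pricingKey(region, storageAccountType, skuName string) string {
	storageClass := ""
	if storageAccountType != "" {
		if strings.EqualFold(storageAccountType, "Premium_LRS") {
			storageClass = AzureDiskPremiumSSDStorageClass
		}
	} else if strings.EqualFold(skuName, "Standard_LRS") {
		storageClass = AzureFileStandardStorageClass
	}
	return region + "," + storageClass
}

func main() {
	fmt.Println(pricingKey("eastus", "Premium_LRS", ""))  // eastus,premium_ssd
	fmt.Println(pricingKey("eastus", "", "Standard_LRS")) // eastus,standard_smb -> priced at 0.0 by the workaround above
}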
+ 4 - 0
pkg/cloud/csvprovider.go

@@ -295,6 +295,10 @@ func (c *CSVProvider) ServiceAccountStatus() *ServiceAccountStatus {
 	}
 }
 
+func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
+	return "", 0.0, nil
+}
+
 func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
 }

+ 4 - 0
pkg/cloud/customprovider.go

@@ -38,6 +38,10 @@ type customProviderKey struct {
 	Labels         map[string]string
 }
 
+func (*CustomProvider) ClusterManagementPricing() (string, float64, error) {
+	return "", 0.0, nil
+}
+
 func (*CustomProvider) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 	return ""
 }

+ 27 - 5
pkg/cloud/gcpprovider.go

@@ -57,6 +57,8 @@ type GCP struct {
 	Config                  *ProviderConfig
 	serviceKeyProvided      bool
 	ValidPricingKeys        map[string]bool
+	clusterManagementPrice  float64
+	clusterProvisioner      string
 	*CustomProvider
 }
 
@@ -164,6 +166,9 @@ func (gcp *GCP) GetConfig() (*CustomPricing, error) {
 	if c.NegotiatedDiscount == "" {
 		c.NegotiatedDiscount = "0%"
 	}
+	if c.CurrencyCode == "" {
+		c.CurrencyCode = "USD"
+	}
 	return c, nil
 }
 
@@ -488,11 +493,16 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = "GCP"
+	m["provisioner"] = gcp.clusterProvisioner
 	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 	return m, nil
 }
 
+func (gcp *GCP) ClusterManagementPricing() (string, float64, error) {
+	return gcp.clusterProvisioner, gcp.clusterManagementPrice, nil
+}
+
 func (*GCP) GetAddresses() ([]byte, error) {
 	// metadata API setup
 	metadataClient := metadata.NewClient(&http.Client{Transport: userAgentTransport{
@@ -568,7 +578,7 @@ type GCPPricing struct {
 type PricingInfo struct {
 	Summary                string             `json:"summary"`
 	PricingExpression      *PricingExpression `json:"pricingExpression"`
-	CurrencyConversionRate int                `json:"currencyConversionRate"`
+	CurrencyConversionRate float64            `json:"currencyConversionRate"`
 	EffectiveTime          string             `json:""`
 }
 
@@ -867,7 +877,11 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 
 func (gcp *GCP) parsePages(inputKeys map[string]Key, pvKeys map[string]PVKey) (map[string]*GCPPricing, error) {
 	var pages []map[string]*GCPPricing
-	url := "https://cloudbilling.googleapis.com/v1/services/6F81-5844-456A/skus?key=" + gcp.APIKey
+	c, err := gcp.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	url := "https://cloudbilling.googleapis.com/v1/services/6F81-5844-456A/skus?key=" + gcp.APIKey + "&currencyCode=" + c.CurrencyCode
 	klog.V(2).Infof("Fetch GCP Billing Data from URL: %s", url)
 	var parsePagesHelper func(string) error
 	parsePagesHelper = func(pageToken string) error {
@@ -887,7 +901,7 @@ func (gcp *GCP) parsePages(inputKeys map[string]Key, pvKeys map[string]PVKey) (m
 		pages = append(pages, page)
 		return parsePagesHelper(token)
 	}
-	err := parsePagesHelper("")
+	err = parsePagesHelper("")
 	if err != nil {
 		return nil, err
 	}
@@ -945,6 +959,11 @@ func (gcp *GCP) DownloadPricingData() error {
 
 	for _, n := range nodeList {
 		labels := n.GetObjectMeta().GetLabels()
+		if _, ok := labels["cloud.google.com/gke-nodepool"]; ok { // The node is part of a GKE nodepool, so you're paying a cluster management cost
+			gcp.clusterManagementPrice = 0.10
+			gcp.clusterProvisioner = "GKE"
+		}
+
 		key := gcp.GetKey(labels, n)
 		inputkeys[key.Features()] = key
 	}
@@ -1389,10 +1408,13 @@ func (gcp *GCP) ServiceAccountStatus() *ServiceAccountStatus {
 
 func (gcp *GCP) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	class := strings.Split(instanceType, "-")[0]
-	return 1.0 - ((1.0 - sustainedUseDiscount(class, defaultDiscount)) * (1.0 - negotiatedDiscount))
+	return 1.0 - ((1.0 - sustainedUseDiscount(class, defaultDiscount, isPreemptible)) * (1.0 - negotiatedDiscount))
 }
 
-func sustainedUseDiscount(class string, defaultDiscount float64) float64 {
+func sustainedUseDiscount(class string, defaultDiscount float64, isPreemptible bool) float64 {
+	if isPreemptible {
+		return 0.0
+	}
 	discount := defaultDiscount
 	switch class {
 	case "e2", "f1", "g1":

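A short sketch (not part of the commit) of the discount arithmetic after this change: preemptible nodes skip the sustained-use discount entirely, so only the negotiated discount applies. The 0.3 sustained-use figure is a hypothetical stand-in, not the value the switch statement assigns for any particular machine class.

package main

import "fmt"

// combined mirrors the CombinedDiscountForNode formula after this change:
// preemptible nodes get no sustained-use discount, only the negotiated one.
func combined(sustainedUse, negotiated float64, preemptible bool) float64 {
	if preemptible {
		sustainedUse = 0.0 // preemptible pricing already reflects its own discount
	}
	return 1.0 - ((1.0 - sustainedUse) * (1.0 - negotiated))
}

func main() {
	fmt.Printf("%.2f\n", combined(0.3, 0.1, false)) // 0.37
	fmt.Printf("%.2f\n", combined(0.3, 0.1, true))  // 0.10: sustained-use discount dropped
}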
+ 1 - 0
pkg/cloud/provider.go

@@ -188,6 +188,7 @@ type Provider interface {
 	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ServiceAccountStatus() *ServiceAccountStatus
+	ClusterManagementPricing() (string, float64, error)
 	CombinedDiscountForNode(string, bool, float64, float64) float64
 	ParseID(string) string
 }

+ 1 - 1
pkg/clustermanager/boltdbstorage.go

@@ -1,7 +1,7 @@
 package clustermanager
 
 import (
-	bolt "github.com/etcd-io/bbolt"
+	bolt "go.etcd.io/bbolt"
 	_ "k8s.io/klog"
 )
 

+ 1 - 1
pkg/clustermanager/clustermanager.go

@@ -222,7 +222,7 @@ func fromSecret(secretName string) (string, error) {
 		return "", fmt.Errorf("Failed to load secret: %s", file)
 	}
 
-	return string(data), nil
+	return base64.StdEncoding.EncodeToString(data), nil
 }
 
 func getAuth(auth *ClusterConfigEntryAuth) (string, error) {

+ 24 - 4
pkg/costmodel/cluster.go

@@ -406,7 +406,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
@@ -470,6 +470,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		nodeMap[key].CPUCost += cpuCost
 		nodeMap[key].NodeType = nodeType
 	}
+	partialCPUMap := make(map[string]float64)
+	partialCPUMap["e2-micro"] = 0.25
+	partialCPUMap["e2-small"] = 0.5
+	partialCPUMap["e2-medium"] = 1.0
 
 	for _, result := range resNodeCPUCores {
 		cluster, err := result.GetString("cluster_id")
@@ -494,7 +498,17 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
-		nodeMap[key].CPUCores = cpuCores
+		node := nodeMap[key]
+		if v, ok := partialCPUMap[node.NodeType]; ok {
+			node.CPUCores = v
+			if cpuCores > 0 {
+				adjustmentFactor := v / cpuCores
+				node.CPUCost = node.CPUCost * adjustmentFactor
+			}
+		} else {
+			nodeMap[key].CPUCores = cpuCores
+		}
+
 	}
 
 	for _, result := range resNodeRAMCost {
@@ -710,8 +724,14 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		}
 
 		// GCP preemptible label
-		pre, _ := result.GetString("label_cloud_google_com_gke_preemptible")
-		if node, ok := nodeMap[nodeName]; pre == "true" && ok {
+		pre := result.Values[0].Value
+
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+		key := fmt.Sprintf("%s/%s", cluster, nodeName)
+		if node, ok := nodeMap[key]; pre > 0.0 && ok {
 			node.Preemptible = true
 		}
 

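A minimal sketch (not part of the commit) of the shared-core adjustment above: GCP's e2-micro/small/medium machine types typically report 2 vCPUs of capacity, so CPUCores is pinned to the billed fraction from the map and CPUCost is scaled by the same factor. The adjust helper is illustrative only.

package main

import "fmt"

// adjust is an illustrative helper: shared-core e2 nodes report 2 vCPUs of
// capacity, so both the core count and the CPU cost are scaled down to the
// billed fraction of a core.
func adjust(nodeType string, capacityCores, cpuCost float64) (float64, float64) {
	partial := map[string]float64{"e2-micro": 0.25, "e2-small": 0.5, "e2-medium": 1.0}
	if v, ok := partial[nodeType]; ok && capacityCores > 0 {
		return v, cpuCost * (v / capacityCores)
	}
	return capacityCores, cpuCost
}

func main() {
	cores, cost := adjust("e2-medium", 2, 0.02)
	fmt.Printf("%.2f cores at $%.3f\n", cores, cost) // 1.00 cores at $0.010
}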
+ 70 - 0
pkg/costmodel/clusterinfo.go

@@ -0,0 +1,70 @@
+package costmodel
+
+import (
+	"fmt"
+
+	cloudProvider "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/thanos"
+
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog"
+)
+
+var (
+	logCollectionEnabled    bool   = env.IsLogCollectionEnabled()
+	productAnalyticsEnabled bool   = env.IsProductAnalyticsEnabled()
+	errorReportingEnabled   bool   = env.IsErrorReportingEnabled()
+	valuesReportingEnabled  bool   = env.IsValuesReportingEnabled()
+	clusterProfile          string = env.GetClusterProfile()
+)
+
+// writeReportingFlags writes the reporting flags to the cluster info map
+func writeReportingFlags(clusterInfo map[string]string) {
+	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
+	clusterInfo["productAnalytics"] = fmt.Sprintf("%t", productAnalyticsEnabled)
+	clusterInfo["errorReporting"] = fmt.Sprintf("%t", errorReportingEnabled)
+	clusterInfo["valuesReporting"] = fmt.Sprintf("%t", valuesReportingEnabled)
+}
+
+// writeClusterProfile writes the data associated with the cluster profile
+func writeClusterProfile(clusterInfo map[string]string) {
+	clusterInfo["clusterProfile"] = clusterProfile
+}
+
+func writeThanosFlags(clusterInfo map[string]string) {
+	// Include Thanos Offset Duration if Applicable
+	clusterInfo["thanosEnabled"] = fmt.Sprintf("%t", thanos.IsEnabled())
+	if thanos.IsEnabled() {
+		clusterInfo["thanosOffset"] = thanos.Offset()
+	}
+}
+
+// GetClusterInfo provides specific information about the cluster cloud provider as well as
+// generic configuration values.
+func GetClusterInfo(kubeClient kubernetes.Interface, cloud cloudProvider.Provider) map[string]string {
+	data, err := cloud.ClusterInfo()
+
+	// Ensure we create the info object if it doesn't exist
+	if data == nil {
+		data = make(map[string]string)
+	}
+
+	kc, ok := kubeClient.(*kubernetes.Clientset)
+	if ok && data != nil {
+		v, err := kc.ServerVersion()
+		if err != nil {
+			klog.Infof("Could not get k8s version info: %s", err.Error())
+		} else if v != nil {
+			data["version"] = v.Major + "." + v.Minor
+		}
+	} else {
+		klog.Infof("Could not get k8s version info: %s", err.Error())
+	}
+
+	writeClusterProfile(data)
+	writeReportingFlags(data)
+	writeThanosFlags(data)
+
+	return data
+}

+ 12 - 17
pkg/costmodel/costmodel.go

@@ -475,6 +475,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					name := vol.PersistentVolumeClaim.ClaimName
 					key := ns + "," + name + "," + clusterID
 					if pvClaim, ok := pvClaimMapping[key]; ok {
+						pvClaim.TimesClaimed++
 						podPVs = append(podPVs, pvClaim)
 
 						// Remove entry from potential unmounted pvs
@@ -750,16 +751,9 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 	if len(missingNodes) > 0 {
 		defer measureTime(time.Now(), profileThreshold, "Finding Deleted Node Info")
 
-		q := make([]string, 0, len(missingNodes))
-		for nodename := range missingNodes {
-			klog.V(4).Infof("Finding data for deleted node %v", nodename)
-			q = append(q, nodename)
-		}
-		l := strings.Join(q, "|")
-
-		queryHistoricalCPUCost := fmt.Sprintf(`avg_over_time(node_cpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
-		queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost{instance=~"%s"}[%s])`, l, window)
-		queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
+		queryHistoricalCPUCost := fmt.Sprintf(`avg_over_time(node_cpu_hourly_cost[%s])`, window)
+		queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost[%s])`, window)
+		queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost[%s])`, window)
 
 		ctx := prom.NewContext(cli)
 		cpuCostResCh := ctx.Query(queryHistoricalCPUCost)
@@ -2177,13 +2171,14 @@ func getStatefulSetsOfPod(pod v1.Pod) []string {
 }
 
 type PersistentVolumeClaimData struct {
-	Class      string                `json:"class"`
-	Claim      string                `json:"claim"`
-	Namespace  string                `json:"namespace"`
-	ClusterID  string                `json:"clusterId"`
-	VolumeName string                `json:"volumeName"`
-	Volume     *costAnalyzerCloud.PV `json:"persistentVolume"`
-	Values     []*util.Vector        `json:"values"`
+	Class        string                `json:"class"`
+	Claim        string                `json:"claim"`
+	Namespace    string                `json:"namespace"`
+	ClusterID    string                `json:"clusterId"`
+	TimesClaimed int                   `json:"timesClaimed"`
+	VolumeName   string                `json:"volumeName"`
+	Volume       *costAnalyzerCloud.PV `json:"persistentVolume"`
+	Values       []*util.Vector        `json:"values"`
 }
 
 func measureTime(start time.Time, threshold time.Duration, name string) {

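A minimal sketch (not part of the commit) of what the new TimesClaimed field is for: when several pods mount the same claim, the PV cost attributed to each pod is divided by the number of claims so the volume is not double counted, while an unmounted volume keeps its full allocation. The perPodPVCost helper below is illustrative only and mirrors the guard used in metrics.go.

package main

import "fmt"

// perPodPVCost is an illustrative helper: unclaimed volumes keep their full
// allocation, shared volumes are split evenly across the claiming pods.
func perPodPVCost(pvCost float64, timesClaimed int) float64 {
	if timesClaimed == 0 {
		timesClaimed = 1
	}
	return pvCost / float64(timesClaimed)
}

func main() {
	fmt.Println(perPodPVCost(8.0, 2)) // 4: a PVC mounted by two pods splits the cost
	fmt.Println(perPodPVCost(8.0, 0)) // 8: an unmounted PV keeps its full allocation
}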
+ 532 - 68
pkg/costmodel/metrics.go

@@ -1,54 +1,71 @@
 package costmodel
 
 import (
+	"math"
 	"regexp"
 	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
 
+	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+
+	"k8s.io/klog"
 )
 
 var (
 	invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
 )
 
-func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string) {
-	labelKeys := make([]string, 0, len(labels))
-	for k := range labels {
-		labelKeys = append(labelKeys, k)
-	}
-	sort.Strings(labelKeys)
-
-	labelValues := make([]string, 0, len(labels))
-	for i, k := range labelKeys {
-		labelKeys[i] = "label_" + SanitizeLabelName(k)
-		labelValues = append(labelValues, labels[k])
-	}
-	return labelKeys, labelValues
-}
-
-func SanitizeLabelName(s string) string {
-	return invalidLabelCharRE.ReplaceAllString(s, "_")
-}
+//--------------------------------------------------------------------------
+//  StatefulsetCollector
+//--------------------------------------------------------------------------
 
+// StatefulsetCollector is a prometheus collector that generates StatefulsetMetrics
 type StatefulsetCollector struct {
 	KubeClientSet kubernetes.Interface
 }
 
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
 func (sc StatefulsetCollector) Describe(ch chan<- *prometheus.Desc) {
 	ch <- prometheus.NewDesc("statefulSet_match_labels", "statfulSet match labels", []string{}, nil)
 }
 
-type DeploymentCollector struct {
-	KubeClientSet kubernetes.Interface
+// Collect is called by the Prometheus registry when collecting metrics.
+func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, _ := sc.KubeClientSet.AppsV1().StatefulSets("").List(metav1.ListOptions{})
+	for _, statefulset := range ds.Items {
+		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
+		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
+		ch <- m
+	}
 }
 
-func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
-	ch <- prometheus.NewInvalidDesc(nil)
+//--------------------------------------------------------------------------
+//  StatefulsetMetric
+//--------------------------------------------------------------------------
+
+// StatefulsetMetric is a prometheus.Metric used to encode statefulset match labels
+type StatefulsetMetric struct {
+	fqName          string
+	help            string
+	labelNames      []string
+	labelValues     []string
+	statefulsetName string
+	namespace       string
 }
 
+// Creates a new StatefulsetMetric, implementation of prometheus.Metric
 func newStatefulsetMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) StatefulsetMetric {
 	return StatefulsetMetric{
 		fqName:          fqname,
@@ -60,20 +77,15 @@ func newStatefulsetMetric(name, namespace, fqname string, labelNames []string, l
 	}
 }
 
-type StatefulsetMetric struct {
-	fqName          string
-	help            string
-	labelNames      []string
-	labelValues     []string
-	statefulsetName string
-	namespace       string
-}
-
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
 func (s StatefulsetMetric) Desc() *prometheus.Desc {
 	l := prometheus.Labels{"statefulSet": s.statefulsetName, "namespace": s.namespace}
 	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
 }
 
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
 func (s StatefulsetMetric) Write(m *dto.Metric) error {
 	h := float64(1)
 	m.Gauge = &dto.Gauge{
@@ -100,15 +112,46 @@ func (s StatefulsetMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
-func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
-	ds, _ := sc.KubeClientSet.AppsV1().StatefulSets("").List(metav1.ListOptions{})
-	for _, statefulset := range ds.Items {
-		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
-		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
+//--------------------------------------------------------------------------
+//  DeploymentCollector
+//--------------------------------------------------------------------------
+
+// DeploymentCollector is a prometheus collector that generates DeploymentMetrics
+type DeploymentCollector struct {
+	KubeClientSet kubernetes.Interface
+}
+
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
+func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("deployment_match_labels", "deployment match labels", []string{}, nil)
+}
+
+// Collect is called by the Prometheus registry when collecting metrics.
+func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, _ := sc.KubeClientSet.AppsV1().Deployments("").List(metav1.ListOptions{})
+	for _, deployment := range ds.Items {
+		labels, values := kubeLabelsToPrometheusLabels(deployment.Spec.Selector.MatchLabels)
+		m := newDeploymentMetric(deployment.GetName(), deployment.GetNamespace(), "deployment_match_labels", labels, values)
 		ch <- m
 	}
 }
 
+//--------------------------------------------------------------------------
+//  DeploymentMetric
+//--------------------------------------------------------------------------
+
+// DeploymentMetric is a prometheus.Metric used to encode deployment match labels
+type DeploymentMetric struct {
+	fqName         string
+	help           string
+	labelNames     []string
+	labelValues    []string
+	deploymentName string
+	namespace      string
+}
+
+// Creates a new DeploymentMetric, implementation of prometheus.Metric
 func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
 	return DeploymentMetric{
 		fqName:         fqname,
@@ -120,20 +163,15 @@ func newDeploymentMetric(name, namespace, fqname string, labelNames []string, la
 	}
 }
 
-type DeploymentMetric struct {
-	fqName         string
-	help           string
-	labelNames     []string
-	labelValues    []string
-	deploymentName string
-	namespace      string
-}
-
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
 func (s DeploymentMetric) Desc() *prometheus.Desc {
 	l := prometheus.Labels{"deployment": s.deploymentName, "namespace": s.namespace}
 	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
 }
 
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
 func (s DeploymentMetric) Write(m *dto.Metric) error {
 	h := float64(1)
 	m.Gauge = &dto.Gauge{
@@ -160,34 +198,36 @@ func (s DeploymentMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
-func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
-	ds, _ := sc.KubeClientSet.AppsV1().Deployments("").List(metav1.ListOptions{})
-	for _, deployment := range ds.Items {
-		labels, values := kubeLabelsToPrometheusLabels(deployment.Spec.Selector.MatchLabels)
-		m := newDeploymentMetric(deployment.GetName(), deployment.GetNamespace(), "deployment_match_labels", labels, values)
-		ch <- m
-	}
-}
+//--------------------------------------------------------------------------
+//  ServiceCollector
+//--------------------------------------------------------------------------
 
+// ServiceCollector is a prometheus collector that generates ServiceMetrics
 type ServiceCollector struct {
 	KubeClientSet kubernetes.Interface
 }
 
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
 func (sc ServiceCollector) Describe(ch chan<- *prometheus.Desc) {
-	return
+	ch <- prometheus.NewDesc("service_selector_labels", "service selector labels", []string{}, nil)
 }
 
-func newServiceMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) ServiceMetric {
-	return ServiceMetric{
-		fqName:      fqname,
-		labelNames:  labelNames,
-		labelValues: labelvalues,
-		help:        "service_selector_labels Service Selector Labels",
-		serviceName: name,
-		namespace:   namespace,
+// Collect is called by the Prometheus registry when collecting metrics.
+func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
+	svcs, _ := sc.KubeClientSet.CoreV1().Services("").List(metav1.ListOptions{})
+	for _, svc := range svcs.Items {
+		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
+		m := newServiceMetric(svc.GetName(), svc.GetNamespace(), "service_selector_labels", labels, values)
+		ch <- m
 	}
 }
 
+//--------------------------------------------------------------------------
+//  ServiceMetric
+//--------------------------------------------------------------------------
+
+// ServiceMetric is a prometheus.Metric used to encode service selector labels
 type ServiceMetric struct {
 	fqName      string
 	help        string
@@ -197,11 +237,27 @@ type ServiceMetric struct {
 	namespace   string
 }
 
+// Creates a new ServiceMetric, implementation of prometheus.Metric
+func newServiceMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) ServiceMetric {
+	return ServiceMetric{
+		fqName:      fqname,
+		labelNames:  labelNames,
+		labelValues: labelvalues,
+		help:        "service_selector_labels Service Selector Labels",
+		serviceName: name,
+		namespace:   namespace,
+	}
+}
+
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
 func (s ServiceMetric) Desc() *prometheus.Desc {
 	l := prometheus.Labels{"service": s.serviceName, "namespace": s.namespace}
 	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
 }
 
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
 func (s ServiceMetric) Write(m *dto.Metric) error {
 	h := float64(1)
 	m.Gauge = &dto.Gauge{
@@ -228,11 +284,419 @@ func (s ServiceMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
-func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
-	svcs, _ := sc.KubeClientSet.CoreV1().Services("").List(metav1.ListOptions{})
-	for _, svc := range svcs.Items {
-		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
-		m := newServiceMetric(svc.GetName(), svc.GetNamespace(), "service_selector_labels", labels, values)
-		ch <- m
+//--------------------------------------------------------------------------
+//  ClusterInfoCollector
+//--------------------------------------------------------------------------
+
+// ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
+type ClusterInfoCollector struct {
+	Cloud         costAnalyzerCloud.Provider
+	KubeClientSet kubernetes.Interface
+}
+
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
+func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("kubecost_cluster_info", "Kubecost Cluster Info", []string{}, nil)
+}
+
+// Collect is called by the Prometheus registry when collecting metrics.
+func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
+	clusterInfo := GetClusterInfo(cic.KubeClientSet, cic.Cloud)
+	labels := prom.MapToLabels(clusterInfo)
+
+	m := newClusterInfoMetric("kubecost_cluster_info", labels)
+	ch <- m
+}
+
+//--------------------------------------------------------------------------
+//  ClusterInfoMetric
+//--------------------------------------------------------------------------
+
+// ClusterInfoMetric is a prometheus.Metric used to encode the local cluster info
+type ClusterInfoMetric struct {
+	fqName string
+	help   string
+	labels map[string]string
+}
+
+// Creates a new ClusterInfoMetric, implementation of prometheus.Metric
+func newClusterInfoMetric(fqName string, labels map[string]string) ClusterInfoMetric {
+	return ClusterInfoMetric{
+		fqName: fqName,
+		labels: labels,
+		help:   "kubecost_cluster_info ClusterInfo",
+	}
+}
+
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
+func (cim ClusterInfoMetric) Desc() *prometheus.Desc {
+	l := prometheus.Labels{}
+	return prometheus.NewDesc(cim.fqName, cim.help, prom.LabelNamesFrom(cim.labels), l)
+}
+
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
+func (cim ClusterInfoMetric) Write(m *dto.Metric) error {
+	h := float64(1)
+	m.Gauge = &dto.Gauge{
+		Value: &h,
+	}
+	var labels []*dto.LabelPair
+	for k, v := range cim.labels {
+		labels = append(labels, &dto.LabelPair{
+			Name:  toStringPtr(k),
+			Value: toStringPtr(v),
+		})
+	}
+	m.Label = labels
+	return nil
+}
+
+// toStringPtr is used to create a new string pointer from iteration vars
+func toStringPtr(s string) *string {
+	return &s
+}
+
+//--------------------------------------------------------------------------
+//  Package Functions
+//--------------------------------------------------------------------------
+
+var (
+	recordingLock     sync.Mutex
+	recordingStopping bool
+	recordingStop     chan bool
+)
+
+// Checks to see if there is a metric recording stop channel. If it exists, a new
+// channel is not created and false is returned. If it doesn't exist, a new channel
+// is created and true is returned.
+func checkOrCreateRecordingChan() bool {
+	recordingLock.Lock()
+	defer recordingLock.Unlock()
+
+	if recordingStop != nil {
+		return false
+	}
+
+	recordingStop = make(chan bool, 1)
+	return true
+}
+
+// IsCostModelMetricRecordingRunning returns true if metric recording is still running.
+func IsCostModelMetricRecordingRunning() bool {
+	recordingLock.Lock()
+	defer recordingLock.Unlock()
+
+	return recordingStop != nil
+}
+
+// StartCostModelMetricRecording starts the go routine that emits metrics used to determine
+// cluster costs.
+func StartCostModelMetricRecording(a *Accesses) bool {
+	// Check to see if we're already recording
+	// This function will create the stop recording channel and return true
+	// if it doesn't exist.
+	if !checkOrCreateRecordingChan() {
+		log.Errorf("Attempted to start cost model metric recording when it's already running.")
+		return false
+	}
+
+	go func() {
+		defer errors.HandlePanic()
+
+		containerSeen := make(map[string]bool)
+		nodeSeen := make(map[string]bool)
+		pvSeen := make(map[string]bool)
+		pvcSeen := make(map[string]bool)
+
+		getKeyFromLabelStrings := func(labels ...string) string {
+			return strings.Join(labels, ",")
+		}
+		getLabelStringsFromKey := func(key string) []string {
+			return strings.Split(key, ",")
+		}
+
+		var defaultRegion string = ""
+		nodeList := a.Model.Cache.GetAllNodes()
+		if len(nodeList) > 0 {
+			defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
+		}
+
+		for {
+			klog.V(4).Info("Recording prices...")
+			podlist := a.Model.Cache.GetAllPods()
+			podStatus := make(map[string]v1.PodPhase)
+			for _, pod := range podlist {
+				podStatus[pod.Name] = pod.Status.Phase
+			}
+
+			cfg, _ := a.Cloud.GetConfig()
+
+			provisioner, clusterManagementCost, err := a.Cloud.ClusterManagementPricing()
+			if err != nil {
+				klog.V(1).Infof("Error getting cluster management cost %s", err.Error())
+			}
+			a.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
+
+			// Record network pricing at global scope
+			networkCosts, err := a.Cloud.NetworkPricing()
+			if err != nil {
+				klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
+			} else {
+				a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
+				a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
+				a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
+			}
+
+			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
+			if err != nil {
+				klog.V(1).Info("Error in price recording: " + err.Error())
+				// zero the for loop so the time.Sleep will still work
+				data = map[string]*CostData{}
+			}
+
+			nodes, err := a.Model.GetNodeCost(a.Cloud)
+			for nodeName, node := range nodes {
+				// Emit costs, guarding against NaN inputs for custom pricing.
+				cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
+				if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
+					cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
+					if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
+						cpuCost = 0
+					}
+				}
+				cpu, _ := strconv.ParseFloat(node.VCPU, 64)
+				if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
+					cpu = 1 // Assume 1 CPU
+				}
+				ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
+				if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
+					ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
+					if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
+						ramCost = 0
+					}
+				}
+				ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
+				if math.IsNaN(ram) || math.IsInf(ram, 0) {
+					ram = 0
+				}
+				gpu, _ := strconv.ParseFloat(node.GPU, 64)
+				if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
+					gpu = 0
+				}
+				gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
+				if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
+					gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
+					if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
+						gpuCost = 0
+					}
+				}
+				nodeType := node.InstanceType
+				nodeRegion := node.Region
+
+				totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
+
+				a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
+				a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
+				a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
+				a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
+				if node.IsSpot() {
+					a.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(1.0)
+				} else {
+					a.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(0.0)
+				}
+				labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
+				nodeSeen[labelKey] = true
+			}
+
+			for _, costs := range data {
+				nodeName := costs.NodeName
+
+				namespace := costs.Namespace
+				podName := costs.PodName
+				containerName := costs.Name
+
+				if costs.PVCData != nil {
+					for _, pvc := range costs.PVCData {
+						if pvc.Volume != nil {
+							timesClaimed := pvc.TimesClaimed
+							if timesClaimed == 0 {
+								timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
+							}
+							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value / float64(pvc.TimesClaimed))
+							labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
+							pvcSeen[labelKey] = true
+						}
+					}
+				}
+
+				if len(costs.RAMAllocation) > 0 {
+					a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
+				}
+				if len(costs.CPUAllocation) > 0 {
+					a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
+				}
+				if len(costs.GPUReq) > 0 {
+					// allocation here is set to the request because shared GPU usage not yet supported.
+					a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
+				}
+				labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
+				if podStatus[podName] == v1.PodRunning { // Only report data for current pods
+					containerSeen[labelKey] = true
+				} else {
+					containerSeen[labelKey] = false
+				}
+
+				storageClasses := a.Model.Cache.GetAllStorageClasses()
+				storageClassMap := make(map[string]map[string]string)
+				for _, storageClass := range storageClasses {
+					params := storageClass.Parameters
+					storageClassMap[storageClass.ObjectMeta.Name] = params
+					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
+						storageClassMap["default"] = params
+						storageClassMap[""] = params
+					}
+				}
+
+				pvs := a.Model.Cache.GetAllPersistentVolumes()
+				for _, pv := range pvs {
+					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
+					if !ok {
+						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
+					}
+					var region string
+					if r, ok := pv.Labels[v1.LabelZoneRegion]; ok {
+						region = r
+					} else {
+						region = defaultRegion
+					}
+					cacPv := &costAnalyzerCloud.PV{
+						Class:      pv.Spec.StorageClassName,
+						Region:     region,
+						Parameters: parameters,
+					}
+					GetPVCost(cacPv, pv, a.Cloud, region)
+					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
+					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
+					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
+					pvSeen[labelKey] = true
+				}
+			}
+			for labelString, seen := range nodeSeen {
+				if !seen {
+					klog.Infof("Removing %s from nodes", labelString)
+					labels := getLabelStringsFromKey(labelString)
+					ok := a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from totalprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
+					}
+					ok = a.NodeSpotRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from spot records", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from spot records", labelString)
+					}
+					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from cpuprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
+					}
+					ok = a.GPUPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from gpuprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
+					}
+					ok = a.RAMPriceRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from ramprice", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
+					}
+					delete(nodeSeen, labelString)
+				} else {
+					nodeSeen[labelString] = false
+				}
+			}
+			for labelString, seen := range containerSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.RAMAllocationRecorder.DeleteLabelValues(labels...)
+					a.CPUAllocationRecorder.DeleteLabelValues(labels...)
+					a.GPUAllocationRecorder.DeleteLabelValues(labels...)
+					delete(containerSeen, labelString)
+				} else {
+					containerSeen[labelString] = false
+				}
+			}
+			for labelString, seen := range pvSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
+					delete(pvSeen, labelString)
+				} else {
+					pvSeen[labelString] = false
+				}
+			}
+			for labelString, seen := range pvcSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.PVAllocationRecorder.DeleteLabelValues(labels...)
+					delete(pvcSeen, labelString)
+				} else {
+					pvcSeen[labelString] = false
+				}
+			}
+
+			select {
+			case <-time.After(time.Minute):
+			case <-recordingStop:
+				recordingLock.Lock()
+				recordingStopping = false
+				recordingStop = nil
+				recordingLock.Unlock()
+				return
+			}
+		}
+	}()
+
+	return true
+}
+
+// StopCostModelMetricRecording halts the metrics emission loop after the current emission is completed
+// or if the emission is paused.
+func StopCostModelMetricRecording() {
+	recordingLock.Lock()
+	defer recordingLock.Unlock()
+
+	if !recordingStopping && recordingStop != nil {
+		recordingStopping = true
+		close(recordingStop)
+	}
+}
+
+// Converts kubernetes labels into prometheus labels.
+func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string) {
+	labelKeys := make([]string, 0, len(labels))
+	for k := range labels {
+		labelKeys = append(labelKeys, k)
 	}
+	sort.Strings(labelKeys)
+
+	labelValues := make([]string, 0, len(labels))
+	for i, k := range labelKeys {
+		labelKeys[i] = "label_" + SanitizeLabelName(k)
+		labelValues = append(labelValues, labels[k])
+	}
+	return labelKeys, labelValues
+}
+
+// Replaces all illegal prometheus label characters with _
+func SanitizeLabelName(s string) string {
+	return invalidLabelCharRE.ReplaceAllString(s, "_")
 }

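A self-contained sketch (not part of the commit) of the stop-channel lifecycle that StartCostModelMetricRecording and StopCostModelMetricRecording implement: the emission loop runs once a minute until the channel is closed, a second start attempt is rejected while one is running, and stopping is idempotent. The names below (start, stop, emit) are illustrative only.

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	mu       sync.Mutex
	stopping bool
	stopCh   chan bool
)

// start mirrors StartCostModelMetricRecording: refuse to start twice, then run
// the emission loop until the stop channel is closed.
func start(emit func()) bool {
	mu.Lock()
	if stopCh != nil {
		mu.Unlock()
		return false
	}
	stopCh = make(chan bool, 1)
	mu.Unlock()

	go func() {
		for {
			emit()
			select {
			case <-time.After(time.Minute):
			case <-stopCh:
				mu.Lock()
				stopping = false
				stopCh = nil
				mu.Unlock()
				return
			}
		}
	}()
	return true
}

// stop mirrors StopCostModelMetricRecording: an idempotent shutdown request.
func stop() {
	mu.Lock()
	defer mu.Unlock()
	if !stopping && stopCh != nil {
		stopping = true
		close(stopCh)
	}
}

func main() {
	start(func() { fmt.Println("emit metrics") })
	time.Sleep(50 * time.Millisecond)
	stop()
	time.Sleep(50 * time.Millisecond)
}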
+ 38 - 275
pkg/costmodel/router.go

@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"math"
 	"net"
 	"net/http"
 	"reflect"
@@ -46,11 +45,6 @@ const (
 var (
 	// gitCommit is set by the build system
 	gitCommit                       string
-	logCollectionEnabled            bool   = env.IsLogCollectionEnabled()
-	productAnalyticsEnabled         bool   = env.IsProductAnalyticsEnabled()
-	errorReportingEnabled           bool   = env.IsErrorReportingEnabled()
-	valuesReportingEnabled          bool   = env.IsValuesReportingEnabled()
-	clusterProfile                  string = env.GetClusterProfile()
 	multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
 	multiclusterDBBasicAuthPW       string = env.GetMultiClusterBasicAuthPassword()
 )
@@ -69,10 +63,12 @@ type Accesses struct {
 	PersistentVolumePriceRecorder *prometheus.GaugeVec
 	GPUPriceRecorder              *prometheus.GaugeVec
 	NodeTotalPriceRecorder        *prometheus.GaugeVec
+	NodeSpotRecorder              *prometheus.GaugeVec
 	RAMAllocationRecorder         *prometheus.GaugeVec
 	CPUAllocationRecorder         *prometheus.GaugeVec
 	GPUAllocationRecorder         *prometheus.GaugeVec
 	PVAllocationRecorder          *prometheus.GaugeVec
+	ClusterManagementCostRecorder *prometheus.GaugeVec
 	NetworkZoneEgressRecorder     prometheus.Gauge
 	NetworkRegionEgressRecorder   prometheus.Gauge
 	NetworkInternetEgressRecorder prometheus.Gauge
@@ -168,14 +164,6 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
-// writeReportingFlags writes the reporting flags to the cluster info map
-func writeReportingFlags(clusterInfo map[string]string) {
-	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
-	clusterInfo["productAnalytics"] = fmt.Sprintf("%t", productAnalyticsEnabled)
-	clusterInfo["errorReporting"] = fmt.Sprintf("%t", errorReportingEnabled)
-	clusterInfo["valuesReporting"] = fmt.Sprintf("%t", valuesReportingEnabled)
-}
-
 // parsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
 // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
 // return 0.0.
@@ -342,7 +330,22 @@ func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httpr
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 
-	data, err := ComputeClusterCosts(a.PrometheusClient, a.Cloud, window, offset, true)
+	useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
+
+	if useThanos && !thanos.IsEnabled() {
+		w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
+		return
+	}
+
+	var client prometheusClient.Client
+	if useThanos {
+		client = a.ThanosClient
+		offset = thanos.Offset()
+	} else {
+		client = a.PrometheusClient
+	}
+
+	data, err := ComputeClusterCosts(client, a.Cloud, window, offset, true)
 	w.Write(WrapData(data, err))
 }
 
@@ -612,36 +615,9 @@ func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	data, err := p.Cloud.ClusterInfo()
-
-	kc, ok := p.KubeClientSet.(*kubernetes.Clientset)
-	if ok && data != nil {
-		v, err := kc.ServerVersion()
-		if err != nil {
-			klog.Infof("Could not get k8s version info: %s", err.Error())
-		} else if v != nil {
-			data["version"] = v.Major + "." + v.Minor
-		}
-	} else {
-		klog.Infof("Could not get k8s version info: %s", err.Error())
-	}
-
-	// Ensure we create the info object if it doesn't exist
-	if data == nil {
-		data = make(map[string]string)
-	}
-
-	data["clusterProfile"] = clusterProfile
-
-	// Include Product Reporting Flags with Cluster Info
-	writeReportingFlags(data)
-
-	// Include Thanos Offset Duration if Applicable
-	if thanos.IsEnabled() {
-		data["thanosOffset"] = thanos.Offset()
-	}
+	data := GetClusterInfo(p.KubeClientSet, p.Cloud)
 
-	w.Write(WrapData(data, err))
+	w.Write(WrapData(data, nil))
 }
 
 func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
@@ -657,236 +633,6 @@ func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request,
 	w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
 }
 
-func (a *Accesses) recordPrices() {
-	go func() {
-		defer errors.HandlePanic()
-
-		containerSeen := make(map[string]bool)
-		nodeSeen := make(map[string]bool)
-		pvSeen := make(map[string]bool)
-		pvcSeen := make(map[string]bool)
-
-		getKeyFromLabelStrings := func(labels ...string) string {
-			return strings.Join(labels, ",")
-		}
-		getLabelStringsFromKey := func(key string) []string {
-			return strings.Split(key, ",")
-		}
-
-		var defaultRegion string = ""
-		nodeList := a.Model.Cache.GetAllNodes()
-		if len(nodeList) > 0 {
-			defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
-		}
-
-		for {
-			klog.V(4).Info("Recording prices...")
-			podlist := a.Model.Cache.GetAllPods()
-			podStatus := make(map[string]v1.PodPhase)
-			for _, pod := range podlist {
-				podStatus[pod.Name] = pod.Status.Phase
-			}
-
-			cfg, _ := a.Cloud.GetConfig()
-
-			// Record network pricing at global scope
-			networkCosts, err := a.Cloud.NetworkPricing()
-			if err != nil {
-				klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
-			} else {
-				a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
-				a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
-				a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
-			}
-
-			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
-			if err != nil {
-				klog.V(1).Info("Error in price recording: " + err.Error())
-				// zero the for loop so the time.Sleep will still work
-				data = map[string]*CostData{}
-			}
-
-			nodes, err := a.Model.GetNodeCost(a.Cloud)
-			for nodeName, node := range nodes {
-				// Emit costs, guarding against NaN inputs for custom pricing.
-				cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
-				if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
-					cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
-					if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
-						cpuCost = 0
-					}
-				}
-				cpu, _ := strconv.ParseFloat(node.VCPU, 64)
-				if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
-					cpu = 1 // Assume 1 CPU
-				}
-				ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
-				if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
-					ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
-					if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
-						ramCost = 0
-					}
-				}
-				ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
-				if math.IsNaN(ram) || math.IsInf(ram, 0) {
-					ram = 0
-				}
-				gpu, _ := strconv.ParseFloat(node.GPU, 64)
-				if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
-					gpu = 0
-				}
-				gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
-				if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
-					gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
-					if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
-						gpuCost = 0
-					}
-				}
-				nodeType := node.InstanceType
-				nodeRegion := node.Region
-
-				totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
-
-				a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
-				a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
-				a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
-				a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
-				labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
-				nodeSeen[labelKey] = true
-			}
-
-			for _, costs := range data {
-				nodeName := costs.NodeName
-
-				namespace := costs.Namespace
-				podName := costs.PodName
-				containerName := costs.Name
-
-				if costs.PVCData != nil {
-					for _, pvc := range costs.PVCData {
-						if pvc.Volume != nil {
-							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
-							labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
-							pvcSeen[labelKey] = true
-						}
-					}
-				}
-
-				if len(costs.RAMAllocation) > 0 {
-					a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
-				}
-				if len(costs.CPUAllocation) > 0 {
-					a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
-				}
-				if len(costs.GPUReq) > 0 {
-					// allocation here is set to the request because shared GPU usage not yet supported.
-					a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
-				}
-				labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
-				if podStatus[podName] == v1.PodRunning { // Only report data for current pods
-					containerSeen[labelKey] = true
-				} else {
-					containerSeen[labelKey] = false
-				}
-
-				storageClasses := a.Model.Cache.GetAllStorageClasses()
-				storageClassMap := make(map[string]map[string]string)
-				for _, storageClass := range storageClasses {
-					params := storageClass.Parameters
-					storageClassMap[storageClass.ObjectMeta.Name] = params
-					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
-						storageClassMap["default"] = params
-						storageClassMap[""] = params
-					}
-				}
-
-				pvs := a.Model.Cache.GetAllPersistentVolumes()
-				for _, pv := range pvs {
-					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
-					if !ok {
-						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
-					}
-					var region string
-					if r, ok := pv.Labels[v1.LabelZoneRegion]; ok {
-						region = r
-					} else {
-						region = defaultRegion
-					}
-					cacPv := &costAnalyzerCloud.PV{
-						Class:      pv.Spec.StorageClassName,
-						Region:     region,
-						Parameters: parameters,
-					}
-					GetPVCost(cacPv, pv, a.Cloud, region)
-					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
-					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
-					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
-					pvSeen[labelKey] = true
-				}
-			}
-			for labelString, seen := range nodeSeen {
-				if !seen {
-					klog.Infof("Removing %s from nodes", labelString)
-					labels := getLabelStringsFromKey(labelString)
-					ok := a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from totalprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
-					}
-					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from cpuprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
-					}
-					ok = a.GPUPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from gpuprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
-					}
-					ok = a.RAMPriceRecorder.DeleteLabelValues(labels...)
-					if ok {
-						klog.Infof("removed %s from ramprice", labelString)
-					} else {
-						klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
-					}
-					delete(nodeSeen, labelString)
-				}
-				nodeSeen[labelString] = false
-			}
-			for labelString, seen := range containerSeen {
-				if !seen {
-					labels := getLabelStringsFromKey(labelString)
-					a.RAMAllocationRecorder.DeleteLabelValues(labels...)
-					a.CPUAllocationRecorder.DeleteLabelValues(labels...)
-					a.GPUAllocationRecorder.DeleteLabelValues(labels...)
-					delete(containerSeen, labelString)
-				}
-				containerSeen[labelString] = false
-			}
-			for labelString, seen := range pvSeen {
-				if !seen {
-					labels := getLabelStringsFromKey(labelString)
-					a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
-					delete(pvSeen, labelString)
-				}
-				pvSeen[labelString] = false
-			}
-			for labelString, seen := range pvcSeen {
-				if !seen {
-					labels := getLabelStringsFromKey(labelString)
-					a.PVAllocationRecorder.DeleteLabelValues(labels...)
-					delete(pvcSeen, labelString)
-				}
-				pvcSeen[labelString] = false
-			}
-			time.Sleep(time.Minute)
-		}
-	}()
-}
-
 // Creates a new ClusterManager instance using a boltdb storage. If that fails,
 // then we fall back to a memory-only storage.
 func newClusterManager() *cm.ClusterManager {
@@ -1093,6 +839,11 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Help: "node_total_hourly_cost Total node cost per hour",
 	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
 
+	spotGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "kubecost_node_is_spot",
+		Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
+	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
+
 	pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "pv_hourly_cost",
 		Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
@@ -1129,17 +880,23 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Name: "kubecost_network_internet_egress_cost",
 		Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
 	})
+	ClusterManagementCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "kubecost_cluster_management_cost",
+		Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
+	}, []string{"provisioner_name"})
 
 	prometheus.MustRegister(cpuGv)
 	prometheus.MustRegister(ramGv)
 	prometheus.MustRegister(gpuGv)
 	prometheus.MustRegister(totalGv)
 	prometheus.MustRegister(pvGv)
+	prometheus.MustRegister(spotGv)
 	prometheus.MustRegister(RAMAllocation)
 	prometheus.MustRegister(CPUAllocation)
 	prometheus.MustRegister(PVAllocation)
 	prometheus.MustRegister(GPUAllocation)
 	prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
+	prometheus.MustRegister(ClusterManagementCostRecorder)
 	prometheus.MustRegister(ServiceCollector{
 		KubeClientSet: kubeClientset,
 	})
@@ -1149,6 +906,10 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	prometheus.MustRegister(StatefulsetCollector{
 		KubeClientSet: kubeClientset,
 	})
+	prometheus.MustRegister(ClusterInfoCollector{
+		KubeClientSet: kubeClientset,
+		Cloud:         cloudProvider,
+	})
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
@@ -1162,6 +923,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		RAMPriceRecorder:              ramGv,
 		GPUPriceRecorder:              gpuGv,
 		NodeTotalPriceRecorder:        totalGv,
+		NodeSpotRecorder:              spotGv,
 		RAMAllocationRecorder:         RAMAllocation,
 		CPUAllocationRecorder:         CPUAllocation,
 		GPUAllocationRecorder:         GPUAllocation,
@@ -1170,6 +932,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
+		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
 		Model:                         NewCostModel(k8sCache),
 		OutOfClusterCache:             outOfClusterCache,
 	}
@@ -1228,7 +991,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())
 	}
 
-	A.recordPrices()
+	StartCostModelMetricRecording(&A)
 
 	managerEndpoints := cm.NewClusterManagerEndpoints(A.ClusterManager)
 

+ 69 - 0
pkg/prom/metrics.go

@@ -0,0 +1,69 @@
+package prom
+
+import (
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"strings"
+)
+
+// AnyToLabels creates prometheus labels from the fields of the value passed. Note that
+// this method is quite expensive and should only be used when absolutely necessary.
+func AnyToLabels(a interface{}) (map[string]string, error) {
+	val := reflect.ValueOf(a)
+	if val.Kind() == reflect.Map {
+		return MapToLabels(a), nil
+	}
+
+	b, e := json.Marshal(a)
+	if e != nil {
+		return nil, e
+	}
+
+	var m map[string]interface{}
+	e = json.Unmarshal(b, &m)
+	if e != nil {
+		return nil, e
+	}
+
+	return MapToLabels(m), nil
+}
+
+// MapToLabels accepts a map and returns a flattened map in which nested keys are
+// lowercased and joined with "_", and values are converted to their string representations.
+func MapToLabels(m interface{}) map[string]string {
+	val := reflect.ValueOf(m)
+	if val.Kind() != reflect.Map {
+		return map[string]string{}
+	}
+
+	r := make(map[string]string)
+
+	for _, k := range val.MapKeys() {
+		key := strings.ToLower(k.String())
+		v := val.MapIndex(k).Interface()
+
+		switch v.(type) {
+		case uint, uint8, uint16, uint32, uint64, int, int8, int16, int32, int64, string, bool, float32, float64:
+			r[key] = fmt.Sprintf("%+v", v)
+
+		default:
+			mm := MapToLabels(v)
+			for kk, vv := range mm {
+				r[fmt.Sprintf("%s_%s", key, kk)] = vv
+			}
+		}
+	}
+
+	return r
+}
+
+// LabelNamesFrom accepts a mapping of labels to values and returns the label names.
+func LabelNamesFrom(labels map[string]string) []string {
+	keys := []string{}
+	for key := range labels {
+		keys = append(keys, key)
+	}
+	return keys
+}
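
For illustration, a minimal sketch of how MapToLabels flattens a nested map into label names and values; the input map here is hypothetical.

package main

import (
	"fmt"

	"github.com/kubecost/cost-model/pkg/prom"
)

func main() {
	// Hypothetical nested map: keys are lowercased, nested keys are joined with "_",
	// and scalar values are rendered as strings.
	info := map[string]interface{}{
		"Provider": "GCP",
		"Node": map[string]interface{}{
			"Region": "us-central1",
			"Spot":   false,
		},
	}

	labels := prom.MapToLabels(info)
	fmt.Println(labels) // map[node_region:us-central1 node_spot:false provider:GCP]
}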

+ 54 - 7
pkg/prom/prom.go

@@ -5,10 +5,29 @@ import (
 	"net/http"
 	"net/url"
 
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 )
 
+// RateLimitedPrometheusClient is a prometheus client that limits the total number of
+// concurrent outbound requests allowed at any given moment.
+type RateLimitedPrometheusClient struct {
+	client   prometheus.Client
+	limiter  *util.Semaphore
+	requests *util.AtomicInt32
+	outbound *util.AtomicInt32
+	username string
+	password string
+}
+
+// requestCounter is implemented by prometheus clients that track their total and
+// concurrent outbound requests, and is used to detect that capability via type assertion.
+type requestCounter interface {
+	TotalRequests() int32
+	TotalOutboundRequests() int32
+}
+
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
 func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password string) (prometheus.Client, error) {
@@ -18,22 +37,41 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	}
 
 	limiter := util.NewSemaphore(maxConcurrency)
+	requests := util.NewAtomicInt32(0)
+	outbound := util.NewAtomicInt32(0)
 
 	return &RateLimitedPrometheusClient{
 		client:   c,
 		limiter:  limiter,
+		requests: requests,
+		outbound: outbound,
 		username: username,
 		password: password,
 	}, nil
 }
 
-// Creates a new prometheus client which limits the total number of concurrent outbound requests
-// allowed at a given moment.
-type RateLimitedPrometheusClient struct {
-	client   prometheus.Client
-	limiter  *util.Semaphore
-	username string
-	password string
+// LogPrometheusClientState logs the current state, with respect to outbound requests, if that
+// information is available.
+func LogPrometheusClientState(client prometheus.Client) {
+	if rc, ok := client.(requestCounter); ok {
+		total := rc.TotalRequests()
+		outbound := rc.TotalOutboundRequests()
+		queued := total - outbound
+
+		log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
+	}
+}
+
+// TotalRequests returns the total number of requests that are either waiting to be sent
+// or currently outbound.
+func (rlpc *RateLimitedPrometheusClient) TotalRequests() int32 {
+	return rlpc.requests.Get()
+}
+
+// TotalOutboundRequests returns the number of concurrent outbound requests that have been
+// sent to the server and are awaiting a response.
+func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int32 {
+	return rlpc.outbound.Get()
 }
 
 // Passthrough to the prometheus client API
@@ -46,8 +84,17 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 	if rlpc.username != "" {
 		req.SetBasicAuth(rlpc.username, rlpc.password)
 	}
+	// Increment the total request counter first
+	rlpc.requests.Increment()
+	defer rlpc.requests.Decrement()
+
+	// Acquire mutex based on concurrency limiter
 	rlpc.limiter.Acquire()
 	defer rlpc.limiter.Return()
 
+	// Increment outbound once mutex acquired
+	rlpc.outbound.Increment()
+	defer rlpc.outbound.Decrement()
+
 	return rlpc.client.Do(ctx, req)
 }
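
For illustration, a minimal sketch of constructing the rate-limited client and reporting its request counters; the Prometheus address and concurrency limit are placeholder values.

package main

import (
	"log"

	api "github.com/prometheus/client_golang/api"

	"github.com/kubecost/cost-model/pkg/prom"
)

func main() {
	// Placeholder address and limit; basic auth is left empty here.
	client, err := prom.NewRateLimitedClient(api.Config{Address: "http://localhost:9090"}, 5, "", "")
	if err != nil {
		log.Fatal(err)
	}

	// Logs outbound, queued, and total request counts when the client supports it.
	prom.LogPrometheusClientState(client)
}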

+ 72 - 20
pkg/prom/query.go

@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
@@ -54,19 +55,18 @@ func (ctx *Context) HasErrors() bool {
 func (ctx *Context) Query(query string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go func(ctx *Context, resCh QueryResultsChan) {
-		defer errors.HandlePanic()
+	go runQuery(query, ctx, resCh, "")
 
-		raw, promErr := ctx.query(query)
-		ctx.ErrorCollector.Report(promErr)
+	return resCh
+}
 
-		results := NewQueryResults(query, raw)
-		if results.Error != nil {
-			ctx.ErrorCollector.Report(results.Error)
-		}
+// ProfileQuery returns a QueryResultsChan, runs the given query under the provided profile
+// label, and sends the results on the returned channel. The receiver is responsible for
+// closing the channel, preferably via the Read method.
+func (ctx *Context) ProfileQuery(query string, profileLabel string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
 
-		resCh <- results
-	}(ctx, resCh)
+	go runQuery(query, ctx, resCh, profileLabel)
 
 	return resCh
 }
@@ -85,6 +85,20 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
+// ProfileQueryAll returns one QueryResultsChan for each query provided, then runs
+// each ProfileQuery concurrently and returns results on each channel, respectively,
+// in the order they were provided; i.e. the response to queries[1] will be
+// sent on channel resChs[1].
+func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
+	resChs := []QueryResultsChan{}
+
+	for _, q := range queries {
+		resChs = append(resChs, ctx.ProfileQuery(q, fmt.Sprintf("Query #%d", len(resChs)+1)))
+	}
+
+	return resChs
+}
+
 func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
 	raw, err := ctx.query(query)
 	if err != nil {
@@ -104,6 +118,27 @@ func (ctx *Context) QueryURL() *url.URL {
 	return ctx.Client.URL(epQuery, nil)
 }
 
+// runQuery executes the given prometheus query, collects results and errors, and passes
+// them through the results channel; it is intended to be run in its own goroutine.
+func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+	defer errors.HandlePanic()
+	startQuery := time.Now()
+
+	raw, promErr := ctx.query(query)
+	ctx.ErrorCollector.Report(promErr)
+
+	results := NewQueryResults(query, raw)
+	if results.Error != nil {
+		ctx.ErrorCollector.Report(results.Error)
+	}
+
+	if profileLabel != "" {
+		log.Profile(startQuery, profileLabel)
+	}
+
+	resCh <- results
+}
+
 func (ctx *Context) query(query string) (interface{}, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
@@ -139,19 +174,15 @@ func (ctx *Context) query(query string) (interface{}, error) {
 func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go func(ctx *Context, resCh QueryResultsChan) {
-		defer errors.HandlePanic()
+	go runQueryRange(query, start, end, step, ctx, resCh, "")
 
-		raw, promErr := ctx.queryRange(query, start, end, step)
-		ctx.ErrorCollector.Report(promErr)
+	return resCh
+}
 
-		results := NewQueryResults(query, raw)
-		if results.Error != nil {
-			ctx.ErrorCollector.Report(results.Error)
-		}
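+// ProfileQueryRange is the range-query analogue of ProfileQuery: it runs the query
+// asynchronously and, when profileLabel is non-empty, logs the query's duration under that label.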
+func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step time.Duration, profileLabel string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
 
-		resCh <- results
-	}(ctx, resCh)
+	go runQueryRange(query, start, end, step, ctx, resCh, profileLabel)
 
 	return resCh
 }
@@ -175,6 +206,27 @@ func (ctx *Context) QueryRangeURL() *url.URL {
 	return ctx.Client.URL(epQueryRange, nil)
 }
 
+// runQueryRange executes the given prometheus range query, collects results and errors, and
+// passes them through the results channel; it is intended to be run in its own goroutine.
+func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+	defer errors.HandlePanic()
+	startQuery := time.Now()
+
+	raw, promErr := ctx.queryRange(query, start, end, step)
+	ctx.ErrorCollector.Report(promErr)
+
+	results := NewQueryResults(query, raw)
+	if results.Error != nil {
+		ctx.ErrorCollector.Report(results.Error)
+	}
+
+	if profileLabel != "" {
+		log.Profile(startQuery, profileLabel)
+	}
+
+	resCh <- results
+}
+
 func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
 	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()
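
For illustration, a minimal sketch of issuing a profiled query; it assumes ctx is an already-constructed *prom.Context (its constructor is not part of this diff), and the PromQL string and profile label are placeholders.

package example

import "github.com/kubecost/cost-model/pkg/prom"

// profileNodeCosts issues a single profiled query; the PromQL string and
// profile label below are placeholders for this sketch.
func profileNodeCosts(ctx *prom.Context) {
	resCh := ctx.ProfileQuery(`avg(node_total_hourly_cost) by (instance)`, "node cost query")

	// Receiving drains the channel; per the comments above, the receiver is
	// responsible for closing it, preferably via its Read helper.
	results := <-resCh
	_ = results
}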

+ 37 - 0
pkg/util/atomic.go

@@ -0,0 +1,37 @@
+package util
+
+import "sync/atomic"
+
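+// AtomicInt32 is an int32 whose reads and writes are performed atomically.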
+type AtomicInt32 int32
+
+// NewAtomicInt32 creates a new atomic int32 instance.
+func NewAtomicInt32(value int32) *AtomicInt32 {
+	ai := new(AtomicInt32)
+	ai.Set(value)
+	return ai
+}
+
+// Get atomically loads and returns the int32 value.
+func (ai *AtomicInt32) Get() int32 {
+	return atomic.LoadInt32((*int32)(ai))
+}
+
+// Set atomically stores the given int32 value.
+func (ai *AtomicInt32) Set(value int32) {
+	atomic.StoreInt32((*int32)(ai), value)
+}
+
+// Increment atomically increments the value and returns the result.
+func (ai *AtomicInt32) Increment() int32 {
+	return atomic.AddInt32((*int32)(ai), 1)
+}
+
+// Decrement atomically decrements the value and returns the result.
+func (ai *AtomicInt32) Decrement() int32 {
+	return atomic.AddInt32((*int32)(ai), -1)
+}
+
+// CompareAndSet sets the value to new if the stored value equals current, and reports whether the swap occurred.
+func (ai *AtomicInt32) CompareAndSet(current, new int32) bool {
+	return atomic.CompareAndSwapInt32((*int32)(ai), current, new)
+}
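
For illustration, a minimal sketch of the counter pattern the rate-limited client uses: increment on entry, decrement on exit. The function and counter names here are hypothetical.

package main

import (
	"fmt"

	"github.com/kubecost/cost-model/pkg/util"
)

func doWork(inFlight *util.AtomicInt32) {
	// Count this call as in flight for its duration.
	inFlight.Increment()
	defer inFlight.Decrement()

	// ... perform the request ...
}

func main() {
	inFlight := util.NewAtomicInt32(0)
	doWork(inFlight)
	fmt.Println(inFlight.Get()) // 0 once the call has returned
}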

+ 28 - 0
test/clusterinfo_test.go

@@ -0,0 +1,28 @@
+package test
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/kubecost/cost-model/pkg/prom"
+)
+
+func TestClusterInfoLabels(t *testing.T) {
+	expected := map[string]bool{"clusterprofile": true, "errorreporting": true, "id": true, "logcollection": true, "name": true, "productanalytics": true, "provider": true, "provisioner": true, "remotereadenabled": true, "thanosenabled": true, "valuesreporting": true, "version": true}
+	clusterInfo := `{"clusterProfile":"production","errorReporting":"true","id":"cluster-one","logCollection":"true","name":"bolt-3","productAnalytics":"true","provider":"GCP","provisioner":"GKE","remoteReadEnabled":"false","thanosEnabled":"false","valuesReporting":"true","version":"1.14+"}`
+
+	var m map[string]interface{}
+	err := json.Unmarshal([]byte(clusterInfo), &m)
+	if err != nil {
+		t.Errorf("Error: %s", err)
+		return
+	}
+
+	labels := prom.MapToLabels(m)
+	for k := range expected {
+		if _, ok := labels[k]; !ok {
+			t.Errorf("Failed to locate key: \"%s\" in labels.", k)
+			return
+		}
+	}
+}