Просмотр исходного кода

Merge pull request #537 from kubecost/develop

Merge develop into master 1.65.0
Ajay Tripathy 5 лет назад
Родитель
Коммит
e90b6d52f4

+ 1 - 0
.dockerignore

@@ -0,0 +1 @@
+.git

+ 1 - 9
Dockerfile

@@ -11,17 +11,9 @@ RUN go mod download
 COPY . .
 # Build the binary
 RUN set -e ;\
-    GIT_COMMIT=`git rev-parse HEAD` ;\
-    GIT_DIRTY='' ;\
-    # for our purposes, we only care about dirty .go files ;\
-    if test -n "`git status --porcelain --untracked-files=no | grep '\.go'`"; then \
-      GIT_DIRTY='+dirty' ;\
-    fi ;\
     cd cmd/costmodel;\
     CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
-    go build -a -installsuffix cgo \
-        -ldflags "-X main.gitCommit=${GIT_COMMIT}${GIT_DIRTY}" \
-        -o /go/bin/app
+    go build -a -installsuffix cgo -o /go/bin/app
 
 FROM alpine:3.10.2
 RUN apk add --update --no-cache ca-certificates

+ 1 - 0
go.mod

@@ -17,6 +17,7 @@ require (
 	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.2.0
 	github.com/lib/pq v1.2.0
+	github.com/microcosm-cc/bluemonday v1.0.2
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/prometheus/client_golang v1.0.0
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90

+ 2 - 0
go.sum

@@ -241,6 +241,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
 github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
 github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
+github.com/microcosm-cc/bluemonday v1.0.2 h1:5lPfLTTAvAbtS0VqT+94yOtFnGfUWYyx0+iToC3Os3s=
 github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
 github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@@ -557,6 +558,7 @@ k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719/go.mod h1:I4A+glKBHiTgiEj
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6 h1:tGU1C/vMoUV2ZakSH6wQq2shk9KiFtjoH2vDDHlhpA4=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6/go.mod h1:nL6pwRT8NgfF8TT68DBI8uEePRt89cSvoXUVqbkWHq4=
 k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
+k8s.io/apimachinery v0.19.0 h1:gjKnAda/HZp5k4xQYjL0K/Yb66IvNqjthCb03QlKpaQ=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5 h1:BwY2C//EoWktJi74O6R2REBonrhsfhRI0qfVwOjOPp8=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5/go.mod h1:bIEHXHbykaOlj+pgLllzLJ2RPGdzkjtqdk0Il07KPEM=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g=

+ 10 - 1
pkg/cloud/awsprovider.go

@@ -1770,6 +1770,13 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 	}
 	s := session.Must(session.NewSession(c))
 	svc := athena.New(s)
+	if customPricing.MasterPayerARN != "" {
+		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
+		svc = athena.New(s, &aws.Config{
+			Region:      region,
+			Credentials: creds,
+		})
+	}
 
 	var e athena.StartQueryExecutionInput
 
@@ -2412,7 +2419,9 @@ func (aws *AWS) ParseID(id string) string {
 	rx := regexp.MustCompile("aws://[^/]*/[^/]*/([^/]+)")
 	match := rx.FindStringSubmatch(id)
 	if len(match) < 2 {
-		log.Infof("awsprovider.ParseID: failed to parse %s", id)
+		if id != "" {
+			log.Infof("awsprovider.ParseID: failed to parse %s", id)
+		}
 		return id
 	}
 

+ 48 - 16
pkg/cloud/azureprovider.go

@@ -28,8 +28,11 @@ import (
 )
 
 const (
-	AzurePremiumStorageClass  = "premium"
-	AzureStandardStorageClass = "standard"
+	AzureFilePremiumStorageClass     = "premium_smb"
+	AzureFileStandardStorageClass    = "standard_smb"
+	AzureDiskPremiumSSDStorageClass  = "premium_ssd"
+	AzureDiskStandardSSDStorageClass = "standard_ssd"
+	AzureDiskStandardStorageClass    = "standard_hdd"
 )
 
 var (
@@ -478,12 +481,16 @@ func (az *Azure) DownloadPricingData() error {
 		if !strings.Contains(meterSubCategory, "Windows") {
 
 			if strings.Contains(meterCategory, "Storage") {
-				if strings.Contains(meterSubCategory, "HDD") || strings.Contains(meterSubCategory, "SSD") {
+				if strings.Contains(meterSubCategory, "HDD") || strings.Contains(meterSubCategory, "SSD") || strings.Contains(meterSubCategory, "Premium Files") {
 					var storageClass string = ""
-					if strings.Contains(meterName, "S4 ") {
-						storageClass = AzureStandardStorageClass
-					} else if strings.Contains(meterName, "P4 ") {
-						storageClass = AzurePremiumStorageClass
+					if strings.Contains(meterName, "P4 ") {
+						storageClass = AzureDiskPremiumSSDStorageClass
+					} else if strings.Contains(meterName, "E4 ") {
+						storageClass = AzureDiskStandardSSDStorageClass
+					} else if strings.Contains(meterName, "S4 ") {
+						storageClass = AzureDiskStandardStorageClass
+					} else if strings.Contains(meterName, "LRS Provisioned") {
+						storageClass = AzureFilePremiumStorageClass
 					}
 
 					if storageClass != "" {
@@ -514,11 +521,6 @@ func (az *Azure) DownloadPricingData() error {
 
 			if strings.Contains(meterCategory, "Virtual Machines") {
 
-				// not available now
-				if strings.Contains(meterSubCategory, "Promo") {
-					continue
-				}
-
 				usageType := ""
 				if !strings.Contains(meterName, "Low Priority") {
 					usageType = "ondemand"
@@ -530,6 +532,9 @@ func (az *Azure) DownloadPricingData() error {
 				name := strings.TrimSuffix(meterName, " Low Priority")
 				instanceType := strings.Split(name, "/")
 				for _, it := range instanceType {
+					if strings.Contains(meterSubCategory, "Promo") {
+						it = it + " Promo"
+					}
 					instanceTypes = append(instanceTypes, strings.Replace(it, " ", "_", 1))
 				}
 
@@ -561,6 +566,22 @@ func (az *Azure) DownloadPricingData() error {
 			}
 		}
 	}
+
+	// There is no easy way to support Standard Azure-File, because it is billed per used GB.
+	// As a workaround, the price is set to "0" to avoid spamming `Persistent Volume pricing not found for` errors.
+	// See https://github.com/kubecost/cost-model/issues/159 for more information (same problem on AWS).
+	zeroPrice := "0.0"
+	for region := range regions {
+		key := region + "," + AzureFileStandardStorageClass
+		klog.V(4).Infof("Adding PV.Key: %s, Cost: %s", key, zeroPrice)
+		allPrices[key] = &AzurePricing{
+			PV: &PV{
+				Cost:   zeroPrice,
+				Region: region,
+			},
+		}
+	}
+
 	az.Pricing = allPrices
 	return nil
 }
@@ -651,10 +672,21 @@ func (key *azurePvKey) GetStorageClass() string {
 
 func (key *azurePvKey) Features() string {
 	storageClass := key.StorageClassParameters["storageaccounttype"]
-	if strings.EqualFold(storageClass, "Premium_LRS") {
-		storageClass = AzurePremiumStorageClass
-	} else if strings.EqualFold(storageClass, "Standard_LRS") {
-		storageClass = AzureStandardStorageClass
+	storageSKU := key.StorageClassParameters["skuName"]
+	if storageClass != "" {
+		if strings.EqualFold(storageClass, "Premium_LRS") {
+			storageClass = AzureDiskPremiumSSDStorageClass
+		} else if strings.EqualFold(storageClass, "StandardSSD_LRS") {
+			storageClass = AzureDiskStandardSSDStorageClass
+		} else if strings.EqualFold(storageClass, "Standard_LRS") {
+			storageClass = AzureDiskStandardStorageClass
+		}
+	} else {
+		if strings.EqualFold(storageSKU, "Premium_LRS") {
+			storageClass = AzureFilePremiumStorageClass
+		} else if strings.EqualFold(storageSKU, "Standard_LRS") {
+			storageClass = AzureFileStandardStorageClass
+		}
 	}
 	if region, ok := key.Labels[v1.LabelZoneRegion]; ok {
 		return region + "," + storageClass

+ 13 - 4
pkg/cloud/gcpprovider.go

@@ -166,6 +166,9 @@ func (gcp *GCP) GetConfig() (*CustomPricing, error) {
 	if c.NegotiatedDiscount == "" {
 		c.NegotiatedDiscount = "0%"
 	}
+	if c.CurrencyCode == "" {
+		c.CurrencyCode = "USD"
+	}
 	return c, nil
 }
 
@@ -575,7 +578,7 @@ type GCPPricing struct {
 type PricingInfo struct {
 	Summary                string             `json:"summary"`
 	PricingExpression      *PricingExpression `json:"pricingExpression"`
-	CurrencyConversionRate int                `json:"currencyConversionRate"`
+	CurrencyConversionRate float64            `json:"currencyConversionRate"`
 	EffectiveTime          string             `json:""`
 }
 
@@ -874,7 +877,11 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 
 func (gcp *GCP) parsePages(inputKeys map[string]Key, pvKeys map[string]PVKey) (map[string]*GCPPricing, error) {
 	var pages []map[string]*GCPPricing
-	url := "https://cloudbilling.googleapis.com/v1/services/6F81-5844-456A/skus?key=" + gcp.APIKey
+	c, err := gcp.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	url := "https://cloudbilling.googleapis.com/v1/services/6F81-5844-456A/skus?key=" + gcp.APIKey + "&currencyCode=" + c.CurrencyCode
 	klog.V(2).Infof("Fetch GCP Billing Data from URL: %s", url)
 	var parsePagesHelper func(string) error
 	parsePagesHelper = func(pageToken string) error {
@@ -894,7 +901,7 @@ func (gcp *GCP) parsePages(inputKeys map[string]Key, pvKeys map[string]PVKey) (m
 		pages = append(pages, page)
 		return parsePagesHelper(token)
 	}
-	err := parsePagesHelper("")
+	err = parsePagesHelper("")
 	if err != nil {
 		return nil, err
 	}
@@ -1424,7 +1431,9 @@ func (gcp *GCP) ParseID(id string) string {
 	rx := regexp.MustCompile("gce://[^/]*/[^/]*/([^/]+)")
 	match := rx.FindStringSubmatch(id)
 	if len(match) < 2 {
-		log.Infof("gcpprovider.ParseID: failed to parse %s", id)
+		if id != "" {
+			log.Infof("gcpprovider.ParseID: failed to parse %s", id)
+		}
 		return id
 	}
 

+ 4 - 1
pkg/cloud/providerconfig.go

@@ -11,10 +11,13 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/microcosm-cc/bluemonday"
 
 	"k8s.io/klog"
 )
 
+var sanitizePolicy = bluemonday.UGCPolicy()
+
 // ProviderConfig is a utility class that provides a thread-safe configuration
 // storage/cache for all Provider implementations
 type ProviderConfig struct {
@@ -122,7 +125,6 @@ func (pc *ProviderConfig) Update(updateFunc func(*CustomPricing) error) (*Custom
 	if err != nil {
 		return c, err
 	}
-
 	err = ioutil.WriteFile(pc.configPath, cj, 0644)
 
 	if err != nil {
@@ -188,6 +190,7 @@ func SetCustomPricingField(obj *CustomPricing, name string, value string) error
 	}
 
 	structFieldType := structFieldValue.Type()
+	value = sanitizePolicy.Sanitize(value)
 	val := reflect.ValueOf(value)
 	if structFieldType != val.Type() {
 		return fmt.Errorf("Provided value type didn't match custom pricing field type")

+ 1 - 1
pkg/clustermanager/clustermanager.go

@@ -222,7 +222,7 @@ func fromSecret(secretName string) (string, error) {
 		return "", fmt.Errorf("Failed to load secret: %s", file)
 	}
 
-	return string(data), nil
+	return base64.StdEncoding.EncodeToString(data), nil
 }
 
 func getAuth(auth *ClusterConfigEntryAuth) (string, error) {

+ 308 - 42
pkg/costmodel/cluster.go

@@ -111,6 +111,9 @@ type Disk struct {
 	Cost       float64
 	Bytes      float64
 	Local      bool
+	Start      time.Time
+	Minutes    float64
+	Breakdown  *ClusterCostsBreakdown
 }
 
 func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, []error) {
@@ -136,18 +139,28 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	ctx := prom.NewContext(client)
 	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+
 	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 	resChPVCost := ctx.Query(queryPVCost)
 	resChPVSize := ctx.Query(queryPVSize)
+	resChActiveMins := ctx.Query(queryActiveMins)
 	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
+	resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
 	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
+	resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
 
 	resPVCost, _ := resChPVCost.Await()
 	resPVSize, _ := resChPVSize.Await()
+	resActiveMins, _ := resChActiveMins.Await()
 	resLocalStorageCost, _ := resChLocalStorageCost.Await()
+	resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
+	resLocalActiveMins, _ := resChLocalActiveMins.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -172,8 +185,9 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		diskMap[key].Cost += cost
@@ -197,8 +211,9 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		diskMap[key].Bytes = bytes
@@ -216,20 +231,44 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 			continue
 		}
 
-		// TODO niko/assets storage class?
-
 		cost := result.Values[0].Value
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
-				Cluster: cluster,
-				Name:    name,
-				Local:   true,
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
+				Local:     true,
 			}
 		}
 		diskMap[key].Cost += cost
 	}
 
+	for _, result := range resLocalStorageUsedCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterDisks: local storage usage data missing instance")
+			continue
+		}
+
+		cost := result.Values[0].Value
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			diskMap[key] = &Disk{
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
+				Local:     true,
+			}
+		}
+		diskMap[key].Breakdown.System = cost / diskMap[key].Cost
+	}
+
 	for _, result := range resLocalStorageBytes {
 		cluster, err := result.GetString("cluster_id")
 		if err != nil {
@@ -242,8 +281,6 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 			continue
 		}
 
-		// TODO niko/assets storage class
-
 		bytes := result.Values[0].Value
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
@@ -256,21 +293,94 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		diskMap[key].Bytes = bytes
 	}
 
+	for _, result := range resActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("persistentvolume")
+		if err != nil {
+			log.Warningf("ClusterDisks: active mins missing instance")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			log.Warningf("ClusterDisks: active mins for unidentified disk")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		diskMap[key].Start = s
+		diskMap[key].Minutes = mins
+	}
+
+	for _, result := range resLocalActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterDisks: local active mins data missing instance")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			log.Warningf("ClusterDisks: local active mins for unidentified disk")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		diskMap[key].Start = s
+		diskMap[key].Minutes = mins
+	}
+
+	for _, disk := range diskMap {
+		// Apply all remaining disk share to Idle
+		disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
+	}
+
 	return diskMap, nil
 }
 
 type Node struct {
-	Cluster     string
-	Name        string
-	ProviderID  string
-	NodeType    string
-	CPUCost     float64
-	CPUCores    float64
-	GPUCost     float64
-	RAMCost     float64
-	RAMBytes    float64
-	Discount    float64
-	Preemptible bool
+	Cluster      string
+	Name         string
+	ProviderID   string
+	NodeType     string
+	CPUCost      float64
+	CPUCores     float64
+	GPUCost      float64
+	RAMCost      float64
+	RAMBytes     float64
+	Discount     float64
+	Preemptible  bool
+	CPUBreakdown *ClusterCostsBreakdown
+	RAMBreakdown *ClusterCostsBreakdown
+	Start        time.Time
+	Minutes      float64
 }
 
 func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
@@ -296,7 +406,11 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 	resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
 	resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
@@ -304,6 +418,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
 	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
 	resChNodeLabels := ctx.Query(queryNodeLabels)
+	resChNodeCPUModePct := ctx.Query(queryNodeCPUModePct)
+	resChNodeRAMSystemPct := ctx.Query(queryNodeRAMSystemPct)
+	resChNodeRAMUserPct := ctx.Query(queryNodeRAMUserPct)
+	resChActiveMins := ctx.Query(queryActiveMins)
 
 	resNodeCPUCost, _ := resChNodeCPUCost.Await()
 	resNodeCPUCores, _ := resChNodeCPUCores.Await()
@@ -311,6 +429,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resNodeRAMCost, _ := resChNodeRAMCost.Await()
 	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
 	resNodeLabels, _ := resChNodeLabels.Await()
+	resNodeCPUModePct, _ := resChNodeCPUModePct.Await()
+	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
+	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
+	resActiveMins, _ := resChActiveMins.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -337,15 +459,21 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:    cluster,
-				Name:       name,
-				NodeType:   nodeType,
-				ProviderID: cp.ParseID(providerID),
+				Cluster:      cluster,
+				Name:         name,
+				NodeType:     nodeType,
+				ProviderID:   cp.ParseID(providerID),
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].CPUCost += cpuCost
 		nodeMap[key].NodeType = nodeType
 	}
+	partialCPUMap := make(map[string]float64)
+	partialCPUMap["e2-micro"] = 0.25
+	partialCPUMap["e2-small"] = 0.5
+	partialCPUMap["e2-medium"] = 1.0
 
 	for _, result := range resNodeCPUCores {
 		cluster, err := result.GetString("cluster_id")
@@ -364,11 +492,23 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:      cluster,
+				Name:         name,
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
-		nodeMap[key].CPUCores = cpuCores
+		node := nodeMap[key]
+		if v, ok := partialCPUMap[node.NodeType]; ok {
+			node.CPUCores = v
+			if cpuCores > 0 {
+				adjustmentFactor := v / cpuCores
+				node.CPUCost = node.CPUCost * adjustmentFactor
+			}
+		} else {
+			nodeMap[key].CPUCores = cpuCores
+		}
+
 	}
 
 	for _, result := range resNodeRAMCost {
@@ -391,10 +531,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:    cluster,
-				Name:       name,
-				NodeType:   nodeType,
-				ProviderID: cp.ParseID(providerID),
+				Cluster:      cluster,
+				Name:         name,
+				NodeType:     nodeType,
+				ProviderID:   cp.ParseID(providerID),
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].RAMCost += ramCost
@@ -418,8 +560,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:      cluster,
+				Name:         name,
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].RAMBytes = ramBytes
@@ -445,15 +589,133 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:    cluster,
-				Name:       name,
-				NodeType:   nodeType,
-				ProviderID: cp.ParseID(providerID),
+				Cluster:      cluster,
+				Name:         name,
+				NodeType:     nodeType,
+				ProviderID:   cp.ParseID(providerID),
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].GPUCost += gpuCost
 	}
 
+	for _, result := range resNodeCPUModePct {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("kubernetes_node")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU mode data missing node")
+			continue
+		}
+
+		mode, err := result.GetString("mode")
+		if err != nil {
+			log.Warningf("ClusterNodes: unable to read CPU mode: %s", err)
+			mode = "other"
+		}
+
+		pct := result.Values[0].Value
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: CPU mode data for unidentified node")
+			continue
+		}
+
+		switch mode {
+		case "idle":
+			nodeMap[key].CPUBreakdown.Idle += pct
+		case "system":
+			nodeMap[key].CPUBreakdown.System += pct
+		case "user":
+			nodeMap[key].CPUBreakdown.User += pct
+		default:
+			nodeMap[key].CPUBreakdown.Other += pct
+		}
+	}
+
+	for _, result := range resNodeRAMSystemPct {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM system percent missing node")
+			continue
+		}
+
+		pct := result.Values[0].Value
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: RAM system percent for unidentified node")
+			continue
+		}
+
+		nodeMap[key].RAMBreakdown.System += pct
+	}
+
+	for _, result := range resNodeRAMUserPct {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM system percent missing node")
+			continue
+		}
+
+		pct := result.Values[0].Value
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: RAM system percent for unidentified node")
+			continue
+		}
+
+		nodeMap[key].RAMBreakdown.User += pct
+	}
+
+	for _, result := range resActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: active mins missing node")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: active mins for unidentified node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		nodeMap[key].Start = s
+		nodeMap[key].Minutes = mins
+	}
+
 	// Determine preemptibility with node labels
 	for _, result := range resNodeLabels {
 		nodeName, err := result.GetString("node")
@@ -462,13 +724,14 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		}
 
 		// GCP preemptible label
-		pre, _ := result.GetString("label_cloud_google_com_gke_preemptible")
+		pre := result.Values[0].Value
+
 		cluster, err := result.GetString("cluster_id")
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
 		key := fmt.Sprintf("%s/%s", cluster, nodeName)
-		if node, ok := nodeMap[key]; pre == "true" && ok {
+		if node, ok := nodeMap[key]; pre > 0.0 && ok {
 			node.Preemptible = true
 		}
 
@@ -495,6 +758,9 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	for _, node := range nodeMap {
 		// TODO take RI into account
 		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
+
+		// Apply all remaining RAM to Idle
+		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
 	}
 
 	return nodeMap, nil

+ 70 - 0
pkg/costmodel/clusterinfo.go

@@ -0,0 +1,70 @@
+package costmodel
+
+import (
+	"fmt"
+
+	cloudProvider "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/thanos"
+
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog"
+)
+
+var (
+	logCollectionEnabled    bool   = env.IsLogCollectionEnabled()
+	productAnalyticsEnabled bool   = env.IsProductAnalyticsEnabled()
+	errorReportingEnabled   bool   = env.IsErrorReportingEnabled()
+	valuesReportingEnabled  bool   = env.IsValuesReportingEnabled()
+	clusterProfile          string = env.GetClusterProfile()
+)
+
+// writeReportingFlags copies each reporting toggle into clusterInfo as the text "true" or "false"
+func writeReportingFlags(clusterInfo map[string]string) {
+	clusterInfo["valuesReporting"] = fmt.Sprint(valuesReportingEnabled)
+	clusterInfo["errorReporting"] = fmt.Sprint(errorReportingEnabled)
+	clusterInfo["productAnalytics"] = fmt.Sprint(productAnalyticsEnabled)
+	clusterInfo["logCollection"] = fmt.Sprint(logCollectionEnabled)
+}
+
+// writeClusterProfile stores the package-level clusterProfile value under the "clusterProfile" key
+func writeClusterProfile(clusterInfo map[string]string) {
+	clusterInfo["clusterProfile"] = clusterProfile
+}
+
+func writeThanosFlags(clusterInfo map[string]string) {
+	// Always record whether Thanos is enabled; the offset is added only when it is
+	clusterInfo["thanosEnabled"] = fmt.Sprint(thanos.IsEnabled())
+	if thanos.IsEnabled() {
+		clusterInfo["thanosOffset"] = thanos.Offset()
+	}
+}
+
+// GetClusterInfo provides specific information about the cluster cloud provider as well as
+// generic configuration values. It always returns a non-nil map.
+func GetClusterInfo(kubeClient kubernetes.Interface, cloud cloudProvider.Provider) map[string]string {
+	data, err := cloud.ClusterInfo()
+	if err != nil {
+		klog.Infof("Could not get cluster info: %s", err.Error())
+	}
+	// Ensure we create the info object if it doesn't exist
+	if data == nil {
+		data = make(map[string]string)
+	}
+
+	// The version lookup needs the concrete clientset. err from ClusterInfo may be nil when the
+	// assertion fails, so that branch must not dereference it.
+	if kc, ok := kubeClient.(*kubernetes.Clientset); !ok {
+		klog.Infof("Could not get k8s version info: unsupported kube client type %T", kubeClient)
+	} else if v, err := kc.ServerVersion(); err != nil {
+		klog.Infof("Could not get k8s version info: %s", err.Error())
+	} else if v != nil {
+		data["version"] = v.Major + "." + v.Minor
+	}
+
+	writeClusterProfile(data)
+	writeReportingFlags(data)
+	writeThanosFlags(data)
+
+	return data
+}

+ 15 - 9
pkg/costmodel/costmodel.go

@@ -448,7 +448,9 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 			}
 
 			for k, v := range nsLabels {
-				podLabels[k] = v
+				if _, ok := podLabels[k]; !ok {
+					podLabels[k] = v
+				}
 			}
 
 			nodeName := pod.Spec.NodeName
@@ -475,6 +477,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					name := vol.PersistentVolumeClaim.ClaimName
 					key := ns + "," + name + "," + clusterID
 					if pvClaim, ok := pvClaimMapping[key]; ok {
+						pvClaim.TimesClaimed++
 						podPVs = append(podPVs, pvClaim)
 
 						// Remove entry from potential unmounted pvs
@@ -1926,7 +1929,9 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		}
 
 		for k, v := range namespaceLabels {
-			pLabels[k] = v
+			if _, ok := pLabels[k]; !ok {
+				pLabels[k] = v
+			}
 		}
 
 		var podDeployments []string
@@ -2170,13 +2175,14 @@ func getStatefulSetsOfPod(pod v1.Pod) []string {
 }
 
 type PersistentVolumeClaimData struct {
-	Class      string                `json:"class"`
-	Claim      string                `json:"claim"`
-	Namespace  string                `json:"namespace"`
-	ClusterID  string                `json:"clusterId"`
-	VolumeName string                `json:"volumeName"`
-	Volume     *costAnalyzerCloud.PV `json:"persistentVolume"`
-	Values     []*util.Vector        `json:"values"`
+	Class        string                `json:"class"`
+	Claim        string                `json:"claim"`
+	Namespace    string                `json:"namespace"`
+	ClusterID    string                `json:"clusterId"`
+	TimesClaimed int                   `json:"timesClaimed"`
+	VolumeName   string                `json:"volumeName"`
+	Volume       *costAnalyzerCloud.PV `json:"persistentVolume"`
+	Values       []*util.Vector        `json:"values"`
 }
 
 func measureTime(start time.Time, threshold time.Duration, name string) {

+ 92 - 1
pkg/costmodel/metrics.go

@@ -12,6 +12,7 @@ import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
 	v1 "k8s.io/api/core/v1"
@@ -283,6 +284,81 @@ func (s ServiceMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
+//--------------------------------------------------------------------------
+//  ClusterInfoCollector
+//--------------------------------------------------------------------------
+
+// ClusterInfoCollector is a prometheus collector that emits one ClusterInfoMetric per Collect call, built from GetClusterInfo.
+type ClusterInfoCollector struct {
+	Cloud         costAnalyzerCloud.Provider // cloud provider handed to GetClusterInfo
+	KubeClientSet kubernetes.Interface       // kubernetes client handed to GetClusterInfo
+}
+
+// Describe sends a single "kubecost_cluster_info" descriptor, declared without variable
+// or constant labels, on the provided channel.
+func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
+	desc := prometheus.NewDesc("kubecost_cluster_info", "Kubecost Cluster Info", []string{}, nil)
+	ch <- desc
+}
+
+// Collect is called by the Prometheus registry when collecting metrics. It gathers the
+// current cluster info, flattens it into labels and sends one "kubecost_cluster_info"
+// metric carrying those labels.
+func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
+	info := GetClusterInfo(cic.KubeClientSet, cic.Cloud)
+
+	ch <- newClusterInfoMetric("kubecost_cluster_info", prom.MapToLabels(info))
+}
+
+//--------------------------------------------------------------------------
+//  ClusterInfoMetric
+//--------------------------------------------------------------------------
+
+// ClusterInfoMetric is a prometheus.Metric used to encode the local cluster info as label pairs on a gauge.
+type ClusterInfoMetric struct {
+	fqName string            // metric name passed to prometheus.NewDesc
+	help   string            // help text passed to prometheus.NewDesc
+	labels map[string]string // label name to label value, emitted by Write
+}
+
+// Creates a new ClusterInfoMetric, implementation of prometheus.Metric
+func newClusterInfoMetric(fqName string, labels map[string]string) ClusterInfoMetric {
+	return ClusterInfoMetric{
+		fqName: fqName,
+		labels: labels,
+		help:   "kubecost_cluster_info ClusterInfo",
+	}
+}
+
+// Desc returns the descriptor for the Metric. A new descriptor is built on every call from
+// the metric's name, help text and the names of its labels (as variable labels), with an
+// empty set of constant labels.
+func (cim ClusterInfoMetric) Desc() *prometheus.Desc {
+	labelNames := prom.LabelNamesFrom(cim.labels)
+	return prometheus.NewDesc(cim.fqName, cim.help, labelNames, prometheus.Labels{})
+}
+
+// Write encodes the Metric into a "Metric" Protocol Buffer data transmission object: a
+// gauge with the constant value 1 and one label pair per entry of the metric's label map.
+// It always returns nil.
+func (cim ClusterInfoMetric) Write(m *dto.Metric) error {
+	one := float64(1)
+	m.Gauge = &dto.Gauge{Value: &one}
+
+	var pairs []*dto.LabelPair
+	for name, value := range cim.labels {
+		// toStringPtr copies the iteration variables so every pair owns its strings.
+		pair := &dto.LabelPair{
+			Name:  toStringPtr(name),
+			Value: toStringPtr(value),
+		}
+		pairs = append(pairs, pair)
+	}
+	m.Label = pairs
+
+	return nil
+}
+
+// toStringPtr returns a pointer to a fresh copy of s, which makes it safe to call with
+// loop iteration variables.
+func toStringPtr(s string) *string {
+	p := new(string)
+	*p = s
+	return p
+}
+
 //--------------------------------------------------------------------------
 //  Package Functions
 //--------------------------------------------------------------------------
@@ -426,6 +502,11 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 				a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
 				a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
 				a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
+				if node.IsSpot() {
+					a.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(1.0)
+				} else {
+					a.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(0.0)
+				}
 				labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
 				nodeSeen[labelKey] = true
 			}
@@ -440,7 +521,11 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 				if costs.PVCData != nil {
 					for _, pvc := range costs.PVCData {
 						if pvc.Volume != nil {
-							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
+							// Split the PV allocation evenly between every pod that claims it.
+							timesClaimed := pvc.TimesClaimed
+							if timesClaimed == 0 {
+								timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
+							}
+							// Divide by the clamped local value, not pvc.TimesClaimed, so that an
+							// unclaimed PV never divides by zero (which would record +Inf/NaN).
+							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value / float64(timesClaimed))
 							labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
 							pvcSeen[labelKey] = true
 						}
@@ -509,6 +594,12 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
 					}
+					ok = a.NodeSpotRecorder.DeleteLabelValues(labels...)
+					if ok {
+						klog.Infof("removed %s from spot records", labelString)
+					} else {
+						klog.Infof("FAILURE TO REMOVE %s from spot records", labelString)
+					}
 					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 						klog.Infof("removed %s from cpuprice", labelString)

+ 34 - 43
pkg/costmodel/router.go

@@ -2,6 +2,7 @@ package costmodel
 
 import (
 	"context"
+	"crypto/tls"
 	"encoding/json"
 	"flag"
 	"fmt"
@@ -45,11 +46,6 @@ const (
 var (
 	// gitCommit is set by the build system
 	gitCommit                       string
-	logCollectionEnabled            bool   = env.IsLogCollectionEnabled()
-	productAnalyticsEnabled         bool   = env.IsProductAnalyticsEnabled()
-	errorReportingEnabled           bool   = env.IsErrorReportingEnabled()
-	valuesReportingEnabled          bool   = env.IsValuesReportingEnabled()
-	clusterProfile                  string = env.GetClusterProfile()
 	multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
 	multiclusterDBBasicAuthPW       string = env.GetMultiClusterBasicAuthPassword()
 )
@@ -68,6 +64,7 @@ type Accesses struct {
 	PersistentVolumePriceRecorder *prometheus.GaugeVec
 	GPUPriceRecorder              *prometheus.GaugeVec
 	NodeTotalPriceRecorder        *prometheus.GaugeVec
+	NodeSpotRecorder              *prometheus.GaugeVec
 	RAMAllocationRecorder         *prometheus.GaugeVec
 	CPUAllocationRecorder         *prometheus.GaugeVec
 	GPUAllocationRecorder         *prometheus.GaugeVec
@@ -168,14 +165,6 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
-// writeReportingFlags writes the reporting flags to the cluster info map
-func writeReportingFlags(clusterInfo map[string]string) {
-	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
-	clusterInfo["productAnalytics"] = fmt.Sprintf("%t", productAnalyticsEnabled)
-	clusterInfo["errorReporting"] = fmt.Sprintf("%t", errorReportingEnabled)
-	clusterInfo["valuesReporting"] = fmt.Sprintf("%t", valuesReportingEnabled)
-}
-
 // parsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
 // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
 // return 0.0.
@@ -342,7 +331,22 @@ func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httpr
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 
-	data, err := ComputeClusterCosts(a.PrometheusClient, a.Cloud, window, offset, true)
+	useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
+
+	if useThanos && !thanos.IsEnabled() {
+		w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
+		return
+	}
+
+	var client prometheusClient.Client
+	if useThanos {
+		client = a.ThanosClient
+		offset = thanos.Offset()
+	} else {
+		client = a.PrometheusClient
+	}
+
+	data, err := ComputeClusterCosts(client, a.Cloud, window, offset, true)
 	w.Write(WrapData(data, err))
 }
 
@@ -612,36 +616,9 @@ func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	data, err := p.Cloud.ClusterInfo()
-
-	kc, ok := p.KubeClientSet.(*kubernetes.Clientset)
-	if ok && data != nil {
-		v, err := kc.ServerVersion()
-		if err != nil {
-			klog.Infof("Could not get k8s version info: %s", err.Error())
-		} else if v != nil {
-			data["version"] = v.Major + "." + v.Minor
-		}
-	} else {
-		klog.Infof("Could not get k8s version info: %s", err.Error())
-	}
-
-	// Ensure we create the info object if it doesn't exist
-	if data == nil {
-		data = make(map[string]string)
-	}
-
-	data["clusterProfile"] = clusterProfile
+	data := GetClusterInfo(p.KubeClientSet, p.Cloud)
 
-	// Include Product Reporting Flags with Cluster Info
-	writeReportingFlags(data)
-
-	// Include Thanos Offset Duration if Applicable
-	if thanos.IsEnabled() {
-		data["thanosOffset"] = thanos.Offset()
-	}
-
-	w.Write(WrapData(data, err))
+	w.Write(WrapData(data, nil))
 }
 
 func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
@@ -747,6 +724,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	queryConcurrency := env.GetMaxQueryConcurrency()
 	klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
 
+	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
 	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
 		Proxy: http.ProxyFromEnvironment,
 		DialContext: (&net.Dialer{
@@ -754,6 +732,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 			KeepAlive: 120 * time.Second,
 		}).DialContext,
 		TLSHandshakeTimeout: 10 * time.Second,
+		TLSClientConfig:     tlsConfig,
 	}
 
 	pc := prometheusClient.Config{
@@ -863,6 +842,11 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Help: "node_total_hourly_cost Total node cost per hour",
 	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
 
+	spotGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "kubecost_node_is_spot",
+		Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
+	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
+
 	pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "pv_hourly_cost",
 		Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
@@ -909,6 +893,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	prometheus.MustRegister(gpuGv)
 	prometheus.MustRegister(totalGv)
 	prometheus.MustRegister(pvGv)
+	prometheus.MustRegister(spotGv)
 	prometheus.MustRegister(RAMAllocation)
 	prometheus.MustRegister(CPUAllocation)
 	prometheus.MustRegister(PVAllocation)
@@ -924,6 +909,10 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	prometheus.MustRegister(StatefulsetCollector{
 		KubeClientSet: kubeClientset,
 	})
+	prometheus.MustRegister(ClusterInfoCollector{
+		KubeClientSet: kubeClientset,
+		Cloud:         cloudProvider,
+	})
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
@@ -937,6 +926,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		RAMPriceRecorder:              ramGv,
 		GPUPriceRecorder:              gpuGv,
 		NodeTotalPriceRecorder:        totalGv,
+		NodeSpotRecorder:              spotGv,
 		RAMAllocationRecorder:         RAMAllocation,
 		CPUAllocationRecorder:         CPUAllocation,
 		GPUAllocationRecorder:         GPUAllocation,
@@ -975,6 +965,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 					KeepAlive: 120 * time.Second,
 				}).DialContext,
 				TLSHandshakeTimeout: 10 * time.Second,
+				TLSClientConfig:     tlsConfig,
 			}
 
 			thanosConfig := prometheusClient.Config{

+ 6 - 0
pkg/env/costmodelenv.go

@@ -30,6 +30,8 @@ const (
 
 	MultiClusterBasicAuthUsername = "MC_BASIC_AUTH_USERNAME"
 	MultiClusterBasicAuthPassword = "MC_BASIC_AUTH_PW"
+
+	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
 )
 
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
@@ -74,6 +76,10 @@ func GetPrometheusServerEndpoint() string {
 	return Get(PrometheusServerEndpointEnvVar, "")
 }
 
+// GetInsecureSkipVerify returns the boolean value of the InsecureSkipVerify environment
+// variable, defaulting to false when it is not set.
+func GetInsecureSkipVerify() bool {
+	skipVerify := GetBool(InsecureSkipVerify, false)
+	return skipVerify
+}
+
 // IsRemoteEnabled returns the environment variable value for RemoteEnabledEnvVar which represents whether
 // or not remote write is enabled for prometheus for use with SQL backed persistent storage.
 func IsRemoteEnabled() bool {

+ 69 - 0
pkg/prom/metrics.go

@@ -0,0 +1,69 @@
+package prom
+
+import (
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"strings"
+)
+
+// AnyToLabels creates prometheus labels from an arbitrary value. Maps are handed straight
+// to MapToLabels; any other value is marshalled to JSON, unmarshalled into a generic map
+// and then flattened by MapToLabels. An error from either JSON step is returned with a
+// nil map. Note that this round trip is expensive and should only be used when absolutely
+// necessary.
+func AnyToLabels(a interface{}) (map[string]string, error) {
+	if reflect.ValueOf(a).Kind() == reflect.Map {
+		return MapToLabels(a), nil
+	}
+
+	encoded, err := json.Marshal(a)
+	if err != nil {
+		return nil, err
+	}
+
+	var decoded map[string]interface{}
+	if err := json.Unmarshal(encoded, &decoded); err != nil {
+		return nil, err
+	}
+
+	return MapToLabels(decoded), nil
+}
+
+// MapToLabels accepts a map type and returns a new map in which every key is lower-cased
+// and every integer, float, string or bool value is rendered as a string. Values of any
+// other type are treated as nested maps and flattened recursively, their keys joined to
+// the parent key with "_"; nested values that are not maps contribute nothing. A non-map
+// argument yields an empty map.
+func MapToLabels(m interface{}) map[string]string {
+	labels := make(map[string]string)
+
+	mv := reflect.ValueOf(m)
+	if mv.Kind() != reflect.Map {
+		return labels
+	}
+
+	for _, mk := range mv.MapKeys() {
+		name := strings.ToLower(mk.String())
+
+		switch elem := mv.MapIndex(mk).Interface().(type) {
+		case uint, uint8, uint16, uint32, uint64, int, int8, int16, int32, int64, string, bool, float32, float64:
+			labels[name] = fmt.Sprintf("%+v", elem)
+
+		default:
+			// Anything else is flattened as a nested map under "<name>_".
+			for sub, text := range MapToLabels(elem) {
+				labels[name+"_"+sub] = text
+			}
+		}
+	}
+
+	return labels
+}
+
+// LabelNamesFrom returns the keys of the given label map as a slice of label names. The
+// order follows map iteration and is therefore unspecified; the result is never nil.
+func LabelNamesFrom(labels map[string]string) []string {
+	names := make([]string, 0, len(labels))
+	for name := range labels {
+		names = append(names, name)
+	}
+	return names
+}

+ 54 - 7
pkg/prom/prom.go

@@ -5,10 +5,29 @@ import (
 	"net/http"
 	"net/url"
 
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 )
 
+// RateLimitedPrometheusClient is a prometheus client which limits the total number of concurrent
+// outbound requests allowed at a given moment, and counts the requests passing through it.
+type RateLimitedPrometheusClient struct {
+	client   prometheus.Client // underlying client that requests are passed through to
+	limiter  *util.Semaphore   // concurrency limiter sized from maxConcurrency
+	requests *util.AtomicInt32 // counter reported by TotalRequests
+	outbound *util.AtomicInt32 // counter reported by TotalOutboundRequests
+	username string            // basic auth username; basic auth is only set when non-empty
+	password string            // basic auth password
+}
+
+// requestCounter is implemented by prometheus clients that keep track of their
+// concurrent requests; LogPrometheusClientState uses it to detect such clients.
+type requestCounter interface {
+	TotalRequests() int32         // requests waiting to be sent plus requests currently outbound
+	TotalOutboundRequests() int32 // requests sent to the server and awaiting a response
+}
+
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
 func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password string) (prometheus.Client, error) {
@@ -18,22 +37,41 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	}
 
 	limiter := util.NewSemaphore(maxConcurrency)
+	requests := util.NewAtomicInt32(0)
+	outbound := util.NewAtomicInt32(0)
 
 	return &RateLimitedPrometheusClient{
 		client:   c,
 		limiter:  limiter,
+		requests: requests,
+		outbound: outbound,
 		username: username,
 		password: password,
 	}, nil
 }
 
-// Creates a new prometheus client which limits the total number of concurrent outbound requests
-// allowed at a given moment.
-type RateLimitedPrometheusClient struct {
-	client   prometheus.Client
-	limiter  *util.Semaphore
-	username string
-	password string
+// LogPrometheusClientState logs the outbound, queued and total request counts of clients
+// that expose them through the requestCounter interface. Clients that do not implement
+// that interface are ignored and nothing is logged.
+func LogPrometheusClientState(client prometheus.Client) {
+	counter, tracked := client.(requestCounter)
+	if !tracked {
+		return
+	}
+
+	total := counter.TotalRequests()
+	outbound := counter.TotalOutboundRequests()
+
+	// Whatever has been counted but is not yet outbound is still queued.
+	log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, total-outbound, total)
+}
+
+// TotalRequests returns the current value of the requests counter: the number of requests
+// that are either waiting to be sent or are currently outbound.
+func (rlpc *RateLimitedPrometheusClient) TotalRequests() int32 {
+	return rlpc.requests.Get()
+}
+
+// TotalOutboundRequests returns the current value of the outbound counter: the number of
+// concurrent requests which have been sent to the server and are awaiting a response.
+func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int32 {
+	return rlpc.outbound.Get()
+}
 
 // Passthrough to the prometheus client API
@@ -46,8 +84,17 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 	if rlpc.username != "" {
 		req.SetBasicAuth(rlpc.username, rlpc.password)
 	}
+	// Increment the total request counter first
+	rlpc.requests.Increment()
+	defer rlpc.requests.Decrement()
+
+	// Acquire mutex based on concurrency limiter
 	rlpc.limiter.Acquire()
 	defer rlpc.limiter.Return()
 
+	// Increment outbound once mutex acquired
+	rlpc.outbound.Increment()
+	defer rlpc.outbound.Decrement()
+
 	return rlpc.client.Do(ctx, req)
 }

+ 72 - 20
pkg/prom/query.go

@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
@@ -54,19 +55,18 @@ func (ctx *Context) HasErrors() bool {
 func (ctx *Context) Query(query string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go func(ctx *Context, resCh QueryResultsChan) {
-		defer errors.HandlePanic()
+	go runQuery(query, ctx, resCh, "")
 
-		raw, promErr := ctx.query(query)
-		ctx.ErrorCollector.Report(promErr)
+	return resCh
+}
 
-		results := NewQueryResults(query, raw)
-		if results.Error != nil {
-			ctx.ErrorCollector.Report(results.Error)
-		}
+// ProfileQuery returns a QueryResultsChan, then runs the given query with a profile
+// label and sends the results on the provided channel. Receiver is responsible for closing the
+// channel, preferably using the Read method.
+func (ctx *Context) ProfileQuery(query string, profileLabel string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
 
-		resCh <- results
-	}(ctx, resCh)
+	go runQuery(query, ctx, resCh, profileLabel)
 
 	return resCh
 }
@@ -85,6 +85,20 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
+// ProfileQueryAll starts one ProfileQuery per query and returns their result channels in
+// the same order as the queries; i.e. the response to queries[1] will be sent on channel
+// resChs[1]. Each query is profiled under the label "Query #N", where N is its 1-based
+// position in the argument list.
+func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
+	resChs := make([]QueryResultsChan, 0, len(queries))
+
+	for i, q := range queries {
+		label := fmt.Sprintf("Query #%d", i+1)
+		resChs = append(resChs, ctx.ProfileQuery(q, label))
+	}
+
+	return resChs
+}
+
 func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
 	raw, err := ctx.query(query)
 	if err != nil {
@@ -104,6 +118,27 @@ func (ctx *Context) QueryURL() *url.URL {
 	return ctx.Client.URL(epQuery, nil)
 }
 
+// runQuery executes the prometheus query, reports both the query error and any error
+// found while building the results to the context's error collector, and finally sends
+// the results on resCh. When profileLabel is non-empty, the time elapsed since the start
+// of the call is profiled under that label. It is meant to be run in its own goroutine.
+func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+	defer errors.HandlePanic()
+
+	began := time.Now()
+
+	raw, queryErr := ctx.query(query)
+	ctx.ErrorCollector.Report(queryErr)
+
+	results := NewQueryResults(query, raw)
+	if resultErr := results.Error; resultErr != nil {
+		ctx.ErrorCollector.Report(resultErr)
+	}
+
+	if len(profileLabel) > 0 {
+		log.Profile(began, profileLabel)
+	}
+
+	resCh <- results
+}
+
 func (ctx *Context) query(query string) (interface{}, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
@@ -139,19 +174,15 @@ func (ctx *Context) query(query string) (interface{}, error) {
 func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go func(ctx *Context, resCh QueryResultsChan) {
-		defer errors.HandlePanic()
+	go runQueryRange(query, start, end, step, ctx, resCh, "")
 
-		raw, promErr := ctx.queryRange(query, start, end, step)
-		ctx.ErrorCollector.Report(promErr)
+	return resCh
+}
 
-		results := NewQueryResults(query, raw)
-		if results.Error != nil {
-			ctx.ErrorCollector.Report(results.Error)
-		}
+func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step time.Duration, profileLabel string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
 
-		resCh <- results
-	}(ctx, resCh)
+	go runQueryRange(query, start, end, step, ctx, resCh, profileLabel)
 
 	return resCh
 }
@@ -175,6 +206,27 @@ func (ctx *Context) QueryRangeURL() *url.URL {
 	return ctx.Client.URL(epQueryRange, nil)
 }
 
+// runQueryRange executes the prometheus range query over [start, end] at the given step,
+// reports both the query error and any error found while building the results to the
+// context's error collector, and finally sends the results on resCh. When profileLabel is
+// non-empty, the time elapsed since the start of the call is profiled under that label.
+// It is meant to be run in its own goroutine.
+func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+	defer errors.HandlePanic()
+
+	began := time.Now()
+
+	raw, queryErr := ctx.queryRange(query, start, end, step)
+	ctx.ErrorCollector.Report(queryErr)
+
+	results := NewQueryResults(query, raw)
+	if resultErr := results.Error; resultErr != nil {
+		ctx.ErrorCollector.Report(resultErr)
+	}
+
+	if len(profileLabel) > 0 {
+		log.Profile(began, profileLabel)
+	}
+
+	resCh <- results
+}
+
 func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
 	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()

+ 37 - 0
pkg/util/atomic.go

@@ -0,0 +1,37 @@
+package util
+
+import "sync/atomic"
+
+// AtomicInt32 is an int32 whose methods read and modify it through sync/atomic.
+type AtomicInt32 int32
+
+// NewAtomicInt32 creates a new atomic int32 instance holding value.
+func NewAtomicInt32(value int32) *AtomicInt32 {
+	ai := AtomicInt32(value)
+	return &ai
+}
+
+// Get loads the int32 value atomically.
+func (ai *AtomicInt32) Get() int32 {
+	p := (*int32)(ai)
+	return atomic.LoadInt32(p)
+}
+
+// Set stores the int32 value atomically.
+func (ai *AtomicInt32) Set(value int32) {
+	p := (*int32)(ai)
+	atomic.StoreInt32(p, value)
+}
+
+// Increment atomically adds one and returns the new value.
+func (ai *AtomicInt32) Increment() int32 {
+	p := (*int32)(ai)
+	return atomic.AddInt32(p, 1)
+}
+
+// Decrement atomically subtracts one and returns the new value.
+func (ai *AtomicInt32) Decrement() int32 {
+	p := (*int32)(ai)
+	return atomic.AddInt32(p, -1)
+}
+
+// CompareAndSet atomically sets the value to new if it currently equals current, and
+// reports whether the swap happened.
+func (ai *AtomicInt32) CompareAndSet(current, new int32) bool {
+	p := (*int32)(ai)
+	return atomic.CompareAndSwapInt32(p, current, new)
+}

+ 28 - 0
test/clusterinfo_test.go

@@ -0,0 +1,28 @@
+package test
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/kubecost/cost-model/pkg/prom"
+)
+
+// TestClusterInfoLabels checks that flattening a representative cluster info payload with
+// prom.MapToLabels yields every expected (lower-cased) label name.
+func TestClusterInfoLabels(t *testing.T) {
+	const clusterInfo = `{"clusterProfile":"production","errorReporting":"true","id":"cluster-one","logCollection":"true","name":"bolt-3","productAnalytics":"true","provider":"GCP","provisioner":"GKE","remoteReadEnabled":"false","thanosEnabled":"false","valuesReporting":"true","version":"1.14+"}`
+
+	expectedKeys := []string{
+		"clusterprofile",
+		"errorreporting",
+		"id",
+		"logcollection",
+		"name",
+		"productanalytics",
+		"provider",
+		"provisioner",
+		"remotereadenabled",
+		"thanosenabled",
+		"valuesreporting",
+		"version",
+	}
+
+	var decoded map[string]interface{}
+	if err := json.Unmarshal([]byte(clusterInfo), &decoded); err != nil {
+		t.Errorf("Error: %s", err)
+		return
+	}
+
+	labels := prom.MapToLabels(decoded)
+	for _, key := range expectedKeys {
+		if _, found := labels[key]; found {
+			continue
+		}
+		// Stop at the first missing key.
+		t.Errorf("Failed to locate key: \"%s\" in labels.", key)
+		return
+	}
+}