Ajay Tripathy преди 4 години
родител
ревизия
d299d3b94a

+ 8 - 2
pkg/cloud/awsprovider.go

@@ -130,9 +130,9 @@ type AWS struct {
 	SpotPricingUpdatedAt        *time.Time
 	SpotRefreshRunning          bool
 	SpotPricingLock             sync.RWMutex
-	SpotPricingError           error
+	SpotPricingError            error
 	RIPricingByInstanceID       map[string]*RIData
-	RIPricingError             error
+	RIPricingError              error
 	RIDataRunning               bool
 	RIDataLock                  sync.RWMutex
 	SavingsPlanDataByInstanceID map[string]*SavingsPlanData
@@ -156,6 +156,8 @@ type AWS struct {
 	Config                      *ProviderConfig
 	ServiceAccountChecks        map[string]*ServiceAccountCheck
 	clusterManagementPrice      float64
+	clusterAccountId            string
+	clusterRegion               string
 	clusterProvisioner          string
 	*CustomProvider
 }
@@ -1180,6 +1182,8 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m := make(map[string]string)
 		m["name"] = c.ClusterName
 		m["provider"] = "AWS"
+		m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable elsewhere
+		m["region"] = awsProvider.clusterRegion
 		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		m["provisioner"] = awsProvider.clusterProvisioner
@@ -1190,6 +1194,8 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m := make(map[string]string)
 		m["name"] = clusterName
 		m["provider"] = "AWS"
+		m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable elsewhere
+		m["region"] = awsProvider.clusterRegion
 		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		return m, nil

+ 19 - 3
pkg/cloud/azureprovider.go

@@ -383,6 +383,8 @@ type Azure struct {
 	Config                  *ProviderConfig
 	ServiceAccountChecks    map[string]*ServiceAccountCheck
 	RateCardPricingError    error
+	clusterAccountId        string
+	clusterRegion           string
 }
 
 type azureKey struct {
@@ -1130,6 +1132,8 @@ func (az *Azure) ClusterInfo() (map[string]string, error) {
 		m["name"] = c.ClusterName
 	}
 	m["provider"] = "azure"
+	m["account"] = az.clusterAccountId
+	m["region"] = az.clusterRegion
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 	m["id"] = env.GetClusterID()
 	return m, nil
@@ -1391,8 +1395,8 @@ func (az *Azure) PricingSourceStatus() map[string]*PricingSource {
 	if az.RateCardPricingError != nil {
 		errMsg = az.RateCardPricingError.Error()
 	}
-	rcps := &PricingSource {
-		Name: rateCardPricingSource,
+	rcps := &PricingSource{
+		Name:  rateCardPricingSource,
 		Error: errMsg,
 	}
 	if rcps.Error != "" {
@@ -1400,7 +1404,7 @@ func (az *Azure) PricingSourceStatus() map[string]*PricingSource {
 	} else if len(az.Pricing) == 0 {
 		rcps.Error = "No Pricing Data Available"
 		rcps.Available = false
-	}else {
+	} else {
 		rcps.Available = true
 	}
 	sources[rateCardPricingSource] = rcps
@@ -1418,3 +1422,15 @@ func (az *Azure) CombinedDiscountForNode(instanceType string, isPreemptible bool
 func (az *Azure) Regions() []string {
 	return azureRegions
 }
+
+func parseAzureSubscriptionID(id string) string {
+	// azure:///subscriptions/0badafdf-1234-abcd-wxyz-123456789/...
+	//  => 0badafdf-1234-abcd-wxyz-123456789
+	rx := regexp.MustCompile("azure:///subscriptions/([^/]*)/*")
+	match := rx.FindStringSubmatch(id)
+	if len(match) >= 2 {
+		return match[1]
+	}
+	// Return empty string if an account could not be parsed from provided string
+	return ""
+}

+ 36 - 0
pkg/cloud/azureprovider_test.go

@@ -0,0 +1,36 @@
+package cloud
+
+import (
+	"testing"
+)
+
+func TestParseAzureSubscriptionID(t *testing.T) {
+	cases := []struct {
+		input    string
+		expected string
+	}{
+		{
+			input:    "azure:///subscriptions/0badafdf-1234-abcd-wxyz-123456789/...",
+			expected: "0badafdf-1234-abcd-wxyz-123456789",
+		},
+		{
+			input:    "azure:/subscriptions/0badafdf-1234-abcd-wxyz-123456789/...",
+			expected: "",
+		},
+		{
+			input:    "azure:///subscriptions//",
+			expected: "",
+		},
+		{
+			input:    "",
+			expected: "",
+		},
+	}
+
+	for _, test := range cases {
+		result := parseAzureSubscriptionID(test.input)
+		if result != test.expected {
+			t.Errorf("Input: %s, Expected: %s, Actual: %s", test.input, test.expected, result)
+		}
+	}
+}

+ 16 - 0
pkg/cloud/gcpprovider.go

@@ -92,6 +92,8 @@ type GCP struct {
 	ServiceKeyProvided      bool
 	ValidPricingKeys        map[string]bool
 	clusterManagementPrice  float64
+	clusterProjectId        string
+	clusterRegion           string
 	clusterProvisioner      string
 	*CustomProvider
 }
@@ -529,6 +531,8 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = "GCP"
+	m["project"] = gcp.clusterProjectId
+	m["region"] = gcp.clusterRegion
 	m["provisioner"] = gcp.clusterProvisioner
 	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
@@ -1524,3 +1528,15 @@ func sustainedUseDiscount(class string, defaultDiscount float64, isPreemptible b
 	}
 	return discount
 }
+
+func parseGCPProjectID(id string) string {
+	// gce://guestbook-12345/...
+	//  => guestbook-12345
+	rx := regexp.MustCompile("gce://([^/]*)/*")
+	match := rx.FindStringSubmatch(id)
+	if len(match) >= 2 {
+		return match[1]
+	}
+	// Return empty string if an account could not be parsed from provided string
+	return ""
+}

+ 31 - 0
pkg/cloud/gcpprovider_test.go

@@ -34,3 +34,34 @@ func TestParseGCPInstanceTypeLabel(t *testing.T) {
 		}
 	}
 }
+
+func TestParseGCPProjectID(t *testing.T) {
+	cases := []struct {
+		input    string
+		expected string
+	}{
+		{
+			input:    "gce://guestbook-12345/...",
+			expected: "guestbook-12345",
+		},
+		{
+			input:    "gce:/guestbook-12345/...",
+			expected: "",
+		},
+		{
+			input:    "asdfa",
+			expected: "",
+		},
+		{
+			input:    "",
+			expected: "",
+		},
+	}
+
+	for _, test := range cases {
+		result := parseGCPProjectID(test.input)
+		if result != test.expected {
+			t.Errorf("Input: %s, Expected: %s, Actual: %s", test.input, test.expected, result)
+		}
+	}
+}

+ 60 - 29
pkg/cloud/provider.go

@@ -4,6 +4,7 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
+	"github.com/kubecost/cost-model/pkg/util"
 	"io"
 	"regexp"
 	"strconv"
@@ -412,62 +413,92 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		return nil, fmt.Errorf("Could not locate any nodes for cluster.")
 	}
 
-	provider := strings.ToLower(nodes[0].Spec.ProviderID)
+	cp := getClusterProperties(nodes[0])
 
-	if env.IsUseCSVProvider() {
+	switch cp.provider {
+	case "CSV":
 		klog.Infof("Using CSV Provider with CSV at %s", env.GetCSVPath())
-		configFileName := ""
-		if metadata.OnGCE() {
-			configFileName = "gcp.json"
-		} else if strings.HasPrefix(provider, "aws") {
-			configFileName = "aws.json"
-		} else if strings.HasPrefix(provider, "azure") {
-			configFileName = "azure.json"
-
-		} else {
-			configFileName = "default.json"
-		}
 		return &CSVProvider{
 			CSVLocation: env.GetCSVPath(),
 			CustomProvider: &CustomProvider{
 				Clientset: cache,
-				Config:    NewProviderConfig(configFileName),
+				Config:    NewProviderConfig(cp.configFileName),
 			},
 		}, nil
-	}
-	if metadata.OnGCE() {
+	case "GCP":
 		klog.V(3).Info("metadata reports we are in GCE")
 		if apiKey == "" {
 			return nil, errors.New("Supply a GCP Key to start getting data")
 		}
 		return &GCP{
-			Clientset: cache,
-			APIKey:    apiKey,
-			Config:    NewProviderConfig("gcp.json"),
+			Clientset:        cache,
+			APIKey:           apiKey,
+			Config:           NewProviderConfig(cp.configFileName),
+			clusterRegion:    cp.region,
+			clusterProjectId: cp.projectID,
 		}, nil
-	}
-
-	if strings.HasPrefix(provider, "aws") {
+	case "AWS":
 		klog.V(2).Info("Found ProviderID starting with \"aws\", using AWS Provider")
 		return &AWS{
-			Clientset: cache,
-			Config:    NewProviderConfig("aws.json"),
+			Clientset:        cache,
+			Config:           NewProviderConfig(cp.configFileName),
+			clusterRegion:    cp.region,
+			clusterAccountId: cp.accountID,
 		}, nil
-	} else if strings.HasPrefix(provider, "azure") {
+	case "AZURE":
 		klog.V(2).Info("Found ProviderID starting with \"azure\", using Azure Provider")
 		return &Azure{
-			Clientset: cache,
-			Config:    NewProviderConfig("azure.json"),
+			Clientset:        cache,
+			Config:           NewProviderConfig(cp.configFileName),
+			clusterRegion:    cp.region,
+			clusterAccountId: cp.accountID,
 		}, nil
-	} else {
+	default:
 		klog.V(2).Info("Unsupported provider, falling back to default")
 		return &CustomProvider{
 			Clientset: cache,
-			Config:    NewProviderConfig("default.json"),
+			Config:    NewProviderConfig(cp.configFileName),
 		}, nil
 	}
 }
 
+type clusterProperties struct {
+	provider       string
+	configFileName string
+	region         string
+	accountID      string
+	projectID      string
+}
+
+func getClusterProperties(node *v1.Node) clusterProperties {
+	providerID := strings.ToLower(node.Spec.ProviderID)
+	region, _ := util.GetRegion(node.Labels)
+	cp := clusterProperties{
+		provider:       "DEFAULT",
+		configFileName: "default.json",
+		region:         region,
+		accountID:      "",
+		projectID:      "",
+	}
+	if metadata.OnGCE() {
+		cp.provider = "GCP"
+		cp.configFileName = "gcp.json"
+		cp.projectID = parseGCPProjectID(providerID)
+	} else if strings.HasPrefix(providerID, "aws") {
+		cp.provider = "AWS"
+		cp.configFileName = "aws.json"
+	} else if strings.HasPrefix(providerID, "azure") {
+		cp.provider = "AZURE"
+		cp.configFileName = "azure.json"
+		cp.accountID = parseAzureSubscriptionID(providerID)
+	}
+	if env.IsUseCSVProvider() {
+		cp.provider = "CSV"
+	}
+
+	return cp
+}
+
 func UpdateClusterMeta(cluster_id, cluster_name string) error {
 	pw := env.GetRemotePW()
 	address := env.GetSQLAddress()

+ 7 - 1
pkg/costmodel/cluster.go

@@ -39,6 +39,8 @@ const (
 	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
 )
 
+const maxLocalDiskSize = 200 // GiB; generous upper bound (AWS root disks are typically <= 100 GiB), so larger reported sizes are treated as filesystem-metric errors and excluded from cost analysis.
+
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
 // are broken down by cores, memory, and storage.
 type ClusterCosts struct {
@@ -242,6 +244,10 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 			}
 		}
 		diskMap[key].Bytes = bytes
+		if bytes/1024/1024/1024 > maxLocalDiskSize {
+			log.Warningf("Deleting large root disk/localstorage disk from analysis")
+			delete(diskMap, key)
+		}
 	}
 
 	for _, result := range resLocalActiveMins {
@@ -258,7 +264,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
-			log.Warningf("ClusterDisks: local active mins for unidentified disk")
+			log.Warningf("ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
 			continue
 		}
 

+ 45 - 0
pkg/costmodel/clusters/clustermap.go

@@ -21,11 +21,15 @@ const (
 	LoadRetryDelay time.Duration = 10 * time.Second
 )
 
+// ClusterInfo holds attributes of Cluster from metrics pulled from Prometheus
 type ClusterInfo struct {
 	ID          string `json:"id"`
 	Name        string `json:"name"`
 	Profile     string `json:"profile"`
 	Provider    string `json:"provider"`
+	Account     string `json:"account"`
+	Project     string `json:"project"`
+	Region      string `json:"region"`
 	Provisioner string `json:"provisioner"`
 }
 
@@ -40,6 +44,9 @@ func (ci *ClusterInfo) Clone() *ClusterInfo {
 		Name:        ci.Name,
 		Profile:     ci.Profile,
 		Provider:    ci.Provider,
+		Account:     ci.Account,
+		Project:     ci.Project,
+		Region:      ci.Region,
 		Provisioner: ci.Provisioner,
 	}
 }
@@ -170,6 +177,21 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 			provider = ""
 		}
 
+		account, err := result.GetString("account")
+		if err != nil {
+			account = ""
+		}
+
+		project, err := result.GetString("project")
+		if err != nil {
+			project = ""
+		}
+
+		region, err := result.GetString("region")
+		if err != nil {
+			region = ""
+		}
+
 		provisioner, err := result.GetString("provisioner")
 		if err != nil {
 			provisioner = ""
@@ -180,6 +202,9 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 			Name:        name,
 			Profile:     profile,
 			Provider:    provider,
+			Account:     account,
+			Project:     project,
+			Region:      region,
 			Provisioner: provisioner,
 		}
 	}
@@ -218,14 +243,31 @@ func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
 
 	var clusterProfile string
 	var provider string
+	var account string
+	var project string
+	var region string
 	var provisioner string
 
 	if cp, ok := info["clusterProfile"]; ok {
 		clusterProfile = cp
 	}
+
 	if pvdr, ok := info["provider"]; ok {
 		provider = pvdr
 	}
+
+	if acct, ok := info["account"]; ok {
+		account = acct
+	}
+
+	if proj, ok := info["project"]; ok {
+		project = proj
+	}
+
+	if reg, ok := info["region"]; ok {
+		region = reg
+	}
+
 	if pvsr, ok := info["provisioner"]; ok {
 		provisioner = pvsr
 	}
@@ -235,6 +277,9 @@ func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
 		Name:        name,
 		Profile:     clusterProfile,
 		Provider:    provider,
+		Account:     account,
+		Project:     project,
+		Region:      region,
 		Provisioner: provisioner,
 	}, nil
 }

+ 1 - 1
pkg/costmodel/metrics.go

@@ -268,7 +268,7 @@ func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clust
 		EmitNamespaceAnnotations:      env.IsEmitNamespaceAnnotationsMetric(),
 		EmitPodAnnotations:            env.IsEmitPodAnnotationsMetric(),
 		EmitKubeStateMetrics:          env.IsEmitKsmV1Metrics(),
-		EmitNodeKubeStateMetrisOnly:   env.IsEmitNodeKubeStateMetrisOnly(),
+		EmitKubeStateMetricsV1Only:    env.IsEmitKsmV1MetricsOnly(),
 	})
 
 	return &CostModelMetricsEmitter{

+ 3 - 3
pkg/env/costmodelenv.go

@@ -39,7 +39,7 @@ const (
 	EmitNamespaceAnnotationsMetricEnvVar = "EMIT_NAMESPACE_ANNOTATIONS_METRIC"
 
 	EmitKsmV1MetricsEnvVar = "EMIT_KSM_V1_METRICS"
-	EmitKsmNodeMetricsOnly = "EMIT_KSM_NODE_METRICS_ONLY"
+	EmitKsmV1MetricsOnly   = "EMIT_KSM_V1_METRICS_ONLY"
 
 	ThanosEnabledEnvVar      = "THANOS_ENABLED"
 	ThanosQueryUrlEnvVar     = "THANOS_QUERY_URL"
@@ -105,8 +105,8 @@ func IsEmitKsmV1Metrics() bool {
 	return GetBool(EmitKsmV1MetricsEnvVar, true)
 }
 
-func IsEmitNodeKubeStateMetrisOnly() bool {
-	return GetBool(EmitKsmNodeMetricsOnly, true)
+func IsEmitKsmV1MetricsOnly() bool {
+	return GetBool(EmitKsmV1MetricsOnly, false)
 }
 
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents

+ 8 - 8
pkg/kubecost/allocation.go

@@ -121,7 +121,7 @@ func (pv *PVAllocations) Clone() PVAllocations {
 		return nil
 	}
 	apv := *pv
-	clonePV := PVAllocations{}
+	clonePV := make(map[PVKey]*PVAllocation, len(apv))
 	for k, v := range apv {
 		clonePV[k] = &PVAllocation{
 			ByteHours: v.ByteHours,
@@ -1679,17 +1679,17 @@ func (as *AllocationSet) Clone() *AllocationSet {
 	as.RLock()
 	defer as.RUnlock()
 
-	allocs := map[string]*Allocation{}
+	allocs := make(map[string]*Allocation, len(as.allocations))
 	for k, v := range as.allocations {
 		allocs[k] = v.Clone()
 	}
 
-	externalKeys := map[string]bool{}
+	externalKeys := make(map[string]bool, len(as.externalKeys))
 	for k, v := range as.externalKeys {
 		externalKeys[k] = v
 	}
 
-	idleKeys := map[string]bool{}
+	idleKeys := make(map[string]bool, len(as.idleKeys))
 	for k, v := range as.idleKeys {
 		idleKeys[k] = v
 	}
@@ -1754,12 +1754,12 @@ func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]
 			}
 
 			cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
-			gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
 			ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
+			gpuCost := node.GPUCost * adjustmentRate
 
 			assetClusterResourceCosts[node.Properties().Cluster]["cpu"] += cpuCost
-			assetClusterResourceCosts[node.Properties().Cluster]["gpu"] += gpuCost
 			assetClusterResourceCosts[node.Properties().Cluster]["ram"] += ramCost
+			assetClusterResourceCosts[node.Properties().Cluster]["gpu"] += gpuCost
 		}
 	})
 
@@ -1891,12 +1891,12 @@ func (as *AllocationSet) ComputeIdleAllocationsByNode(assetSet *AssetSet) (map[s
 			}
 
 			cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
-			gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
 			ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
+			gpuCost := node.GPUCost * adjustmentRate
 
 			assetNodeResourceCosts[node.Properties().ProviderID]["cpu"] += cpuCost
-			assetNodeResourceCosts[node.Properties().ProviderID]["gpu"] += gpuCost
 			assetNodeResourceCosts[node.Properties().ProviderID]["ram"] += ramCost
+			assetNodeResourceCosts[node.Properties().ProviderID]["gpu"] += gpuCost
 		}
 	})
 

+ 2 - 2
pkg/kubecost/allocationprops.go

@@ -133,13 +133,13 @@ func (p *AllocationProperties) Clone() *AllocationProperties {
 	}
 	clone.Services = services
 
-	labels := make(map[string]string)
+	labels := make(map[string]string, len(p.Labels))
 	for k, v := range p.Labels {
 		labels[k] = v
 	}
 	clone.Labels = labels
 
-	annotations := make(map[string]string)
+	annotations := make(map[string]string, len(p.Annotations))
 	for k, v := range p.Annotations {
 		annotations[k] = v
 	}

+ 2 - 2
pkg/kubecost/asset.go

@@ -740,7 +740,7 @@ func (ca *Cloud) Add(a Asset) Asset {
 	any.SetProperties(props)
 	any.SetLabels(labels)
 	any.adjustment = ca.Adjustment() + a.Adjustment()
-	any.Cost = (ca.TotalCost() - ca.Adjustment() - ca.Credit) + (a.TotalCost() - a.Adjustment() - ca.Credit)
+	any.Cost = (ca.TotalCost() - ca.Adjustment()) + (a.TotalCost() - a.Adjustment())
 
 	return any
 }
@@ -2502,7 +2502,7 @@ func (as *AssetSet) Clone() *AssetSet {
 		aggregateBy = append([]string{}, as.aggregateBy...)
 	}
 
-	assets := map[string]Asset{}
+	assets := make(map[string]Asset, len(as.assets))
 	for k, v := range as.assets {
 		assets[k] = v.Clone()
 	}

+ 1 - 1
pkg/kubecost/config_test.go

@@ -38,7 +38,7 @@ func TestLabelConfig_Map(t *testing.T) {
 
 func TestLabelConfig_GetExternalAllocationName(t *testing.T) {
 	// Make sure that AWS's Glue/Athena column formatting is supported
-	glueFormattedLabel := cloudutil.ConvertToGlueColumnFormat("Non__GlueFormattedLabel")
+	glueFormattedLabel := cloudutil.ConvertToGlueColumnFormat("___Non_&GlueFormattedLabel___&")
 
 	labels := map[string]string{
 		"kubens":                      "kubecost-staging",

+ 9 - 3
pkg/metrics/kubemetrics.go

@@ -28,7 +28,7 @@ type KubeMetricsOpts struct {
 	EmitNamespaceAnnotations      bool
 	EmitPodAnnotations            bool
 	EmitKubeStateMetrics          bool
-	EmitNodeKubeStateMetrisOnly   bool
+	EmitKubeStateMetricsV1Only    bool
 }
 
 // DefaultKubeMetricsOpts returns KubeMetricsOpts with default values set
@@ -38,7 +38,7 @@ func DefaultKubeMetricsOpts() *KubeMetricsOpts {
 		EmitNamespaceAnnotations:      false,
 		EmitPodAnnotations:            false,
 		EmitKubeStateMetrics:          true,
-		EmitNodeKubeStateMetrisOnly:   false,
+		EmitKubeStateMetricsV1Only:    false,
 	}
 }
 
@@ -95,10 +95,16 @@ func InitKubeMetrics(clusterCache clustercache.ClusterCache, opts *KubeMetricsOp
 			prometheus.MustRegister(KubeJobCollector{
 				KubeClusterCache: clusterCache,
 			})
-		} else if opts.EmitNodeKubeStateMetrisOnly {
+		} else if opts.EmitKubeStateMetricsV1Only {
 			prometheus.MustRegister(KubeNodeCollector{
 				KubeClusterCache: clusterCache,
 			})
+			prometheus.MustRegister(KubeNamespaceCollector{
+				KubeClusterCache: clusterCache,
+			})
+			prometheus.MustRegister(KubePodLabelsCollector{
+				KubeClusterCache: clusterCache,
+			})
 		}
 	})
 }

+ 72 - 0
pkg/metrics/podlabelmetrics.go

@@ -0,0 +1,72 @@
+package metrics
+
+import (
+	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+//--------------------------------------------------------------------------
+//  KubecostPodLabelsCollector
+//--------------------------------------------------------------------------
+
+// KubecostPodLabelsCollector is a prometheus collector that emits pod annotation metrics
+type KubecostPodLabelsCollector struct {
+	KubeClusterCache clustercache.ClusterCache
+}
+
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
+func (kpmc KubecostPodLabelsCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("kube_pod_annotations", "All annotations for each pod prefix with annotation_", []string{}, nil)
+}
+
+// Collect is called by the Prometheus registry when collecting metrics.
+func (kpmc KubecostPodLabelsCollector) Collect(ch chan<- prometheus.Metric) {
+	pods := kpmc.KubeClusterCache.GetAllPods()
+	for _, pod := range pods {
+		podName := pod.GetName()
+		podNS := pod.GetNamespace()
+
+		// Pod Annotations
+		labels, values := prom.KubeAnnotationsToLabels(pod.Annotations)
+		if len(labels) > 0 {
+			ch <- newPodAnnotationMetric("kube_pod_annotations", podNS, podName, labels, values)
+		}
+	}
+}
+
+//--------------------------------------------------------------------------
+//  KubePodLabelsCollector
+//--------------------------------------------------------------------------
+
+// KubePodLabelsCollector is a prometheus collector that emits pod labels only
+type KubePodLabelsCollector struct {
+	KubeClusterCache clustercache.ClusterCache
+}
+
+// Describe sends the super-set of all possible descriptors of pod labels only
+// collected by this Collector.
+func (kpmc KubePodLabelsCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("kube_pod_labels", "All labels for each pod prefixed with label_", []string{}, nil)
+	ch <- prometheus.NewDesc("kube_pod_owner", "Information about the Pod's owner", []string{}, nil)
+}
+
+// Collect is called by the Prometheus registry when collecting metrics.
+func (kpmc KubePodLabelsCollector) Collect(ch chan<- prometheus.Metric) {
+	pods := kpmc.KubeClusterCache.GetAllPods()
+	for _, pod := range pods {
+		podName := pod.GetName()
+		podNS := pod.GetNamespace()
+		podUID := string(pod.GetUID())
+
+		// Pod Labels
+		labelNames, labelValues := prom.KubePrependQualifierToLabels(pod.GetLabels(), "label_")
+		ch <- newKubePodLabelsMetric("kube_pod_labels", podNS, podName, podUID, labelNames, labelValues)
+
+		// Owner References
+		for _, owner := range pod.OwnerReferences {
+			ch <- newKubePodOwnerMetric("kube_pod_owner", podNS, podName, owner.Name, owner.Kind, owner.Controller != nil)
+		}
+	}
+}

+ 7 - 0
pkg/util/cloudutil/aws.go

@@ -1,8 +1,11 @@
 package cloudutil
 
 import (
+	"fmt"
 	"strings"
 	"unicode"
+
+	"github.com/kubecost/cost-model/pkg/log"
 )
 
 // ConvertToGlueColumnFormat takes a string and runs through various regex
@@ -34,7 +37,11 @@ func ConvertToGlueColumnFormat(columnName string) string {
 		sb.WriteRune(unicode.ToLower(r))
 		prev = r
 	}
+
 	final := sb.String()
+	if prev == '_' { // strip any trailing '_'
+		final = final[:len(final)-1]
+	}
 	// Longer column name than expected - remove _ left to right
 	allowedColLen := 128
 	underscoreToRemove := len(final) - allowedColLen