Просмотр исходного кода

Merge branch 'develop' into alibaba-errs

Lars Lehtonen 2 года назад
Родитель
Коммит
f5a849cddd

+ 3 - 0
pkg/cloud/aws/athenaquerier.go

@@ -110,6 +110,9 @@ func (aq *AthenaQuerier) queryAthenaPaginated(ctx context.Context, query string,
 
 	// Create Athena Client
 	cli, err := aq.GetAthenaClient()
+	if err != nil {
+		return fmt.Errorf("QueryAthenaPaginated: GetAthenaClient error: %s", err.Error())
+	}
 
 	// Query Athena
 	startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)

+ 40 - 11
pkg/cloud/aws/provider.go

@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/csv"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -15,6 +16,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/aws/smithy-go"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 	"github.com/opencost/opencost/pkg/kubecost"
@@ -1559,8 +1561,20 @@ func (aws *AWS) getAllAddresses() ([]*ec2Types.Address, error) {
 			// Query for first page of volume results
 			resp, err := aws.getAddressesForRegion(context.TODO(), region)
 			if err != nil {
-				errorCh <- err
-				return
+				var awsErr smithy.APIError
+				if errors.As(err, &awsErr) {
+					switch awsErr.ErrorCode() {
+					case "AuthFailure", "InvalidClientTokenId", "UnauthorizedOperation":
+						log.DedupedInfof(5, "Unable to get addresses for region %s due to AWS permissions, error message: %s", r, awsErr.ErrorMessage())
+						return
+					default:
+						errorCh <- err
+						return
+					}
+				} else {
+					errorCh <- err
+					return
+				}
 			}
 			addressCh <- resp
 		}(r)
@@ -1661,8 +1675,20 @@ func (aws *AWS) getAllDisks() ([]*ec2Types.Volume, error) {
 			// Query for first page of volume results
 			resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
 			if err != nil {
-				errorCh <- err
-				return
+				var awsErr smithy.APIError
+				if errors.As(err, &awsErr) {
+					switch awsErr.ErrorCode() {
+					case "AuthFailure", "InvalidClientTokenId", "UnauthorizedOperation":
+						log.DedupedInfof(5, "Unable to get disks for region %s due to AWS permissions, error message: %s", r, awsErr.ErrorMessage())
+						return
+					default:
+						errorCh <- err
+						return
+					}
+				} else {
+					errorCh <- err
+					return
+				}
 			}
 			volumeCh <- resp
 
@@ -1744,14 +1770,17 @@ func (aws *AWS) isDiskOrphaned(vol *ec2Types.Volume) bool {
 }
 
 func (aws *AWS) GetOrphanedResources() ([]models.OrphanedResource, error) {
-	volumes, err := aws.getAllDisks()
-	if err != nil {
-		return nil, err
-	}
+	volumes, volumesErr := aws.getAllDisks()
+	addresses, addressesErr := aws.getAllAddresses()
 
-	addresses, err := aws.getAllAddresses()
-	if err != nil {
-		return nil, err
+	// If we have any orphaned resources - prioritize returning them over returning errors
+	if len(addresses) == 0 && len(volumes) == 0 {
+		if volumesErr != nil {
+			return nil, volumesErr
+		}
+		if addressesErr != nil {
+			return nil, addressesErr
+		}
 	}
 
 	var orphanedResources []models.OrphanedResource

+ 1 - 1
pkg/cloud/gcp/provider.go

@@ -153,7 +153,7 @@ func (gcp *GCP) GetLocalStorageQuery(window, offset time.Duration, rate bool, us
 	}
 	fmtWindow := timeutil.DurationString(window)
 
-	return fmt.Sprintf(fmtQuery, env.GetPromClusterFilter(), baseMetric, fmtWindow, fmtOffset, env.GetPromClusterLabel(), localStorageCost)
+	return fmt.Sprintf(fmtQuery, baseMetric, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), localStorageCost)
 }
 
 func (gcp *GCP) GetConfig() (*models.CustomPricing, error) {

+ 1 - 1
pkg/costmodel/allocation.go

@@ -665,7 +665,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	// split appropriately among each pod's container allocation.
 	podPVCMap := map[podKey][]*pvc{}
 	buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation, podUIDKeyMap, ingestPodUID)
-	applyPVCsToPods(window, podMap, podPVCMap, pvcMap, resolution)
+	applyPVCsToPods(window, podMap, podPVCMap, pvcMap)
 
 	// Identify PVCs without pods and add pv costs to the unmounted Allocation for the pvc's cluster
 	applyUnmountedPVCs(window, podMap, pvcMap)

+ 13 - 4
pkg/costmodel/allocation_helpers.go

@@ -1961,7 +1961,7 @@ func buildPodPVCMap(podPVCMap map[podKey][]*pvc, pvMap map[pvKey]*pv, pvcMap map
 	}
 }
 
-func applyPVCsToPods(window kubecost.Window, podMap map[podKey]*pod, podPVCMap map[podKey][]*pvc, pvcMap map[pvcKey]*pvc, resolution time.Duration) {
+func applyPVCsToPods(window kubecost.Window, podMap map[podKey]*pod, podPVCMap map[podKey][]*pvc, pvcMap map[pvcKey]*pvc) {
 	// Because PVCs can be shared among pods, the respective pv cost
 	// needs to be evenly distributed to those pods based on time
 	// running, as well as the amount of time the pvc was shared.
@@ -2006,12 +2006,20 @@ func applyPVCsToPods(window kubecost.Window, podMap map[podKey]*pod, podPVCMap m
 
 		pvc, ok := pvcMap[thisPVCKey]
 		if !ok {
-			log.DedupedWarningf(5, "Missing pvc with key %s", thisPVCKey)
+			log.Warnf("Allocation: Compute: applyPVCsToPods: missing pvc with key %s", thisPVCKey)
+			continue
+		}
+		if pvc == nil {
+			log.Warnf("Allocation: Compute: applyPVCsToPods: nil pvc with key %s", thisPVCKey)
 			continue
 		}
 
 		// Determine coefficients for each pvc-pod relation.
-		sharedPVCCostCoefficients := getPVCCostCoefficients(intervals, pvc, resolution)
+		sharedPVCCostCoefficients, err := getPVCCostCoefficients(intervals, pvc)
+		if err != nil {
+			log.Warnf("Allocation: Compute: applyPVCsToPods: getPVCCostCoefficients: %s", err)
+			continue
+		}
 
 		// Distribute pvc costs to Allocations
 		for thisPodKey, coeffComponents := range sharedPVCCostCoefficients {
@@ -2043,8 +2051,9 @@ func applyPVCsToPods(window kubecost.Window, podMap map[podKey]*pod, podPVCMap m
 					Cluster: pvc.Volume.Cluster,
 					Name:    pvc.Volume.Name,
 				}
+
 				// Both Cost and byteHours should be multiplied by the coef and divided by count
-				// so that you if all allocations with a given pv key are summed the result of those
+				// so that if all allocations with a given pv key are summed the result of those
 				// would be equal to the values of the original pv
 				count := float64(len(pod.Allocations))
 				alloc.PVs[pvKey] = &kubecost.PVAllocation{

+ 18 - 8
pkg/costmodel/intervals.go

@@ -1,6 +1,7 @@
 package costmodel
 
 import (
+	"fmt"
 	"sort"
 	"time"
 
@@ -79,40 +80,49 @@ func getIntervalPointsFromWindows(windows map[podKey]kubecost.Window) IntervalPo
 // getPVCCostCoefficients gets a coefficient which represents the scale
 // factor that each PVC in a pvcIntervalMap and corresponding slice of
 // IntervalPoints intervals uses to calculate a cost for that PVC's PV.
-func getPVCCostCoefficients(intervals IntervalPoints, thisPVC *pvc, resolution time.Duration) map[podKey][]CoefficientComponent {
+func getPVCCostCoefficients(intervals IntervalPoints, thisPVC *pvc) (map[podKey][]CoefficientComponent, error) {
 	// pvcCostCoefficientMap has a format such that the individual coefficient
 	// components are preserved for testing purposes.
 	pvcCostCoefficientMap := make(map[podKey][]CoefficientComponent)
 
-	// Reset the start time by the offset as well. so that offset is not used in coefficient calculation!
-	startTime := thisPVC.Start.Add(resolution)
-	pvcWindow := kubecost.NewWindow(&startTime, &thisPVC.End)
+	pvcWindow := kubecost.NewWindow(&thisPVC.Start, &thisPVC.End)
+	pvcWindowDurationMinutes := pvcWindow.Duration().Minutes()
+	if pvcWindowDurationMinutes <= 0.0 {
+		// Protect against Inf and NaN issues that would be caused by dividing
+		// by zero later on.
+		return nil, fmt.Errorf("detected PVC with window of zero duration: %s/%s/%s", thisPVC.Cluster, thisPVC.Namespace, thisPVC.Name)
+	}
+
 	unmountedKey := getUnmountedPodKey(thisPVC.Cluster)
 
 	var void struct{}
 	activeKeys := map[podKey]struct{}{}
 
-	currentTime := startTime
+	currentTime := thisPVC.Start
+
 	// For each interval i.e. for any time a pod-PVC relation ends or starts...
 	for _, point := range intervals {
 		// If the current point happens at a later time than the previous point
 		if !point.Time.Equal(currentTime) {
+			// If there are active keys, split one unit of proportion evenly
+			// among them, so each key gets 1/len(activeKeys).
 			for key := range activeKeys {
 				pvcCostCoefficientMap[key] = append(
 					pvcCostCoefficientMap[key],
 					CoefficientComponent{
-						Time:       point.Time.Sub(currentTime).Minutes() / pvcWindow.Duration().Minutes(),
+						Time:       point.Time.Sub(currentTime).Minutes() / pvcWindowDurationMinutes,
 						Proportion: 1.0 / float64(len(activeKeys)),
 					},
 				)
 
 			}
+
 			// If there are no active keys attribute all cost to the unmounted pv
 			if len(activeKeys) == 0 {
 				pvcCostCoefficientMap[unmountedKey] = append(
 					pvcCostCoefficientMap[unmountedKey],
 					CoefficientComponent{
-						Time:       point.Time.Sub(currentTime).Minutes() / pvcWindow.Duration().Minutes(),
+						Time:       point.Time.Sub(currentTime).Minutes() / pvcWindowDurationMinutes,
 						Proportion: 1.0,
 					},
 				)
@@ -142,7 +152,7 @@ func getPVCCostCoefficients(intervals IntervalPoints, thisPVC *pvc, resolution t
 			},
 		)
 	}
-	return pvcCostCoefficientMap
+	return pvcCostCoefficientMap, nil
 }
 
 // getCoefficientFromComponents takes the components of a PVC-pod PV cost coefficient

+ 53 - 25
pkg/costmodel/intervals_test.go

@@ -1,6 +1,7 @@
 package costmodel
 
 import (
+	"fmt"
 	"reflect"
 	"testing"
 	"time"
@@ -150,32 +151,39 @@ func TestGetIntervalPointsFromWindows(t *testing.T) {
 }
 
 func TestGetPVCCostCoefficients(t *testing.T) {
+	pod1Key := newPodKey("cluster1", "namespace1", "pod1")
+	pod2Key := newPodKey("cluster1", "namespace1", "pod2")
+	pod3Key := newPodKey("cluster1", "namespace1", "pod3")
+	pod4Key := newPodKey("cluster1", "namespace1", "pod4")
+	ummountedPodKey := newPodKey("cluster1", kubecost.UnmountedSuffix, kubecost.UnmountedSuffix)
+
 	pvc1 := &pvc{
-		Bytes:     0,
+		Bytes:     100 * 1024 * 1024 * 1024,
 		Name:      "pvc1",
 		Cluster:   "cluster1",
 		Namespace: "namespace1",
 		Start:     time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
 		End:       time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
 	}
-	pod1Key := newPodKey("cluster1", "namespace1", "pod1")
-	pod2Key := newPodKey("cluster1", "namespace1", "pod2")
-	pod3Key := newPodKey("cluster1", "namespace1", "pod3")
-	pod4Key := newPodKey("cluster1", "namespace1", "pod4")
-	ummountedPodKey := newPodKey("cluster1", kubecost.UnmountedSuffix, kubecost.UnmountedSuffix)
 
-	zeroDuration, _ := time.ParseDuration("0m0s")
-	fiveMinOffset, _ := time.ParseDuration("5m")
-	// Reflects the pvc that is offset by duration, the actual case that happens in allocation workflow.
-	// before core-370, the offset was causing a unmounted shared pvc coefficient map for the duration of the offset.
-	pvcWithDurationOffset := &pvc{
-		Bytes:     0,
-		Name:      "pvc1",
+	pvc2 := &pvc{
+		Bytes:     100 * 1024 * 1024 * 1024,
+		Name:      "pvc2",
 		Cluster:   "cluster1",
 		Namespace: "namespace1",
-		Start:     time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC).Add(-fiveMinOffset),
+		Start:     time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
 		End:       time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
 	}
+
+	pvc3 := &pvc{
+		Bytes:     100 * 1024 * 1024 * 1024,
+		Name:      "pvc3",
+		Cluster:   "cluster1",
+		Namespace: "namespace1",
+		Start:     time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+		End:       time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+	}
+
 	cases := []struct {
 		name           string
 		pvc            *pvc
@@ -183,6 +191,7 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 		intervals      []IntervalPoint
 		resolution     time.Duration
 		expected       map[podKey][]CoefficientComponent
+		expError       error
 	}{
 		{
 			name: "four pods w/ various overlaps",
@@ -197,7 +206,7 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod3Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod1Key),
 			},
-			resolution: zeroDuration,
+			expError: nil,
 			expected: map[podKey][]CoefficientComponent{
 				pod1Key: {
 					{0.5, 0.25},
@@ -226,7 +235,7 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "end", pod1Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod2Key),
 			},
-			resolution: zeroDuration,
+			expError: nil,
 			expected: map[podKey][]CoefficientComponent{
 				pod1Key: {
 					{1.0, 0.5},
@@ -245,7 +254,7 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod1Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod2Key),
 			},
-			resolution: zeroDuration,
+			expError: nil,
 			expected: map[podKey][]CoefficientComponent{
 				pod1Key: {
 					{0.5, 0.5},
@@ -265,7 +274,7 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", pod1Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod1Key),
 			},
-			resolution: zeroDuration,
+			expError: nil,
 			expected: map[podKey][]CoefficientComponent{
 				pod1Key: {
 					{1.0, 1.0},
@@ -281,7 +290,7 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 45, 0, 0, time.UTC), "start", pod2Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod2Key),
 			},
-			resolution: zeroDuration,
+			expError: nil,
 			expected: map[podKey][]CoefficientComponent{
 				pod1Key: {
 					{1.0, 0.25},
@@ -301,7 +310,7 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 15, 0, 0, time.UTC), "start", pod1Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 45, 0, 0, time.UTC), "end", pod1Key),
 			},
-			resolution: zeroDuration,
+			expError: nil,
 			expected: map[podKey][]CoefficientComponent{
 				pod1Key: {
 					{1.0, 0.5},
@@ -312,17 +321,16 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				},
 			},
 		},
-		// Test case to ensure the offset doesnt cause any unmounted
 		{
-			name: "pvcMap with duration offset not causing any unmounted entry in sharedPV Coefficient",
-			pvc:  pvcWithDurationOffset,
+			name: "back to back pods, full coverage",
+			pvc:  pvc2,
 			intervals: []IntervalPoint{
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", pod1Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", pod2Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "end", pod1Key),
 				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", pod2Key),
 			},
-			resolution: fiveMinOffset,
+			expError: nil,
 			expected: map[podKey][]CoefficientComponent{
 				pod1Key: {
 					{1.0, 0.5},
@@ -332,11 +340,31 @@ func TestGetPVCCostCoefficients(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "zero duration",
+			pvc:  pvc3,
+			intervals: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", pod1Key),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "end", pod1Key),
+			},
+			expError: fmt.Errorf("detected PVC with window of zero duration: %s/%s/%s", "cluster1", "namespace1", "pvc3"),
+			expected: nil,
+		},
 	}
 
 	for _, testCase := range cases {
 		t.Run(testCase.name, func(t *testing.T) {
-			result := getPVCCostCoefficients(testCase.intervals, testCase.pvc, testCase.resolution)
+			result, err := getPVCCostCoefficients(testCase.intervals, testCase.pvc)
+			if err != nil {
+				if testCase.expError == nil {
+					t.Errorf("getPVCCostCoefficients failed: got unexpected error: %v", err)
+				}
+				return
+			}
+
+			if err == nil && testCase.expError != nil {
+				t.Errorf("getPVCCostCoefficients failed: did not get expected error: %v", testCase.expError)
+			}
 
 			if !reflect.DeepEqual(result, testCase.expected) {
 				t.Errorf("getPVCCostCoefficients test failed: %s: Got %+v but expected %+v", testCase.name, result, testCase.expected)

+ 2 - 1
pkg/filter21/allocation/fields.go

@@ -25,7 +25,8 @@ const (
 // Filtering based on label aliases (team, department, etc.) should be a
 // responsibility of the query handler. By the time it reaches this
 // structured representation, we shouldn't have to be aware of what is
-// aliased to what.
+// aliased to what. The aliases correspond to either a label or annotation,
+// defined by the user.
 type AllocationAlias string
 
 const (

+ 39 - 2
pkg/kubecost/allocationfilter_test.go

@@ -440,11 +440,11 @@ func Test_AllocationFilterCondition_Matches(t *testing.T) {
 			expected: true,
 		},
 		{
-			name: `product != unallocated -> true`,
+			name: `department != unallocated -> true`,
 			a: &Allocation{
 				Properties: &AllocationProperties{
 					Annotations: AllocationAnnotations{
-						"keyproduct": "foo",
+						"keydepartment": "foo",
 					},
 				},
 			},
@@ -460,6 +460,43 @@ func Test_AllocationFilterCondition_Matches(t *testing.T) {
 			},
 			expected: true,
 		},
+		{
+			name: `product == unallocated -> true`,
+			a: &Allocation{
+				Properties: &AllocationProperties{
+					Annotations: AllocationAnnotations{
+						"keydepartment": "foo",
+					},
+				},
+			},
+			filter: &ast.EqualOp{
+				Left: ast.Identifier{
+					Field: ast.NewAliasField(afilter.AliasProduct),
+				},
+				Right: UnallocatedSuffix,
+			},
+			expected: true,
+		},
+		{
+			name: `product == "" -> true`,
+			a: &Allocation{
+				Properties: &AllocationProperties{
+					Labels: AllocationLabels{
+						"keydepartment": "foo",
+					},
+					Annotations: AllocationAnnotations{
+						"keyowner": "bar",
+					},
+				},
+			},
+			filter: &ast.EqualOp{
+				Left: ast.Identifier{
+					Field: ast.NewAliasField(afilter.AliasProduct),
+				},
+				Right: "",
+			},
+			expected: true,
+		},
 	}
 
 	for _, c := range cases {

+ 26 - 2
pkg/kubecost/allocationmatcher.go

@@ -244,7 +244,9 @@ func convertAliasFilterToLabelAnnotationFilter(aliasKey string, filterValue stri
 		return nil, fmt.Errorf("unsupported op type '%s' for alias conversion", op)
 	}
 
-	return ops.Or(
+	// This handles the case where a label/annotation EXISTS/IS PRESENT (is
+	// extant) for an aliased field. That's the primary case.
+	extantCaseNode := ops.Or(
 		ops.And(
 			ops.Contains(afilter.FieldLabel, aliasKey),
 			labelOp,
@@ -256,5 +258,27 @@ func convertAliasFilterToLabelAnnotationFilter(aliasKey string, filterValue stri
 				annotationOp,
 			),
 		),
-	), nil
+	)
+	var node ast.FilterNode
+	// This handles the special case of an unallocated aliased value. There are
+	// two forms of this: the first is where the label/annotation exists, but
+	// has an empty string value. That's actually handled by the extant case,
+	// because the API passes through that empty string. The other is when
+	// the aliased label/annotation doesn't exist for an allocation. That's
+	// what this modification to the tree handles. This matters when you're
+	// trying to drill into/identify workloads "not allocated" within that
+	// specific aliased field.
+	if filterValue == "" || filterValue == UnallocatedSuffix {
+		node = ops.Or(
+			extantCaseNode,
+			ops.And(
+				ops.Not(ops.Contains(afilter.FieldLabel, aliasKey)),
+				ops.Not(ops.Contains(afilter.FieldAnnotation, aliasKey)),
+			),
+		)
+	} else {
+		node = extantCaseNode
+	}
+
+	return node, nil
 }

+ 1 - 1
pkg/metrics/deploymentmetrics.go

@@ -42,7 +42,7 @@ func (kdc KubecostDeploymentCollector) Collect(ch chan<- prometheus.Metric) {
 		deploymentName := deployment.GetName()
 		deploymentNS := deployment.GetNamespace()
 
-		labels, values := prom.KubeLabelsToLabels(deployment.Spec.Selector.MatchLabels)
+		labels, values := prom.KubeLabelsToLabels(prom.SanitizeLabels(deployment.Spec.Selector.MatchLabels))
 		if len(labels) > 0 {
 			m := newDeploymentMatchLabelsMetric(deploymentName, deploymentNS, "deployment_match_labels", labels, values)
 			ch <- m

+ 1 - 1
pkg/metrics/namespacemetrics.go

@@ -139,7 +139,7 @@ func (nsac KubeNamespaceCollector) Collect(ch chan<- prometheus.Metric) {
 	for _, namespace := range namespaces {
 		nsName := namespace.GetName()
 
-		labels, values := prom.KubeLabelsToLabels(namespace.Labels)
+		labels, values := prom.KubeLabelsToLabels(prom.SanitizeLabels(namespace.Labels))
 		if len(labels) > 0 {
 			m := newNamespaceAnnotationsMetric("kube_namespace_labels", nsName, labels, values)
 			ch <- m

+ 1 - 1
pkg/metrics/nodemetrics.go

@@ -120,7 +120,7 @@ func (nsac KubeNodeCollector) Collect(ch chan<- prometheus.Metric) {
 
 		// node labels
 		if _, disabled := disabledMetrics["kube_node_labels"]; !disabled {
-			labelNames, labelValues := prom.KubePrependQualifierToLabels(node.GetLabels(), "label_")
+			labelNames, labelValues := prom.KubePrependQualifierToLabels(prom.SanitizeLabels(node.GetLabels()), "label_")
 			ch <- newKubeNodeLabelsMetric(nodeName, "kube_node_labels", labelNames, labelValues)
 		}
 

+ 1 - 31
pkg/metrics/podlabelmetrics.go

@@ -6,36 +6,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-//--------------------------------------------------------------------------
-//  KubecostPodCollector
-//--------------------------------------------------------------------------
-
-// KubecostPodCollector is a prometheus collector that emits pod metrics
-type KubecostPodLabelsCollector struct {
-	KubeClusterCache clustercache.ClusterCache
-}
-
-// Describe sends the super-set of all possible descriptors of metrics
-// collected by this Collector.
-func (kpmc KubecostPodLabelsCollector) Describe(ch chan<- *prometheus.Desc) {
-	ch <- prometheus.NewDesc("kube_pod_annotations", "All annotations for each pod prefix with annotation_", []string{}, nil)
-}
-
-// Collect is called by the Prometheus registry when collecting metrics.
-func (kpmc KubecostPodLabelsCollector) Collect(ch chan<- prometheus.Metric) {
-	pods := kpmc.KubeClusterCache.GetAllPods()
-	for _, pod := range pods {
-		podName := pod.GetName()
-		podNS := pod.GetNamespace()
-
-		// Pod Annotations
-		labels, values := prom.KubeAnnotationsToLabels(pod.Annotations)
-		if len(labels) > 0 {
-			ch <- newPodAnnotationMetric("kube_pod_annotations", podNS, podName, labels, values)
-		}
-	}
-}
-
 //--------------------------------------------------------------------------
 //  KubePodLabelsCollector
 //--------------------------------------------------------------------------
@@ -71,7 +41,7 @@ func (kpmc KubePodLabelsCollector) Collect(ch chan<- prometheus.Metric) {
 
 		// Pod Labels
 		if _, disabled := disabledMetrics["kube_pod_labels"]; !disabled {
-			labelNames, labelValues := prom.KubePrependQualifierToLabels(pod.GetLabels(), "label_")
+			labelNames, labelValues := prom.KubePrependQualifierToLabels(prom.SanitizeLabels(pod.GetLabels()), "label_")
 			ch <- newKubePodLabelsMetric("kube_pod_labels", podNS, podName, podUID, labelNames, labelValues)
 		}
 

+ 1 - 1
pkg/metrics/podmetrics.go

@@ -135,7 +135,7 @@ func (kpmc KubePodCollector) Collect(ch chan<- prometheus.Metric) {
 
 		// Pod Labels
 		if _, disabled := disabledMetrics["kube_pod_labels"]; !disabled {
-			labelNames, labelValues := prom.KubePrependQualifierToLabels(pod.GetLabels(), "label_")
+			labelNames, labelValues := prom.KubePrependQualifierToLabels(prom.SanitizeLabels(pod.GetLabels()), "label_")
 			ch <- newKubePodLabelsMetric("kube_pod_labels", podNS, podName, podUID, labelNames, labelValues)
 		}
 

+ 1 - 1
pkg/metrics/servicemetrics.go

@@ -42,7 +42,7 @@ func (sc KubecostServiceCollector) Collect(ch chan<- prometheus.Metric) {
 		serviceName := svc.GetName()
 		serviceNS := svc.GetNamespace()
 
-		labels, values := prom.KubeLabelsToLabels(svc.Spec.Selector)
+		labels, values := prom.KubeLabelsToLabels(prom.SanitizeLabels(svc.Spec.Selector))
 		if len(labels) > 0 {
 			m := newServiceSelectorLabelsMetric(serviceName, serviceNS, "service_selector_labels", labels, values)
 			ch <- m

+ 1 - 1
pkg/metrics/statefulsetmetrics.go

@@ -41,7 +41,7 @@ func (sc KubecostStatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
 		statefulsetName := statefulset.GetName()
 		statefulsetNS := statefulset.GetNamespace()
 
-		labels, values := prom.KubeLabelsToLabels(statefulset.Spec.Selector.MatchLabels)
+		labels, values := prom.KubeLabelsToLabels(prom.SanitizeLabels(statefulset.Spec.Selector.MatchLabels))
 		if len(labels) > 0 {
 			m := newStatefulsetMatchLabelsMetric(statefulsetName, statefulsetNS, "statefulSet_match_labels", labels, values)
 			ch <- m

+ 2 - 1
pkg/metrics/telemetry.go

@@ -2,9 +2,10 @@ package metrics
 
 import (
 	"fmt"
-	"github.com/opencost/opencost/pkg/version"
 	"sync"
 
+	"github.com/opencost/opencost/pkg/version"
+
 	"github.com/kubecost/events"
 	"github.com/prometheus/client_golang/prometheus"
 )

+ 14 - 0
pkg/prom/metrics.go

@@ -104,3 +104,17 @@ func KubeAnnotationsToLabels(labels map[string]string) ([]string, []string) {
 func SanitizeLabelName(s string) string {
 	return invalidLabelCharRE.ReplaceAllString(s, "_")
 }
+
+// SanitizeLabels sanitizes all label names in the given map. This may cause
+// collisions, which is intentional: collisions not caught before emission
+// cause fatal errors. On collision the last value iterated wins; because map
+// iteration order is unspecified, which value survives is not deterministic.
+func SanitizeLabels(labels map[string]string) map[string]string {
+	response := make(map[string]string, len(labels))
+
+	for k, v := range labels {
+		response[SanitizeLabelName(k)] = v
+	}
+
+	return response
+}

+ 65 - 0
pkg/prom/metrics_test.go

@@ -2,6 +2,7 @@ package prom
 
 import (
 	"fmt"
+	"reflect"
 	"testing"
 )
 
@@ -93,3 +94,67 @@ func TestKubeLabelsToPromLabels(t *testing.T) {
 		t.Errorf("%s", err)
 	}
 }
+
+func TestSanitizeLabels(t *testing.T) {
+	type testCase struct {
+		in  map[string]string
+		exp map[string]string
+	}
+
+	tcs := map[string]testCase{
+		"empty labels": {
+			in:  map[string]string{},
+			exp: map[string]string{},
+		},
+		"no op": {
+			in: map[string]string{
+				"foo": "bar",
+				"baz": "loo",
+			},
+			exp: map[string]string{
+				"foo": "bar",
+				"baz": "loo",
+			},
+		},
+		"modification, no collisions": {
+			in: map[string]string{
+				"foo-foo":   "bar",
+				"baz---baz": "loo",
+			},
+			exp: map[string]string{
+				"foo_foo":   "bar",
+				"baz___baz": "loo",
+			},
+		},
+		"modification, one collision": {
+			in: map[string]string{
+				"foo-foo":   "bar",
+				"foo+foo":   "bar",
+				"baz---baz": "loo",
+			},
+			exp: map[string]string{
+				"foo_foo":   "bar",
+				"baz___baz": "loo",
+			},
+		},
+		"modification, all collisions": {
+			in: map[string]string{
+				"foo-foo": "bar",
+				"foo+foo": "bar",
+				"foo_foo": "bar",
+			},
+			exp: map[string]string{
+				"foo_foo": "bar",
+			},
+		},
+	}
+
+	for name, tc := range tcs {
+		t.Run(name, func(t *testing.T) {
+			act := SanitizeLabels(tc.in)
+			if !reflect.DeepEqual(tc.exp, act) {
+				t.Errorf("sanitizing labels failed for case %s: %+v != %+v", name, tc.exp, act)
+			}
+		})
+	}
+}