Bläddra i källkod

Merge branch 'niko/aggapi' of github.com:kubecost/cost-model into niko/aggapi

Niko Kovacevic 5 år sedan
förälder
incheckning
0f3605420a

+ 6 - 3
pkg/costmodel/aggregation.go

@@ -161,7 +161,7 @@ func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, label
 	// the cardinality matches
 	if len(labelNames) == len(labelValues) {
 		for i := range labelNames {
-			cleanedLname := SanitizeLabelName(strings.Trim(labelNames[i], " "))
+			cleanedLname := prom.SanitizeLabelName(strings.Trim(labelNames[i], " "))
 			if values, ok := sr.LabelSelectors[cleanedLname]; ok {
 				values[strings.Trim(labelValues[i], " ")] = true
 			} else {
@@ -1138,7 +1138,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 			lTrim := strings.TrimSpace(l)
 			label := strings.Split(lTrim, "=")
 			if len(label) == 2 {
-				ln := SanitizeLabelName(strings.TrimSpace(label[0]))
+				ln := prom.SanitizeLabelName(strings.TrimSpace(label[0]))
 				lv := strings.TrimSpace(label[1])
 				labelValues[ln] = append(labelValues[ln], lv)
 			} else {
@@ -1298,6 +1298,9 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 
 		costData, err = a.Model.ComputeCostDataRange(promClient, a.KubeClientSet, a.CloudProvider, start, end, window, resolutionHours, "", "", remoteEnabled, offset)
 		if err != nil {
+			if prom.IsErrorCollection(err) {
+				return nil, "", err
+			}
 			if pce, ok := err.(prom.CommError); ok {
 				return nil, "", pce
 			}
@@ -1678,7 +1681,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	if len(subfieldStr) > 0 {
 		s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
 		for _, rawLabel := range s {
-			subfields = append(subfields, SanitizeLabelName(rawLabel))
+			subfields = append(subfields, prom.SanitizeLabelName(rawLabel))
 		}
 	}
 

+ 1 - 1
pkg/costmodel/costmodel.go

@@ -2096,7 +2096,7 @@ func getNamespaceLabels(cache clustercache.ClusterCache, clusterID string) (map[
 	for _, ns := range nss {
 		labels := make(map[string]string)
 		for k, v := range ns.Labels {
-			labels[SanitizeLabelName(k)] = v
+			labels[prom.SanitizeLabelName(k)] = v
 		}
 		nsToLabels[ns.Name+","+clusterID] = labels
 	}

+ 5 - 48
pkg/costmodel/metrics.go

@@ -2,8 +2,6 @@ package costmodel
 
 import (
 	"math"
-	"regexp"
-	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -23,10 +21,6 @@ import (
 	"k8s.io/klog"
 )
 
-var (
-	invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
-)
-
 //--------------------------------------------------------------------------
 //  StatefulsetCollector
 //--------------------------------------------------------------------------
@@ -46,7 +40,7 @@ func (sc StatefulsetCollector) Describe(ch chan<- *prometheus.Desc) {
 func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
 	ds := sc.KubeClusterCache.GetAllStatefulSets()
 	for _, statefulset := range ds {
-		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
+		labels, values := prom.KubeLabelsToLabels(statefulset.Spec.Selector.MatchLabels)
 		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
 		ch <- m
 	}
@@ -132,7 +126,7 @@ func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
 func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
 	ds := sc.KubeClusterCache.GetAllDeployments()
 	for _, deployment := range ds {
-		labels, values := kubeLabelsToPrometheusLabels(deployment.Spec.Selector.MatchLabels)
+		labels, values := prom.KubeLabelsToLabels(deployment.Spec.Selector.MatchLabels)
 		m := newDeploymentMetric(deployment.GetName(), deployment.GetNamespace(), "deployment_match_labels", labels, values)
 		ch <- m
 	}
@@ -218,7 +212,7 @@ func (sc ServiceCollector) Describe(ch chan<- *prometheus.Desc) {
 func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
 	svcs := sc.KubeClusterCache.GetAllServices()
 	for _, svc := range svcs {
-		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
+		labels, values := prom.KubeLabelsToLabels(svc.Spec.Selector)
 		m := newServiceMetric(svc.GetName(), svc.GetNamespace(), "service_selector_labels", labels, values)
 		ch <- m
 	}
@@ -304,7 +298,7 @@ func (nsac NamespaceAnnotationCollector) Describe(ch chan<- *prometheus.Desc) {
 func (nsac NamespaceAnnotationCollector) Collect(ch chan<- prometheus.Metric) {
 	namespaces := nsac.KubeClusterCache.GetAllNamespaces()
 	for _, namespace := range namespaces {
-		labels, values := kubeAnnotationstoPrometheusLabels(namespace.Annotations)
+		labels, values := prom.KubeAnnotationsToLabels(namespace.Annotations)
 		m := newNamespaceAnnotationsMetric(namespace.GetName(), "kube_namespace_annotations", labels, values)
 		ch <- m
 	}
@@ -384,7 +378,7 @@ func (pac PodAnnotationCollector) Describe(ch chan<- *prometheus.Desc) {
 func (pac PodAnnotationCollector) Collect(ch chan<- prometheus.Metric) {
 	pods := pac.KubeClusterCache.GetAllPods()
 	for _, pod := range pods {
-		labels, values := kubeAnnotationstoPrometheusLabels(pod.Annotations)
+		labels, values := prom.KubeAnnotationsToLabels(pod.Annotations)
 		m := newPodAnnotationMetric(pod.GetNamespace(), pod.GetName(), "kube_pod_annotations", labels, values)
 		ch <- m
 	}
@@ -880,40 +874,3 @@ func StopCostModelMetricRecording() {
 		close(recordingStop)
 	}
 }
-
-// Converts kubernetes labels into prometheus labels.
-func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string) {
-	labelKeys := make([]string, 0, len(labels))
-	for k := range labels {
-		labelKeys = append(labelKeys, k)
-	}
-	sort.Strings(labelKeys)
-
-	labelValues := make([]string, 0, len(labels))
-	for i, k := range labelKeys {
-		labelKeys[i] = "label_" + SanitizeLabelName(k)
-		labelValues = append(labelValues, labels[k])
-	}
-	return labelKeys, labelValues
-}
-
-// Converts kubernetes annotations into prometheus labels.
-func kubeAnnotationstoPrometheusLabels(labels map[string]string) ([]string, []string) {
-	labelKeys := make([]string, 0, len(labels))
-	for k := range labels {
-		labelKeys = append(labelKeys, k)
-	}
-	sort.Strings(labelKeys)
-
-	labelValues := make([]string, 0, len(labels))
-	for i, k := range labelKeys {
-		labelKeys[i] = "annotation_" + SanitizeLabelName(k)
-		labelValues = append(labelValues, labels[k])
-	}
-	return labelKeys, labelValues
-}
-
-// Replaces all illegal prometheus label characters with _
-func SanitizeLabelName(s string) string {
-	return invalidLabelCharRE.ReplaceAllString(s, "_")
-}

+ 2 - 0
pkg/kubecost/allocation.go

@@ -367,6 +367,8 @@ type AllocationSet struct {
 	allocations map[string]*Allocation
 	idleKeys    map[string]bool
 	Window      Window
+	Warnings    []string
+	Errors      []string
 }
 
 // NewAllocationSet instantiates a new AllocationSet and, optionally, inserts

+ 5 - 3
pkg/kubecost/asset.go

@@ -2304,9 +2304,11 @@ func (sa *SharedAsset) String() string {
 // a window. An AssetSet is mutable, so treat it like a threadsafe map.
 type AssetSet struct {
 	sync.RWMutex
-	assets map[string]Asset
-	props  []AssetProperty
-	Window Window
+	assets   map[string]Asset
+	props    []AssetProperty
+	Window   Window
+	Warnings []string
+	Errors   []string
 }
 
 // NewAssetSet instantiates a new AssetSet and, optionally, inserts

+ 1 - 1
pkg/kubecost/bingen.go

@@ -21,4 +21,4 @@ package kubecost
 // @bingen:generate:AllocationSet
 // @bingen:generate:AllocationSetRange
 
-//go:generate bingen -package=kubecost -version=3 -buffer=github.com/kubecost/cost-model/pkg/util
+//go:generate bingen -package=kubecost -version=4 -buffer=github.com/kubecost/cost-model/pkg/util

+ 175 - 50
pkg/kubecost/kubecost_codecs.go

@@ -21,8 +21,13 @@ import (
 	util "github.com/kubecost/cost-model/pkg/util"
 )
 
-// GeneratorPackageName is the package the generator is targetting
-const GeneratorPackageName string = "kubecost"
+const (
+	// GeneratorPackageName is the package the generator is targetting
+	GeneratorPackageName string = "kubecost"
+
+	// CodecVersion is the version passed into the generator
+	CodecVersion uint8 = 4
+)
 
 //--------------------------------------------------------------------------
 //  Type Map
@@ -96,7 +101,7 @@ func resolveType(t string) (pkg string, name string, isPtr bool) {
 // into a byte array
 func (target *Allocation) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	buff.WriteString(target.Name) // write string
 	// --- [begin][write][reference](Properties) ---
@@ -160,8 +165,8 @@ func (target *Allocation) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Allocation. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Allocation. Expected %d, got %d", CodecVersion, version)
 	}
 
 	a := buff.ReadString() // read string
@@ -267,7 +272,7 @@ func (target *Allocation) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *AllocationSet) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.allocations == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -320,6 +325,32 @@ func (target *AllocationSet) MarshalBinary() (data []byte, err error) {
 	buff.WriteBytes(b)
 	// --- [end][write][struct](Window) ---
 
+	if target.Warnings == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][slice]([]string) ---
+		buff.WriteInt(len(target.Warnings)) // array length
+		for i := 0; i < len(target.Warnings); i++ {
+			buff.WriteString(target.Warnings[i]) // write string
+		}
+		// --- [end][write][slice]([]string) ---
+
+	}
+	if target.Errors == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][slice]([]string) ---
+		buff.WriteInt(len(target.Errors)) // array length
+		for j := 0; j < len(target.Errors); j++ {
+			buff.WriteString(target.Errors[j]) // write string
+		}
+		// --- [end][write][slice]([]string) ---
+
+	}
 	return buff.Bytes(), nil
 }
 
@@ -330,8 +361,8 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling AllocationSet. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling AllocationSet. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -399,6 +430,40 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) error {
 	target.Window = *n
 	// --- [end][read][struct](Window) ---
 
+	if buff.ReadUInt8() == uint8(0) {
+		target.Warnings = nil
+	} else {
+		// --- [begin][read][slice]([]string) ---
+		r := buff.ReadInt() // array len
+		q := make([]string, r)
+		for ii := 0; ii < r; ii++ {
+			var s string
+			t := buff.ReadString() // read string
+			s = t
+
+			q[ii] = s
+		}
+		target.Warnings = q
+		// --- [end][read][slice]([]string) ---
+
+	}
+	if buff.ReadUInt8() == uint8(0) {
+		target.Errors = nil
+	} else {
+		// --- [begin][read][slice]([]string) ---
+		w := buff.ReadInt() // array len
+		u := make([]string, w)
+		for jj := 0; jj < w; jj++ {
+			var x string
+			y := buff.ReadString() // read string
+			x = y
+
+			u[jj] = x
+		}
+		target.Errors = u
+		// --- [end][read][slice]([]string) ---
+
+	}
 	return nil
 }
 
@@ -410,7 +475,7 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *AllocationSetRange) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.allocations == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -449,8 +514,8 @@ func (target *AllocationSetRange) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling AllocationSetRange. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling AllocationSetRange. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -493,7 +558,7 @@ func (target *AllocationSetRange) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *Any) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	// --- [begin][write][alias](AssetLabels) ---
 	if map[string]string(target.labels) == nil {
@@ -566,8 +631,8 @@ func (target *Any) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Any. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Any. Expected %d, got %d", CodecVersion, version)
 	}
 
 	// --- [begin][read][alias](AssetLabels) ---
@@ -661,7 +726,7 @@ func (target *Any) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *AssetProperties) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	buff.WriteString(target.Category)   // write string
 	buff.WriteString(target.Provider)   // write string
@@ -681,8 +746,8 @@ func (target *AssetProperties) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling AssetProperties. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling AssetProperties. Expected %d, got %d", CodecVersion, version)
 	}
 
 	a := buff.ReadString() // read string
@@ -720,7 +785,7 @@ func (target *AssetProperties) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *AssetSet) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.assets == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -781,6 +846,32 @@ func (target *AssetSet) MarshalBinary() (data []byte, err error) {
 	buff.WriteBytes(d)
 	// --- [end][write][struct](Window) ---
 
+	if target.Warnings == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][slice]([]string) ---
+		buff.WriteInt(len(target.Warnings)) // array length
+		for j := 0; j < len(target.Warnings); j++ {
+			buff.WriteString(target.Warnings[j]) // write string
+		}
+		// --- [end][write][slice]([]string) ---
+
+	}
+	if target.Errors == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][slice]([]string) ---
+		buff.WriteInt(len(target.Errors)) // array length
+		for ii := 0; ii < len(target.Errors); ii++ {
+			buff.WriteString(target.Errors[ii]) // write string
+		}
+		// --- [end][write][slice]([]string) ---
+
+	}
 	return buff.Bytes(), nil
 }
 
@@ -791,8 +882,8 @@ func (target *AssetSet) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling AssetSet. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling AssetSet. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -868,6 +959,40 @@ func (target *AssetSet) UnmarshalBinary(data []byte) error {
 	target.Window = *q
 	// --- [end][read][struct](Window) ---
 
+	if buff.ReadUInt8() == uint8(0) {
+		target.Warnings = nil
+	} else {
+		// --- [begin][read][slice]([]string) ---
+		u := buff.ReadInt() // array len
+		t := make([]string, u)
+		for ii := 0; ii < u; ii++ {
+			var w string
+			x := buff.ReadString() // read string
+			w = x
+
+			t[ii] = w
+		}
+		target.Warnings = t
+		// --- [end][read][slice]([]string) ---
+
+	}
+	if buff.ReadUInt8() == uint8(0) {
+		target.Errors = nil
+	} else {
+		// --- [begin][read][slice]([]string) ---
+		z := buff.ReadInt() // array len
+		y := make([]string, z)
+		for jj := 0; jj < z; jj++ {
+			var aa string
+			bb := buff.ReadString() // read string
+			aa = bb
+
+			y[jj] = aa
+		}
+		target.Errors = y
+		// --- [end][read][slice]([]string) ---
+
+	}
 	return nil
 }
 
@@ -879,7 +1004,7 @@ func (target *AssetSet) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *AssetSetRange) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.assets == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -918,8 +1043,8 @@ func (target *AssetSetRange) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling AssetSetRange. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling AssetSetRange. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -962,7 +1087,7 @@ func (target *AssetSetRange) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *Breakdown) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	buff.WriteFloat64(target.Idle)   // write float64
 	buff.WriteFloat64(target.Other)  // write float64
@@ -978,8 +1103,8 @@ func (target *Breakdown) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Breakdown. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Breakdown. Expected %d, got %d", CodecVersion, version)
 	}
 
 	a := buff.ReadFloat64() // read float64
@@ -1005,7 +1130,7 @@ func (target *Breakdown) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *Cloud) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	// --- [begin][write][alias](AssetLabels) ---
 	if map[string]string(target.labels) == nil {
@@ -1078,8 +1203,8 @@ func (target *Cloud) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Cloud. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Cloud. Expected %d, got %d", CodecVersion, version)
 	}
 
 	// --- [begin][read][alias](AssetLabels) ---
@@ -1173,7 +1298,7 @@ func (target *Cloud) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *ClusterManagement) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	// --- [begin][write][alias](AssetLabels) ---
 	if map[string]string(target.labels) == nil {
@@ -1227,8 +1352,8 @@ func (target *ClusterManagement) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling ClusterManagement. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling ClusterManagement. Expected %d, got %d", CodecVersion, version)
 	}
 
 	// --- [begin][read][alias](AssetLabels) ---
@@ -1297,7 +1422,7 @@ func (target *ClusterManagement) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *Disk) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	// --- [begin][write][alias](AssetLabels) ---
 	if map[string]string(target.labels) == nil {
@@ -1387,8 +1512,8 @@ func (target *Disk) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Disk. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Disk. Expected %d, got %d", CodecVersion, version)
 	}
 
 	// --- [begin][read][alias](AssetLabels) ---
@@ -1503,7 +1628,7 @@ func (target *Disk) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *LoadBalancer) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.properties == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -1576,8 +1701,8 @@ func (target *LoadBalancer) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling LoadBalancer. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling LoadBalancer. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -1671,7 +1796,7 @@ func (target *LoadBalancer) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *Network) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.properties == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -1744,8 +1869,8 @@ func (target *Network) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Network. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Network. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -1839,7 +1964,7 @@ func (target *Network) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *Node) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.properties == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -1949,8 +2074,8 @@ func (target *Node) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Node. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Node. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -2095,7 +2220,7 @@ func (target *Node) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *SharedAsset) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.properties == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -2149,8 +2274,8 @@ func (target *SharedAsset) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling SharedAsset. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling SharedAsset. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {
@@ -2219,7 +2344,7 @@ func (target *SharedAsset) UnmarshalBinary(data []byte) error {
 // into a byte array
 func (target *Window) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	if target.start == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -2261,8 +2386,8 @@ func (target *Window) UnmarshalBinary(data []byte) error {
 
 	// Codec Version Check
 	version := buff.ReadUInt8()
-	if version != 3 {
-		return fmt.Errorf("Invalid Version Unmarshaling Window. Expected 3, got %d", version)
+	if version != CodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Window. Expected %d, got %d", CodecVersion, version)
 	}
 
 	if buff.ReadUInt8() == uint8(0) {

+ 63 - 5
pkg/kubecost/properties.go

@@ -5,7 +5,7 @@ import (
 	"sort"
 	"strings"
 
-	"github.com/kubecost/cost-model/pkg/util"
+	util "github.com/kubecost/cost-model/pkg/util"
 )
 
 type Property string
@@ -18,6 +18,7 @@ const (
 	ControllerProp     Property = "controller"
 	ControllerKindProp Property = "controllerKind"
 	LabelProp          Property = "label"
+	AnnotationProp     Property = "annotation"
 	NamespaceProp      Property = "namespace"
 	PodProp            Property = "pod"
 	ServiceProp        Property = "service"
@@ -31,6 +32,7 @@ var availableProperties []Property = []Property{
 	ControllerProp,
 	ControllerKindProp,
 	LabelProp,
+	AnnotationProp,
 	NamespaceProp,
 	PodProp,
 	ServiceProp,
@@ -133,6 +135,18 @@ func (p *Properties) Equal(that *Properties) bool {
 		return false
 	}
 
+	pAnnotations, _ := p.GetAnnotations()
+	thatAnnotations, _ := that.GetAnnotations()
+	if len(pAnnotations) != len(thatAnnotations) {
+		for k, pv := range pAnnotations {
+			tv, ok := thatAnnotations[k]
+			if !ok || tv != pv {
+				return false
+			}
+		}
+		return false
+	}
+
 	pServices, _ := p.GetServices()
 	thatServices, _ := that.GetServices()
 	if len(pServices) != len(thatServices) {
@@ -195,7 +209,7 @@ func (p *Properties) Intersection(that Properties) Properties {
 		spec.SetPod(sPod)
 	}
 
-	// TODO niko/etl intersection of services and labels
+	// TODO niko/etl intersection of services and labels and annotations
 
 	return *spec
 }
@@ -456,6 +470,25 @@ func (p *Properties) SetLabels(labels map[string]string) {
 	(*p)[LabelProp] = labels
 }
 
+func (p *Properties) GetAnnotations() (map[string]string, error) {
+	if raw, ok := (*p)[AnnotationProp]; ok {
+		if annotations, ok := raw.(map[string]string); ok {
+			return annotations, nil
+		}
+		return map[string]string{}, fmt.Errorf("AnnotationProp is not a map[string]string")
+	}
+	return map[string]string{}, fmt.Errorf("AnnotationProp not set")
+}
+
+func (p *Properties) HasAnnotations() bool {
+	_, ok := (*p)[AnnotationProp]
+	return ok
+}
+
+func (p *Properties) SetAnnotations(annotations map[string]string) {
+	(*p)[AnnotationProp] = annotations
+}
+
 func (p *Properties) GetNamespace() (string, error) {
 	if raw, ok := (*p)[NamespaceProp]; ok {
 		if namespace, ok := raw.(string); ok {
@@ -515,7 +548,7 @@ func (p *Properties) SetServices(services []string) {
 
 func (p *Properties) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
-	buff.WriteUInt8(3) // version
+	buff.WriteUInt8(CodecVersion) // version
 
 	// ClusterProp
 	cluster, err := p.GetCluster()
@@ -593,6 +626,19 @@ func (p *Properties) MarshalBinary() (data []byte, err error) {
 		}
 	}
 
+	// AnnotationProp
+	annotations, err := p.GetAnnotations()
+	if err != nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1))       // write non-nil byte
+		buff.WriteInt(len(annotations)) // map length
+		for k, v := range annotations {
+			buff.WriteString(k) // write string
+			buff.WriteString(v) // write string
+		}
+	}
+
 	// ServiceProp
 	services, err := p.GetServices()
 	if err != nil {
@@ -611,8 +657,8 @@ func (p *Properties) MarshalBinary() (data []byte, err error) {
 func (p *Properties) UnmarshalBinary(data []byte) error {
 	buff := util.NewBufferFromBytes(data)
 	v := buff.ReadUInt8() // version
-	if v != 3 {
-		return fmt.Errorf("Invalid Version. Expected 3, got %d", v)
+	if v != CodecVersion {
+		return fmt.Errorf("Invalid Version. Expected %d, got %d", CodecVersion, v)
 	}
 
 	*p = Properties{}
@@ -671,6 +717,18 @@ func (p *Properties) UnmarshalBinary(data []byte) error {
 		p.SetLabels(labels)
 	}
 
+	// AnnotationProp
+	if buff.ReadUInt8() == 1 { // read nil byte
+		annotations := map[string]string{}
+		length := buff.ReadInt() // read map len
+		for idx := 0; idx < length; idx++ {
+			key := buff.ReadString()
+			val := buff.ReadString()
+			annotations[key] = val
+		}
+		p.SetAnnotations(annotations)
+	}
+
 	// ServiceProp
 	if buff.ReadUInt8() == 1 { // read nil byte
 		services := []string{}

+ 41 - 0
pkg/prom/metrics.go

@@ -4,9 +4,13 @@ import (
 	"encoding/json"
 	"fmt"
 	"reflect"
+	"regexp"
+	"sort"
 	"strings"
 )
 
+var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
+
 // AnyToLabels will create prometheus labels based on the fields of the interface
 // passed. Note that this method is quite expensive and should only be used when absolutely
 // necessary.
@@ -67,3 +71,40 @@ func LabelNamesFrom(labels map[string]string) []string {
 	}
 	return keys
 }
+
+// Converts kubernetes labels into prometheus labels.
+func KubeLabelsToLabels(labels map[string]string) ([]string, []string) {
+	labelKeys := make([]string, 0, len(labels))
+	for k := range labels {
+		labelKeys = append(labelKeys, k)
+	}
+	sort.Strings(labelKeys)
+
+	labelValues := make([]string, 0, len(labels))
+	for i, k := range labelKeys {
+		labelKeys[i] = "label_" + SanitizeLabelName(k)
+		labelValues = append(labelValues, labels[k])
+	}
+	return labelKeys, labelValues
+}
+
+// Converts kubernetes annotations into prometheus labels.
+func KubeAnnotationsToLabels(labels map[string]string) ([]string, []string) {
+	labelKeys := make([]string, 0, len(labels))
+	for k := range labels {
+		labelKeys = append(labelKeys, k)
+	}
+	sort.Strings(labelKeys)
+
+	labelValues := make([]string, 0, len(labels))
+	for i, k := range labelKeys {
+		labelKeys[i] = "annotation_" + SanitizeLabelName(k)
+		labelValues = append(labelValues, labels[k])
+	}
+	return labelKeys, labelValues
+}
+
+// Replaces all illegal prometheus label characters with _
+func SanitizeLabelName(s string) string {
+	return invalidLabelCharRE.ReplaceAllString(s, "_")
+}