Просмотр исходного кода

Merge pull request #657 from kubecost/sean/agg-on-annotations

Sean/agg on annotations
Sean Holcomb 5 лет назад
Родитель
Сommit
c209408ac8

+ 105 - 13
pkg/costmodel/aggregation.go

@@ -375,6 +375,20 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 				if !found {
 					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
 				}
+			} else if field == "annotation" {
+				found := false
+				if costDatum.Annotations != nil {
+					for _, sf := range subfields {
+						if subfieldName, ok := costDatum.Annotations[sf]; ok {
+							aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
+							found = true
+							break
+						}
+					}
+				}
+				if !found {
+					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
+				}
 			} else if field == "pod" {
 				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
 			} else if field == "container" {
@@ -585,6 +599,7 @@ func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, cos
 				props.SetControllerKind(kind)
 			}
 			props.SetLabels(costDatum.Labels)
+			props.SetAnnotations(costDatum.Annotations)
 			props.SetNamespace(costDatum.Namespace)
 			props.SetPod(costDatum.PodName)
 			props.SetServices(costDatum.Services)
@@ -1127,6 +1142,14 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 					}
 				}
 			}
+		} else if field == "annotation" {
+			if costDatum.Annotations != nil {
+				for _, sf := range subfields {
+					if subfieldName, ok := costDatum.Annotations[sf]; ok {
+						return fmt.Sprintf("%s=%s", sf, subfieldName)
+					}
+				}
+			}
 		} else if field == "pod" {
 			return costDatum.Namespace + "/" + costDatum.PodName
 		} else if field == "container" {
@@ -1248,15 +1271,15 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 					ae := aggregateEnvironment(cd)
 					for _, v := range vs {
 						if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
-							if _, ok := cd.Labels[label]; !ok {
+							if _, ok := cd.Labels[l]; !ok {
 								return true, ae
 							}
 						}
-						if cd.Labels[label] == v {
+						if cd.Labels[l] == v {
 							return true, ae
 						} else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
 							vTrim := strings.TrimSuffix(v, "*")
-							if strings.HasPrefix(cd.Labels[label], vTrim) {
+							if strings.HasPrefix(cd.Labels[l], vTrim) {
 								return true, ae
 							}
 						}
@@ -1268,6 +1291,55 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 		}
 	}
 
+	if filters["annotations"] != "" {
+		// annotations are expected to be comma-separated and to take the form key=value
+		// e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
+		// each different annotation will be applied as an AND
+		// multiple values for a single annotation will be evaluated as an OR
+		annotationValues := map[string][]string{}
+		as := strings.Split(filters["annotations"], ",")
+		for _, annot := range as {
+			aTrim := strings.TrimSpace(annot)
+			annotation := strings.Split(aTrim, "=")
+			if len(annotation) == 2 {
+				an := prom.SanitizeLabelName(strings.TrimSpace(annotation[0]))
+				av := strings.TrimSpace(annotation[1])
+				annotationValues[an] = append(annotationValues[an], av)
+			} else {
+				// annotation is not of the form name=value, so log it and move on
+				log.Warningf("ComputeAggregateCostModel: skipping illegal annotation filter: %s", annot)
+			}
+		}
+
+		// Generate FilterFunc for each set of annotation filters by invoking a function instead of accessing
+		// values by closure to prevent reference-type looping bug.
+		// (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
+		for annotation, values := range annotationValues {
+			ff := (func(l string, vs []string) FilterFunc {
+				return func(cd *CostData) (bool, string) {
+					ae := aggregateEnvironment(cd)
+					for _, v := range vs {
+						if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
+							if _, ok := cd.Annotations[l]; !ok {
+								return true, ae
+							}
+						}
+						if cd.Annotations[l] == v {
+							return true, ae
+						} else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
+							vTrim := strings.TrimSuffix(v, "*")
+							if strings.HasPrefix(cd.Annotations[l], vTrim) {
+								return true, ae
+							}
+						}
+					}
+					return false, ae
+				}
+			})(annotation, values)
+			filterFuncs = append(filterFuncs, ff)
+		}
+	}
+
 	// clear cache prior to checking the cache so that a clearCache=true
 	// request always returns a freshly computed value
 	if clearCache {
@@ -1619,7 +1691,28 @@ func GenerateAggKey(window kubecost.Window, field string, subfields []string, op
 	sort.Strings(lFilters)
 	lFilterStr := strings.Join(lFilters, ",")
 
-	filterStr := fmt.Sprintf("%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, podPrefixFiltersStr)
+	// parse, trim, and sort annotation filters
+	aFilters := []string{}
+	if afs, ok := opts.Filters["annotations"]; ok && afs != "" {
+		for _, af := range strings.Split(afs, ",") {
+			// trim whitespace from the annotation name and the annotation value
+			// of each annotation name/value pair, then reconstruct
+			// e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
+			afa := strings.Split(af, "=")
+			if len(afa) == 2 {
+				afn := strings.TrimSpace(afa[0])
+				afv := strings.TrimSpace(afa[1])
+				aFilters = append(aFilters, fmt.Sprintf("%s=%s", afn, afv))
+			} else {
+				// annotation is not of the form name=value, so log it and move on
+				klog.V(2).Infof("[Warning] GenerateAggKey: skipping illegal annotation filter: %s", af)
+			}
+		}
+	}
+	sort.Strings(aFilters)
+	aFilterStr := strings.Join(aFilters, ",")
+
+	filterStr := fmt.Sprintf("%s:%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, aFilterStr, podPrefixFiltersStr)
 
 	sort.Strings(subfields)
 	fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
@@ -1858,10 +1951,8 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	namespace := r.URL.Query().Get("namespace")
 	cluster := r.URL.Query().Get("cluster")
 	labels := r.URL.Query().Get("labels")
+	annotations := r.URL.Query().Get("annotations")
 	podprefix := r.URL.Query().Get("podprefix")
-	labelArray := strings.Split(labels, "=")
-	labelArray[0] = strings.ReplaceAll(labelArray[0], "-", "_")
-	labels = strings.Join(labelArray, "=")
 	field := r.URL.Query().Get("aggregation")
 	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
 	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
@@ -1920,7 +2011,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	}
 
 	// aggregation subfield is required when aggregation field is "label"
-	if field == "label" && len(subfields) == 0 {
+	if (field == "label" || field == "annotation") && len(subfields) == 0 {
 		WriteError(w, BadRequest("Missing aggregation field parameter"))
 		return
 	}
@@ -1936,10 +2027,11 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	// labels are expected to be comma-separated and to take the form key=value
 	// e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
 	opts.Filters = map[string]string{
-		"namespace": namespace,
-		"cluster":   cluster,
-		"labels":    labels,
-		"podprefix": podprefix,
+		"namespace":   namespace,
+		"cluster":     cluster,
+		"labels":      labels,
+		"annotations": annotations,
+		"podprefix":   podprefix,
 	}
 
 	// parse shared resources
@@ -1953,7 +2045,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 		sln = strings.Split(sharedLabelNames, ",")
 		slv = strings.Split(sharedLabelValues, ",")
 		if len(sln) != len(slv) || slv[0] == "" {
-			WriteError(w, BadRequest("Supply exacly one shared label value per shared label name"))
+			WriteError(w, BadRequest("Supply exactly one shared label value per shared label name"))
 			return
 		}
 	}

+ 94 - 9
pkg/costmodel/costmodel.go

@@ -85,6 +85,7 @@ type CostData struct {
 	GPUReq          []*util.Vector               `json:"gpureq,omitempty"`
 	PVCData         []*PersistentVolumeClaimData `json:"pvcData,omitempty"`
 	NetworkData     []*util.Vector               `json:"network,omitempty"`
+	Annotations     map[string]string            `json:"annotations,omitempty"`
 	Labels          map[string]string            `json:"labels,omitempty"`
 	NamespaceLabels map[string]string            `json:"namespaceLabels,omitempty"`
 	ClusterID       string                       `json:"clusterId"`
@@ -211,6 +212,8 @@ const (
 	queryPVHourlyCostFmt      = `avg_over_time(pv_hourly_cost[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
+	queryNSAnnotations        = `avg_over_time(kube_namespace_annotations[%s])`
+	queryPodAnnotations       = `avg_over_time(kube_pod_annotations[%s])`
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
 	queryStatefulsetLabels    = `avg_over_time(statefulSet_match_labels[%s])`
 	queryPodDaemonsets        = `sum(kube_pod_owner{owner_kind="DaemonSet"}) by (namespace,pod,owner_name,cluster_id)`
@@ -268,6 +271,11 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 		return nil, err
 	}
 
+	namespaceAnnotationsMapping, err := getNamespaceAnnotations(cm.Cache, clusterID)
+	if err != nil {
+		return nil, err
+	}
+
 	// Process Prometheus query results. Handle errors using ctx.Errors.
 	resRAMRequests, _ := resChRAMRequests.Await()
 	resRAMUsage, _ := resChRAMUsage.Await()
@@ -409,6 +417,18 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 				}
 			}
 
+			nsAnnotations := namespaceAnnotationsMapping[ns+","+clusterID]
+			podAnnotations := pod.GetObjectMeta().GetAnnotations()
+			if podAnnotations == nil {
+				podAnnotations = make(map[string]string)
+			}
+
+			for k, v := range nsAnnotations {
+				if _, ok := podAnnotations[k]; !ok {
+					podAnnotations[k] = v
+				}
+			}
+
 			nodeName := pod.Spec.NodeName
 			var nodeData *costAnalyzerCloud.Node
 			if _, ok := nodes[nodeName]; ok {
@@ -518,6 +538,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 					GPUReq:          GPUReqV,
 					PVCData:         pvReq,
 					NetworkData:     netReq,
+					Annotations:     podAnnotations,
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
 					ClusterID:       clusterID,
@@ -579,6 +600,11 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 				klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
 			}
 
+			namespaceAnnotations, ok := namespaceAnnotationsMapping[c.Namespace+","+c.ClusterID]
+			if !ok {
+				klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
+			}
+
 			costs := &CostData{
 				Name:            c.ContainerName,
 				PodName:         c.PodName,
@@ -590,6 +616,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 				CPUReq:          CPUReqV,
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
+				Annotations:     namespaceAnnotations,
 				NamespaceLabels: namespacelabels,
 				ClusterID:       c.ClusterID,
 				ClusterName:     cm.ClusterMap.NameFor(c.ClusterID),
@@ -607,7 +634,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 	}
 	// Use unmounted pvs to create a mapping of "Unmounted-<Namespace>" containers
 	// to pass along the cost data
-	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping)
+	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping, namespaceAnnotationsMapping)
 	for k, costs := range unmounted {
 		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
 
@@ -630,7 +657,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 	return containerNameCost, err
 }
 
-func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string) map[string]*CostData {
+func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string, namespaceAnnotationsMapping map[string]map[string]string) map[string]*CostData {
 	costs := make(map[string]*CostData)
 	if len(unmountedPVs) == 0 {
 		return costs
@@ -650,6 +677,11 @@ func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[st
 			klog.V(3).Infof("Missing data for namespace %s", ns)
 		}
 
+		namespaceAnnotations, ok := namespaceAnnotationsMapping[ns+","+clusterID]
+		if !ok {
+			klog.V(3).Infof("Missing data for namespace %s", ns)
+		}
+
 		// Should be a unique "Unmounted" cost data type
 		name := "unmounted-pvs"
 
@@ -661,6 +693,7 @@ func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[st
 				Name:            name,
 				PodName:         name,
 				NodeName:        "",
+				Annotations:     namespaceAnnotations,
 				Namespace:       ns,
 				NamespaceLabels: namespacelabels,
 				Labels:          namespacelabels,
@@ -1568,6 +1601,8 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 	resChNetInternetRequests := ctx.QueryRange(queryNetInternetRequests, start, end, resolution)
 	resChNSLabels := ctx.QueryRange(fmt.Sprintf(queryNSLabels, resStr), start, end, resolution)
 	resChPodLabels := ctx.QueryRange(fmt.Sprintf(queryPodLabels, resStr), start, end, resolution)
+	resChNSAnnotations := ctx.QueryRange(fmt.Sprintf(queryNSAnnotations, resStr), start, end, resolution)
+	resChPodAnnotations := ctx.QueryRange(fmt.Sprintf(queryPodAnnotations, resStr), start, end, resolution)
 	resChServiceLabels := ctx.QueryRange(fmt.Sprintf(queryServiceLabels, resStr), start, end, resolution)
 	resChDeploymentLabels := ctx.QueryRange(fmt.Sprintf(queryDeploymentLabels, resStr), start, end, resolution)
 	resChStatefulsetLabels := ctx.QueryRange(fmt.Sprintf(queryStatefulsetLabels, resStr), start, end, resolution)
@@ -1598,6 +1633,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		return nil, fmt.Errorf("error querying the kubernetes API: %s", err)
 	}
 
+	namespaceAnnotationsMapping, err := getNamespaceAnnotations(cm.Cache, clusterID)
+	if err != nil {
+		return nil, fmt.Errorf("error querying the kubernetes API: %s", err)
+	}
+
 	// Process query results. Handle errors afterwards using ctx.Errors.
 	resRAMRequests, _ := resChRAMRequests.Await()
 	resRAMUsage, _ := resChRAMUsage.Await()
@@ -1614,6 +1654,8 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 	resNetInternetRequests, _ := resChNetInternetRequests.Await()
 	resNSLabels, _ := resChNSLabels.Await()
 	resPodLabels, _ := resChPodLabels.Await()
+	resNSAnnotations, _ := resChNSAnnotations.Await()
+	resPodAnnotations, _ := resChPodAnnotations.Await()
 	resServiceLabels, _ := resChServiceLabels.Await()
 	resDeploymentLabels, _ := resChDeploymentLabels.Await()
 	resStatefulsetLabels, _ := resChStatefulsetLabels.Await()
@@ -1679,7 +1721,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		klog.V(1).Infof("Unable to get Namespace Labels for Metrics: %s", err.Error())
 	}
 	if nsLabels != nil {
-		appendNamespaceLabels(namespaceLabelsMapping, nsLabels)
+		mergeStringMap(namespaceLabelsMapping, nsLabels)
 	}
 
 	podLabels, err := GetPodLabelsMetrics(resPodLabels, clusterID)
@@ -1687,6 +1729,19 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		klog.V(1).Infof("Unable to get Pod Labels for Metrics: %s", err.Error())
 	}
 
+	nsAnnotations, err := GetNamespaceAnnotationsMetrics(resNSAnnotations, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Namespace Annotations for Metrics: %s", err.Error())
+	}
+	if nsAnnotations != nil {
+		mergeStringMap(namespaceAnnotationsMapping, nsAnnotations)
+	}
+
+	podAnnotations, err := GetPodAnnotationsMetrics(resPodAnnotations, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Pod Annotations for Metrics: %s", err.Error())
+	}
+
 	serviceLabels, err := GetServiceSelectorLabelsMetrics(resServiceLabels, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Service Selector Labels for Metrics: %s", err.Error())
@@ -1874,6 +1929,22 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 			}
 		}
 
+		namespaceAnnotations, ok := namespaceAnnotationsMapping[nsKey]
+		if !ok {
+			klog.V(4).Infof("Missing data for namespace %s", c.Namespace)
+		}
+
+		pAnnotations := podAnnotations[podKey]
+		if pAnnotations == nil {
+			pAnnotations = make(map[string]string)
+		}
+
+		for k, v := range namespaceAnnotations {
+			if _, ok := pAnnotations[k]; !ok {
+				pAnnotations[k] = v
+			}
+		}
+
 		var podDeployments []string
 		if _, ok := podDeploymentsMapping[nsKey]; ok {
 			if ds, ok := podDeploymentsMapping[nsKey][c.PodName]; ok {
@@ -1965,6 +2036,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 			RAMAllocation:   RAMAllocsV,
 			CPUAllocation:   CPUAllocsV,
 			GPUReq:          GPUReqV,
+			Annotations:     pAnnotations,
 			Labels:          pLabels,
 			NamespaceLabels: namespaceLabels,
 			PVCData:         podPVs,
@@ -1979,7 +2051,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		}
 	}
 
-	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping)
+	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping, namespaceAnnotationsMapping)
 	for k, costs := range unmounted {
 		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
 
@@ -2056,11 +2128,11 @@ func addMetricPVData(pvAllocationMap map[string][]*PersistentVolumeClaimData, pv
 	}
 }
 
-// Append labels into nsLabels iff the ns key doesn't already exist
-func appendNamespaceLabels(nsLabels map[string]map[string]string, labels map[string]map[string]string) {
-	for k, v := range labels {
-		if _, ok := nsLabels[k]; !ok {
-			nsLabels[k] = v
+// Add values that don't already exist in origMap from mergeMap into origMap
+func mergeStringMap(origMap map[string]map[string]string, mergeMap map[string]map[string]string) {
+	for k, v := range mergeMap {
+		if _, ok := origMap[k]; !ok {
+			origMap[k] = v
 		}
 	}
 }
@@ -2084,6 +2156,19 @@ func getNamespaceLabels(cache clustercache.ClusterCache, clusterID string) (map[
 	return nsToLabels, nil
 }
 
+func getNamespaceAnnotations(cache clustercache.ClusterCache, clusterID string) (map[string]map[string]string, error) {
+	nsToAnnotations := make(map[string]map[string]string)
+	nss := cache.GetAllNamespaces()
+	for _, ns := range nss {
+		annotations := make(map[string]string)
+		for k, v := range ns.Annotations {
+			annotations[prom.SanitizeLabelName(k)] = v
+		}
+		nsToAnnotations[ns.Name+","+clusterID] = annotations
+	}
+	return nsToAnnotations, nil
+}
+
 func getDaemonsetsOfPod(pod v1.Pod) []string {
 	for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
 		if ownerReference.Kind == "DaemonSet" {

+ 60 - 0
pkg/costmodel/promparsers.go

@@ -210,6 +210,66 @@ func GetPodLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[
 	return toReturn, nil
 }
 
+func GetNamespaceAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+
+	for _, val := range qrs {
+		// We want Namespace and ClusterID for key generation purposes
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + clusterID
+		if nsAnnotations, ok := toReturn[nsKey]; ok {
+			for k, v := range val.GetAnnotations() {
+				nsAnnotations[k] = v // override with more recently assigned if we changed labels within the window.
+			}
+		} else {
+			toReturn[nsKey] = val.GetAnnotations()
+		}
+	}
+	return toReturn, nil
+}
+
+func GetPodAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+
+	for _, val := range qrs {
+		// We want Pod, Namespace and ClusterID for key generation purposes
+		pod, err := val.GetString("pod")
+		if err != nil {
+			return toReturn, err
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + pod + "," + clusterID
+		if labels, ok := toReturn[nsKey]; ok {
+			for k, v := range val.GetAnnotations() {
+				labels[k] = v
+			}
+		} else {
+			toReturn[nsKey] = val.GetAnnotations()
+		}
+	}
+
+	return toReturn, nil
+}
+
 func GetStatefulsetMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 

+ 36 - 0
pkg/kubecost/allocation.go

@@ -959,6 +959,42 @@ func (alloc *Allocation) generateKey(properties Properties) (string, error) {
 		}
 	}
 
+	if properties.HasAnnotations() {
+		annotations, err := alloc.Properties.GetAnnotations() // annotations that the individual allocation possesses
+		if err != nil {
+			// Indicate that allocation has no annotations
+			names = append(names, UnallocatedSuffix)
+		} else {
+			annotationNames := []string{}
+
+			aggAnnotations, err := properties.GetAnnotations() // potential annotations to aggregate on supplied by the API caller
+			if err != nil {
+				// We've already checked HasAnnotation, so this should never occur
+				return "", err
+			}
+			// calvin - support multi-annotation aggregation
+			for annotationName := range aggAnnotations {
+				if val, ok := annotations[annotationName]; ok {
+					annotationNames = append(annotationNames, fmt.Sprintf("%s=%s", annotationName, val))
+				} else if indexOf(UnallocatedSuffix, annotationNames) == -1 { // if UnallocatedSuffix not already in names
+					annotationNames = append(annotationNames, UnallocatedSuffix)
+				}
+			}
+			// resolve arbitrary ordering. e.g., app=app0/env=env0 is the same agg as env=env0/app=app0
+			if len(annotationNames) > 1 {
+				sort.Strings(annotationNames)
+			}
+			unallocatedSuffixIndex := indexOf(UnallocatedSuffix, annotationNames)
+			// suffix should be at index 0 if it exists b/c of underscores
+			if unallocatedSuffixIndex != -1 {
+				annotationNames = append(annotationNames[:unallocatedSuffixIndex], annotationNames[unallocatedSuffixIndex+1:]...)
+				annotationNames = append(annotationNames, UnallocatedSuffix) // append to end
+			}
+
+			names = append(names, annotationNames...)
+		}
+	}
+
 	if properties.HasLabel() {
 		labels, err := alloc.Properties.GetLabels() // labels that the individual allocation possesses
 		if err != nil {

+ 39 - 4
pkg/kubecost/allocation_test.go

@@ -208,7 +208,7 @@ func generateAllocationSet(start time.Time) *AllocationSet {
 	// Idle allocations
 	a1i := NewUnitAllocation(fmt.Sprintf("cluster1/%s", IdleSuffix), start, day, &Properties{
 		ClusterProp: "cluster1",
-		NodeProp: "node1",
+		NodeProp:    "node1",
 	})
 	a1i.CPUCost = 5.0
 	a1i.RAMCost = 15.0
@@ -347,6 +347,11 @@ func generateAllocationSet(start time.Time) *AllocationSet {
 	a22mno4.Properties.SetLabels(map[string]string{"app": "app2"})
 	a22mno5.Properties.SetLabels(map[string]string{"app": "app2"})
 
+	//Annotations
+	a23stu7.Properties.SetAnnotations(map[string]string{"team": "team1"})
+	a23vwx8.Properties.SetAnnotations(map[string]string{"team": "team2"})
+	a23vwx9.Properties.SetAnnotations(map[string]string{"team": "team1"})
+
 	// Services
 
 	a12jkl6.Properties.SetServices([]string{"service1"})
@@ -445,10 +450,10 @@ func TestAllocationSet_AggregateBy(t *testing.T) {
 	//         container6: {service1}              5.00   1.00   1.00   1.00   1.00   1.00
 	//     namespace3:
 	//       pod-stu: (deployment3)
-	//         container7:                         5.00   1.00   1.00   1.00   1.00   1.00
+	//         container7: an[team=team1]          5.00   1.00   1.00   1.00   1.00   1.00
 	//       pod-vwx: (statefulset1)
-	//         container8:                         5.00   1.00   1.00   1.00   1.00   1.00
-	//         container9:                         5.00   1.00   1.00   1.00   1.00   1.00
+	//         container8: an[team=team2]          5.00   1.00   1.00   1.00   1.00   1.00
+	//         container9: an[team=team1]          5.00   1.00   1.00   1.00   1.00   1.00
 	// +----------------------------------------+------+------+------+------+------+------+
 	//   cluster2 subtotal                        40.00  11.00  11.00   6.00   6.00   6.00
 	// +----------------------------------------+------+------+------+------+------+------+
@@ -669,6 +674,18 @@ func TestAllocationSet_AggregateBy(t *testing.T) {
 	})
 	assertAllocationWindow(t, as, "1i", startYesterday, endYesterday, 1440.0)
 
+	// 1j AggregationProperties=(Annotation:team)
+	as = generateAllocationSet(start)
+	err = as.AggregateBy(Properties{AnnotationProp: map[string]string{"team": ""}}, nil)
+	assertAllocationSetTotals(t, as, "1j", err, 2+numIdle+numUnallocated, activeTotalCost+idleTotalCost)
+	assertAllocationTotals(t, as, "1j", map[string]float64{
+		"team=team1":      10.00,
+		"team=team2":      5.00,
+		IdleSuffix:        30.00,
+		UnallocatedSuffix: 55.00,
+	})
+	assertAllocationWindow(t, as, "1i", startYesterday, endYesterday, 1440.0)
+
 	// 2  Multi-aggregation
 
 	// 2a AggregationProperties=(Cluster, Namespace)
@@ -701,6 +718,24 @@ func TestAllocationSet_AggregateBy(t *testing.T) {
 		"cluster2/" + UnallocatedSuffix:          20.00,
 	})
 
+	// 2f AggregationProperties=(annotation:team, pod)
+	as = generateAllocationSet(start)
+	err = as.AggregateBy(Properties{AnnotationProp: map[string]string{"team": ""}, PodProp: ""}, nil)
+	assertAllocationSetTotals(t, as, "2e", err, 11, activeTotalCost+idleTotalCost)
+	assertAllocationTotals(t, as, "2e", map[string]float64{
+		"pod-jkl/" + UnallocatedSuffix: 5.00,
+		"pod-stu/team=team1":           5.00,
+		"pod-abc/" + UnallocatedSuffix: 5.00,
+		"pod-pqr/" + UnallocatedSuffix: 5.00,
+		"pod-def/" + UnallocatedSuffix: 5.00,
+		"pod-vwx/team=team1":           5.00,
+		"pod-vwx/team=team2":           5.00,
+		"pod1/" + UnallocatedSuffix:    15.00,
+		"pod-mno/" + UnallocatedSuffix: 10.00,
+		"pod-ghi/" + UnallocatedSuffix: 10.00,
+		IdleSuffix:                     30.00,
+	})
+
 	// // TODO niko/etl
 
 	// // 3  Share idle

+ 1 - 0
pkg/kubecost/properties.go

@@ -222,6 +222,7 @@ func (p *Properties) Length() int {
 	return len(*p)
 }
 
+// TODO: deprecate
 func (p *Properties) Matches(that Properties) bool {
 	// The only Properties that a nil Properties matches is an empty one
 	if p == nil {

+ 24 - 1
pkg/prom/result.go

@@ -240,7 +240,7 @@ func (qr *QueryResult) GetLabels() map[string]string {
 			continue
 		}
 
-		label := k[6:]
+		label := strings.TrimPrefix(k, "label_")
 		value, ok := v.(string)
 		if !ok {
 			log.Warningf("Failed to parse label value for label: '%s'", label)
@@ -253,6 +253,29 @@ func (qr *QueryResult) GetLabels() map[string]string {
 	return result
 }
 
+// GetAnnotations returns all annotations and their values from the query result
+func (qr *QueryResult) GetAnnotations() map[string]string {
+	result := make(map[string]string)
+
+	// Find All keys with prefix annotation_, remove prefix, add to annotations
+	for k, v := range qr.Metric {
+		if !strings.HasPrefix(k, "annotation_") {
+			continue
+		}
+
+		annotations := strings.TrimPrefix(k, "annotation_")
+		value, ok := v.(string)
+		if !ok {
+			log.Warningf("Failed to parse label value for label: '%s'", annotations)
+			continue
+		}
+
+		result[annotations] = value
+	}
+
+	return result
+}
+
 // parseDataPoint parses a data point from raw prometheus query results and returns
 // a new Vector instance containing the parsed data along with any warnings or errors.
 func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {