Просмотр исходного кода

Merge pull request #662 from kubecost/neal/aggregate-by-label

Aggregation by label
Neal Ormsbee 5 лет назад
Родитель
Сommit
9877fcf40b
6 измененных файлов с 330 добавлено и 208 удалено
  1. 35 4
      pkg/costmodel/cluster.go
  2. 165 80
      pkg/kubecost/asset.go
  3. 48 34
      pkg/kubecost/asset_test.go
  4. 1 1
      pkg/kubecost/assetprops.go
  5. 1 1
      pkg/kubecost/bingen.go
  6. 80 88
      pkg/kubecost/kubecost_codecs.go

+ 35 - 4
pkg/costmodel/cluster.go

@@ -391,6 +391,7 @@ type Node struct {
 	Start        time.Time
 	Start        time.Time
 	End          time.Time
 	End          time.Time
 	Minutes      float64
 	Minutes      float64
+	Labels       map[string]string
 }
 }
 
 
 var partialCPUMap = map[string]float64{
 var partialCPUMap = map[string]float64{
@@ -425,11 +426,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
-	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node,cluster_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node,cluster_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 
 
 	// Return errors if these fail
 	// Return errors if these fail
 	resChNodeCPUCost := requiredCtx.Query(queryNodeCPUCost)
 	resChNodeCPUCost := requiredCtx.Query(queryNodeCPUCost)
@@ -437,24 +439,26 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resChNodeRAMCost := requiredCtx.Query(queryNodeRAMCost)
 	resChNodeRAMCost := requiredCtx.Query(queryNodeRAMCost)
 	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
 	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
 	resChNodeGPUCost := requiredCtx.Query(queryNodeGPUCost)
 	resChNodeGPUCost := requiredCtx.Query(queryNodeGPUCost)
-	resChNodeLabels := requiredCtx.Query(queryNodeLabels)
 	resChActiveMins := requiredCtx.Query(queryActiveMins)
 	resChActiveMins := requiredCtx.Query(queryActiveMins)
+	resChIsSpot := requiredCtx.Query(queryIsSpot)
 
 
 	// Do not return errors if these fail, but log warnings
 	// Do not return errors if these fail, but log warnings
 	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
 	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
 	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
 	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
 	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
 	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
+	resChLabels := optionalCtx.Query(queryLabels)
 
 
 	resNodeCPUCost, _ := resChNodeCPUCost.Await()
 	resNodeCPUCost, _ := resChNodeCPUCost.Await()
 	resNodeCPUCores, _ := resChNodeCPUCores.Await()
 	resNodeCPUCores, _ := resChNodeCPUCores.Await()
 	resNodeGPUCost, _ := resChNodeGPUCost.Await()
 	resNodeGPUCost, _ := resChNodeGPUCost.Await()
 	resNodeRAMCost, _ := resChNodeRAMCost.Await()
 	resNodeRAMCost, _ := resChNodeRAMCost.Await()
 	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
 	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
-	resNodeLabels, _ := resChNodeLabels.Await()
+	resIsSpot, _ := resChIsSpot.Await()
 	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
 	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
 	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
 	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
 	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
 	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
 	resActiveMins, _ := resChActiveMins.Await()
 	resActiveMins, _ := resChActiveMins.Await()
+	resLabels, _ := resChLabels.Await()
 
 
 	if optionalCtx.HasErrors() {
 	if optionalCtx.HasErrors() {
 		for _, err := range optionalCtx.Errors() {
 		for _, err := range optionalCtx.Errors() {
@@ -497,6 +501,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 				ProviderID:   cp.ParseID(providerID),
 				ProviderID:   cp.ParseID(providerID),
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
+				Labels:       map[string]string{},
 			}
 			}
 		}
 		}
 		nodeMap[key].CPUCost += cpuCost
 		nodeMap[key].CPUCost += cpuCost
@@ -527,6 +532,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 				Name:         name,
 				Name:         name,
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
+				Labels:       map[string]string{},
 			}
 			}
 		}
 		}
 		node := nodeMap[key]
 		node := nodeMap[key]
@@ -567,6 +573,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 				ProviderID:   cp.ParseID(providerID),
 				ProviderID:   cp.ParseID(providerID),
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
+				Labels:       map[string]string{},
 			}
 			}
 		}
 		}
 		nodeMap[key].RAMCost += ramCost
 		nodeMap[key].RAMCost += ramCost
@@ -597,6 +604,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 				Name:         name,
 				Name:         name,
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
+				Labels:       map[string]string{},
 			}
 			}
 		}
 		}
 		nodeMap[key].RAMBytes = ramBytes
 		nodeMap[key].RAMBytes = ramBytes
@@ -628,6 +636,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 				ProviderID:   cp.ParseID(providerID),
 				ProviderID:   cp.ParseID(providerID),
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				CPUBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
 				RAMBreakdown: &ClusterCostsBreakdown{},
+				Labels:       map[string]string{},
 			}
 			}
 		}
 		}
 		nodeMap[key].GPUCost += gpuCost
 		nodeMap[key].GPUCost += gpuCost
@@ -784,7 +793,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	}
 	}
 
 
 	// Determine preemptibility with node labels
 	// Determine preemptibility with node labels
-	for _, result := range resNodeLabels {
+	for _, result := range resIsSpot {
 		nodeName, err := result.GetString("node")
 		nodeName, err := result.GetString("node")
 		if err != nil {
 		if err != nil {
 			continue
 			continue
@@ -807,6 +816,28 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		// TODO Azure preemptible
 		// TODO Azure preemptible
 	}
 	}
 
 
+	// Copy labels into node
+	for _, result := range resLabels {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+		node, err := result.GetString("kubernetes_node")
+		if err != nil {
+			log.DedupedWarningf(5, "ClusterNodes: label data missing node")
+			continue
+		}
+		key := fmt.Sprintf("%s/%s", cluster, node)
+		if _, ok := nodeMap[key]; !ok {
+			continue
+		}
+		for name, value := range result.Metric {
+			if val, ok := value.(string); ok {
+				nodeMap[key].Labels[name] = val
+			}
+		}
+	}
+
 	c, err := cp.GetConfig()
 	c, err := cp.GetConfig()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err

+ 165 - 80
pkg/kubecost/asset.go

@@ -15,6 +15,10 @@ import (
 
 
 const timeFmt = "2006-01-02T15:04:05-0700"
 const timeFmt = "2006-01-02T15:04:05-0700"
 
 
+// UndefinedKey is used in composing Asset group keys if the group does not have that property defined.
+// E.g. if aggregating on Cluster, Assets in the AssetSet where Asset has no cluster will be grouped under key "__undefined__"
+const UndefinedKey = "__undefined__"
+
 // Asset defines an entity within a cluster that has a defined cost over a
 // Asset defines an entity within a cluster that has a defined cost over a
 // given period of time.
 // given period of time.
 type Asset interface {
 type Asset interface {
@@ -57,51 +61,73 @@ type Asset interface {
 }
 }
 
 
 // key is used to determine uniqueness of an Asset, for instance during Insert
 // key is used to determine uniqueness of an Asset, for instance during Insert
-// to determine if two Assets should be combined. Passing nil props indicates
-// that all available props should be used. Passing empty props indicates that
-// no props should be used (e.g. to aggregate all assets). Passing one or more
-// props will key by only those props.
-func key(a Asset, props []AssetProperty) string {
+// to determine if two Assets should be combined. Passing `nil` `aggregateBy` indicates
+// that all available `AssetProperty` keys should be used. Passing empty `aggregateBy` indicates that
+// no key should be used (e.g. to aggregate all assets). Passing one or more `aggregateBy`
+// values will key by only those values.
+// Valid values of `aggregateBy` elements are strings which are an `AssetProperty`, and strings prefixed
+// with `"label:"`.
+func key(a Asset, aggregateBy []string) (string, error) {
 	keys := []string{}
 	keys := []string{}
 
 
-	if props == nil {
-		props = []AssetProperty{
-			AssetProviderProp,
-			AssetAccountProp,
-			AssetProjectProp,
-			AssetCategoryProp,
-			AssetClusterProp,
-			AssetTypeProp,
-			AssetServiceProp,
-			AssetProviderIDProp,
-			AssetNameProp,
+	if aggregateBy == nil {
+		aggregateBy = []string{
+			string(AssetProviderProp),
+			string(AssetAccountProp),
+			string(AssetProjectProp),
+			string(AssetCategoryProp),
+			string(AssetClusterProp),
+			string(AssetTypeProp),
+			string(AssetServiceProp),
+			string(AssetProviderIDProp),
+			string(AssetNameProp),
 		}
 		}
 	}
 	}
 
 
-	for _, prop := range props {
+	for _, s := range aggregateBy {
+		key := ""
 		switch true {
 		switch true {
-		case prop == AssetProviderProp && a.Properties().Provider != "":
-			keys = append(keys, a.Properties().Provider)
-		case prop == AssetAccountProp && a.Properties().Account != "":
-			keys = append(keys, a.Properties().Account)
-		case prop == AssetProjectProp && a.Properties().Project != "":
-			keys = append(keys, a.Properties().Project)
-		case prop == AssetClusterProp && a.Properties().Cluster != "":
-			keys = append(keys, a.Properties().Cluster)
-		case prop == AssetCategoryProp && a.Properties().Category != "":
-			keys = append(keys, a.Properties().Category)
-		case prop == AssetTypeProp && a.Type().String() != "":
-			keys = append(keys, a.Type().String())
-		case prop == AssetServiceProp && a.Properties().Service != "":
-			keys = append(keys, a.Properties().Service)
-		case prop == AssetProviderIDProp && a.Properties().ProviderID != "":
-			keys = append(keys, a.Properties().ProviderID)
-		case prop == AssetNameProp && a.Properties().Name != "":
-			keys = append(keys, a.Properties().Name)
+		case s == string(AssetProviderProp):
+			key = a.Properties().Provider
+		case s == string(AssetAccountProp):
+			key = a.Properties().Account
+		case s == string(AssetProjectProp):
+			key = a.Properties().Project
+		case s == string(AssetClusterProp):
+			key = a.Properties().Cluster
+		case s == string(AssetCategoryProp):
+			key = a.Properties().Category
+		case s == string(AssetTypeProp):
+			key = a.Type().String()
+		case s == string(AssetServiceProp):
+			key = a.Properties().Service
+		case s == string(AssetProviderIDProp):
+			key = a.Properties().ProviderID
+		case s == string(AssetNameProp):
+			key = a.Properties().Name
+		case strings.HasPrefix(s, "label:"):
+			if labelKey := strings.TrimPrefix(s, "label:"); labelKey != "" {
+				labelVal := a.Labels()[labelKey]
+				if labelVal == "" {
+					key = "__undefined__"
+				} else {
+					key = fmt.Sprintf("%s=%s", labelKey, labelVal)
+				}
+			} else {
+				// Don't allow aggregating on label ""
+				return "", fmt.Errorf("Attempted to aggregate on invalid key: %s", s)
+			}
+		default:
+			return "", fmt.Errorf("Attempted to aggregate on invalid key: %s", s)
 		}
 		}
-	}
 
 
-	return strings.Join(keys, "/")
+		if key != "" {
+			keys = append(keys, key)
+		} else {
+			keys = append(keys, UndefinedKey)
+		}
+	}
+	return strings.Join(keys, "/"), nil
 }
 }
 
 
 func toString(a Asset) string {
 func toString(a Asset) string {
@@ -2304,11 +2330,11 @@ func (sa *SharedAsset) String() string {
 // a window. An AssetSet is mutable, so treat it like a threadsafe map.
 // a window. An AssetSet is mutable, so treat it like a threadsafe map.
 type AssetSet struct {
 type AssetSet struct {
 	sync.RWMutex
 	sync.RWMutex
-	assets   map[string]Asset
-	props    []AssetProperty
-	Window   Window
-	Warnings []string
-	Errors   []string
+	aggregateBy []string
+	assets      map[string]Asset
+	Window      Window
+	Warnings    []string
+	Errors      []string
 }
 }
 
 
 // NewAssetSet instantiates a new AssetSet and, optionally, inserts
 // NewAssetSet instantiates a new AssetSet and, optionally, inserts
@@ -2329,7 +2355,7 @@ func NewAssetSet(start, end time.Time, assets ...Asset) *AssetSet {
 // AggregateBy aggregates the Assets in the AssetSet by the given list of
 // AggregateBy aggregates the Assets in the AssetSet by the given list of
 // AssetProperties, such that each asset is binned by a key determined by its
 // AssetProperties, such that each asset is binned by a key determined by its
 // relevant property values.
 // relevant property values.
-func (as *AssetSet) AggregateBy(props []AssetProperty, opts *AssetAggregationOptions) error {
+func (as *AssetSet) AggregateBy(aggregateBy []string, opts *AssetAggregationOptions) error {
 	if opts == nil {
 	if opts == nil {
 		opts = &AssetAggregationOptions{}
 		opts = &AssetAggregationOptions{}
 	}
 	}
@@ -2342,7 +2368,7 @@ func (as *AssetSet) AggregateBy(props []AssetProperty, opts *AssetAggregationOpt
 	defer as.Unlock()
 	defer as.Unlock()
 
 
 	aggSet := NewAssetSet(as.Start(), as.End())
 	aggSet := NewAssetSet(as.Start(), as.End())
-	aggSet.props = props
+	aggSet.aggregateBy = aggregateBy
 
 
 	// Compute hours of the given AssetSet, and if it ends in the future,
 	// Compute hours of the given AssetSet, and if it ends in the future,
 	// adjust the hours accordingly
 	// adjust the hours accordingly
@@ -2357,7 +2383,10 @@ func (as *AssetSet) AggregateBy(props []AssetProperty, opts *AssetAggregationOpt
 		sa := NewSharedAsset(name, as.Window.Clone())
 		sa := NewSharedAsset(name, as.Window.Clone())
 		sa.Cost = hourlyCost * hours
 		sa.Cost = hourlyCost * hours
 
 
-		aggSet.Insert(sa)
+		err := aggSet.Insert(sa)
+		if err != nil {
+			return err
+		}
 	}
 	}
 
 
 	// Delete the Assets that don't pass each filter
 	// Delete the Assets that don't pass each filter
@@ -2369,15 +2398,18 @@ func (as *AssetSet) AggregateBy(props []AssetProperty, opts *AssetAggregationOpt
 		}
 		}
 	}
 	}
 
 
-	// Insert each asset into the new set, which will be keyed by the props
+	// Insert each asset into the new set, which will be keyed by the `aggregateBy`
 	// on aggSet, resulting in aggregation.
 	// on aggSet, resulting in aggregation.
 	for _, asset := range as.assets {
 	for _, asset := range as.assets {
-		aggSet.Insert(asset)
+		err := aggSet.Insert(asset)
+		if err != nil {
+			return err
+		}
 	}
 	}
 
 
 	// Assign the aggregated values back to the original set
 	// Assign the aggregated values back to the original set
 	as.assets = aggSet.assets
 	as.assets = aggSet.assets
-	as.props = props
+	as.aggregateBy = aggregateBy
 
 
 	return nil
 	return nil
 }
 }
@@ -2392,24 +2424,26 @@ func (as *AssetSet) Clone() *AssetSet {
 	as.RLock()
 	as.RLock()
 	defer as.RUnlock()
 	defer as.RUnlock()
 
 
-	assets := make(map[string]Asset, len(as.assets))
-	for k, v := range as.assets {
-		assets[k] = v.Clone()
+	var aggregateBy []string
+	if as.aggregateBy != nil {
+		aggregateBy := []string{}
+		for _, s := range as.aggregateBy {
+			aggregateBy = append(aggregateBy, s)
+		}
 	}
 	}
 
 
-	var props []AssetProperty
-	if as.props != nil {
-		props = make([]AssetProperty, len(as.props))
-		copy(props, as.props)
+	assets := map[string]Asset{}
+	for k, v := range as.assets {
+		assets[k] = v.Clone()
 	}
 	}
 
 
 	s := as.Start()
 	s := as.Start()
 	e := as.End()
 	e := as.End()
 
 
 	return &AssetSet{
 	return &AssetSet{
-		Window: NewWindow(&s, &e),
-		assets: assets,
-		props:  props,
+		Window:      NewWindow(&s, &e),
+		aggregateBy: aggregateBy,
+		assets:      assets,
 	}
 	}
 }
 }
 
 
@@ -2432,23 +2466,28 @@ func (as *AssetSet) End() time.Time {
 // FindMatch attempts to find a match in the AssetSet for the given Asset on
 // FindMatch attempts to find a match in the AssetSet for the given Asset on
 // the provided properties and labels. If a match is not found, FindMatch
 // the provided properties and labels. If a match is not found, FindMatch
 // returns nil and a Not Found error.
 // returns nil and a Not Found error.
-func (as *AssetSet) FindMatch(query Asset, props []AssetProperty) (Asset, error) {
+func (as *AssetSet) FindMatch(query Asset, aggregateBy []string) (Asset, error) {
 	as.RLock()
 	as.RLock()
 	defer as.RUnlock()
 	defer as.RUnlock()
 
 
-	matchKey := key(query, props)
+	matchKey, err := key(query, aggregateBy)
+	if err != nil {
+		return nil, err
+	}
 	for _, asset := range as.assets {
 	for _, asset := range as.assets {
-		if key(asset, props) == matchKey {
+		if k, err := key(asset, aggregateBy); err != nil {
+			return nil, err
+		} else if k == matchKey {
 			return asset, nil
 			return asset, nil
 		}
 		}
 	}
 	}
 
 
-	return nil, fmt.Errorf("Asset not found to match %s on %v", query, props)
+	return nil, fmt.Errorf("Asset not found to match %s on %v", query, aggregateBy)
 }
 }
 
 
 // ReconciliationMatch attempts to find an exact match in the AssetSet on
 // ReconciliationMatch attempts to find an exact match in the AssetSet on
 // (Category, ProviderID). If a match is found, it returns the Asset with the
 // (Category, ProviderID). If a match is found, it returns the Asset with the
-// intent to adjuts it. If no match exists, it attempts to find one on only
+// intent to adjust it. If no match exists, it attempts to find one on only
 // (ProviderID). If that match is found, it returns the Asset with the intent
 // (ProviderID). If that match is found, it returns the Asset with the intent
 // to insert the associated Cloud cost.
 // to insert the associated Cloud cost.
 func (as *AssetSet) ReconciliationMatch(query Asset) (Asset, bool, error) {
 func (as *AssetSet) ReconciliationMatch(query Asset) (Asset, bool, error) {
@@ -2456,20 +2495,36 @@ func (as *AssetSet) ReconciliationMatch(query Asset) (Asset, bool, error) {
 	defer as.RUnlock()
 	defer as.RUnlock()
 
 
 	// Full match means matching on (Category, ProviderID)
 	// Full match means matching on (Category, ProviderID)
-	fullMatchProps := []AssetProperty{AssetCategoryProp, AssetProviderIDProp}
-	fullMatchKey := key(query, fullMatchProps)
+	fullMatchProps := []string{string(AssetCategoryProp), string(AssetProviderIDProp)}
+	fullMatchKey, err := key(query, fullMatchProps)
+
+	// This should never happen because we are using enumerated properties,
+	// but the check is here in case that changes
+	if err != nil {
+		return nil, false, err
+	}
 
 
 	// Partial match means matching only on (ProviderID)
 	// Partial match means matching only on (ProviderID)
-	providerIDMatchProps := []AssetProperty{AssetProviderIDProp}
-	providerIDMatchKey := key(query, providerIDMatchProps)
+	providerIDMatchProps := []string{string(AssetProviderIDProp)}
+	providerIDMatchKey, err := key(query, providerIDMatchProps)
+
+	// This should never happen because we are using enumerated properties,
+	// but the check is here in case that changes
+	if err != nil {
+		return nil, false, err
+	}
 
 
 	var providerIDMatch Asset
 	var providerIDMatch Asset
 	for _, asset := range as.assets {
 	for _, asset := range as.assets {
-		if key(asset, fullMatchProps) == fullMatchKey {
+		if k, err := key(asset, fullMatchProps); err != nil {
+			return nil, false, err
+		} else if k == fullMatchKey {
 			log.DedupedInfof(10, "Asset ETL: Reconciliation[rcnw]: ReconcileRange Match: %s", fullMatchKey)
 			log.DedupedInfof(10, "Asset ETL: Reconciliation[rcnw]: ReconcileRange Match: %s", fullMatchKey)
 			return asset, true, nil
 			return asset, true, nil
 		}
 		}
-		if key(asset, providerIDMatchProps) == providerIDMatchKey {
+		if k, err := key(asset, providerIDMatchProps); err != nil {
+			return nil, false, err
+		} else if k == providerIDMatchKey {
 			// Found a partial match. Save it until after all other options
 			// Found a partial match. Save it until after all other options
 			// have been checked for full matches.
 			// have been checked for full matches.
 			providerIDMatch = asset
 			providerIDMatch = asset
@@ -2510,7 +2565,10 @@ func (as *AssetSet) Insert(asset Asset) error {
 	defer as.Unlock()
 	defer as.Unlock()
 
 
 	// Determine key into which to Insert the Asset.
 	// Determine key into which to Insert the Asset.
-	k := key(asset, as.props)
+	k, err := key(asset, as.aggregateBy)
+	if err != nil {
+		return err
+	}
 
 
 	// Add the given Asset to the existing entry, if there is one;
 	// Add the given Asset to the existing entry, if there is one;
 	// otherwise just set directly into assets
 	// otherwise just set directly into assets
@@ -2565,7 +2623,7 @@ func (as *AssetSet) MarshalJSON() ([]byte, error) {
 	return json.Marshal(as.assets)
 	return json.Marshal(as.assets)
 }
 }
 
 
-func (as *AssetSet) Set(asset Asset, props []AssetProperty) {
+func (as *AssetSet) Set(asset Asset, aggregateBy []string) error {
 	if as.IsEmpty() {
 	if as.IsEmpty() {
 		as.Lock()
 		as.Lock()
 		as.assets = map[string]Asset{}
 		as.assets = map[string]Asset{}
@@ -2577,7 +2635,12 @@ func (as *AssetSet) Set(asset Asset, props []AssetProperty) {
 
 
 	// Expand the window to match the AssetSet, then set it
 	// Expand the window to match the AssetSet, then set it
 	asset.ExpandWindow(as.Window)
 	asset.ExpandWindow(as.Window)
-	as.assets[key(asset, props)] = asset
+	k, err := key(asset, aggregateBy)
+	if err != nil {
+		return err
+	}
+	as.assets[k] = asset
+	return nil
 }
 }
 
 
 func (as *AssetSet) Start() time.Time {
 func (as *AssetSet) Start() time.Time {
@@ -2612,11 +2675,11 @@ func (as *AssetSet) accumulate(that *AssetSet) (*AssetSet, error) {
 	}
 	}
 
 
 	// In the case of an AssetSetRange with empty entries, we may end up with
 	// In the case of an AssetSetRange with empty entries, we may end up with
-	// an incoming as without props, even though we are trying to aggregate
-	// by props. This handles that case, assigning the correct props.
-	if !propsEqual(as.props, that.props) {
-		if len(as.props) == 0 {
-			as.props = that.props
+	// an incoming `as` without an `aggregateBy`, even though we are tring to
+	// aggregate here. This handles that case by assigning the correct `aggregateBy`.
+	if !sameContents(as.aggregateBy, that.aggregateBy) {
+		if len(as.aggregateBy) == 0 {
+			as.aggregateBy = that.aggregateBy
 		}
 		}
 	}
 	}
 
 
@@ -2635,7 +2698,7 @@ func (as *AssetSet) accumulate(that *AssetSet) (*AssetSet, error) {
 	}
 	}
 
 
 	acc := NewAssetSet(start, end)
 	acc := NewAssetSet(start, end)
-	acc.props = as.props
+	acc.aggregateBy = as.aggregateBy
 
 
 	as.RLock()
 	as.RLock()
 	defer as.RUnlock()
 	defer as.RUnlock()
@@ -2695,14 +2758,14 @@ type AssetAggregationOptions struct {
 	FilterFuncs       []AssetMatchFunc
 	FilterFuncs       []AssetMatchFunc
 }
 }
 
 
-func (asr *AssetSetRange) AggregateBy(props []AssetProperty, opts *AssetAggregationOptions) error {
+func (asr *AssetSetRange) AggregateBy(aggregateBy []string, opts *AssetAggregationOptions) error {
 	aggRange := &AssetSetRange{assets: []*AssetSet{}}
 	aggRange := &AssetSetRange{assets: []*AssetSet{}}
 
 
 	asr.Lock()
 	asr.Lock()
 	defer asr.Unlock()
 	defer asr.Unlock()
 
 
 	for _, as := range asr.assets {
 	for _, as := range asr.assets {
-		err := as.AggregateBy(props, opts)
+		err := as.AggregateBy(aggregateBy, opts)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -2809,3 +2872,25 @@ func jsonEncode(buffer *bytes.Buffer, name string, obj interface{}, comma string
 	}
 	}
 	buffer.WriteString(comma)
 	buffer.WriteString(comma)
 }
 }
+
+// Returns true if string slices a and b contain all of the same strings, in any order.
+func sameContents(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if !contains(b, a[i]) {
+			return false
+		}
+	}
+	return true
+}
+
+func contains(slice []string, item string) bool {
+	for _, element := range slice {
+		if element == item {
+			return true
+		}
+	}
+	return false
+}

+ 48 - 34
pkg/kubecost/asset_test.go

@@ -618,6 +618,7 @@ func TestAssetSet_AggregateBy(t *testing.T) {
 	// 1b []AssetProperty=[Type]
 	// 1b []AssetProperty=[Type]
 	// 1c []AssetProperty=[Nil]
 	// 1c []AssetProperty=[Nil]
 	// 1d []AssetProperty=nil
 	// 1d []AssetProperty=nil
+	// 1e aggregateBy []string=["label:test"]
 
 
 	// 2  Multi-aggregation
 	// 2  Multi-aggregation
 	// 2a []AssetProperty=[Cluster,Type]
 	// 2a []AssetProperty=[Cluster,Type]
@@ -636,7 +637,7 @@ func TestAssetSet_AggregateBy(t *testing.T) {
 
 
 	// 1a []AssetProperty=[Cluster]
 	// 1a []AssetProperty=[Cluster]
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
-	err = as.AggregateBy([]AssetProperty{AssetClusterProp}, nil)
+	err = as.AggregateBy([]string{string(AssetClusterProp)}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 	}
 	}
@@ -648,7 +649,7 @@ func TestAssetSet_AggregateBy(t *testing.T) {
 
 
 	// 1b []AssetProperty=[Type]
 	// 1b []AssetProperty=[Type]
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
-	err = as.AggregateBy([]AssetProperty{AssetTypeProp}, nil)
+	err = as.AggregateBy([]string{string(AssetTypeProp)}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 	}
 	}
@@ -660,7 +661,7 @@ func TestAssetSet_AggregateBy(t *testing.T) {
 
 
 	// 1c []AssetProperty=[Nil]
 	// 1c []AssetProperty=[Nil]
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
-	err = as.AggregateBy([]AssetProperty{}, nil)
+	err = as.AggregateBy([]string{}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 	}
 	}
@@ -675,24 +676,36 @@ func TestAssetSet_AggregateBy(t *testing.T) {
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 	}
 	}
 	assertAssetSet(t, as, "1d", window, map[string]float64{
 	assertAssetSet(t, as, "1d", window, map[string]float64{
-		"Compute/cluster1/Node/Kubernetes/gcp-node1/node1":     7.00,
-		"Compute/cluster1/Node/Kubernetes/gcp-node2/node2":     5.50,
-		"Compute/cluster1/Node/Kubernetes/gcp-node3/node3":     6.50,
-		"Storage/cluster1/Disk/Kubernetes/gcp-disk1/disk1":     2.50,
-		"Storage/cluster1/Disk/Kubernetes/gcp-disk2/disk2":     1.50,
-		"GCP/Management/cluster1/ClusterManagement/Kubernetes": 3.00,
-		"Compute/cluster2/Node/Kubernetes/gcp-node4/node4":     11.00,
-		"Storage/cluster2/Disk/Kubernetes/gcp-disk3/disk3":     2.50,
-		"Storage/cluster2/Disk/Kubernetes/gcp-disk4/disk4":     1.50,
-		"GCP/Management/cluster2/ClusterManagement/Kubernetes": 0.00,
-		"Compute/cluster3/Node/Kubernetes/aws-node5/node5":     19.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster1/Node/Kubernetes/gcp-node1/node1":                   7.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster1/Node/Kubernetes/gcp-node2/node2":                   5.50,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster1/Node/Kubernetes/gcp-node3/node3":                   6.50,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster1/Disk/Kubernetes/gcp-disk1/disk1":                   2.50,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster1/Disk/Kubernetes/gcp-disk2/disk2":                   1.50,
+		"GCP/__undefined__/__undefined__/Management/cluster1/ClusterManagement/Kubernetes/__undefined__/__undefined__": 3.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster2/Node/Kubernetes/gcp-node4/node4":                   11.00,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster2/Disk/Kubernetes/gcp-disk3/disk3":                   2.50,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster2/Disk/Kubernetes/gcp-disk4/disk4":                   1.50,
+		"GCP/__undefined__/__undefined__/Management/cluster2/ClusterManagement/Kubernetes/__undefined__/__undefined__": 0.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster3/Node/Kubernetes/aws-node5/node5":                   19.00,
+	}, nil)
+
+	// 1e aggregateBy []string=["label:test"]
+	as = generateAssetSet(startYesterday)
+	err = as.AggregateBy([]string{"label:test"}, nil)
+	if err != nil {
+		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
+	}
+	fmt.Println(as.assets)
+	assertAssetSet(t, as, "1e", window, map[string]float64{
+		"__undefined__": 53.00,
+		"test=test":     7.00,
 	}, nil)
 	}, nil)
 
 
 	// 2  Multi-aggregation
 	// 2  Multi-aggregation
 
 
 	// 2a []AssetProperty=[Cluster,Type]
 	// 2a []AssetProperty=[Cluster,Type]
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
-	err = as.AggregateBy([]AssetProperty{AssetClusterProp, AssetTypeProp}, nil)
+	err = as.AggregateBy([]string{string(AssetClusterProp), string(AssetTypeProp)}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSet.AggregateBy: unexpected error: %s", err)
 	}
 	}
@@ -710,7 +723,7 @@ func TestAssetSet_AggregateBy(t *testing.T) {
 
 
 	// 3a Shared hourly cost > 0.0
 	// 3a Shared hourly cost > 0.0
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
-	err = as.AggregateBy([]AssetProperty{AssetTypeProp}, &AssetAggregationOptions{
+	err = as.AggregateBy([]string{string(AssetTypeProp)}, &AssetAggregationOptions{
 		SharedHourlyCosts: map[string]float64{"shared1": 0.5},
 		SharedHourlyCosts: map[string]float64{"shared1": 0.5},
 	})
 	})
 	if err != nil {
 	if err != nil {
@@ -737,7 +750,7 @@ func TestAssetSet_FindMatch(t *testing.T) {
 	// Assert success of a simple match of Type and ProviderID
 	// Assert success of a simple match of Type and ProviderID
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
 	query = NewNode("", "", "gcp-node3", s, e, w)
 	query = NewNode("", "", "gcp-node3", s, e, w)
-	match, err = as.FindMatch(query, []AssetProperty{AssetTypeProp, AssetProviderIDProp})
+	match, err = as.FindMatch(query, []string{string(AssetTypeProp), string(AssetProviderIDProp)})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSet.FindMatch: unexpected error: %s", err)
 		t.Fatalf("AssetSet.FindMatch: unexpected error: %s", err)
 	}
 	}
@@ -745,7 +758,7 @@ func TestAssetSet_FindMatch(t *testing.T) {
 	// Assert error of a simple non-match of Type and ProviderID
 	// Assert error of a simple non-match of Type and ProviderID
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
 	query = NewNode("", "", "aws-node3", s, e, w)
 	query = NewNode("", "", "aws-node3", s, e, w)
-	match, err = as.FindMatch(query, []AssetProperty{AssetTypeProp, AssetProviderIDProp})
+	match, err = as.FindMatch(query, []string{string(AssetTypeProp), string(AssetProviderIDProp)})
 	if err == nil {
 	if err == nil {
 		t.Fatalf("AssetSet.FindMatch: expected error (no match); found %s", match)
 		t.Fatalf("AssetSet.FindMatch: expected error (no match); found %s", match)
 	}
 	}
@@ -753,7 +766,7 @@ func TestAssetSet_FindMatch(t *testing.T) {
 	// Assert error of matching ProviderID, but not Type
 	// Assert error of matching ProviderID, but not Type
 	as = generateAssetSet(startYesterday)
 	as = generateAssetSet(startYesterday)
 	query = NewCloud(ComputeCategory, "gcp-node3", s, e, w)
 	query = NewCloud(ComputeCategory, "gcp-node3", s, e, w)
-	match, err = as.FindMatch(query, []AssetProperty{AssetTypeProp, AssetProviderIDProp})
+	match, err = as.FindMatch(query, []string{string(AssetTypeProp), string(AssetProviderIDProp)})
 	if err == nil {
 	if err == nil {
 		t.Fatalf("AssetSet.FindMatch: expected error (no match); found %s", match)
 		t.Fatalf("AssetSet.FindMatch: expected error (no match); found %s", match)
 	}
 	}
@@ -784,17 +797,17 @@ func TestAssetSetRange_Accumulate(t *testing.T) {
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 	}
 	}
 	assertAssetSet(t, as, "1a", window, map[string]float64{
 	assertAssetSet(t, as, "1a", window, map[string]float64{
-		"Compute/cluster1/Node/Kubernetes/gcp-node1/node1":     21.00,
-		"Compute/cluster1/Node/Kubernetes/gcp-node2/node2":     16.50,
-		"Compute/cluster1/Node/Kubernetes/gcp-node3/node3":     19.50,
-		"Storage/cluster1/Disk/Kubernetes/gcp-disk1/disk1":     7.50,
-		"Storage/cluster1/Disk/Kubernetes/gcp-disk2/disk2":     4.50,
-		"GCP/Management/cluster1/ClusterManagement/Kubernetes": 9.00,
-		"Compute/cluster2/Node/Kubernetes/gcp-node4/node4":     33.00,
-		"Storage/cluster2/Disk/Kubernetes/gcp-disk3/disk3":     7.50,
-		"Storage/cluster2/Disk/Kubernetes/gcp-disk4/disk4":     4.50,
-		"GCP/Management/cluster2/ClusterManagement/Kubernetes": 0.00,
-		"Compute/cluster3/Node/Kubernetes/aws-node5/node5":     57.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster1/Node/Kubernetes/gcp-node1/node1":                   21.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster1/Node/Kubernetes/gcp-node2/node2":                   16.50,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster1/Node/Kubernetes/gcp-node3/node3":                   19.50,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster1/Disk/Kubernetes/gcp-disk1/disk1":                   7.50,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster1/Disk/Kubernetes/gcp-disk2/disk2":                   4.50,
+		"GCP/__undefined__/__undefined__/Management/cluster1/ClusterManagement/Kubernetes/__undefined__/__undefined__": 9.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster2/Node/Kubernetes/gcp-node4/node4":                   33.00,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster2/Disk/Kubernetes/gcp-disk3/disk3":                   7.50,
+		"__undefined__/__undefined__/__undefined__/Storage/cluster2/Disk/Kubernetes/gcp-disk4/disk4":                   4.50,
+		"GCP/__undefined__/__undefined__/Management/cluster2/ClusterManagement/Kubernetes/__undefined__/__undefined__": 0.00,
+		"__undefined__/__undefined__/__undefined__/Compute/cluster3/Node/Kubernetes/aws-node5/node5":                   57.00,
 	}, nil)
 	}, nil)
 
 
 	asr = NewAssetSetRange(
 	asr = NewAssetSetRange(
@@ -802,7 +815,7 @@ func TestAssetSetRange_Accumulate(t *testing.T) {
 		generateAssetSet(startD1),
 		generateAssetSet(startD1),
 		generateAssetSet(startD2),
 		generateAssetSet(startD2),
 	)
 	)
-	err = asr.AggregateBy([]AssetProperty{}, nil)
+	err = asr.AggregateBy([]string{}, nil)
 	as, err = asr.Accumulate()
 	as, err = asr.Accumulate()
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
@@ -816,7 +829,7 @@ func TestAssetSetRange_Accumulate(t *testing.T) {
 		generateAssetSet(startD1),
 		generateAssetSet(startD1),
 		generateAssetSet(startD2),
 		generateAssetSet(startD2),
 	)
 	)
-	err = asr.AggregateBy([]AssetProperty{AssetTypeProp}, nil)
+	err = asr.AggregateBy([]string{string(AssetTypeProp)}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 	}
 	}
@@ -835,7 +848,7 @@ func TestAssetSetRange_Accumulate(t *testing.T) {
 		generateAssetSet(startD1),
 		generateAssetSet(startD1),
 		generateAssetSet(startD2),
 		generateAssetSet(startD2),
 	)
 	)
-	err = asr.AggregateBy([]AssetProperty{AssetClusterProp}, nil)
+	err = asr.AggregateBy([]string{string(AssetClusterProp)}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 	}
 	}
@@ -856,7 +869,7 @@ func TestAssetSetRange_Accumulate(t *testing.T) {
 		generateAssetSet(startD1),
 		generateAssetSet(startD1),
 		generateAssetSet(startD2),
 		generateAssetSet(startD2),
 	)
 	)
-	err = asr.AggregateBy([]AssetProperty{AssetTypeProp}, nil)
+	err = asr.AggregateBy([]string{string(AssetTypeProp)}, nil)
 	as, err = asr.Accumulate()
 	as, err = asr.Accumulate()
 	if err != nil {
 	if err != nil {
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
 		t.Fatalf("AssetSetRange.AggregateBy: unexpected error: %s", err)
@@ -935,6 +948,7 @@ func generateAssetSet(start time.Time) *AssetSet {
 	node1.CPUCoreHours = 2.0 * hours
 	node1.CPUCoreHours = 2.0 * hours
 	node1.RAMByteHours = 4.0 * gb * hours
 	node1.RAMByteHours = 4.0 * gb * hours
 	node1.SetAdjustment(1.0)
 	node1.SetAdjustment(1.0)
+	node1.SetLabels(map[string]string{"test": "test"})
 
 
 	node2 := NewNode("node2", "cluster1", "gcp-node2", *window.Clone().start, *window.Clone().end, window.Clone())
 	node2 := NewNode("node2", "cluster1", "gcp-node2", *window.Clone().start, *window.Clone().end, window.Clone())
 	node2.CPUCost = 4.0
 	node2.CPUCost = 4.0

+ 1 - 1
pkg/kubecost/assetprops.go

@@ -55,7 +55,7 @@ func ParseAssetProperty(text string) (AssetProperty, error) {
 		return AssetProjectProp, nil
 		return AssetProjectProp, nil
 	case "provider":
 	case "provider":
 		return AssetProviderProp, nil
 		return AssetProviderProp, nil
-	case "providerID":
+	case "providerid":
 		return AssetProviderIDProp, nil
 		return AssetProviderIDProp, nil
 	case "service":
 	case "service":
 		return AssetServiceProp, nil
 		return AssetServiceProp, nil

+ 1 - 1
pkg/kubecost/bingen.go

@@ -21,4 +21,4 @@ package kubecost
 // @bingen:generate:AllocationSet
 // @bingen:generate:AllocationSet
 // @bingen:generate:AllocationSetRange
 // @bingen:generate:AllocationSetRange
 
 
-//go:generate bingen -package=kubecost -version=4 -buffer=github.com/kubecost/cost-model/pkg/util
+//go:generate bingen -package=kubecost -version=5 -buffer=github.com/kubecost/cost-model/pkg/util

+ 80 - 88
pkg/kubecost/kubecost_codecs.go

@@ -14,11 +14,10 @@ package kubecost
 import (
 import (
 	"encoding"
 	"encoding"
 	"fmt"
 	"fmt"
+	util "github.com/kubecost/cost-model/pkg/util"
 	"reflect"
 	"reflect"
 	"strings"
 	"strings"
 	"time"
 	"time"
-
-	util "github.com/kubecost/cost-model/pkg/util"
 )
 )
 
 
 const (
 const (
@@ -26,7 +25,7 @@ const (
 	GeneratorPackageName string = "kubecost"
 	GeneratorPackageName string = "kubecost"
 
 
 	// CodecVersion is the version passed into the generator
 	// CodecVersion is the version passed into the generator
-	CodecVersion uint8 = 4
+	CodecVersion uint8 = 6
 )
 )
 
 
 //--------------------------------------------------------------------------
 //--------------------------------------------------------------------------
@@ -421,8 +420,8 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) (err error) {
 		target.allocations = nil
 		target.allocations = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]*Allocation) ---
 		// --- [begin][read][map](map[string]*Allocation) ---
+		a := make(map[string]*Allocation)
 		b := buff.ReadInt() // map len
 		b := buff.ReadInt() // map len
-		a := make(map[string]*Allocation, b)
 		for i := 0; i < b; i++ {
 		for i := 0; i < b; i++ {
 			var k string
 			var k string
 			c := buff.ReadString() // read string
 			c := buff.ReadString() // read string
@@ -454,8 +453,8 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) (err error) {
 		target.idleKeys = nil
 		target.idleKeys = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]bool) ---
 		// --- [begin][read][map](map[string]bool) ---
+		g := make(map[string]bool)
 		h := buff.ReadInt() // map len
 		h := buff.ReadInt() // map len
-		g := make(map[string]bool, h)
 		for j := 0; j < h; j++ {
 		for j := 0; j < h; j++ {
 			var kk string
 			var kk string
 			l := buff.ReadString() // read string
 			l := buff.ReadString() // read string
@@ -745,8 +744,8 @@ func (target *Any) UnmarshalBinary(data []byte) (err error) {
 		a = nil
 		a = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		b := make(map[string]string)
 		c := buff.ReadInt() // map len
 		c := buff.ReadInt() // map len
-		b := make(map[string]string, c)
 		for i := 0; i < c; i++ {
 		for i := 0; i < c; i++ {
 			var k string
 			var k string
 			d := buff.ReadString() // read string
 			d := buff.ReadString() // read string
@@ -930,6 +929,19 @@ func (target *AssetSet) MarshalBinary() (data []byte, err error) {
 	buff := util.NewBuffer()
 	buff := util.NewBuffer()
 	buff.WriteUInt8(CodecVersion) // version
 	buff.WriteUInt8(CodecVersion) // version
 
 
+	if target.aggregateBy == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][slice]([]string) ---
+		buff.WriteInt(len(target.aggregateBy)) // array length
+		for i := 0; i < len(target.aggregateBy); i++ {
+			buff.WriteString(target.aggregateBy[i]) // write string
+		}
+		// --- [end][write][slice]([]string) ---
+
+	}
 	if target.assets == nil {
 	if target.assets == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
 		buff.WriteUInt8(uint8(0)) // write nil byte
 	} else {
 	} else {
@@ -963,22 +975,6 @@ func (target *AssetSet) MarshalBinary() (data []byte, err error) {
 		}
 		}
 		// --- [end][write][map](map[string]Asset) ---
 		// --- [end][write][map](map[string]Asset) ---
 
 
-	}
-	if target.props == nil {
-		buff.WriteUInt8(uint8(0)) // write nil byte
-	} else {
-		buff.WriteUInt8(uint8(1)) // write non-nil byte
-
-		// --- [begin][write][slice]([]AssetProperty) ---
-		buff.WriteInt(len(target.props)) // array length
-		for i := 0; i < len(target.props); i++ {
-			// --- [begin][write][alias](AssetProperty) ---
-			buff.WriteString(string(target.props[i])) // write string
-			// --- [end][write][alias](AssetProperty) ---
-
-		}
-		// --- [end][write][slice]([]AssetProperty) ---
-
 	}
 	}
 	// --- [begin][write][struct](Window) ---
 	// --- [begin][write][struct](Window) ---
 	d, errB := target.Window.MarshalBinary()
 	d, errB := target.Window.MarshalBinary()
@@ -1042,93 +1038,89 @@ func (target *AssetSet) UnmarshalBinary(data []byte) (err error) {
 		return fmt.Errorf("Invalid Version Unmarshaling AssetSet. Expected %d, got %d", CodecVersion, version)
 		return fmt.Errorf("Invalid Version Unmarshaling AssetSet. Expected %d, got %d", CodecVersion, version)
 	}
 	}
 
 
+	if buff.ReadUInt8() == uint8(0) {
+		target.aggregateBy = nil
+	} else {
+		// --- [begin][read][slice]([]string) ---
+		b := buff.ReadInt() // array len
+		a := make([]string, b)
+		for i := 0; i < b; i++ {
+			var c string
+			d := buff.ReadString() // read string
+			c = d
+
+			a[i] = c
+		}
+		target.aggregateBy = a
+		// --- [end][read][slice]([]string) ---
+
+	}
 	if buff.ReadUInt8() == uint8(0) {
 	if buff.ReadUInt8() == uint8(0) {
 		target.assets = nil
 		target.assets = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]Asset) ---
 		// --- [begin][read][map](map[string]Asset) ---
-		b := buff.ReadInt() // map len
-		a := make(map[string]Asset, b)
-		for i := 0; i < b; i++ {
+		e := make(map[string]Asset)
+		f := buff.ReadInt() // map len
+		for j := 0; j < f; j++ {
 			var k string
 			var k string
-			c := buff.ReadString() // read string
-			k = c
+			g := buff.ReadString() // read string
+			k = g
 
 
 			var v Asset
 			var v Asset
 			if buff.ReadUInt8() == uint8(0) {
 			if buff.ReadUInt8() == uint8(0) {
 				v = nil
 				v = nil
 			} else {
 			} else {
 				// --- [begin][read][interface](Asset) ---
 				// --- [begin][read][interface](Asset) ---
-				d := buff.ReadString()
-				_, e, _ := resolveType(d)
-				if _, ok := typeMap[e]; !ok {
-					return fmt.Errorf("Unknown Type: %s", e)
+				h := buff.ReadString()
+				_, l, _ := resolveType(h)
+				if _, ok := typeMap[l]; !ok {
+					return fmt.Errorf("Unknown Type: %s", l)
 				}
 				}
-				f, okA := reflect.New(typeMap[e]).Interface().(interface{ UnmarshalBinary([]byte) error })
+				m, okA := reflect.New(typeMap[l]).Interface().(interface{ UnmarshalBinary([]byte) error })
 				if !okA {
 				if !okA {
-					return fmt.Errorf("Type: %s does not implement UnmarshalBinary([]byte) error", e)
+					return fmt.Errorf("Type: %s does not implement UnmarshalBinary([]byte) error", l)
 				}
 				}
-				g := buff.ReadInt()    // byte array length
-				h := buff.ReadBytes(g) // byte array
-				errA := f.UnmarshalBinary(h)
+				n := buff.ReadInt()    // byte array length
+				o := buff.ReadBytes(n) // byte array
+				errA := m.UnmarshalBinary(o)
 				if errA != nil {
 				if errA != nil {
 					return errA
 					return errA
 				}
 				}
-				v = f.(Asset)
+				v = m.(Asset)
 				// --- [end][read][interface](Asset) ---
 				// --- [end][read][interface](Asset) ---
 
 
 			}
 			}
-			a[k] = v
+			e[k] = v
 		}
 		}
-		target.assets = a
+		target.assets = e
 		// --- [end][read][map](map[string]Asset) ---
 		// --- [end][read][map](map[string]Asset) ---
 
 
-	}
-	if buff.ReadUInt8() == uint8(0) {
-		target.props = nil
-	} else {
-		// --- [begin][read][slice]([]AssetProperty) ---
-		m := buff.ReadInt() // array len
-		l := make([]AssetProperty, m)
-		for j := 0; j < m; j++ {
-			// --- [begin][read][alias](AssetProperty) ---
-			var o string
-			p := buff.ReadString() // read string
-			o = p
-
-			n := AssetProperty(o)
-			// --- [end][read][alias](AssetProperty) ---
-
-			l[j] = n
-		}
-		target.props = l
-		// --- [end][read][slice]([]AssetProperty) ---
-
 	}
 	}
 	// --- [begin][read][struct](Window) ---
 	// --- [begin][read][struct](Window) ---
-	q := &Window{}
-	r := buff.ReadInt()    // byte array length
-	s := buff.ReadBytes(r) // byte array
-	errB := q.UnmarshalBinary(s)
+	p := &Window{}
+	q := buff.ReadInt()    // byte array length
+	r := buff.ReadBytes(q) // byte array
+	errB := p.UnmarshalBinary(r)
 	if errB != nil {
 	if errB != nil {
 		return errB
 		return errB
 	}
 	}
-	target.Window = *q
+	target.Window = *p
 	// --- [end][read][struct](Window) ---
 	// --- [end][read][struct](Window) ---
 
 
 	if buff.ReadUInt8() == uint8(0) {
 	if buff.ReadUInt8() == uint8(0) {
 		target.Warnings = nil
 		target.Warnings = nil
 	} else {
 	} else {
 		// --- [begin][read][slice]([]string) ---
 		// --- [begin][read][slice]([]string) ---
-		u := buff.ReadInt() // array len
-		t := make([]string, u)
-		for ii := 0; ii < u; ii++ {
-			var w string
-			x := buff.ReadString() // read string
-			w = x
+		t := buff.ReadInt() // array len
+		s := make([]string, t)
+		for ii := 0; ii < t; ii++ {
+			var u string
+			w := buff.ReadString() // read string
+			u = w
 
 
-			t[ii] = w
+			s[ii] = u
 		}
 		}
-		target.Warnings = t
+		target.Warnings = s
 		// --- [end][read][slice]([]string) ---
 		// --- [end][read][slice]([]string) ---
 
 
 	}
 	}
@@ -1136,16 +1128,16 @@ func (target *AssetSet) UnmarshalBinary(data []byte) (err error) {
 		target.Errors = nil
 		target.Errors = nil
 	} else {
 	} else {
 		// --- [begin][read][slice]([]string) ---
 		// --- [begin][read][slice]([]string) ---
-		z := buff.ReadInt() // array len
-		y := make([]string, z)
-		for jj := 0; jj < z; jj++ {
-			var aa string
-			bb := buff.ReadString() // read string
-			aa = bb
+		y := buff.ReadInt() // array len
+		x := make([]string, y)
+		for jj := 0; jj < y; jj++ {
+			var z string
+			aa := buff.ReadString() // read string
+			z = aa
 
 
-			y[jj] = aa
+			x[jj] = z
 		}
 		}
-		target.Errors = y
+		target.Errors = x
 		// --- [end][read][slice]([]string) ---
 		// --- [end][read][slice]([]string) ---
 
 
 	}
 	}
@@ -1447,8 +1439,8 @@ func (target *Cloud) UnmarshalBinary(data []byte) (err error) {
 		a = nil
 		a = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		b := make(map[string]string)
 		c := buff.ReadInt() // map len
 		c := buff.ReadInt() // map len
-		b := make(map[string]string, c)
 		for i := 0; i < c; i++ {
 		for i := 0; i < c; i++ {
 			var k string
 			var k string
 			d := buff.ReadString() // read string
 			d := buff.ReadString() // read string
@@ -1622,8 +1614,8 @@ func (target *ClusterManagement) UnmarshalBinary(data []byte) (err error) {
 		a = nil
 		a = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		b := make(map[string]string)
 		c := buff.ReadInt() // map len
 		c := buff.ReadInt() // map len
-		b := make(map[string]string, c)
 		for i := 0; i < c; i++ {
 		for i := 0; i < c; i++ {
 			var k string
 			var k string
 			d := buff.ReadString() // read string
 			d := buff.ReadString() // read string
@@ -1808,8 +1800,8 @@ func (target *Disk) UnmarshalBinary(data []byte) (err error) {
 		a = nil
 		a = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		b := make(map[string]string)
 		c := buff.ReadInt() // map len
 		c := buff.ReadInt() // map len
-		b := make(map[string]string, c)
 		for i := 0; i < c; i++ {
 		for i := 0; i < c; i++ {
 			var k string
 			var k string
 			d := buff.ReadString() // read string
 			d := buff.ReadString() // read string
@@ -2038,8 +2030,8 @@ func (target *LoadBalancer) UnmarshalBinary(data []byte) (err error) {
 		d = nil
 		d = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		e := make(map[string]string)
 		f := buff.ReadInt() // map len
 		f := buff.ReadInt() // map len
-		e := make(map[string]string, f)
 		for i := 0; i < f; i++ {
 		for i := 0; i < f; i++ {
 			var k string
 			var k string
 			g := buff.ReadString() // read string
 			g := buff.ReadString() // read string
@@ -2232,8 +2224,8 @@ func (target *Network) UnmarshalBinary(data []byte) (err error) {
 		d = nil
 		d = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		e := make(map[string]string)
 		f := buff.ReadInt() // map len
 		f := buff.ReadInt() // map len
-		e := make(map[string]string, f)
 		for i := 0; i < f; i++ {
 		for i := 0; i < f; i++ {
 			var k string
 			var k string
 			g := buff.ReadString() // read string
 			g := buff.ReadString() // read string
@@ -2463,8 +2455,8 @@ func (target *Node) UnmarshalBinary(data []byte) (err error) {
 		d = nil
 		d = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		e := make(map[string]string)
 		f := buff.ReadInt() // map len
 		f := buff.ReadInt() // map len
-		e := make(map[string]string, f)
 		for i := 0; i < f; i++ {
 		for i := 0; i < f; i++ {
 			var k string
 			var k string
 			g := buff.ReadString() // read string
 			g := buff.ReadString() // read string
@@ -2689,8 +2681,8 @@ func (target *SharedAsset) UnmarshalBinary(data []byte) (err error) {
 		d = nil
 		d = nil
 	} else {
 	} else {
 		// --- [begin][read][map](map[string]string) ---
 		// --- [begin][read][map](map[string]string) ---
+		e := make(map[string]string)
 		f := buff.ReadInt() // map len
 		f := buff.ReadInt() // map len
-		e := make(map[string]string, f)
 		for i := 0; i < f; i++ {
 		for i := 0; i < f; i++ {
 			var k string
 			var k string
 			g := buff.ReadString() // read string
 			g := buff.ReadString() // read string