Explorar o código

Merge pull request #1618 from opencost/niko/node-labels

Apply node labels to allocation data
Niko Kovacevic %!s(int64=3) %!d(string=hai) anos
pai
achega
e8d632c4fa

+ 21 - 1
pkg/costmodel/allocation.go

@@ -43,6 +43,7 @@ const (
 	queryFmtNetInternetCostPerGiB    = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
 	queryFmtNetReceiveBytes          = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
 	queryFmtNetTransferBytes         = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
+	queryFmtNodeLabels               = `avg_over_time(kube_node_labels[%s])`
 	queryFmtNamespaceLabels          = `avg_over_time(kube_namespace_labels[%s])`
 	queryFmtNamespaceAnnotations     = `avg_over_time(kube_namespace_annotations[%s])`
 	queryFmtPodLabels                = `avg_over_time(kube_pod_labels[%s])`
@@ -393,6 +394,12 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
 	resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
 
+	var resChNodeLabels prom.QueryResultsChan
+	if env.GetAllocationNodeLabelsEnabled() {
+		queryNodeLabels := fmt.Sprintf(queryFmtNodeLabels, durStr)
+		resChNodeLabels = ctx.QueryAtTime(queryNodeLabels, end)
+	}
+
 	queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
 	resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
 
@@ -465,6 +472,12 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	resNetInternetGiB, _ := resChNetInternetGiB.Await()
 	resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
 
+	var resNodeLabels []*prom.QueryResult
+	if env.GetAllocationNodeLabelsEnabled() {
+		if env.GetAllocationNodeLabelsEnabled() {
+			resNodeLabels, _ = resChNodeLabels.Await()
+		}
+	}
 	resNamespaceLabels, _ := resChNamespaceLabels.Await()
 	resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
 	resPodLabels, _ := resChPodLabels.Await()
@@ -512,11 +525,18 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	// Other than that case, Allocations should be associated with pods by the
 	// above functions.
 
+	// At this point, we expect "Node" to be set by one of the above functions
+	// (e.g. applyCPUCoresAllocated, etc.) -- otherwise, node labels will fail
+	// to correctly apply to the pods.
+	var nodeLabels map[nodeKey]map[string]string
+	if env.GetAllocationNodeLabelsEnabled() {
+		nodeLabels = resToNodeLabels(resNodeLabels)
+	}
 	namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
 	podLabels := resToPodLabels(resPodLabels, podUIDKeyMap, ingestPodUID)
 	namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
 	podAnnotations := resToPodAnnotations(resPodAnnotations, podUIDKeyMap, ingestPodUID)
-	applyLabels(podMap, namespaceLabels, podLabels)
+	applyLabels(podMap, nodeLabels, namespaceLabels, podLabels)
 	applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
 
 	podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))

+ 61 - 4
pkg/costmodel/allocation_helpers.go

@@ -244,6 +244,7 @@ func applyCPUCoresAllocated(podMap map[podKey]*pod, resCPUCoresAllocated []*prom
 				continue
 			}
 			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -301,6 +302,7 @@ func applyCPUCoresRequested(podMap map[podKey]*pod, resCPUCoresRequested []*prom
 				continue
 			}
 			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -449,6 +451,7 @@ func applyRAMBytesAllocated(podMap map[podKey]*pod, resRAMBytesAllocated []*prom
 				continue
 			}
 			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -503,6 +506,7 @@ func applyRAMBytesRequested(podMap map[podKey]*pod, resRAMBytesRequested []*prom
 				continue
 			}
 			pod.Allocations[container].Properties.Node = node
+			pod.Node = node
 		}
 	}
 }
@@ -766,6 +770,42 @@ func applyNetworkAllocation(podMap map[podKey]*pod, resNetworkGiB []*prom.QueryR
 	}
 }
 
+func resToNodeLabels(resNodeLabels []*prom.QueryResult) map[nodeKey]map[string]string {
+	nodeLabels := map[nodeKey]map[string]string{}
+
+	for _, res := range resNodeLabels {
+		nodeKey, err := resultNodeKey(res, env.GetPromClusterLabel(), "node")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := nodeLabels[nodeKey]; !ok {
+			nodeLabels[nodeKey] = map[string]string{}
+		}
+
+		for _, rawK := range env.GetAllocationNodeLabelsIncludeList() {
+			labels := res.GetLabels()
+
+			// Sanitize the given label name to match Prometheus formatting
+			// e.g. topology.kubernetes.io/zone => topology_kubernetes_io_zone
+			k := prom.SanitizeLabelName(rawK)
+			if v, ok := labels[k]; ok {
+				nodeLabels[nodeKey][k] = v
+				continue
+			}
+
+			// Try with the "label_" prefix, if not found
+			// e.g. topology_kubernetes_io_zone => label_topology_kubernetes_io_zone
+			k = fmt.Sprintf("label_%s", k)
+			if v, ok := labels[k]; ok {
+				nodeLabels[nodeKey][k] = v
+			}
+		}
+	}
+
+	return nodeLabels
+}
+
 func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
 	namespaceLabels := map[namespaceKey]map[string]string{}
 
@@ -878,21 +918,34 @@ func resToPodAnnotations(resPodAnnotations []*prom.QueryResult, podUIDKeyMap map
 	return podAnnotations
 }
 
-func applyLabels(podMap map[podKey]*pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
+func applyLabels(podMap map[podKey]*pod, nodeLabels map[nodeKey]map[string]string, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
 	for podKey, pod := range podMap {
 		for _, alloc := range pod.Allocations {
 			allocLabels := alloc.Properties.Labels
 			if allocLabels == nil {
 				allocLabels = make(map[string]string)
 			}
-			// Apply namespace labels first, then pod labels so that pod labels
-			// overwrite namespace labels.
-			nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
+
+			// Apply node labels first, then namespace labels, then pod labels
+			// so that pod labels overwrite namespace labels, which overwrite
+			// node labels.
+
+			if nodeLabels != nil {
+				nodeKey := newNodeKey(pod.Key.Cluster, pod.Node)
+				if labels, ok := nodeLabels[nodeKey]; ok {
+					for k, v := range labels {
+						allocLabels[k] = v
+					}
+				}
+			}
+
+			nsKey := podKey.namespaceKey
 			if labels, ok := namespaceLabels[nsKey]; ok {
 				for k, v := range labels {
 					allocLabels[k] = v
 				}
 			}
+
 			if labels, ok := podLabels[podKey]; ok {
 				for k, v := range labels {
 					allocLabels[k] = v
@@ -2008,6 +2061,8 @@ func getUnmountedPodForCluster(window kubecost.Window, podMap map[podKey]*pod, c
 		thisPod.Allocations[container].Properties.Pod = podName
 		thisPod.Allocations[container].Properties.Container = container
 
+		thisPod.Node = node
+
 		podMap[thisPodKey] = thisPod
 	}
 	return thisPod
@@ -2039,6 +2094,8 @@ func getUnmountedPodForNamespace(window kubecost.Window, podMap map[podKey]*pod,
 		thisPod.Allocations[container].Properties.Pod = podName
 		thisPod.Allocations[container].Properties.Container = container
 
+		thisPod.Node = node
+
 		podMap[thisPodKey] = thisPod
 	}
 	return thisPod

+ 3 - 1
pkg/costmodel/allocation_types.go

@@ -2,8 +2,9 @@ package costmodel
 
 import (
 	"fmt"
-	"github.com/opencost/opencost/pkg/kubecost"
 	"time"
+
+	"github.com/opencost/opencost/pkg/kubecost"
 )
 
 // pod describes a running pod's start and end time within a Window and
@@ -13,6 +14,7 @@ type pod struct {
 	Start       time.Time
 	End         time.Time
 	Key         podKey
+	Node        string
 	Allocations map[string]*kubecost.Allocation
 }
 

+ 1 - 1
pkg/costmodel/key.go

@@ -2,9 +2,9 @@ package costmodel
 
 import (
 	"fmt"
-	"github.com/opencost/opencost/pkg/kubecost"
 
 	"github.com/opencost/opencost/pkg/env"
+	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/prom"
 )
 

+ 32 - 0
pkg/env/costmodelenv.go

@@ -92,6 +92,9 @@ const (
 	IngestPodUIDEnvVar = "INGEST_POD_UID"
 
 	ETLReadOnlyMode = "ETL_READ_ONLY"
+
+	AllocationNodeLabelsEnabled     = "ALLOCATION_NODE_LABELS_ENABLED"
+	AllocationNodeLabelsIncludeList = "ALLOCATION_NODE_LABELS_INCLUDE_LIST"
 )
 
 var offsetRegex = regexp.MustCompile(`^(\+|-)(\d\d):(\d\d)$`)
@@ -503,3 +506,32 @@ func GetPromClusterLabel() string {
 func IsIngestingPodUID() bool {
 	return GetBool(IngestPodUIDEnvVar, false)
 }
+
+func GetAllocationNodeLabelsEnabled() bool {
+	return GetBool(AllocationNodeLabelsEnabled, true)
+}
+
+var defaultAllocationNodeLabelsIncludeList []string = []string{
+	"cloud.google.com/gke-nodepool",
+	"eks.amazonaws.com/nodegroup",
+	"kubernetes.azure.com/agentpool",
+	"node.kubernetes.io/instance-type",
+	"topology.kubernetes.io/region",
+	"topology.kubernetes.io/zone",
+}
+
+func GetAllocationNodeLabelsIncludeList() []string {
+	// If node labels are not enabled, return an empty list.
+	if !GetAllocationNodeLabelsEnabled() {
+		return []string{}
+	}
+
+	list := GetList(AllocationNodeLabelsIncludeList, ",")
+
+	// If node labels are enabled, but the white list is empty, use defaults.
+	if len(list) == 0 {
+		return defaultAllocationNodeLabelsIncludeList
+	}
+
+	return list
+}