Browse Source

Merge pull request #212 from kubecost/Bolt-deployments-services

Deployments and Services Considered Multicluster and for Deleted Pods
Ajay Tripathy 6 years ago
parent
commit
bc293af544
4 changed files with 607 additions and 413 deletions
  1. 236 241
      costmodel/costmodel.go
  2. 20 171
      costmodel/networkcosts.go
  3. 350 0
      costmodel/promparsers.go
  4. 1 1
      costmodel/router.go

+ 236 - 241
costmodel/costmodel.go

@@ -150,9 +150,12 @@ const (
 						* 
 						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
 				sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id)`
-	queryPVCAllocation        = `pod_pvc_allocation`
-	queryPVHourlyCost         = `avg_over_time(pv_hourly_cost[24h])`
-	queryNSLabels             = `avg_over_time(kube_namespace_labels[24h])`
+	queryPVCAllocation        = `avg_over_time(pod_pvc_allocation[%s])`
+	queryPVHourlyCost         = `avg_over_time(pv_hourly_cost[%s])`
+	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
+	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
+	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
+	queryServiceLabels        = `avg_over_time(service_selector_labels[%s])`
 	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
@@ -347,12 +350,12 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	go func() {
 		defer wg.Done()
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {
 			return
 		}
@@ -393,7 +396,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		}
 	}
 
-	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID, false)
+	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Network Cost Data: %s", err.Error())
 		networkUsageMap = make(map[string]*NetworkUsageData)
@@ -477,9 +480,12 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 			if _, ok := nodes[nodeName]; ok {
 				nodeData = nodes[nodeName]
 			}
+
+			nsKey := ns + "," + clusterID
+
 			var podDeployments []string
-			if _, ok := podDeploymentsMapping[ns]; ok {
-				if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
+			if _, ok := podDeploymentsMapping[nsKey]; ok {
+				if ds, ok := podDeploymentsMapping[nsKey][pod.GetObjectMeta().GetName()]; ok {
 					podDeployments = ds
 				} else {
 					podDeployments = []string{}
@@ -508,8 +514,8 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 			}
 
 			var podServices []string
-			if _, ok := podServicesMapping[ns]; ok {
-				if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
+			if _, ok := podServicesMapping[nsKey]; ok {
+				if svcs, ok := podServicesMapping[nsKey][pod.GetObjectMeta().GetName()]; ok {
 					podServices = svcs
 				} else {
 					podServices = []string{}
@@ -1082,25 +1088,25 @@ func getNodeCost(cache ClusterCache, cp costAnalyzerCloud.Provider) (map[string]
 	return nodes, nil
 }
 
-func getPodServices(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+func getPodServices(cache ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 	servicesList := cache.GetAllServices()
 	podServicesMapping := make(map[string]map[string][]string)
 	for _, service := range servicesList {
 		namespace := service.GetObjectMeta().GetNamespace()
 		name := service.GetObjectMeta().GetName()
-
-		if _, ok := podServicesMapping[namespace]; !ok {
-			podServicesMapping[namespace] = make(map[string][]string)
+		key := namespace + "," + clusterID
+		if _, ok := podServicesMapping[key]; !ok {
+			podServicesMapping[key] = make(map[string][]string)
 		}
 		s := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
 		for _, pod := range podList {
 			labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
 			if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
-				services, ok := podServicesMapping[namespace][pod.GetObjectMeta().GetName()]
+				services, ok := podServicesMapping[key][pod.GetObjectMeta().GetName()]
 				if ok {
-					podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = append(services, name)
+					podServicesMapping[key][pod.GetObjectMeta().GetName()] = append(services, name)
 				} else {
-					podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
+					podServicesMapping[key][pod.GetObjectMeta().GetName()] = []string{name}
 				}
 			}
 		}
@@ -1108,14 +1114,16 @@ func getPodServices(cache ClusterCache, podList []*v1.Pod) (map[string]map[strin
 	return podServicesMapping, nil
 }
 
-func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+func getPodDeployments(cache ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
 	for _, deployment := range deploymentsList {
 		namespace := deployment.GetObjectMeta().GetNamespace()
 		name := deployment.GetObjectMeta().GetName()
-		if _, ok := podDeploymentsMapping[namespace]; !ok {
-			podDeploymentsMapping[namespace] = make(map[string][]string)
+
+		key := namespace + "," + clusterID
+		if _, ok := podDeploymentsMapping[key]; !ok {
+			podDeploymentsMapping[key] = make(map[string][]string)
 		}
 		s, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
 		if err != nil {
@@ -1124,18 +1132,102 @@ func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[st
 		for _, pod := range podList {
 			labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
 			if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
-				deployments, ok := podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()]
+				deployments, ok := podDeploymentsMapping[key][pod.GetObjectMeta().GetName()]
+				if ok {
+					podDeploymentsMapping[key][pod.GetObjectMeta().GetName()] = append(deployments, name)
+				} else {
+					podDeploymentsMapping[key][pod.GetObjectMeta().GetName()] = []string{name}
+				}
+			}
+		}
+	}
+	return podDeploymentsMapping, nil
+}
+
+func getPodDeploymentsWithMetrics(deploymentLabels map[string]map[string]string, podLabels map[string]map[string]string) (map[string]map[string][]string, error) {
+	podDeploymentsMapping := make(map[string]map[string][]string)
+
+	for depKey, depLabels := range deploymentLabels {
+		kt, err := NewKeyTuple(depKey)
+		if err != nil {
+			continue
+		}
+
+		namespace := kt.Namespace
+		name := kt.Key
+		clusterID := kt.ClusterID
+
+		key := namespace + "," + clusterID
+		if _, ok := podDeploymentsMapping[key]; !ok {
+			podDeploymentsMapping[key] = make(map[string][]string)
+		}
+		s := labels.Set(depLabels).AsSelectorPreValidated()
+		for podKey, pLabels := range podLabels {
+			pkey, err := NewKeyTuple(podKey)
+			if err != nil {
+				continue
+			}
+			podNamespace := pkey.Namespace
+			podName := pkey.Key
+			podClusterID := pkey.ClusterID
+
+			labelSet := labels.Set(pLabels)
+			if s.Matches(labelSet) && podNamespace == namespace && podClusterID == clusterID {
+				deployments, ok := podDeploymentsMapping[key][podName]
 				if ok {
-					podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = append(deployments, name)
+					podDeploymentsMapping[key][podName] = append(deployments, name)
 				} else {
-					podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
+					podDeploymentsMapping[key][podName] = []string{name}
 				}
 			}
 		}
 	}
+
 	return podDeploymentsMapping, nil
 }
 
+func getPodServicesWithMetrics(serviceLabels map[string]map[string]string, podLabels map[string]map[string]string) (map[string]map[string][]string, error) {
+	podServicesMapping := make(map[string]map[string][]string)
+
+	for servKey, servLabels := range serviceLabels {
+		kt, err := NewKeyTuple(servKey)
+		if err != nil {
+			continue
+		}
+
+		namespace := kt.Namespace
+		name := kt.Key
+		clusterID := kt.ClusterID
+
+		key := namespace + "," + clusterID
+		if _, ok := podServicesMapping[key]; !ok {
+			podServicesMapping[key] = make(map[string][]string)
+		}
+		s := labels.Set(servLabels).AsSelectorPreValidated()
+
+		for podKey, pLabels := range podLabels {
+			pkey, err := NewKeyTuple(podKey)
+			if err != nil {
+				continue
+			}
+			podNamespace := pkey.Namespace
+			podName := pkey.Key
+			podClusterID := pkey.ClusterID
+
+			labelSet := labels.Set(pLabels)
+			if s.Matches(labelSet) && podNamespace == namespace && podClusterID == clusterID {
+				services, ok := podServicesMapping[key][podName]
+				if ok {
+					podServicesMapping[key][podName] = append(services, name)
+				} else {
+					podServicesMapping[key][podName] = []string{name}
+				}
+			}
+		}
+	}
+	return podServicesMapping, nil
+}
+
 func costDataPassesFilters(costs *CostData, namespace string, cluster string) bool {
 	passesNamespace := namespace == "" || costs.Namespace == namespace
 	passesCluster := cluster == "" || costs.ClusterID == cluster
@@ -1184,7 +1276,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(14)
+	wg.Add(17)
 
 	var promErr error
 	var resultRAMRequests interface{}
@@ -1234,17 +1326,32 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	}()
 	var pvPodAllocationResults interface{}
 	go func() {
-		pvPodAllocationResults, promErr = QueryRange(cli, queryPVCAllocation, start, end, window)
+		pvPodAllocationResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVCAllocation, windowString), start, end, window)
 		defer wg.Done()
 	}()
 	var pvCostResults interface{}
 	go func() {
-		pvCostResults, promErr = QueryRange(cli, queryPVHourlyCost, start, end, window)
+		pvCostResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVHourlyCost, windowString), start, end, window)
 		defer wg.Done()
 	}()
 	var nsLabelsResults interface{}
 	go func() {
-		nsLabelsResults, promErr = Query(cli, queryNSLabels)
+		nsLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryNSLabels, windowString), start, end, window)
+		defer wg.Done()
+	}()
+	var podLabelsResults interface{}
+	go func() {
+		podLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodLabels, windowString), start, end, window)
+		defer wg.Done()
+	}()
+	var serviceLabelsResults interface{}
+	go func() {
+		serviceLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
+		defer wg.Done()
+	}()
+	var deploymentLabelsResults interface{}
+	go func() {
+		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
 		defer wg.Done()
 	}()
 	var normalizationResults interface{}
@@ -1261,12 +1368,12 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	go func() {
 		defer wg.Done()
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {
 			return
 		}
@@ -1308,12 +1415,12 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		}
 	}
 
-	pvCostMapping, err := getPVCostMetrics(pvCostResults, clusterID)
+	pvCostMapping, err := GetPVCostMetrics(pvCostResults, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get PV Hourly Cost Data: %s", err.Error())
 	}
 
-	pvAllocationMapping, err := getPVAllocationMetrics(pvPodAllocationResults, clusterID)
+	pvAllocationMapping, err := GetPVAllocationMetrics(pvPodAllocationResults, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get PV Allocation Cost Data: %s", err.Error())
 	}
@@ -1321,7 +1428,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		addMetricPVData(pvAllocationMapping, pvCostMapping, cp)
 	}
 
-	nsLabels, err := getNamespaceLabelsMetrics(nsLabelsResults)
+	nsLabels, err := GetNamespaceLabelsMetrics(nsLabelsResults, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Namespace Labels for Metrics: %s", err.Error())
 	}
@@ -1329,7 +1436,34 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		appendNamespaceLabels(namespaceLabelsMapping, nsLabels)
 	}
 
-	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID, true)
+	podLabels, err := GetPodLabelsMetrics(podLabelsResults, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Pod Labels for Metrics: %s", err.Error())
+	}
+
+	serviceLabels, err := GetServiceSelectorLabelsMetrics(serviceLabelsResults, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Service Selector Labels for Metrics: %s", err.Error())
+	}
+
+	deploymentLabels, err := GetDeploymentMatchLabelsMetrics(deploymentLabelsResults, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
+	}
+
+	podDeploymentsMetricsMapping, err := getPodDeploymentsWithMetrics(deploymentLabels, podLabels)
+	if err != nil {
+		klog.V(1).Infof("Unable to get match Deployment Labels Metrics to Pods: %s", err.Error())
+	}
+	appendLabelsList(podDeploymentsMapping, podDeploymentsMetricsMapping)
+
+	podServicesMetricsMapping, err := getPodServicesWithMetrics(serviceLabels, podLabels)
+	if err != nil {
+		klog.V(1).Infof("Unable to get match Service Labels Metrics to Pods: %s", err.Error())
+	}
+	appendLabelsList(podServicesMapping, podServicesMetricsMapping)
+
+	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Network Cost Data: %s", err.Error())
 		networkUsageMap = make(map[string]*NetworkUsageData)
@@ -1404,9 +1538,11 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			if _, ok := nodes[nodeName]; ok {
 				nodeData = nodes[nodeName]
 			}
+
+			nsKey := ns + "," + clusterID
 			var podDeployments []string
-			if _, ok := podDeploymentsMapping[ns]; ok {
-				if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
+			if _, ok := podDeploymentsMapping[nsKey]; ok {
+				if ds, ok := podDeploymentsMapping[nsKey][pod.GetObjectMeta().GetName()]; ok {
 					podDeployments = ds
 				} else {
 					podDeployments = []string{}
@@ -1435,15 +1571,15 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			}
 
 			var podServices []string
-			if _, ok := podServicesMapping[ns]; ok {
-				if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
+			if _, ok := podServicesMapping[nsKey]; ok {
+				if svcs, ok := podServicesMapping[nsKey][pod.GetObjectMeta().GetName()]; ok {
 					podServices = svcs
 				} else {
 					podServices = []string{}
 				}
 			}
 
-			nsLabels := namespaceLabelsMapping[ns+","+clusterID]
+			nsLabels := namespaceLabelsMapping[nsKey]
 			podLabels := pod.GetObjectMeta().GetLabels()
 
 			if podLabels == nil {
@@ -1563,18 +1699,49 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 					missingNodes[c.NodeName] = node
 				}
 			}
-			namespacelabels, ok := namespaceLabelsMapping[c.Namespace+","+c.ClusterID]
+
+			nsKey := c.Namespace + "," + c.ClusterID
+			podKey := c.Namespace + "," + c.PodName + "," + c.ClusterID
+
+			namespaceLabels, ok := namespaceLabelsMapping[nsKey]
 			if !ok {
 				klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
 			}
 
+			pLabels := podLabels[podKey]
+			if pLabels == nil {
+				pLabels = make(map[string]string)
+			}
+
+			for k, v := range namespaceLabels {
+				pLabels[k] = v
+			}
+
+			var podDeployments []string
+			if _, ok := podDeploymentsMapping[nsKey]; ok {
+				if ds, ok := podDeploymentsMapping[nsKey][c.PodName]; ok {
+					podDeployments = ds
+				} else {
+					podDeployments = []string{}
+				}
+			}
+
+			var podServices []string
+			if _, ok := podServicesMapping[nsKey]; ok {
+				if svcs, ok := podServicesMapping[nsKey][c.PodName]; ok {
+					podServices = svcs
+				} else {
+					podServices = []string{}
+				}
+			}
+
 			var podPVs []*PersistentVolumeClaimData
 			var podNetCosts []*Vector
 
 			// For PVC data, we'll need to find the claim mapping and cost data. Will need to append
 			// cost data since that was populated by cluster data previously. We do this with
 			// the pod_pvc_allocation metric
-			podPVData, ok := pvAllocationMapping[c.Namespace+","+c.PodName+","+c.ClusterID]
+			podPVData, ok := pvAllocationMapping[podKey]
 			if !ok {
 				klog.V(4).Infof("Failed to locate pv allocation mapping for missing pod.")
 			}
@@ -1582,7 +1749,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			// For network costs, we'll use existing map since it should still contain the
 			// correct data.
 			var podNetworkCosts []*Vector
-			if usage, ok := networkUsageMap[c.Namespace+","+c.PodName+","+c.ClusterID]; ok {
+			if usage, ok := networkUsageMap[podKey]; ok {
 				netCosts, err := GetNetworkCost(usage, cp)
 				if err != nil {
 					klog.V(3).Infof("Error pulling network costs: %s", err.Error())
@@ -1593,7 +1760,6 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 			// Check to see if any other data has been recorded for this namespace, pod, clusterId
 			// Follow the pattern of only allowing claims data per pod
-			podKey := c.Namespace + "," + c.PodName + "," + c.ClusterID
 			if !otherClusterPVRecorded[podKey] {
 				otherClusterPVRecorded[podKey] = true
 
@@ -1607,13 +1773,15 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 				NodeName:        c.NodeName,
 				NodeData:        node,
 				Namespace:       c.Namespace,
+				Services:        podServices,
+				Deployments:     podDeployments,
 				RAMReq:          RAMReqV,
 				RAMUsed:         RAMUsedV,
 				CPUReq:          CPUReqV,
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
-				Labels:          namespacelabels,
-				NamespaceLabels: namespacelabels,
+				Labels:          pLabels,
+				NamespaceLabels: namespaceLabels,
 				PVCData:         podPVs,
 				NetworkData:     podNetCosts,
 				ClusterID:       c.ClusterID,
@@ -1645,145 +1813,6 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return containerNameCost, err
 }
 
-func parseStringField(metricMap map[string]interface{}, field string) (string, error) {
-	f, ok := metricMap[field]
-	if !ok {
-		return "", fmt.Errorf("%s field does not exist in data result vector", field)
-	}
-
-	strField, ok := f.(string)
-	if !ok {
-		return "", fmt.Errorf("%s field is improperly formatted", field)
-	}
-
-	return strField, nil
-}
-
-func getPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
-	toReturn := make(map[string][]*PersistentVolumeClaimData)
-	data, ok := queryResult.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(queryResult)
-		if err != nil {
-			return toReturn, err
-		}
-		return toReturn, fmt.Errorf(e)
-	}
-
-	vectors := []*Vector{}
-	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field is improperly formatted")
-		}
-
-		clusterID, err := parseStringField(metricMap, "cluster_id")
-		if clusterID == "" {
-			clusterID = defaultClusterID
-		}
-
-		ns, err := parseStringField(metricMap, "namespace")
-		if err != nil {
-			return toReturn, err
-		}
-
-		pod, err := parseStringField(metricMap, "pod")
-		if err != nil {
-			return toReturn, err
-		}
-
-		pvcName, err := parseStringField(metricMap, "persistentvolumeclaim")
-		if err != nil {
-			return toReturn, err
-		}
-
-		pvName, err := parseStringField(metricMap, "persistentvolume")
-		if err != nil {
-			return toReturn, err
-		}
-		values, ok := val.(map[string]interface{})["values"].([]interface{})
-		for _, d := range values {
-			dataPoint := d.([]interface{})
-			if !ok || len(dataPoint) != 2 {
-				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-			}
-			strVal := dataPoint[1].(string)
-			v, _ := strconv.ParseFloat(strVal, 64)
-			vectors = append(vectors, &Vector{
-				Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
-				Value:     v,
-			})
-		}
-
-		key := fmt.Sprintf("%s,%s,%s", ns, pod, clusterID)
-		pvcData := &PersistentVolumeClaimData{
-			Class:      "",
-			Claim:      pvcName,
-			Namespace:  ns,
-			ClusterID:  clusterID,
-			VolumeName: pvName,
-			Values:     vectors,
-		}
-
-		toReturn[key] = append(toReturn[key], pvcData)
-	}
-
-	return toReturn, nil
-}
-
-func getPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
-	toReturn := make(map[string]*costAnalyzerCloud.PV)
-	data, ok := queryResult.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(queryResult)
-		if err != nil {
-			return toReturn, err
-		}
-		return toReturn, fmt.Errorf(e)
-	}
-
-	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field is improperly formatted")
-		}
-
-		clusterID, err := parseStringField(metricMap, "cluster_id")
-		if clusterID == "" {
-			clusterID = defaultClusterID
-		}
-
-		volumeName, err := parseStringField(metricMap, "volumename")
-		if err != nil {
-			return toReturn, err
-		}
-
-		dataPoint, ok := val.(map[string]interface{})["value"]
-		if !ok {
-			return toReturn, fmt.Errorf("Value field does not exist in data result vector")
-		}
-		value, ok := dataPoint.([]interface{})
-		if !ok || len(value) != 2 {
-			return toReturn, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-		}
-
-		key := fmt.Sprintf("%s,%s", volumeName, clusterID)
-		toReturn[key] = &costAnalyzerCloud.PV{
-			Cost: value[1].(string),
-		}
-	}
-
-	return toReturn, nil
-}
-
 func addMetricPVData(pvAllocationMap map[string][]*PersistentVolumeClaimData, pvCostMap map[string]*costAnalyzerCloud.PV, cp costAnalyzerCloud.Provider) {
 	cfg, err := cp.GetConfig()
 	if err != nil {
@@ -1808,64 +1837,6 @@ func addMetricPVData(pvAllocationMap map[string][]*PersistentVolumeClaimData, pv
 	}
 }
 
-func getNamespaceLabelsMetrics(queryResult interface{}) (map[string]map[string]string, error) {
-	toReturn := make(map[string]map[string]string)
-	data, ok := queryResult.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(queryResult)
-		if err != nil {
-			return toReturn, err
-		}
-		return toReturn, fmt.Errorf(e)
-	}
-
-	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field is improperly formatted")
-		}
-
-		// We want Namespace and ClusterID for key generation purposes
-		ns, err := parseStringField(metricMap, "namespace")
-		if err != nil {
-			return toReturn, err
-		}
-
-		clusterID, err := parseStringField(metricMap, "cluster_id")
-		if err != nil {
-			return toReturn, err
-		}
-
-		nsKey := ns + "," + clusterID
-
-		// Find All keys with prefix label_, remove prefix, add to labels
-		for k, v := range metricMap {
-			if !strings.HasPrefix(k, "label_") {
-				continue
-			}
-
-			label := k[6:]
-			value, ok := v.(string)
-			if !ok {
-				klog.V(3).Infof("Failed to parse label value for label: %s", label)
-				continue
-			}
-
-			if toReturn[nsKey] == nil {
-				toReturn[nsKey] = make(map[string]string)
-			}
-
-			toReturn[nsKey][label] = value
-		}
-	}
-
-	return toReturn, nil
-}
-
 // Append labels into nsLabels iff the ns key doesn't already exist
 func appendNamespaceLabels(nsLabels map[string]map[string]string, labels map[string]map[string]string) {
 	for k, v := range labels {
@@ -1875,6 +1846,12 @@ func appendNamespaceLabels(nsLabels map[string]map[string]string, labels map[str
 	}
 }
 
+func appendLabelsList(mainLabels map[string]map[string][]string, labels map[string]map[string][]string) {
+	for k, v := range labels {
+		mainLabels[k] = v
+	}
+}
+
 func getNamespaceLabels(cache ClusterCache, clusterID string) (map[string]map[string]string, error) {
 	nsToLabels := make(map[string]map[string]string)
 	nss := cache.GetAllNamespaces()
@@ -2400,6 +2377,24 @@ func newContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClu
 	}, nil
 }
 
+type KeyTuple struct {
+	Namespace string
+	Key       string
+	ClusterID string
+}
+
+func NewKeyTuple(key string) (*KeyTuple, error) {
+	r := strings.Split(key, ",")
+	if len(r) != 3 {
+		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
+	}
+	return &KeyTuple{
+		Namespace: r[0],
+		Key:       r[1],
+		ClusterID: r[2],
+	}, nil
+}
+
 func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*Vector, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {

+ 20 - 171
costmodel/networkcosts.go

@@ -1,10 +1,6 @@
 package costmodel
 
 import (
-	"fmt"
-	"math"
-	"strconv"
-
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	"k8s.io/klog"
 )
@@ -29,26 +25,18 @@ type NetworkUsageVector struct {
 
 // GetNetworkUsageData performs a join of the the results of zone, region, and internet usage queries to return a single
 // map containing network costs for each namespace+pod
-func GetNetworkUsageData(zr interface{}, rr interface{}, ir interface{}, defaultClusterID string, isRange bool) (map[string]*NetworkUsageData, error) {
-	var vectorFn func(interface{}, string) (map[string]*NetworkUsageVector, error)
-
-	if isRange {
-		vectorFn = getNetworkUsageVectors
-	} else {
-		vectorFn = getNetworkUsageVector
-	}
-
-	zoneNetworkMap, err := vectorFn(zr, defaultClusterID)
+func GetNetworkUsageData(zr interface{}, rr interface{}, ir interface{}, defaultClusterID string) (map[string]*NetworkUsageData, error) {
+	zoneNetworkMap, err := getNetworkUsage(zr, defaultClusterID)
 	if err != nil {
 		return nil, err
 	}
 
-	regionNetworkMap, err := vectorFn(rr, defaultClusterID)
+	regionNetworkMap, err := getNetworkUsage(rr, defaultClusterID)
 	if err != nil {
 		return nil, err
 	}
 
-	internetNetworkMap, err := vectorFn(ir, defaultClusterID)
+	internetNetworkMap, err := getNetworkUsage(ir, defaultClusterID)
 	if err != nil {
 		return nil, err
 	}
@@ -147,175 +135,36 @@ func GetNetworkCost(usage *NetworkUsageData, cloud costAnalyzerCloud.Provider) (
 	return results, nil
 }
 
-func getNetworkUsageVector(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
+func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
 	ncdmap := make(map[string]*NetworkUsageVector)
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-	d, ok := data.(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
-	}
-	result, ok := d["result"]
-	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
-	}
-	results, ok := result.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
+	result, err := NewQueryResults(qr)
+	if err != nil {
+		return nil, err
 	}
-	for _, val := range results {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
-		}
-		podName, ok := metricMap["pod_name"]
-		if !ok {
-			return nil, fmt.Errorf("Pod Name does not exist in data result vector")
-		}
-		podNameStr, ok := podName.(string)
-		if !ok {
-			return nil, fmt.Errorf("Pod Name field improperly formatted")
-		}
-		namespace, ok := metricMap["namespace"]
-		if !ok {
-			return nil, fmt.Errorf("Namespace field does not exist in data result vector")
-		}
-		namespaceStr, ok := namespace.(string)
-		if !ok {
-			return nil, fmt.Errorf("Namespace field improperly formatted")
-		}
-		cid, ok := metricMap["cluster_id"]
-		if !ok {
-			klog.V(4).Info("Prometheus vector does not have cluster id")
-			cid = defaultClusterID
-		}
-		clusterID, ok := cid.(string)
-		if !ok {
-			return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
-		}
-		dataPoint, ok := val.(map[string]interface{})["value"]
-		if !ok {
-			return nil, fmt.Errorf("Value field does not exist in data result vector")
-		}
-		value, ok := dataPoint.([]interface{})
-		if !ok || len(value) != 2 {
-			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-		}
-		var vectors []*Vector
-		strVal := value[1].(string)
-		v, err := strconv.ParseFloat(strVal, 64)
+
+	for _, val := range result {
+		podName, err := val.GetString("pod_name")
 		if err != nil {
 			return nil, err
 		}
 
-		vectors = append(vectors, &Vector{
-			Timestamp: value[0].(float64),
-			Value:     v,
-		})
-
-		key := namespaceStr + "," + podNameStr + "," + clusterID
-		ncdmap[key] = &NetworkUsageVector{
-			ClusterID: clusterID,
-			Namespace: namespaceStr,
-			PodName:   podNameStr,
-			Values:    vectors,
-		}
-	}
-	return ncdmap, nil
-}
-
-func getNetworkUsageVectors(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
-	ncdmap := make(map[string]*NetworkUsageVector)
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
+		namespace, err := val.GetString("namespace")
 		if err != nil {
 			return nil, err
 		}
-		return nil, fmt.Errorf(e)
-	}
-	d, ok := data.(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
-	}
-	result, ok := d["result"]
-	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
-	}
-	results, ok := result.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
-	}
-	for _, val := range results {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
-		}
-		podName, ok := metricMap["pod_name"]
-		if !ok {
-			return nil, fmt.Errorf("Pod Name does not exist in data result vector")
-		}
-		podNameStr, ok := podName.(string)
-		if !ok {
-			return nil, fmt.Errorf("Pod Name field improperly formatted")
-		}
-		namespace, ok := metricMap["namespace"]
-		if !ok {
-			return nil, fmt.Errorf("Namespace field does not exist in data result vector")
-		}
-		namespaceStr, ok := namespace.(string)
-		if !ok {
-			return nil, fmt.Errorf("Namespace field improperly formatted")
-		}
-		cid, ok := metricMap["cluster_id"]
-		if !ok {
-			klog.V(4).Info("Prometheus vector does not have cluster id")
-			cid = defaultClusterID
-		}
-		clusterID, ok := cid.(string)
-		if !ok {
-			return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
-		}
-
-		values, ok := val.(map[string]interface{})["values"].([]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Values field is improperly formatted")
-		}
-		var vectors []*Vector
-		for _, value := range values {
-			dataPoint, ok := value.([]interface{})
-			if !ok || len(dataPoint) != 2 {
-				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-			}
 
-			strVal := dataPoint[1].(string)
-			v, _ := strconv.ParseFloat(strVal, 64)
-			vectors = append(vectors, &Vector{
-				Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
-				Value:     v,
-			})
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			klog.V(4).Info("Prometheus vector does not have cluster id")
+			clusterID = defaultClusterID
 		}
 
-		key := namespaceStr + "," + podNameStr + "," + clusterID
+		key := namespace + "," + podName + "," + clusterID
 		ncdmap[key] = &NetworkUsageVector{
 			ClusterID: clusterID,
-			Namespace: namespaceStr,
-			PodName:   podNameStr,
-			Values:    vectors,
+			Namespace: namespace,
+			PodName:   podName,
+			Values:    val.Values,
 		}
 	}
 	return ncdmap, nil

+ 350 - 0
costmodel/promparsers.go

@@ -0,0 +1,350 @@
+package costmodel
+
+import (
+	"fmt"
+	"math"
+	"strconv"
+	"strings"
+
+	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
+	"k8s.io/klog"
+)
+
+// PromQueryResult contains a single result from a prometheus query
+type PromQueryResult struct {
+	Metric map[string]interface{}
+	Values []*Vector
+}
+
+func (pqr *PromQueryResult) GetString(field string) (string, error) {
+	f, ok := pqr.Metric[field]
+	if !ok {
+		return "", fmt.Errorf("%s field does not exist in data result vector", field)
+	}
+
+	strField, ok := f.(string)
+	if !ok {
+		return "", fmt.Errorf("%s field is improperly formatted", field)
+	}
+
+	return strField, nil
+}
+
+func (pqr *PromQueryResult) GetLabels() map[string]string {
+	result := make(map[string]string)
+
+	// Find All keys with prefix label_, remove prefix, add to labels
+	for k, v := range pqr.Metric {
+		if !strings.HasPrefix(k, "label_") {
+			continue
+		}
+
+		label := k[6:]
+		value, ok := v.(string)
+		if !ok {
+			klog.V(3).Infof("Failed to parse label value for label: %s", label)
+			continue
+		}
+
+		result[label] = value
+	}
+
+	return result
+}
+
+// NewQueryResults accepts the raw prometheus query result and returns an array of
+// PromQueryResult objects
+func NewQueryResults(queryResult interface{}) ([]*PromQueryResult, error) {
+	var result []*PromQueryResult
+
+	data, ok := queryResult.(map[string]interface{})["data"]
+	if !ok {
+		e, err := wrapPrometheusError(queryResult)
+		if err != nil {
+			return nil, err
+		}
+		return nil, fmt.Errorf(e)
+	}
+
+	// Deep Check for proper formatting
+	d, ok := data.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
+	}
+	resultData, ok := d["result"]
+	if !ok {
+		return nil, fmt.Errorf("Result field not present in prometheus response")
+	}
+	resultsData, ok := resultData.([]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
+	}
+
+	// Scan Results
+	for _, val := range resultsData {
+		resultInterface, ok := val.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Result is improperly formatted")
+		}
+
+		metricInterface, ok := resultInterface["metric"]
+		if !ok {
+			return nil, fmt.Errorf("Metric field does not exist in data result vector")
+		}
+		metricMap, ok := metricInterface.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Metric field is improperly formatted")
+		}
+
+		// Determine if the result is a ranged data set or single value
+		_, isRange := resultInterface["values"]
+
+		var vectors []*Vector
+		if !isRange {
+			dataPoint, ok := resultInterface["value"]
+			if !ok {
+				return nil, fmt.Errorf("Value field does not exist in data result vector")
+			}
+
+			v, err := parseDataPoint(dataPoint)
+			if err != nil {
+				return nil, err
+			}
+			vectors = append(vectors, v)
+		} else {
+			values, ok := resultInterface["values"].([]interface{})
+			if !ok {
+				return nil, fmt.Errorf("Values field is improperly formatted")
+			}
+
+			for _, value := range values {
+				v, err := parseDataPoint(value)
+				if err != nil {
+					return nil, err
+				}
+
+				vectors = append(vectors, v)
+			}
+		}
+
+		result = append(result, &PromQueryResult{
+			Metric: metricMap,
+			Values: vectors,
+		})
+	}
+
+	return result, nil
+}
+
+func parseDataPoint(dataPoint interface{}) (*Vector, error) {
+	value, ok := dataPoint.([]interface{})
+	if !ok || len(value) != 2 {
+		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+	}
+
+	strVal := value[1].(string)
+	v, err := strconv.ParseFloat(strVal, 64)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Vector{
+		Timestamp: math.Round(value[0].(float64)/10) * 10,
+		Value:     v,
+	}, nil
+}
+
+func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
+	toReturn := make(map[string][]*PersistentVolumeClaimData)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		pod, err := val.GetString("pod")
+		if err != nil {
+			return toReturn, err
+		}
+
+		pvcName, err := val.GetString("persistentvolumeclaim")
+		if err != nil {
+			return toReturn, err
+		}
+
+		pvName, err := val.GetString("persistentvolume")
+		if err != nil {
+			return toReturn, err
+		}
+
+		key := fmt.Sprintf("%s,%s,%s", ns, pod, clusterID)
+		pvcData := &PersistentVolumeClaimData{
+			Class:      "",
+			Claim:      pvcName,
+			Namespace:  ns,
+			ClusterID:  clusterID,
+			VolumeName: pvName,
+			Values:     val.Values,
+		}
+
+		toReturn[key] = append(toReturn[key], pvcData)
+	}
+
+	return toReturn, nil
+}
+
+func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
+	toReturn := make(map[string]*costAnalyzerCloud.PV)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		volumeName, err := val.GetString("volumename")
+		if err != nil {
+			return toReturn, err
+		}
+
+		key := fmt.Sprintf("%s,%s", volumeName, clusterID)
+		toReturn[key] = &costAnalyzerCloud.PV{
+			Cost: fmt.Sprintf("%f", val.Values[0].Value),
+		}
+	}
+
+	return toReturn, nil
+}
+
+func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		// We want Namespace and ClusterID for key generation purposes
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + clusterID
+		toReturn[nsKey] = val.GetLabels()
+	}
+
+	return toReturn, nil
+}
+
+func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		// We want Pod, Namespace and ClusterID for key generation purposes
+		pod, err := val.GetString("pod")
+		if err != nil {
+			return toReturn, err
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + pod + "," + clusterID
+		toReturn[nsKey] = val.GetLabels()
+	}
+
+	return toReturn, nil
+}
+
+func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		// We want Deployment, Namespace and ClusterID for key generation purposes
+		deployment, err := val.GetString("deployment")
+		if err != nil {
+			return toReturn, err
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + deployment + "," + clusterID
+		toReturn[nsKey] = val.GetLabels()
+	}
+
+	return toReturn, nil
+}
+
+func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		// We want Service, Namespace and ClusterID for key generation purposes
+		service, err := val.GetString("service")
+		if err != nil {
+			return toReturn, err
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + service + "," + clusterID
+		toReturn[nsKey] = val.GetLabels()
+	}
+
+	return toReturn, nil
+}

+ 1 - 1
costmodel/router.go

@@ -409,7 +409,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		remoteEnabled = true
 	}
 
-	// Use Thanos Client if it exists (enabled) and remote flag not set
+	// Use Thanos Client if it exists (enabled) and remote flag set
 	var pClient prometheusClient.Client
 	if remote != "false" && a.ThanosClient != nil {
 		pClient = a.ThanosClient