Просмотр исходного кода

parallelize and use listwatcher to speed up queries

AjayTripathy 6 лет назад
Родитель
Коммит
a5ba361e98
3 изменённых файла: 215 добавлений и 111 удалений
  1. 1 1
      cloud/gcpprovider.go
  2. 209 107
      costmodel/costmodel.go
  3. 5 3
      main.go

+ 1 - 1
cloud/gcpprovider.go

@@ -197,7 +197,7 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string)
 						LEFT JOIN UNNEST(labels) as labels
 						ON labels.key = "kubernetes_namespace" OR labels.key = "kubernetes_container" OR labels.key = "kubernetes_deployment" OR labels.key = "kubernetes_pod" OR labels.key = "kubernetes_daemonset"
 				GROUP BY aggregator, environment, service;`, c.BillingDataDataset, start, end) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
-	klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
+	klog.V(4).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 	return gcp.QuerySQL(queryString)
 }
 

+ 209 - 107
costmodel/costmodel.go

@@ -9,14 +9,18 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 )
 
@@ -38,6 +42,59 @@ const (
 	epFlags           = apiPrefix + "/status/flags"
 )
 
+type CostModel struct {
+	Clientset  kubernetes.Interface
+	Controller *Controller
+}
+
+func NewCostModel(clientset kubernetes.Interface) *CostModel {
+	cm := &CostModel{
+		Clientset: clientset,
+	}
+	cm.ContainerWatcher()
+	return cm
+}
+
+func (cm *CostModel) ContainerWatcher() {
+	podListWatcher := cache.NewListWatchFromClient(cm.Clientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
+
+	// create the workqueue
+	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+
+	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
+	// whenever the cache is updated, the pod key is added to the workqueue.
+	// Note that when we finally process the item from the workqueue, we might see a newer version
+	// of the Pod than the version which was responsible for triggering the update.
+	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		UpdateFunc: func(old interface{}, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+	}, cache.Indexers{})
+
+	cm.Controller = NewController(queue, indexer, informer)
+	// Now let's start the controller
+	stop := make(chan struct{})
+	//defer close(stop)
+	go cm.Controller.Run(1, stop)
+}
+
 type CostData struct {
 	Name            string                       `json:"name,omitempty"`
 	PodName         string                       `json:"podName,omitempty"`
@@ -227,7 +284,7 @@ func ComputeUptimes(cli prometheusClient.Client) (map[string]float64, error) {
 	return results, nil
 }
 
-func ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
+func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, window, offset, window, offset)
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
 	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, window, offset, window, offset)
@@ -236,33 +293,77 @@ func ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
 	normalization := fmt.Sprintf(normalizationStr, window, offset)
 
-	resultRAMRequests, err := query(cli, queryRAMRequests)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
-	}
-	resultRAMUsage, err := query(cli, queryRAMUsage)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
-	}
-	resultCPURequests, err := query(cli, queryCPURequests)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
-	}
-	resultCPUUsage, err := query(cli, queryCPUUsage)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching CPUUsage requests: " + err.Error())
-	}
-	resultGPURequests, err := query(cli, queryGPURequests)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
-	}
-	resultPVRequests, err := query(cli, queryPVRequests)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
+	var wg sync.WaitGroup
+	wg.Add(8)
+
+	var promErr error
+	var resultRAMRequests interface{}
+	go func() {
+		resultRAMRequests, promErr = query(cli, queryRAMRequests)
+		defer wg.Done()
+	}()
+	var resultRAMUsage interface{}
+	go func() {
+		resultRAMUsage, promErr = query(cli, queryRAMUsage)
+		defer wg.Done()
+	}()
+	var resultCPURequests interface{}
+	go func() {
+		resultCPURequests, promErr = query(cli, queryCPURequests)
+		defer wg.Done()
+	}()
+	var resultCPUUsage interface{}
+	go func() {
+		resultCPUUsage, promErr = query(cli, queryCPUUsage)
+		defer wg.Done()
+	}()
+	var resultGPURequests interface{}
+	go func() {
+		resultGPURequests, promErr = query(cli, queryGPURequests)
+		defer wg.Done()
+	}()
+	var resultPVRequests interface{}
+	go func() {
+		resultPVRequests, promErr = query(cli, queryPVRequests)
+		defer wg.Done()
+	}()
+	var normalizationResult interface{}
+	go func() {
+		normalizationResult, promErr = query(cli, normalization)
+		defer wg.Done()
+	}()
+
+	podDeploymentsMapping := make(map[string]map[string][]string)
+	podServicesMapping := make(map[string]map[string][]string)
+	namespaceLabelsMapping := make(map[string]map[string]string)
+	podlist := cm.Controller.GetAll()
+	var k8sErr error
+	go func() {
+
+		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		if k8sErr != nil {
+			return
+		}
+
+		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		if k8sErr != nil {
+			return
+		}
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		if k8sErr != nil {
+			return
+		}
+
+		wg.Done()
+	}()
+
+	wg.Wait()
+
+	if promErr != nil {
+		return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
 	}
-	normalizationResult, err := query(cli, normalization)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
+	if k8sErr != nil {
+		return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
 	}
 
 	normalizationValue, err := getNormalization(normalizationResult)
@@ -276,25 +377,6 @@ func ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface
 		return nil, err
 	}
 
-	podlist, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-
-	podDeploymentsMapping, err := getPodDeployments(clientset, podlist)
-	if err != nil {
-		return nil, err
-	}
-
-	podServicesMapping, err := getPodServices(clientset, podlist)
-	if err != nil {
-		return nil, err
-	}
-	namespaceLabelsMapping, err := getNamespaceLabels(clientset)
-	if err != nil {
-		return nil, err
-	}
-
 	pvClaimMapping, err := getPVInfoVector(resultPVRequests)
 	if err != nil {
 		klog.Infof("Unable to get PV Data: %s", err.Error())
@@ -306,11 +388,6 @@ func ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface
 		}
 	}
 
-	err = addPVData(clientset, pvClaimMapping, cloud)
-	if err != nil {
-		return nil, err
-	}
-
 	containerNameCost := make(map[string]*CostData)
 	containers := make(map[string]bool)
 
@@ -351,17 +428,17 @@ func ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface
 		containers[key] = true
 	}
 	currentContainers := make(map[string]v1.Pod)
-	for _, pod := range podlist.Items {
+	for _, pod := range podlist {
 		if pod.Status.Phase != "Running" {
 			continue
 		}
-		cs, err := newContainerMetricsFromPod(pod)
+		cs, err := newContainerMetricsFromPod(*pod)
 		if err != nil {
 			return nil, err
 		}
 		for _, c := range cs {
 			containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
-			currentContainers[c.Key()] = pod
+			currentContainers[c.Key()] = *pod
 		}
 	}
 	missingNodes := make(map[string]*costAnalyzerCloud.Node)
@@ -856,7 +933,7 @@ func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provide
 	return nodes, nil
 }
 
-func getPodServices(clientset kubernetes.Interface, podList *v1.PodList) (map[string]map[string][]string, error) {
+func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
 	servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
 	if err != nil {
 		return nil, err
@@ -870,7 +947,7 @@ func getPodServices(clientset kubernetes.Interface, podList *v1.PodList) (map[st
 			podServicesMapping[namespace] = make(map[string][]string)
 		}
 		s := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
-		for _, pod := range podList.Items {
+		for _, pod := range podList {
 			labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
 			if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
 				services, ok := podServicesMapping[namespace][pod.GetObjectMeta().GetName()]
@@ -885,7 +962,7 @@ func getPodServices(clientset kubernetes.Interface, podList *v1.PodList) (map[st
 	return podServicesMapping, nil
 }
 
-func getPodDeployments(clientset kubernetes.Interface, podList *v1.PodList) (map[string]map[string][]string, error) {
+func getPodDeployments(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
 	deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
 	if err != nil {
 		return nil, err
@@ -901,7 +978,7 @@ func getPodDeployments(clientset kubernetes.Interface, podList *v1.PodList) (map
 		if err != nil {
 			klog.V(2).Infof("Error doing deployment label conversion: " + err.Error())
 		}
-		for _, pod := range podList.Items {
+		for _, pod := range podList {
 			labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
 			if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
 				deployments, ok := podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()]
@@ -916,7 +993,7 @@ func getPodDeployments(clientset kubernetes.Interface, podList *v1.PodList) (map
 	return podDeploymentsMapping, nil
 }
 
-func ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider,
+func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider,
 	startString, endString, windowString string, filterNamespace string) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
@@ -943,33 +1020,77 @@ func ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Inte
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
-	resultRAMRequests, err := queryRange(cli, queryRAMRequests, start, end, window)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
-	}
-	resultRAMUsage, err := queryRange(cli, queryRAMUsage, start, end, window)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
-	}
-	resultCPURequests, err := queryRange(cli, queryCPURequests, start, end, window)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
-	}
-	resultCPUUsage, err := queryRange(cli, queryCPUUsage, start, end, window)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching CPU usage: " + err.Error())
-	}
-	resultGPURequests, err := queryRange(cli, queryGPURequests, start, end, window)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
-	}
-	resultPVRequests, err := queryRange(cli, queryPVRequests, start, end, window)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
+	var wg sync.WaitGroup
+	wg.Add(8)
+
+	var promErr error
+	var resultRAMRequests interface{}
+	go func() {
+		resultRAMRequests, promErr = queryRange(cli, queryRAMRequests, start, end, window)
+		defer wg.Done()
+	}()
+	var resultRAMUsage interface{}
+	go func() {
+		resultRAMUsage, promErr = queryRange(cli, queryRAMUsage, start, end, window)
+		defer wg.Done()
+	}()
+	var resultCPURequests interface{}
+	go func() {
+		resultCPURequests, promErr = queryRange(cli, queryCPURequests, start, end, window)
+		defer wg.Done()
+	}()
+	var resultCPUUsage interface{}
+	go func() {
+		resultCPUUsage, promErr = queryRange(cli, queryCPUUsage, start, end, window)
+		defer wg.Done()
+	}()
+	var resultGPURequests interface{}
+	go func() {
+		resultGPURequests, promErr = queryRange(cli, queryGPURequests, start, end, window)
+		defer wg.Done()
+	}()
+	var resultPVRequests interface{}
+	go func() {
+		resultPVRequests, promErr = queryRange(cli, queryPVRequests, start, end, window)
+		defer wg.Done()
+	}()
+	var normalizationResult interface{}
+	go func() {
+		normalizationResult, promErr = query(cli, normalization)
+		defer wg.Done()
+	}()
+
+	podDeploymentsMapping := make(map[string]map[string][]string)
+	podServicesMapping := make(map[string]map[string][]string)
+	namespaceLabelsMapping := make(map[string]map[string]string)
+	podlist := cm.Controller.GetAll()
+	var k8sErr error
+	go func() {
+
+		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		if k8sErr != nil {
+			return
+		}
+
+		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		if k8sErr != nil {
+			return
+		}
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		if k8sErr != nil {
+			return
+		}
+
+		wg.Done()
+	}()
+
+	wg.Wait()
+
+	if promErr != nil {
+		return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
 	}
-	normalizationResult, err := query(cli, normalization)
-	if err != nil {
-		return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
+	if k8sErr != nil {
+		return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
 	}
 
 	normalizationValue, err := getNormalization(normalizationResult)
@@ -983,23 +1104,6 @@ func ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Inte
 		return nil, err
 	}
 
-	podlist, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-
-	podDeploymentsMapping, err := getPodDeployments(clientset, podlist)
-	if err != nil {
-		return nil, err
-	}
-	podServicesMapping, err := getPodServices(clientset, podlist)
-	if err != nil {
-		return nil, err
-	}
-	namespaceLabelsMapping, err := getNamespaceLabels(clientset)
-	if err != nil {
-		return nil, err
-	}
 	pvClaimMapping, err := getPVInfoVectors(resultPVRequests)
 	if err != nil {
 		// Just log for compatibility with KSM less than 1.6
@@ -1010,8 +1114,6 @@ func ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Inte
 		if err != nil {
 			return nil, err
 		}
-	} else {
-		klog.Infof("WHY IS THIS NIL??")
 	}
 
 	containerNameCost := make(map[string]*CostData)
@@ -1054,17 +1156,17 @@ func ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Inte
 		containers[key] = true
 	}
 	currentContainers := make(map[string]v1.Pod)
-	for _, pod := range podlist.Items {
+	for _, pod := range podlist {
 		if pod.Status.Phase != "Running" {
 			continue
 		}
-		cs, err := newContainerMetricsFromPod(pod)
+		cs, err := newContainerMetricsFromPod(*pod)
 		if err != nil {
 			return nil, err
 		}
 		for _, c := range cs {
 			containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
-			currentContainers[c.Key()] = pod
+			currentContainers[c.Key()] = *pod
 		}
 	}
 

+ 5 - 3
main.go

@@ -53,6 +53,7 @@ type Accesses struct {
 	CPUAllocationRecorder         *prometheus.GaugeVec
 	GPUAllocationRecorder         *prometheus.GaugeVec
 	ContainerUptimeRecorder       *prometheus.GaugeVec
+	Model                         *costModel.CostModel
 }
 
 type DataEnvelope struct {
@@ -134,7 +135,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		offset = "offset " + offset
 	}
 
-	data, err := costModel.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
+	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
 	if fields != "" {
 		filteredData := filterFields(fields, data)
 		w.Write(wrapData(filteredData, err))
@@ -185,7 +186,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
 
-	data, err := costModel.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace)
+	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace)
 	if fields != "" {
 		filteredData := filterFields(fields, data)
 		w.Write(wrapData(filteredData, err))
@@ -348,7 +349,7 @@ func (a *Accesses) recordPrices() {
 			a.GPUAllocationRecorder = GPUAllocation
 
 			klog.V(4).Info("Recording prices...")
-			data, err := costModel.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
+			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
 			if err != nil {
 				klog.V(1).Info("Error in price recording: " + err.Error())
 				// zero the for loop so the time.Sleep will still work
@@ -553,6 +554,7 @@ func main() {
 		GPUAllocationRecorder:         GPUAllocation,
 		ContainerUptimeRecorder:       ContainerUptimeRecorder,
 		PersistentVolumePriceRecorder: pvGv,
+		Model:                         costModel.NewCostModel(kubeClientset),
 	}
 
 	err = a.Cloud.DownloadPricingData()