AjayTripathy 6 лет назад
Родитель
Сommit
d873a29ffa

+ 1 - 1
cloud/azureprovider.go

@@ -343,7 +343,7 @@ func (az *Azure) DownloadPricingData() error {
 	baseCPUPrice := c.CPU
 
 	for _, v := range *result.Meters {
-		if !strings.Contains(*v.MeterSubCategory, "Windows") {
+		if !strings.Contains(*v.MeterSubCategory, "Windows") && !strings.Contains(*v.MeterCategory, "Cloud Services") {
 
 			region, err := toRegionID(*v.MeterRegion, regions)
 			if err != nil {

+ 17 - 1
clustercache/clustercache.go

@@ -41,6 +41,9 @@ type ClusterCache interface {
 	// GetAllDeployments returns all the cached deployments
 	GetAllDeployments() []*appsv1.Deployment
 
+	// GetAllDeployments returns all the cached deployments
+	GetAllStatefulSets() []*appsv1.StatefulSet
+
 	// GetAllPersistentVolumes returns all the cached persistent volumes
 	GetAllPersistentVolumes() []*v1.PersistentVolume
 
@@ -61,6 +64,7 @@ type KubernetesClusterCache struct {
 	kubecostConfigMapWatch WatchController
 	serviceWatch           WatchController
 	deploymentsWatch       WatchController
+	statefulsetWatch       WatchController
 	pvWatch                WatchController
 	storageClassWatch      WatchController
 	stop                   chan struct{}
@@ -87,13 +91,14 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 		kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
 		serviceWatch:           NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
 		deploymentsWatch:       NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		statefulsetWatch:       NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
 		pvWatch:                NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
 		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
 	}
 
 	// Wait for each caching watcher to initialize
 	var wg sync.WaitGroup
-	wg.Add(8)
+	wg.Add(9)
 
 	cancel := make(chan struct{})
 
@@ -103,6 +108,7 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 	go initializeCache(kcc.serviceWatch, &wg, cancel)
 	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
+	go initializeCache(kcc.statefulsetWatch, &wg, cancel)
 	go initializeCache(kcc.pvWatch, &wg, cancel)
 	go initializeCache(kcc.storageClassWatch, &wg, cancel)
 
@@ -123,6 +129,7 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.serviceWatch.Run(1, stopCh)
 	go kcc.kubecostConfigMapWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
+	go kcc.statefulsetWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 
@@ -187,6 +194,15 @@ func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
 	return deployments
 }
 
+func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
+	var statefulsets []*appsv1.StatefulSet
+	items := kcc.statefulsetWatch.GetAll()
+	for _, statefulset := range items {
+		statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
+	}
+	return statefulsets
+}
+
 func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
 	var pvs []*v1.PersistentVolume
 	items := kcc.pvWatch.GetAll()

+ 64 - 3
costmodel/costmodel.go

@@ -142,7 +142,7 @@ const (
 			), "pod_name","$1","pod","(.+)"
 		) 
 	) by (namespace,container_name,pod_name,node,cluster_id) 
-	* on (pod_name) group_right(namespace,container_name,node,cluster_id) label_replace(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s), "pod_name","$1","pod","(.+)")`
+	* on (pod_name, namespace) group_right(container_name,node,cluster_id) label_replace(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s), "pod_name","$1","pod","(.+)")`
 	queryPVRequestsStr = `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id) 
 						* 
 						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
@@ -174,6 +174,7 @@ const (
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
+	queryStatefulsetLabels    = `avg_over_time(statefulSet_match_labels[%s])`
 	queryServiceLabels        = `avg_over_time(service_selector_labels[%s])`
 	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
@@ -1089,6 +1090,37 @@ func getPodServices(cache clustercache.ClusterCache, podList []*v1.Pod, clusterI
 	return podServicesMapping, nil
 }
 
+func getPodStatefulsets(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
+	ssList := cache.GetAllStatefulSets()
+	podSSMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
+	for _, ss := range ssList {
+		namespace := ss.GetObjectMeta().GetNamespace()
+		name := ss.GetObjectMeta().GetName()
+
+		key := namespace + "," + clusterID
+		if _, ok := podSSMapping[key]; !ok {
+			podSSMapping[key] = make(map[string][]string)
+		}
+		s, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
+		if err != nil {
+			klog.V(2).Infof("Error doing deployment label conversion: " + err.Error())
+		}
+		for _, pod := range podList {
+			labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
+			if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
+				sss, ok := podSSMapping[key][pod.GetObjectMeta().GetName()]
+				if ok {
+					podSSMapping[key][pod.GetObjectMeta().GetName()] = append(sss, name)
+				} else {
+					podSSMapping[key][pod.GetObjectMeta().GetName()] = []string{name}
+				}
+			}
+		}
+	}
+	return podSSMapping, nil
+
+}
+
 func getPodDeployments(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
@@ -1351,7 +1383,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(19)
+	wg.Add(20)
 
 	var promErr error
 	var resultRAMRequests interface{}
@@ -1456,6 +1488,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
 	}()
+	var statefulsetLabelsResults interface{}
+	go func() {
+		defer wg.Done()
+		statefulsetLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryStatefulsetLabels, windowString), start, end, window)
+	}()
 	var normalizationResults interface{}
 	go func() {
 		defer wg.Done()
@@ -1464,6 +1501,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}()
 
 	podDeploymentsMapping := make(map[string]map[string][]string)
+	podStatefulsetsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
 	podlist := cm.Cache.GetAllPods()
@@ -1476,6 +1514,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			return
 		}
 
+		podStatefulsetsMapping, k8sErr = getPodStatefulsets(cm.Cache, podlist, clusterID)
+		if k8sErr != nil {
+			return
+		}
+
 		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {
 			return
@@ -1557,6 +1600,16 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
 	}
 
+	statefulsetLabels, err := GetStatefulsetMatchLabelsMetrics(statefulsetLabelsResults, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
+	}
+	podStatefulsetMetricsMapping, err := getPodDeploymentsWithMetrics(statefulsetLabels, podLabels)
+	if err != nil {
+		klog.V(1).Infof("Unable to get match Statefulset Labels Metrics to Pods: %s", err.Error())
+	}
+	appendLabelsList(podStatefulsetsMapping, podStatefulsetMetricsMapping)
+
 	podDeploymentsMetricsMapping, err := getPodDeploymentsWithMetrics(deploymentLabels, podLabels)
 	if err != nil {
 		klog.V(1).Infof("Unable to get match Deployment Labels Metrics to Pods: %s", err.Error())
@@ -1676,6 +1729,14 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					podDeployments = []string{}
 				}
 			}
+			var podStatefulSets []string
+			if _, ok := podStatefulsetsMapping[nsKey]; ok {
+				if ds, ok := podStatefulsetsMapping[nsKey][pod.GetObjectMeta().GetName()]; ok {
+					podStatefulSets = ds
+				} else {
+					podStatefulSets = []string{}
+				}
+			}
 
 			var podPVs []*PersistentVolumeClaimData
 			podClaims := pod.Spec.Volumes
@@ -1774,7 +1835,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					Services:        podServices,
 					Daemonsets:      getDaemonsetsOfPod(pod),
 					Jobs:            getJobsOfPod(pod),
-					Statefulsets:    getStatefulSetsOfPod(pod),
+					Statefulsets:    podStatefulSets,
 					NodeData:        nodeData,
 					RAMReq:          RAMReqV,
 					RAMUsed:         RAMUsedV,

+ 68 - 0
costmodel/metrics.go

@@ -33,6 +33,14 @@ func sanitizeLabelName(s string) string {
 	return invalidLabelCharRE.ReplaceAllString(s, "_")
 }
 
+type StatefulsetCollector struct {
+	KubeClientSet kubernetes.Interface
+}
+
+func (sc StatefulsetCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("statefulSet_match_labels", "statfulSet match labels", []string{}, nil)
+}
+
 type DeploymentCollector struct {
 	KubeClientSet kubernetes.Interface
 }
@@ -41,6 +49,66 @@ func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
 	ch <- prometheus.NewInvalidDesc(nil)
 }
 
+func newStatefulsetMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) StatefulsetMetric {
+	return StatefulsetMetric{
+		fqName:          fqname,
+		labelNames:      labelNames,
+		labelValues:     labelvalues,
+		help:            "statefulSet_match_labels StatefulSet Match Labels",
+		statefulsetName: name,
+		namespace:       namespace,
+	}
+}
+
+type StatefulsetMetric struct {
+	fqName          string
+	help            string
+	labelNames      []string
+	labelValues     []string
+	statefulsetName string
+	namespace       string
+}
+
+func (s StatefulsetMetric) Desc() *prometheus.Desc {
+	l := prometheus.Labels{"statefulSet": s.statefulsetName, "namespace": s.namespace}
+	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
+}
+
+func (s StatefulsetMetric) Write(m *dto.Metric) error {
+	h := float64(1)
+	m.Gauge = &dto.Gauge{
+		Value: &h,
+	}
+	var labels []*dto.LabelPair
+	for i := range s.labelNames {
+		labels = append(labels, &dto.LabelPair{
+			Name:  &s.labelNames[i],
+			Value: &s.labelValues[i],
+		})
+	}
+	n := "namespace"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &n,
+		Value: &s.namespace,
+	})
+	r := "statefulSet"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &r,
+		Value: &s.statefulsetName,
+	})
+	m.Label = labels
+	return nil
+}
+
+func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, _ := sc.KubeClientSet.AppsV1().StatefulSets("").List(metav1.ListOptions{})
+	for _, statefulset := range ds.Items {
+		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
+		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
+		ch <- m
+	}
+}
+
 func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
 	return DeploymentMetric{
 		fqName:         fqname,

+ 31 - 0
costmodel/promparsers.go

@@ -337,6 +337,37 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 	return toReturn, nil
 }
 
+func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		// We want Statefulset, Namespace and ClusterID for key generation purposes
+		ss, err := val.GetString("statefulSet")
+		if err != nil {
+			return toReturn, err
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + ss + "," + clusterID
+		toReturn[nsKey] = val.GetLabels()
+	}
+
+	return toReturn, nil
+}
+
 func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 	result, err := NewQueryResults(queryResult)

+ 3 - 0
costmodel/router.go

@@ -920,6 +920,9 @@ func Initialize() {
 	prometheus.MustRegister(DeploymentCollector{
 		KubeClientSet: kubeClientset,
 	})
+	prometheus.MustRegister(StatefulsetCollector{
+		KubeClientSet: kubeClientset,
+	})
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)