Kaynağa Gözat

Merge pull request #270 from kubecost/AjayTripathy-configmap-updates

add configmap init and watcher
Ajay Tripathy 6 yıl önce
ebeveyn
işleme
e3fc9b6615

+ 12 - 1
cloud/awsprovider.go

@@ -268,7 +268,18 @@ func (aws *AWS) GetConfig() (*CustomPricing, error) {
 	}
 	}
 	return c, nil
 	return c, nil
 }
 }
-
+func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("aws.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "aws.json"
+	return configmapUpdate(c, configPath, a)
+}
 func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("aws.json")
 	c, err := GetDefaultPricingData("aws.json")
 	if err != nil {
 	if err != nil {

+ 14 - 1
cloud/azureprovider.go

@@ -343,7 +343,7 @@ func (az *Azure) DownloadPricingData() error {
 	baseCPUPrice := c.CPU
 	baseCPUPrice := c.CPU
 
 
 	for _, v := range *result.Meters {
 	for _, v := range *result.Meters {
-		if !strings.Contains(*v.MeterSubCategory, "Windows") {
+		if !strings.Contains(*v.MeterSubCategory, "Windows") && !strings.Contains(*v.MeterCategory, "Cloud Services") {
 
 
 			region, err := toRegionID(*v.MeterRegion, regions)
 			region, err := toRegionID(*v.MeterRegion, regions)
 			if err != nil {
 			if err != nil {
@@ -513,6 +513,19 @@ func (az *Azure) AddServiceKey(url url.Values) error {
 	return nil
 	return nil
 }
 }
 
 
+func (az *Azure) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("azure.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "azure.json"
+	return configmapUpdate(c, configPath, a)
+}
+
 func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	defer az.DownloadPricingData()
 	defer az.DownloadPricingData()
 	c, err := GetDefaultPricingData("azure.json")
 	c, err := GetDefaultPricingData("azure.json")

+ 13 - 0
cloud/customprovider.go

@@ -54,6 +54,19 @@ func (*CustomProvider) ApplyReservedInstancePricing(nodes map[string]*Node) {
 
 
 }
 }
 
 
+func (cp *CustomProvider) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("default.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "default.json"
+	return configmapUpdate(c, configPath, a)
+}
+
 func (cp *CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (cp *CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("default.json")
 	c, err := GetDefaultPricingData("default.json")
 	if err != nil {
 	if err != nil {

+ 13 - 0
cloud/gcpprovider.go

@@ -118,6 +118,19 @@ func (gcp *GCP) GetManagementPlatform() (string, error) {
 	return "", nil
 	return "", nil
 }
 }
 
 
+func (gcp *GCP) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("gcp.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "gcp.json"
+	return configmapUpdate(c, configPath, a)
+}
+
 func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("gcp.json")
 	c, err := GetDefaultPricingData("gcp.json")
 	if err != nil {
 	if err != nil {

+ 20 - 0
cloud/provider.go

@@ -165,6 +165,7 @@ type Provider interface {
 	GetKey(map[string]string) Key
 	GetKey(map[string]string) Key
 	GetPVKey(*v1.PersistentVolume, map[string]string) PVKey
 	GetPVKey(*v1.PersistentVolume, map[string]string) PVKey
 	UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error)
 	UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error)
+	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
 	GetManagementPlatform() (string, error)
 	GetLocalStorageQuery(offset string) (string, error)
 	GetLocalStorageQuery(offset string) (string, error)
@@ -255,6 +256,25 @@ func GetDefaultPricingData(fname string) (*CustomPricing, error) {
 	}
 	}
 }
 }
 
 
+func configmapUpdate(c *CustomPricing, path string, a map[string]string) (*CustomPricing, error) {
+	for k, v := range a {
+		kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+		err := SetCustomPricingField(c, kUpper, v)
+		if err != nil {
+			return nil, err
+		}
+	}
+	cj, err := json.Marshal(c)
+	if err != nil {
+		return nil, err
+	}
+	err = ioutil.WriteFile(path, cj, 0644)
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
 func SetCustomPricingField(obj *CustomPricing, name string, value string) error {
 func SetCustomPricingField(obj *CustomPricing, name string, value string) error {
 	structValue := reflect.ValueOf(obj).Elem()
 	structValue := reflect.ValueOf(obj).Elem()
 	structFieldValue := structValue.FieldByName(name)
 	structFieldValue := structValue.FieldByName(name)

+ 50 - 17
clustercache/clustercache.go

@@ -1,8 +1,11 @@
 package clustercache
 package clustercache
 
 
 import (
 import (
+	"os"
 	"sync"
 	"sync"
 
 
+	"k8s.io/klog"
+
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	stv1 "k8s.io/api/storage/v1"
 	stv1 "k8s.io/api/storage/v1"
@@ -38,25 +41,33 @@ type ClusterCache interface {
 	// GetAllDeployments returns all the cached deployments
 	// GetAllDeployments returns all the cached deployments
 	GetAllDeployments() []*appsv1.Deployment
 	GetAllDeployments() []*appsv1.Deployment
 
 
+	// GetAllDeployments returns all the cached deployments
+	GetAllStatefulSets() []*appsv1.StatefulSet
+
 	// GetAllPersistentVolumes returns all the cached persistent volumes
 	// GetAllPersistentVolumes returns all the cached persistent volumes
 	GetAllPersistentVolumes() []*v1.PersistentVolume
 	GetAllPersistentVolumes() []*v1.PersistentVolume
 
 
 	// GetAllStorageClasses returns all the cached storage classes
 	// GetAllStorageClasses returns all the cached storage classes
 	GetAllStorageClasses() []*stv1.StorageClass
 	GetAllStorageClasses() []*stv1.StorageClass
+
+	// SetConfigMapUpdateFunc sets the configmap update function
+	SetConfigMapUpdateFunc(func(interface{}))
 }
 }
 
 
 // KubernetesClusterCache is the implementation of ClusterCache
 // KubernetesClusterCache is the implementation of ClusterCache
 type KubernetesClusterCache struct {
 type KubernetesClusterCache struct {
 	client kubernetes.Interface
 	client kubernetes.Interface
 
 
-	namespaceWatch    WatchController
-	nodeWatch         WatchController
-	podWatch          WatchController
-	serviceWatch      WatchController
-	deploymentsWatch  WatchController
-	pvWatch           WatchController
-	storageClassWatch WatchController
-	stop              chan struct{}
+	namespaceWatch         WatchController
+	nodeWatch              WatchController
+	podWatch               WatchController
+	kubecostConfigMapWatch WatchController
+	serviceWatch           WatchController
+	deploymentsWatch       WatchController
+	statefulsetWatch       WatchController
+	pvWatch                WatchController
+	storageClassWatch      WatchController
+	stop                   chan struct{}
 }
 }
 
 
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
@@ -69,28 +80,35 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	appsRestClient := client.AppsV1().RESTClient()
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 
 
+	kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
+	klog.Infof("NAMESPACE: %s", kubecostNamespace)
+
 	kcc := &KubernetesClusterCache{
 	kcc := &KubernetesClusterCache{
-		client:            client,
-		namespaceWatch:    NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
-		nodeWatch:         NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
-		podWatch:          NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
-		serviceWatch:      NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
-		deploymentsWatch:  NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
-		pvWatch:           NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
-		storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+		client:                 client,
+		namespaceWatch:         NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
+		nodeWatch:              NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
+		podWatch:               NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
+		kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
+		serviceWatch:           NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		deploymentsWatch:       NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		statefulsetWatch:       NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
+		pvWatch:                NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
 	}
 	}
 
 
 	// Wait for each caching watcher to initialize
 	// Wait for each caching watcher to initialize
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
-	wg.Add(7)
+	wg.Add(9)
 
 
 	cancel := make(chan struct{})
 	cancel := make(chan struct{})
 
 
 	go initializeCache(kcc.namespaceWatch, &wg, cancel)
 	go initializeCache(kcc.namespaceWatch, &wg, cancel)
 	go initializeCache(kcc.nodeWatch, &wg, cancel)
 	go initializeCache(kcc.nodeWatch, &wg, cancel)
 	go initializeCache(kcc.podWatch, &wg, cancel)
 	go initializeCache(kcc.podWatch, &wg, cancel)
+	go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 	go initializeCache(kcc.serviceWatch, &wg, cancel)
 	go initializeCache(kcc.serviceWatch, &wg, cancel)
 	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
 	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
+	go initializeCache(kcc.statefulsetWatch, &wg, cancel)
 	go initializeCache(kcc.pvWatch, &wg, cancel)
 	go initializeCache(kcc.pvWatch, &wg, cancel)
 	go initializeCache(kcc.storageClassWatch, &wg, cancel)
 	go initializeCache(kcc.storageClassWatch, &wg, cancel)
 
 
@@ -109,7 +127,9 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.nodeWatch.Run(1, stopCh)
 	go kcc.nodeWatch.Run(1, stopCh)
 	go kcc.podWatch.Run(1, stopCh)
 	go kcc.podWatch.Run(1, stopCh)
 	go kcc.serviceWatch.Run(1, stopCh)
 	go kcc.serviceWatch.Run(1, stopCh)
+	go kcc.kubecostConfigMapWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
+	go kcc.statefulsetWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 
 
@@ -174,6 +194,15 @@ func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
 	return deployments
 	return deployments
 }
 }
 
 
+func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
+	var statefulsets []*appsv1.StatefulSet
+	items := kcc.statefulsetWatch.GetAll()
+	for _, statefulset := range items {
+		statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
+	}
+	return statefulsets
+}
+
 func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
 func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
 	var pvs []*v1.PersistentVolume
 	var pvs []*v1.PersistentVolume
 	items := kcc.pvWatch.GetAll()
 	items := kcc.pvWatch.GetAll()
@@ -191,3 +220,7 @@ func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
 	}
 	}
 	return storageClasses
 	return storageClasses
 }
 }
+
+func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
+	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
+}

+ 64 - 3
costmodel/costmodel.go

@@ -142,7 +142,7 @@ const (
 			), "pod_name","$1","pod","(.+)"
 			), "pod_name","$1","pod","(.+)"
 		) 
 		) 
 	) by (namespace,container_name,pod_name,node,cluster_id) 
 	) by (namespace,container_name,pod_name,node,cluster_id) 
-	* on (pod_name) group_right(namespace,container_name,node,cluster_id) label_replace(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s), "pod_name","$1","pod","(.+)")`
+	* on (pod_name, namespace, node, cluster_id) group_right(container_name) label_replace(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s), "pod_name","$1","pod","(.+)")`
 	queryPVRequestsStr = `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id) 
 	queryPVRequestsStr = `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id) 
 						* 
 						* 
 						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
 						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
@@ -174,6 +174,7 @@ const (
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
+	queryStatefulsetLabels    = `avg_over_time(statefulSet_match_labels[%s])`
 	queryServiceLabels        = `avg_over_time(service_selector_labels[%s])`
 	queryServiceLabels        = `avg_over_time(service_selector_labels[%s])`
 	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
@@ -1089,6 +1090,37 @@ func getPodServices(cache clustercache.ClusterCache, podList []*v1.Pod, clusterI
 	return podServicesMapping, nil
 	return podServicesMapping, nil
 }
 }
 
 
+func getPodStatefulsets(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
+	ssList := cache.GetAllStatefulSets()
+	podSSMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
+	for _, ss := range ssList {
+		namespace := ss.GetObjectMeta().GetNamespace()
+		name := ss.GetObjectMeta().GetName()
+
+		key := namespace + "," + clusterID
+		if _, ok := podSSMapping[key]; !ok {
+			podSSMapping[key] = make(map[string][]string)
+		}
+		s, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
+		if err != nil {
+			klog.V(2).Infof("Error doing deployment label conversion: " + err.Error())
+		}
+		for _, pod := range podList {
+			labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
+			if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
+				sss, ok := podSSMapping[key][pod.GetObjectMeta().GetName()]
+				if ok {
+					podSSMapping[key][pod.GetObjectMeta().GetName()] = append(sss, name)
+				} else {
+					podSSMapping[key][pod.GetObjectMeta().GetName()] = []string{name}
+				}
+			}
+		}
+	}
+	return podSSMapping, nil
+
+}
+
 func getPodDeployments(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 func getPodDeployments(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 	deploymentsList := cache.GetAllDeployments()
 	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
@@ -1351,7 +1383,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}
 	}
 
 
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
-	wg.Add(19)
+	wg.Add(20)
 
 
 	var promErr error
 	var promErr error
 	var resultRAMRequests interface{}
 	var resultRAMRequests interface{}
@@ -1456,6 +1488,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 
 		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
 		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
 	}()
 	}()
+	var statefulsetLabelsResults interface{}
+	go func() {
+		defer wg.Done()
+		statefulsetLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryStatefulsetLabels, windowString), start, end, window)
+	}()
 	var normalizationResults interface{}
 	var normalizationResults interface{}
 	go func() {
 	go func() {
 		defer wg.Done()
 		defer wg.Done()
@@ -1464,6 +1501,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}()
 	}()
 
 
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podDeploymentsMapping := make(map[string]map[string][]string)
+	podStatefulsetsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
 	podlist := cm.Cache.GetAllPods()
 	podlist := cm.Cache.GetAllPods()
@@ -1476,6 +1514,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			return
 			return
 		}
 		}
 
 
+		podStatefulsetsMapping, k8sErr = getPodStatefulsets(cm.Cache, podlist, clusterID)
+		if k8sErr != nil {
+			return
+		}
+
 		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist, clusterID)
 		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist, clusterID)
 		if k8sErr != nil {
 		if k8sErr != nil {
 			return
 			return
@@ -1557,6 +1600,16 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
 		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
 	}
 	}
 
 
+	statefulsetLabels, err := GetStatefulsetMatchLabelsMetrics(statefulsetLabelsResults, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
+	}
+	podStatefulsetMetricsMapping, err := getPodDeploymentsWithMetrics(statefulsetLabels, podLabels)
+	if err != nil {
+		klog.V(1).Infof("Unable to get match Statefulset Labels Metrics to Pods: %s", err.Error())
+	}
+	appendLabelsList(podStatefulsetsMapping, podStatefulsetMetricsMapping)
+
 	podDeploymentsMetricsMapping, err := getPodDeploymentsWithMetrics(deploymentLabels, podLabels)
 	podDeploymentsMetricsMapping, err := getPodDeploymentsWithMetrics(deploymentLabels, podLabels)
 	if err != nil {
 	if err != nil {
 		klog.V(1).Infof("Unable to get match Deployment Labels Metrics to Pods: %s", err.Error())
 		klog.V(1).Infof("Unable to get match Deployment Labels Metrics to Pods: %s", err.Error())
@@ -1676,6 +1729,14 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					podDeployments = []string{}
 					podDeployments = []string{}
 				}
 				}
 			}
 			}
+			var podStatefulSets []string
+			if _, ok := podStatefulsetsMapping[nsKey]; ok {
+				if ds, ok := podStatefulsetsMapping[nsKey][pod.GetObjectMeta().GetName()]; ok {
+					podStatefulSets = ds
+				} else {
+					podStatefulSets = []string{}
+				}
+			}
 
 
 			var podPVs []*PersistentVolumeClaimData
 			var podPVs []*PersistentVolumeClaimData
 			podClaims := pod.Spec.Volumes
 			podClaims := pod.Spec.Volumes
@@ -1774,7 +1835,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					Services:        podServices,
 					Services:        podServices,
 					Daemonsets:      getDaemonsetsOfPod(pod),
 					Daemonsets:      getDaemonsetsOfPod(pod),
 					Jobs:            getJobsOfPod(pod),
 					Jobs:            getJobsOfPod(pod),
-					Statefulsets:    getStatefulSetsOfPod(pod),
+					Statefulsets:    podStatefulSets,
 					NodeData:        nodeData,
 					NodeData:        nodeData,
 					RAMReq:          RAMReqV,
 					RAMReq:          RAMReqV,
 					RAMUsed:         RAMUsedV,
 					RAMUsed:         RAMUsedV,

+ 68 - 0
costmodel/metrics.go

@@ -33,6 +33,14 @@ func sanitizeLabelName(s string) string {
 	return invalidLabelCharRE.ReplaceAllString(s, "_")
 	return invalidLabelCharRE.ReplaceAllString(s, "_")
 }
 }
 
 
+type StatefulsetCollector struct {
+	KubeClientSet kubernetes.Interface
+}
+
+func (sc StatefulsetCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("statefulSet_match_labels", "statfulSet match labels", []string{}, nil)
+}
+
 type DeploymentCollector struct {
 type DeploymentCollector struct {
 	KubeClientSet kubernetes.Interface
 	KubeClientSet kubernetes.Interface
 }
 }
@@ -41,6 +49,66 @@ func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
 	ch <- prometheus.NewInvalidDesc(nil)
 	ch <- prometheus.NewInvalidDesc(nil)
 }
 }
 
 
+func newStatefulsetMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) StatefulsetMetric {
+	return StatefulsetMetric{
+		fqName:          fqname,
+		labelNames:      labelNames,
+		labelValues:     labelvalues,
+		help:            "statefulSet_match_labels StatefulSet Match Labels",
+		statefulsetName: name,
+		namespace:       namespace,
+	}
+}
+
+type StatefulsetMetric struct {
+	fqName          string
+	help            string
+	labelNames      []string
+	labelValues     []string
+	statefulsetName string
+	namespace       string
+}
+
+func (s StatefulsetMetric) Desc() *prometheus.Desc {
+	l := prometheus.Labels{"statefulSet": s.statefulsetName, "namespace": s.namespace}
+	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
+}
+
+func (s StatefulsetMetric) Write(m *dto.Metric) error {
+	h := float64(1)
+	m.Gauge = &dto.Gauge{
+		Value: &h,
+	}
+	var labels []*dto.LabelPair
+	for i := range s.labelNames {
+		labels = append(labels, &dto.LabelPair{
+			Name:  &s.labelNames[i],
+			Value: &s.labelValues[i],
+		})
+	}
+	n := "namespace"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &n,
+		Value: &s.namespace,
+	})
+	r := "statefulSet"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &r,
+		Value: &s.statefulsetName,
+	})
+	m.Label = labels
+	return nil
+}
+
+func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, _ := sc.KubeClientSet.AppsV1().StatefulSets("").List(metav1.ListOptions{})
+	for _, statefulset := range ds.Items {
+		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
+		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
+		ch <- m
+	}
+}
+
 func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
 func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
 	return DeploymentMetric{
 	return DeploymentMetric{
 		fqName:         fqname,
 		fqName:         fqname,

+ 31 - 0
costmodel/promparsers.go

@@ -337,6 +337,37 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 	return toReturn, nil
 	return toReturn, nil
 }
 }
 
 
+func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
+	toReturn := make(map[string]map[string]string)
+	result, err := NewQueryResults(queryResult)
+	if err != nil {
+		return toReturn, err
+	}
+
+	for _, val := range result {
+		// We want Statefulset, Namespace and ClusterID for key generation purposes
+		ss, err := val.GetString("statefulSet")
+		if err != nil {
+			return toReturn, err
+		}
+
+		ns, err := val.GetString("namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		clusterID, err := val.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		nsKey := ns + "," + ss + "," + clusterID
+		toReturn[nsKey] = val.GetLabels()
+	}
+
+	return toReturn, nil
+}
+
 func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
 	toReturn := make(map[string]map[string]string)
 	result, err := NewQueryResults(queryResult)
 	result, err := NewQueryResults(queryResult)

+ 23 - 0
costmodel/router.go

@@ -23,6 +23,7 @@ import (
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/rest"
@@ -821,6 +822,25 @@ func Initialize() {
 		panic(err.Error())
 		panic(err.Error())
 	}
 	}
 
 
+	watchConfigFunc := func(c interface{}) {
+		conf := c.(*v1.ConfigMap)
+		if conf.GetName() == "pricing-configs" {
+			_, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
+			if err != nil {
+				klog.Infof("ERROR UPDATING CONFIG: %s", err.Error())
+			}
+		}
+	}
+	kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
+	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
+	configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
+	if err != nil {
+		klog.Infof("ERROR FETCHING configmap: %s", err.Error())
+	}
+	watchConfigFunc(configs)
+
+	k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
+
 	cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 	cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "node_cpu_hourly_cost",
 		Name: "node_cpu_hourly_cost",
 		Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
 		Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
@@ -900,6 +920,9 @@ func Initialize() {
 	prometheus.MustRegister(DeploymentCollector{
 	prometheus.MustRegister(DeploymentCollector{
 		KubeClientSet: kubeClientset,
 		KubeClientSet: kubeClientset,
 	})
 	})
+	prometheus.MustRegister(StatefulsetCollector{
+		KubeClientSet: kubeClientset,
+	})
 
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)