Quellcode durchsuchen

add configmap init and watcher

AjayTripathy vor 6 Jahren
Ursprung
Commit
d343932b5b
7 geänderte Dateien mit 124 neuen und 18 gelöschten Zeilen
  1. 12 1
      cloud/awsprovider.go
  2. 13 0
      cloud/azureprovider.go
  3. 13 0
      cloud/customprovider.go
  4. 13 0
      cloud/gcpprovider.go
  5. 20 0
      cloud/provider.go
  6. 34 17
      clustercache/clustercache.go
  7. 19 0
      costmodel/router.go

+ 12 - 1
cloud/awsprovider.go

@@ -268,7 +268,18 @@ func (aws *AWS) GetConfig() (*CustomPricing, error) {
 	}
 	}
 	return c, nil
 	return c, nil
 }
 }
-
+func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("aws.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "aws.json"
+	return configmapUpdate(c, configPath, a)
+}
 func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("aws.json")
 	c, err := GetDefaultPricingData("aws.json")
 	if err != nil {
 	if err != nil {

+ 13 - 0
cloud/azureprovider.go

@@ -513,6 +513,19 @@ func (az *Azure) AddServiceKey(url url.Values) error {
 	return nil
 	return nil
 }
 }
 
 
+func (az *Azure) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("azure.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "azure.json"
+	return configmapUpdate(c, configPath, a)
+}
+
 func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	defer az.DownloadPricingData()
 	defer az.DownloadPricingData()
 	c, err := GetDefaultPricingData("azure.json")
 	c, err := GetDefaultPricingData("azure.json")

+ 13 - 0
cloud/customprovider.go

@@ -54,6 +54,19 @@ func (*CustomProvider) ApplyReservedInstancePricing(nodes map[string]*Node) {
 
 
 }
 }
 
 
+func (cp *CustomProvider) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("default.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "default.json"
+	return configmapUpdate(c, configPath, a)
+}
+
 func (cp *CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (cp *CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("default.json")
 	c, err := GetDefaultPricingData("default.json")
 	if err != nil {
 	if err != nil {

+ 13 - 0
cloud/gcpprovider.go

@@ -118,6 +118,19 @@ func (gcp *GCP) GetManagementPlatform() (string, error) {
 	return "", nil
 	return "", nil
 }
 }
 
 
+func (gcp *GCP) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("gcp.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	configPath := path + "gcp.json"
+	return configmapUpdate(c, configPath, a)
+}
+
 func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("gcp.json")
 	c, err := GetDefaultPricingData("gcp.json")
 	if err != nil {
 	if err != nil {

+ 20 - 0
cloud/provider.go

@@ -165,6 +165,7 @@ type Provider interface {
 	GetKey(map[string]string) Key
 	GetKey(map[string]string) Key
 	GetPVKey(*v1.PersistentVolume, map[string]string) PVKey
 	GetPVKey(*v1.PersistentVolume, map[string]string) PVKey
 	UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error)
 	UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error)
+	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
 	GetManagementPlatform() (string, error)
 	GetLocalStorageQuery(offset string) (string, error)
 	GetLocalStorageQuery(offset string) (string, error)
@@ -255,6 +256,25 @@ func GetDefaultPricingData(fname string) (*CustomPricing, error) {
 	}
 	}
 }
 }
 
 
+func configmapUpdate(c *CustomPricing, path string, a map[string]string) (*CustomPricing, error) {
+	for k, v := range a {
+		kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+		err := SetCustomPricingField(c, kUpper, v)
+		if err != nil {
+			return nil, err
+		}
+	}
+	cj, err := json.Marshal(c)
+	if err != nil {
+		return nil, err
+	}
+	err = ioutil.WriteFile(path, cj, 0644)
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
 func SetCustomPricingField(obj *CustomPricing, name string, value string) error {
 func SetCustomPricingField(obj *CustomPricing, name string, value string) error {
 	structValue := reflect.ValueOf(obj).Elem()
 	structValue := reflect.ValueOf(obj).Elem()
 	structFieldValue := structValue.FieldByName(name)
 	structFieldValue := structValue.FieldByName(name)

+ 34 - 17
clustercache/clustercache.go

@@ -1,8 +1,11 @@
 package clustercache
 package clustercache
 
 
 import (
 import (
+	"os"
 	"sync"
 	"sync"
 
 
+	"k8s.io/klog"
+
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	stv1 "k8s.io/api/storage/v1"
 	stv1 "k8s.io/api/storage/v1"
@@ -43,20 +46,24 @@ type ClusterCache interface {
 
 
 	// GetAllStorageClasses returns all the cached storage classes
 	// GetAllStorageClasses returns all the cached storage classes
 	GetAllStorageClasses() []*stv1.StorageClass
 	GetAllStorageClasses() []*stv1.StorageClass
+
+	// SetConfigMapUpdateFunc sets the configmap update function
+	SetConfigMapUpdateFunc(func(interface{}))
 }
 }
 
 
 // KubernetesClusterCache is the implementation of ClusterCache
 // KubernetesClusterCache is the implementation of ClusterCache
 type KubernetesClusterCache struct {
 type KubernetesClusterCache struct {
 	client kubernetes.Interface
 	client kubernetes.Interface
 
 
-	namespaceWatch    WatchController
-	nodeWatch         WatchController
-	podWatch          WatchController
-	serviceWatch      WatchController
-	deploymentsWatch  WatchController
-	pvWatch           WatchController
-	storageClassWatch WatchController
-	stop              chan struct{}
+	namespaceWatch         WatchController
+	nodeWatch              WatchController
+	podWatch               WatchController
+	kubecostConfigMapWatch WatchController
+	serviceWatch           WatchController
+	deploymentsWatch       WatchController
+	pvWatch                WatchController
+	storageClassWatch      WatchController
+	stop                   chan struct{}
 }
 }
 
 
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
@@ -69,26 +76,31 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	appsRestClient := client.AppsV1().RESTClient()
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 
 
+	kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
+	klog.Infof("NAMESPACE: %s", kubecostNamespace)
+
 	kcc := &KubernetesClusterCache{
 	kcc := &KubernetesClusterCache{
-		client:            client,
-		namespaceWatch:    NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
-		nodeWatch:         NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
-		podWatch:          NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
-		serviceWatch:      NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
-		deploymentsWatch:  NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
-		pvWatch:           NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
-		storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+		client:                 client,
+		namespaceWatch:         NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
+		nodeWatch:              NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
+		podWatch:               NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
+		kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
+		serviceWatch:           NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		deploymentsWatch:       NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		pvWatch:                NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
 	}
 	}
 
 
 	// Wait for each caching watcher to initialize
 	// Wait for each caching watcher to initialize
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
-	wg.Add(7)
+	wg.Add(8)
 
 
 	cancel := make(chan struct{})
 	cancel := make(chan struct{})
 
 
 	go initializeCache(kcc.namespaceWatch, &wg, cancel)
 	go initializeCache(kcc.namespaceWatch, &wg, cancel)
 	go initializeCache(kcc.nodeWatch, &wg, cancel)
 	go initializeCache(kcc.nodeWatch, &wg, cancel)
 	go initializeCache(kcc.podWatch, &wg, cancel)
 	go initializeCache(kcc.podWatch, &wg, cancel)
+	go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 	go initializeCache(kcc.serviceWatch, &wg, cancel)
 	go initializeCache(kcc.serviceWatch, &wg, cancel)
 	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
 	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
 	go initializeCache(kcc.pvWatch, &wg, cancel)
 	go initializeCache(kcc.pvWatch, &wg, cancel)
@@ -109,6 +121,7 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.nodeWatch.Run(1, stopCh)
 	go kcc.nodeWatch.Run(1, stopCh)
 	go kcc.podWatch.Run(1, stopCh)
 	go kcc.podWatch.Run(1, stopCh)
 	go kcc.serviceWatch.Run(1, stopCh)
 	go kcc.serviceWatch.Run(1, stopCh)
+	go kcc.kubecostConfigMapWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
@@ -191,3 +204,7 @@ func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
 	}
 	}
 	return storageClasses
 	return storageClasses
 }
 }
+
+func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
+	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
+}

+ 19 - 0
costmodel/router.go

@@ -23,6 +23,7 @@ import (
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/rest"
@@ -822,6 +823,24 @@ func Initialize() {
 		panic(err.Error())
 		panic(err.Error())
 	}
 	}
 
 
+	watchConfigFunc := func(c interface{}) {
+		conf := c.(*v1.ConfigMap)
+		if conf.GetName() == "pricing-configs" {
+			_, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
+			if err != nil {
+				klog.Infof("ERROR UPDATING CONFIG: %s", err.Error())
+			}
+		}
+	}
+	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
+	configs, err := kubeClientset.CoreV1().ConfigMaps("kubecost").Get("pricing-configs", metav1.GetOptions{})
+	if err != nil {
+		klog.Infof("ERROR FETCHING configmap: %s", err.Error())
+	}
+	watchConfigFunc(configs)
+
+	k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
+
 	cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 	cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "node_cpu_hourly_cost",
 		Name: "node_cpu_hourly_cost",
 		Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
 		Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",