Ver código fonte

Added configuration variation of cluster definitions. Allow loading of a specific yaml containing definitions.

Matt Bolt 6 anos atrás
pai
commit
bbae636f02

+ 3 - 0
go.mod

@@ -9,6 +9,7 @@ require (
 	github.com/Azure/go-autorest v11.3.2+incompatible
 	github.com/aws/aws-sdk-go v1.28.9
 	github.com/dimchansky/utfbom v1.1.0 // indirect
+	github.com/etcd-io/bbolt v1.3.3
 	github.com/golang/mock v1.2.0
 	github.com/google/martian v2.1.0+incompatible // indirect
 	github.com/google/uuid v1.1.1
@@ -25,6 +26,7 @@ require (
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
 	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
+	go.etcd.io/bbolt v1.3.3 // indirect
 	golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529 // indirect
 	golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac // indirect
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
@@ -35,6 +37,7 @@ require (
 	k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6
 	k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
 	k8s.io/klog v0.4.0
+	sigs.k8s.io/yaml v1.1.0
 )
 
 go 1.13

+ 4 - 0
go.sum

@@ -59,6 +59,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
+github.com/etcd-io/bbolt v1.3.3 h1:gSJmxrs37LgTqR/oyJBWok6k6SvXEUerFTbltIhXkBM=
+github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
 github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550 h1:mV9jbLoSW/8m4VK16ZkHTozJa8sesK5u5kTMFysTYac=
 github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
 github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -287,6 +289,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
+go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.opencensus.io v0.19.1/go.mod h1:gug0GbSHa8Pafr0d2urOSgoXHZ6x/RUlaiT0d9pqb4A=
 go.opencensus.io v0.19.2 h1:ZZpq6xI6kv/LuE/5s5UQvBU5vMjvRnPb8PvJrIntAnc=
 go.opencensus.io v0.19.2/go.mod h1:NO/8qkisMZLZ1FCsKNqtJPwc8/TaclWyY0B6wcYNg9M=

+ 3 - 10
pkg/cloud/providerconfig.go

@@ -9,6 +9,8 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/kubecost/cost-model/pkg/util"
+
 	"k8s.io/klog"
 )
 
@@ -194,16 +196,7 @@ func SetCustomPricingField(obj *CustomPricing, name string, value string) error
 // but the error isn't relevant to the path. This can happen when the current
 // user doesn't have permission to access the file.
 func fileExists(filename string) (bool, error) {
-	info, err := os.Stat(filename)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return false, nil
-		}
-
-		return false, err
-	}
-
-	return !info.IsDir(), nil
+	return util.FileExists(filename) // delegate to utility method
 }
 
 // Returns the configuration directory concatenated with a specific config file name

+ 89 - 0
pkg/clustermanager/boltdbstorage.go

@@ -0,0 +1,89 @@
+package clustermanager
+
+import (
+	bolt "github.com/etcd-io/bbolt"
+	_ "k8s.io/klog"
+)
+
+type BoltDBClusterStorage struct {
+	bucket []byte
+	db     *bolt.DB
+}
+
+func NewBoltDBClusterStorage(bucket string, db *bolt.DB) (ClusterStorage, error) {
+	bucketKey := []byte(bucket)
+
+	err := db.Update(func(tx *bolt.Tx) error {
+		_, err := tx.CreateBucketIfNotExists(bucketKey)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &BoltDBClusterStorage{
+		bucket: bucketKey,
+		db:     db,
+	}, nil
+}
+
+// Adds the entry if the key does not exist
+func (cs *BoltDBClusterStorage) AddIfNotExists(key string, cluster []byte) error {
+	return cs.db.Update(func(tx *bolt.Tx) error {
+		k := []byte(key)
+		bucket := tx.Bucket(cs.bucket)
+
+		if bucket.Get(k) != nil {
+			return nil
+		}
+		return bucket.Put(k, cluster)
+	})
+}
+
+// Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
+// value with the provided.
+func (cs *BoltDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
+	return cs.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(cs.bucket)
+
+		return bucket.Put([]byte(key), cluster)
+	})
+}
+
+// Removes a key from the cluster storage
+func (cs *BoltDBClusterStorage) Remove(key string) error {
+	return cs.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(cs.bucket)
+
+		return bucket.Delete([]byte(key))
+	})
+}
+
+// Retrieve all the keys stored
+func (cs *BoltDBClusterStorage) Each(handler func(string, []byte)) error {
+	return cs.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(cs.bucket)
+
+		return bucket.ForEach(func(k, v []byte) error {
+			// Allow the bytes to live outside transaction by copy
+			key := make([]byte, len(k))
+			value := make([]byte, len(v))
+
+			copy(key, k)
+			copy(value, v)
+
+			handler(string(key), value)
+			return nil
+		})
+	})
+}
+
+// Closes the backing storage
+func (cs *BoltDBClusterStorage) Close() error {
+	return cs.db.Close()
+}

+ 162 - 0
pkg/clustermanager/clustermanager.go

@@ -0,0 +1,162 @@
+package clustermanager
+
+import (
+	"encoding/json"
+	"io/ioutil"
+
+	"github.com/google/uuid"
+
+	"github.com/kubecost/cost-model/pkg/util"
+
+	"k8s.io/klog"
+	"sigs.k8s.io/yaml"
+)
+
+// Cluster definition from a configuration yaml
+type ClusterConfigEntry struct {
+	Name    string `yaml:"name"`
+	Address string `yaml:"address"`
+}
+
+// ClusterDefinition
+type ClusterDefinition struct {
+	ID      string                 `json:"id,omitempty"`
+	Name    string                 `json:"name"`
+	Address string                 `json:"address"`
+	Details map[string]interface{} `json:"details,omitempty"`
+}
+
+// ClusterStorage interface defines an implementation prototype for a storage responsible
+// for ClusterDefinition instances
+type ClusterStorage interface {
+	// Add only if the key does not exist
+	AddIfNotExists(key string, cluster []byte) error
+
+	// Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
+	// value with the provided.
+	AddOrUpdate(key string, cluster []byte) error
+
+	// Removes a key from the cluster storage
+	Remove(key string) error
+
+	// Call a handler to receive each key and value stored
+	Each(handler func(string, []byte)) error
+
+	// Closes the backing storage
+	Close() error
+}
+
+type ClusterManager struct {
+	storage ClusterStorage
+	// cache   map[string]*ClusterDefinition
+}
+
+// Creates a new ClusterManager instance using the provided storage
+func NewClusterManager(storage ClusterStorage) *ClusterManager {
+	return &ClusterManager{
+		storage: storage,
+	}
+}
+
+// Creates a new ClusterManager instance using the provided storage and populates a
+// yaml configured list of clusters
+func NewConfiguredClusterManager(storage ClusterStorage, config string) *ClusterManager {
+	clusterManager := NewClusterManager(storage)
+
+	exists, err := util.FileExists(config)
+	if !exists {
+		if err != nil {
+			klog.V(1).Infof("[Error] Failed to load config file: %s. Error: %s", config, err.Error())
+		}
+		return clusterManager
+	}
+
+	data, err := ioutil.ReadFile(config)
+	if err != nil {
+		return clusterManager
+	}
+
+	var entries []ClusterConfigEntry
+	err = yaml.Unmarshal(data, &entries)
+	if err != nil {
+		return clusterManager
+	}
+
+	for _, entry := range entries {
+		clusterManager.Add(ClusterDefinition{
+			ID:      entry.Name,
+			Name:    entry.Name,
+			Address: entry.Address,
+		})
+	}
+
+	return clusterManager
+}
+
+// Adds, but will not update an existing entry.
+func (cm *ClusterManager) Add(cluster ClusterDefinition) (*ClusterDefinition, error) {
+	// First time add
+	if cluster.ID == "" {
+		cluster.ID = uuid.New().String()
+	}
+
+	data, err := json.Marshal(cluster)
+	if err != nil {
+		return nil, err
+	}
+
+	err = cm.storage.AddIfNotExists(cluster.ID, data)
+	if err != nil {
+		return nil, err
+	}
+
+	return &cluster, nil
+}
+
+func (cm *ClusterManager) AddOrUpdate(cluster ClusterDefinition) (*ClusterDefinition, error) {
+	// First time add
+	if cluster.ID == "" {
+		cluster.ID = uuid.New().String()
+	}
+
+	data, err := json.Marshal(cluster)
+	if err != nil {
+		return nil, err
+	}
+
+	err = cm.storage.AddOrUpdate(cluster.ID, data)
+	if err != nil {
+		return nil, err
+	}
+
+	return &cluster, nil
+}
+
+func (cm *ClusterManager) Remove(id string) error {
+	return cm.storage.Remove(id)
+}
+
+func (cm *ClusterManager) GetAll() []*ClusterDefinition {
+	clusters := []*ClusterDefinition{}
+
+	err := cm.storage.Each(func(key string, cluster []byte) {
+		var cd ClusterDefinition
+		err := json.Unmarshal(cluster, &cd)
+		if err != nil {
+			klog.V(1).Infof("[Error] Failed to unmarshal json cluster definition for key: %s", key)
+			return
+		}
+
+		clusters = append(clusters, &cd)
+	})
+
+	if err != nil {
+		klog.Infof("[Error] Failed to load list of clusters: %s", err.Error())
+	}
+
+	return clusters
+}
+
+func (cm *ClusterManager) Close() error {
+	return cm.storage.Close()
+}

+ 104 - 0
pkg/clustermanager/clustersendpoints.go

@@ -0,0 +1,104 @@
+package clustermanager
+
+import (
+	"encoding/json"
+	"errors"
+	"io/ioutil"
+	"net/http"
+
+	"github.com/julienschmidt/httprouter"
+
+	"k8s.io/klog"
+)
+
+// DataEnvelope is a generic wrapper struct for http response data
+type DataEnvelope struct {
+	Code   int         `json:"code"`
+	Status string      `json:"status"`
+	Data   interface{} `json:"data"`
+}
+
+type ClusterManagerEndpoints struct {
+	manager *ClusterManager
+}
+
+func NewClusterManagerEndpoints(manager *ClusterManager) *ClusterManagerEndpoints {
+	return &ClusterManagerEndpoints{
+		manager: manager,
+	}
+}
+
+func (cme *ClusterManagerEndpoints) GetAllClusters(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	clusters := cme.manager.GetAll()
+	w.Write(wrapData(clusters, nil))
+}
+
+func (cme *ClusterManagerEndpoints) PutCluster(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	data, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+
+	var clusterDef ClusterDefinition
+	err = json.Unmarshal(data, &clusterDef)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+
+	cd, err := cme.manager.AddOrUpdate(clusterDef)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+
+	w.Write(wrapData(cd, nil))
+}
+
+func (cme *ClusterManagerEndpoints) DeleteCluster(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	clusterID := ps.ByName("id")
+	if clusterID == "" {
+		w.Write(wrapData(nil, errors.New("Failed to locate cluster with empty id.")))
+		return
+	}
+
+	err := cme.manager.Remove(clusterID)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+
+	w.Write(wrapData("success", nil))
+}
+
+func wrapData(data interface{}, err error) []byte {
+	var resp []byte
+
+	if err != nil {
+		klog.V(1).Infof("Error returned to client: %s", err.Error())
+		resp, _ = json.Marshal(&DataEnvelope{
+			Code:   http.StatusInternalServerError,
+			Status: "error",
+			Data:   err.Error(),
+		})
+	} else {
+		resp, _ = json.Marshal(&DataEnvelope{
+			Code:   http.StatusOK,
+			Status: "success",
+			Data:   data,
+		})
+
+	}
+
+	return resp
+}

+ 52 - 0
pkg/clustermanager/mapdbstorage.go

@@ -0,0 +1,52 @@
+package clustermanager
+
+import (
+	_ "k8s.io/klog"
+)
+
+type MapDBClusterStorage struct {
+	store map[string][]byte
+}
+
+func NewMapDBClusterStorage() ClusterStorage {
+	return &MapDBClusterStorage{
+		store: make(map[string][]byte),
+	}
+}
+
+// Adds the entry if the key does not exist
+func (cs *MapDBClusterStorage) AddIfNotExists(key string, cluster []byte) error {
+	if _, ok := cs.store[key]; !ok {
+		cs.store[key] = cluster
+	}
+	return nil
+}
+
+// Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
+// value with the provided.
+func (cs *MapDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
+	cs.store[key] = cluster
+	return nil
+}
+
+// Removes a key from the cluster storage
+func (cs *MapDBClusterStorage) Remove(key string) error {
+	delete(cs.store, key)
+	return nil
+}
+
+// Retrieve all the keys stored
+func (cs *MapDBClusterStorage) Each(handler func(string, []byte)) error {
+	for k, v := range cs.store {
+		value := make([]byte, len(v))
+		copy(value, v)
+
+		handler(k, value)
+	}
+	return nil
+}
+
+// Closes the backing storage
+func (cs *MapDBClusterStorage) Close() error {
+	return nil
+}

+ 40 - 3
pkg/costmodel/router.go

@@ -19,13 +19,16 @@ import (
 	"github.com/julienschmidt/httprouter"
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
-	"github.com/patrickmn/go-cache"
+	cm "github.com/kubecost/cost-model/pkg/clustermanager"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
-	"github.com/prometheus/client_golang/prometheus"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+	bolt "github.com/etcd-io/bbolt"
+	"github.com/patrickmn/go-cache"
+	"github.com/prometheus/client_golang/prometheus"
+
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 )
@@ -48,6 +51,7 @@ type Accesses struct {
 	PrometheusClient              prometheusClient.Client
 	ThanosClient                  prometheusClient.Client
 	KubeClientSet                 kubernetes.Interface
+	ClusterManager                *cm.ClusterManager
 	Cloud                         costAnalyzerCloud.Provider
 	CPUPriceRecorder              *prometheus.GaugeVec
 	RAMPriceRecorder              *prometheus.GaugeVec
@@ -795,6 +799,27 @@ func (a *Accesses) recordPrices() {
 	}()
 }
 
+// Creates a new ClusterManager instance using a boltdb storage. If that fails,
+// then we fall back to a memory-only storage.
+func newClusterManager() *cm.ClusterManager {
+	clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
+
+	path := os.Getenv("CONFIG_PATH")
+	db, err := bolt.Open(path+"costmodel.db", 0600, nil)
+	if err != nil {
+		klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
+		return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
+	}
+
+	store, err := cm.NewBoltDBClusterStorage("clusters", db)
+	if err != nil {
+		klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
+		return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
+	}
+
+	return cm.NewConfiguredClusterManager(store, clustersConfigFile)
+}
+
 func Initialize() {
 	klog.InitFlags(nil)
 	flag.Set("v", "3")
@@ -859,7 +884,7 @@ func Initialize() {
 		if conf.GetName() == "pricing-configs" {
 			_, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
 			if err != nil {
-				klog.Infof("ERROR UPDATING CONFIG: %s", err.Error())
+				klog.Infof("[Error] Failed to update config: %s", err.Error())
 			}
 		}
 	}
@@ -874,6 +899,12 @@ func Initialize() {
 
 	k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
 
+	// TODO: General Architecture Note: Several passes have been made to modularize a lot of
+	// TODO: our code, but the router still continues to be the obvious entry point for new \
+	// TODO: features. We should look to spliting out the actual "router" functionality and
+	// TODO: implement a builder -> controller for stitching new features and other dependencies.
+	clusterManager := newClusterManager()
+
 	cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "node_cpu_hourly_cost",
 		Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
@@ -963,6 +994,7 @@ func Initialize() {
 	A = Accesses{
 		PrometheusClient:              promCli,
 		KubeClientSet:                 kubeClientset,
+		ClusterManager:                clusterManager,
 		Cloud:                         cloudProvider,
 		CPUPriceRecorder:              cpuGv,
 		RAMPriceRecorder:              ramGv,
@@ -1035,6 +1067,8 @@ func Initialize() {
 
 	A.recordPrices()
 
+	managerEndpoints := cm.NewClusterManagerEndpoints(A.ClusterManager)
+
 	Router.GET("/costDataModel", A.CostDataModel)
 	Router.GET("/costDataModelRange", A.CostDataModelRange)
 	Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
@@ -1047,4 +1081,7 @@ func Initialize() {
 	Router.GET("/managementPlatform", A.ManagementPlatform)
 	Router.GET("/clusterInfo", A.ClusterInfo)
 	Router.GET("/containerUptimes", A.ContainerUptimes)
+	Router.GET("/clusters", managerEndpoints.GetAllClusters)
+	Router.PUT("/clusters", managerEndpoints.PutCluster)
+	Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
 }

+ 23 - 0
pkg/util/file.go

@@ -0,0 +1,23 @@
+package util
+
+import "os"
+
+// File exists has three different return cases that should be handled:
+//   1. File exists and is not a directory (true, nil)
+//   2. File does not exist (false, nil)
+//   3. File may or may not exist. Error occurred during stat (false, error)
+// The third case represents the scenario where the stat returns an error,
+// but the error isn't relevant to the path. This can happen when the current
+// user doesn't have permission to access the file.
+func FileExists(filename string) (bool, error) {
+	info, err := os.Stat(filename)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+
+		return false, err
+	}
+
+	return !info.IsDir(), nil
+}