فهرست منبع

Move ClusterCache to new package. Use ClusterCache in Provider implementations. Add reserved instance allocation to node data.

Matt Bolt 6 سال پیش
والد
کامیت
55a7270cdc
9فایلهای تغییر یافته به همراه255 افزوده شده و 110 حذف شده
  1. 25 27
      cloud/awsprovider.go
  2. 11 9
      cloud/azureprovider.go
  3. 6 2
      cloud/customprovider.go
  4. 127 25
      cloud/gcpprovider.go
  5. 37 27
      cloud/provider.go
  6. 31 3
      clustercache/clustercache.go
  7. 1 1
      clustercache/watchcontroller.go
  8. 10 14
      costmodel/costmodel.go
  9. 7 2
      costmodel/router.go

+ 25 - 27
cloud/awsprovider.go

@@ -19,6 +19,8 @@ import (
 
 
 	"k8s.io/klog"
 	"k8s.io/klog"
 
 
+	"github.com/kubecost/cost-model/clustercache"
+
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -29,8 +31,6 @@ import (
 	"github.com/jszwec/csvutil"
 	"github.com/jszwec/csvutil"
 
 
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 )
 )
 
 
 const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
 const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
@@ -44,7 +44,7 @@ type AWS struct {
 	Pricing                 map[string]*AWSProductTerms
 	Pricing                 map[string]*AWSProductTerms
 	SpotPricingByInstanceID map[string]*spotInfo
 	SpotPricingByInstanceID map[string]*spotInfo
 	ValidPricingKeys        map[string]bool
 	ValidPricingKeys        map[string]bool
-	Clientset               *kubernetes.Clientset
+	Clientset               clustercache.ClusterCache
 	BaseCPUPrice            string
 	BaseCPUPrice            string
 	BaseRAMPrice            string
 	BaseRAMPrice            string
 	BaseGPUPrice            string
 	BaseGPUPrice            string
@@ -238,12 +238,10 @@ type AwsAthenaInfo struct {
 }
 }
 
 
 func (aws *AWS) GetManagementPlatform() (string, error) {
 func (aws *AWS) GetManagementPlatform() (string, error) {
-	nodes, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return "", err
-	}
-	if len(nodes.Items) > 0 {
-		n := nodes.Items[0]
+	nodes := aws.Clientset.GetAllNodes()
+
+	if len(nodes) > 0 {
+		n := nodes[0]
 		version := n.Status.NodeInfo.KubeletVersion
 		version := n.Status.NodeInfo.KubeletVersion
 		if strings.Contains(version, "eks") {
 		if strings.Contains(version, "eks") {
 			return "eks", nil
 			return "eks", nil
@@ -475,25 +473,20 @@ func (aws *AWS) DownloadPricingData() error {
 	if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
 	if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
 		klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
 		klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
 	}
 	}
-	nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	nodeList := aws.Clientset.GetAllNodes()
+
 	inputkeys := make(map[string]bool)
 	inputkeys := make(map[string]bool)
-	for _, n := range nodeList.Items {
+	for _, n := range nodeList {
 		labels := n.GetObjectMeta().GetLabels()
 		labels := n.GetObjectMeta().GetLabels()
 		key := aws.GetKey(labels)
 		key := aws.GetKey(labels)
 		inputkeys[key.Features()] = true
 		inputkeys[key.Features()] = true
 	}
 	}
 
 
-	pvList, err := aws.Clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	pvList := aws.Clientset.GetAllPersistentVolumes()
 
 
-	storageClasses, err := aws.Clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
+	storageClasses := aws.Clientset.GetAllStorageClasses()
 	storageClassMap := make(map[string]map[string]string)
 	storageClassMap := make(map[string]map[string]string)
-	for _, storageClass := range storageClasses.Items {
+	for _, storageClass := range storageClasses {
 		params := storageClass.Parameters
 		params := storageClass.Parameters
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -503,13 +496,13 @@ func (aws *AWS) DownloadPricingData() error {
 	}
 	}
 
 
 	pvkeys := make(map[string]PVKey)
 	pvkeys := make(map[string]PVKey)
-	for _, pv := range pvList.Items {
+	for _, pv := range pvList {
 		params, ok := storageClassMap[pv.Spec.StorageClassName]
 		params, ok := storageClassMap[pv.Spec.StorageClassName]
 		if !ok {
 		if !ok {
 			klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
 			klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
 			continue
 			continue
 		}
 		}
-		key := aws.GetPVKey(&pv, params)
+		key := aws.GetPVKey(pv, params)
 		pvkeys[key.Features()] = key
 		pvkeys[key.Features()] = key
 	}
 	}
 
 
@@ -815,6 +808,10 @@ func (aws *AWS) NodePricing(k Key) (*Node, error) {
 func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	defaultClusterName := "AWS Cluster #1"
 	defaultClusterName := "AWS Cluster #1"
 	c, err := awsProvider.GetConfig()
 	c, err := awsProvider.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+
 	remote := os.Getenv(remoteEnabled)
 	remote := os.Getenv(remoteEnabled)
 	remoteEnabled := false
 	remoteEnabled := false
 	if os.Getenv(remote) == "true" {
 	if os.Getenv(remote) == "true" {
@@ -845,11 +842,8 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	}
 	}
 	provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
 	provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
 	clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
 	clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
-	nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-	for _, n := range nodeList.Items {
+	nodeList := awsProvider.Clientset.GetAllNodes()
+	for _, n := range nodeList {
 		region := ""
 		region := ""
 		instanceId := ""
 		instanceId := ""
 		providerId := n.Spec.ProviderID
 		providerId := n.Spec.ProviderID
@@ -1402,6 +1396,10 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 	return spots, nil
 	return spots, nil
 }
 }
 
 
+func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
+
+}
+
 /*
 /*
 func (aws *AWS) getReservedInstances() ([]interface{}, error) {
 func (aws *AWS) getReservedInstances() ([]interface{}, error) {
 	customPricing, err := a.GetConfig()
 	customPricing, err := a.GetConfig()

+ 11 - 9
cloud/azureprovider.go

@@ -13,6 +13,8 @@ import (
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 
 
+	"github.com/kubecost/cost-model/clustercache"
+
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-09-01/skus"
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-09-01/skus"
 	"github.com/Azure/azure-sdk-for-go/services/containerservice/mgmt/2018-03-31/containerservice"
 	"github.com/Azure/azure-sdk-for-go/services/containerservice/mgmt/2018-03-31/containerservice"
 	"github.com/Azure/azure-sdk-for-go/services/preview/commerce/mgmt/2015-06-01-preview/commerce"
 	"github.com/Azure/azure-sdk-for-go/services/preview/commerce/mgmt/2015-06-01-preview/commerce"
@@ -22,8 +24,6 @@ import (
 	"github.com/Azure/go-autorest/autorest/azure"
 	"github.com/Azure/go-autorest/autorest/azure"
 	"github.com/Azure/go-autorest/autorest/azure/auth"
 	"github.com/Azure/go-autorest/autorest/azure/auth"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
 	"k8s.io/klog"
 )
 )
 
 
@@ -165,7 +165,7 @@ func checkRegionID(regionID string, regions map[string]string) bool {
 type Azure struct {
 type Azure struct {
 	allPrices               map[string]*Node
 	allPrices               map[string]*Node
 	DownloadPricingDataLock sync.RWMutex
 	DownloadPricingDataLock sync.RWMutex
-	Clientset               *kubernetes.Clientset
+	Clientset               clustercache.ClusterCache
 }
 }
 
 
 type azureKey struct {
 type azureKey struct {
@@ -265,12 +265,10 @@ func getMachineTypeVariants(mt string) []string {
 }
 }
 
 
 func (az *Azure) GetManagementPlatform() (string, error) {
 func (az *Azure) GetManagementPlatform() (string, error) {
-	nodes, err := az.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return "", err
-	}
-	if len(nodes.Items) > 0 {
-		n := nodes.Items[0]
+	nodes := az.Clientset.GetAllNodes()
+
+	if len(nodes) > 0 {
+		n := nodes[0]
 		providerID := n.Spec.ProviderID
 		providerID := n.Spec.ProviderID
 		if strings.Contains(providerID, "aks") {
 		if strings.Contains(providerID, "aks") {
 			return "aks", nil
 			return "aks", nil
@@ -578,6 +576,10 @@ func (az *Azure) ExternalAllocations(string, string, string) ([]*OutOfClusterAll
 	return nil, nil
 	return nil, nil
 }
 }
 
 
+func (az *Azure) ApplyReservedInstancePricing(nodes map[string]*Node) {
+
+}
+
 func (az *Azure) PVPricing(PVKey) (*PV, error) {
 func (az *Azure) PVPricing(PVKey) (*PV, error) {
 	return nil, nil
 	return nil, nil
 }
 }

+ 6 - 2
cloud/customprovider.go

@@ -10,8 +10,8 @@ import (
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 
 
+	"github.com/kubecost/cost-model/clustercache"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/client-go/kubernetes"
 )
 )
 
 
 type NodePrice struct {
 type NodePrice struct {
@@ -21,7 +21,7 @@ type NodePrice struct {
 }
 }
 
 
 type CustomProvider struct {
 type CustomProvider struct {
-	Clientset               *kubernetes.Clientset
+	Clientset               clustercache.ClusterCache
 	Pricing                 map[string]*NodePrice
 	Pricing                 map[string]*NodePrice
 	SpotLabel               string
 	SpotLabel               string
 	SpotLabelValue          string
 	SpotLabelValue          string
@@ -50,6 +50,10 @@ func (*CustomProvider) GetManagementPlatform() (string, error) {
 	return "", nil
 	return "", nil
 }
 }
 
 
+func (*CustomProvider) ApplyReservedInstancePricing(nodes map[string]*Node) {
+
+}
+
 func (cp *CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 func (cp *CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("default.json")
 	c, err := GetDefaultPricingData("default.json")
 	if err != nil {
 	if err != nil {

+ 127 - 25
cloud/gcpprovider.go

@@ -20,13 +20,12 @@ import (
 
 
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/compute/metadata"
 	"cloud.google.com/go/compute/metadata"
+	"github.com/kubecost/cost-model/clustercache"
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
 	"golang.org/x/oauth2/google"
 	compute "google.golang.org/api/compute/v1"
 	compute "google.golang.org/api/compute/v1"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/iterator"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 )
 )
 
 
 const GKE_GPU_TAG = "cloud.google.com/gke-accelerator"
 const GKE_GPU_TAG = "cloud.google.com/gke-accelerator"
@@ -45,12 +44,13 @@ func (t userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error)
 // GCP implements a provider interface for GCP
 // GCP implements a provider interface for GCP
 type GCP struct {
 type GCP struct {
 	Pricing                 map[string]*GCPPricing
 	Pricing                 map[string]*GCPPricing
-	Clientset               *kubernetes.Clientset
+	Clientset               clustercache.ClusterCache
 	APIKey                  string
 	APIKey                  string
 	BaseCPUPrice            string
 	BaseCPUPrice            string
 	ProjectID               string
 	ProjectID               string
 	BillingDataDataset      string
 	BillingDataDataset      string
 	DownloadPricingDataLock sync.RWMutex
 	DownloadPricingDataLock sync.RWMutex
+	ReservedInstances       []*GCPReservedInstance
 	*CustomProvider
 	*CustomProvider
 }
 }
 
 
@@ -103,12 +103,10 @@ type BigQueryConfig struct {
 }
 }
 
 
 func (gcp *GCP) GetManagementPlatform() (string, error) {
 func (gcp *GCP) GetManagementPlatform() (string, error) {
-	nodes, err := gcp.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return "", err
-	}
-	if len(nodes.Items) > 0 {
-		n := nodes.Items[0]
+	nodes := gcp.Clientset.GetAllNodes()
+
+	if len(nodes) > 0 {
+		n := nodes[0]
 		version := n.Status.NodeInfo.KubeletVersion
 		version := n.Status.NodeInfo.KubeletVersion
 		if strings.Contains(version, "gke") {
 		if strings.Contains(version, "gke") {
 			return "gke", nil
 			return "gke", nil
@@ -666,26 +664,19 @@ func (gcp *GCP) DownloadPricingData() error {
 	gcp.ProjectID = c.ProjectID
 	gcp.ProjectID = c.ProjectID
 	gcp.BillingDataDataset = c.BillingDataDataset
 	gcp.BillingDataDataset = c.BillingDataDataset
 
 
-	nodeList, err := gcp.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	nodeList := gcp.Clientset.GetAllNodes()
 	inputkeys := make(map[string]Key)
 	inputkeys := make(map[string]Key)
 
 
-	for _, n := range nodeList.Items {
+	for _, n := range nodeList {
 		labels := n.GetObjectMeta().GetLabels()
 		labels := n.GetObjectMeta().GetLabels()
 		key := gcp.GetKey(labels)
 		key := gcp.GetKey(labels)
 		inputkeys[key.Features()] = key
 		inputkeys[key.Features()] = key
 	}
 	}
 
 
-	pvList, err := gcp.Clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
-
-	storageClasses, err := gcp.Clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
+	pvList := gcp.Clientset.GetAllPersistentVolumes()
+	storageClasses := gcp.Clientset.GetAllStorageClasses()
 	storageClassMap := make(map[string]map[string]string)
 	storageClassMap := make(map[string]map[string]string)
-	for _, storageClass := range storageClasses.Items {
+	for _, storageClass := range storageClasses {
 		params := storageClass.Parameters
 		params := storageClass.Parameters
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -695,13 +686,13 @@ func (gcp *GCP) DownloadPricingData() error {
 	}
 	}
 
 
 	pvkeys := make(map[string]PVKey)
 	pvkeys := make(map[string]PVKey)
-	for _, pv := range pvList.Items {
+	for _, pv := range pvList {
 		params, ok := storageClassMap[pv.Spec.StorageClassName]
 		params, ok := storageClassMap[pv.Spec.StorageClassName]
 		if !ok {
 		if !ok {
 			klog.Infof("Unable to find params for storageClassName %s", pv.Name)
 			klog.Infof("Unable to find params for storageClassName %s", pv.Name)
 			continue
 			continue
 		}
 		}
-		key := gcp.GetPVKey(&pv, params)
+		key := gcp.GetPVKey(pv, params)
 		pvkeys[key.Features()] = key
 		pvkeys[key.Features()] = key
 	}
 	}
 
 
@@ -710,8 +701,9 @@ func (gcp *GCP) DownloadPricingData() error {
 		klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
 		klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
 	} else {
 	} else {
 		klog.V(1).Infof("Found %d reserved instances", len(reserved))
 		klog.V(1).Infof("Found %d reserved instances", len(reserved))
+		gcp.ReservedInstances = reserved
 		for _, r := range reserved {
 		for _, r := range reserved {
-			klog.V(1).Infof("Reserved: CPU: %d, RAM: %d", r.ReservedCPU, r.ReservedRAM)
+			klog.V(1).Infof("Reserved: CPU: %d, RAM: %d, Region: %s, Start: %s, End: %s", r.ReservedCPU, r.ReservedRAM, r.Region, r.StartDate.String(), r.EndDate.String())
 		}
 		}
 	}
 	}
 
 
@@ -784,6 +776,20 @@ type GCPReservedInstance struct {
 	Region      string
 	Region      string
 }
 }
 
 
+type ReservedCounter struct {
+	RemainingCPU int64
+	RemainingRAM int64
+	Instance     *GCPReservedInstance
+}
+
+func newReservedCounter(instance *GCPReservedInstance) *ReservedCounter {
+	return &ReservedCounter{
+		RemainingCPU: instance.ReservedCPU,
+		RemainingRAM: instance.ReservedRAM,
+		Instance:     instance,
+	}
+}
+
 // Two available Reservation plans for GCP, 1-year and 3-year
 // Two available Reservation plans for GCP, 1-year and 3-year
 var gcpReservedInstancePlans map[string]*GCPReservedInstancePlan = map[string]*GCPReservedInstancePlan{
 var gcpReservedInstancePlans map[string]*GCPReservedInstancePlan = map[string]*GCPReservedInstancePlan{
 	GCPReservedInstancePlanOneYear: &GCPReservedInstancePlan{
 	GCPReservedInstancePlanOneYear: &GCPReservedInstancePlan{
@@ -798,6 +804,102 @@ var gcpReservedInstancePlans map[string]*GCPReservedInstancePlan = map[string]*G
 	},
 	},
 }
 }
 
 
+func (gcp *GCP) ApplyReservedInstancePricing(nodes map[string]*Node) {
+	numReserved := len(gcp.ReservedInstances)
+
+	// Early return if no reserved instance data loaded
+	if numReserved == 0 {
+		klog.V(1).Infof("[Reserved] No Reserved Instances")
+		return
+	}
+
+	now := time.Now()
+
+	counters := make(map[string][]*ReservedCounter)
+	for _, r := range gcp.ReservedInstances {
+		if now.Before(r.StartDate) || now.After(r.EndDate) {
+			klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
+			continue
+		}
+
+		_, ok := counters[r.Region]
+		counter := newReservedCounter(r)
+		if !ok {
+			counters[r.Region] = []*ReservedCounter{counter}
+		} else {
+			counters[r.Region] = append(counters[r.Region], counter)
+		}
+	}
+
+	gcpNodes := make(map[string]*v1.Node)
+	currentNodes := gcp.Clientset.GetAllNodes()
+
+	// Create a node name -> node map
+	for _, gcpNode := range currentNodes {
+		gcpNodes[gcpNode.GetName()] = gcpNode
+	}
+
+	// go through all provider nodes using k8s nodes for region
+	for nodeName, node := range nodes {
+		kNode, ok := gcpNodes[nodeName]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
+			continue
+		}
+
+		nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find node region")
+			continue
+		}
+
+		reservedCounters, ok := counters[nodeRegion]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
+			continue
+		}
+		if node.Reserved != nil {
+			continue
+		}
+
+		node.Reserved = &ReservedInstanceData{
+			ReservedCPU: 0,
+			ReservedRAM: 0,
+		}
+
+		for _, reservedCounter := range reservedCounters {
+			if reservedCounter.RemainingCPU != 0 {
+				nodeCPU, _ := strconv.ParseInt(node.VCPU, 10, 64)
+				nodeCPU -= node.Reserved.ReservedCPU
+				node.Reserved.CPUCost = reservedCounter.Instance.Plan.CPUCost
+
+				if reservedCounter.RemainingCPU >= nodeCPU {
+					reservedCounter.RemainingCPU -= nodeCPU
+					node.Reserved.ReservedCPU += nodeCPU
+				} else {
+					node.Reserved.ReservedCPU += reservedCounter.RemainingCPU
+					reservedCounter.RemainingCPU = 0
+				}
+			}
+
+			if reservedCounter.RemainingRAM != 0 {
+				nodeRAMF, _ := strconv.ParseFloat(node.RAMBytes, 64)
+				nodeRAM := int64(nodeRAMF)
+				nodeRAM -= node.Reserved.ReservedRAM
+				node.Reserved.RAMCost = reservedCounter.Instance.Plan.RAMCost
+
+				if reservedCounter.RemainingRAM >= nodeRAM {
+					reservedCounter.RemainingRAM -= nodeRAM
+					node.Reserved.ReservedRAM += nodeRAM
+				} else {
+					node.Reserved.ReservedRAM += reservedCounter.RemainingRAM
+					reservedCounter.RemainingRAM = 0
+				}
+			}
+		}
+	}
+}
+
 func (gcp *GCP) getReservedInstances() ([]*GCPReservedInstance, error) {
 func (gcp *GCP) getReservedInstances() ([]*GCPReservedInstance, error) {
 	var results []*GCPReservedInstance
 	var results []*GCPReservedInstance
 
 
@@ -823,7 +925,7 @@ func (gcp *GCP) getReservedInstances() ([]*GCPReservedInstance, error) {
 			for _, resource := range commit.Resources {
 			for _, resource := range commit.Resources {
 				switch resource.Type {
 				switch resource.Type {
 				case GCPReservedInstanceResourceTypeRAM:
 				case GCPReservedInstanceResourceTypeRAM:
-					ram = resource.Amount
+					ram = resource.Amount * 1024 * 1024
 				case GCPReservedInstanceResourceTypeCPU:
 				case GCPReservedInstanceResourceTypeCPU:
 					vcpu = resource.Amount
 					vcpu = resource.Amount
 				default:
 				default:

+ 37 - 27
cloud/provider.go

@@ -15,10 +15,9 @@ import (
 	"k8s.io/klog"
 	"k8s.io/klog"
 
 
 	"cloud.google.com/go/compute/metadata"
 	"cloud.google.com/go/compute/metadata"
+	"github.com/kubecost/cost-model/clustercache"
 
 
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 )
 )
 
 
 const clusterIDKey = "CLUSTER_ID"
 const clusterIDKey = "CLUSTER_ID"
@@ -34,25 +33,35 @@ var createTableStatements = []string{
 	);`,
 	);`,
 }
 }
 
 
+// ReservedInstanceData keeps record of resources on a node should be
+// priced at reserved rates
+type ReservedInstanceData struct {
+	ReservedCPU int64   `json:"reservedCPU"`
+	ReservedRAM int64   `json:"reservedRAM"`
+	CPUCost     float64 `json:"CPUHourlyCost"`
+	RAMCost     float64 `json:"RAMHourlyCost"`
+}
+
 // Node is the interface by which the provider and cost model communicate Node prices.
 // Node is the interface by which the provider and cost model communicate Node prices.
 // The provider will best-effort try to fill out this struct.
 // The provider will best-effort try to fill out this struct.
 type Node struct {
 type Node struct {
-	Cost             string `json:"hourlyCost"`
-	VCPU             string `json:"CPU"`
-	VCPUCost         string `json:"CPUHourlyCost"`
-	RAM              string `json:"RAM"`
-	RAMBytes         string `json:"RAMBytes"`
-	RAMCost          string `json:"RAMGBHourlyCost"`
-	Storage          string `json:"storage"`
-	StorageCost      string `json:"storageHourlyCost"`
-	UsesBaseCPUPrice bool   `json:"usesDefaultPrice"`
-	BaseCPUPrice     string `json:"baseCPUPrice"` // Used to compute an implicit RAM GB/Hr price when RAM pricing is not provided.
-	BaseRAMPrice     string `json:"baseRAMPrice"` // Used to compute an implicit RAM GB/Hr price when RAM pricing is not provided.
-	BaseGPUPrice     string `json:"baseGPUPrice"`
-	UsageType        string `json:"usageType"`
-	GPU              string `json:"gpu"` // GPU represents the number of GPU on the instance
-	GPUName          string `json:"gpuName"`
-	GPUCost          string `json:"gpuCost"`
+	Cost             string                `json:"hourlyCost"`
+	VCPU             string                `json:"CPU"`
+	VCPUCost         string                `json:"CPUHourlyCost"`
+	RAM              string                `json:"RAM"`
+	RAMBytes         string                `json:"RAMBytes"`
+	RAMCost          string                `json:"RAMGBHourlyCost"`
+	Storage          string                `json:"storage"`
+	StorageCost      string                `json:"storageHourlyCost"`
+	UsesBaseCPUPrice bool                  `json:"usesDefaultPrice"`
+	BaseCPUPrice     string                `json:"baseCPUPrice"` // Used to compute an implicit RAM GB/Hr price when RAM pricing is not provided.
+	BaseRAMPrice     string                `json:"baseRAMPrice"` // Used to compute an implicit RAM GB/Hr price when RAM pricing is not provided.
+	BaseGPUPrice     string                `json:"baseGPUPrice"`
+	UsageType        string                `json:"usageType"`
+	GPU              string                `json:"gpu"` // GPU represents the number of GPU on the instance
+	GPUName          string                `json:"gpuName"`
+	GPUCost          string                `json:"gpuCost"`
+	Reserved         *ReservedInstanceData `json:"reserved,omitempty"`
 }
 }
 
 
 // IsSpot determines whether or not a Node uses spot by usage type
 // IsSpot determines whether or not a Node uses spot by usage type
@@ -156,6 +165,7 @@ type Provider interface {
 	GetManagementPlatform() (string, error)
 	GetManagementPlatform() (string, error)
 	GetLocalStorageQuery(offset string) (string, error)
 	GetLocalStorageQuery(offset string) (string, error)
 	ExternalAllocations(string, string, string) ([]*OutOfClusterAllocation, error)
 	ExternalAllocations(string, string, string) ([]*OutOfClusterAllocation, error)
+	ApplyReservedInstancePricing(map[string]*Node)
 }
 }
 
 
 // ClusterName returns the name defined in cluster info, defaulting to the
 // ClusterName returns the name defined in cluster info, defaulting to the
@@ -261,38 +271,38 @@ func SetCustomPricingField(obj *CustomPricing, name string, value string) error
 }
 }
 
 
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
-func NewProvider(clientset *kubernetes.Clientset, apiKey string) (Provider, error) {
+func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
 	if metadata.OnGCE() {
 	if metadata.OnGCE() {
 		klog.V(3).Info("metadata reports we are in GCE")
 		klog.V(3).Info("metadata reports we are in GCE")
 		if apiKey == "" {
 		if apiKey == "" {
 			return nil, errors.New("Supply a GCP Key to start getting data")
 			return nil, errors.New("Supply a GCP Key to start getting data")
 		}
 		}
 		return &GCP{
 		return &GCP{
-			Clientset: clientset,
+			Clientset: cache,
 			APIKey:    apiKey,
 			APIKey:    apiKey,
 		}, nil
 		}, nil
 	}
 	}
 
 
-	nodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
+	nodes := cache.GetAllNodes()
+	if len(nodes) == 0 {
+		return nil, fmt.Errorf("Could not locate any nodes for cluster.")
 	}
 	}
 
 
-	provider := strings.ToLower(nodes.Items[0].Spec.ProviderID)
+	provider := strings.ToLower(nodes[0].Spec.ProviderID)
 	if strings.HasPrefix(provider, "aws") {
 	if strings.HasPrefix(provider, "aws") {
 		klog.V(2).Info("Found ProviderID starting with \"aws\", using AWS Provider")
 		klog.V(2).Info("Found ProviderID starting with \"aws\", using AWS Provider")
 		return &AWS{
 		return &AWS{
-			Clientset: clientset,
+			Clientset: cache,
 		}, nil
 		}, nil
 	} else if strings.HasPrefix(provider, "azure") {
 	} else if strings.HasPrefix(provider, "azure") {
 		klog.V(2).Info("Found ProviderID starting with \"azure\", using Azure Provider")
 		klog.V(2).Info("Found ProviderID starting with \"azure\", using Azure Provider")
 		return &Azure{
 		return &Azure{
-			Clientset: clientset,
+			Clientset: cache,
 		}, nil
 		}, nil
 	} else {
 	} else {
 		klog.V(2).Info("Unsupported provider, falling back to default")
 		klog.V(2).Info("Unsupported provider, falling back to default")
 		return &CustomProvider{
 		return &CustomProvider{
-			Clientset: clientset,
+			Clientset: cache,
 		}, nil
 		}, nil
 	}
 	}
 }
 }

+ 31 - 3
costmodel/clustercache.go → clustercache/clustercache.go

@@ -1,4 +1,4 @@
-package costmodel
+package clustercache
 
 
 import (
 import (
 	"sync"
 	"sync"
@@ -14,7 +14,14 @@ import (
 // up to date resources using watchers
 // up to date resources using watchers
 type ClusterCache interface {
 type ClusterCache interface {
 	// Run starts the watcher processes
 	// Run starts the watcher processes
-	Run(stopCh chan struct{})
+	Run()
+
+	// Stops the watcher processes
+	Stop()
+
+	// Gets the underlying clientset
+	// TODO: Remove once we support all cached cluster components
+	GetClient() kubernetes.Interface
 
 
 	// GetAllNamespaces returns all the cached namespaces
 	// GetAllNamespaces returns all the cached namespaces
 	GetAllNamespaces() []*v1.Namespace
 	GetAllNamespaces() []*v1.Namespace
@@ -49,6 +56,7 @@ type KubernetesClusterCache struct {
 	deploymentsWatch  WatchController
 	deploymentsWatch  WatchController
 	pvWatch           WatchController
 	pvWatch           WatchController
 	storageClassWatch WatchController
 	storageClassWatch WatchController
+	stop              chan struct{}
 }
 }
 
 
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
@@ -91,7 +99,12 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	return kcc
 	return kcc
 }
 }
 
 
-func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {
+func (kcc *KubernetesClusterCache) Run() {
+	if kcc.stop != nil {
+		return
+	}
+	stopCh := make(chan struct{})
+
 	go kcc.namespaceWatch.Run(1, stopCh)
 	go kcc.namespaceWatch.Run(1, stopCh)
 	go kcc.nodeWatch.Run(1, stopCh)
 	go kcc.nodeWatch.Run(1, stopCh)
 	go kcc.podWatch.Run(1, stopCh)
 	go kcc.podWatch.Run(1, stopCh)
@@ -99,6 +112,21 @@ func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
+
+	kcc.stop = stopCh
+}
+
+func (kcc *KubernetesClusterCache) Stop() {
+	if kcc.stop == nil {
+		return
+	}
+
+	close(kcc.stop)
+	kcc.stop = nil
+}
+
+func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
+	return kcc.client
 }
 }
 
 
 func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
 func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {

+ 1 - 1
costmodel/watchcontroller.go → clustercache/watchcontroller.go

@@ -1,4 +1,4 @@
-package costmodel
+package clustercache
 
 
 import (
 import (
 	"fmt"
 	"fmt"

+ 10 - 14
costmodel/costmodel.go

@@ -14,6 +14,7 @@ import (
 	"time"
 	"time"
 
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
+	"github.com/kubecost/cost-model/clustercache"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,24 +49,17 @@ const (
 )
 )
 
 
 type CostModel struct {
 type CostModel struct {
-	Cache        ClusterCache
+	Cache        clustercache.ClusterCache
 	RequestGroup *singleflight.Group
 	RequestGroup *singleflight.Group
-
-	stop chan struct{}
 }
 }
 
 
-func NewCostModel(client kubernetes.Interface) *CostModel {
+func NewCostModel(cache clustercache.ClusterCache) *CostModel {
 	// request grouping to prevent over-requesting the same data prior to caching
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 	requestGroup := new(singleflight.Group)
 
 
-	stopCh := make(chan struct{})
-	cache := NewKubernetesClusterCache(client)
-	cache.Run(stopCh)
-
 	return &CostModel{
 	return &CostModel{
 		Cache:        cache,
 		Cache:        cache,
 		RequestGroup: requestGroup,
 		RequestGroup: requestGroup,
-		stop:         stopCh,
 	}
 	}
 }
 }
 
 
@@ -907,7 +901,7 @@ func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
 	return allocation
 	return allocation
 }
 }
 
 
-func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
+func addPVData(cache clustercache.ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
 	cfg, err := cloud.GetConfig()
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -974,7 +968,7 @@ func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cp costAnalyz
 	return nil
 	return nil
 }
 }
 
 
-func getNodeCost(cache ClusterCache, cp costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
+func getNodeCost(cache clustercache.ClusterCache, cp costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
 	cfg, err := cp.GetConfig()
 	cfg, err := cp.GetConfig()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -1110,10 +1104,12 @@ func getNodeCost(cache ClusterCache, cp costAnalyzerCloud.Provider) (map[string]
 		nodes[name] = &newCnode
 		nodes[name] = &newCnode
 	}
 	}
 
 
+	cp.ApplyReservedInstancePricing(nodes)
+
 	return nodes, nil
 	return nodes, nil
 }
 }
 
 
-func getPodServices(cache ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
+func getPodServices(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 	servicesList := cache.GetAllServices()
 	servicesList := cache.GetAllServices()
 	podServicesMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	for _, service := range servicesList {
 	for _, service := range servicesList {
@@ -1139,7 +1135,7 @@ func getPodServices(cache ClusterCache, podList []*v1.Pod, clusterID string) (ma
 	return podServicesMapping, nil
 	return podServicesMapping, nil
 }
 }
 
 
-func getPodDeployments(cache ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
+func getPodDeployments(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 	deploymentsList := cache.GetAllDeployments()
 	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
 	for _, deployment := range deploymentsList {
 	for _, deployment := range deploymentsList {
@@ -2078,7 +2074,7 @@ func appendLabelsList(mainLabels map[string]map[string][]string, labels map[stri
 	}
 	}
 }
 }
 
 
-func getNamespaceLabels(cache ClusterCache, clusterID string) (map[string]map[string]string, error) {
+func getNamespaceLabels(cache clustercache.ClusterCache, clusterID string) (map[string]map[string]string, error) {
 	nsToLabels := make(map[string]map[string]string)
 	nsToLabels := make(map[string]map[string]string)
 	nss := cache.GetAllNamespaces()
 	nss := cache.GetAllNamespaces()
 	for _, ns := range nss {
 	for _, ns := range nss {

+ 7 - 2
costmodel/router.go

@@ -15,6 +15,7 @@ import (
 
 
 	"k8s.io/klog"
 	"k8s.io/klog"
 
 
+	"github.com/kubecost/cost-model/clustercache"
 	"github.com/julienschmidt/httprouter"
 	"github.com/julienschmidt/httprouter"
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	"github.com/patrickmn/go-cache"
 	"github.com/patrickmn/go-cache"
@@ -820,8 +821,12 @@ func Initialize() {
 		panic(err.Error())
 		panic(err.Error())
 	}
 	}
 
 
+	// Create Kubernetes Cluster Cache + Watchers
+	k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
+	k8sCache.Run()
+
 	cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
 	cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
-	cloudProvider, err := costAnalyzerCloud.NewProvider(kubeClientset, cloudProviderKey)
+	cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
 	if err != nil {
 	if err != nil {
 		panic(err.Error())
 		panic(err.Error())
 	}
 	}
@@ -927,7 +932,7 @@ func Initialize() {
 		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
 		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
 		PersistentVolumePriceRecorder: pvGv,
-		Model:                         NewCostModel(kubeClientset),
+		Model:                         NewCostModel(k8sCache),
 		CostDataCache:                 costDataCache,
 		CostDataCache:                 costDataCache,
 		OutOfClusterCache:             outOfClusterCache,
 		OutOfClusterCache:             outOfClusterCache,
 		SettingsCache:                 settingsCache,
 		SettingsCache:                 settingsCache,