Explorar o código

Merge branch 'master' of github.com:kubecost/cost-model into AjayTripathy-pv

AjayTripathy %!s(int64=6) %!d(string=hai) anos
pai
achega
4192feb752
Modificáronse 7 ficheiros con 519 adicións e 304 borrados
  1. 15 15
      costmodel/aggregations.go
  2. 165 0
      costmodel/clustercache.go
  3. 0 179
      costmodel/containeruptime.go
  4. 48 92
      costmodel/costmodel.go
  5. 55 2
      costmodel/sql.go
  6. 201 0
      costmodel/watchcontroller.go
  7. 35 16
      main.go

+ 15 - 15
costmodel/aggregations.go

@@ -26,25 +26,25 @@ type Aggregation struct {
 	TotalCost          float64   `json:"totalCost"`
 }
 
-func AggregateCostModel(costData map[string]*CostData, aggregationField string, aggregationSubField string) map[string]*Aggregation {
+func AggregateCostModel(costData map[string]*CostData, discount float64, aggregationField string, aggregationSubField string) map[string]*Aggregation {
 	aggregations := make(map[string]*Aggregation)
 	for _, costDatum := range costData {
 		if aggregationField == "cluster" {
-			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations)
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations, discount)
 		} else if aggregationField == "namespace" {
-			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations)
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations, discount)
 		} else if aggregationField == "service" {
 			if len(costDatum.Services) > 0 {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations)
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations, discount)
 			}
 		} else if aggregationField == "deployment" {
 			if len(costDatum.Deployments) > 0 {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations)
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations, discount)
 			}
 		} else if aggregationField == "label" {
 			if costDatum.Labels != nil {
 				if subfieldName, ok := costDatum.Labels[aggregationSubField]; ok {
-					aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations)
+					aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations, discount)
 				}
 			}
 		}
@@ -59,7 +59,7 @@ func AggregateCostModel(costData map[string]*CostData, aggregationField string,
 	return aggregations
 }
 
-func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation) {
+func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation, discount float64) {
 	if _, ok := aggregations[key]; !ok {
 		agg := &Aggregation{}
 		agg.Aggregator = aggregator
@@ -68,15 +68,15 @@ func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubFiel
 		agg.Cluster = costDatum.ClusterID
 		aggregations[key] = agg
 	}
-	mergeVectors(costDatum, aggregations[key])
+	mergeVectors(costDatum, aggregations[key], discount)
 }
 
-func mergeVectors(costDatum *CostData, aggregation *Aggregation) {
+func mergeVectors(costDatum *CostData, aggregation *Aggregation, discount float64) {
 	aggregation.CPUAllocation = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocation)
 	aggregation.RAMAllocation = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocation)
 	aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
 
-	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum)
+	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount)
 	aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
 	aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
 	aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
@@ -85,13 +85,13 @@ func mergeVectors(costDatum *CostData, aggregation *Aggregation) {
 	}
 }
 
-func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
+func getPriceVectors(costDatum *CostData, discount float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
 	cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
 	for _, val := range costDatum.CPUAllocation {
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.VCPUCost, 64)
 		cpuv = append(cpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * cost,
+			Value:     val.Value * cost * (1 - discount),
 		})
 	}
 	ramv := make([]*Vector, 0, len(costDatum.RAMAllocation))
@@ -99,7 +99,7 @@ func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.RAMCost, 64)
 		ramv = append(ramv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     (val.Value / 1024 / 1024 / 1024) * cost,
+			Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
 		})
 	}
 	gpuv := make([]*Vector, 0, len(costDatum.GPUReq))
@@ -107,7 +107,7 @@ func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.GPUCost, 64)
 		gpuv = append(gpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * cost,
+			Value:     val.Value * cost * (1 - discount),
 		})
 	}
 	pvvs := make([][]*Vector, 0, len(costDatum.PVCData))
@@ -118,7 +118,7 @@ func getPriceVectors(costDatum *CostData) ([]*Vector, []*Vector, []*Vector, [][]
 			for _, val := range pvcData.Values {
 				pvv = append(pvv, &Vector{
 					Timestamp: math.Round(val.Timestamp/10) * 10,
-					Value:     (val.Value / 1024 / 1024 / 1024) * cost,
+					Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
 				})
 			}
 			pvvs = append(pvvs, pvv)

+ 165 - 0
costmodel/clustercache.go

@@ -0,0 +1,165 @@
+package costmodel
+
+import (
+	"sync"
+
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	stv1 "k8s.io/api/storage/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/client-go/kubernetes"
+)
+
+// ClusterCache defines an contract for an object which caches components within a cluster, ensuring
+// up to date resources using watchers
+type ClusterCache interface {
+	// Run starts the watcher processes
+	Run(stopCh chan struct{})
+
+	// GetAllNamespaces returns all the cached namespaces
+	GetAllNamespaces() []*v1.Namespace
+
+	// GetAllNodes returns all the cached nodes
+	GetAllNodes() []*v1.Node
+
+	// GetAllPods returns all the cached pods
+	GetAllPods() []*v1.Pod
+
+	// GetAllServices returns all the cached services
+	GetAllServices() []*v1.Service
+
+	// GetAllDeployments returns all the cached deployments
+	GetAllDeployments() []*appsv1.Deployment
+
+	// GetAllPersistentVolumes returns all the cached persistent volumes
+	GetAllPersistentVolumes() []*v1.PersistentVolume
+
+	// GetAllStorageClasses returns all the cached storage classes
+	GetAllStorageClasses() []*stv1.StorageClass
+}
+
+// KubernetesClusterCache is the implementation of ClusterCache
+type KubernetesClusterCache struct {
+	client kubernetes.Interface
+
+	namespaceWatch    WatchController
+	nodeWatch         WatchController
+	podWatch          WatchController
+	serviceWatch      WatchController
+	deploymentsWatch  WatchController
+	pvWatch           WatchController
+	storageClassWatch WatchController
+}
+
+func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
+	defer wg.Done()
+	wc.WarmUp(cancel)
+}
+
+func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
+	coreRestClient := client.CoreV1().RESTClient()
+	appsRestClient := client.AppsV1().RESTClient()
+	storageRestClient := client.StorageV1().RESTClient()
+
+	kcc := &KubernetesClusterCache{
+		client:            client,
+		namespaceWatch:    NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
+		nodeWatch:         NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
+		podWatch:          NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
+		serviceWatch:      NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		deploymentsWatch:  NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		pvWatch:           NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+	}
+
+	// Wait for each caching watcher to initialize
+	var wg sync.WaitGroup
+	wg.Add(7)
+
+	cancel := make(chan struct{})
+
+	go initializeCache(kcc.namespaceWatch, &wg, cancel)
+	go initializeCache(kcc.nodeWatch, &wg, cancel)
+	go initializeCache(kcc.podWatch, &wg, cancel)
+	go initializeCache(kcc.serviceWatch, &wg, cancel)
+	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
+	go initializeCache(kcc.pvWatch, &wg, cancel)
+	go initializeCache(kcc.storageClassWatch, &wg, cancel)
+
+	wg.Wait()
+
+	return kcc
+}
+
+func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {
+	go kcc.namespaceWatch.Run(1, stopCh)
+	go kcc.nodeWatch.Run(1, stopCh)
+	go kcc.podWatch.Run(1, stopCh)
+	go kcc.serviceWatch.Run(1, stopCh)
+	go kcc.deploymentsWatch.Run(1, stopCh)
+	go kcc.pvWatch.Run(1, stopCh)
+	go kcc.storageClassWatch.Run(1, stopCh)
+}
+
+func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
+	var namespaces []*v1.Namespace
+	items := kcc.namespaceWatch.GetAll()
+	for _, ns := range items {
+		namespaces = append(namespaces, ns.(*v1.Namespace))
+	}
+	return namespaces
+}
+
+func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
+	var nodes []*v1.Node
+	items := kcc.nodeWatch.GetAll()
+	for _, node := range items {
+		nodes = append(nodes, node.(*v1.Node))
+	}
+	return nodes
+}
+
+func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
+	var pods []*v1.Pod
+	items := kcc.podWatch.GetAll()
+	for _, pod := range items {
+		pods = append(pods, pod.(*v1.Pod))
+	}
+	return pods
+}
+
+func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
+	var services []*v1.Service
+	items := kcc.serviceWatch.GetAll()
+	for _, service := range items {
+		services = append(services, service.(*v1.Service))
+	}
+	return services
+}
+
+func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
+	var deployments []*appsv1.Deployment
+	items := kcc.deploymentsWatch.GetAll()
+	for _, deployment := range items {
+		deployments = append(deployments, deployment.(*appsv1.Deployment))
+	}
+	return deployments
+}
+
+func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
+	var pvs []*v1.PersistentVolume
+	items := kcc.pvWatch.GetAll()
+	for _, pv := range items {
+		pvs = append(pvs, pv.(*v1.PersistentVolume))
+	}
+	return pvs
+}
+
+func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
+	var storageClasses []*stv1.StorageClass
+	items := kcc.storageClassWatch.GetAll()
+	for _, stc := range items {
+		storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
+	}
+	return storageClasses
+}

+ 0 - 179
costmodel/containeruptime.go

@@ -1,179 +0,0 @@
-package costmodel
-
-import (
-	"fmt"
-	"time"
-
-	"k8s.io/klog"
-
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
-)
-
-type Controller struct {
-	indexer  cache.Indexer
-	queue    workqueue.RateLimitingInterface
-	informer cache.Controller
-}
-
-func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
-	return &Controller{
-		informer: informer,
-		indexer:  indexer,
-		queue:    queue,
-	}
-}
-
-func (c *Controller) processNextItem() bool {
-	// Wait until there is a new item in the working queue
-	key, quit := c.queue.Get()
-	if quit {
-		return false
-	}
-	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
-	// This allows safe parallel processing because two pods with the same key are never processed in
-	// parallel.
-	defer c.queue.Done(key)
-
-	// Invoke the method containing the business logic
-	err := c.syncToPrometheus(key.(string))
-	// Handle the error if something went wrong during the execution of the business logic
-	c.handleErr(err, key)
-	return true
-}
-
-// syncToPrometheus is the business logic of the controller. In this controller it simply prints
-// information about the pod to stdout. In case an error happened, it has to simply return the error.
-// The retry logic should not be part of the business logic.
-func (c *Controller) syncToPrometheus(key string) error {
-	obj, exists, err := c.indexer.GetByKey(key)
-	if err != nil {
-		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
-		return err
-	}
-
-	if !exists {
-		// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
-		klog.V(1).Infof("Pod %s does not exist anymore\n", key)
-	} else {
-		// Note that you also have to check the uid if you have a local controlled resource, which
-		// is dependent on the actual instance, to detect that a Pod was recreated with the same name
-		klog.V(1).Infof("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
-	}
-	return nil
-}
-
-func (c *Controller) GetAll() []*v1.Pod {
-	objs := c.indexer.List()
-	var pods []*v1.Pod
-	for _, obj := range objs {
-		pods = append(pods, obj.(*v1.Pod))
-	}
-	return pods
-}
-
-// handleErr checks if an error happened and makes sure we will retry later.
-func (c *Controller) handleErr(err error, key interface{}) {
-	if err == nil {
-		// Forget about the #AddRateLimited history of the key on every successful synchronization.
-		// This ensures that future processing of updates for this key is not delayed because of
-		// an outdated error history.
-		c.queue.Forget(key)
-		return
-	}
-
-	// This controller retries 5 times if something goes wrong. After that, it stops trying.
-	if c.queue.NumRequeues(key) < 5 {
-		klog.Infof("Error syncing pod %v: %v", key, err)
-
-		// Re-enqueue the key rate limited. Based on the rate limiter on the
-		// queue and the re-enqueue history, the key will be processed later again.
-		c.queue.AddRateLimited(key)
-		return
-	}
-
-	c.queue.Forget(key)
-	// Report to an external entity that, even after several retries, we could not successfully process this key
-	runtime.HandleError(err)
-	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
-}
-
-func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
-	defer runtime.HandleCrash()
-
-	// Let the workers stop when we are done
-	defer c.queue.ShutDown()
-	klog.Info("Starting Pod controller")
-
-	go c.informer.Run(stopCh)
-
-	// Wait for all involved caches to be synced, before processing items from the queue is started
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
-		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
-		return
-	}
-
-	for i := 0; i < threadiness; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
-	}
-
-	<-stopCh
-	klog.Info("Stopping Pod controller")
-}
-
-func (c *Controller) runWorker() {
-	for c.processNextItem() {
-	}
-}
-
-func ContainerUptimeWatcher(clientset kubernetes.Interface) {
-	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
-
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	controller := NewController(queue, indexer, informer)
-
-	/*
-		podList, _ := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
-		for _, pod := range podList.Items {
-			indexer.Add(&pod)
-		}
-	*/
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go controller.Run(1, stop)
-}

+ 48 - 92
costmodel/costmodel.go

@@ -19,8 +19,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 )
 
@@ -41,55 +39,24 @@ const (
 	epConfig          = apiPrefix + "/status/config"
 	epFlags           = apiPrefix + "/status/flags"
 	remoteEnabled     = "REMOTE_WRITE_ENABLED"
+	CLUSTER_ID        = "CLUSTER_ID"
 )
 
 type CostModel struct {
-	Controller *Controller
-}
+	Cache ClusterCache
 
-func NewCostModel(podListWatcher cache.ListerWatcher) *CostModel {
-	cm := &CostModel{}
-	cm.ContainerWatcher(podListWatcher)
-	return cm
+	stop chan struct{}
 }
 
-func (cm *CostModel) ContainerWatcher(podListWatcher cache.ListerWatcher) {
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+func NewCostModel(client kubernetes.Interface) *CostModel {
+	stopCh := make(chan struct{})
+	cache := NewKubernetesClusterCache(client)
+	cache.Run(stopCh)
 
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	cm.Controller = NewController(queue, indexer, informer)
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go cm.Controller.Run(1, stop)
+	return &CostModel{
+		Cache: cache,
+		stop:  stopCh,
+	}
 }
 
 type CostData struct {
@@ -291,6 +258,8 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
 	normalization := fmt.Sprintf(normalizationStr, window, offset)
 
+	clustID := os.Getenv(CLUSTER_ID)
+
 	var wg sync.WaitGroup
 	wg.Add(8)
 
@@ -334,21 +303,21 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 		defer wg.Done()
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -369,7 +338,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no Node cost model available: " + err.Error())
 		return nil, err
@@ -380,7 +349,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -551,6 +520,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					PVCData:         pvReq,
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
+					ClusterID:       clustID,
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -619,6 +589,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
+				ClusterID:       clustID,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -842,17 +813,14 @@ func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
 
 	return allocation
 }
-func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
+func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return err
 	}
-	storageClasses, err := clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	storageClasses := cache.GetAllStorageClasses()
 	storageClassMap := make(map[string]map[string]string)
-	for _, storageClass := range storageClasses.Items {
+	for _, storageClass := range storageClasses {
 		params := storageClass.Parameters
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -861,12 +829,9 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 		}
 	}
 
-	pvs, err := clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	pvs := cache.GetAllPersistentVolumes()
 	pvMap := make(map[string]*costAnalyzerCloud.PV)
-	for _, pv := range pvs.Items {
+	for _, pv := range pvs {
 		parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 		if !ok {
 			klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -876,7 +841,7 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 			Region:     pv.Labels[v1.LabelZoneRegion],
 			Parameters: parameters,
 		}
-		err := GetPVCost(cacPv, &pv, cloud)
+		err := GetPVCost(cacPv, pv, cloud)
 		if err != nil {
 			return err
 		}
@@ -915,17 +880,14 @@ func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAna
 	return nil
 }
 
-func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
+func getNodeCost(cache ClusterCache, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return nil, err
 	}
-	nodeList, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+	nodeList := cache.GetAllNodes()
 	nodes := make(map[string]*costAnalyzerCloud.Node)
-	for _, n := range nodeList.Items {
+	for _, n := range nodeList {
 		name := n.GetObjectMeta().GetName()
 		nodeLabels := n.GetObjectMeta().GetLabels()
 		nodeLabels["providerID"] = n.Spec.ProviderID
@@ -1045,13 +1007,10 @@ func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provide
 	return nodes, nil
 }
 
-func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodServices(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	servicesList := cache.GetAllServices()
 	podServicesMapping := make(map[string]map[string][]string)
-	for _, service := range servicesList.Items {
+	for _, service := range servicesList {
 		namespace := service.GetObjectMeta().GetNamespace()
 		name := service.GetObjectMeta().GetName()
 
@@ -1074,13 +1033,10 @@ func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[stri
 	return podServicesMapping, nil
 }
 
-func getPodDeployments(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
-	for _, deployment := range deploymentsList.Items {
+	for _, deployment := range deploymentsList {
 		namespace := deployment.GetObjectMeta().GetNamespace()
 		name := deployment.GetObjectMeta().GetName()
 		if _, ok := podDeploymentsMapping[namespace]; !ok {
@@ -1132,6 +1088,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
+	clustID := os.Getenv(CLUSTER_ID)
 	remoteEnabled := os.Getenv(remoteEnabled)
 	if remoteEnabled == "true" {
 		remoteLayout := "2006-01-02T15:04:05Z"
@@ -1184,20 +1141,20 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -1219,7 +1176,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no cost model available: " + err.Error())
 		return nil, err
@@ -1231,7 +1188,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -1402,6 +1359,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 					PVCData:         pvReq,
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
+					ClusterID:       clustID,
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -1468,6 +1426,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
+				ClusterID:       clustID,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -1499,13 +1458,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return containerNameCost, err
 }
 
-func getNamespaceLabels(clientset kubernetes.Interface) (map[string]map[string]string, error) {
+func getNamespaceLabels(cache ClusterCache) (map[string]map[string]string, error) {
 	nsToLabels := make(map[string]map[string]string)
-	nss, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-	for _, ns := range nss.Items {
+	nss := cache.GetAllNamespaces()
+	for _, ns := range nss {
 		nsToLabels[ns.Name] = ns.Labels
 	}
 	return nsToLabels, nil

+ 55 - 2
costmodel/sql.go

@@ -2,10 +2,13 @@ package costmodel
 
 import (
 	"database/sql"
+	"encoding/json"
 	"fmt"
 	"os"
 	"time"
 
+	"k8s.io/klog"
+
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	_ "github.com/lib/pq"
 )
@@ -159,8 +162,6 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 	if err != nil {
 		return nil, err
 	}
-	defer rows.Close()
-
 	for rows.Next() {
 		var (
 			bucket    string
@@ -221,5 +222,57 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 			}
 		}
 	}
+	query = `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
+	rows, err = db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	cols, err := rows.Columns()
+	if err != nil {
+		return nil, err
+	}
+	rawResult := make([][]byte, len(cols))
+	result := make([]string, len(cols))
+	dest := make([]interface{}, len(cols)) // A temporary interface{} slice
+	for i, _ := range rawResult {
+		dest[i] = &rawResult[i] // Put pointers to each string in the interface slice
+	}
+	nsToLabels := make(map[string]map[string]string)
+	for rows.Next() {
+		err = rows.Scan(dest...)
+		if err != nil {
+			return nil, err
+		}
+
+		for i, raw := range rawResult {
+			if raw == nil {
+				result[i] = "\\N"
+			} else {
+				result[i] = string(raw)
+			}
+		}
+
+		klog.Infof("%#v\n", result)
+		var dat map[string]string
+		err := json.Unmarshal([]byte(result[4]), &dat)
+		if err != nil {
+			return nil, err
+		}
+
+		ns, ok := dat["namespace"]
+		if !ok {
+			return nil, fmt.Errorf("No namespace found")
+		}
+		nsToLabels[ns] = dat
+	}
+
+	for _, cd := range model {
+		ns := cd.Namespace
+		if labels, ok := nsToLabels[ns]; ok {
+			cd.NamespaceLabels = labels
+			cd.Labels = labels // TODO: override with podlabels
+		}
+	}
+
 	return model, nil
 }

+ 201 - 0
costmodel/watchcontroller.go

@@ -0,0 +1,201 @@
+package costmodel
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"k8s.io/klog"
+
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	rt "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+)
+
+// Type alias for a receiver func
+type WatchHandler = func(interface{})
+
+// WatchController defines a contract for an object which watches a specific resource set for
+// add, updates, and removals
+type WatchController interface {
+	// Initializes the cache
+	WarmUp(chan struct{})
+
+	// Run starts the watching process
+	Run(int, chan struct{})
+
+	// GetAll returns all of the resources
+	GetAll() []interface{}
+
+	// SetUpdateHandler sets a specific handler for adding/updating individual resources
+	SetUpdateHandler(WatchHandler) WatchController
+
+	// SetRemovedHandler sets a specific handler for removing individual resources
+	SetRemovedHandler(WatchHandler) WatchController
+}
+
+// CachingWatchController composites the watching behavior and a cache to ensure that all
+// up to date resources are readily available
+type CachingWatchController struct {
+	indexer  cache.Indexer
+	queue    workqueue.RateLimitingInterface
+	informer cache.Controller
+
+	resource     string
+	resourceType string
+
+	updateHandler WatchHandler
+	removeHandler WatchHandler
+}
+
+func NewCachingWatcher(restClient rest.Interface, resource string, resourceType rt.Object, namespace string, fieldSelector fields.Selector) WatchController {
+	resourceCache := cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
+	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	indexer, informer := cache.NewIndexerInformer(resourceCache, resourceType, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		UpdateFunc: func(old interface{}, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+	}, cache.Indexers{})
+
+	return &CachingWatchController{
+		indexer:      indexer,
+		queue:        queue,
+		informer:     informer,
+		resource:     resource,
+		resourceType: reflect.TypeOf(resourceType).String(),
+	}
+}
+
+func (c *CachingWatchController) GetAll() []interface{} {
+	return c.indexer.List()
+}
+
+func (c *CachingWatchController) SetUpdateHandler(handler WatchHandler) WatchController {
+	c.updateHandler = handler
+	return c
+}
+
+func (c *CachingWatchController) SetRemovedHandler(handler WatchHandler) WatchController {
+	c.removeHandler = handler
+	return c
+}
+
+func (c *CachingWatchController) processNextItem() bool {
+	// Wait until there is a new item in the working queue
+	key, quit := c.queue.Get()
+	if quit {
+		return false
+	}
+	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
+	// This allows safe parallel processing because two pods with the same key are never processed in
+	// parallel.
+	defer c.queue.Done(key)
+
+	// Invoke the method containing the business logic
+	err := c.handle(key.(string))
+	// Handle the error if something went wrong during the execution of the business logic
+	c.handleErr(err, key)
+	return true
+}
+
+// handle is the business logic of the controller.
+func (c *CachingWatchController) handle(key string) error {
+	obj, exists, err := c.indexer.GetByKey(key)
+	if err != nil {
+		klog.Errorf("Fetching %s with key %s from store failed with %v", c.resourceType, key, err)
+		return err
+	}
+
+	if !exists {
+		klog.V(3).Infof("Removed %s for key: %s\n", c.resourceType, key)
+
+		if c.removeHandler != nil {
+			c.removeHandler(key)
+		}
+	} else {
+		klog.V(3).Infof("Updated %s: %s\n", c.resourceType, obj.(v1.Object).GetName())
+
+		if c.updateHandler != nil {
+			c.updateHandler(obj)
+		}
+	}
+	return nil
+}
+
+// handleErr checks if an error happened and makes sure we will retry later.
+func (c *CachingWatchController) handleErr(err error, key interface{}) {
+	if err == nil {
+		// Forget about the #AddRateLimited history of the key on every successful synchronization.
+		// This ensures that future processing of updates for this key is not delayed because of
+		// an outdated error history.
+		c.queue.Forget(key)
+		return
+	}
+
+	// This controller retries 5 times if something goes wrong. After that, it stops trying.
+	if c.queue.NumRequeues(key) < 5 {
+		klog.V(3).Infof("Error syncing %s %v: %v", c.resourceType, key, err)
+
+		// Re-enqueue the key rate limited. Based on the rate limiter on the
+		// queue and the re-enqueue history, the key will be processed later again.
+		c.queue.AddRateLimited(key)
+		return
+	}
+
+	c.queue.Forget(key)
+	// Report to an external entity that, even after several retries, we could not successfully process this key
+	runtime.HandleError(err)
+	klog.Infof("Dropping %s %q out of the queue: %v", c.resourceType, key, err)
+}
+
+func (c *CachingWatchController) WarmUp(cancelCh chan struct{}) {
+	go c.informer.Run(cancelCh)
+
+	// Wait for all involved caches to be synced, before processing items from the queue is started
+	if !cache.WaitForCacheSync(cancelCh, c.informer.HasSynced) {
+		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
+		return
+	}
+}
+
+func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
+	defer runtime.HandleCrash()
+
+	// Let the workers stop when we are done
+	defer c.queue.ShutDown()
+	klog.V(3).Infof("Starting %s controller", c.resourceType)
+
+	for i := 0; i < threadiness; i++ {
+		go wait.Until(c.runWorker, time.Second, stopCh)
+	}
+
+	<-stopCh
+	klog.V(3).Infof("Stopping %s controller", c.resourceType)
+}
+
+func (c *CachingWatchController) runWorker() {
+	for c.processNextItem() {
+	}
+}

+ 35 - 16
main.go

@@ -21,16 +21,13 @@ import (
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
 )
 
 const (
@@ -146,7 +143,16 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 
 	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
 	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		c, err := a.Cloud.GetConfig()
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+
+		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -235,8 +241,16 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		w.Write(wrapData(nil, err))
 		return
 	}
+	c, err := a.Cloud.GetConfig()
+	if err != nil {
+		w.Write(wrapData(nil, err))
+	}
+	discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+	}
 	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, discount*0.01, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	}
 }
@@ -258,7 +272,15 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		w.Write(wrapData(nil, err))
 	}
 	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, aggregation, aggregationSubField)
+		c, err := a.Cloud.GetConfig()
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -456,7 +478,7 @@ func (a *Accesses) recordPrices() {
 
 		for {
 			klog.V(4).Info("Recording prices...")
-			podlist := a.Model.Controller.GetAll()
+			podlist := a.Model.Cache.GetAllPods()
 			podStatus := make(map[string]v1.PodPhase)
 			for _, pod := range podlist {
 				podStatus[pod.Name] = pod.Status.Phase
@@ -523,10 +545,9 @@ func (a *Accesses) recordPrices() {
 					containerSeen[labelKey] = false
 				}
 
-				storageClasses, _ := a.KubeClientSet.StorageV1().StorageClasses().List(metav1.ListOptions{})
-
+				storageClasses := a.Model.Cache.GetAllStorageClasses()
 				storageClassMap := make(map[string]map[string]string)
-				for _, storageClass := range storageClasses.Items {
+				for _, storageClass := range storageClasses {
 					params := storageClass.Parameters
 					storageClassMap[storageClass.ObjectMeta.Name] = params
 					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -535,8 +556,8 @@ func (a *Accesses) recordPrices() {
 					}
 				}
 
-				pvs, _ := a.KubeClientSet.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-				for _, pv := range pvs.Items {
+				pvs := a.Model.Cache.GetAllPersistentVolumes()
+				for _, pv := range pvs {
 					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 					if !ok {
 						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -546,7 +567,7 @@ func (a *Accesses) recordPrices() {
 						Region:     pv.Labels[v1.LabelZoneRegion],
 						Parameters: parameters,
 					}
-					costModel.GetPVCost(cacPv, &pv, a.Cloud)
+					costModel.GetPVCost(cacPv, pv, a.Cloud)
 					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
 					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
 					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
@@ -713,8 +734,6 @@ func main() {
 		KubeClientSet: kubeClientset,
 	})
 
-	podCache := cache.NewListWatchFromClient(kubeClientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
 	a := Accesses{
 		PrometheusClient:              promCli,
 		KubeClientSet:                 kubeClientset,
@@ -729,7 +748,7 @@ func main() {
 		PVAllocationRecorder:          PVAllocation,
 		ContainerUptimeRecorder:       ContainerUptimeRecorder,
 		PersistentVolumePriceRecorder: pvGv,
-		Model:                         costModel.NewCostModel(podCache),
+		Model:                         costModel.NewCostModel(kubeClientset),
 	}
 
 	remoteEnabled := os.Getenv(remoteEnabled)