
Merge pull request #165 from kubecost/cluster-caching

Kubernetes Caching Watchers Implementation
Matt Bolt 6 years ago
commit 49ec96649b
5 changed files with 413 additions and 284 deletions
  1. costmodel/clustercache.go (+165 -0)
  2. costmodel/containeruptime.go (+0 -179)
  3. costmodel/costmodel.go (+40 -92)
  4. costmodel/watchcontroller.go (+201 -0)
  5. main.go (+7 -13)

+ 165 - 0
costmodel/clustercache.go

@@ -0,0 +1,165 @@
+package costmodel
+
+import (
+	"sync"
+
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	stv1 "k8s.io/api/storage/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/client-go/kubernetes"
+)
+
+// ClusterCache defines a contract for an object which caches components within a cluster, ensuring
+// up-to-date resources using watchers
+type ClusterCache interface {
+	// Run starts the watcher processes
+	Run(stopCh chan struct{})
+
+	// GetAllNamespaces returns all the cached namespaces
+	GetAllNamespaces() []*v1.Namespace
+
+	// GetAllNodes returns all the cached nodes
+	GetAllNodes() []*v1.Node
+
+	// GetAllPods returns all the cached pods
+	GetAllPods() []*v1.Pod
+
+	// GetAllServices returns all the cached services
+	GetAllServices() []*v1.Service
+
+	// GetAllDeployments returns all the cached deployments
+	GetAllDeployments() []*appsv1.Deployment
+
+	// GetAllPersistentVolumes returns all the cached persistent volumes
+	GetAllPersistentVolumes() []*v1.PersistentVolume
+
+	// GetAllStorageClasses returns all the cached storage classes
+	GetAllStorageClasses() []*stv1.StorageClass
+}
+
+// KubernetesClusterCache is the implementation of ClusterCache
+type KubernetesClusterCache struct {
+	client kubernetes.Interface
+
+	namespaceWatch    WatchController
+	nodeWatch         WatchController
+	podWatch          WatchController
+	serviceWatch      WatchController
+	deploymentsWatch  WatchController
+	pvWatch           WatchController
+	storageClassWatch WatchController
+}
+
+func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
+	defer wg.Done()
+	wc.WarmUp(cancel)
+}
+
+func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
+	coreRestClient := client.CoreV1().RESTClient()
+	appsRestClient := client.AppsV1().RESTClient()
+	storageRestClient := client.StorageV1().RESTClient()
+
+	kcc := &KubernetesClusterCache{
+		client:            client,
+		namespaceWatch:    NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
+		nodeWatch:         NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
+		podWatch:          NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
+		serviceWatch:      NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		deploymentsWatch:  NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		pvWatch:           NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+	}
+
+	// Wait for each caching watcher to initialize
+	var wg sync.WaitGroup
+	wg.Add(7)
+
+	cancel := make(chan struct{})
+
+	go initializeCache(kcc.namespaceWatch, &wg, cancel)
+	go initializeCache(kcc.nodeWatch, &wg, cancel)
+	go initializeCache(kcc.podWatch, &wg, cancel)
+	go initializeCache(kcc.serviceWatch, &wg, cancel)
+	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
+	go initializeCache(kcc.pvWatch, &wg, cancel)
+	go initializeCache(kcc.storageClassWatch, &wg, cancel)
+
+	wg.Wait()
+
+	return kcc
+}
+
+func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {
+	go kcc.namespaceWatch.Run(1, stopCh)
+	go kcc.nodeWatch.Run(1, stopCh)
+	go kcc.podWatch.Run(1, stopCh)
+	go kcc.serviceWatch.Run(1, stopCh)
+	go kcc.deploymentsWatch.Run(1, stopCh)
+	go kcc.pvWatch.Run(1, stopCh)
+	go kcc.storageClassWatch.Run(1, stopCh)
+}
+
+func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
+	var namespaces []*v1.Namespace
+	items := kcc.namespaceWatch.GetAll()
+	for _, ns := range items {
+		namespaces = append(namespaces, ns.(*v1.Namespace))
+	}
+	return namespaces
+}
+
+func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
+	var nodes []*v1.Node
+	items := kcc.nodeWatch.GetAll()
+	for _, node := range items {
+		nodes = append(nodes, node.(*v1.Node))
+	}
+	return nodes
+}
+
+func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
+	var pods []*v1.Pod
+	items := kcc.podWatch.GetAll()
+	for _, pod := range items {
+		pods = append(pods, pod.(*v1.Pod))
+	}
+	return pods
+}
+
+func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
+	var services []*v1.Service
+	items := kcc.serviceWatch.GetAll()
+	for _, service := range items {
+		services = append(services, service.(*v1.Service))
+	}
+	return services
+}
+
+func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
+	var deployments []*appsv1.Deployment
+	items := kcc.deploymentsWatch.GetAll()
+	for _, deployment := range items {
+		deployments = append(deployments, deployment.(*appsv1.Deployment))
+	}
+	return deployments
+}
+
+func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
+	var pvs []*v1.PersistentVolume
+	items := kcc.pvWatch.GetAll()
+	for _, pv := range items {
+		pvs = append(pvs, pv.(*v1.PersistentVolume))
+	}
+	return pvs
+}
+
+func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
+	var storageClasses []*stv1.StorageClass
+	items := kcc.storageClassWatch.GetAll()
+	for _, stc := range items {
+		storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
+	}
+	return storageClasses
+}
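
A minimal sketch of how a consumer might build and query the new cache; it is not part of this PR, and the import path, in-cluster config, and klog logging are assumptions:

	package main

	import (
		"k8s.io/client-go/kubernetes"
		"k8s.io/client-go/rest"
		"k8s.io/klog"

		"github.com/kubecost/cost-model/costmodel" // assumed import path
	)

	func main() {
		// Hypothetical consumer: build the clientset, warm the cluster cache, and read from it.
		config, err := rest.InClusterConfig()
		if err != nil {
			klog.Fatal(err)
		}
		clientset, err := kubernetes.NewForConfig(config)
		if err != nil {
			klog.Fatal(err)
		}

		stopCh := make(chan struct{})
		clusterCache := costmodel.NewKubernetesClusterCache(clientset) // blocks until every watcher has warmed up
		clusterCache.Run(stopCh)                                       // keep the caches updated in the background

		for _, node := range clusterCache.GetAllNodes() {
			klog.Infof("cached node: %s", node.Name)
		}
	}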

+ 0 - 179
costmodel/containeruptime.go

@@ -1,179 +0,0 @@
-package costmodel
-
-import (
-	"fmt"
-	"time"
-
-	"k8s.io/klog"
-
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
-)
-
-type Controller struct {
-	indexer  cache.Indexer
-	queue    workqueue.RateLimitingInterface
-	informer cache.Controller
-}
-
-func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
-	return &Controller{
-		informer: informer,
-		indexer:  indexer,
-		queue:    queue,
-	}
-}
-
-func (c *Controller) processNextItem() bool {
-	// Wait until there is a new item in the working queue
-	key, quit := c.queue.Get()
-	if quit {
-		return false
-	}
-	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
-	// This allows safe parallel processing because two pods with the same key are never processed in
-	// parallel.
-	defer c.queue.Done(key)
-
-	// Invoke the method containing the business logic
-	err := c.syncToPrometheus(key.(string))
-	// Handle the error if something went wrong during the execution of the business logic
-	c.handleErr(err, key)
-	return true
-}
-
-// syncToPrometheus is the business logic of the controller. In this controller it simply prints
-// information about the pod to stdout. In case an error happened, it has to simply return the error.
-// The retry logic should not be part of the business logic.
-func (c *Controller) syncToPrometheus(key string) error {
-	obj, exists, err := c.indexer.GetByKey(key)
-	if err != nil {
-		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
-		return err
-	}
-
-	if !exists {
-		// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
-		klog.V(1).Infof("Pod %s does not exist anymore\n", key)
-	} else {
-		// Note that you also have to check the uid if you have a local controlled resource, which
-		// is dependent on the actual instance, to detect that a Pod was recreated with the same name
-		klog.V(1).Infof("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
-	}
-	return nil
-}
-
-func (c *Controller) GetAll() []*v1.Pod {
-	objs := c.indexer.List()
-	var pods []*v1.Pod
-	for _, obj := range objs {
-		pods = append(pods, obj.(*v1.Pod))
-	}
-	return pods
-}
-
-// handleErr checks if an error happened and makes sure we will retry later.
-func (c *Controller) handleErr(err error, key interface{}) {
-	if err == nil {
-		// Forget about the #AddRateLimited history of the key on every successful synchronization.
-		// This ensures that future processing of updates for this key is not delayed because of
-		// an outdated error history.
-		c.queue.Forget(key)
-		return
-	}
-
-	// This controller retries 5 times if something goes wrong. After that, it stops trying.
-	if c.queue.NumRequeues(key) < 5 {
-		klog.Infof("Error syncing pod %v: %v", key, err)
-
-		// Re-enqueue the key rate limited. Based on the rate limiter on the
-		// queue and the re-enqueue history, the key will be processed later again.
-		c.queue.AddRateLimited(key)
-		return
-	}
-
-	c.queue.Forget(key)
-	// Report to an external entity that, even after several retries, we could not successfully process this key
-	runtime.HandleError(err)
-	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
-}
-
-func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
-	defer runtime.HandleCrash()
-
-	// Let the workers stop when we are done
-	defer c.queue.ShutDown()
-	klog.Info("Starting Pod controller")
-
-	go c.informer.Run(stopCh)
-
-	// Wait for all involved caches to be synced, before processing items from the queue is started
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
-		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
-		return
-	}
-
-	for i := 0; i < threadiness; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
-	}
-
-	<-stopCh
-	klog.Info("Stopping Pod controller")
-}
-
-func (c *Controller) runWorker() {
-	for c.processNextItem() {
-	}
-}
-
-func ContainerUptimeWatcher(clientset kubernetes.Interface) {
-	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
-
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	controller := NewController(queue, indexer, informer)
-
-	/*
-		podList, _ := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
-		for _, pod := range podList.Items {
-			indexer.Add(&pod)
-		}
-	*/
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go controller.Run(1, stop)
-}

+ 40 - 92
costmodel/costmodel.go

@@ -19,8 +19,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 )
 
@@ -45,52 +43,20 @@ const (
 )
 
 type CostModel struct {
-	Controller *Controller
-}
+	Cache ClusterCache
 
-func NewCostModel(podListWatcher cache.ListerWatcher) *CostModel {
-	cm := &CostModel{}
-	cm.ContainerWatcher(podListWatcher)
-	return cm
+	stop chan struct{}
 }
 
-func (cm *CostModel) ContainerWatcher(podListWatcher cache.ListerWatcher) {
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+func NewCostModel(client kubernetes.Interface) *CostModel {
+	stopCh := make(chan struct{})
+	cache := NewKubernetesClusterCache(client)
+	cache.Run(stopCh)
 
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	cm.Controller = NewController(queue, indexer, informer)
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go cm.Controller.Run(1, stop)
+	return &CostModel{
+		Cache: cache,
+		stop:  stopCh,
+	}
 }
 
 type CostData struct {
@@ -337,21 +303,21 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 		defer wg.Done()
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -372,7 +338,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no Node cost model available: " + err.Error())
 		return nil, err
@@ -383,7 +349,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -847,17 +813,14 @@ func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
 
 	return allocation
 }
-func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
+func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return err
 	}
-	storageClasses, err := clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	storageClasses := cache.GetAllStorageClasses()
 	storageClassMap := make(map[string]map[string]string)
-	for _, storageClass := range storageClasses.Items {
+	for _, storageClass := range storageClasses {
 		params := storageClass.Parameters
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -866,12 +829,9 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 		}
 	}
 
-	pvs, err := clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	pvs := cache.GetAllPersistentVolumes()
 	pvMap := make(map[string]*costAnalyzerCloud.PV)
-	for _, pv := range pvs.Items {
+	for _, pv := range pvs {
 		parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 		if !ok {
 			klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -881,7 +841,7 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 			Region:     pv.Labels[v1.LabelZoneRegion],
 			Parameters: parameters,
 		}
-		err := GetPVCost(cacPv, &pv, cloud)
+		err := GetPVCost(cacPv, pv, cloud)
 		if err != nil {
 			return err
 		}
@@ -920,17 +880,14 @@ func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAna
 	return nil
 }
 
-func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
+func getNodeCost(cache ClusterCache, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return nil, err
 	}
-	nodeList, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+	nodeList := cache.GetAllNodes()
 	nodes := make(map[string]*costAnalyzerCloud.Node)
-	for _, n := range nodeList.Items {
+	for _, n := range nodeList {
 		name := n.GetObjectMeta().GetName()
 		nodeLabels := n.GetObjectMeta().GetLabels()
 		nodeLabels["providerID"] = n.Spec.ProviderID
@@ -1050,13 +1007,10 @@ func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provide
 	return nodes, nil
 }
 
-func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodServices(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	servicesList := cache.GetAllServices()
 	podServicesMapping := make(map[string]map[string][]string)
-	for _, service := range servicesList.Items {
+	for _, service := range servicesList {
 		namespace := service.GetObjectMeta().GetNamespace()
 		name := service.GetObjectMeta().GetName()
 
@@ -1079,13 +1033,10 @@ func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[stri
 	return podServicesMapping, nil
 }
 
-func getPodDeployments(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
-	for _, deployment := range deploymentsList.Items {
+	for _, deployment := range deploymentsList {
 		namespace := deployment.GetObjectMeta().GetNamespace()
 		name := deployment.GetObjectMeta().GetName()
 		if _, ok := podDeploymentsMapping[namespace]; !ok {
@@ -1190,20 +1141,20 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -1225,7 +1176,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no cost model available: " + err.Error())
 		return nil, err
@@ -1237,7 +1188,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -1507,13 +1458,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return containerNameCost, err
 }
 
-func getNamespaceLabels(clientset kubernetes.Interface) (map[string]map[string]string, error) {
+func getNamespaceLabels(cache ClusterCache) (map[string]map[string]string, error) {
 	nsToLabels := make(map[string]map[string]string)
-	nss, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-	for _, ns := range nss.Items {
+	nss := cache.GetAllNamespaces()
+	for _, ns := range nss {
 		nsToLabels[ns.Name] = ns.Labels
 	}
 	return nsToLabels, nil
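
Because the helpers above now accept the ClusterCache interface rather than a live clientset, they can be exercised with a plain in-memory stub. A hypothetical test sketch, not part of this PR; the fake type and test are illustrative only:

	package costmodel

	import (
		"testing"

		appsv1 "k8s.io/api/apps/v1"
		v1 "k8s.io/api/core/v1"
		stv1 "k8s.io/api/storage/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// fakeClusterCache is a hypothetical in-memory ClusterCache for tests; it is not part
	// of this PR. Since getNamespaceLabels, getNodeCost, etc. now take the interface,
	// a stub like this can drive them without an API server or client-go fakes.
	type fakeClusterCache struct {
		namespaces []*v1.Namespace
	}

	func (f *fakeClusterCache) Run(stopCh chan struct{})                        {}
	func (f *fakeClusterCache) GetAllNamespaces() []*v1.Namespace               { return f.namespaces }
	func (f *fakeClusterCache) GetAllNodes() []*v1.Node                         { return nil }
	func (f *fakeClusterCache) GetAllPods() []*v1.Pod                           { return nil }
	func (f *fakeClusterCache) GetAllServices() []*v1.Service                   { return nil }
	func (f *fakeClusterCache) GetAllDeployments() []*appsv1.Deployment         { return nil }
	func (f *fakeClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume { return nil }
	func (f *fakeClusterCache) GetAllStorageClasses() []*stv1.StorageClass      { return nil }

	func TestGetNamespaceLabels(t *testing.T) {
		cache := &fakeClusterCache{namespaces: []*v1.Namespace{
			{ObjectMeta: metav1.ObjectMeta{Name: "default", Labels: map[string]string{"team": "infra"}}},
		}}

		labels, err := getNamespaceLabels(cache)
		if err != nil {
			t.Fatal(err)
		}
		if labels["default"]["team"] != "infra" {
			t.Fatalf("unexpected labels: %v", labels)
		}
	}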

+ 201 - 0
costmodel/watchcontroller.go

@@ -0,0 +1,201 @@
+package costmodel
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"k8s.io/klog"
+
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	rt "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+)
+
+// WatchHandler is a type alias for a resource event handler func
+type WatchHandler = func(interface{})
+
+// WatchController defines a contract for an object which watches a specific resource set for
+// adds, updates, and removals
+type WatchController interface {
+	// WarmUp initializes the cache and blocks until the initial sync completes
+	WarmUp(chan struct{})
+
+	// Run starts the watching process
+	Run(int, chan struct{})
+
+	// GetAll returns all of the cached resources
+	GetAll() []interface{}
+
+	// SetUpdateHandler sets a specific handler for adding/updating individual resources
+	SetUpdateHandler(WatchHandler) WatchController
+
+	// SetRemovedHandler sets a specific handler for removing individual resources
+	SetRemovedHandler(WatchHandler) WatchController
+}
+
+// CachingWatchController combines the watching behavior and a cache to ensure that
+// up-to-date resources are readily available
+type CachingWatchController struct {
+	indexer  cache.Indexer
+	queue    workqueue.RateLimitingInterface
+	informer cache.Controller
+
+	resource     string
+	resourceType string
+
+	updateHandler WatchHandler
+	removeHandler WatchHandler
+}
+
+func NewCachingWatcher(restClient rest.Interface, resource string, resourceType rt.Object, namespace string, fieldSelector fields.Selector) WatchController {
+	resourceCache := cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
+	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	indexer, informer := cache.NewIndexerInformer(resourceCache, resourceType, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		UpdateFunc: func(old interface{}, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+	}, cache.Indexers{})
+
+	return &CachingWatchController{
+		indexer:      indexer,
+		queue:        queue,
+		informer:     informer,
+		resource:     resource,
+		resourceType: reflect.TypeOf(resourceType).String(),
+	}
+}
+
+func (c *CachingWatchController) GetAll() []interface{} {
+	return c.indexer.List()
+}
+
+func (c *CachingWatchController) SetUpdateHandler(handler WatchHandler) WatchController {
+	c.updateHandler = handler
+	return c
+}
+
+func (c *CachingWatchController) SetRemovedHandler(handler WatchHandler) WatchController {
+	c.removeHandler = handler
+	return c
+}
+
+func (c *CachingWatchController) processNextItem() bool {
+	// Wait until there is a new item in the working queue
+	key, quit := c.queue.Get()
+	if quit {
+		return false
+	}
+	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
+	// This allows safe parallel processing because two pods with the same key are never processed in
+	// parallel.
+	defer c.queue.Done(key)
+
+	// Invoke the method containing the business logic
+	err := c.handle(key.(string))
+	// Handle the error if something went wrong during the execution of the business logic
+	c.handleErr(err, key)
+	return true
+}
+
+// handle is the business logic of the controller.
+func (c *CachingWatchController) handle(key string) error {
+	obj, exists, err := c.indexer.GetByKey(key)
+	if err != nil {
+		klog.Errorf("Fetching %s with key %s from store failed with %v", c.resourceType, key, err)
+		return err
+	}
+
+	if !exists {
+		klog.V(3).Infof("Removed %s for key: %s\n", c.resourceType, key)
+
+		if c.removeHandler != nil {
+			c.removeHandler(key)
+		}
+	} else {
+		klog.V(3).Infof("Updated %s: %s\n", c.resourceType, obj.(v1.Object).GetName())
+
+		if c.updateHandler != nil {
+			c.updateHandler(obj)
+		}
+	}
+	return nil
+}
+
+// handleErr checks if an error happened and makes sure we will retry later.
+func (c *CachingWatchController) handleErr(err error, key interface{}) {
+	if err == nil {
+		// Forget about the #AddRateLimited history of the key on every successful synchronization.
+		// This ensures that future processing of updates for this key is not delayed because of
+		// an outdated error history.
+		c.queue.Forget(key)
+		return
+	}
+
+	// This controller retries 5 times if something goes wrong. After that, it stops trying.
+	if c.queue.NumRequeues(key) < 5 {
+		klog.V(3).Infof("Error syncing %s %v: %v", c.resourceType, key, err)
+
+		// Re-enqueue the key rate limited. Based on the rate limiter on the
+		// queue and the re-enqueue history, the key will be processed later again.
+		c.queue.AddRateLimited(key)
+		return
+	}
+
+	c.queue.Forget(key)
+	// Report to an external entity that, even after several retries, we could not successfully process this key
+	runtime.HandleError(err)
+	klog.Infof("Dropping %s %q out of the queue: %v", c.resourceType, key, err)
+}
+
+func (c *CachingWatchController) WarmUp(cancelCh chan struct{}) {
+	go c.informer.Run(cancelCh)
+
+	// Wait for all involved caches to be synced, before processing items from the queue is started
+	if !cache.WaitForCacheSync(cancelCh, c.informer.HasSynced) {
+		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
+		return
+	}
+}
+
+func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
+	defer runtime.HandleCrash()
+
+	// Let the workers stop when we are done
+	defer c.queue.ShutDown()
+	klog.V(3).Infof("Starting %s controller", c.resourceType)
+
+	for i := 0; i < threadiness; i++ {
+		go wait.Until(c.runWorker, time.Second, stopCh)
+	}
+
+	<-stopCh
+	klog.V(3).Infof("Stopping %s controller", c.resourceType)
+}
+
+func (c *CachingWatchController) runWorker() {
+	for c.processNextItem() {
+	}
+}
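
The SetUpdateHandler and SetRemovedHandler hooks are not used elsewhere in this diff. Below is a hedged sketch of a standalone pod watcher built on the new controller, roughly covering what the removed ContainerUptimeWatcher did; the package name and import path are assumptions:

	package example

	import (
		v1 "k8s.io/api/core/v1"
		"k8s.io/apimachinery/pkg/fields"
		"k8s.io/client-go/kubernetes"
		"k8s.io/klog"

		"github.com/kubecost/cost-model/costmodel" // assumed import path
	)

	// watchPods is a hypothetical consumer of the generic watcher: it logs pod
	// adds/updates and removals via the handler hooks added in this file.
	func watchPods(clientset kubernetes.Interface, stopCh chan struct{}) {
		podWatcher := costmodel.NewCachingWatcher(
			clientset.CoreV1().RESTClient(), // list/watch pods through the core REST client
			"pods",
			&v1.Pod{},
			"", // all namespaces
			fields.Everything(),
		).SetUpdateHandler(func(obj interface{}) {
			klog.V(3).Infof("pod added/updated: %s", obj.(*v1.Pod).GetName())
		}).SetRemovedHandler(func(key interface{}) {
			klog.V(3).Infof("pod removed: %v", key)
		})

		podWatcher.WarmUp(stopCh)    // blocks until the initial list is synced into the cache
		go podWatcher.Run(1, stopCh) // then process queued keys until stopCh closes
	}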

+ 7 - 13
main.go

@@ -21,16 +21,13 @@ import (
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
 )
 
 const (
@@ -480,7 +477,7 @@ func (a *Accesses) recordPrices() {
 
 		for {
 			klog.V(4).Info("Recording prices...")
-			podlist := a.Model.Controller.GetAll()
+			podlist := a.Model.Cache.GetAllPods()
 			podStatus := make(map[string]v1.PodPhase)
 			for _, pod := range podlist {
 				podStatus[pod.Name] = pod.Status.Phase
@@ -545,10 +542,9 @@ func (a *Accesses) recordPrices() {
 					containerSeen[labelKey] = false
 				}
 
-				storageClasses, _ := a.KubeClientSet.StorageV1().StorageClasses().List(metav1.ListOptions{})
-
+				storageClasses := a.Model.Cache.GetAllStorageClasses()
 				storageClassMap := make(map[string]map[string]string)
-				for _, storageClass := range storageClasses.Items {
+				for _, storageClass := range storageClasses {
 					params := storageClass.Parameters
 					storageClassMap[storageClass.ObjectMeta.Name] = params
 					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -557,8 +553,8 @@ func (a *Accesses) recordPrices() {
 					}
 				}
 
-				pvs, _ := a.KubeClientSet.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-				for _, pv := range pvs.Items {
+				pvs := a.Model.Cache.GetAllPersistentVolumes()
+				for _, pv := range pvs {
 					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 					if !ok {
 						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -568,7 +564,7 @@ func (a *Accesses) recordPrices() {
 						Region:     pv.Labels[v1.LabelZoneRegion],
 						Parameters: parameters,
 					}
-					costModel.GetPVCost(cacPv, &pv, a.Cloud)
+					costModel.GetPVCost(cacPv, pv, a.Cloud)
 					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
 					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
 					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
@@ -730,8 +726,6 @@ func main() {
 		KubeClientSet: kubeClientset,
 	})
 
-	podCache := cache.NewListWatchFromClient(kubeClientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
 	a := Accesses{
 		PrometheusClient:              promCli,
 		KubeClientSet:                 kubeClientset,
@@ -745,7 +739,7 @@ func main() {
 		GPUAllocationRecorder:         GPUAllocation,
 		ContainerUptimeRecorder:       ContainerUptimeRecorder,
 		PersistentVolumePriceRecorder: pvGv,
-		Model:                         costModel.NewCostModel(podCache),
+		Model:                         costModel.NewCostModel(kubeClientset),
 	}
 
 	remoteEnabled := os.Getenv(remoteEnabled)