Explorar el Código

Use caching watchers to maintain up to date cached cluster resources rather than query k8s API.

Matt Bolt hace 6 años
padre
commit
d90f287593
Se han modificado 4 ficheros con 287 adiciones y 188 borrados
  1. 140 0
      costmodel/clustercache.go
  2. 40 92
      costmodel/costmodel.go
  3. 100 83
      costmodel/watchcontroller.go
  4. 7 13
      main.go

+ 140 - 0
costmodel/clustercache.go

@@ -0,0 +1,140 @@
+package costmodel
+
+import (
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	stv1 "k8s.io/api/storage/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/client-go/kubernetes"
+)
+
+// ClusterCache defines a contract for an object which caches components within a cluster, ensuring
+// up-to-date resources using watchers
+type ClusterCache interface {
+	// Run starts the watcher processes
+	Run(stopCh chan struct{})
+
+	// GetAllNamespaces returns all the cached namespaces
+	GetAllNamespaces() []*v1.Namespace
+
+	// GetAllNodes returns all the cached nodes
+	GetAllNodes() []*v1.Node
+
+	// GetAllPods returns all the cached pods
+	GetAllPods() []*v1.Pod
+
+	// GetAllServices returns all the cached services
+	GetAllServices() []*v1.Service
+
+	// GetAllDeployments returns all the cached deployments
+	GetAllDeployments() []*appsv1.Deployment
+
+	// GetAllPersistentVolumes returns all the cached persistent volumes
+	GetAllPersistentVolumes() []*v1.PersistentVolume
+
+	// GetAllStorageClasses returns all the cached storage classes
+	GetAllStorageClasses() []*stv1.StorageClass
+}
+
+// KubernetesClusterCache is the implementation of ClusterCache
+type KubernetesClusterCache struct {
+	client kubernetes.Interface
+
+	namespaceWatch    WatchController
+	nodeWatch         WatchController
+	podWatch          WatchController
+	serviceWatch      WatchController
+	deploymentsWatch  WatchController
+	pvWatch           WatchController
+	storageClassWatch WatchController
+}
+
+func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
+	coreRestClient := client.CoreV1().RESTClient()
+	appsRestClient := client.AppsV1().RESTClient()
+	storageRestClient := client.StorageV1().RESTClient()
+
+	return &KubernetesClusterCache{
+		client:            client,
+		namespaceWatch:    NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
+		nodeWatch:         NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
+		podWatch:          NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
+		serviceWatch:      NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		deploymentsWatch:  NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		pvWatch:           NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+	}
+}
+
+func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {
+	go kcc.namespaceWatch.Run(1, stopCh)
+	go kcc.nodeWatch.Run(1, stopCh)
+	go kcc.podWatch.Run(1, stopCh)
+	go kcc.serviceWatch.Run(1, stopCh)
+	go kcc.deploymentsWatch.Run(1, stopCh)
+	go kcc.pvWatch.Run(1, stopCh)
+	go kcc.storageClassWatch.Run(1, stopCh)
+}
+
+func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
+	var namespaces []*v1.Namespace
+	items := kcc.namespaceWatch.GetAll()
+	for _, ns := range items {
+		namespaces = append(namespaces, ns.(*v1.Namespace))
+	}
+	return namespaces
+}
+
+func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
+	var nodes []*v1.Node
+	items := kcc.nodeWatch.GetAll()
+	for _, node := range items {
+		nodes = append(nodes, node.(*v1.Node))
+	}
+	return nodes
+}
+
+func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
+	var pods []*v1.Pod
+	items := kcc.podWatch.GetAll()
+	for _, pod := range items {
+		pods = append(pods, pod.(*v1.Pod))
+	}
+	return pods
+}
+
+func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
+	var services []*v1.Service
+	items := kcc.serviceWatch.GetAll()
+	for _, service := range items {
+		services = append(services, service.(*v1.Service))
+	}
+	return services
+}
+
+func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
+	var deployments []*appsv1.Deployment
+	items := kcc.deploymentsWatch.GetAll()
+	for _, deployment := range items {
+		deployments = append(deployments, deployment.(*appsv1.Deployment))
+	}
+	return deployments
+}
+
+func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
+	var pvs []*v1.PersistentVolume
+	items := kcc.pvWatch.GetAll()
+	for _, pv := range items {
+		pvs = append(pvs, pv.(*v1.PersistentVolume))
+	}
+	return pvs
+}
+
+func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
+	var storageClasses []*stv1.StorageClass
+	items := kcc.storageClassWatch.GetAll()
+	for _, stc := range items {
+		storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
+	}
+	return storageClasses
+}

+ 40 - 92
costmodel/costmodel.go

@@ -19,8 +19,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 )
 
@@ -44,52 +42,20 @@ const (
 )
 
 type CostModel struct {
-	Controller *Controller
-}
+	Cache ClusterCache
 
-func NewCostModel(podListWatcher cache.ListerWatcher) *CostModel {
-	cm := &CostModel{}
-	cm.ContainerWatcher(podListWatcher)
-	return cm
+	stop chan struct{}
 }
 
-func (cm *CostModel) ContainerWatcher(podListWatcher cache.ListerWatcher) {
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+func NewCostModel(client kubernetes.Interface) *CostModel {
+	stopCh := make(chan struct{})
+	cache := NewKubernetesClusterCache(client)
+	cache.Run(stopCh)
 
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	cm.Controller = NewController(queue, indexer, informer)
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go cm.Controller.Run(1, stop)
+	return &CostModel{
+		Cache: cache,
+		stop:  stopCh,
+	}
 }
 
 type CostData struct {
@@ -334,21 +300,21 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 		defer wg.Done()
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -369,7 +335,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no Node cost model available: " + err.Error())
 		return nil, err
@@ -380,7 +346,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -841,17 +807,14 @@ func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
 
 	return allocation
 }
-func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
+func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return err
 	}
-	storageClasses, err := clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	storageClasses := cache.GetAllStorageClasses()
 	storageClassMap := make(map[string]map[string]string)
-	for _, storageClass := range storageClasses.Items {
+	for _, storageClass := range storageClasses {
 		params := storageClass.Parameters
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -860,12 +823,9 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 		}
 	}
 
-	pvs, err := clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	pvs := cache.GetAllPersistentVolumes()
 	pvMap := make(map[string]*costAnalyzerCloud.PV)
-	for _, pv := range pvs.Items {
+	for _, pv := range pvs {
 		parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 		if !ok {
 			klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -875,7 +835,7 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 			Region:     pv.Labels[v1.LabelZoneRegion],
 			Parameters: parameters,
 		}
-		err := GetPVCost(cacPv, &pv, cloud)
+		err := GetPVCost(cacPv, pv, cloud)
 		if err != nil {
 			return err
 		}
@@ -914,17 +874,14 @@ func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAna
 	return nil
 }
 
-func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
+func getNodeCost(cache ClusterCache, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return nil, err
 	}
-	nodeList, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+	nodeList := cache.GetAllNodes()
 	nodes := make(map[string]*costAnalyzerCloud.Node)
-	for _, n := range nodeList.Items {
+	for _, n := range nodeList {
 		name := n.GetObjectMeta().GetName()
 		nodeLabels := n.GetObjectMeta().GetLabels()
 		nodeLabels["providerID"] = n.Spec.ProviderID
@@ -1044,13 +1001,10 @@ func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provide
 	return nodes, nil
 }
 
-func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodServices(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	servicesList := cache.GetAllServices()
 	podServicesMapping := make(map[string]map[string][]string)
-	for _, service := range servicesList.Items {
+	for _, service := range servicesList {
 		namespace := service.GetObjectMeta().GetNamespace()
 		name := service.GetObjectMeta().GetName()
 
@@ -1073,13 +1027,10 @@ func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[stri
 	return podServicesMapping, nil
 }
 
-func getPodDeployments(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
-	for _, deployment := range deploymentsList.Items {
+	for _, deployment := range deploymentsList {
 		namespace := deployment.GetObjectMeta().GetNamespace()
 		name := deployment.GetObjectMeta().GetName()
 		if _, ok := podDeploymentsMapping[namespace]; !ok {
@@ -1183,20 +1134,20 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -1218,7 +1169,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no cost model available: " + err.Error())
 		return nil, err
@@ -1230,7 +1181,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -1498,13 +1449,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return containerNameCost, err
 }
 
-func getNamespaceLabels(clientset kubernetes.Interface) (map[string]map[string]string, error) {
+func getNamespaceLabels(cache ClusterCache) (map[string]map[string]string, error) {
 	nsToLabels := make(map[string]map[string]string)
-	nss, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-	for _, ns := range nss.Items {
+	nss := cache.GetAllNamespaces()
+	for _, ns := range nss {
 		nsToLabels[ns.Name] = ns.Labels
 	}
 	return nsToLabels, nil

+ 100 - 83
costmodel/containeruptime.go → costmodel/watchcontroller.go

@@ -2,34 +2,104 @@ package costmodel
 
 import (
 	"fmt"
+	"reflect"
 	"time"
 
 	"k8s.io/klog"
 
-	v1 "k8s.io/api/core/v1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	rt "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 )
 
-type Controller struct {
+// Type alias for a receiver func
+type WatchHandler = func(interface{})
+
+// WatchController defines a contract for an object which watches a specific resource set for
+// add, updates, and removals
+type WatchController interface {
+	// Run starts the watching process
+	Run(int, chan struct{})
+
+	// GetAll returns all of the resources
+	GetAll() []interface{}
+
+	// SetUpdateHandler sets a specific handler for adding/updating individual resources
+	SetUpdateHandler(WatchHandler) WatchController
+
+	// SetRemovedHandler sets a specific handler for removing individual resources
+	SetRemovedHandler(WatchHandler) WatchController
+}
+
+// CachingWatchController composites the watching behavior and a cache to ensure that all
+// up to date resources are readily available
+type CachingWatchController struct {
 	indexer  cache.Indexer
 	queue    workqueue.RateLimitingInterface
 	informer cache.Controller
+
+	resource     string
+	resourceType string
+
+	updateHandler WatchHandler
+	removeHandler WatchHandler
 }
 
-func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
-	return &Controller{
-		informer: informer,
-		indexer:  indexer,
-		queue:    queue,
+func NewCachingWatcher(restClient rest.Interface, resource string, resourceType rt.Object, namespace string, fieldSelector fields.Selector) WatchController {
+	resourceCache := cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
+	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	indexer, informer := cache.NewIndexerInformer(resourceCache, resourceType, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		UpdateFunc: func(old interface{}, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				queue.Add(key)
+			}
+		},
+	}, cache.Indexers{})
+
+	return &CachingWatchController{
+		indexer:      indexer,
+		queue:        queue,
+		informer:     informer,
+		resource:     resource,
+		resourceType: reflect.TypeOf(resourceType).String(),
 	}
 }
 
-func (c *Controller) processNextItem() bool {
+func (c *CachingWatchController) GetAll() []interface{} {
+	return c.indexer.List()
+}
+
+func (c *CachingWatchController) SetUpdateHandler(handler WatchHandler) WatchController {
+	c.updateHandler = handler
+	return c
+}
+
+func (c *CachingWatchController) SetRemovedHandler(handler WatchHandler) WatchController {
+	c.removeHandler = handler
+	return c
+}
+
+func (c *CachingWatchController) processNextItem() bool {
 	// Wait until there is a new item in the working queue
 	key, quit := c.queue.Get()
 	if quit {
@@ -41,44 +111,38 @@ func (c *Controller) processNextItem() bool {
 	defer c.queue.Done(key)
 
 	// Invoke the method containing the business logic
-	err := c.syncToPrometheus(key.(string))
+	err := c.handle(key.(string))
 	// Handle the error if something went wrong during the execution of the business logic
 	c.handleErr(err, key)
 	return true
 }
 
-// syncToPrometheus is the business logic of the controller. In this controller it simply prints
-// information about the pod to stdout. In case an error happened, it has to simply return the error.
-// The retry logic should not be part of the business logic.
-func (c *Controller) syncToPrometheus(key string) error {
+// handle is the business logic of the controller.
+func (c *CachingWatchController) handle(key string) error {
 	obj, exists, err := c.indexer.GetByKey(key)
 	if err != nil {
-		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
+		klog.Errorf("Fetching %s with key %s from store failed with %v", c.resourceType, key, err)
 		return err
 	}
 
 	if !exists {
-		// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
-		klog.V(1).Infof("Pod %s does not exist anymore\n", key)
+		klog.V(3).Infof("Removed %s for key: %s\n", c.resourceType, key)
+
+		if c.removeHandler != nil {
+			c.removeHandler(key)
+		}
 	} else {
-		// Note that you also have to check the uid if you have a local controlled resource, which
-		// is dependent on the actual instance, to detect that a Pod was recreated with the same name
-		klog.V(1).Infof("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
-	}
-	return nil
-}
+		klog.V(3).Infof("Updated %s: %s\n", c.resourceType, obj.(v1.Object).GetName())
 
-func (c *Controller) GetAll() []*v1.Pod {
-	objs := c.indexer.List()
-	var pods []*v1.Pod
-	for _, obj := range objs {
-		pods = append(pods, obj.(*v1.Pod))
+		if c.updateHandler != nil {
+			c.updateHandler(obj)
+		}
 	}
-	return pods
+	return nil
 }
 
 // handleErr checks if an error happened and makes sure we will retry later.
-func (c *Controller) handleErr(err error, key interface{}) {
+func (c *CachingWatchController) handleErr(err error, key interface{}) {
 	if err == nil {
 		// Forget about the #AddRateLimited history of the key on every successful synchronization.
 		// This ensures that future processing of updates for this key is not delayed because of
@@ -89,7 +153,7 @@ func (c *Controller) handleErr(err error, key interface{}) {
 
 	// This controller retries 5 times if something goes wrong. After that, it stops trying.
 	if c.queue.NumRequeues(key) < 5 {
-		klog.Infof("Error syncing pod %v: %v", key, err)
+		klog.V(3).Infof("Error syncing %s %v: %v", c.resourceType, key, err)
 
 		// Re-enqueue the key rate limited. Based on the rate limiter on the
 		// queue and the re-enqueue history, the key will be processed later again.
@@ -100,15 +164,15 @@ func (c *Controller) handleErr(err error, key interface{}) {
 	c.queue.Forget(key)
 	// Report to an external entity that, even after several retries, we could not successfully process this key
 	runtime.HandleError(err)
-	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
+	klog.Infof("Dropping %s %q out of the queue: %v", c.resourceType, key, err)
 }
 
-func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
+func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
 	defer runtime.HandleCrash()
 
 	// Let the workers stop when we are done
 	defer c.queue.ShutDown()
-	klog.Info("Starting Pod controller")
+	klog.V(3).Infof("Starting %s controller", c.resourceType)
 
 	go c.informer.Run(stopCh)
 
@@ -123,57 +187,10 @@ func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
 	}
 
 	<-stopCh
-	klog.Info("Stopping Pod controller")
+	klog.V(3).Infof("Stopping %s controller", c.resourceType)
 }
 
-func (c *Controller) runWorker() {
+func (c *CachingWatchController) runWorker() {
 	for c.processNextItem() {
 	}
 }
-
-func ContainerUptimeWatcher(clientset kubernetes.Interface) {
-	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
-
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	controller := NewController(queue, indexer, informer)
-
-	/*
-		podList, _ := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
-		for _, pod := range podList.Items {
-			indexer.Add(&pod)
-		}
-	*/
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go controller.Run(1, stop)
-}

+ 7 - 13
main.go

@@ -21,16 +21,13 @@ import (
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
 )
 
 const (
@@ -387,7 +384,7 @@ func (a *Accesses) recordPrices() {
 
 		for {
 			klog.V(4).Info("Recording prices...")
-			podlist := a.Model.Controller.GetAll()
+			podlist := a.Model.Cache.GetAllPods()
 			podStatus := make(map[string]v1.PodPhase)
 			for _, pod := range podlist {
 				podStatus[pod.Name] = pod.Status.Phase
@@ -452,10 +449,9 @@ func (a *Accesses) recordPrices() {
 					containerSeen[labelKey] = false
 				}
 
-				storageClasses, _ := a.KubeClientSet.StorageV1().StorageClasses().List(metav1.ListOptions{})
-
+				storageClasses := a.Model.Cache.GetAllStorageClasses()
 				storageClassMap := make(map[string]map[string]string)
-				for _, storageClass := range storageClasses.Items {
+				for _, storageClass := range storageClasses {
 					params := storageClass.Parameters
 					storageClassMap[storageClass.ObjectMeta.Name] = params
 					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -464,8 +460,8 @@ func (a *Accesses) recordPrices() {
 					}
 				}
 
-				pvs, _ := a.KubeClientSet.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-				for _, pv := range pvs.Items {
+				pvs := a.Model.Cache.GetAllPersistentVolumes()
+				for _, pv := range pvs {
 					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 					if !ok {
 						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -475,7 +471,7 @@ func (a *Accesses) recordPrices() {
 						Region:     pv.Labels[v1.LabelZoneRegion],
 						Parameters: parameters,
 					}
-					costModel.GetPVCost(cacPv, &pv, a.Cloud)
+					costModel.GetPVCost(cacPv, pv, a.Cloud)
 					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
 					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
 					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
@@ -637,8 +633,6 @@ func main() {
 		KubeClientSet: kubeClientset,
 	})
 
-	podCache := cache.NewListWatchFromClient(kubeClientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
 	a := Accesses{
 		PrometheusClient:              promCli,
 		KubeClientSet:                 kubeClientset,
@@ -652,7 +646,7 @@ func main() {
 		GPUAllocationRecorder:         GPUAllocation,
 		ContainerUptimeRecorder:       ContainerUptimeRecorder,
 		PersistentVolumePriceRecorder: pvGv,
-		Model:                         costModel.NewCostModel(podCache),
+		Model:                         costModel.NewCostModel(kubeClientset),
 	}
 
 	remoteEnabled := os.Getenv(remoteEnabled)