Răsfoiți Sursa

Merge branch 'master' into add-network-costs

Matt Bolt 6 ani în urmă
părinte
comite
e43a6c67ba

+ 3 - 0
cloud/azureprovider.go

@@ -551,6 +551,9 @@ func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, e
 }
 func (az *Azure) GetConfig() (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("azure.json")
+	if c.Discount == "" {
+		c.Discount = "0%"
+	}
 	if err != nil {
 		return nil, err
 	}

+ 167 - 193
costmodel/aggregations.go

@@ -1,225 +1,199 @@
 package costmodel
 
 import (
-	"database/sql"
-	"fmt"
-	"os"
-	"time"
-
-	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
-	_ "github.com/lib/pq"
+	"math"
+	"sort"
+	"strconv"
 )
 
-const remotePW = "REMOTE_WRITE_PASSWORD"
-const sqlAddress = "SQL_ADDRESS"
-
-func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
-
-	nodes := make(map[string]*costAnalyzerCloud.Node)
+// Aggregation holds the allocation and cost data accumulated for a single
+// aggregation key. The per-timestamp vectors are tagged `json:"-"` and are
+// therefore omitted from JSON output; only the identifying fields and the
+// scalar cost totals are serialized.
+type Aggregation struct {
+	Aggregator         string    `json:"aggregation"`
+	AggregatorSubField string    `json:"aggregationSubfield"`
+	Environment        string    `json:"environment"`
+	Cluster            string    `json:"cluster"`
+	CPUAllocation      []*Vector `json:"-"`
+	CPUCostVector      []*Vector `json:"-"`
+	RAMAllocation      []*Vector `json:"-"`
+	RAMCostVector      []*Vector `json:"-"`
+	PVCostVector       []*Vector `json:"-"`
+	GPUAllocation      []*Vector `json:"-"`
+	GPUCostVector      []*Vector `json:"-"`
+	CPUCost            float64   `json:"cpuCost"`
+	RAMCost            float64   `json:"ramCost"`
+	GPUCost            float64   `json:"gpuCost"`
+	PVCost             float64   `json:"pvCost"`
+	NetworkCost        float64   `json:"networkCost"`
+	TotalCost          float64   `json:"totalCost"`
+}
 
-	query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
-	FROM metrics
-	WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost')  AND value != 'NaN' AND value != 0
-	GROUP BY instance,name,clusterid`
-	rows, err := db.Query(query)
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-	for rows.Next() {
-		var (
-			name      string
-			avg       float64
-			instance  string
-			clusterid string
-		)
-		if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
-			return nil, err
-		}
-		if data, ok := nodes[instance]; ok {
-			if name == "node_cpu_hourly_cost" {
-				data.VCPUCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_ram_hourly_cost" {
-				data.RAMCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_gpu_hourly_cost" {
-				data.GPUCost = fmt.Sprintf("%f", avg)
+// AggregateCostModel groups costData into Aggregations keyed by the value of
+// aggregationField for each entry:
+//   - "cluster":    keyed by ClusterID
+//   - "namespace":  keyed by Namespace
+//   - "service":    keyed by the first entry of Services; entries with none are skipped
+//   - "deployment": keyed by the first entry of Deployments; entries with none are skipped
+//   - "label":      keyed by the value of the label named aggregationSubField;
+//     entries without that label are skipped
+//
+// Any other aggregationField aggregates nothing and yields an empty map.
+// After grouping, the scalar CPU/RAM/GPU/PV costs are computed by summing the
+// corresponding cost vectors, and TotalCost is the sum of those four values.
+// NetworkCost is not assigned here and is not part of TotalCost.
+func AggregateCostModel(costData map[string]*CostData, discount float64, aggregationField string, aggregationSubField string) map[string]*Aggregation {
+	aggregations := make(map[string]*Aggregation)
+	for _, costDatum := range costData {
+		if aggregationField == "cluster" {
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations, discount)
+		} else if aggregationField == "namespace" {
+			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations, discount)
+		} else if aggregationField == "service" {
+			if len(costDatum.Services) > 0 {
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations, discount)
+			}
+		} else if aggregationField == "deployment" {
+			if len(costDatum.Deployments) > 0 {
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations, discount)
 			}
-		} else {
-			nodes[instance] = &costAnalyzerCloud.Node{}
-			data := nodes[instance]
-			if name == "node_cpu_hourly_cost" {
-				data.VCPUCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_ram_hourly_cost" {
-				data.RAMCost = fmt.Sprintf("%f", avg)
-			} else if name == "node_gpu_hourly_cost" {
-				data.GPUCost = fmt.Sprintf("%f", avg)
+		} else if aggregationField == "label" {
+			if costDatum.Labels != nil {
+				if subfieldName, ok := costDatum.Labels[aggregationSubField]; ok {
+					aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations, discount)
+				}
 			}
 		}
+	}
+	// Collapse each aggregation's cost vectors into scalar totals.
+	for _, agg := range aggregations {
+		agg.CPUCost = totalVector(agg.CPUCostVector)
+		agg.RAMCost = totalVector(agg.RAMCostVector)
+		agg.GPUCost = totalVector(agg.GPUCostVector)
+		agg.PVCost = totalVector(agg.PVCostVector)
+		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost
+	}
+	return aggregations
+}
 
+// aggregationHelper merges costDatum into the Aggregation stored under key,
+// creating that Aggregation first if it does not exist yet. A newly created
+// Aggregation records the aggregator names, uses key as its Environment, and
+// takes its Cluster from the first costDatum seen for that key.
+func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation, discount float64) {
+	if _, ok := aggregations[key]; !ok {
+		agg := &Aggregation{}
+		agg.Aggregator = aggregator
+		agg.AggregatorSubField = aggregatorSubField
+		agg.Environment = key
+		agg.Cluster = costDatum.ClusterID
+		aggregations[key] = agg
 	}
+	mergeVectors(costDatum, aggregations[key], discount)
+}
+
+// mergeVectors adds costDatum's CPU, RAM and GPU allocation vectors to the
+// aggregation, then derives discounted price vectors via getPriceVectors and
+// adds those to the aggregation's cost vectors. Each persistent-volume price
+// vector is folded into the single PVCostVector.
+func mergeVectors(costDatum *CostData, aggregation *Aggregation, discount float64) {
+	aggregation.CPUAllocation = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocation)
+	aggregation.RAMAllocation = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocation)
+	aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
 
-	return nodes, nil
+	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount)
+	aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
+	aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
+	aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
+	for _, vectorList := range pvvs {
+		aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
+	}
 }
 
-func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
-	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
-	db, err := sql.Open("postgres", connStr)
-	defer db.Close()
-	if err != nil {
-		return nil, err
+// getPriceVectors converts costDatum's allocation vectors into cost vectors.
+// It returns, in order, the CPU, RAM and GPU cost vectors and one cost vector
+// per persistent volume claim that has a non-nil Volume. Every output
+// timestamp is rounded to the nearest multiple of 10, and every value is the
+// allocation multiplied by the parsed unit cost and by (1 - discount). RAM and
+// PV values are divided by 1024^3 before pricing (presumably bytes to GiB;
+// confirm against the producers of these vectors).
+//
+// NOTE(review): strconv.ParseFloat errors are discarded throughout, so an
+// unparsable cost string is not reported to the caller.
+func getPriceVectors(costDatum *CostData, discount float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
+	cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
+	for _, val := range costDatum.CPUAllocation {
+		cost, _ := strconv.ParseFloat(costDatum.NodeData.VCPUCost, 64)
+		cpuv = append(cpuv, &Vector{
+			Timestamp: math.Round(val.Timestamp/10) * 10,
+			Value:     val.Value * cost * (1 - discount),
+		})
 	}
-	nodes, err := getNodeCosts(db)
-	if err != nil {
-		return nil, err
+	ramv := make([]*Vector, 0, len(costDatum.RAMAllocation))
+	for _, val := range costDatum.RAMAllocation {
+		cost, _ := strconv.ParseFloat(costDatum.NodeData.RAMCost, 64)
+		ramv = append(ramv, &Vector{
+			Timestamp: math.Round(val.Timestamp/10) * 10,
+			Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
+		})
 	}
-	model := make(map[string]*CostData)
-	query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
-	FROM metrics
-	WHERE (name='container_cpu_allocation') AND
-	  time > $2 AND time < $3 AND value != 'NaN'
-	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
-	ORDER BY container,bucket;
-	`
-	rows, err := db.Query(query, window, start, end)
-	if err != nil {
-		return nil, err
+	gpuv := make([]*Vector, 0, len(costDatum.GPUReq))
+	for _, val := range costDatum.GPUReq {
+		cost, _ := strconv.ParseFloat(costDatum.NodeData.GPUCost, 64)
+		gpuv = append(gpuv, &Vector{
+			Timestamp: math.Round(val.Timestamp/10) * 10,
+			Value:     val.Value * cost * (1 - discount),
+		})
 	}
-	defer rows.Close()
-
-	for rows.Next() {
-		var (
-			bucket    string
-			name      string
-			sum       float64
-			container string
-			pod       string
-			namespace string
-			instance  string
-			clusterid string
-		)
-		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
-			return nil, err
-		}
-		layout := "2006-01-02T15:04:05Z"
-		t, err := time.Parse(layout, bucket)
-		if err != nil {
-			return nil, err
+	pvvs := make([][]*Vector, 0, len(costDatum.PVCData))
+	for _, pvcData := range costDatum.PVCData {
+		pvv := make([]*Vector, 0, len(pvcData.Values))
+		// Claims without a Volume contribute no vector at all.
+		if pvcData.Volume != nil {
+			cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
+			for _, val := range pvcData.Values {
+				pvv = append(pvv, &Vector{
+					Timestamp: math.Round(val.Timestamp/10) * 10,
+					Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
+				})
+			}
+			pvvs = append(pvvs, pvv)
 		}
+	}
+	return cpuv, ramv, gpuv, pvvs
+}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance)
-		key := k.Key()
-		allocationVector := &Vector{
-			Timestamp: float64(t.Unix()),
-			Value:     sum,
-		}
-		if data, ok := model[key]; ok {
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
-			}
-		} else {
-			node, ok := nodes[instance]
-			if !ok {
-				return nil, fmt.Errorf("No node found")
-			}
-			model[key] = &CostData{
-				Name:          container,
-				PodName:       pod,
-				NodeName:      instance,
-				NodeData:      node,
-				CPUAllocation: []*Vector{},
-				RAMAllocation: []*Vector{},
-				GPUReq:        []*Vector{},
-				Namespace:     namespace,
-				ClusterID:     clusterid,
-			}
-			data := model[key]
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
+func totalVector(vectors []*Vector) float64 {
+	total := 0.0
+	for _, vector := range vectors {
+		total += vector.Value
+	}
+	return total
+}
+
+// addVectors merges two vector lists by timestamp. Timestamps are rounded to
+// the nearest multiple of 10; where both lists have a value for the same
+// rounded timestamp the values are summed, otherwise the single value is used.
+// The merged result is sorted by ascending timestamp.
+//
+// If either list is empty, the other list is returned as-is (same slice, same
+// elements) after its non-zero timestamps have been rounded in place.
+//
+// NOTE(review): the Timestamp fields of the input vectors are overwritten in
+// place, so callers observe rounded timestamps on their own data afterwards.
+// NOTE(review): entries with Timestamp == 0 are dropped in the merge path but
+// kept in the two early-return paths.
+// NOTE(review): if two entries of req round to the same timestamp, that
+// timestamp is appended twice and the output contains two entries for it, both
+// carrying the later value; confirm whether inputs can collide like this.
+func addVectors(req []*Vector, used []*Vector) []*Vector {
+	if req == nil || len(req) == 0 {
+		for _, usedV := range used {
+			if usedV.Timestamp == 0 {
+				continue
 			}
+			usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
 		}
+		return used
 	}
-	query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
-	FROM metrics
-	WHERE (name='container_memory_allocation_bytes') AND
-		time > $2 AND time < $3 AND value != 'NaN'
-	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
-	ORDER BY container,bucket;
-	`
-	rows, err = db.Query(query, window, start, end)
-	if err != nil {
-		return nil, err
+	if used == nil || len(used) == 0 {
+		for _, reqV := range req {
+			if reqV.Timestamp == 0 {
+				continue
+			}
+			reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
+		}
+		return req
 	}
-	defer rows.Close()
+	var allocation []*Vector
 
-	for rows.Next() {
-		var (
-			bucket    string
-			name      string
-			sum       float64
-			container string
-			pod       string
-			namespace string
-			instance  string
-			clusterid string
-		)
-		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
-			return nil, err
+	// Index both lists by rounded timestamp and collect the union of timestamps.
+	var timestamps []float64
+	reqMap := make(map[float64]float64)
+	for _, reqV := range req {
+		if reqV.Timestamp == 0 {
+			continue
+		}
+		reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
+		reqMap[reqV.Timestamp] = reqV.Value
+		timestamps = append(timestamps, reqV.Timestamp)
+	}
+	usedMap := make(map[float64]float64)
+	for _, usedV := range used {
+		if usedV.Timestamp == 0 {
+			continue
 		}
-		layout := "2006-01-02T15:04:05Z"
-		t, err := time.Parse(layout, bucket)
-		if err != nil {
-			return nil, err
+		usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
+		usedMap[usedV.Timestamp] = usedV.Value
+		if _, ok := reqMap[usedV.Timestamp]; !ok { // no need to double add, since we'll range over sorted timestamps and check.
+			timestamps = append(timestamps, usedV.Timestamp)
 		}
+	}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance)
-		key := k.Key()
+	sort.Float64s(timestamps)
+	for _, t := range timestamps {
+		rv, okR := reqMap[t]
+		uv, okU := usedMap[t]
 		allocationVector := &Vector{
-			Timestamp: float64(t.Unix()),
-			Value:     sum,
+			Timestamp: t,
 		}
-		if data, ok := model[key]; ok {
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
-			}
-		} else {
-			node, ok := nodes[instance]
-			if !ok {
-				return nil, fmt.Errorf("No node found")
-			}
-			model[key] = &CostData{
-				Name:          container,
-				PodName:       pod,
-				NodeName:      instance,
-				NodeData:      node,
-				CPUAllocation: []*Vector{},
-				RAMAllocation: []*Vector{},
-				GPUReq:        []*Vector{},
-				Namespace:     namespace,
-				ClusterID:     clusterid,
-			}
-			data := model[key]
-			if name == "container_cpu_allocation" {
-				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
-			} else if name == "container_memory_allocation_bytes" {
-				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
-			} else if name == "container_gpu_allocation" {
-				data.GPUReq = append(data.GPUReq, allocationVector)
-			}
+		if okR && okU {
+			allocationVector.Value = rv + uv
+		} else if okR {
+			allocationVector.Value = rv
+		} else if okU {
+			allocationVector.Value = uv
 		}
+		allocation = append(allocation, allocationVector)
 	}
-	return model, nil
+
+	return allocation
 }

+ 4 - 4
costmodel/cluster.go

@@ -125,21 +125,21 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
-	resultClusterCores, err := query(cli, qCores)
+	resultClusterCores, err := Query(cli, qCores)
 	if err != nil {
 		return nil, err
 	}
-	resultClusterRAM, err := query(cli, qRAM)
+	resultClusterRAM, err := Query(cli, qRAM)
 	if err != nil {
 		return nil, err
 	}
 
-	resultStorage, err := query(cli, qStorage)
+	resultStorage, err := Query(cli, qStorage)
 	if err != nil {
 		return nil, err
 	}
 
-	resultTotal, err := query(cli, qTotal)
+	resultTotal, err := Query(cli, qTotal)
 	if err != nil {
 		return nil, err
 	}

+ 165 - 0
costmodel/clustercache.go

@@ -0,0 +1,165 @@
+package costmodel
+
+import (
+	"sync"
+
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	stv1 "k8s.io/api/storage/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/client-go/kubernetes"
+)
+
+// ClusterCache defines a contract for an object which caches components within a cluster, ensuring
+// up-to-date resources using watchers.
+type ClusterCache interface {
+	// Run starts the watcher processes
+	Run(stopCh chan struct{})
+
+	// GetAllNamespaces returns all the cached namespaces
+	GetAllNamespaces() []*v1.Namespace
+
+	// GetAllNodes returns all the cached nodes
+	GetAllNodes() []*v1.Node
+
+	// GetAllPods returns all the cached pods
+	GetAllPods() []*v1.Pod
+
+	// GetAllServices returns all the cached services
+	GetAllServices() []*v1.Service
+
+	// GetAllDeployments returns all the cached deployments
+	GetAllDeployments() []*appsv1.Deployment
+
+	// GetAllPersistentVolumes returns all the cached persistent volumes
+	GetAllPersistentVolumes() []*v1.PersistentVolume
+
+	// GetAllStorageClasses returns all the cached storage classes
+	GetAllStorageClasses() []*stv1.StorageClass
+}
+
+// KubernetesClusterCache is the implementation of ClusterCache. It keeps the
+// Kubernetes client it was built from plus one WatchController per cached
+// resource kind; each getter reads from the matching controller.
+type KubernetesClusterCache struct {
+	client kubernetes.Interface
+
+	namespaceWatch    WatchController
+	nodeWatch         WatchController
+	podWatch          WatchController
+	serviceWatch      WatchController
+	deploymentsWatch  WatchController
+	pvWatch           WatchController
+	storageClassWatch WatchController
+}
+
+// initializeCache calls wc.WarmUp(cancel) and marks one unit of wg as done
+// when WarmUp returns. Done is deferred, so it also runs if WarmUp panics.
+func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
+	defer wg.Done()
+	wc.WarmUp(cancel)
+}
+
+// NewKubernetesClusterCache builds a ClusterCache backed by one caching watcher
+// per resource kind (namespaces, nodes, pods, services, deployments, persistent
+// volumes and storage classes). It warms all watchers up concurrently and
+// blocks until every WarmUp call has returned.
+func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
+	core := client.CoreV1().RESTClient()
+	apps := client.AppsV1().RESTClient()
+	storage := client.StorageV1().RESTClient()
+
+	kcc := &KubernetesClusterCache{client: client}
+	kcc.namespaceWatch = NewCachingWatcher(core, "namespaces", &v1.Namespace{}, "", fields.Everything())
+	kcc.nodeWatch = NewCachingWatcher(core, "nodes", &v1.Node{}, "", fields.Everything())
+	kcc.podWatch = NewCachingWatcher(core, "pods", &v1.Pod{}, "", fields.Everything())
+	kcc.serviceWatch = NewCachingWatcher(core, "services", &v1.Service{}, "", fields.Everything())
+	kcc.deploymentsWatch = NewCachingWatcher(apps, "deployments", &appsv1.Deployment{}, "", fields.Everything())
+	kcc.pvWatch = NewCachingWatcher(core, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything())
+	kcc.storageClassWatch = NewCachingWatcher(storage, "storageclasses", &stv1.StorageClass{}, "", fields.Everything())
+
+	watchers := []WatchController{
+		kcc.namespaceWatch,
+		kcc.nodeWatch,
+		kcc.podWatch,
+		kcc.serviceWatch,
+		kcc.deploymentsWatch,
+		kcc.pvWatch,
+		kcc.storageClassWatch,
+	}
+
+	// The cancel channel is handed to every WarmUp call; it is never closed here.
+	cancel := make(chan struct{})
+
+	// Warm every watcher up in its own goroutine and wait for all of them.
+	var wg sync.WaitGroup
+	wg.Add(len(watchers))
+	for _, wc := range watchers {
+		go initializeCache(wc, &wg, cancel)
+	}
+	wg.Wait()
+
+	return kcc
+}
+
+// Run starts every watcher in its own goroutine, each with a single worker
+// and the shared stop channel. It returns immediately after launching them.
+func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {
+	watchers := []WatchController{
+		kcc.namespaceWatch,
+		kcc.nodeWatch,
+		kcc.podWatch,
+		kcc.serviceWatch,
+		kcc.deploymentsWatch,
+		kcc.pvWatch,
+		kcc.storageClassWatch,
+	}
+	for _, wc := range watchers {
+		go wc.Run(1, stopCh)
+	}
+}
+
+// GetAllNamespaces returns every item currently held by the namespace watcher,
+// asserted to *v1.Namespace. The result is nil when the watcher holds nothing.
+func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
+	var result []*v1.Namespace
+	for _, obj := range kcc.namespaceWatch.GetAll() {
+		result = append(result, obj.(*v1.Namespace))
+	}
+	return result
+}
+
+// GetAllNodes returns every item currently held by the node watcher, asserted
+// to *v1.Node. The result is nil when the watcher holds nothing.
+func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
+	var result []*v1.Node
+	for _, obj := range kcc.nodeWatch.GetAll() {
+		result = append(result, obj.(*v1.Node))
+	}
+	return result
+}
+
+// GetAllPods returns every item currently held by the pod watcher, asserted
+// to *v1.Pod. The result is nil when the watcher holds nothing.
+func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
+	var result []*v1.Pod
+	for _, obj := range kcc.podWatch.GetAll() {
+		result = append(result, obj.(*v1.Pod))
+	}
+	return result
+}
+
+// GetAllServices returns every item currently held by the service watcher,
+// asserted to *v1.Service. The result is nil when the watcher holds nothing.
+func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
+	var result []*v1.Service
+	for _, obj := range kcc.serviceWatch.GetAll() {
+		result = append(result, obj.(*v1.Service))
+	}
+	return result
+}
+
+// GetAllDeployments returns every item currently held by the deployments
+// watcher, asserted to *appsv1.Deployment. The result is nil when the watcher
+// holds nothing.
+func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
+	var result []*appsv1.Deployment
+	for _, obj := range kcc.deploymentsWatch.GetAll() {
+		result = append(result, obj.(*appsv1.Deployment))
+	}
+	return result
+}
+
+// GetAllPersistentVolumes returns every item currently held by the persistent
+// volume watcher, asserted to *v1.PersistentVolume. The result is nil when the
+// watcher holds nothing.
+func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
+	var result []*v1.PersistentVolume
+	for _, obj := range kcc.pvWatch.GetAll() {
+		result = append(result, obj.(*v1.PersistentVolume))
+	}
+	return result
+}
+
+// GetAllStorageClasses returns every item currently held by the storage class
+// watcher, asserted to *stv1.StorageClass. The result is nil when the watcher
+// holds nothing.
+func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
+	var result []*stv1.StorageClass
+	for _, obj := range kcc.storageClassWatch.GetAll() {
+		result = append(result, obj.(*stv1.StorageClass))
+	}
+	return result
+}

+ 0 - 179
costmodel/containeruptime.go

@@ -1,179 +0,0 @@
-package costmodel
-
-import (
-	"fmt"
-	"time"
-
-	"k8s.io/klog"
-
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
-)
-
-type Controller struct {
-	indexer  cache.Indexer
-	queue    workqueue.RateLimitingInterface
-	informer cache.Controller
-}
-
-func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
-	return &Controller{
-		informer: informer,
-		indexer:  indexer,
-		queue:    queue,
-	}
-}
-
-func (c *Controller) processNextItem() bool {
-	// Wait until there is a new item in the working queue
-	key, quit := c.queue.Get()
-	if quit {
-		return false
-	}
-	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
-	// This allows safe parallel processing because two pods with the same key are never processed in
-	// parallel.
-	defer c.queue.Done(key)
-
-	// Invoke the method containing the business logic
-	err := c.syncToPrometheus(key.(string))
-	// Handle the error if something went wrong during the execution of the business logic
-	c.handleErr(err, key)
-	return true
-}
-
-// syncToPrometheus is the business logic of the controller. In this controller it simply prints
-// information about the pod to stdout. In case an error happened, it has to simply return the error.
-// The retry logic should not be part of the business logic.
-func (c *Controller) syncToPrometheus(key string) error {
-	obj, exists, err := c.indexer.GetByKey(key)
-	if err != nil {
-		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
-		return err
-	}
-
-	if !exists {
-		// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
-		klog.V(1).Infof("Pod %s does not exist anymore\n", key)
-	} else {
-		// Note that you also have to check the uid if you have a local controlled resource, which
-		// is dependent on the actual instance, to detect that a Pod was recreated with the same name
-		klog.V(1).Infof("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
-	}
-	return nil
-}
-
-func (c *Controller) GetAll() []*v1.Pod {
-	objs := c.indexer.List()
-	var pods []*v1.Pod
-	for _, obj := range objs {
-		pods = append(pods, obj.(*v1.Pod))
-	}
-	return pods
-}
-
-// handleErr checks if an error happened and makes sure we will retry later.
-func (c *Controller) handleErr(err error, key interface{}) {
-	if err == nil {
-		// Forget about the #AddRateLimited history of the key on every successful synchronization.
-		// This ensures that future processing of updates for this key is not delayed because of
-		// an outdated error history.
-		c.queue.Forget(key)
-		return
-	}
-
-	// This controller retries 5 times if something goes wrong. After that, it stops trying.
-	if c.queue.NumRequeues(key) < 5 {
-		klog.Infof("Error syncing pod %v: %v", key, err)
-
-		// Re-enqueue the key rate limited. Based on the rate limiter on the
-		// queue and the re-enqueue history, the key will be processed later again.
-		c.queue.AddRateLimited(key)
-		return
-	}
-
-	c.queue.Forget(key)
-	// Report to an external entity that, even after several retries, we could not successfully process this key
-	runtime.HandleError(err)
-	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
-}
-
-func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
-	defer runtime.HandleCrash()
-
-	// Let the workers stop when we are done
-	defer c.queue.ShutDown()
-	klog.Info("Starting Pod controller")
-
-	go c.informer.Run(stopCh)
-
-	// Wait for all involved caches to be synced, before processing items from the queue is started
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
-		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
-		return
-	}
-
-	for i := 0; i < threadiness; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
-	}
-
-	<-stopCh
-	klog.Info("Stopping Pod controller")
-}
-
-func (c *Controller) runWorker() {
-	for c.processNextItem() {
-	}
-}
-
-func ContainerUptimeWatcher(clientset kubernetes.Interface) {
-	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
-
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	controller := NewController(queue, indexer, informer)
-
-	/*
-		podList, _ := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
-		for _, pod := range podList.Items {
-			indexer.Add(&pod)
-		}
-	*/
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go controller.Run(1, stop)
-}

+ 71 - 114
costmodel/costmodel.go

@@ -19,8 +19,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 )
 
@@ -41,55 +39,24 @@ const (
 	epConfig          = apiPrefix + "/status/config"
 	epFlags           = apiPrefix + "/status/flags"
 	remoteEnabled     = "REMOTE_WRITE_ENABLED"
+	CLUSTER_ID        = "CLUSTER_ID"
 )
 
+// CostModel bundles the cluster resource cache used for cost computation with
+// the stop channel that was passed to the cache's Run method.
 type CostModel struct {
-	Controller *Controller
-}
+	Cache ClusterCache
 
-func NewCostModel(podListWatcher cache.ListerWatcher) *CostModel {
-	cm := &CostModel{}
-	cm.ContainerWatcher(podListWatcher)
-	return cm
+	stop chan struct{}
 }
 
-func (cm *CostModel) ContainerWatcher(podListWatcher cache.ListerWatcher) {
-
-	// create the workqueue
-	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+// NewCostModel creates a stop channel, builds a KubernetesClusterCache from
+// client, starts the cache's watchers with that channel, and returns a
+// CostModel holding both the cache and the channel.
+// NOTE(review): nothing in this function closes the stop channel; confirm
+// where (or whether) the watchers are meant to be stopped.
+func NewCostModel(client kubernetes.Interface) *CostModel {
+	stopCh := make(chan struct{})
+	cache := NewKubernetesClusterCache(client)
+	cache.Run(stopCh)
 
-	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
-	// whenever the cache is updated, the pod key is added to the workqueue.
-	// Note that when we finally process the item from the workqueue, we might see a newer version
-	// of the Pod than the version which was responsible for triggering the update.
-	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		UpdateFunc: func(old interface{}, new interface{}) {
-			key, err := cache.MetaNamespaceKeyFunc(new)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
-			// key function.
-			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			if err == nil {
-				queue.Add(key)
-			}
-		},
-	}, cache.Indexers{})
-
-	cm.Controller = NewController(queue, indexer, informer)
-	// Now let's start the controller
-	stop := make(chan struct{})
-	//defer close(stop)
-	go cm.Controller.Run(1, stop)
+	return &CostModel{
+		Cache: cache,
+		stop:  stopCh,
+	}
 }
 
 type CostData struct {
@@ -187,7 +154,7 @@ type PrometheusMetadata struct {
 
 // ValidatePrometheus tells the model what data prometheus has on it.
 func ValidatePrometheus(cli prometheusClient.Client) (*PrometheusMetadata, error) {
-	data, err := query(cli, "up")
+	data, err := Query(cli, "up")
 	if err != nil {
 		return &PrometheusMetadata{
 			Running:            false,
@@ -266,11 +233,11 @@ func getUptimeData(qr interface{}) ([]*Vector, bool, error) {
 }
 
 func ComputeUptimes(cli prometheusClient.Client) (map[string]float64, error) {
-	res, err := query(cli, `container_start_time_seconds{container_name != "POD",container_name != ""}`)
+	res, err := Query(cli, `container_start_time_seconds{container_name != "POD",container_name != ""}`)
 	if err != nil {
 		return nil, err
 	}
-	vectors, err := getContainerMetricVector(res, false, 0)
+	vectors, err := GetContainerMetricVector(res, false, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -298,38 +265,40 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, window, "")
 	normalization := fmt.Sprintf(normalizationStr, window, offset)
 
+	clustID := os.Getenv(CLUSTER_ID)
+
 	var wg sync.WaitGroup
 	wg.Add(11)
 
 	var promErr error
 	var resultRAMRequests interface{}
 	go func() {
-		resultRAMRequests, promErr = query(cli, queryRAMRequests)
+		resultRAMRequests, promErr = Query(cli, queryRAMRequests)
 		defer wg.Done()
 	}()
 	var resultRAMUsage interface{}
 	go func() {
-		resultRAMUsage, promErr = query(cli, queryRAMUsage)
+		resultRAMUsage, promErr = Query(cli, queryRAMUsage)
 		defer wg.Done()
 	}()
 	var resultCPURequests interface{}
 	go func() {
-		resultCPURequests, promErr = query(cli, queryCPURequests)
+		resultCPURequests, promErr = Query(cli, queryCPURequests)
 		defer wg.Done()
 	}()
 	var resultCPUUsage interface{}
 	go func() {
-		resultCPUUsage, promErr = query(cli, queryCPUUsage)
+		resultCPUUsage, promErr = Query(cli, queryCPUUsage)
 		defer wg.Done()
 	}()
 	var resultGPURequests interface{}
 	go func() {
-		resultGPURequests, promErr = query(cli, queryGPURequests)
+		resultGPURequests, promErr = Query(cli, queryGPURequests)
 		defer wg.Done()
 	}()
 	var resultPVRequests interface{}
 	go func() {
-		resultPVRequests, promErr = query(cli, queryPVRequests)
+		resultPVRequests, promErr = Query(cli, queryPVRequests)
 		defer wg.Done()
 	}()
 	var resultNetZoneRequests interface{}
@@ -349,28 +318,28 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	}()
 	var normalizationResult interface{}
 	go func() {
-		normalizationResult, promErr = query(cli, normalization)
+		normalizationResult, promErr = Query(cli, normalization)
 		defer wg.Done()
 	}()
 
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 		defer wg.Done()
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -391,7 +360,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no Node cost model available: " + err.Error())
 		return nil, err
@@ -402,7 +371,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -417,7 +386,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	containerNameCost := make(map[string]*CostData)
 	containers := make(map[string]bool)
 
-	RAMReqMap, err := getContainerMetricVector(resultRAMRequests, true, normalizationValue)
+	RAMReqMap, err := GetContainerMetricVector(resultRAMRequests, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
@@ -425,28 +394,28 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		containers[key] = true
 	}
 
-	RAMUsedMap, err := getContainerMetricVector(resultRAMUsage, true, normalizationValue)
+	RAMUsedMap, err := GetContainerMetricVector(resultRAMUsage, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
 	for key := range RAMUsedMap {
 		containers[key] = true
 	}
-	CPUReqMap, err := getContainerMetricVector(resultCPURequests, true, normalizationValue)
+	CPUReqMap, err := GetContainerMetricVector(resultCPURequests, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
 	for key := range CPUReqMap {
 		containers[key] = true
 	}
-	GPUReqMap, err := getContainerMetricVector(resultGPURequests, true, normalizationValue)
+	GPUReqMap, err := GetContainerMetricVector(resultGPURequests, true, normalizationValue)
 	if err != nil {
 		return nil, err
 	}
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := getContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
+	CPUUsedMap, err := GetContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
 	}
@@ -592,6 +561,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					NetworkData:     netReq,
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
+					ClusterID:       clustID,
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -660,6 +630,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
+				ClusterID:       clustID,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -694,7 +665,7 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 		l := strings.Join(q, "|")
 		queryHistoricalPodLabels := fmt.Sprintf(`kube_pod_labels{pod=~"%s"}[%s]`, l, window)
 
-		podLabelsResult, err := query(cli, queryHistoricalPodLabels)
+		podLabelsResult, err := Query(cli, queryHistoricalPodLabels)
 		if err != nil {
 			return fmt.Errorf("Error fetching historical pod labels: " + err.Error())
 		}
@@ -704,6 +675,7 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 			labels, ok := podLabels[cm.PodName]
 			if !ok {
 				klog.V(1).Infof("Unable to find historical data for pod '%s'", cm.PodName)
+				labels = make(map[string]string)
 			}
 			for k, v := range costData.NamespaceLabels {
 				if _, ok := labels[k]; !ok {
@@ -770,15 +742,15 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 		queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost{instance=~"%s"}[%s])`, l, window)
 		queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
 
-		cpuCostResult, err := query(cli, queryHistoricalCPUCost)
+		cpuCostResult, err := Query(cli, queryHistoricalCPUCost)
 		if err != nil {
 			return fmt.Errorf("Error fetching cpu cost data: " + err.Error())
 		}
-		ramCostResult, err := query(cli, queryHistoricalRAMCost)
+		ramCostResult, err := Query(cli, queryHistoricalRAMCost)
 		if err != nil {
 			return fmt.Errorf("Error fetching ram cost data: " + err.Error())
 		}
-		gpuCostResult, err := query(cli, queryHistoricalGPUCost)
+		gpuCostResult, err := Query(cli, queryHistoricalGPUCost)
 		if err != nil {
 			return fmt.Errorf("Error fetching gpu cost data: " + err.Error())
 		}
@@ -882,17 +854,14 @@ func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
 
 	return allocation
 }
-func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
+func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return err
 	}
-	storageClasses, err := clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	storageClasses := cache.GetAllStorageClasses()
 	storageClassMap := make(map[string]map[string]string)
-	for _, storageClass := range storageClasses.Items {
+	for _, storageClass := range storageClasses {
 		params := storageClass.Parameters
 		storageClassMap[storageClass.ObjectMeta.Name] = params
 		if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -901,12 +870,9 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 		}
 	}
 
-	pvs, err := clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
+	pvs := cache.GetAllPersistentVolumes()
 	pvMap := make(map[string]*costAnalyzerCloud.PV)
-	for _, pv := range pvs.Items {
+	for _, pv := range pvs {
 		parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 		if !ok {
 			klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -916,7 +882,7 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 			Region:     pv.Labels[v1.LabelZoneRegion],
 			Parameters: parameters,
 		}
-		err := GetPVCost(cacPv, &pv, cloud)
+		err := GetPVCost(cacPv, pv, cloud)
 		if err != nil {
 			return err
 		}
@@ -955,17 +921,14 @@ func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAna
 	return nil
 }
 
-func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
+func getNodeCost(cache ClusterCache, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
 		return nil, err
 	}
-	nodeList, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+	nodeList := cache.GetAllNodes()
 	nodes := make(map[string]*costAnalyzerCloud.Node)
-	for _, n := range nodeList.Items {
+	for _, n := range nodeList {
 		name := n.GetObjectMeta().GetName()
 		nodeLabels := n.GetObjectMeta().GetLabels()
 		nodeLabels["providerID"] = n.Spec.ProviderID
@@ -1085,13 +1048,10 @@ func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provide
 	return nodes, nil
 }
 
-func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodServices(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	servicesList := cache.GetAllServices()
 	podServicesMapping := make(map[string]map[string][]string)
-	for _, service := range servicesList.Items {
+	for _, service := range servicesList {
 		namespace := service.GetObjectMeta().GetNamespace()
 		name := service.GetObjectMeta().GetName()
 
@@ -1114,13 +1074,10 @@ func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[stri
 	return podServicesMapping, nil
 }
 
-func getPodDeployments(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
-	deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
+func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[string][]string, error) {
+	deploymentsList := cache.GetAllDeployments()
 	podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
-	for _, deployment := range deploymentsList.Items {
+	for _, deployment := range deploymentsList {
 		namespace := deployment.GetObjectMeta().GetNamespace()
 		name := deployment.GetObjectMeta().GetName()
 		if _, ok := podDeploymentsMapping[namespace]; !ok {
@@ -1175,6 +1132,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
+	clustID := os.Getenv(CLUSTER_ID)
 	remoteEnabled := os.Getenv(remoteEnabled)
 	if remoteEnabled == "true" {
 		remoteLayout := "2006-01-02T15:04:05Z"
@@ -1235,27 +1193,27 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	}()
 	var normalizationResult interface{}
 	go func() {
-		normalizationResult, promErr = query(cli, normalization)
+		normalizationResult, promErr = Query(cli, normalization)
 		defer wg.Done()
 	}()
 
 	podDeploymentsMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)
 	namespaceLabelsMapping := make(map[string]map[string]string)
-	podlist := cm.Controller.GetAll()
+	podlist := cm.Cache.GetAllPods()
 	var k8sErr error
 	go func() {
 
-		podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
+		podDeploymentsMapping, k8sErr = getPodDeployments(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
 
-		podServicesMapping, k8sErr = getPodServices(clientset, podlist)
+		podServicesMapping, k8sErr = getPodServices(cm.Cache, podlist)
 		if k8sErr != nil {
 			return
 		}
-		namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
+		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache)
 		if k8sErr != nil {
 			return
 		}
@@ -1277,7 +1235,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(clientset, cloud)
+	nodes, err := getNodeCost(cm.Cache, cloud)
 	if err != nil {
 		klog.V(1).Infof("Warning, no cost model available: " + err.Error())
 		return nil, err
@@ -1289,7 +1247,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(clientset, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cloud)
 		if err != nil {
 			return nil, err
 		}
@@ -1479,6 +1437,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 					Labels:          podLabels,
 					NetworkData:     netReq,
 					NamespaceLabels: nsLabels,
+					ClusterID:       clustID,
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -1545,6 +1504,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
+				ClusterID:       clustID,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -1576,13 +1536,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return containerNameCost, err
 }
 
-func getNamespaceLabels(clientset kubernetes.Interface) (map[string]map[string]string, error) {
+func getNamespaceLabels(cache ClusterCache) (map[string]map[string]string, error) {
 	nsToLabels := make(map[string]map[string]string)
-	nss, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
-	if err != nil {
-		return nil, err
-	}
-	for _, ns := range nss.Items {
+	nss := cache.GetAllNamespaces()
+	for _, ns := range nss {
 		nsToLabels[ns.Name] = ns.Labels
 	}
 	return nsToLabels, nil
@@ -1883,7 +1840,7 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 	return toReturn, err
 }
 
-func query(cli prometheusClient.Client, query string) (interface{}, error) {
+func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 	u := cli.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)
@@ -2024,7 +1981,7 @@ func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*Containe
 	}, nil
 }
 
-func getContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
+func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)

+ 278 - 0
costmodel/sql.go

@@ -0,0 +1,278 @@
+package costmodel
+
+import (
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"os"
+	"time"
+
+	"k8s.io/klog"
+
+	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
+	_ "github.com/lib/pq"
+)
+
+// Names of the environment variables read when connecting to the SQL store.
+const (
+	// remotePW names the variable whose value is used as the postgres password.
+	remotePW = "REMOTE_WRITE_PASSWORD"
+	// sqlAddress names the variable whose value is used as the postgres host.
+	sqlAddress = "SQL_ADDRESS"
+)
+
+// getNodeCosts reads the average hourly CPU, RAM and GPU cost of each node from
+// the metrics table and returns them keyed by the "instance" label.
+//
+// Samples that are NaN or zero are filtered out by the query itself. Only the
+// VCPUCost, RAMCost and GPUCost fields of the returned nodes are populated, each
+// formatted with "%f". The cluster_id column is scanned but not used.
+//
+// An error is returned when the query fails, a row cannot be scanned, or the
+// result set ends because of an iteration error.
+func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
+
+	nodes := make(map[string]*costAnalyzerCloud.Node)
+
+	query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost')  AND value != 'NaN' AND value != 0
+	GROUP BY instance,name,clusterid`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var (
+			name      string
+			avg       float64
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+
+		// Each row carries a single metric for an instance, so create the node
+		// entry the first time the instance is seen and fill one field per row.
+		data, ok := nodes[instance]
+		if !ok {
+			data = &costAnalyzerCloud.Node{}
+			nodes[instance] = data
+		}
+
+		cost := fmt.Sprintf("%f", avg)
+		switch name {
+		case "node_cpu_hourly_cost":
+			data.VCPUCost = cost
+		case "node_ram_hourly_cost":
+			data.RAMCost = cost
+		case "node_gpu_hourly_cost":
+			data.GPUCost = cost
+		}
+	}
+	// rows.Next reports false both when the rows are exhausted and when reading
+	// them failed; surface the failure instead of returning a truncated map.
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return nodes, nil
+}
+
+// CostDataRangeFromSQL builds per-container cost data from the postgres metrics
+// store instead of Prometheus.
+//
+// The connection is built from the environment variables named by remotePW and
+// sqlAddress. window is passed to time_bucket as the bucket width, and start and
+// end bound the sample time (both exclusive). CPU and RAM allocation samples are
+// loaded and keyed by container; every entry whose namespace has a
+// kube_namespace_labels row then receives that label set as both NamespaceLabels
+// and Labels.
+//
+// field and value are accepted but not used by this implementation.
+//
+// An error is returned if the database cannot be opened or queried, if a row
+// cannot be decoded, or if a sample references an instance with no node cost.
+func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
+	pw := os.Getenv(remotePW)
+	address := os.Getenv(sqlAddress)
+	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
+	db, err := sql.Open("postgres", connStr)
+	// Check the error before deferring Close: on failure there is no handle to close.
+	if err != nil {
+		return nil, err
+	}
+	defer db.Close()
+	nodes, err := getNodeCosts(db)
+	if err != nil {
+		return nil, err
+	}
+	model := make(map[string]*CostData)
+	cpuQuery := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='container_cpu_allocation') AND
+	  time > $2 AND time < $3 AND value != 'NaN'
+	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
+	ORDER BY container,bucket;
+	`
+	if err := appendSQLAllocations(db, cpuQuery, window, start, end, nodes, model); err != nil {
+		return nil, err
+	}
+	ramQuery := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='container_memory_allocation_bytes') AND
+		time > $2 AND time < $3 AND value != 'NaN'
+	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
+	ORDER BY container,bucket;
+	`
+	if err := appendSQLAllocations(db, ramQuery, window, start, end, nodes, model); err != nil {
+		return nil, err
+	}
+
+	nsToLabels, err := namespaceLabelsFromSQL(db)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, cd := range model {
+		ns := cd.Namespace
+		if labels, ok := nsToLabels[ns]; ok {
+			cd.NamespaceLabels = labels
+			cd.Labels = labels // TODO: override with podlabels
+		}
+	}
+
+	return model, nil
+}
+
+// appendSQLAllocations runs one allocation query and folds every returned row
+// into model, creating the CostData entry for a container the first time it is
+// seen. The result set is always closed before returning.
+func appendSQLAllocations(db *sql.DB, query string, window string, start string, end string, nodes map[string]*costAnalyzerCloud.Node, model map[string]*CostData) error {
+	rows, err := db.Query(query, window, start, end)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var (
+			bucket    string
+			name      string
+			sum       float64
+			container string
+			pod       string
+			namespace string
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
+			return err
+		}
+		layout := "2006-01-02T15:04:05Z"
+		t, err := time.Parse(layout, bucket)
+		if err != nil {
+			return err
+		}
+
+		k := newContainerMetricFromValues(namespace, pod, container, instance)
+		key := k.Key()
+		data, ok := model[key]
+		if !ok {
+			node, ok := nodes[instance]
+			if !ok {
+				return fmt.Errorf("No node found")
+			}
+			data = &CostData{
+				Name:          container,
+				PodName:       pod,
+				NodeName:      instance,
+				NodeData:      node,
+				CPUAllocation: []*Vector{},
+				RAMAllocation: []*Vector{},
+				GPUReq:        []*Vector{},
+				Namespace:     namespace,
+				ClusterID:     clusterid,
+			}
+			model[key] = data
+		}
+
+		allocationVector := &Vector{
+			Timestamp: float64(t.Unix()),
+			Value:     sum,
+		}
+		switch name {
+		case "container_cpu_allocation":
+			data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+		case "container_memory_allocation_bytes":
+			data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+		case "container_gpu_allocation":
+			data.GPUReq = append(data.GPUReq, allocationVector)
+		}
+	}
+	// Distinguish "no more rows" from "reading rows failed".
+	return rows.Err()
+}
+
+// namespaceLabelsFromSQL returns the most recent kube_namespace_labels label set
+// for each namespace, keyed by the "namespace" label.
+func namespaceLabelsFromSQL(db *sql.DB) (map[string]map[string]string, error) {
+	// Index of the column that is decoded as the JSON label set.
+	const labelsColumn = 4
+
+	query := `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	cols, err := rows.Columns()
+	if err != nil {
+		return nil, err
+	}
+	// The query selects "*", so guard the fixed column index against a schema
+	// with fewer columns instead of panicking on the slice access below.
+	if len(cols) <= labelsColumn {
+		return nil, fmt.Errorf("expected at least %d columns in metrics rows, got %d", labelsColumn+1, len(cols))
+	}
+	rawResult := make([][]byte, len(cols))
+	result := make([]string, len(cols))
+	dest := make([]interface{}, len(cols)) // A temporary interface{} slice
+	for i := range rawResult {
+		dest[i] = &rawResult[i] // Put pointers to each raw column in the interface slice
+	}
+	nsToLabels := make(map[string]map[string]string)
+	for rows.Next() {
+		err = rows.Scan(dest...)
+		if err != nil {
+			return nil, err
+		}
+
+		for i, raw := range rawResult {
+			if raw == nil {
+				result[i] = "\\N"
+			} else {
+				result[i] = string(raw)
+			}
+		}
+
+		klog.Infof("%#v\n", result)
+		var dat map[string]string
+		err := json.Unmarshal([]byte(result[labelsColumn]), &dat)
+		if err != nil {
+			return nil, err
+		}
+
+		ns, ok := dat["namespace"]
+		if !ok {
+			return nil, fmt.Errorf("No namespace found")
+		}
+		nsToLabels[ns] = dat
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return nsToLabels, nil
+}

+ 201 - 0
costmodel/watchcontroller.go

@@ -0,0 +1,201 @@
+package costmodel
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"k8s.io/klog"
+
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	rt "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+)
+
+// WatchHandler is an alias for the callback signature used to receive watch
+// notifications; it takes a single untyped argument.
+type WatchHandler = func(interface{})
+
+// WatchController is the contract for an object that watches one resource set
+// for additions, updates and removals.
+type WatchController interface {
+	// WarmUp initializes the cache.
+	WarmUp(cancelCh chan struct{})
+
+	// Run starts the watching process.
+	Run(threadiness int, stopCh chan struct{})
+
+	// GetAll returns all of the resources.
+	GetAll() []interface{}
+
+	// SetUpdateHandler installs the handler invoked when an individual resource
+	// is added or updated.
+	SetUpdateHandler(handler WatchHandler) WatchController
+
+	// SetRemovedHandler installs the handler invoked when an individual resource
+	// is removed.
+	SetRemovedHandler(handler WatchHandler) WatchController
+}
+
+// CachingWatchController combines watching behavior with a cache so that the
+// current set of resources is readily available.
+type CachingWatchController struct {
+	// Watch machinery: the informer feeds the indexer, and keys of changed
+	// objects are pushed onto the queue.
+	informer cache.Controller
+	indexer  cache.Indexer
+	queue    workqueue.RateLimitingInterface
+
+	// resource is the resource name passed to the constructor; resourceType is
+	// the reflected type name of the watched object, used in log messages.
+	resource     string
+	resourceType string
+
+	// Optional callbacks; a nil handler is skipped.
+	updateHandler WatchHandler
+	removeHandler WatchHandler
+}
+
+// NewCachingWatcher builds a WatchController for the named resource.
+//
+// A list/watch is created from restClient for resource, restricted by namespace
+// and fieldSelector, and wired into an indexer/informer pair with a resync
+// period argument of 0. Every add, update and delete event enqueues the object's
+// namespace key on a rate-limited work queue; events whose key cannot be
+// computed are dropped. No handlers are installed on the returned controller.
+func NewCachingWatcher(restClient rest.Interface, resource string, resourceType rt.Object, namespace string, fieldSelector fields.Selector) WatchController {
+	listWatch := cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
+	workQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+
+	// enqueueWith derives the key for obj with keyFunc and queues it on success.
+	enqueueWith := func(keyFunc func(interface{}) (string, error), obj interface{}) {
+		if key, err := keyFunc(obj); err == nil {
+			workQueue.Add(key)
+		}
+	}
+
+	handlers := cache.ResourceEventHandlerFuncs{
+		AddFunc: func(added interface{}) {
+			enqueueWith(cache.MetaNamespaceKeyFunc, added)
+		},
+		UpdateFunc: func(_ interface{}, updated interface{}) {
+			enqueueWith(cache.MetaNamespaceKeyFunc, updated)
+		},
+		DeleteFunc: func(deleted interface{}) {
+			// IndexerInformer uses a delta queue, therefore deletes need the
+			// deletion-handling key function.
+			enqueueWith(cache.DeletionHandlingMetaNamespaceKeyFunc, deleted)
+		},
+	}
+	store, controller := cache.NewIndexerInformer(listWatch, resourceType, 0, handlers, cache.Indexers{})
+
+	return &CachingWatchController{
+		indexer:      store,
+		queue:        workQueue,
+		informer:     controller,
+		resource:     resource,
+		resourceType: reflect.TypeOf(resourceType).String(),
+	}
+}
+
+// GetAll returns every object currently held by the controller's indexer.
+func (c *CachingWatchController) GetAll() []interface{} {
+	items := c.indexer.List()
+	return items
+}
+
+// SetUpdateHandler stores handler as the callback for objects that still exist
+// in the cache when their key is processed, and returns the controller so that
+// calls can be chained.
+func (c *CachingWatchController) SetUpdateHandler(handler WatchHandler) WatchController {
+	var self WatchController = c
+	c.updateHandler = handler
+	return self
+}
+
+// SetRemovedHandler stores handler as the callback for keys that are no longer
+// present in the cache when they are processed, and returns the controller so
+// that calls can be chained.
+func (c *CachingWatchController) SetRemovedHandler(handler WatchHandler) WatchController {
+	var self WatchController = c
+	c.removeHandler = handler
+	return self
+}
+
+// processNextItem blocks for the next key on the work queue, handles it, and
+// reports whether the caller should keep processing. It returns false only when
+// the queue signals that it is shutting down.
+func (c *CachingWatchController) processNextItem() bool {
+	item, shuttingDown := c.queue.Get()
+	if shuttingDown {
+		return false
+	}
+	// Mark the key as done once handling finishes. This unblocks the key for
+	// other workers, and is what makes parallel processing safe: the same key is
+	// never handled by two workers at once.
+	defer c.queue.Done(item)
+
+	// Run the business logic, then let handleErr decide whether to retry.
+	c.handleErr(c.handle(item.(string)), item)
+	return true
+}
+
+// handle is the business logic of the controller. It looks key up in the
+// indexer and dispatches to the update handler (with the object) when the key
+// exists, or to the remove handler (with the key itself) when it does not.
+// A lookup failure is logged and returned; handlers that are nil are skipped.
+func (c *CachingWatchController) handle(key string) error {
+	obj, found, err := c.indexer.GetByKey(key)
+	if err != nil {
+		klog.Errorf("Fetching %s with key %s from store failed with %v", c.resourceType, key, err)
+		return err
+	}
+
+	if found {
+		klog.V(3).Infof("Updated %s: %s\n", c.resourceType, obj.(v1.Object).GetName())
+
+		if c.updateHandler != nil {
+			c.updateHandler(obj)
+		}
+		return nil
+	}
+
+	klog.V(3).Infof("Removed %s for key: %s\n", c.resourceType, key)
+
+	if c.removeHandler != nil {
+		c.removeHandler(key)
+	}
+	return nil
+}
+
+// handleErr inspects the outcome of handling key and schedules a retry when
+// appropriate. A key is re-queued while it has been requeued fewer than 5
+// times; after that it is dropped and the error is reported.
+func (c *CachingWatchController) handleErr(err error, key interface{}) {
+	switch {
+	case err == nil:
+		// Success: clear the #AddRateLimited history so that future updates for
+		// this key are not delayed by an outdated error history.
+		c.queue.Forget(key)
+
+	case c.queue.NumRequeues(key) < 5:
+		klog.V(3).Infof("Error syncing %s %v: %v", c.resourceType, key, err)
+
+		// Re-enqueue rate limited; the queue's rate limiter and the key's
+		// re-enqueue history determine when it is processed again.
+		c.queue.AddRateLimited(key)
+
+	default:
+		c.queue.Forget(key)
+		// Report that the key could not be processed even after several retries.
+		runtime.HandleError(err)
+		klog.Infof("Dropping %s %q out of the queue: %v", c.resourceType, key, err)
+	}
+}
+
+// WarmUp starts the informer in its own goroutine and blocks until its cache
+// reports that it has synced. If the wait ends without a sync, the failure is
+// reported through runtime.HandleError.
+func (c *CachingWatchController) WarmUp(cancelCh chan struct{}) {
+	go c.informer.Run(cancelCh)
+
+	// Items from the queue must not be processed before the cache is populated.
+	synced := cache.WaitForCacheSync(cancelCh, c.informer.HasSynced)
+	if synced {
+		return
+	}
+	runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
+}
+
+// Run launches threadiness worker goroutines and then blocks until stopCh is
+// closed or receives a value. Each worker is restarted one second after it
+// exits for as long as stopCh remains open. The work queue is shut down when
+// Run returns, which lets the workers stop.
+func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
+	defer runtime.HandleCrash()
+	defer c.queue.ShutDown()
+
+	klog.V(3).Infof("Starting %s controller", c.resourceType)
+
+	for remaining := threadiness; remaining > 0; remaining-- {
+		go wait.Until(c.runWorker, time.Second, stopCh)
+	}
+
+	<-stopCh
+	klog.V(3).Infof("Stopping %s controller", c.resourceType)
+}
+
+// runWorker drains the work queue, returning once processNextItem reports that
+// processing should stop.
+func (c *CachingWatchController) runWorker() {
+	for {
+		if !c.processNextItem() {
+			return
+		}
+	}
+}

+ 109 - 21
main.go

@@ -21,16 +21,13 @@ import (
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
 )
 
 const (
@@ -139,17 +136,33 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 	offset := r.URL.Query().Get("offset")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
+	aggregation := r.URL.Query().Get("aggregation")
+	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
 	if offset != "" {
 		offset = "offset " + offset
 	}
 
 	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
-	if fields != "" {
-		filteredData := filterFields(fields, data)
-		w.Write(wrapData(filteredData, err))
+	if aggregation != "" {
+		c, err := a.Cloud.GetConfig()
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+
+		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
+		w.Write(wrapData(agg, nil))
 	} else {
-		w.Write(wrapData(data, err))
+		if fields != "" {
+			filteredData := filterFields(fields, data)
+			w.Write(wrapData(filteredData, err))
+		} else {
+			w.Write(wrapData(data, err))
+		}
 	}
 }
 
@@ -185,6 +198,65 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 	w.Write(wrapData(data, err))
 }
 
+// AggregateCostModel serves cost data aggregated over a trailing time window.
+//
+// Query parameters:
+//   - window: length of the window, in time.ParseDuration syntax or as a whole
+//     number followed by "d" (days, converted to hours).
+//   - offset: optional duration subtracted from the current time to get the end
+//     of the window.
+//   - aggregation / aggregationSubfield: passed through to
+//     costModel.AggregateCostModel.
+//   - namespace: passed through to ComputeCostDataRange.
+//
+// Cost data is computed at a fixed "1h" resolution and the configured discount
+// (a string with a one-character suffix, e.g. "30%") is applied as a fraction.
+// Any failure writes a single error response and stops.
+//
+// NOTE(review): when aggregation is empty no response body is written; confirm
+// whether callers rely on that.
+func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	window := r.URL.Query().Get("window")
+	offset := r.URL.Query().Get("offset")
+	aggregation := r.URL.Query().Get("aggregation")
+	namespace := r.URL.Query().Get("namespace")
+	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
+
+	endTime := time.Now()
+	if offset != "" {
+		o, err := time.ParseDuration(offset)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+
+		endTime = endTime.Add(-1 * o)
+	}
+
+	// Guard the suffix check so that a missing window yields a parse error below
+	// rather than an out-of-range slice panic.
+	if len(window) > 0 && window[len(window)-1:] == "d" {
+		count := window[:len(window)-1]
+		val, err := strconv.ParseInt(count, 10, 64)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+		val = val * 24
+		window = fmt.Sprintf("%dh", val)
+	}
+
+	d, err := time.ParseDuration(window)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+	startTime := endTime.Add(-1 * d)
+	layout := "2006-01-02T15:04:05.000Z"
+	start := startTime.Format(layout)
+	end := endTime.Format(layout)
+	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+	c, err := a.Cloud.GetConfig()
+	if err != nil {
+		// Stop here: c must not be dereferenced after a failed GetConfig.
+		w.Write(wrapData(nil, err))
+		return
+	}
+	// Drop the one-character suffix before parsing. An empty discount is left
+	// as-is so that ParseFloat reports it as an error instead of the slice
+	// expression panicking.
+	discountValue := c.Discount
+	if n := len(discountValue); n > 0 {
+		discountValue = discountValue[:n-1]
+	}
+	discount, err := strconv.ParseFloat(discountValue, 64)
+	if err != nil {
+		// Stop here so that only one response body is written.
+		w.Write(wrapData(nil, err))
+		return
+	}
+	if aggregation != "" {
+		agg := costModel.AggregateCostModel(data, discount*0.01, aggregation, aggregationSubField)
+		w.Write(wrapData(agg, nil))
+	}
+}
+
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -194,13 +266,31 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	window := r.URL.Query().Get("window")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
+	aggregation := r.URL.Query().Get("aggregation")
+	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
 	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace)
-	if fields != "" {
-		filteredData := filterFields(fields, data)
-		w.Write(wrapData(filteredData, err))
+	if err != nil {
+		w.Write(wrapData(nil, err))
+	}
+	if aggregation != "" {
+		c, err := a.Cloud.GetConfig()
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
+		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
+		w.Write(wrapData(agg, nil))
 	} else {
-		w.Write(wrapData(data, err))
+		if fields != "" {
+			filteredData := filterFields(fields, data)
+			w.Write(wrapData(filteredData, err))
+		} else {
+			w.Write(wrapData(data, err))
+		}
 	}
 }
 
@@ -390,7 +480,7 @@ func (a *Accesses) recordPrices() {
 
 		for {
 			klog.V(4).Info("Recording prices...")
-			podlist := a.Model.Controller.GetAll()
+			podlist := a.Model.Cache.GetAllPods()
 			podStatus := make(map[string]v1.PodPhase)
 			for _, pod := range podlist {
 				podStatus[pod.Name] = pod.Status.Phase
@@ -466,10 +556,9 @@ func (a *Accesses) recordPrices() {
 					containerSeen[labelKey] = false
 				}
 
-				storageClasses, _ := a.KubeClientSet.StorageV1().StorageClasses().List(metav1.ListOptions{})
-
+				storageClasses := a.Model.Cache.GetAllStorageClasses()
 				storageClassMap := make(map[string]map[string]string)
-				for _, storageClass := range storageClasses.Items {
+				for _, storageClass := range storageClasses {
 					params := storageClass.Parameters
 					storageClassMap[storageClass.ObjectMeta.Name] = params
 					if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
@@ -478,8 +567,8 @@ func (a *Accesses) recordPrices() {
 					}
 				}
 
-				pvs, _ := a.KubeClientSet.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
-				for _, pv := range pvs.Items {
+				pvs := a.Model.Cache.GetAllPersistentVolumes()
+				for _, pv := range pvs {
 					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 					if !ok {
 						klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
@@ -489,7 +578,7 @@ func (a *Accesses) recordPrices() {
 						Region:     pv.Labels[v1.LabelZoneRegion],
 						Parameters: parameters,
 					}
-					costModel.GetPVCost(cacPv, &pv, a.Cloud)
+					costModel.GetPVCost(cacPv, pv, a.Cloud)
 					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
 					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
 					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
@@ -665,8 +754,6 @@ func main() {
 		KubeClientSet: kubeClientset,
 	})
 
-	podCache := cache.NewListWatchFromClient(kubeClientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
-
 	a := Accesses{
 		PrometheusClient:              promCli,
 		KubeClientSet:                 kubeClientset,
@@ -683,7 +770,7 @@ func main() {
 		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
-		Model:                         costModel.NewCostModel(podCache),
+		Model:                         costModel.NewCostModel(kubeClientset),
 	}
 
 	remoteEnabled := os.Getenv(remoteEnabled)
@@ -725,6 +812,7 @@ func main() {
 	router.GET("/managementPlatform", a.ManagementPlatform)
 	router.GET("/clusterInfo", a.ClusterInfo)
 	router.GET("/containerUptimes", a.ContainerUptimes)
+	router.GET("/aggregatedCostModel", a.AggregateCostModel)
 
 	rootMux := http.NewServeMux()
 	rootMux.Handle("/", router)