Просмотр исходного кода

Merge branch 'master' into add-network-costs

Matt Bolt 6 лет назад
Родитель
Commit
342c7aef4b
10 измененных файлов с 743 добавлено и 38 удалено
  1. 1 1
      CONTRIBUTING.md
  2. 1 1
      README.md
  3. 8 0
      cloud/awsprovider.go
  4. 8 0
      cloud/azureprovider.go
  5. 8 0
      cloud/gcpprovider.go
  6. 101 0
      cloud/provider.go
  7. 225 0
      costmodel/aggregations.go
  8. 150 26
      costmodel/costmodel.go
  9. 170 0
      costmodel/metrics.go
  10. 71 10
      main.go

+ 1 - 1
CONTRIBUTING.md

@@ -5,7 +5,7 @@ Thanks for your help improving the project!
 ## Getting Help ##
 
 If you have a question about Kubecost or have encountered problems using it,
-you can start by asking a question on [Slack](https://join.slack.com/t/kubecost/shared_invite/enQtNTA2MjQ1NDUyODE5LTg0MzYyMDIzN2E4M2M5OTE3NjdmODJlNzBjZGY1NjQ3MThlODVjMGY3NWZlNjQ5NjIwNDc2NGU3MWNiM2E5Mjc) or via email at [team@kubecost.com](team@kubecost.com)
+you can start by asking a question on [Slack](https://join.slack.com/t/kubecost/shared_invite/enQtNTA2MjQ1NDUyODE5LWFjYzIzNWE4MDkzMmUyZGU4NjkwMzMyMjIyM2E0NGNmYjExZjBiNjk1YzY5ZDI0ZTNhZDg4NjlkMGRkYzFlZTU) or via email at [team@kubecost.com](team@kubecost.com)
 
 ## Building ## 
 

+ 1 - 1
README.md

@@ -72,4 +72,4 @@ Modify [spotCPU](https://github.com/kubecost/cost-model/blob/master/cloud/defaul
 
 We supply a global key with a low limit for evaluation, but you will want to supply your own before moving to production.  
   
-Please reach out with any additional questions on  [Slack](https://join.slack.com/t/kubecost/shared_invite/enQtNTA2MjQ1NDUyODE5LTg0MzYyMDIzN2E4M2M5OTE3NjdmODJlNzBjZGY1NjQ3MThlODVjMGY3NWZlNjQ5NjIwNDc2NGU3MWNiM2E5Mjc) or via email at [team@kubecost.com](team@kubecost.com). 
+Please reach out with any additional questions on  [Slack](https://join.slack.com/t/kubecost/shared_invite/enQtNTA2MjQ1NDUyODE5LWFjYzIzNWE4MDkzMmUyZGU4NjkwMzMyMjIyM2E0NGNmYjExZjBiNjk1YzY5ZDI0ZTNhZDg4NjlkMGRkYzFlZTU) or via email at [team@kubecost.com](team@kubecost.com). 

+ 8 - 0
cloud/awsprovider.go

@@ -326,6 +326,13 @@ func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 		path = "/models/"
 	}
 	path += "aws.json"
+	remoteEnabled := os.Getenv(remoteEnabled)
+	if remoteEnabled == "true" {
+		err = UpdateClusterMeta(os.Getenv(KC_CLUSTER_ID), c.ClusterName)
+		if err != nil {
+			return nil, err
+		}
+	}
 	err = ioutil.WriteFile(path, cj, 0644)
 	if err != nil {
 		return nil, err
@@ -819,6 +826,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m := make(map[string]string)
 		m["name"] = clusterName
 		m["provider"] = "AWS"
+		m["id"] = os.Getenv(KC_CLUSTER_ID)
 		return m, nil
 	}
 

+ 8 - 0
cloud/azureprovider.go

@@ -498,6 +498,7 @@ func (az *Azure) ClusterInfo() (map[string]string, error) {
 		m["name"] = c.ClusterName
 	}
 	m["provider"] = "azure"
+	m["id"] = os.Getenv(KC_CLUSTER_ID)
 	return m, nil
 
 }
@@ -532,6 +533,13 @@ func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, e
 	if err != nil {
 		return nil, err
 	}
+	remoteEnabled := os.Getenv(remoteEnabled)
+	if remoteEnabled == "true" {
+		err = UpdateClusterMeta(os.Getenv(KC_CLUSTER_ID), c.ClusterName)
+		if err != nil {
+			return nil, err
+		}
+	}
 
 	configPath := path + "azure.json"
 	err = ioutil.WriteFile(configPath, cj, 0644)

+ 8 - 0
cloud/gcpprovider.go

@@ -163,6 +163,13 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 	if err != nil {
 		return nil, err
 	}
+	remoteEnabled := os.Getenv(remoteEnabled)
+	if remoteEnabled == "true" {
+		err = UpdateClusterMeta(os.Getenv(KC_CLUSTER_ID), c.ClusterName)
+		if err != nil {
+			return nil, err
+		}
+	}
 
 	configPath := path + "gcp.json"
 	err = ioutil.WriteFile(configPath, cj, 0644)
@@ -256,6 +263,7 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = "GCP"
+	m["id"] = os.Getenv(KC_CLUSTER_ID)
 	return m, nil
 }
 

+ 101 - 0
cloud/provider.go

@@ -1,6 +1,7 @@
 package cloud
 
 import (
+	"database/sql"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -22,6 +23,19 @@ import (
 	"k8s.io/client-go/kubernetes"
 )
 
// Environment variable names used to configure remote (multi-cluster) storage.
const KC_CLUSTER_ID = "CLUSTER_ID"           // unique id of this cluster
const remotePW = "REMOTE_WRITE_PASSWORD"     // postgres password for the remote-write store
const sqlAddress = "SQL_ADDRESS"             // postgres host for the remote-write store
const remoteEnabled = "REMOTE_WRITE_ENABLED" // "true" enables remote-write features

// createTableStatements is executed before inserts to make sure the
// cluster_id -> cluster_name mapping table exists; IF NOT EXISTS keeps
// it safe to run repeatedly.
var createTableStatements = []string{
	`CREATE TABLE IF NOT EXISTS names (
		cluster_id VARCHAR(255) NOT NULL,
		cluster_name VARCHAR(255) NULL,
		PRIMARY KEY (cluster_id)
	);`,
}
+
 // Node is the interface by which the provider and cost model communicate Node prices.
 // The provider will best-effort try to fill out this struct.
 type Node struct {
@@ -487,3 +501,90 @@ func NewProvider(clientset *kubernetes.Clientset, apiKey string) (Provider, erro
 		}, nil
 	}
 }
+
+func UpdateClusterMeta(cluster_id, cluster_name string) error {
+	pw := os.Getenv(remotePW)
+	address := os.Getenv(sqlAddress)
+	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
+	db, err := sql.Open("postgres", connStr)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+	updateStmt := `UPDATE names SET cluster_name = $1 WHERE cluster_id = $2;`
+	_, err = db.Exec(updateStmt, cluster_name, cluster_id)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func CreateClusterMeta(cluster_id, cluster_name string) error {
+	pw := os.Getenv(remotePW)
+	address := os.Getenv(sqlAddress)
+	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
+	db, err := sql.Open("postgres", connStr)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+	for _, stmt := range createTableStatements {
+		_, err := db.Exec(stmt)
+		if err != nil {
+			return err
+		}
+	}
+	insertStmt := `INSERT INTO names (cluster_id, cluster_name) VALUES ($1, $2);`
+	_, err = db.Exec(insertStmt, cluster_id, cluster_name)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func GetClusterMeta(cluster_id string) (string, string, error) {
+	pw := os.Getenv(remotePW)
+	address := os.Getenv(sqlAddress)
+	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
+	db, err := sql.Open("postgres", connStr)
+	defer db.Close()
+	query := `SELECT cluster_id, cluster_name
+	FROM names
+	WHERE cluster_id = ?`
+
+	rows, err := db.Query(query, cluster_id)
+	if err != nil {
+		return "", "", err
+	}
+	defer rows.Close()
+	var (
+		sql_cluster_id string
+		cluster_name   string
+	)
+	for rows.Next() {
+		if err := rows.Scan(&sql_cluster_id, &cluster_name); err != nil {
+			return "", "", err
+		}
+	}
+
+	return sql_cluster_id, cluster_name, nil
+}
+
+func GetOrCreateClusterMeta(cluster_id, cluster_name string) (string, string, error) {
+	id, name, err := GetClusterMeta(cluster_id)
+	if err != nil {
+		err := CreateClusterMeta(cluster_id, cluster_name)
+		if err != nil {
+			return "", "", err
+		}
+	}
+	if id == "" {
+		err := CreateClusterMeta(cluster_id, cluster_name)
+		if err != nil {
+			return "", "", err
+		}
+	}
+
+	return id, name, nil
+
+}

+ 225 - 0
costmodel/aggregations.go

@@ -0,0 +1,225 @@
+package costmodel
+
+import (
+	"database/sql"
+	"fmt"
+	"os"
+	"time"
+
+	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
+	_ "github.com/lib/pq"
+)
+
// Environment variable names for the remote postgres store.
// NOTE(review): these duplicate the unexported constants in the cloud
// package — keep the two declarations in sync.
const remotePW = "REMOTE_WRITE_PASSWORD"
const sqlAddress = "SQL_ADDRESS"
+
+func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
+
+	nodes := make(map[string]*costAnalyzerCloud.Node)
+
+	query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost')  AND value != 'NaN' AND value != 0
+	GROUP BY instance,name,clusterid`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var (
+			name      string
+			avg       float64
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+		if data, ok := nodes[instance]; ok {
+			if name == "node_cpu_hourly_cost" {
+				data.VCPUCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_ram_hourly_cost" {
+				data.RAMCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_gpu_hourly_cost" {
+				data.GPUCost = fmt.Sprintf("%f", avg)
+			}
+		} else {
+			nodes[instance] = &costAnalyzerCloud.Node{}
+			data := nodes[instance]
+			if name == "node_cpu_hourly_cost" {
+				data.VCPUCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_ram_hourly_cost" {
+				data.RAMCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_gpu_hourly_cost" {
+				data.GPUCost = fmt.Sprintf("%f", avg)
+			}
+		}
+
+	}
+
+	return nodes, nil
+}
+
+func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
+	pw := os.Getenv(remotePW)
+	address := os.Getenv(sqlAddress)
+	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
+	db, err := sql.Open("postgres", connStr)
+	defer db.Close()
+	if err != nil {
+		return nil, err
+	}
+	nodes, err := getNodeCosts(db)
+	if err != nil {
+		return nil, err
+	}
+	model := make(map[string]*CostData)
+	query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='container_cpu_allocation') AND
+	  time > $2 AND time < $3 AND value != 'NaN'
+	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
+	ORDER BY container,bucket;
+	`
+	rows, err := db.Query(query, window, start, end)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var (
+			bucket    string
+			name      string
+			sum       float64
+			container string
+			pod       string
+			namespace string
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+		layout := "2006-01-02T15:04:05Z"
+		t, err := time.Parse(layout, bucket)
+		if err != nil {
+			return nil, err
+		}
+
+		k := newContainerMetricFromValues(namespace, pod, container, instance)
+		key := k.Key()
+		allocationVector := &Vector{
+			Timestamp: float64(t.Unix()),
+			Value:     sum,
+		}
+		if data, ok := model[key]; ok {
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		} else {
+			node, ok := nodes[instance]
+			if !ok {
+				return nil, fmt.Errorf("No node found")
+			}
+			model[key] = &CostData{
+				Name:          container,
+				PodName:       pod,
+				NodeName:      instance,
+				NodeData:      node,
+				CPUAllocation: []*Vector{},
+				RAMAllocation: []*Vector{},
+				GPUReq:        []*Vector{},
+				Namespace:     namespace,
+				ClusterID:     clusterid,
+			}
+			data := model[key]
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		}
+	}
+	query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='container_memory_allocation_bytes') AND
+		time > $2 AND time < $3 AND value != 'NaN'
+	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
+	ORDER BY container,bucket;
+	`
+	rows, err = db.Query(query, window, start, end)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var (
+			bucket    string
+			name      string
+			sum       float64
+			container string
+			pod       string
+			namespace string
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+		layout := "2006-01-02T15:04:05Z"
+		t, err := time.Parse(layout, bucket)
+		if err != nil {
+			return nil, err
+		}
+
+		k := newContainerMetricFromValues(namespace, pod, container, instance)
+		key := k.Key()
+		allocationVector := &Vector{
+			Timestamp: float64(t.Unix()),
+			Value:     sum,
+		}
+		if data, ok := model[key]; ok {
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		} else {
+			node, ok := nodes[instance]
+			if !ok {
+				return nil, fmt.Errorf("No node found")
+			}
+			model[key] = &CostData{
+				Name:          container,
+				PodName:       pod,
+				NodeName:      instance,
+				NodeData:      node,
+				CPUAllocation: []*Vector{},
+				RAMAllocation: []*Vector{},
+				GPUReq:        []*Vector{},
+				Namespace:     namespace,
+				ClusterID:     clusterid,
+			}
+			data := model[key]
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		}
+	}
+	return model, nil
+}

+ 150 - 26
costmodel/costmodel.go

@@ -114,6 +114,7 @@ type CostData struct {
 	NetworkData     []*Vector                    `json:"network,omitempty"`
 	Labels          map[string]string            `json:"labels,omitempty"`
 	NamespaceLabels map[string]string            `json:"namespaceLabels,omitempty"`
+	ClusterID       string                       `json:"clusterId"`
 }
 
 type Vector struct {
@@ -467,6 +468,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		}
 	}
 	missingNodes := make(map[string]*costAnalyzerCloud.Node)
+	missingContainers := make(map[string]*CostData)
 	for key := range containers {
 		if _, ok := containerNameCost[key]; ok {
 			continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
@@ -477,6 +479,9 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 
 			nsLabels := namespaceLabelsMapping[ns]
 			podLabels := pod.GetObjectMeta().GetLabels()
+			if podLabels == nil {
+				podLabels = make(map[string]string)
+			}
 
 			for k, v := range nsLabels {
 				if _, ok := podLabels[k]; !ok {
@@ -639,35 +644,119 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					missingNodes[c.NodeName] = node
 				}
 			}
+			namespacelabels, ok := namespaceLabelsMapping[c.Namespace]
+			if !ok {
+				klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
+			}
 			costs := &CostData{
-				Name:      c.ContainerName,
-				PodName:   c.PodName,
-				NodeName:  c.NodeName,
-				NodeData:  node,
-				Namespace: c.Namespace,
-				RAMReq:    RAMReqV,
-				RAMUsed:   RAMUsedV,
-				CPUReq:    CPUReqV,
-				CPUUsed:   CPUUsedV,
-				GPUReq:    GPUReqV,
+				Name:            c.ContainerName,
+				PodName:         c.PodName,
+				NodeName:        c.NodeName,
+				NodeData:        node,
+				Namespace:       c.Namespace,
+				RAMReq:          RAMReqV,
+				RAMUsed:         RAMUsedV,
+				CPUReq:          CPUReqV,
+				CPUUsed:         CPUUsedV,
+				GPUReq:          GPUReqV,
+				NamespaceLabels: namespacelabels,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
 			if filterNamespace == "" {
 				containerNameCost[key] = costs
+				missingContainers[key] = costs
 			} else if costs.Namespace == filterNamespace {
 				containerNameCost[key] = costs
+				missingContainers[key] = costs
 			}
 		}
 	}
 	err = findDeletedNodeInfo(cli, missingNodes, window)
 
+	if err != nil {
+		return nil, err
+	}
+	err = findDeletedPodInfo(cli, missingContainers, window)
 	if err != nil {
 		return nil, err
 	}
 	return containerNameCost, err
 }
 
+func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[string]*CostData, window string) error {
+	if len(missingContainers) > 0 {
+		q := make([]string, 0, len(missingContainers))
+		for key := range missingContainers {
+			cm, _ := NewContainerMetricFromKey(key)
+			q = append(q, cm.PodName)
+		}
+		l := strings.Join(q, "|")
+		queryHistoricalPodLabels := fmt.Sprintf(`kube_pod_labels{pod=~"%s"}[%s]`, l, window)
+
+		podLabelsResult, err := query(cli, queryHistoricalPodLabels)
+		if err != nil {
+			return fmt.Errorf("Error fetching historical pod labels: " + err.Error())
+		}
+		podLabels, err := labelsFromPrometheusQuery(podLabelsResult)
+		for key, costData := range missingContainers {
+			cm, _ := NewContainerMetricFromKey(key)
+			labels, ok := podLabels[cm.PodName]
+			if !ok {
+				klog.V(1).Infof("Unable to find historical data for pod '%s'", cm.PodName)
+			}
+			for k, v := range costData.NamespaceLabels {
+				if _, ok := labels[k]; !ok {
+					labels[k] = v
+				}
+			}
+			costData.Labels = labels
+		}
+	}
+
+	return nil
+}
+
// labelsFromPrometheusQuery parses a generic prometheus query response and
// returns, per pod name, the pod's "label_*" labels with that prefix stripped.
func labelsFromPrometheusQuery(qr interface{}) (map[string]map[string]string, error) {
	toReturn := make(map[string]map[string]string)
	// Walk the response defensively with comma-ok assertions.
	// BUG FIX: the previous chained assertions
	// qr.(map[...])["data"].(map[...])["result"].([]interface{}) panicked on
	// any malformed or error-shaped response instead of returning an error.
	top, ok := qr.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("Improperly formatted response from prometheus")
	}
	data, ok := top["data"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("Data field improperly formatted in prometheus response")
	}
	results, ok := data["result"].([]interface{})
	if !ok {
		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
	}
	for _, val := range results {
		resultMap, ok := val.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("Result vector is improperly formatted")
		}
		metricInterface, ok := resultMap["metric"]
		if !ok {
			return nil, fmt.Errorf("Metric field does not exist in data result vector")
		}
		metricMap, ok := metricInterface.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("Metric field is improperly formatted")
		}
		pod, ok := metricMap["pod"]
		if !ok {
			return nil, fmt.Errorf("pod field does not exist in data result vector")
		}
		podName, ok := pod.(string)
		if !ok {
			return nil, fmt.Errorf("pod field is improperly formatted")
		}

		for labelName, labelValue := range metricMap {
			parsedLv, ok := labelValue.(string)
			if !ok {
				return nil, fmt.Errorf("label value is improperly formatted")
			}
			if strings.HasPrefix(labelName, "label_") {
				// HasPrefix guarantees the first occurrence is the prefix, so
				// TrimPrefix matches the old Replace(..., 1) behavior.
				l := strings.TrimPrefix(labelName, "label_")
				if podLabels, ok := toReturn[podName]; ok {
					podLabels[l] = parsedLv
				} else {
					toReturn[podName] = map[string]string{l: parsedLv}
				}
			}
		}
	}
	return toReturn, nil
}
+
 func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*costAnalyzerCloud.Node, window string) error {
 	if len(missingNodes) > 0 {
 		q := make([]string, 0, len(missingNodes))
@@ -794,6 +883,10 @@ func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
 	return allocation
 }
 func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
+	cfg, err := cloud.GetConfig()
+	if err != nil {
+		return err
+	}
 	storageClasses, err := clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
 	if err != nil {
 		return err
@@ -831,16 +924,27 @@ func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*Persis
 	}
 
 	for _, pvc := range pvClaimMapping {
-		pvc.Volume = pvMap[pvc.VolumeName]
+		if vol, ok := pvMap[pvc.VolumeName]; ok {
+			pvc.Volume = vol
+		} else {
+			klog.V(1).Infof("PV not found, using default")
+			pvc.Volume = &costAnalyzerCloud.PV{
+				Cost: cfg.Storage,
+			}
+		}
 	}
 	return nil
 }
 
 func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAnalyzerCloud.Provider) error {
 	cfg, err := cloud.GetConfig()
+	if err != nil {
+		return err
+	}
 	key := cloud.GetPVKey(kpv, pv.Parameters)
 	pvWithCost, err := cloud.PVPricing(key)
 	if err != nil {
+		pv.Cost = cfg.Storage
 		return err
 	}
 	if pvWithCost == nil || pvWithCost.Cost == "" {
@@ -1069,9 +1173,12 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, err
 	}
 	remoteEnabled := os.Getenv(remoteEnabled)
-	if remoteEnabled == "true" && (end.Sub(start) > time.Hour*168) {
+	if remoteEnabled == "true" {
+		remoteLayout := "2006-01-02T15:04:05Z"
+		remoteStartStr := start.Format(remoteLayout)
+		remoteEndStr := end.Format(remoteLayout)
 		klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
-		return CostDataRangeFromSQL("", "", windowString, startString, endString)
+		return CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
 	}
 
 	var wg sync.WaitGroup
@@ -1225,7 +1332,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	}
 
 	missingNodes := make(map[string]*costAnalyzerCloud.Node)
-
+	missingContainers := make(map[string]*CostData)
 	for key := range containers {
 		if _, ok := containerNameCost[key]; ok {
 			continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
@@ -1270,6 +1377,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			nsLabels := namespaceLabelsMapping[ns]
 			podLabels := pod.GetObjectMeta().GetLabels()
 
+			if podLabels == nil {
+				podLabels = make(map[string]string)
+			}
+
 			for k, v := range nsLabels {
 				if _, ok := podLabels[k]; !ok {
 					podLabels[k] = v
@@ -1381,35 +1492,48 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 					missingNodes[c.NodeName] = node
 				}
 			}
+			namespacelabels, ok := namespaceLabelsMapping[c.Namespace]
+			if !ok {
+				klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
+			}
 			costs := &CostData{
-				Name:      c.ContainerName,
-				PodName:   c.PodName,
-				NodeName:  c.NodeName,
-				NodeData:  node,
-				Namespace: c.Namespace,
-				RAMReq:    RAMReqV,
-				RAMUsed:   RAMUsedV,
-				CPUReq:    CPUReqV,
-				CPUUsed:   CPUUsedV,
-				GPUReq:    GPUReqV,
+				Name:            c.ContainerName,
+				PodName:         c.PodName,
+				NodeName:        c.NodeName,
+				NodeData:        node,
+				Namespace:       c.Namespace,
+				RAMReq:          RAMReqV,
+				RAMUsed:         RAMUsedV,
+				CPUReq:          CPUReqV,
+				CPUUsed:         CPUUsedV,
+				GPUReq:          GPUReqV,
+				NamespaceLabels: namespacelabels,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
 			if filterNamespace == "" {
 				containerNameCost[key] = costs
+				missingContainers[key] = costs
 			} else if costs.Namespace == filterNamespace {
 				containerNameCost[key] = costs
+				missingContainers[key] = costs
 			}
 		}
 	}
 
 	w := end.Sub(start)
+	w += window
 	if w.Minutes() > 0 {
 		wStr := fmt.Sprintf("%dm", int(w.Minutes()))
 		err = findDeletedNodeInfo(cli, missingNodes, wStr)
 		if err != nil {
 			return nil, err
 		}
+		klog.Infof("Finding deleted pod info from range query:")
+		err = findDeletedPodInfo(cli, missingContainers, wStr)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	return containerNameCost, err
@@ -1723,7 +1847,7 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 		return nil, err
 	}
 
-	_, body, err := cli.Do(context.Background(), req)
+	_, body, _, err := cli.Do(context.Background(), req)
 	if err != nil {
 		klog.V(1).Infof("ERROR" + err.Error())
 	}
@@ -1749,7 +1873,7 @@ func query(cli prometheusClient.Client, query string) (interface{}, error) {
 		return nil, err
 	}
 
-	_, body, err := cli.Do(context.Background(), req)
+	_, body, _, err := cli.Do(context.Background(), req)
 	if err != nil {
 		return nil, err
 	}

+ 170 - 0
costmodel/metrics.go

@@ -0,0 +1,170 @@
+package costmodel
+
+import (
+	"regexp"
+	"sort"
+
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
var (
	// invalidLabelCharRE matches any character that is not legal in a
	// prometheus label name.
	invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)

// kubeLabelsToPrometheusLabels converts a kubernetes label map into parallel
// slices of prometheus-safe label names (prefixed with "label_") and their
// values, ordered by the original kubernetes key.
func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string) {
	orderedKeys := make([]string, 0, len(labels))
	for key := range labels {
		orderedKeys = append(orderedKeys, key)
	}
	sort.Strings(orderedKeys)

	names := make([]string, 0, len(labels))
	values := make([]string, 0, len(labels))
	for _, key := range orderedKeys {
		names = append(names, "label_"+sanitizeLabelName(key))
		values = append(values, labels[key])
	}
	return names, values
}

// sanitizeLabelName replaces every prometheus-illegal character with "_".
func sanitizeLabelName(s string) string {
	return invalidLabelCharRE.ReplaceAllString(s, "_")
}
+
// DeploymentCollector is a prometheus collector that emits one metric per
// deployment in the cluster, carrying the deployment's selector match-labels.
type DeploymentCollector struct {
	KubeClientSet kubernetes.Interface
}

// Describe sends a single invalid descriptor instead of describing the
// dynamic per-deployment metrics up front.
// NOTE(review): ServiceCollector.Describe sends nothing at all — confirm
// whether both collectors are intended to register as "unchecked"
// collectors, and make the two consistent.
func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- prometheus.NewInvalidDesc(nil)
}
+
+func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
+	return DeploymentMetric{
+		fqName:         fqname,
+		labelNames:     labelNames,
+		labelValues:    labelvalues,
+		help:           "service_selector_labels Service Selector Labels",
+		deploymentName: name,
+		namespace:      namespace,
+	}
+}
+
// DeploymentMetric is a hand-built prometheus.Metric exposing a deployment's
// selector match-labels as a constant gauge of value 1.
type DeploymentMetric struct {
	fqName         string   // fully-qualified metric name
	help           string   // help text reported to prometheus
	labelNames     []string // prometheus-safe label names, parallel to labelValues
	labelValues    []string // label values, parallel to labelNames
	deploymentName string   // deployment this metric describes
	namespace      string   // namespace of the deployment
}

// Desc builds the metric's descriptor, attaching deployment and namespace as
// constant labels.
func (s DeploymentMetric) Desc() *prometheus.Desc {
	l := prometheus.Labels{"deployment": s.deploymentName, "namespace": s.namespace}
	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
}

// Write encodes the metric as a gauge of value 1 whose label pairs are the
// stored label names/values plus "namespace" and "deployment".
func (s DeploymentMetric) Write(m *dto.Metric) error {
	h := float64(1)
	m.Gauge = &dto.Gauge{
		Value: &h,
	}
	var labels []*dto.LabelPair
	for i := range s.labelNames {
		// Index into the receiver's slices so the stored pointers remain
		// valid after the loop (a range value here would alias one variable).
		labels = append(labels, &dto.LabelPair{
			Name:  &s.labelNames[i],
			Value: &s.labelValues[i],
		})
	}
	n := "namespace"
	labels = append(labels, &dto.LabelPair{
		Name:  &n,
		Value: &s.namespace,
	})
	r := "deployment"
	labels = append(labels, &dto.LabelPair{
		Name:  &r,
		Value: &s.deploymentName,
	})
	m.Label = labels
	return nil
}
+
+func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, _ := sc.KubeClientSet.AppsV1().Deployments("").List(metav1.ListOptions{})
+	for _, deployment := range ds.Items {
+		labels, values := kubeLabelsToPrometheusLabels(deployment.Spec.Selector.MatchLabels)
+		m := newDeploymentMetric(sanitizeLabelName(deployment.GetName()), sanitizeLabelName(deployment.GetNamespace()), "deployment_match_labels", labels, values)
+		ch <- m
+	}
+}
+
// ServiceCollector is a prometheus collector that emits one metric per
// service in the cluster, carrying the service's selector labels.
type ServiceCollector struct {
	KubeClientSet kubernetes.Interface
}

// Describe sends no descriptors; per the prometheus client documentation a
// collector that describes nothing is registered as "unchecked" and its
// metrics are validated only at collect time.
func (sc ServiceCollector) Describe(ch chan<- *prometheus.Desc) {
	return
}
+
+func newServiceMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) ServiceMetric {
+	return ServiceMetric{
+		fqName:      fqname,
+		labelNames:  labelNames,
+		labelValues: labelvalues,
+		help:        "service_selector_labels Service Selector Labels",
+		serviceName: name,
+		namespace:   namespace,
+	}
+}
+
// ServiceMetric is a hand-built prometheus.Metric exposing a service's
// selector labels as a constant gauge of value 1.
type ServiceMetric struct {
	fqName      string   // fully-qualified metric name
	help        string   // help text reported to prometheus
	labelNames  []string // prometheus-safe label names, parallel to labelValues
	labelValues []string // label values, parallel to labelNames
	serviceName string   // service this metric describes
	namespace   string   // namespace of the service
}

// Desc builds the metric's descriptor, attaching service and namespace as
// constant labels.
func (s ServiceMetric) Desc() *prometheus.Desc {
	l := prometheus.Labels{"service": s.serviceName, "namespace": s.namespace}
	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
}

// Write encodes the metric as a gauge of value 1 whose label pairs are the
// stored label names/values plus "namespace" and "service".
func (s ServiceMetric) Write(m *dto.Metric) error {
	h := float64(1)
	m.Gauge = &dto.Gauge{
		Value: &h,
	}
	var labels []*dto.LabelPair
	for i := range s.labelNames {
		// Index into the receiver's slices so the stored pointers remain
		// valid after the loop (a range value here would alias one variable).
		labels = append(labels, &dto.LabelPair{
			Name:  &s.labelNames[i],
			Value: &s.labelValues[i],
		})
	}
	n := "namespace"
	labels = append(labels, &dto.LabelPair{
		Name:  &n,
		Value: &s.namespace,
	})
	r := "service"
	labels = append(labels, &dto.LabelPair{
		Name:  &r,
		Value: &s.serviceName,
	})
	m.Label = labels
	return nil
}
+
+func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
+	svcs, _ := sc.KubeClientSet.CoreV1().Services("").List(metav1.ListOptions{})
+	for _, svc := range svcs.Items {
+		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
+		m := newServiceMetric(sanitizeLabelName(svc.GetName()), sanitizeLabelName(svc.GetNamespace()), "service_selector_labels", labels, values)
+		ch <- m
+	}
+}

+ 71 - 10
main.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"flag"
+	"fmt"
 	"net"
 	"net/http"
 	"os"
@@ -35,6 +36,7 @@ import (
 const (
 	prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 	prometheusTroubleshootingEp    = "http://docs.kubecost.com/custom-prom#troubleshoot"
+	remoteEnabled                  = "REMOTE_WRITE_ENABLED"
 )
 
 var (
@@ -58,6 +60,8 @@ type Accesses struct {
 	NetworkZoneEgressRecorder     *prometheus.GaugeVec
 	NetworkRegionEgressRecorder   *prometheus.GaugeVec
 	NetworkInternetEgressRecorder *prometheus.GaugeVec
+	ServiceSelectorRecorder       *prometheus.GaugeVec
+	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	Model                         *costModel.CostModel
 }
 
@@ -200,18 +204,54 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	}
 }
 
+// CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
 func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	/*
-		start := r.URL.Query().Get("start")
-		end := r.URL.Query().Get("end")
-		window := r.URL.Query().Get("window")
-		fields := r.URL.Query().Get("filterFields")
-		namespace := r.URL.Query().Get("namespace")
-	*/
-	data, err := costModel.CostDataRangeFromSQL("", "", "", "", "")
+	startString := r.URL.Query().Get("start")
+	endString := r.URL.Query().Get("end")
+	windowString := r.URL.Query().Get("window")
+
+	layout := "2006-01-02T15:04:05.000Z"
+
+	var start time.Time
+	var end time.Time
+	var err error
+
+	if windowString == "" {
+		windowString = "1h"
+	}
+	if startString != "" {
+		start, err = time.Parse(layout, startString)
+		if err != nil {
+			klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
+			w.Write(wrapData(nil, err))
+		}
+	} else {
+		window, err := time.ParseDuration(windowString)
+		if err != nil {
+			w.Write(wrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
+
+		}
+		start = time.Now().Add(-2 * window)
+	}
+	if endString != "" {
+		end, err = time.Parse(layout, endString)
+		if err != nil {
+			klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
+			w.Write(wrapData(nil, err))
+		}
+	} else {
+		end = time.Now()
+	}
+
+	remoteLayout := "2006-01-02T15:04:05Z"
+	remoteStartStr := start.Format(remoteLayout)
+	remoteEndStr := end.Format(remoteLayout)
+	klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
+
+	data, err := costModel.CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
 	w.Write(wrapData(data, err))
 }
 
@@ -380,8 +420,10 @@ func (a *Accesses) recordPrices() {
 
 				if costs.PVCData != nil {
 					for _, pvc := range costs.PVCData {
-						pvCost, _ := strconv.ParseFloat(pvc.Volume.Cost, 64)
-						a.PersistentVolumePriceRecorder.WithLabelValues(pvc.VolumeName, pvc.VolumeName).Set(pvCost)
+						if pvc.Volume != nil {
+							pvCost, _ := strconv.ParseFloat(pvc.Volume.Cost, 64)
+							a.PersistentVolumePriceRecorder.WithLabelValues(pvc.VolumeName, pvc.VolumeName).Set(pvCost)
+						}
 					}
 				}
 
@@ -605,6 +647,12 @@ func main() {
 	prometheus.MustRegister(CPUAllocation)
 	prometheus.MustRegister(ContainerUptimeRecorder)
 	prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
+	prometheus.MustRegister(costModel.ServiceCollector{
+		KubeClientSet: kubeClientset,
+	})
+	prometheus.MustRegister(costModel.DeploymentCollector{
+		KubeClientSet: kubeClientset,
+	})
 
 	podCache := cache.NewListWatchFromClient(kubeClientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
 
@@ -627,6 +675,19 @@ func main() {
 		Model:                         costModel.NewCostModel(podCache),
 	}
 
+	remoteEnabled := os.Getenv(remoteEnabled)
+	if remoteEnabled == "true" {
+		info, err := cloudProvider.ClusterInfo()
+		klog.Infof("Saving cluster  with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
+		if err != nil {
+			klog.Infof("Error saving cluster id %s", err.Error())
+		}
+		_, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
+		if err != nil {
+			klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
+		}
+	}
+
 	err = a.Cloud.DownloadPricingData()
 	if err != nil {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())