Source search

Merge pull request #156 from kubecost/AjayTripathy-add-deps

costmodel/aggregations.go
Ajay Tripathy 6 years ago
Parent
Commit
93fd67fe2b
2 changed files with 395 additions and 0 deletions
  1. 225 0
      costmodel/aggregations.go
  2. 170 0
      costmodel/metrics.go

+ 225 - 0
costmodel/aggregations.go

@@ -0,0 +1,225 @@
+package costmodel
+
+import (
+	"database/sql"
+	"fmt"
+	"os"
+	"time"
+
+	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
+	_ "github.com/lib/pq"
+)
+
// remotePW names the environment variable holding the password of the
// "postgres" user on the remote metrics database.
const remotePW = "REMOTE_WRITE_PASSWORD"
// sqlAddress names the environment variable holding the host of the remote
// metrics database; port 5432 is assumed (see CostDataRangeFromSQL).
const sqlAddress = "SQL_ADDRESS"
+
+func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
+
+	nodes := make(map[string]*costAnalyzerCloud.Node)
+
+	query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost')  AND value != 'NaN' AND value != 0
+	GROUP BY instance,name,clusterid`
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var (
+			name      string
+			avg       float64
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+		if data, ok := nodes[instance]; ok {
+			if name == "node_cpu_hourly_cost" {
+				data.VCPUCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_ram_hourly_cost" {
+				data.RAMCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_gpu_hourly_cost" {
+				data.GPUCost = fmt.Sprintf("%f", avg)
+			}
+		} else {
+			nodes[instance] = &costAnalyzerCloud.Node{}
+			data := nodes[instance]
+			if name == "node_cpu_hourly_cost" {
+				data.VCPUCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_ram_hourly_cost" {
+				data.RAMCost = fmt.Sprintf("%f", avg)
+			} else if name == "node_gpu_hourly_cost" {
+				data.GPUCost = fmt.Sprintf("%f", avg)
+			}
+		}
+
+	}
+
+	return nodes, nil
+}
+
+func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
+	pw := os.Getenv(remotePW)
+	address := os.Getenv(sqlAddress)
+	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
+	db, err := sql.Open("postgres", connStr)
+	defer db.Close()
+	if err != nil {
+		return nil, err
+	}
+	nodes, err := getNodeCosts(db)
+	if err != nil {
+		return nil, err
+	}
+	model := make(map[string]*CostData)
+	query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='container_cpu_allocation') AND
+	  time > $2 AND time < $3 AND value != 'NaN'
+	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
+	ORDER BY container,bucket;
+	`
+	rows, err := db.Query(query, window, start, end)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var (
+			bucket    string
+			name      string
+			sum       float64
+			container string
+			pod       string
+			namespace string
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+		layout := "2006-01-02T15:04:05Z"
+		t, err := time.Parse(layout, bucket)
+		if err != nil {
+			return nil, err
+		}
+
+		k := newContainerMetricFromValues(namespace, pod, container, instance)
+		key := k.Key()
+		allocationVector := &Vector{
+			Timestamp: float64(t.Unix()),
+			Value:     sum,
+		}
+		if data, ok := model[key]; ok {
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		} else {
+			node, ok := nodes[instance]
+			if !ok {
+				return nil, fmt.Errorf("No node found")
+			}
+			model[key] = &CostData{
+				Name:          container,
+				PodName:       pod,
+				NodeName:      instance,
+				NodeData:      node,
+				CPUAllocation: []*Vector{},
+				RAMAllocation: []*Vector{},
+				GPUReq:        []*Vector{},
+				Namespace:     namespace,
+				ClusterID:     clusterid,
+			}
+			data := model[key]
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		}
+	}
+	query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
+	FROM metrics
+	WHERE (name='container_memory_allocation_bytes') AND
+		time > $2 AND time < $3 AND value != 'NaN'
+	GROUP BY container,pod,bucket,namespace,instance,clusterid,name
+	ORDER BY container,bucket;
+	`
+	rows, err = db.Query(query, window, start, end)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var (
+			bucket    string
+			name      string
+			sum       float64
+			container string
+			pod       string
+			namespace string
+			instance  string
+			clusterid string
+		)
+		if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
+			return nil, err
+		}
+		layout := "2006-01-02T15:04:05Z"
+		t, err := time.Parse(layout, bucket)
+		if err != nil {
+			return nil, err
+		}
+
+		k := newContainerMetricFromValues(namespace, pod, container, instance)
+		key := k.Key()
+		allocationVector := &Vector{
+			Timestamp: float64(t.Unix()),
+			Value:     sum,
+		}
+		if data, ok := model[key]; ok {
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		} else {
+			node, ok := nodes[instance]
+			if !ok {
+				return nil, fmt.Errorf("No node found")
+			}
+			model[key] = &CostData{
+				Name:          container,
+				PodName:       pod,
+				NodeName:      instance,
+				NodeData:      node,
+				CPUAllocation: []*Vector{},
+				RAMAllocation: []*Vector{},
+				GPUReq:        []*Vector{},
+				Namespace:     namespace,
+				ClusterID:     clusterid,
+			}
+			data := model[key]
+			if name == "container_cpu_allocation" {
+				data.CPUAllocation = append(data.CPUAllocation, allocationVector)
+			} else if name == "container_memory_allocation_bytes" {
+				data.RAMAllocation = append(data.RAMAllocation, allocationVector)
+			} else if name == "container_gpu_allocation" {
+				data.GPUReq = append(data.GPUReq, allocationVector)
+			}
+		}
+	}
+	return model, nil
+}

+ 170 - 0
costmodel/metrics.go

@@ -0,0 +1,170 @@
+package costmodel
+
+import (
+	"regexp"
+	"sort"
+
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
// invalidLabelCharRE matches any character that is not legal in a Prometheus
// label name (anything outside [a-zA-Z0-9_]).
var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

// kubeLabelsToPrometheusLabels converts a Kubernetes label map into parallel
// slices of Prometheus label names and values. Keys are sorted for
// deterministic output, sanitized, and prefixed with "label_"; values pass
// through unchanged.
func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string) {
	keys := make([]string, 0, len(labels))
	for key := range labels {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	names := make([]string, len(keys))
	values := make([]string, len(keys))
	for i, key := range keys {
		names[i] = "label_" + sanitizeLabelName(key)
		values[i] = labels[key]
	}
	return names, values
}

// sanitizeLabelName replaces every character that is invalid in a Prometheus
// label name with an underscore.
func sanitizeLabelName(s string) string {
	return invalidLabelCharRE.ReplaceAllString(s, "_")
}
+
// DeploymentCollector is a prometheus collector that emits one gauge per
// Deployment in the cluster, carrying the deployment's selector match labels
// (see Collect).
type DeploymentCollector struct {
	// KubeClientSet is the Kubernetes API client used to list deployments.
	KubeClientSet kubernetes.Interface
}
+
// Describe implements prometheus.Collector by sending a single invalid
// descriptor rather than describing the dynamic per-deployment metrics.
// NOTE(review): ServiceCollector.Describe sends nothing instead — confirm
// which behavior the registry expects and make the two consistent.
func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- prometheus.NewInvalidDesc(nil)
}
+
// newDeploymentMetric constructs a DeploymentMetric with the given metric
// family name (fqname), parallel label name/value slices, and the deployment
// identity used for the fixed "deployment"/"namespace" labels.
func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
	return DeploymentMetric{
		fqName:      fqname,
		labelNames:  labelNames,
		labelValues: labelvalues,
		// Help text was copy-pasted from the service metric; describe the
		// deployment metric instead.
		help:           "deployment_match_labels Deployment Match Labels",
		deploymentName: name,
		namespace:      namespace,
	}
}

// DeploymentMetric is a prometheus metric carrying a deployment's selector
// match labels as metric labels, with a constant gauge value of 1 (see Write).
type DeploymentMetric struct {
	fqName         string   // metric family name
	help           string   // help text reported via Desc
	labelNames     []string // sanitized, "label_"-prefixed selector keys
	labelValues    []string // selector values, parallel to labelNames
	deploymentName string   // emitted as the constant "deployment" label
	namespace      string   // emitted as the constant "namespace" label
}
+
// Desc implements prometheus.Metric. The deployment name and namespace are
// reported as constant labels; the selector labels are variable labels.
func (s DeploymentMetric) Desc() *prometheus.Desc {
	l := prometheus.Labels{"deployment": s.deploymentName, "namespace": s.namespace}
	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
}
+
+func (s DeploymentMetric) Write(m *dto.Metric) error {
+	h := float64(1)
+	m.Gauge = &dto.Gauge{
+		Value: &h,
+	}
+	var labels []*dto.LabelPair
+	for i := range s.labelNames {
+		labels = append(labels, &dto.LabelPair{
+			Name:  &s.labelNames[i],
+			Value: &s.labelValues[i],
+		})
+	}
+	n := "namespace"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &n,
+		Value: &s.namespace,
+	})
+	r := "deployment"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &r,
+		Value: &s.deploymentName,
+	})
+	m.Label = labels
+	return nil
+}
+
+func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
+	ds, _ := sc.KubeClientSet.AppsV1().Deployments("").List(metav1.ListOptions{})
+	for _, deployment := range ds.Items {
+		labels, values := kubeLabelsToPrometheusLabels(deployment.Spec.Selector.MatchLabels)
+		m := newDeploymentMetric(sanitizeLabelName(deployment.GetName()), sanitizeLabelName(deployment.GetNamespace()), "deployment_match_labels", labels, values)
+		ch <- m
+	}
+}
+
// ServiceCollector is a prometheus collector that emits one gauge per
// Service in the cluster, carrying the service's selector labels
// (see Collect).
type ServiceCollector struct {
	// KubeClientSet is the Kubernetes API client used to list services.
	KubeClientSet kubernetes.Interface
}
+
// Describe implements prometheus.Collector by sending no descriptors, since
// the per-service metrics are dynamic. NOTE(review):
// DeploymentCollector.Describe sends an invalid descriptor instead —
// confirm which behavior is intended and make the two consistent.
func (sc ServiceCollector) Describe(ch chan<- *prometheus.Desc) {
	return
}
+
// newServiceMetric constructs a ServiceMetric with the given metric family
// name (fqname), parallel label name/value slices, and the service identity
// used for the fixed "service"/"namespace" labels.
func newServiceMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) ServiceMetric {
	m := ServiceMetric{
		serviceName: name,
		namespace:   namespace,
		fqName:      fqname,
		labelNames:  labelNames,
		labelValues: labelvalues,
	}
	m.help = "service_selector_labels Service Selector Labels"
	return m
}

// ServiceMetric is a prometheus metric carrying a service's selector labels
// as metric labels, with a constant gauge value of 1 (see Write).
type ServiceMetric struct {
	fqName      string   // metric family name
	help        string   // help text reported via Desc
	labelNames  []string // sanitized, "label_"-prefixed selector keys
	labelValues []string // selector values, parallel to labelNames
	serviceName string   // emitted as the constant "service" label
	namespace   string   // emitted as the constant "namespace" label
}
+
// Desc implements prometheus.Metric. The service name and namespace are
// reported as constant labels; the selector labels are variable labels.
func (s ServiceMetric) Desc() *prometheus.Desc {
	l := prometheus.Labels{"service": s.serviceName, "namespace": s.namespace}
	return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
}
+
+func (s ServiceMetric) Write(m *dto.Metric) error {
+	h := float64(1)
+	m.Gauge = &dto.Gauge{
+		Value: &h,
+	}
+	var labels []*dto.LabelPair
+	for i := range s.labelNames {
+		labels = append(labels, &dto.LabelPair{
+			Name:  &s.labelNames[i],
+			Value: &s.labelValues[i],
+		})
+	}
+	n := "namespace"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &n,
+		Value: &s.namespace,
+	})
+	r := "service"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &r,
+		Value: &s.serviceName,
+	})
+	m.Label = labels
+	return nil
+}
+
+func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
+	svcs, _ := sc.KubeClientSet.CoreV1().Services("").List(metav1.ListOptions{})
+	for _, svc := range svcs.Items {
+		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
+		m := newServiceMetric(sanitizeLabelName(svc.GetName()), sanitizeLabelName(svc.GetNamespace()), "service_selector_labels", labels, values)
+		ch <- m
+	}
+}