package costmodel import ( "database/sql" "fmt" "os" "time" costAnalyzerCloud "github.com/kubecost/cost-model/cloud" _ "github.com/lib/pq" ) const remotePW = "REMOTE_WRITE_PASSWORD" const sqlAddress = "SQL_ADDRESS" func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) { nodes := make(map[string]*costAnalyzerCloud.Node) query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid FROM metrics WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost') AND value != 'NaN' AND value != 0 GROUP BY instance,name,clusterid` rows, err := db.Query(query) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var ( name string avg float64 instance string clusterid string ) if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil { return nil, err } if data, ok := nodes[instance]; ok { if name == "node_cpu_hourly_cost" { data.VCPUCost = fmt.Sprintf("%f", avg) } else if name == "node_ram_hourly_cost" { data.RAMCost = fmt.Sprintf("%f", avg) } else if name == "node_gpu_hourly_cost" { data.GPUCost = fmt.Sprintf("%f", avg) } } else { nodes[instance] = &costAnalyzerCloud.Node{} data := nodes[instance] if name == "node_cpu_hourly_cost" { data.VCPUCost = fmt.Sprintf("%f", avg) } else if name == "node_ram_hourly_cost" { data.RAMCost = fmt.Sprintf("%f", avg) } else if name == "node_gpu_hourly_cost" { data.GPUCost = fmt.Sprintf("%f", avg) } } } return nodes, nil } func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) { pw := os.Getenv(remotePW) address := os.Getenv(sqlAddress) connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address) db, err := sql.Open("postgres", connStr) defer db.Close() if err != nil { return nil, err } nodes, err := getNodeCosts(db) if err != nil { return nil, err } model := make(map[string]*CostData) query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid FROM metrics WHERE (name='container_cpu_allocation') AND time > $2 AND time < $3 AND value != 'NaN' GROUP BY container,pod,bucket,namespace,instance,clusterid,name ORDER BY container,bucket; ` rows, err := db.Query(query, window, start, end) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var ( bucket string name string sum float64 container string pod string namespace string instance string clusterid string ) if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil { return nil, err } layout := "2006-01-02T15:04:05Z" t, err := time.Parse(layout, bucket) if err != nil { return nil, err } k := newContainerMetricFromValues(namespace, pod, container, instance) key := k.Key() allocationVector := &Vector{ Timestamp: float64(t.Unix()), Value: sum, } if data, ok := model[key]; ok { if name == "container_cpu_allocation" { data.CPUAllocation = append(data.CPUAllocation, allocationVector) } else if name == "container_memory_allocation_bytes" { data.RAMAllocation = append(data.RAMAllocation, allocationVector) } else if name == "container_gpu_allocation" { data.GPUReq = append(data.GPUReq, allocationVector) } } else { node, ok := nodes[instance] if !ok { return nil, fmt.Errorf("No node found") } model[key] = &CostData{ Name: container, PodName: pod, NodeName: instance, NodeData: node, CPUAllocation: []*Vector{}, RAMAllocation: []*Vector{}, GPUReq: []*Vector{}, Namespace: namespace, ClusterID: clusterid, } data := model[key] if name == "container_cpu_allocation" { data.CPUAllocation = append(data.CPUAllocation, allocationVector) } else if name == "container_memory_allocation_bytes" { data.RAMAllocation = append(data.RAMAllocation, allocationVector) } else if name == "container_gpu_allocation" { data.GPUReq = append(data.GPUReq, allocationVector) } } } query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid FROM metrics WHERE (name='container_memory_allocation_bytes') AND time > $2 AND time < $3 AND value != 'NaN' GROUP BY container,pod,bucket,namespace,instance,clusterid,name ORDER BY container,bucket; ` rows, err = db.Query(query, window, start, end) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var ( bucket string name string sum float64 container string pod string namespace string instance string clusterid string ) if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil { return nil, err } layout := "2006-01-02T15:04:05Z" t, err := time.Parse(layout, bucket) if err != nil { return nil, err } k := newContainerMetricFromValues(namespace, pod, container, instance) key := k.Key() allocationVector := &Vector{ Timestamp: float64(t.Unix()), Value: sum, } if data, ok := model[key]; ok { if name == "container_cpu_allocation" { data.CPUAllocation = append(data.CPUAllocation, allocationVector) } else if name == "container_memory_allocation_bytes" { data.RAMAllocation = append(data.RAMAllocation, allocationVector) } else if name == "container_gpu_allocation" { data.GPUReq = append(data.GPUReq, allocationVector) } } else { node, ok := nodes[instance] if !ok { return nil, fmt.Errorf("No node found") } model[key] = &CostData{ Name: container, PodName: pod, NodeName: instance, NodeData: node, CPUAllocation: []*Vector{}, RAMAllocation: []*Vector{}, GPUReq: []*Vector{}, Namespace: namespace, ClusterID: clusterid, } data := model[key] if name == "container_cpu_allocation" { data.CPUAllocation = append(data.CPUAllocation, allocationVector) } else if name == "container_memory_allocation_bytes" { data.RAMAllocation = append(data.RAMAllocation, allocationVector) } else if name == "container_gpu_allocation" { data.GPUReq = append(data.GPUReq, allocationVector) } } } return model, nil }