| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- package costmodel
- import (
- "database/sql"
- "fmt"
- "time"
- costAnalyzerCloud "github.com/opencost/opencost/pkg/cloud"
- "github.com/opencost/opencost/pkg/env"
- "github.com/opencost/opencost/pkg/log"
- "github.com/opencost/opencost/pkg/util"
- "github.com/opencost/opencost/pkg/util/json"
- _ "github.com/lib/pq"
- )
- func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
- pvs := make(map[string]*costAnalyzerCloud.PV)
- query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
- FROM metrics
- WHERE (name='pv_hourly_cost') AND value != 'NaN' AND value != 0
- GROUP BY volumename,name,clusterid;`
- rows, err := db.Query(query)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- for rows.Next() {
- var (
- name string
- avg float64
- volumename string
- clusterid string
- )
- if err := rows.Scan(&name, &avg, &volumename, &clusterid); err != nil {
- return nil, err
- }
- pvs[volumename] = &costAnalyzerCloud.PV{
- Cost: fmt.Sprintf("%f", avg),
- }
- }
- return pvs, nil
- }
- func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
- nodes := make(map[string]*costAnalyzerCloud.Node)
- query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
- FROM metrics
- WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost') AND value != 'NaN' AND value != 0
- GROUP BY instance,name,clusterid`
- rows, err := db.Query(query)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- for rows.Next() {
- var (
- name string
- avg float64
- instance string
- clusterid string
- )
- if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
- return nil, err
- }
- if data, ok := nodes[instance]; ok {
- if name == "node_cpu_hourly_cost" {
- data.VCPUCost = fmt.Sprintf("%f", avg)
- } else if name == "node_ram_hourly_cost" {
- data.RAMCost = fmt.Sprintf("%f", avg)
- } else if name == "node_gpu_hourly_cost" {
- data.GPUCost = fmt.Sprintf("%f", avg)
- }
- } else {
- nodes[instance] = &costAnalyzerCloud.Node{}
- data := nodes[instance]
- if name == "node_cpu_hourly_cost" {
- data.VCPUCost = fmt.Sprintf("%f", avg)
- } else if name == "node_ram_hourly_cost" {
- data.RAMCost = fmt.Sprintf("%f", avg)
- } else if name == "node_gpu_hourly_cost" {
- data.GPUCost = fmt.Sprintf("%f", avg)
- }
- }
- }
- return nodes, nil
- }
- func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
- pw := env.GetRemotePW()
- address := env.GetSQLAddress()
- connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
- db, err := sql.Open("postgres", connStr)
- defer db.Close()
- if err != nil {
- return nil, err
- }
- nodes, err := getNodeCosts(db)
- if err != nil {
- return nil, err
- }
- model := make(map[string]*CostData)
- query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
- FROM metrics
- WHERE (name='container_cpu_allocation') AND
- time > $2 AND time < $3 AND value != 'NaN'
- GROUP BY container,pod,bucket,namespace,instance,clusterid,name
- ORDER BY container,bucket;
- `
- rows, err := db.Query(query, window, start, end)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- for rows.Next() {
- var (
- bucket string
- name string
- sum float64
- container string
- pod string
- namespace string
- instance string
- clusterid string
- )
- if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
- return nil, err
- }
- layout := "2006-01-02T15:04:05Z"
- t, err := time.Parse(layout, bucket)
- if err != nil {
- return nil, err
- }
- k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
- key := k.Key()
- allocationVector := &util.Vector{
- Timestamp: float64(t.Unix()),
- Value: sum,
- }
- if data, ok := model[key]; ok {
- if name == "container_cpu_allocation" {
- data.CPUAllocation = append(data.CPUAllocation, allocationVector)
- } else if name == "container_memory_allocation_bytes" {
- data.RAMAllocation = append(data.RAMAllocation, allocationVector)
- } else if name == "container_gpu_allocation" {
- data.GPUReq = append(data.GPUReq, allocationVector)
- }
- } else {
- node, ok := nodes[instance]
- if !ok {
- return nil, fmt.Errorf("No node found")
- }
- model[key] = &CostData{
- Name: container,
- PodName: pod,
- NodeName: instance,
- NodeData: node,
- CPUAllocation: []*util.Vector{},
- RAMAllocation: []*util.Vector{},
- GPUReq: []*util.Vector{},
- Namespace: namespace,
- ClusterID: clusterid,
- }
- data := model[key]
- if name == "container_cpu_allocation" {
- data.CPUAllocation = append(data.CPUAllocation, allocationVector)
- } else if name == "container_memory_allocation_bytes" {
- data.RAMAllocation = append(data.RAMAllocation, allocationVector)
- } else if name == "container_gpu_allocation" {
- data.GPUReq = append(data.GPUReq, allocationVector)
- }
- }
- }
- query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
- FROM metrics
- WHERE (name='container_memory_allocation_bytes') AND
- time > $2 AND time < $3 AND value != 'NaN'
- GROUP BY container,pod,bucket,namespace,instance,clusterid,name
- ORDER BY container,bucket;
- `
- rows, err = db.Query(query, window, start, end)
- if err != nil {
- return nil, err
- }
- for rows.Next() {
- var (
- bucket string
- name string
- sum float64
- container string
- pod string
- namespace string
- instance string
- clusterid string
- )
- if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
- return nil, err
- }
- layout := "2006-01-02T15:04:05Z"
- t, err := time.Parse(layout, bucket)
- if err != nil {
- return nil, err
- }
- k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
- key := k.Key()
- allocationVector := &util.Vector{
- Timestamp: float64(t.Unix()),
- Value: sum,
- }
- if data, ok := model[key]; ok {
- if name == "container_cpu_allocation" {
- data.CPUAllocation = append(data.CPUAllocation, allocationVector)
- } else if name == "container_memory_allocation_bytes" {
- data.RAMAllocation = append(data.RAMAllocation, allocationVector)
- } else if name == "container_gpu_allocation" {
- data.GPUReq = append(data.GPUReq, allocationVector)
- }
- } else {
- node, ok := nodes[instance]
- if !ok {
- return nil, fmt.Errorf("No node found")
- }
- model[key] = &CostData{
- Name: container,
- PodName: pod,
- NodeName: instance,
- NodeData: node,
- CPUAllocation: []*util.Vector{},
- RAMAllocation: []*util.Vector{},
- GPUReq: []*util.Vector{},
- Namespace: namespace,
- ClusterID: clusterid,
- }
- data := model[key]
- if name == "container_cpu_allocation" {
- data.CPUAllocation = append(data.CPUAllocation, allocationVector)
- } else if name == "container_memory_allocation_bytes" {
- data.RAMAllocation = append(data.RAMAllocation, allocationVector)
- } else if name == "container_gpu_allocation" {
- data.GPUReq = append(data.GPUReq, allocationVector)
- }
- }
- }
- query = `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
- rows, err = db.Query(query)
- if err != nil {
- return nil, err
- }
- cols, err := rows.Columns()
- if err != nil {
- return nil, err
- }
- rawResult := make([][]byte, len(cols))
- result := make([]string, len(cols))
- dest := make([]interface{}, len(cols)) // A temporary interface{} slice
- for i, _ := range rawResult {
- dest[i] = &rawResult[i] // Put pointers to each string in the interface slice
- }
- nsToLabels := make(map[string]map[string]string)
- for rows.Next() {
- err = rows.Scan(dest...)
- if err != nil {
- return nil, err
- }
- for i, raw := range rawResult {
- if raw == nil {
- result[i] = "\\N"
- } else {
- result[i] = string(raw)
- }
- }
- var dat map[string]string
- err := json.Unmarshal([]byte(result[4]), &dat)
- if err != nil {
- return nil, err
- }
- ns, ok := dat["namespace"]
- if !ok {
- return nil, fmt.Errorf("No namespace found")
- }
- nsToLabels[ns] = dat
- }
- for _, cd := range model {
- ns := cd.Namespace
- if labels, ok := nsToLabels[ns]; ok {
- cd.NamespaceLabels = labels
- cd.Labels = labels // TODO: override with podlabels
- }
- }
- volumes, err := getPVCosts(db)
- if err != nil {
- log.Infof("Error fetching pv data from sql: %s. Skipping PVData", err.Error())
- } else {
- query = `SELECT time_bucket($1, time) AS bucket, name, avg(value), labels->>'persistentvolumeclaim' AS claim, labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'persistentvolume' AS volumename, labels->>'cluster_id' AS clusterid
- FROM metrics
- WHERE (name='pod_pvc_allocation') AND
- time > $2 AND time < $3 AND value != 'NaN'
- GROUP BY claim,pod,bucket,namespace,volumename,clusterid,name
- ORDER BY pod,bucket;`
- rows, err = db.Query(query, window, start, end)
- if err != nil {
- return nil, err
- }
- pvcData := make(map[string]*PersistentVolumeClaimData)
- for rows.Next() {
- var (
- bucket string
- name string
- sum float64
- claim string
- pod string
- namespace string
- volumename sql.NullString
- clusterid string
- )
- if err := rows.Scan(&bucket, &name, &sum, &claim, &pod, &namespace, &volumename, &clusterid); err != nil {
- return nil, err
- }
- layout := "2006-01-02T15:04:05Z"
- t, err := time.Parse(layout, bucket)
- if err != nil {
- return nil, err
- }
- allocationVector := &util.Vector{
- Timestamp: float64(t.Unix()),
- Value: sum,
- }
- if pvcd, ok := pvcData[claim]; ok {
- pvcd.Values = append(pvcd.Values, allocationVector)
- } else {
- if volumename.Valid {
- vname := volumename.String
- d := &PersistentVolumeClaimData{
- Namespace: namespace,
- ClusterID: clusterid,
- VolumeName: vname,
- Claim: claim,
- }
- if volume, ok := volumes[vname]; ok {
- volume.Size = fmt.Sprintf("%f", sum) // Just assume the claim is the whole volume for now
- d.Volume = volume
- }
- d.Values = append(d.Values, allocationVector)
- pvcData[claim] = d
- for _, cd := range model { // TODO: make this not doubly nested
- if cd.PodName == pod && cd.Namespace == namespace {
- if len(cd.PVCData) > 0 {
- cd.PVCData = append(cd.PVCData, d)
- } else {
- cd.PVCData = []*PersistentVolumeClaimData{d}
- }
- break // break so we only assign to the first
- }
- }
- }
- }
- }
- }
- return model, nil
- }
|