sql.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package costmodel
import (
	"database/sql"
	"fmt"
	"net/url"
	"os"
	"time"

	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
	_ "github.com/lib/pq"
)
  10. const remotePW = "REMOTE_WRITE_PASSWORD"
  11. const sqlAddress = "SQL_ADDRESS"
  12. func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
  13. nodes := make(map[string]*costAnalyzerCloud.Node)
  14. query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  15. FROM metrics
  16. WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost') AND value != 'NaN' AND value != 0
  17. GROUP BY instance,name,clusterid`
  18. rows, err := db.Query(query)
  19. if err != nil {
  20. return nil, err
  21. }
  22. defer rows.Close()
  23. for rows.Next() {
  24. var (
  25. name string
  26. avg float64
  27. instance string
  28. clusterid string
  29. )
  30. if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
  31. return nil, err
  32. }
  33. if data, ok := nodes[instance]; ok {
  34. if name == "node_cpu_hourly_cost" {
  35. data.VCPUCost = fmt.Sprintf("%f", avg)
  36. } else if name == "node_ram_hourly_cost" {
  37. data.RAMCost = fmt.Sprintf("%f", avg)
  38. } else if name == "node_gpu_hourly_cost" {
  39. data.GPUCost = fmt.Sprintf("%f", avg)
  40. }
  41. } else {
  42. nodes[instance] = &costAnalyzerCloud.Node{}
  43. data := nodes[instance]
  44. if name == "node_cpu_hourly_cost" {
  45. data.VCPUCost = fmt.Sprintf("%f", avg)
  46. } else if name == "node_ram_hourly_cost" {
  47. data.RAMCost = fmt.Sprintf("%f", avg)
  48. } else if name == "node_gpu_hourly_cost" {
  49. data.GPUCost = fmt.Sprintf("%f", avg)
  50. }
  51. }
  52. }
  53. return nodes, nil
  54. }
  55. func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
  56. pw := os.Getenv(remotePW)
  57. address := os.Getenv(sqlAddress)
  58. connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
  59. db, err := sql.Open("postgres", connStr)
  60. defer db.Close()
  61. if err != nil {
  62. return nil, err
  63. }
  64. nodes, err := getNodeCosts(db)
  65. if err != nil {
  66. return nil, err
  67. }
  68. model := make(map[string]*CostData)
  69. query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  70. FROM metrics
  71. WHERE (name='container_cpu_allocation') AND
  72. time > $2 AND time < $3 AND value != 'NaN'
  73. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  74. ORDER BY container,bucket;
  75. `
  76. rows, err := db.Query(query, window, start, end)
  77. if err != nil {
  78. return nil, err
  79. }
  80. defer rows.Close()
  81. for rows.Next() {
  82. var (
  83. bucket string
  84. name string
  85. sum float64
  86. container string
  87. pod string
  88. namespace string
  89. instance string
  90. clusterid string
  91. )
  92. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  93. return nil, err
  94. }
  95. layout := "2006-01-02T15:04:05Z"
  96. t, err := time.Parse(layout, bucket)
  97. if err != nil {
  98. return nil, err
  99. }
  100. k := newContainerMetricFromValues(namespace, pod, container, instance)
  101. key := k.Key()
  102. allocationVector := &Vector{
  103. Timestamp: float64(t.Unix()),
  104. Value: sum,
  105. }
  106. if data, ok := model[key]; ok {
  107. if name == "container_cpu_allocation" {
  108. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  109. } else if name == "container_memory_allocation_bytes" {
  110. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  111. } else if name == "container_gpu_allocation" {
  112. data.GPUReq = append(data.GPUReq, allocationVector)
  113. }
  114. } else {
  115. node, ok := nodes[instance]
  116. if !ok {
  117. return nil, fmt.Errorf("No node found")
  118. }
  119. model[key] = &CostData{
  120. Name: container,
  121. PodName: pod,
  122. NodeName: instance,
  123. NodeData: node,
  124. CPUAllocation: []*Vector{},
  125. RAMAllocation: []*Vector{},
  126. GPUReq: []*Vector{},
  127. Namespace: namespace,
  128. ClusterID: clusterid,
  129. }
  130. data := model[key]
  131. if name == "container_cpu_allocation" {
  132. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  133. } else if name == "container_memory_allocation_bytes" {
  134. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  135. } else if name == "container_gpu_allocation" {
  136. data.GPUReq = append(data.GPUReq, allocationVector)
  137. }
  138. }
  139. }
  140. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  141. FROM metrics
  142. WHERE (name='container_memory_allocation_bytes') AND
  143. time > $2 AND time < $3 AND value != 'NaN'
  144. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  145. ORDER BY container,bucket;
  146. `
  147. rows, err = db.Query(query, window, start, end)
  148. if err != nil {
  149. return nil, err
  150. }
  151. defer rows.Close()
  152. for rows.Next() {
  153. var (
  154. bucket string
  155. name string
  156. sum float64
  157. container string
  158. pod string
  159. namespace string
  160. instance string
  161. clusterid string
  162. )
  163. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  164. return nil, err
  165. }
  166. layout := "2006-01-02T15:04:05Z"
  167. t, err := time.Parse(layout, bucket)
  168. if err != nil {
  169. return nil, err
  170. }
  171. k := newContainerMetricFromValues(namespace, pod, container, instance)
  172. key := k.Key()
  173. allocationVector := &Vector{
  174. Timestamp: float64(t.Unix()),
  175. Value: sum,
  176. }
  177. if data, ok := model[key]; ok {
  178. if name == "container_cpu_allocation" {
  179. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  180. } else if name == "container_memory_allocation_bytes" {
  181. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  182. } else if name == "container_gpu_allocation" {
  183. data.GPUReq = append(data.GPUReq, allocationVector)
  184. }
  185. } else {
  186. node, ok := nodes[instance]
  187. if !ok {
  188. return nil, fmt.Errorf("No node found")
  189. }
  190. model[key] = &CostData{
  191. Name: container,
  192. PodName: pod,
  193. NodeName: instance,
  194. NodeData: node,
  195. CPUAllocation: []*Vector{},
  196. RAMAllocation: []*Vector{},
  197. GPUReq: []*Vector{},
  198. Namespace: namespace,
  199. ClusterID: clusterid,
  200. }
  201. data := model[key]
  202. if name == "container_cpu_allocation" {
  203. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  204. } else if name == "container_memory_allocation_bytes" {
  205. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  206. } else if name == "container_gpu_allocation" {
  207. data.GPUReq = append(data.GPUReq, allocationVector)
  208. }
  209. }
  210. }
  211. return model, nil
  212. }