sql.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package costmodel
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "fmt"
  6. "os"
  7. "time"
  8. "k8s.io/klog"
  9. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  10. _ "github.com/lib/pq"
  11. )
  12. const remotePW = "REMOTE_WRITE_PASSWORD"
  13. const sqlAddress = "SQL_ADDRESS"
  14. func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
  15. nodes := make(map[string]*costAnalyzerCloud.Node)
  16. query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  17. FROM metrics
  18. WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost') AND value != 'NaN' AND value != 0
  19. GROUP BY instance,name,clusterid`
  20. rows, err := db.Query(query)
  21. if err != nil {
  22. return nil, err
  23. }
  24. defer rows.Close()
  25. for rows.Next() {
  26. var (
  27. name string
  28. avg float64
  29. instance string
  30. clusterid string
  31. )
  32. if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
  33. return nil, err
  34. }
  35. if data, ok := nodes[instance]; ok {
  36. if name == "node_cpu_hourly_cost" {
  37. data.VCPUCost = fmt.Sprintf("%f", avg)
  38. } else if name == "node_ram_hourly_cost" {
  39. data.RAMCost = fmt.Sprintf("%f", avg)
  40. } else if name == "node_gpu_hourly_cost" {
  41. data.GPUCost = fmt.Sprintf("%f", avg)
  42. }
  43. } else {
  44. nodes[instance] = &costAnalyzerCloud.Node{}
  45. data := nodes[instance]
  46. if name == "node_cpu_hourly_cost" {
  47. data.VCPUCost = fmt.Sprintf("%f", avg)
  48. } else if name == "node_ram_hourly_cost" {
  49. data.RAMCost = fmt.Sprintf("%f", avg)
  50. } else if name == "node_gpu_hourly_cost" {
  51. data.GPUCost = fmt.Sprintf("%f", avg)
  52. }
  53. }
  54. }
  55. return nodes, nil
  56. }
  57. func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
  58. pw := os.Getenv(remotePW)
  59. address := os.Getenv(sqlAddress)
  60. connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
  61. db, err := sql.Open("postgres", connStr)
  62. defer db.Close()
  63. if err != nil {
  64. return nil, err
  65. }
  66. nodes, err := getNodeCosts(db)
  67. if err != nil {
  68. return nil, err
  69. }
  70. model := make(map[string]*CostData)
  71. query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  72. FROM metrics
  73. WHERE (name='container_cpu_allocation') AND
  74. time > $2 AND time < $3 AND value != 'NaN'
  75. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  76. ORDER BY container,bucket;
  77. `
  78. rows, err := db.Query(query, window, start, end)
  79. if err != nil {
  80. return nil, err
  81. }
  82. defer rows.Close()
  83. for rows.Next() {
  84. var (
  85. bucket string
  86. name string
  87. sum float64
  88. container string
  89. pod string
  90. namespace string
  91. instance string
  92. clusterid string
  93. )
  94. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  95. return nil, err
  96. }
  97. layout := "2006-01-02T15:04:05Z"
  98. t, err := time.Parse(layout, bucket)
  99. if err != nil {
  100. return nil, err
  101. }
  102. k := newContainerMetricFromValues(namespace, pod, container, instance)
  103. key := k.Key()
  104. allocationVector := &Vector{
  105. Timestamp: float64(t.Unix()),
  106. Value: sum,
  107. }
  108. if data, ok := model[key]; ok {
  109. if name == "container_cpu_allocation" {
  110. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  111. } else if name == "container_memory_allocation_bytes" {
  112. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  113. } else if name == "container_gpu_allocation" {
  114. data.GPUReq = append(data.GPUReq, allocationVector)
  115. }
  116. } else {
  117. node, ok := nodes[instance]
  118. if !ok {
  119. return nil, fmt.Errorf("No node found")
  120. }
  121. model[key] = &CostData{
  122. Name: container,
  123. PodName: pod,
  124. NodeName: instance,
  125. NodeData: node,
  126. CPUAllocation: []*Vector{},
  127. RAMAllocation: []*Vector{},
  128. GPUReq: []*Vector{},
  129. Namespace: namespace,
  130. ClusterID: clusterid,
  131. }
  132. data := model[key]
  133. if name == "container_cpu_allocation" {
  134. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  135. } else if name == "container_memory_allocation_bytes" {
  136. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  137. } else if name == "container_gpu_allocation" {
  138. data.GPUReq = append(data.GPUReq, allocationVector)
  139. }
  140. }
  141. }
  142. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  143. FROM metrics
  144. WHERE (name='container_memory_allocation_bytes') AND
  145. time > $2 AND time < $3 AND value != 'NaN'
  146. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  147. ORDER BY container,bucket;
  148. `
  149. rows, err = db.Query(query, window, start, end)
  150. if err != nil {
  151. return nil, err
  152. }
  153. for rows.Next() {
  154. var (
  155. bucket string
  156. name string
  157. sum float64
  158. container string
  159. pod string
  160. namespace string
  161. instance string
  162. clusterid string
  163. )
  164. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  165. return nil, err
  166. }
  167. layout := "2006-01-02T15:04:05Z"
  168. t, err := time.Parse(layout, bucket)
  169. if err != nil {
  170. return nil, err
  171. }
  172. k := newContainerMetricFromValues(namespace, pod, container, instance)
  173. key := k.Key()
  174. allocationVector := &Vector{
  175. Timestamp: float64(t.Unix()),
  176. Value: sum,
  177. }
  178. if data, ok := model[key]; ok {
  179. if name == "container_cpu_allocation" {
  180. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  181. } else if name == "container_memory_allocation_bytes" {
  182. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  183. } else if name == "container_gpu_allocation" {
  184. data.GPUReq = append(data.GPUReq, allocationVector)
  185. }
  186. } else {
  187. node, ok := nodes[instance]
  188. if !ok {
  189. return nil, fmt.Errorf("No node found")
  190. }
  191. model[key] = &CostData{
  192. Name: container,
  193. PodName: pod,
  194. NodeName: instance,
  195. NodeData: node,
  196. CPUAllocation: []*Vector{},
  197. RAMAllocation: []*Vector{},
  198. GPUReq: []*Vector{},
  199. Namespace: namespace,
  200. ClusterID: clusterid,
  201. }
  202. data := model[key]
  203. if name == "container_cpu_allocation" {
  204. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  205. } else if name == "container_memory_allocation_bytes" {
  206. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  207. } else if name == "container_gpu_allocation" {
  208. data.GPUReq = append(data.GPUReq, allocationVector)
  209. }
  210. }
  211. }
  212. query = `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
  213. rows, err = db.Query(query)
  214. if err != nil {
  215. return nil, err
  216. }
  217. cols, err := rows.Columns()
  218. if err != nil {
  219. return nil, err
  220. }
  221. rawResult := make([][]byte, len(cols))
  222. result := make([]string, len(cols))
  223. dest := make([]interface{}, len(cols)) // A temporary interface{} slice
  224. for i, _ := range rawResult {
  225. dest[i] = &rawResult[i] // Put pointers to each string in the interface slice
  226. }
  227. nsToLabels := make(map[string]map[string]string)
  228. for rows.Next() {
  229. err = rows.Scan(dest...)
  230. if err != nil {
  231. return nil, err
  232. }
  233. for i, raw := range rawResult {
  234. if raw == nil {
  235. result[i] = "\\N"
  236. } else {
  237. result[i] = string(raw)
  238. }
  239. }
  240. klog.Infof("%#v\n", result)
  241. var dat map[string]string
  242. err := json.Unmarshal([]byte(result[4]), &dat)
  243. if err != nil {
  244. return nil, err
  245. }
  246. ns, ok := dat["namespace"]
  247. if !ok {
  248. return nil, fmt.Errorf("No namespace found")
  249. }
  250. nsToLabels[ns] = dat
  251. }
  252. for _, cd := range model {
  253. ns := cd.Namespace
  254. if labels, ok := nsToLabels[ns]; ok {
  255. cd.NamespaceLabels = labels
  256. cd.Labels = labels // TODO: override with podlabels
  257. }
  258. }
  259. return model, nil
  260. }