sql.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. package costmodel
import (
	"database/sql"
	"encoding/json"
	"fmt"
	"net/url"
	"os"
	"time"

	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
	_ "github.com/lib/pq"
	"k8s.io/klog"
)
  12. const remotePW = "REMOTE_WRITE_PASSWORD"
  13. const sqlAddress = "SQL_ADDRESS"
  14. func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
  15. pvs := make(map[string]*costAnalyzerCloud.PV)
  16. query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
  17. FROM metrics
  18. WHERE (name='pv_hourly_cost') AND value != 'NaN' AND value != 0
  19. GROUP BY volumename,name,clusterid;`
  20. rows, err := db.Query(query)
  21. if err != nil {
  22. return nil, err
  23. }
  24. defer rows.Close()
  25. for rows.Next() {
  26. var (
  27. name string
  28. avg float64
  29. volumename string
  30. clusterid string
  31. )
  32. if err := rows.Scan(&name, &avg, &volumename, &clusterid); err != nil {
  33. return nil, err
  34. }
  35. pvs[volumename] = &costAnalyzerCloud.PV{
  36. Cost: fmt.Sprintf("%f", avg),
  37. }
  38. }
  39. return pvs, nil
  40. }
  41. func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
  42. nodes := make(map[string]*costAnalyzerCloud.Node)
  43. query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  44. FROM metrics
  45. WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost') AND value != 'NaN' AND value != 0
  46. GROUP BY instance,name,clusterid`
  47. rows, err := db.Query(query)
  48. if err != nil {
  49. return nil, err
  50. }
  51. defer rows.Close()
  52. for rows.Next() {
  53. var (
  54. name string
  55. avg float64
  56. instance string
  57. clusterid string
  58. )
  59. if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
  60. return nil, err
  61. }
  62. if data, ok := nodes[instance]; ok {
  63. if name == "node_cpu_hourly_cost" {
  64. data.VCPUCost = fmt.Sprintf("%f", avg)
  65. } else if name == "node_ram_hourly_cost" {
  66. data.RAMCost = fmt.Sprintf("%f", avg)
  67. } else if name == "node_gpu_hourly_cost" {
  68. data.GPUCost = fmt.Sprintf("%f", avg)
  69. }
  70. } else {
  71. nodes[instance] = &costAnalyzerCloud.Node{}
  72. data := nodes[instance]
  73. if name == "node_cpu_hourly_cost" {
  74. data.VCPUCost = fmt.Sprintf("%f", avg)
  75. } else if name == "node_ram_hourly_cost" {
  76. data.RAMCost = fmt.Sprintf("%f", avg)
  77. } else if name == "node_gpu_hourly_cost" {
  78. data.GPUCost = fmt.Sprintf("%f", avg)
  79. }
  80. }
  81. }
  82. return nodes, nil
  83. }
  84. func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
  85. pw := os.Getenv(remotePW)
  86. address := os.Getenv(sqlAddress)
  87. connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
  88. db, err := sql.Open("postgres", connStr)
  89. defer db.Close()
  90. if err != nil {
  91. return nil, err
  92. }
  93. nodes, err := getNodeCosts(db)
  94. if err != nil {
  95. return nil, err
  96. }
  97. model := make(map[string]*CostData)
  98. query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  99. FROM metrics
  100. WHERE (name='container_cpu_allocation') AND
  101. time > $2 AND time < $3 AND value != 'NaN'
  102. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  103. ORDER BY container,bucket;
  104. `
  105. rows, err := db.Query(query, window, start, end)
  106. if err != nil {
  107. return nil, err
  108. }
  109. defer rows.Close()
  110. for rows.Next() {
  111. var (
  112. bucket string
  113. name string
  114. sum float64
  115. container string
  116. pod string
  117. namespace string
  118. instance string
  119. clusterid string
  120. )
  121. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  122. return nil, err
  123. }
  124. layout := "2006-01-02T15:04:05Z"
  125. t, err := time.Parse(layout, bucket)
  126. if err != nil {
  127. return nil, err
  128. }
  129. k := newContainerMetricFromValues(namespace, pod, container, instance)
  130. key := k.Key()
  131. allocationVector := &Vector{
  132. Timestamp: float64(t.Unix()),
  133. Value: sum,
  134. }
  135. if data, ok := model[key]; ok {
  136. if name == "container_cpu_allocation" {
  137. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  138. } else if name == "container_memory_allocation_bytes" {
  139. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  140. } else if name == "container_gpu_allocation" {
  141. data.GPUReq = append(data.GPUReq, allocationVector)
  142. }
  143. } else {
  144. node, ok := nodes[instance]
  145. if !ok {
  146. return nil, fmt.Errorf("No node found")
  147. }
  148. model[key] = &CostData{
  149. Name: container,
  150. PodName: pod,
  151. NodeName: instance,
  152. NodeData: node,
  153. CPUAllocation: []*Vector{},
  154. RAMAllocation: []*Vector{},
  155. GPUReq: []*Vector{},
  156. Namespace: namespace,
  157. ClusterID: clusterid,
  158. }
  159. data := model[key]
  160. if name == "container_cpu_allocation" {
  161. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  162. } else if name == "container_memory_allocation_bytes" {
  163. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  164. } else if name == "container_gpu_allocation" {
  165. data.GPUReq = append(data.GPUReq, allocationVector)
  166. }
  167. }
  168. }
  169. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  170. FROM metrics
  171. WHERE (name='container_memory_allocation_bytes') AND
  172. time > $2 AND time < $3 AND value != 'NaN'
  173. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  174. ORDER BY container,bucket;
  175. `
  176. rows, err = db.Query(query, window, start, end)
  177. if err != nil {
  178. return nil, err
  179. }
  180. for rows.Next() {
  181. var (
  182. bucket string
  183. name string
  184. sum float64
  185. container string
  186. pod string
  187. namespace string
  188. instance string
  189. clusterid string
  190. )
  191. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  192. return nil, err
  193. }
  194. layout := "2006-01-02T15:04:05Z"
  195. t, err := time.Parse(layout, bucket)
  196. if err != nil {
  197. return nil, err
  198. }
  199. k := newContainerMetricFromValues(namespace, pod, container, instance)
  200. key := k.Key()
  201. allocationVector := &Vector{
  202. Timestamp: float64(t.Unix()),
  203. Value: sum,
  204. }
  205. if data, ok := model[key]; ok {
  206. if name == "container_cpu_allocation" {
  207. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  208. } else if name == "container_memory_allocation_bytes" {
  209. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  210. } else if name == "container_gpu_allocation" {
  211. data.GPUReq = append(data.GPUReq, allocationVector)
  212. }
  213. } else {
  214. node, ok := nodes[instance]
  215. if !ok {
  216. return nil, fmt.Errorf("No node found")
  217. }
  218. model[key] = &CostData{
  219. Name: container,
  220. PodName: pod,
  221. NodeName: instance,
  222. NodeData: node,
  223. CPUAllocation: []*Vector{},
  224. RAMAllocation: []*Vector{},
  225. GPUReq: []*Vector{},
  226. Namespace: namespace,
  227. ClusterID: clusterid,
  228. }
  229. data := model[key]
  230. if name == "container_cpu_allocation" {
  231. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  232. } else if name == "container_memory_allocation_bytes" {
  233. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  234. } else if name == "container_gpu_allocation" {
  235. data.GPUReq = append(data.GPUReq, allocationVector)
  236. }
  237. }
  238. }
  239. query = `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
  240. rows, err = db.Query(query)
  241. if err != nil {
  242. return nil, err
  243. }
  244. cols, err := rows.Columns()
  245. if err != nil {
  246. return nil, err
  247. }
  248. rawResult := make([][]byte, len(cols))
  249. result := make([]string, len(cols))
  250. dest := make([]interface{}, len(cols)) // A temporary interface{} slice
  251. for i, _ := range rawResult {
  252. dest[i] = &rawResult[i] // Put pointers to each string in the interface slice
  253. }
  254. nsToLabels := make(map[string]map[string]string)
  255. for rows.Next() {
  256. err = rows.Scan(dest...)
  257. if err != nil {
  258. return nil, err
  259. }
  260. for i, raw := range rawResult {
  261. if raw == nil {
  262. result[i] = "\\N"
  263. } else {
  264. result[i] = string(raw)
  265. }
  266. }
  267. var dat map[string]string
  268. err := json.Unmarshal([]byte(result[4]), &dat)
  269. if err != nil {
  270. return nil, err
  271. }
  272. ns, ok := dat["namespace"]
  273. if !ok {
  274. return nil, fmt.Errorf("No namespace found")
  275. }
  276. nsToLabels[ns] = dat
  277. }
  278. for _, cd := range model {
  279. ns := cd.Namespace
  280. if labels, ok := nsToLabels[ns]; ok {
  281. cd.NamespaceLabels = labels
  282. cd.Labels = labels // TODO: override with podlabels
  283. }
  284. }
  285. volumes, err := getPVCosts(db)
  286. if err != nil {
  287. klog.Infof("Error fetching pv data from sql: %s. Skipping PVData", err.Error())
  288. } else {
  289. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value), labels->>'persistentvolumeclaim' AS claim, labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'persistentvolume' AS volumename, labels->>'cluster_id' AS clusterid
  290. FROM metrics
  291. WHERE (name='pod_pvc_allocation') AND
  292. time > $2 AND time < $3 AND value != 'NaN'
  293. GROUP BY claim,pod,bucket,namespace,volumename,clusterid,name
  294. ORDER BY pod,bucket;`
  295. rows, err = db.Query(query, window, start, end)
  296. if err != nil {
  297. return nil, err
  298. }
  299. pvcData := make(map[string]*PersistentVolumeClaimData)
  300. for rows.Next() {
  301. var (
  302. bucket string
  303. name string
  304. sum float64
  305. claim string
  306. pod string
  307. namespace string
  308. volumename sql.NullString
  309. clusterid string
  310. )
  311. if err := rows.Scan(&bucket, &name, &sum, &claim, &pod, &namespace, &volumename, &clusterid); err != nil {
  312. return nil, err
  313. }
  314. layout := "2006-01-02T15:04:05Z"
  315. t, err := time.Parse(layout, bucket)
  316. if err != nil {
  317. return nil, err
  318. }
  319. allocationVector := &Vector{
  320. Timestamp: float64(t.Unix()),
  321. Value: sum,
  322. }
  323. if pvcd, ok := pvcData[claim]; ok {
  324. pvcd.Values = append(pvcd.Values, allocationVector)
  325. } else {
  326. if volumename.Valid {
  327. vname := volumename.String
  328. d := &PersistentVolumeClaimData{
  329. Namespace: namespace,
  330. VolumeName: vname,
  331. Claim: claim,
  332. }
  333. if volume, ok := volumes[vname]; ok {
  334. volume.Size = fmt.Sprintf("%f", sum) // Just assume the claim is the whole volume for now
  335. d.Volume = volume
  336. }
  337. d.Values = append(d.Values, allocationVector)
  338. pvcData[claim] = d
  339. for _, cd := range model { // TODO: make this not doubly nested
  340. if cd.PodName == pod && cd.Namespace == namespace {
  341. if len(cd.PVCData) > 0 {
  342. cd.PVCData = append(cd.PVCData, d)
  343. } else {
  344. cd.PVCData = []*PersistentVolumeClaimData{d}
  345. }
  346. break // break so we only assign to the first
  347. }
  348. }
  349. }
  350. }
  351. }
  352. }
  353. return model, nil
  354. }