sql.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. package costmodel
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "fmt"
  6. "os"
  7. "time"
  8. "k8s.io/klog"
  9. costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
  10. "github.com/kubecost/cost-model/pkg/util"
  11. _ "github.com/lib/pq"
  12. )
  13. const remotePW = "REMOTE_WRITE_PASSWORD"
  14. const sqlAddress = "SQL_ADDRESS"
  15. func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
  16. pvs := make(map[string]*costAnalyzerCloud.PV)
  17. query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
  18. FROM metrics
  19. WHERE (name='pv_hourly_cost') AND value != 'NaN' AND value != 0
  20. GROUP BY volumename,name,clusterid;`
  21. rows, err := db.Query(query)
  22. if err != nil {
  23. return nil, err
  24. }
  25. defer rows.Close()
  26. for rows.Next() {
  27. var (
  28. name string
  29. avg float64
  30. volumename string
  31. clusterid string
  32. )
  33. if err := rows.Scan(&name, &avg, &volumename, &clusterid); err != nil {
  34. return nil, err
  35. }
  36. pvs[volumename] = &costAnalyzerCloud.PV{
  37. Cost: fmt.Sprintf("%f", avg),
  38. }
  39. }
  40. return pvs, nil
  41. }
  42. func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
  43. nodes := make(map[string]*costAnalyzerCloud.Node)
  44. query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  45. FROM metrics
  46. WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost') AND value != 'NaN' AND value != 0
  47. GROUP BY instance,name,clusterid`
  48. rows, err := db.Query(query)
  49. if err != nil {
  50. return nil, err
  51. }
  52. defer rows.Close()
  53. for rows.Next() {
  54. var (
  55. name string
  56. avg float64
  57. instance string
  58. clusterid string
  59. )
  60. if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
  61. return nil, err
  62. }
  63. if data, ok := nodes[instance]; ok {
  64. if name == "node_cpu_hourly_cost" {
  65. data.VCPUCost = fmt.Sprintf("%f", avg)
  66. } else if name == "node_ram_hourly_cost" {
  67. data.RAMCost = fmt.Sprintf("%f", avg)
  68. } else if name == "node_gpu_hourly_cost" {
  69. data.GPUCost = fmt.Sprintf("%f", avg)
  70. }
  71. } else {
  72. nodes[instance] = &costAnalyzerCloud.Node{}
  73. data := nodes[instance]
  74. if name == "node_cpu_hourly_cost" {
  75. data.VCPUCost = fmt.Sprintf("%f", avg)
  76. } else if name == "node_ram_hourly_cost" {
  77. data.RAMCost = fmt.Sprintf("%f", avg)
  78. } else if name == "node_gpu_hourly_cost" {
  79. data.GPUCost = fmt.Sprintf("%f", avg)
  80. }
  81. }
  82. }
  83. return nodes, nil
  84. }
  85. func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
  86. pw := os.Getenv(remotePW)
  87. address := os.Getenv(sqlAddress)
  88. connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
  89. db, err := sql.Open("postgres", connStr)
  90. defer db.Close()
  91. if err != nil {
  92. return nil, err
  93. }
  94. nodes, err := getNodeCosts(db)
  95. if err != nil {
  96. return nil, err
  97. }
  98. model := make(map[string]*CostData)
  99. query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  100. FROM metrics
  101. WHERE (name='container_cpu_allocation') AND
  102. time > $2 AND time < $3 AND value != 'NaN'
  103. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  104. ORDER BY container,bucket;
  105. `
  106. rows, err := db.Query(query, window, start, end)
  107. if err != nil {
  108. return nil, err
  109. }
  110. defer rows.Close()
  111. for rows.Next() {
  112. var (
  113. bucket string
  114. name string
  115. sum float64
  116. container string
  117. pod string
  118. namespace string
  119. instance string
  120. clusterid string
  121. )
  122. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  123. return nil, err
  124. }
  125. layout := "2006-01-02T15:04:05Z"
  126. t, err := time.Parse(layout, bucket)
  127. if err != nil {
  128. return nil, err
  129. }
  130. k := newContainerMetricFromValues(namespace, pod, container, instance, clusterid)
  131. key := k.Key()
  132. allocationVector := &util.Vector{
  133. Timestamp: float64(t.Unix()),
  134. Value: sum,
  135. }
  136. if data, ok := model[key]; ok {
  137. if name == "container_cpu_allocation" {
  138. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  139. } else if name == "container_memory_allocation_bytes" {
  140. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  141. } else if name == "container_gpu_allocation" {
  142. data.GPUReq = append(data.GPUReq, allocationVector)
  143. }
  144. } else {
  145. node, ok := nodes[instance]
  146. if !ok {
  147. return nil, fmt.Errorf("No node found")
  148. }
  149. model[key] = &CostData{
  150. Name: container,
  151. PodName: pod,
  152. NodeName: instance,
  153. NodeData: node,
  154. CPUAllocation: []*util.Vector{},
  155. RAMAllocation: []*util.Vector{},
  156. GPUReq: []*util.Vector{},
  157. Namespace: namespace,
  158. ClusterID: clusterid,
  159. }
  160. data := model[key]
  161. if name == "container_cpu_allocation" {
  162. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  163. } else if name == "container_memory_allocation_bytes" {
  164. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  165. } else if name == "container_gpu_allocation" {
  166. data.GPUReq = append(data.GPUReq, allocationVector)
  167. }
  168. }
  169. }
  170. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  171. FROM metrics
  172. WHERE (name='container_memory_allocation_bytes') AND
  173. time > $2 AND time < $3 AND value != 'NaN'
  174. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  175. ORDER BY container,bucket;
  176. `
  177. rows, err = db.Query(query, window, start, end)
  178. if err != nil {
  179. return nil, err
  180. }
  181. for rows.Next() {
  182. var (
  183. bucket string
  184. name string
  185. sum float64
  186. container string
  187. pod string
  188. namespace string
  189. instance string
  190. clusterid string
  191. )
  192. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  193. return nil, err
  194. }
  195. layout := "2006-01-02T15:04:05Z"
  196. t, err := time.Parse(layout, bucket)
  197. if err != nil {
  198. return nil, err
  199. }
  200. k := newContainerMetricFromValues(namespace, pod, container, instance, clusterid)
  201. key := k.Key()
  202. allocationVector := &util.Vector{
  203. Timestamp: float64(t.Unix()),
  204. Value: sum,
  205. }
  206. if data, ok := model[key]; ok {
  207. if name == "container_cpu_allocation" {
  208. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  209. } else if name == "container_memory_allocation_bytes" {
  210. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  211. } else if name == "container_gpu_allocation" {
  212. data.GPUReq = append(data.GPUReq, allocationVector)
  213. }
  214. } else {
  215. node, ok := nodes[instance]
  216. if !ok {
  217. return nil, fmt.Errorf("No node found")
  218. }
  219. model[key] = &CostData{
  220. Name: container,
  221. PodName: pod,
  222. NodeName: instance,
  223. NodeData: node,
  224. CPUAllocation: []*util.Vector{},
  225. RAMAllocation: []*util.Vector{},
  226. GPUReq: []*util.Vector{},
  227. Namespace: namespace,
  228. ClusterID: clusterid,
  229. }
  230. data := model[key]
  231. if name == "container_cpu_allocation" {
  232. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  233. } else if name == "container_memory_allocation_bytes" {
  234. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  235. } else if name == "container_gpu_allocation" {
  236. data.GPUReq = append(data.GPUReq, allocationVector)
  237. }
  238. }
  239. }
  240. query = `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
  241. rows, err = db.Query(query)
  242. if err != nil {
  243. return nil, err
  244. }
  245. cols, err := rows.Columns()
  246. if err != nil {
  247. return nil, err
  248. }
  249. rawResult := make([][]byte, len(cols))
  250. result := make([]string, len(cols))
  251. dest := make([]interface{}, len(cols)) // A temporary interface{} slice
  252. for i, _ := range rawResult {
  253. dest[i] = &rawResult[i] // Put pointers to each string in the interface slice
  254. }
  255. nsToLabels := make(map[string]map[string]string)
  256. for rows.Next() {
  257. err = rows.Scan(dest...)
  258. if err != nil {
  259. return nil, err
  260. }
  261. for i, raw := range rawResult {
  262. if raw == nil {
  263. result[i] = "\\N"
  264. } else {
  265. result[i] = string(raw)
  266. }
  267. }
  268. var dat map[string]string
  269. err := json.Unmarshal([]byte(result[4]), &dat)
  270. if err != nil {
  271. return nil, err
  272. }
  273. ns, ok := dat["namespace"]
  274. if !ok {
  275. return nil, fmt.Errorf("No namespace found")
  276. }
  277. nsToLabels[ns] = dat
  278. }
  279. for _, cd := range model {
  280. ns := cd.Namespace
  281. if labels, ok := nsToLabels[ns]; ok {
  282. cd.NamespaceLabels = labels
  283. cd.Labels = labels // TODO: override with podlabels
  284. }
  285. }
  286. volumes, err := getPVCosts(db)
  287. if err != nil {
  288. klog.Infof("Error fetching pv data from sql: %s. Skipping PVData", err.Error())
  289. } else {
  290. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value), labels->>'persistentvolumeclaim' AS claim, labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'persistentvolume' AS volumename, labels->>'cluster_id' AS clusterid
  291. FROM metrics
  292. WHERE (name='pod_pvc_allocation') AND
  293. time > $2 AND time < $3 AND value != 'NaN'
  294. GROUP BY claim,pod,bucket,namespace,volumename,clusterid,name
  295. ORDER BY pod,bucket;`
  296. rows, err = db.Query(query, window, start, end)
  297. if err != nil {
  298. return nil, err
  299. }
  300. pvcData := make(map[string]*PersistentVolumeClaimData)
  301. for rows.Next() {
  302. var (
  303. bucket string
  304. name string
  305. sum float64
  306. claim string
  307. pod string
  308. namespace string
  309. volumename sql.NullString
  310. clusterid string
  311. )
  312. if err := rows.Scan(&bucket, &name, &sum, &claim, &pod, &namespace, &volumename, &clusterid); err != nil {
  313. return nil, err
  314. }
  315. layout := "2006-01-02T15:04:05Z"
  316. t, err := time.Parse(layout, bucket)
  317. if err != nil {
  318. return nil, err
  319. }
  320. allocationVector := &util.Vector{
  321. Timestamp: float64(t.Unix()),
  322. Value: sum,
  323. }
  324. if pvcd, ok := pvcData[claim]; ok {
  325. pvcd.Values = append(pvcd.Values, allocationVector)
  326. } else {
  327. if volumename.Valid {
  328. vname := volumename.String
  329. d := &PersistentVolumeClaimData{
  330. Namespace: namespace,
  331. ClusterID: clusterid,
  332. VolumeName: vname,
  333. Claim: claim,
  334. }
  335. if volume, ok := volumes[vname]; ok {
  336. volume.Size = fmt.Sprintf("%f", sum) // Just assume the claim is the whole volume for now
  337. d.Volume = volume
  338. }
  339. d.Values = append(d.Values, allocationVector)
  340. pvcData[claim] = d
  341. for _, cd := range model { // TODO: make this not doubly nested
  342. if cd.PodName == pod && cd.Namespace == namespace {
  343. if len(cd.PVCData) > 0 {
  344. cd.PVCData = append(cd.PVCData, d)
  345. } else {
  346. cd.PVCData = []*PersistentVolumeClaimData{d}
  347. }
  348. break // break so we only assign to the first
  349. }
  350. }
  351. }
  352. }
  353. }
  354. }
  355. return model, nil
  356. }