sql.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. package costmodel
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "time"
  6. costAnalyzerCloud "github.com/opencost/opencost/pkg/cloud"
  7. "github.com/opencost/opencost/pkg/env"
  8. "github.com/opencost/opencost/pkg/log"
  9. "github.com/opencost/opencost/pkg/util"
  10. "github.com/opencost/opencost/pkg/util/json"
  11. _ "github.com/lib/pq"
  12. )
  13. func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
  14. pvs := make(map[string]*costAnalyzerCloud.PV)
  15. query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
  16. FROM metrics
  17. WHERE (name='pv_hourly_cost') AND value != 'NaN' AND value != 0
  18. GROUP BY volumename,name,clusterid;`
  19. rows, err := db.Query(query)
  20. if err != nil {
  21. return nil, err
  22. }
  23. defer rows.Close()
  24. for rows.Next() {
  25. var (
  26. name string
  27. avg float64
  28. volumename string
  29. clusterid string
  30. )
  31. if err := rows.Scan(&name, &avg, &volumename, &clusterid); err != nil {
  32. return nil, err
  33. }
  34. pvs[volumename] = &costAnalyzerCloud.PV{
  35. Cost: fmt.Sprintf("%f", avg),
  36. }
  37. }
  38. return pvs, nil
  39. }
  40. func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
  41. nodes := make(map[string]*costAnalyzerCloud.Node)
  42. query := `SELECT name, avg(value),labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  43. FROM metrics
  44. WHERE (name='node_cpu_hourly_cost' OR name='node_ram_hourly_cost' OR name='node_gpu_hourly_cost') AND value != 'NaN' AND value != 0
  45. GROUP BY instance,name,clusterid`
  46. rows, err := db.Query(query)
  47. if err != nil {
  48. return nil, err
  49. }
  50. defer rows.Close()
  51. for rows.Next() {
  52. var (
  53. name string
  54. avg float64
  55. instance string
  56. clusterid string
  57. )
  58. if err := rows.Scan(&name, &avg, &instance, &clusterid); err != nil {
  59. return nil, err
  60. }
  61. if data, ok := nodes[instance]; ok {
  62. if name == "node_cpu_hourly_cost" {
  63. data.VCPUCost = fmt.Sprintf("%f", avg)
  64. } else if name == "node_ram_hourly_cost" {
  65. data.RAMCost = fmt.Sprintf("%f", avg)
  66. } else if name == "node_gpu_hourly_cost" {
  67. data.GPUCost = fmt.Sprintf("%f", avg)
  68. }
  69. } else {
  70. nodes[instance] = &costAnalyzerCloud.Node{}
  71. data := nodes[instance]
  72. if name == "node_cpu_hourly_cost" {
  73. data.VCPUCost = fmt.Sprintf("%f", avg)
  74. } else if name == "node_ram_hourly_cost" {
  75. data.RAMCost = fmt.Sprintf("%f", avg)
  76. } else if name == "node_gpu_hourly_cost" {
  77. data.GPUCost = fmt.Sprintf("%f", avg)
  78. }
  79. }
  80. }
  81. return nodes, nil
  82. }
  83. func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
  84. pw := env.GetRemotePW()
  85. address := env.GetSQLAddress()
  86. connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
  87. db, err := sql.Open("postgres", connStr)
  88. defer db.Close()
  89. if err != nil {
  90. return nil, err
  91. }
  92. nodes, err := getNodeCosts(db)
  93. if err != nil {
  94. return nil, err
  95. }
  96. model := make(map[string]*CostData)
  97. query := `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  98. FROM metrics
  99. WHERE (name='container_cpu_allocation') AND
  100. time > $2 AND time < $3 AND value != 'NaN'
  101. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  102. ORDER BY container,bucket;
  103. `
  104. rows, err := db.Query(query, window, start, end)
  105. if err != nil {
  106. return nil, err
  107. }
  108. defer rows.Close()
  109. for rows.Next() {
  110. var (
  111. bucket string
  112. name string
  113. sum float64
  114. container string
  115. pod string
  116. namespace string
  117. instance string
  118. clusterid string
  119. )
  120. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  121. return nil, err
  122. }
  123. layout := "2006-01-02T15:04:05Z"
  124. t, err := time.Parse(layout, bucket)
  125. if err != nil {
  126. return nil, err
  127. }
  128. k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
  129. key := k.Key()
  130. allocationVector := &util.Vector{
  131. Timestamp: float64(t.Unix()),
  132. Value: sum,
  133. }
  134. if data, ok := model[key]; ok {
  135. if name == "container_cpu_allocation" {
  136. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  137. } else if name == "container_memory_allocation_bytes" {
  138. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  139. } else if name == "container_gpu_allocation" {
  140. data.GPUReq = append(data.GPUReq, allocationVector)
  141. }
  142. } else {
  143. node, ok := nodes[instance]
  144. if !ok {
  145. return nil, fmt.Errorf("No node found")
  146. }
  147. model[key] = &CostData{
  148. Name: container,
  149. PodName: pod,
  150. NodeName: instance,
  151. NodeData: node,
  152. CPUAllocation: []*util.Vector{},
  153. RAMAllocation: []*util.Vector{},
  154. GPUReq: []*util.Vector{},
  155. Namespace: namespace,
  156. ClusterID: clusterid,
  157. }
  158. data := model[key]
  159. if name == "container_cpu_allocation" {
  160. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  161. } else if name == "container_memory_allocation_bytes" {
  162. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  163. } else if name == "container_gpu_allocation" {
  164. data.GPUReq = append(data.GPUReq, allocationVector)
  165. }
  166. }
  167. }
  168. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value),labels->>'container' AS container,labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'instance' AS instance, labels->>'cluster_id' AS clusterid
  169. FROM metrics
  170. WHERE (name='container_memory_allocation_bytes') AND
  171. time > $2 AND time < $3 AND value != 'NaN'
  172. GROUP BY container,pod,bucket,namespace,instance,clusterid,name
  173. ORDER BY container,bucket;
  174. `
  175. rows, err = db.Query(query, window, start, end)
  176. if err != nil {
  177. return nil, err
  178. }
  179. for rows.Next() {
  180. var (
  181. bucket string
  182. name string
  183. sum float64
  184. container string
  185. pod string
  186. namespace string
  187. instance string
  188. clusterid string
  189. )
  190. if err := rows.Scan(&bucket, &name, &sum, &container, &pod, &namespace, &instance, &clusterid); err != nil {
  191. return nil, err
  192. }
  193. layout := "2006-01-02T15:04:05Z"
  194. t, err := time.Parse(layout, bucket)
  195. if err != nil {
  196. return nil, err
  197. }
  198. k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
  199. key := k.Key()
  200. allocationVector := &util.Vector{
  201. Timestamp: float64(t.Unix()),
  202. Value: sum,
  203. }
  204. if data, ok := model[key]; ok {
  205. if name == "container_cpu_allocation" {
  206. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  207. } else if name == "container_memory_allocation_bytes" {
  208. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  209. } else if name == "container_gpu_allocation" {
  210. data.GPUReq = append(data.GPUReq, allocationVector)
  211. }
  212. } else {
  213. node, ok := nodes[instance]
  214. if !ok {
  215. return nil, fmt.Errorf("No node found")
  216. }
  217. model[key] = &CostData{
  218. Name: container,
  219. PodName: pod,
  220. NodeName: instance,
  221. NodeData: node,
  222. CPUAllocation: []*util.Vector{},
  223. RAMAllocation: []*util.Vector{},
  224. GPUReq: []*util.Vector{},
  225. Namespace: namespace,
  226. ClusterID: clusterid,
  227. }
  228. data := model[key]
  229. if name == "container_cpu_allocation" {
  230. data.CPUAllocation = append(data.CPUAllocation, allocationVector)
  231. } else if name == "container_memory_allocation_bytes" {
  232. data.RAMAllocation = append(data.RAMAllocation, allocationVector)
  233. } else if name == "container_gpu_allocation" {
  234. data.GPUReq = append(data.GPUReq, allocationVector)
  235. }
  236. }
  237. }
  238. query = `SELECT DISTINCT ON (labels->>'namespace') * FROM METRICS WHERE name='kube_namespace_labels' ORDER BY labels->>'namespace',time DESC;`
  239. rows, err = db.Query(query)
  240. if err != nil {
  241. return nil, err
  242. }
  243. cols, err := rows.Columns()
  244. if err != nil {
  245. return nil, err
  246. }
  247. rawResult := make([][]byte, len(cols))
  248. result := make([]string, len(cols))
  249. dest := make([]interface{}, len(cols)) // A temporary interface{} slice
  250. for i, _ := range rawResult {
  251. dest[i] = &rawResult[i] // Put pointers to each string in the interface slice
  252. }
  253. nsToLabels := make(map[string]map[string]string)
  254. for rows.Next() {
  255. err = rows.Scan(dest...)
  256. if err != nil {
  257. return nil, err
  258. }
  259. for i, raw := range rawResult {
  260. if raw == nil {
  261. result[i] = "\\N"
  262. } else {
  263. result[i] = string(raw)
  264. }
  265. }
  266. var dat map[string]string
  267. err := json.Unmarshal([]byte(result[4]), &dat)
  268. if err != nil {
  269. return nil, err
  270. }
  271. ns, ok := dat["namespace"]
  272. if !ok {
  273. return nil, fmt.Errorf("No namespace found")
  274. }
  275. nsToLabels[ns] = dat
  276. }
  277. for _, cd := range model {
  278. ns := cd.Namespace
  279. if labels, ok := nsToLabels[ns]; ok {
  280. cd.NamespaceLabels = labels
  281. cd.Labels = labels // TODO: override with podlabels
  282. }
  283. }
  284. volumes, err := getPVCosts(db)
  285. if err != nil {
  286. log.Infof("Error fetching pv data from sql: %s. Skipping PVData", err.Error())
  287. } else {
  288. query = `SELECT time_bucket($1, time) AS bucket, name, avg(value), labels->>'persistentvolumeclaim' AS claim, labels->>'pod' AS pod,labels->>'namespace' AS namespace, labels->>'persistentvolume' AS volumename, labels->>'cluster_id' AS clusterid
  289. FROM metrics
  290. WHERE (name='pod_pvc_allocation') AND
  291. time > $2 AND time < $3 AND value != 'NaN'
  292. GROUP BY claim,pod,bucket,namespace,volumename,clusterid,name
  293. ORDER BY pod,bucket;`
  294. rows, err = db.Query(query, window, start, end)
  295. if err != nil {
  296. return nil, err
  297. }
  298. pvcData := make(map[string]*PersistentVolumeClaimData)
  299. for rows.Next() {
  300. var (
  301. bucket string
  302. name string
  303. sum float64
  304. claim string
  305. pod string
  306. namespace string
  307. volumename sql.NullString
  308. clusterid string
  309. )
  310. if err := rows.Scan(&bucket, &name, &sum, &claim, &pod, &namespace, &volumename, &clusterid); err != nil {
  311. return nil, err
  312. }
  313. layout := "2006-01-02T15:04:05Z"
  314. t, err := time.Parse(layout, bucket)
  315. if err != nil {
  316. return nil, err
  317. }
  318. allocationVector := &util.Vector{
  319. Timestamp: float64(t.Unix()),
  320. Value: sum,
  321. }
  322. if pvcd, ok := pvcData[claim]; ok {
  323. pvcd.Values = append(pvcd.Values, allocationVector)
  324. } else {
  325. if volumename.Valid {
  326. vname := volumename.String
  327. d := &PersistentVolumeClaimData{
  328. Namespace: namespace,
  329. ClusterID: clusterid,
  330. VolumeName: vname,
  331. Claim: claim,
  332. }
  333. if volume, ok := volumes[vname]; ok {
  334. volume.Size = fmt.Sprintf("%f", sum) // Just assume the claim is the whole volume for now
  335. d.Volume = volume
  336. }
  337. d.Values = append(d.Values, allocationVector)
  338. pvcData[claim] = d
  339. for _, cd := range model { // TODO: make this not doubly nested
  340. if cd.PodName == pod && cd.Namespace == namespace {
  341. if len(cd.PVCData) > 0 {
  342. cd.PVCData = append(cd.PVCData, d)
  343. } else {
  344. cd.PVCData = []*PersistentVolumeClaimData{d}
  345. }
  346. break // break so we only assign to the first
  347. }
  348. }
  349. }
  350. }
  351. }
  352. }
  353. return model, nil
  354. }