cluster.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. package costmodel
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  7. prometheusClient "github.com/prometheus/client_golang/api"
  8. "k8s.io/klog"
  9. )
  // PromQL templates used by the cluster cost functions in this file. Every
// "%s" slot is filled with fmt.Sprintf by the callers:
//   - queryClusterCores: (window, offset) three times.
//   - queryClusterRAM:   (window, offset) twice.
//   - queryStorage:      (window, offset) twice, then an optional
//     "+ <local storage query>" suffix.
//   - queryTotal:        only the optional "+ <local storage query>" suffix;
//     its range selectors are fixed at [1h] and it takes no offset.
//
// The offset slot holds either "" or "offset <duration>".
// NOTE(review): the "* 730" factor presumably converts hourly prices into an
// approximate monthly cost (~730 hours per month) — confirm.
const (
	// Per node: CPU capacity * CPU hourly price * 730, plus GPU hourly price * 730,
	// then summed per cluster_id.
	// NOTE(review): PromQL "+" between vectors keeps only series present on both
	// sides, so a node with no node_gpu_hourly_cost series drops out of this sum
	// entirely — confirm every node exports that metric.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`
	// Per node: memory capacity in GiB * RAM hourly price * 730, summed per cluster_id.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`
	// Per persistent volume: hourly price * 730 * capacity in GiB, summed per
	// cluster_id; the trailing slot appends the provider's local-storage term.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
	// Node total cost plus PV cost, with fixed [1h] windows and no offset.
	// NOTE(review): the first sum has no "by (cluster_id)" while the second does;
	// PromQL "+" only matches when both sides carry the same labels, so this
	// presumably works only when series have no cluster_id label — confirm.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
)
  // Totals holds the cost series returned by the cluster cost functions.
// Each field is a list of [timestamp, value] string pairs as built by
// resultToTotal and resultToTotals: the timestamp is the Prometheus sample
// time formatted with "%f", and the value is the sample string copied verbatim.
// ClusterCostsForAllClusters does not populate TotalCost.
type Totals struct {
	TotalCost [][]string `json:"totalcost"`
	CPUCost   [][]string `json:"cpucost"`
	MemCost   [][]string `json:"memcost"`
	// NOTE(review): this JSON key is camelCase while the others are lowercase;
	// renaming it would change the JSON wire format seen by consumers.
	StorageCost [][]string `json:"storageCost"`
}
  34. func resultToTotals(qr interface{}) ([][]string, error) {
  35. data, ok := qr.(map[string]interface{})["data"]
  36. if !ok {
  37. e, err := wrapPrometheusError(qr)
  38. if err != nil {
  39. return nil, err
  40. }
  41. return nil, fmt.Errorf(e)
  42. }
  43. r, ok := data.(map[string]interface{})["result"]
  44. if !ok {
  45. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  46. }
  47. results, ok := r.([]interface{})
  48. if !ok {
  49. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  50. }
  51. if len(results) == 0 {
  52. return nil, fmt.Errorf("Not enough data available in the selected time range")
  53. }
  54. res, ok := results[0].(map[string]interface{})["values"]
  55. totals := [][]string{}
  56. for _, val := range res.([]interface{}) {
  57. if !ok {
  58. return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  59. }
  60. dataPoint, ok := val.([]interface{})
  61. if !ok || len(dataPoint) != 2 {
  62. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  63. }
  64. d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
  65. toAppend := []string{
  66. d0,
  67. dataPoint[1].(string),
  68. }
  69. totals = append(totals, toAppend)
  70. }
  71. return totals, nil
  72. }
  73. func resultToTotal(qr interface{}) (map[string][][]string, error) {
  74. defaultClusterID := os.Getenv(clusterIDKey)
  75. data, ok := qr.(map[string]interface{})["data"]
  76. if !ok {
  77. e, err := wrapPrometheusError(qr)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return nil, fmt.Errorf("Prometheus query error: %s", e)
  82. }
  83. r, ok := data.(map[string]interface{})["result"]
  84. if !ok {
  85. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  86. }
  87. results, ok := r.([]interface{})
  88. if !ok {
  89. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  90. }
  91. if len(results) == 0 {
  92. return nil, fmt.Errorf("Not enough data available in the selected time range")
  93. }
  94. toReturn := make(map[string][][]string)
  95. for i := range results {
  96. metrics, ok := results[i].(map[string]interface{})["metric"]
  97. if !ok {
  98. return nil, fmt.Errorf("Improperly formatted results from prometheus, metric is not a field in the vector")
  99. }
  100. metricMap, ok := metrics.(map[string]interface{})
  101. cid, ok := metricMap["cluster_id"]
  102. if !ok {
  103. klog.V(4).Info("Prometheus vector does not have cluster id")
  104. cid = defaultClusterID
  105. }
  106. clusterID, ok := cid.(string)
  107. if !ok {
  108. return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
  109. }
  110. val, ok := results[i].(map[string]interface{})["value"]
  111. if !ok {
  112. return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  113. }
  114. dataPoint, ok := val.([]interface{})
  115. if !ok || len(dataPoint) != 2 {
  116. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  117. }
  118. d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
  119. toAppend := []string{
  120. d0,
  121. dataPoint[1].(string),
  122. }
  123. if t, ok := toReturn[clusterID]; ok {
  124. t = append(t, toAppend)
  125. } else {
  126. toReturn[clusterID] = [][]string{toAppend}
  127. }
  128. }
  129. return toReturn, nil
  130. }
  131. // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
  132. func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
  133. if offset != "" {
  134. offset = fmt.Sprintf("offset %s", offset)
  135. }
  136. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  137. if err != nil {
  138. return nil, err
  139. }
  140. if localStorageQuery != "" {
  141. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  142. }
  143. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  144. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  145. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  146. klog.V(4).Infof("Running query %s", qCores)
  147. resultClusterCores, err := Query(cli, qCores)
  148. if err != nil {
  149. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  150. }
  151. klog.V(4).Infof("Running query %s", qRAM)
  152. resultClusterRAM, err := Query(cli, qRAM)
  153. if err != nil {
  154. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  155. }
  156. klog.V(4).Infof("Running query %s", qRAM)
  157. resultStorage, err := Query(cli, qStorage)
  158. if err != nil {
  159. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  160. }
  161. toReturn := make(map[string]*Totals)
  162. coreTotal, err := resultToTotal(resultClusterCores)
  163. if err != nil {
  164. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  165. }
  166. for clusterID, total := range coreTotal {
  167. if _, ok := toReturn[clusterID]; !ok {
  168. toReturn[clusterID] = &Totals{}
  169. }
  170. toReturn[clusterID].CPUCost = total
  171. }
  172. ramTotal, err := resultToTotal(resultClusterRAM)
  173. if err != nil {
  174. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  175. }
  176. for clusterID, total := range ramTotal {
  177. if _, ok := toReturn[clusterID]; !ok {
  178. toReturn[clusterID] = &Totals{}
  179. }
  180. toReturn[clusterID].MemCost = total
  181. }
  182. storageTotal, err := resultToTotal(resultStorage)
  183. if err != nil {
  184. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  185. }
  186. for clusterID, total := range storageTotal {
  187. if _, ok := toReturn[clusterID]; !ok {
  188. toReturn[clusterID] = &Totals{}
  189. }
  190. toReturn[clusterID].StorageCost = total
  191. }
  192. return toReturn, nil
  193. }
  194. // ClusterCosts gives the current full cluster costs averaged over a window of time.
  195. func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
  196. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  197. if offset != "" {
  198. offset = fmt.Sprintf("offset %s", offset)
  199. }
  200. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  201. if err != nil {
  202. return nil, err
  203. }
  204. if localStorageQuery != "" {
  205. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  206. }
  207. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  208. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  209. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  210. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  211. resultClusterCores, err := Query(cli, qCores)
  212. if err != nil {
  213. return nil, err
  214. }
  215. resultClusterRAM, err := Query(cli, qRAM)
  216. if err != nil {
  217. return nil, err
  218. }
  219. resultStorage, err := Query(cli, qStorage)
  220. if err != nil {
  221. return nil, err
  222. }
  223. resultTotal, err := Query(cli, qTotal)
  224. if err != nil {
  225. return nil, err
  226. }
  227. coreTotal, err := resultToTotal(resultClusterCores)
  228. if err != nil {
  229. return nil, err
  230. }
  231. ramTotal, err := resultToTotal(resultClusterRAM)
  232. if err != nil {
  233. return nil, err
  234. }
  235. storageTotal, err := resultToTotal(resultStorage)
  236. if err != nil {
  237. return nil, err
  238. }
  239. clusterTotal, err := resultToTotal(resultTotal)
  240. if err != nil {
  241. return nil, err
  242. }
  243. defaultClusterID := os.Getenv(clusterIDKey)
  244. return &Totals{
  245. TotalCost: clusterTotal[defaultClusterID],
  246. CPUCost: coreTotal[defaultClusterID],
  247. MemCost: ramTotal[defaultClusterID],
  248. StorageCost: storageTotal[defaultClusterID],
  249. }, nil
  250. }
  251. // ClusterCostsOverTime gives the full cluster costs over time
  252. func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  253. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  254. if err != nil {
  255. return nil, err
  256. }
  257. if localStorageQuery != "" {
  258. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  259. }
  260. layout := "2006-01-02T15:04:05.000Z"
  261. start, err := time.Parse(layout, startString)
  262. if err != nil {
  263. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  264. return nil, err
  265. }
  266. end, err := time.Parse(layout, endString)
  267. if err != nil {
  268. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  269. return nil, err
  270. }
  271. window, err := time.ParseDuration(windowString)
  272. if err != nil {
  273. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  274. return nil, err
  275. }
  276. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  277. if offset != "" {
  278. offset = fmt.Sprintf("offset %s", offset)
  279. }
  280. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  281. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  282. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  283. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  284. resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
  285. if err != nil {
  286. return nil, err
  287. }
  288. resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
  289. if err != nil {
  290. return nil, err
  291. }
  292. resultStorage, err := QueryRange(cli, qStorage, start, end, window)
  293. if err != nil {
  294. return nil, err
  295. }
  296. resultTotal, err := QueryRange(cli, qTotal, start, end, window)
  297. if err != nil {
  298. return nil, err
  299. }
  300. coreTotal, err := resultToTotals(resultClusterCores)
  301. if err != nil {
  302. return nil, err
  303. }
  304. ramTotal, err := resultToTotals(resultClusterRAM)
  305. if err != nil {
  306. return nil, err
  307. }
  308. storageTotal, err := resultToTotals(resultStorage)
  309. if err != nil {
  310. return nil, err
  311. }
  312. clusterTotal, err := resultToTotals(resultTotal)
  313. if err != nil {
  314. return nil, err
  315. }
  316. return &Totals{
  317. TotalCost: clusterTotal,
  318. CPUCost: coreTotal,
  319. MemCost: ramTotal,
  320. StorageCost: storageTotal,
  321. }, nil
  322. }