cluster.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package costmodel
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  7. prometheusClient "github.com/prometheus/client_golang/api"
  8. "k8s.io/klog"
  9. )
// PromQL templates used by the cluster cost functions in this file. Each
// template is filled in with fmt.Sprintf; the %s placeholders are described on
// the individual constants. Hourly costs are multiplied by 730 in every
// template (presumably the number of hours in an average month — confirm).
const (
	// queryClusterCores sums, per cluster_id, the per-node CPU cost
	// (core capacity * node_cpu_hourly_cost) plus node_gpu_hourly_cost.
	// It takes three %s placeholders, one after each metric name; the callers
	// in this file pass the same offset clause (or "") for all three.
	queryClusterCores = `sum(
avg(kube_node_status_capacity_cpu_cores %s) by (node, cluster_id) * avg(node_cpu_hourly_cost %s) by (node, cluster_id) * 730 +
avg(node_gpu_hourly_cost %s) by (node, cluster_id) * 730
) by (cluster_id)`
	// queryClusterRAM sums, per cluster_id, the per-node memory cost: memory
	// capacity in bytes divided by 1024 three times, multiplied by
	// node_ram_hourly_cost. It takes two %s placeholders, one after each
	// metric name; callers pass the offset clause (or "") for both.
	queryClusterRAM = `sum(
avg(kube_node_status_capacity_memory_bytes %s) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node, cluster_id) * 730
) by (cluster_id)`
	// queryStorage sums, per cluster_id, the per-persistent-volume cost:
	// pv_hourly_cost times capacity in bytes divided by 1024 three times.
	// It takes five %s placeholders, in order: range window, offset clause,
	// range window, offset clause, and a trailing expression appended after
	// the aggregation (callers pass "" or "+ <local storage query>").
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
	// queryTotal adds the summed node_total_hourly_cost to the persistent
	// volume cost, using a fixed 1h window and no offset. It takes one %s
	// placeholder: a trailing expression appended after the aggregation
	// (callers pass "" or "+ <local storage query>").
	// NOTE(review): the node sum is not grouped by cluster_id while the
	// storage sum is — confirm the two sides match as intended.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
)
// Totals holds the cost series returned by the cluster cost functions in this
// file. Every field is a list of two-element string pairs as produced by
// resultToTotal / resultToTotals: the first element of a Prometheus datapoint
// formatted with "%f", followed by the datapoint's second element.
type Totals struct {
	// TotalCost is built from the queryTotal results. It is not populated by
	// ClusterCostsForAllClusters.
	TotalCost [][]string `json:"totalcost"`
	// CPUCost is built from the queryClusterCores results.
	CPUCost [][]string `json:"cpucost"`
	// MemCost is built from the queryClusterRAM results.
	MemCost [][]string `json:"memcost"`
	// StorageCost is built from the queryStorage results.
	// NOTE(review): this tag is camelCase while the sibling tags are
	// lowercase; changing it would alter the JSON output.
	StorageCost [][]string `json:"storageCost"`
}
  34. func resultToTotals(qr interface{}) ([][]string, error) {
  35. data, ok := qr.(map[string]interface{})["data"]
  36. if !ok {
  37. e, err := wrapPrometheusError(qr)
  38. if err != nil {
  39. return nil, err
  40. }
  41. return nil, fmt.Errorf(e)
  42. }
  43. r, ok := data.(map[string]interface{})["result"]
  44. if !ok {
  45. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  46. }
  47. results, ok := r.([]interface{})
  48. if !ok {
  49. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  50. }
  51. if len(results) == 0 {
  52. return nil, fmt.Errorf("Not enough data available in the selected time range")
  53. }
  54. res, ok := results[0].(map[string]interface{})["values"]
  55. totals := [][]string{}
  56. for _, val := range res.([]interface{}) {
  57. if !ok {
  58. return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  59. }
  60. dataPoint, ok := val.([]interface{})
  61. if !ok || len(dataPoint) != 2 {
  62. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  63. }
  64. d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
  65. toAppend := []string{
  66. d0,
  67. dataPoint[1].(string),
  68. }
  69. totals = append(totals, toAppend)
  70. }
  71. return totals, nil
  72. }
  73. func resultToTotal(qr interface{}) (map[string][][]string, error) {
  74. defaultClusterID := os.Getenv(clusterIDKey)
  75. data, ok := qr.(map[string]interface{})["data"]
  76. if !ok {
  77. e, err := wrapPrometheusError(qr)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return nil, fmt.Errorf("Prometheus query error: %s", e)
  82. }
  83. r, ok := data.(map[string]interface{})["result"]
  84. if !ok {
  85. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  86. }
  87. results, ok := r.([]interface{})
  88. if !ok {
  89. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  90. }
  91. if len(results) == 0 {
  92. return nil, fmt.Errorf("Not enough data available in the selected time range")
  93. }
  94. toReturn := make(map[string][][]string)
  95. for i := range results {
  96. metrics, ok := results[i].(map[string]interface{})["metric"]
  97. if !ok {
  98. return nil, fmt.Errorf("Improperly formatted results from prometheus, metric is not a field in the vector")
  99. }
  100. metricMap, ok := metrics.(map[string]interface{})
  101. cid, ok := metricMap["cluster_id"]
  102. if !ok {
  103. klog.V(4).Info("Prometheus vector does not have cluster id")
  104. cid = defaultClusterID
  105. }
  106. clusterID, ok := cid.(string)
  107. if !ok {
  108. return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
  109. }
  110. val, ok := results[i].(map[string]interface{})["value"]
  111. if !ok {
  112. return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  113. }
  114. dataPoint, ok := val.([]interface{})
  115. if !ok || len(dataPoint) != 2 {
  116. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  117. }
  118. d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
  119. toAppend := []string{
  120. d0,
  121. dataPoint[1].(string),
  122. }
  123. if t, ok := toReturn[clusterID]; ok {
  124. t = append(t, toAppend)
  125. } else {
  126. toReturn[clusterID] = [][]string{toAppend}
  127. }
  128. }
  129. return toReturn, nil
  130. }
  131. // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
  132. func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
  133. offset = fmt.Sprintf("offset 3h") // Set offset to 3h for block sync
  134. qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
  135. qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
  136. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, "")
  137. klog.V(4).Infof("Running query %s", qCores)
  138. resultClusterCores, err := Query(cli, qCores)
  139. if err != nil {
  140. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  141. }
  142. klog.V(4).Infof("Running query %s", qRAM)
  143. resultClusterRAM, err := Query(cli, qRAM)
  144. if err != nil {
  145. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  146. }
  147. klog.V(4).Infof("Running query %s", qRAM)
  148. resultStorage, err := Query(cli, qStorage)
  149. if err != nil {
  150. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  151. }
  152. toReturn := make(map[string]*Totals)
  153. coreTotal, err := resultToTotal(resultClusterCores)
  154. if err != nil {
  155. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  156. }
  157. for clusterID, total := range coreTotal {
  158. if _, ok := toReturn[clusterID]; !ok {
  159. toReturn[clusterID] = &Totals{}
  160. }
  161. toReturn[clusterID].CPUCost = total
  162. }
  163. ramTotal, err := resultToTotal(resultClusterRAM)
  164. if err != nil {
  165. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  166. }
  167. for clusterID, total := range ramTotal {
  168. if _, ok := toReturn[clusterID]; !ok {
  169. toReturn[clusterID] = &Totals{}
  170. }
  171. toReturn[clusterID].MemCost = total
  172. }
  173. storageTotal, err := resultToTotal(resultStorage)
  174. if err != nil {
  175. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  176. }
  177. for clusterID, total := range storageTotal {
  178. if _, ok := toReturn[clusterID]; !ok {
  179. toReturn[clusterID] = &Totals{}
  180. }
  181. toReturn[clusterID].StorageCost = total
  182. }
  183. return toReturn, nil
  184. }
  185. // ClusterCosts gives the current full cluster costs averaged over a window of time.
  186. func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
  187. localStorageQuery, err := cloud.GetLocalStorageQuery()
  188. if err != nil {
  189. return nil, err
  190. }
  191. if localStorageQuery != "" {
  192. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  193. }
  194. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  195. if offset != "" {
  196. offset = fmt.Sprintf("offset %s", offset)
  197. }
  198. qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
  199. qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
  200. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  201. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  202. resultClusterCores, err := Query(cli, qCores)
  203. if err != nil {
  204. return nil, err
  205. }
  206. resultClusterRAM, err := Query(cli, qRAM)
  207. if err != nil {
  208. return nil, err
  209. }
  210. resultStorage, err := Query(cli, qStorage)
  211. if err != nil {
  212. return nil, err
  213. }
  214. resultTotal, err := Query(cli, qTotal)
  215. if err != nil {
  216. return nil, err
  217. }
  218. coreTotal, err := resultToTotal(resultClusterCores)
  219. if err != nil {
  220. return nil, err
  221. }
  222. ramTotal, err := resultToTotal(resultClusterRAM)
  223. if err != nil {
  224. return nil, err
  225. }
  226. storageTotal, err := resultToTotal(resultStorage)
  227. if err != nil {
  228. return nil, err
  229. }
  230. clusterTotal, err := resultToTotal(resultTotal)
  231. if err != nil {
  232. return nil, err
  233. }
  234. defaultClusterID := os.Getenv(clusterIDKey)
  235. return &Totals{
  236. TotalCost: clusterTotal[defaultClusterID],
  237. CPUCost: coreTotal[defaultClusterID],
  238. MemCost: ramTotal[defaultClusterID],
  239. StorageCost: storageTotal[defaultClusterID],
  240. }, nil
  241. }
  242. // ClusterCostsOverTime gives the full cluster costs over time
  243. func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  244. localStorageQuery, err := cloud.GetLocalStorageQuery()
  245. if err != nil {
  246. return nil, err
  247. }
  248. if localStorageQuery != "" {
  249. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  250. }
  251. layout := "2006-01-02T15:04:05.000Z"
  252. start, err := time.Parse(layout, startString)
  253. if err != nil {
  254. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  255. return nil, err
  256. }
  257. end, err := time.Parse(layout, endString)
  258. if err != nil {
  259. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  260. return nil, err
  261. }
  262. window, err := time.ParseDuration(windowString)
  263. if err != nil {
  264. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  265. return nil, err
  266. }
  267. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  268. if offset != "" {
  269. offset = fmt.Sprintf("offset %s", offset)
  270. }
  271. qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
  272. qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
  273. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  274. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  275. resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
  276. if err != nil {
  277. return nil, err
  278. }
  279. resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
  280. if err != nil {
  281. return nil, err
  282. }
  283. resultStorage, err := QueryRange(cli, qStorage, start, end, window)
  284. if err != nil {
  285. return nil, err
  286. }
  287. resultTotal, err := QueryRange(cli, qTotal, start, end, window)
  288. if err != nil {
  289. return nil, err
  290. }
  291. coreTotal, err := resultToTotals(resultClusterCores)
  292. if err != nil {
  293. return nil, err
  294. }
  295. ramTotal, err := resultToTotals(resultClusterRAM)
  296. if err != nil {
  297. return nil, err
  298. }
  299. storageTotal, err := resultToTotals(resultStorage)
  300. if err != nil {
  301. return nil, err
  302. }
  303. clusterTotal, err := resultToTotals(resultTotal)
  304. if err != nil {
  305. return nil, err
  306. }
  307. return &Totals{
  308. TotalCost: clusterTotal,
  309. CPUCost: coreTotal,
  310. MemCost: ramTotal,
  311. StorageCost: storageTotal,
  312. }, nil
  313. }