cluster.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package costmodel
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  7. prometheusClient "github.com/prometheus/client_golang/api"
  8. "k8s.io/klog"
  9. )
// PromQL templates for the cluster cost queries in this file. Hourly cost
// metrics are multiplied by 730 (presumably the average number of hours in a
// month) and byte capacities are divided by 1024^3 to express them in GiB.
// The %s placeholders are filled with fmt.Sprintf by the functions below.
const (
	// queryClusterCores: per node, capacity cores * hourly CPU price plus the
	// hourly GPU price, summed by cluster_id.
	// Placeholders: (window, offset) repeated three times.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`
	// queryClusterRAM: per node, memory capacity in GiB * hourly RAM price
	// (presumably priced per GiB-hour — confirm), summed by cluster_id.
	// Placeholders: (window, offset) repeated twice.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`
	// queryStorage: per persistent volume, hourly price * capacity in GiB,
	// summed by cluster_id.
	// Placeholders: (window, offset) repeated twice, then a trailing
	// expression that callers set to "+ <local storage query>" or "".
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
	// queryTotal: total node cost plus persistent-volume cost. Unlike the
	// templates above it uses a fixed [1h] window and no offset; its single
	// placeholder is the same optional trailing expression as queryStorage.
	// NOTE(review): the left-hand sum has no `by (cluster_id)`, so under
	// PromQL one-to-one vector matching it only pairs with a right-hand series
	// that has no cluster_id label — presumably this assumes a single,
	// unlabeled cluster; confirm before relying on it for multi-cluster data.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
)
// Totals holds cost series for a cluster. Each field is a list of
// [timestamp, value] pairs, both formatted as strings with "%f" by
// resultToTotal / resultToTotals in this file.
type Totals struct {
	TotalCost [][]string `json:"totalcost"`
	CPUCost   [][]string `json:"cpucost"`
	MemCost   [][]string `json:"memcost"`
	// NOTE(review): this tag is camelCase unlike the others; it is
	// presumably relied on by API consumers — confirm before normalizing.
	StorageCost [][]string `json:"storageCost"`
}
  34. func resultToTotals(qr interface{}) ([][]string, error) {
  35. results, err := NewQueryResults(qr)
  36. if err != nil {
  37. return nil, err
  38. }
  39. if len(results) == 0 {
  40. return nil, fmt.Errorf("Not enough data available in the selected time range")
  41. }
  42. result := results[0]
  43. totals := [][]string{}
  44. for _, value := range result.Values {
  45. d0 := fmt.Sprintf("%f", value.Timestamp)
  46. d1 := fmt.Sprintf("%f", value.Value)
  47. toAppend := []string{
  48. d0,
  49. d1,
  50. }
  51. totals = append(totals, toAppend)
  52. }
  53. return totals, nil
  54. }
  55. func resultToTotal(qr interface{}) (map[string][][]string, error) {
  56. defaultClusterID := os.Getenv(clusterIDKey)
  57. results, err := NewQueryResults(qr)
  58. if err != nil {
  59. return nil, err
  60. }
  61. toReturn := make(map[string][][]string)
  62. for _, result := range results {
  63. clusterID, _ := result.GetString("cluster_id")
  64. if clusterID == "" {
  65. clusterID = defaultClusterID
  66. }
  67. // Expect a single value only
  68. if len(result.Values) == 0 {
  69. klog.V(1).Infof("[Warning] Metric values did not contain any valid data.")
  70. continue
  71. }
  72. value := result.Values[0]
  73. d0 := fmt.Sprintf("%f", value.Timestamp)
  74. d1 := fmt.Sprintf("%f", value.Value)
  75. toAppend := []string{
  76. d0,
  77. d1,
  78. }
  79. if t, ok := toReturn[clusterID]; ok {
  80. t = append(t, toAppend)
  81. } else {
  82. toReturn[clusterID] = [][]string{toAppend}
  83. }
  84. }
  85. return toReturn, nil
  86. }
  87. // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
  88. func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
  89. if offset != "" {
  90. offset = fmt.Sprintf("offset %s", offset)
  91. }
  92. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  93. if err != nil {
  94. return nil, err
  95. }
  96. if localStorageQuery != "" {
  97. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  98. }
  99. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  100. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  101. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  102. klog.V(4).Infof("Running query %s", qCores)
  103. resultClusterCores, err := Query(cli, qCores)
  104. if err != nil {
  105. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  106. }
  107. klog.V(4).Infof("Running query %s", qRAM)
  108. resultClusterRAM, err := Query(cli, qRAM)
  109. if err != nil {
  110. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  111. }
  112. klog.V(4).Infof("Running query %s", qRAM)
  113. resultStorage, err := Query(cli, qStorage)
  114. if err != nil {
  115. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  116. }
  117. toReturn := make(map[string]*Totals)
  118. coreTotal, err := resultToTotal(resultClusterCores)
  119. if err != nil {
  120. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  121. }
  122. for clusterID, total := range coreTotal {
  123. if _, ok := toReturn[clusterID]; !ok {
  124. toReturn[clusterID] = &Totals{}
  125. }
  126. toReturn[clusterID].CPUCost = total
  127. }
  128. ramTotal, err := resultToTotal(resultClusterRAM)
  129. if err != nil {
  130. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  131. }
  132. for clusterID, total := range ramTotal {
  133. if _, ok := toReturn[clusterID]; !ok {
  134. toReturn[clusterID] = &Totals{}
  135. }
  136. toReturn[clusterID].MemCost = total
  137. }
  138. storageTotal, err := resultToTotal(resultStorage)
  139. if err != nil {
  140. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  141. }
  142. for clusterID, total := range storageTotal {
  143. if _, ok := toReturn[clusterID]; !ok {
  144. toReturn[clusterID] = &Totals{}
  145. }
  146. toReturn[clusterID].StorageCost = total
  147. }
  148. return toReturn, nil
  149. }
  150. // ClusterCosts gives the current full cluster costs averaged over a window of time.
  151. func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
  152. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  153. if offset != "" {
  154. offset = fmt.Sprintf("offset %s", offset)
  155. }
  156. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  157. if err != nil {
  158. return nil, err
  159. }
  160. if localStorageQuery != "" {
  161. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  162. }
  163. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  164. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  165. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  166. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  167. resultClusterCores, err := Query(cli, qCores)
  168. if err != nil {
  169. return nil, err
  170. }
  171. resultClusterRAM, err := Query(cli, qRAM)
  172. if err != nil {
  173. return nil, err
  174. }
  175. resultStorage, err := Query(cli, qStorage)
  176. if err != nil {
  177. return nil, err
  178. }
  179. resultTotal, err := Query(cli, qTotal)
  180. if err != nil {
  181. return nil, err
  182. }
  183. coreTotal, err := resultToTotal(resultClusterCores)
  184. if err != nil {
  185. return nil, err
  186. }
  187. ramTotal, err := resultToTotal(resultClusterRAM)
  188. if err != nil {
  189. return nil, err
  190. }
  191. storageTotal, err := resultToTotal(resultStorage)
  192. if err != nil {
  193. return nil, err
  194. }
  195. clusterTotal, err := resultToTotal(resultTotal)
  196. if err != nil {
  197. return nil, err
  198. }
  199. defaultClusterID := os.Getenv(clusterIDKey)
  200. return &Totals{
  201. TotalCost: clusterTotal[defaultClusterID],
  202. CPUCost: coreTotal[defaultClusterID],
  203. MemCost: ramTotal[defaultClusterID],
  204. StorageCost: storageTotal[defaultClusterID],
  205. }, nil
  206. }
  207. // ClusterCostsOverTime gives the full cluster costs over time
  208. func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  209. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  210. if err != nil {
  211. return nil, err
  212. }
  213. if localStorageQuery != "" {
  214. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  215. }
  216. layout := "2006-01-02T15:04:05.000Z"
  217. start, err := time.Parse(layout, startString)
  218. if err != nil {
  219. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  220. return nil, err
  221. }
  222. end, err := time.Parse(layout, endString)
  223. if err != nil {
  224. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  225. return nil, err
  226. }
  227. window, err := time.ParseDuration(windowString)
  228. if err != nil {
  229. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  230. return nil, err
  231. }
  232. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  233. if offset != "" {
  234. offset = fmt.Sprintf("offset %s", offset)
  235. }
  236. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  237. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  238. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  239. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  240. resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
  241. if err != nil {
  242. return nil, err
  243. }
  244. resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
  245. if err != nil {
  246. return nil, err
  247. }
  248. resultStorage, err := QueryRange(cli, qStorage, start, end, window)
  249. if err != nil {
  250. return nil, err
  251. }
  252. resultTotal, err := QueryRange(cli, qTotal, start, end, window)
  253. if err != nil {
  254. return nil, err
  255. }
  256. coreTotal, err := resultToTotals(resultClusterCores)
  257. if err != nil {
  258. return nil, err
  259. }
  260. ramTotal, err := resultToTotals(resultClusterRAM)
  261. if err != nil {
  262. return nil, err
  263. }
  264. storageTotal, err := resultToTotals(resultStorage)
  265. if err != nil {
  266. return nil, err
  267. }
  268. clusterTotal, err := resultToTotals(resultTotal)
  269. if err != nil {
  270. return nil, err
  271. }
  272. return &Totals{
  273. TotalCost: clusterTotal,
  274. CPUCost: coreTotal,
  275. MemCost: ramTotal,
  276. StorageCost: storageTotal,
  277. }, nil
  278. }