cluster.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package costmodel
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  7. prometheusClient "github.com/prometheus/client_golang/api"
  8. "k8s.io/klog"
  9. )
  // PromQL templates used by the cluster cost functions in this file. Each is
  // completed with fmt.Sprintf. Hourly costs are multiplied by 730, presumably
  // the number of hours in an average month (8760 / 12) — confirm with callers.
  const (
  	// queryClusterCores takes six arguments: three (range, offset clause)
  	// pairs. Per cluster_id it sums, over nodes, CPU capacity in cores times
  	// the hourly CPU cost, plus the hourly GPU cost, both scaled by 730.
  	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`
  	// queryClusterRAM takes four arguments: two (range, offset clause) pairs.
  	// Memory capacity is divided by 1024 three times (bytes to GiB) before
  	// being multiplied by the hourly RAM cost.
  	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`
  	// queryStorage takes five arguments: two (range, offset clause) pairs and
  	// a trailing expression appended verbatim after the aggregation. Callers
  	// in this file pass either "" or "+ <local storage query>".
  	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
  	// queryTotal takes one argument: the trailing expression appended
  	// verbatim (same convention as queryStorage). Unlike the other templates
  	// it uses a fixed 1h range and no offset clause.
  	// NOTE(review): the node term is summed without a "by" clause while the
  	// storage term is grouped by cluster_id — confirm the two sides of "+"
  	// match as intended when series carry a cluster_id label.
  	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`
  )
  // Totals holds cost series for a cluster. Every field is a list of
  // [timestamp, value] pairs, both elements rendered as strings with "%f" by
  // resultToTotal / resultToTotals.
  type Totals struct {
  	// TotalCost is the result of queryTotal. ClusterCostsForAllClusters
  	// leaves it unset.
  	TotalCost [][]string `json:"totalcost"`
  	// CPUCost is the result of queryClusterCores.
  	CPUCost [][]string `json:"cpucost"`
  	// MemCost is the result of queryClusterRAM.
  	MemCost [][]string `json:"memcost"`
  	// StorageCost is the result of queryStorage. Note the JSON key is
  	// camelCase, unlike the other fields.
  	StorageCost [][]string `json:"storageCost"`
  }
  34. func resultToTotals(qr interface{}) ([][]string, error) {
  35. results, err := NewQueryResults(qr)
  36. if err != nil {
  37. return nil, err
  38. }
  39. if len(results) == 0 {
  40. return nil, fmt.Errorf("Not enough data available in the selected time range")
  41. }
  42. result := results[0]
  43. totals := [][]string{}
  44. for _, value := range result.Values {
  45. d0 := fmt.Sprintf("%f", value.Timestamp)
  46. d1 := fmt.Sprintf("%f", value.Value)
  47. toAppend := []string{
  48. d0,
  49. d1,
  50. }
  51. totals = append(totals, toAppend)
  52. }
  53. return totals, nil
  54. }
  55. func resultToTotal(qr interface{}) (map[string][][]string, error) {
  56. defaultClusterID := os.Getenv(clusterIDKey)
  57. results, err := NewQueryResults(qr)
  58. if err != nil {
  59. return nil, err
  60. }
  61. toReturn := make(map[string][][]string)
  62. for _, result := range results {
  63. clusterID, _ := result.GetString("cluster_id")
  64. if clusterID == "" {
  65. clusterID = defaultClusterID
  66. }
  67. // Expect a single value only
  68. if len(result.Values) == 0 {
  69. klog.V(1).Infof("[Warning] Metric values did not contain any valid data.")
  70. continue
  71. }
  72. value := result.Values[0]
  73. d0 := fmt.Sprintf("%f", value.Timestamp)
  74. d1 := fmt.Sprintf("%f", value.Value)
  75. toAppend := []string{
  76. d0,
  77. d1,
  78. }
  79. if t, ok := toReturn[clusterID]; ok {
  80. t = append(t, toAppend)
  81. } else {
  82. toReturn[clusterID] = [][]string{toAppend}
  83. }
  84. }
  85. return toReturn, nil
  86. }
  87. // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
  88. func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, window, offset string) (map[string]*Totals, error) {
  89. if offset != "" {
  90. offset = fmt.Sprintf("offset %s", offset)
  91. }
  92. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  93. if err != nil {
  94. return nil, err
  95. }
  96. if localStorageQuery != "" {
  97. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  98. }
  99. qCores := fmt.Sprintf(queryClusterCores, window, offset, window, offset, window, offset)
  100. qRAM := fmt.Sprintf(queryClusterRAM, window, offset, window, offset)
  101. qStorage := fmt.Sprintf(queryStorage, window, offset, window, offset, localStorageQuery)
  102. klog.Infof("[Debug] qCores: %s", qCores)
  103. klog.Infof("[Debug] qRAM: %s", qRAM)
  104. klog.Infof("[Debug] qStorage: %s", qStorage)
  105. klog.V(4).Infof("Running query %s", qCores)
  106. resultClusterCores, err := Query(cli, qCores)
  107. if err != nil {
  108. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  109. }
  110. klog.V(4).Infof("Running query %s", qRAM)
  111. resultClusterRAM, err := Query(cli, qRAM)
  112. if err != nil {
  113. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  114. }
  115. klog.V(4).Infof("Running query %s", qRAM)
  116. resultStorage, err := Query(cli, qStorage)
  117. if err != nil {
  118. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  119. }
  120. toReturn := make(map[string]*Totals)
  121. coreTotal, err := resultToTotal(resultClusterCores)
  122. if err != nil {
  123. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  124. }
  125. for clusterID, total := range coreTotal {
  126. if _, ok := toReturn[clusterID]; !ok {
  127. toReturn[clusterID] = &Totals{}
  128. }
  129. toReturn[clusterID].CPUCost = total
  130. }
  131. ramTotal, err := resultToTotal(resultClusterRAM)
  132. if err != nil {
  133. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  134. }
  135. for clusterID, total := range ramTotal {
  136. if _, ok := toReturn[clusterID]; !ok {
  137. toReturn[clusterID] = &Totals{}
  138. }
  139. toReturn[clusterID].MemCost = total
  140. }
  141. storageTotal, err := resultToTotal(resultStorage)
  142. if err != nil {
  143. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  144. }
  145. for clusterID, total := range storageTotal {
  146. if _, ok := toReturn[clusterID]; !ok {
  147. toReturn[clusterID] = &Totals{}
  148. }
  149. toReturn[clusterID].StorageCost = total
  150. }
  151. return toReturn, nil
  152. }
  153. // ClusterCosts gives the current full cluster costs averaged over a window of time.
  154. func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
  155. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  156. if offset != "" {
  157. offset = fmt.Sprintf("offset %s", offset)
  158. }
  159. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  160. if err != nil {
  161. return nil, err
  162. }
  163. if localStorageQuery != "" {
  164. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  165. }
  166. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  167. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  168. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  169. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  170. resultClusterCores, err := Query(cli, qCores)
  171. if err != nil {
  172. return nil, err
  173. }
  174. resultClusterRAM, err := Query(cli, qRAM)
  175. if err != nil {
  176. return nil, err
  177. }
  178. resultStorage, err := Query(cli, qStorage)
  179. if err != nil {
  180. return nil, err
  181. }
  182. resultTotal, err := Query(cli, qTotal)
  183. if err != nil {
  184. return nil, err
  185. }
  186. coreTotal, err := resultToTotal(resultClusterCores)
  187. if err != nil {
  188. return nil, err
  189. }
  190. ramTotal, err := resultToTotal(resultClusterRAM)
  191. if err != nil {
  192. return nil, err
  193. }
  194. storageTotal, err := resultToTotal(resultStorage)
  195. if err != nil {
  196. return nil, err
  197. }
  198. clusterTotal, err := resultToTotal(resultTotal)
  199. if err != nil {
  200. return nil, err
  201. }
  202. defaultClusterID := os.Getenv(clusterIDKey)
  203. return &Totals{
  204. TotalCost: clusterTotal[defaultClusterID],
  205. CPUCost: coreTotal[defaultClusterID],
  206. MemCost: ramTotal[defaultClusterID],
  207. StorageCost: storageTotal[defaultClusterID],
  208. }, nil
  209. }
  210. // ClusterCostsOverTime gives the full cluster costs over time
  211. func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  212. localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
  213. if err != nil {
  214. return nil, err
  215. }
  216. if localStorageQuery != "" {
  217. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  218. }
  219. layout := "2006-01-02T15:04:05.000Z"
  220. start, err := time.Parse(layout, startString)
  221. if err != nil {
  222. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  223. return nil, err
  224. }
  225. end, err := time.Parse(layout, endString)
  226. if err != nil {
  227. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  228. return nil, err
  229. }
  230. window, err := time.ParseDuration(windowString)
  231. if err != nil {
  232. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  233. return nil, err
  234. }
  235. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  236. if offset != "" {
  237. offset = fmt.Sprintf("offset %s", offset)
  238. }
  239. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  240. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  241. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  242. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  243. resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
  244. if err != nil {
  245. return nil, err
  246. }
  247. resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
  248. if err != nil {
  249. return nil, err
  250. }
  251. resultStorage, err := QueryRange(cli, qStorage, start, end, window)
  252. if err != nil {
  253. return nil, err
  254. }
  255. resultTotal, err := QueryRange(cli, qTotal, start, end, window)
  256. if err != nil {
  257. return nil, err
  258. }
  259. coreTotal, err := resultToTotals(resultClusterCores)
  260. if err != nil {
  261. return nil, err
  262. }
  263. ramTotal, err := resultToTotals(resultClusterRAM)
  264. if err != nil {
  265. return nil, err
  266. }
  267. storageTotal, err := resultToTotals(resultStorage)
  268. if err != nil {
  269. return nil, err
  270. }
  271. clusterTotal, err := resultToTotals(resultTotal)
  272. if err != nil {
  273. return nil, err
  274. }
  275. return &Totals{
  276. TotalCost: clusterTotal,
  277. CPUCost: coreTotal,
  278. MemCost: ramTotal,
  279. StorageCost: storageTotal,
  280. }, nil
  281. }