main.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "os"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "k8s.io/klog"
  15. "github.com/julienschmidt/httprouter"
  16. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  17. costModel "github.com/kubecost/cost-model/costmodel"
  18. prometheusClient "github.com/prometheus/client_golang/api"
  19. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  20. v1 "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "github.com/prometheus/client_golang/prometheus"
  23. "github.com/prometheus/client_golang/prometheus/promhttp"
  24. "k8s.io/apimachinery/pkg/fields"
  25. "k8s.io/client-go/kubernetes"
  26. "k8s.io/client-go/rest"
  27. "k8s.io/client-go/tools/cache"
  28. )
  29. const (
  30. prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
  31. prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
  32. remoteEnabled = "REMOTE_WRITE_ENABLED"
  33. )
  34. var (
  35. // gitCommit is set by the build system
  36. gitCommit string
  37. )
  38. type Accesses struct {
  39. PrometheusClient prometheusClient.Client
  40. KubeClientSet kubernetes.Interface
  41. Cloud costAnalyzerCloud.Provider
  42. CPUPriceRecorder *prometheus.GaugeVec
  43. RAMPriceRecorder *prometheus.GaugeVec
  44. PersistentVolumePriceRecorder *prometheus.GaugeVec
  45. GPUPriceRecorder *prometheus.GaugeVec
  46. NodeTotalPriceRecorder *prometheus.GaugeVec
  47. RAMAllocationRecorder *prometheus.GaugeVec
  48. CPUAllocationRecorder *prometheus.GaugeVec
  49. GPUAllocationRecorder *prometheus.GaugeVec
  50. ContainerUptimeRecorder *prometheus.GaugeVec
  51. NetworkZoneEgressRecorder *prometheus.GaugeVec
  52. NetworkRegionEgressRecorder *prometheus.GaugeVec
  53. NetworkInternetEgressRecorder *prometheus.GaugeVec
  54. ServiceSelectorRecorder *prometheus.GaugeVec
  55. DeploymentSelectorRecorder *prometheus.GaugeVec
  56. Model *costModel.CostModel
  57. }
  58. type DataEnvelope struct {
  59. Code int `json:"code"`
  60. Status string `json:"status"`
  61. Data interface{} `json:"data"`
  62. Message string `json:"message,omitempty"`
  63. }
  64. func wrapData(data interface{}, err error) []byte {
  65. var resp []byte
  66. if err != nil {
  67. klog.V(1).Infof("Error returned to client: %s", err.Error())
  68. resp, _ = json.Marshal(&DataEnvelope{
  69. Code: 500,
  70. Status: "error",
  71. Message: err.Error(),
  72. Data: data,
  73. })
  74. } else {
  75. resp, _ = json.Marshal(&DataEnvelope{
  76. Code: 200,
  77. Status: "success",
  78. Data: data,
  79. })
  80. }
  81. return resp
  82. }
  83. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  84. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  85. w.Header().Set("Content-Type", "application/json")
  86. w.Header().Set("Access-Control-Allow-Origin", "*")
  87. err := a.Cloud.DownloadPricingData()
  88. w.Write(wrapData(nil, err))
  89. }
  90. func filterFields(fields string, data map[string]*costModel.CostData) map[string]costModel.CostData {
  91. fs := strings.Split(fields, ",")
  92. fmap := make(map[string]bool)
  93. for _, f := range fs {
  94. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  95. klog.V(1).Infof("to delete: %s", fieldNameLower)
  96. fmap[fieldNameLower] = true
  97. }
  98. filteredData := make(map[string]costModel.CostData)
  99. for cname, costdata := range data {
  100. s := reflect.TypeOf(*costdata)
  101. val := reflect.ValueOf(*costdata)
  102. costdata2 := costModel.CostData{}
  103. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  104. n := s.NumField()
  105. for i := 0; i < n; i++ {
  106. field := s.Field(i)
  107. value := val.Field(i)
  108. value2 := cd2.Field(i)
  109. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  110. value2.Set(reflect.Value(value))
  111. }
  112. }
  113. filteredData[cname] = cd2.Interface().(costModel.CostData)
  114. }
  115. return filteredData
  116. }
  117. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  118. w.Header().Set("Content-Type", "application/json")
  119. w.Header().Set("Access-Control-Allow-Origin", "*")
  120. window := r.URL.Query().Get("timeWindow")
  121. offset := r.URL.Query().Get("offset")
  122. fields := r.URL.Query().Get("filterFields")
  123. namespace := r.URL.Query().Get("namespace")
  124. if offset != "" {
  125. offset = "offset " + offset
  126. }
  127. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  128. if fields != "" {
  129. filteredData := filterFields(fields, data)
  130. w.Write(wrapData(filteredData, err))
  131. } else {
  132. w.Write(wrapData(data, err))
  133. }
  134. }
  135. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  136. w.Header().Set("Content-Type", "application/json")
  137. w.Header().Set("Access-Control-Allow-Origin", "*")
  138. window := r.URL.Query().Get("window")
  139. offset := r.URL.Query().Get("offset")
  140. if offset != "" {
  141. offset = "offset " + offset
  142. }
  143. data, err := costModel.ClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
  144. w.Write(wrapData(data, err))
  145. }
  146. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  147. w.Header().Set("Content-Type", "application/json")
  148. w.Header().Set("Access-Control-Allow-Origin", "*")
  149. start := r.URL.Query().Get("start")
  150. end := r.URL.Query().Get("end")
  151. window := r.URL.Query().Get("window")
  152. offset := r.URL.Query().Get("offset")
  153. if offset != "" {
  154. offset = "offset " + offset
  155. }
  156. data, err := costModel.ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  157. w.Write(wrapData(data, err))
  158. }
  159. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  160. w.Header().Set("Content-Type", "application/json")
  161. w.Header().Set("Access-Control-Allow-Origin", "*")
  162. start := r.URL.Query().Get("start")
  163. end := r.URL.Query().Get("end")
  164. window := r.URL.Query().Get("window")
  165. fields := r.URL.Query().Get("filterFields")
  166. namespace := r.URL.Query().Get("namespace")
  167. data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace)
  168. if fields != "" {
  169. filteredData := filterFields(fields, data)
  170. w.Write(wrapData(filteredData, err))
  171. } else {
  172. w.Write(wrapData(data, err))
  173. }
  174. }
  175. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  176. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  177. w.Header().Set("Content-Type", "application/json")
  178. w.Header().Set("Access-Control-Allow-Origin", "*")
  179. startString := r.URL.Query().Get("start")
  180. endString := r.URL.Query().Get("end")
  181. windowString := r.URL.Query().Get("window")
  182. layout := "2006-01-02T15:04:05.000Z"
  183. var start time.Time
  184. var end time.Time
  185. var err error
  186. if windowString == "" {
  187. windowString = "1h"
  188. }
  189. if startString != "" {
  190. start, err = time.Parse(layout, startString)
  191. if err != nil {
  192. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  193. w.Write(wrapData(nil, err))
  194. }
  195. } else {
  196. window, err := time.ParseDuration(windowString)
  197. if err != nil {
  198. w.Write(wrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  199. }
  200. start = time.Now().Add(-2 * window)
  201. }
  202. if endString != "" {
  203. end, err = time.Parse(layout, endString)
  204. if err != nil {
  205. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  206. w.Write(wrapData(nil, err))
  207. }
  208. } else {
  209. end = time.Now()
  210. }
  211. remoteLayout := "2006-01-02T15:04:05Z"
  212. remoteStartStr := start.Format(remoteLayout)
  213. remoteEndStr := end.Format(remoteLayout)
  214. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  215. data, err := costModel.CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  216. w.Write(wrapData(data, err))
  217. }
  218. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  219. w.Header().Set("Content-Type", "application/json")
  220. w.Header().Set("Access-Control-Allow-Origin", "*")
  221. start := r.URL.Query().Get("start")
  222. end := r.URL.Query().Get("end")
  223. aggregator := r.URL.Query().Get("aggregator")
  224. data, err := a.Cloud.ExternalAllocations(start, end, aggregator)
  225. w.Write(wrapData(data, err))
  226. }
  227. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  228. w.Header().Set("Content-Type", "application/json")
  229. w.Header().Set("Access-Control-Allow-Origin", "*")
  230. data, err := p.Cloud.AllNodePricing()
  231. w.Write(wrapData(data, err))
  232. }
  233. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  234. w.Header().Set("Content-Type", "application/json")
  235. w.Header().Set("Access-Control-Allow-Origin", "*")
  236. data, err := p.Cloud.GetConfig()
  237. w.Write(wrapData(data, err))
  238. }
  239. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  240. w.Header().Set("Content-Type", "application/json")
  241. w.Header().Set("Access-Control-Allow-Origin", "*")
  242. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  243. if err != nil {
  244. w.Write(wrapData(data, err))
  245. return
  246. }
  247. w.Write(wrapData(data, err))
  248. err = p.Cloud.DownloadPricingData()
  249. if err != nil {
  250. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  251. }
  252. return
  253. }
  254. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  255. w.Header().Set("Content-Type", "application/json")
  256. w.Header().Set("Access-Control-Allow-Origin", "*")
  257. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  258. if err != nil {
  259. w.Write(wrapData(data, err))
  260. return
  261. }
  262. w.Write(wrapData(data, err))
  263. return
  264. }
  265. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  266. w.Header().Set("Content-Type", "application/json")
  267. w.Header().Set("Access-Control-Allow-Origin", "*")
  268. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  269. if err != nil {
  270. w.Write(wrapData(data, err))
  271. return
  272. }
  273. w.Write(wrapData(data, err))
  274. return
  275. }
  276. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  277. w.Header().Set("Content-Type", "application/json")
  278. w.Header().Set("Access-Control-Allow-Origin", "*")
  279. data, err := p.Cloud.UpdateConfig(r.Body, "")
  280. if err != nil {
  281. w.Write(wrapData(data, err))
  282. return
  283. }
  284. w.Write(wrapData(data, err))
  285. return
  286. }
  287. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  288. w.Header().Set("Content-Type", "application/json")
  289. w.Header().Set("Access-Control-Allow-Origin", "*")
  290. data, err := p.Cloud.GetManagementPlatform()
  291. if err != nil {
  292. w.Write(wrapData(data, err))
  293. return
  294. }
  295. w.Write(wrapData(data, err))
  296. return
  297. }
  298. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  299. w.Header().Set("Content-Type", "application/json")
  300. w.Header().Set("Access-Control-Allow-Origin", "*")
  301. data, err := p.Cloud.ClusterInfo()
  302. w.Write(wrapData(data, err))
  303. }
  304. func Healthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  305. w.WriteHeader(200)
  306. w.Header().Set("Content-Length", "0")
  307. w.Header().Set("Content-Type", "text/plain")
  308. }
  309. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  310. w.Header().Set("Content-Type", "application/json")
  311. w.Header().Set("Access-Control-Allow-Origin", "*")
  312. w.Write(wrapData(costModel.ValidatePrometheus(p.PrometheusClient)))
  313. }
  314. func (p *Accesses) ContainerUptimes(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  315. w.Header().Set("Content-Type", "application/json")
  316. w.Header().Set("Access-Control-Allow-Origin", "*")
  317. res, err := costModel.ComputeUptimes(p.PrometheusClient)
  318. w.Write(wrapData(res, err))
  319. }
  320. func (a *Accesses) recordPrices() {
  321. go func() {
  322. containerSeen := make(map[string]bool)
  323. nodeSeen := make(map[string]bool)
  324. pvSeen := make(map[string]bool)
  325. getKeyFromLabelStrings := func(labels ...string) string {
  326. return strings.Join(labels, ",")
  327. }
  328. getLabelStringsFromKey := func(key string) []string {
  329. return strings.Split(key, ",")
  330. }
  331. for {
  332. klog.V(4).Info("Recording prices...")
  333. podlist := a.Model.Controller.GetAll()
  334. podStatus := make(map[string]v1.PodPhase)
  335. for _, pod := range podlist {
  336. podStatus[pod.Name] = pod.Status.Phase
  337. }
  338. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
  339. if err != nil {
  340. klog.V(1).Info("Error in price recording: " + err.Error())
  341. // zero the for loop so the time.Sleep will still work
  342. data = map[string]*costModel.CostData{}
  343. }
  344. for _, costs := range data {
  345. nodeName := costs.NodeName
  346. node := costs.NodeData
  347. if node == nil {
  348. klog.V(4).Infof("Skipping Node \"%s\" due to missing Node Data costs", nodeName)
  349. continue
  350. }
  351. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  352. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  353. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  354. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  355. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  356. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  357. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  358. if costs.PVCData != nil {
  359. for _, pvc := range costs.PVCData {
  360. if pvc.Volume != nil {
  361. pvCost, _ := strconv.ParseFloat(pvc.Volume.Cost, 64)
  362. a.PersistentVolumePriceRecorder.WithLabelValues(pvc.VolumeName, pvc.VolumeName).Set(pvCost)
  363. }
  364. }
  365. }
  366. a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(cpuCost)
  367. a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName).Set(ramCost)
  368. a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(gpuCost)
  369. a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName).Set(totalCost)
  370. labelKey := getKeyFromLabelStrings(nodeName, nodeName)
  371. nodeSeen[labelKey] = true
  372. namespace := costs.Namespace
  373. podName := costs.PodName
  374. containerName := costs.Name
  375. if len(costs.RAMAllocation) > 0 {
  376. a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  377. }
  378. if len(costs.CPUAllocation) > 0 {
  379. a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  380. }
  381. if len(costs.GPUReq) > 0 {
  382. // allocation here is set to the request because shared GPU usage not yet supported.
  383. a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  384. }
  385. labelKey = getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  386. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  387. containerSeen[labelKey] = true
  388. } else {
  389. klog.Infof("Container %s not running", labelKey)
  390. containerSeen[labelKey] = false
  391. }
  392. storageClasses, _ := a.KubeClientSet.StorageV1().StorageClasses().List(metav1.ListOptions{})
  393. storageClassMap := make(map[string]map[string]string)
  394. for _, storageClass := range storageClasses.Items {
  395. params := storageClass.Parameters
  396. storageClassMap[storageClass.ObjectMeta.Name] = params
  397. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  398. storageClassMap["default"] = params
  399. storageClassMap[""] = params
  400. }
  401. }
  402. pvs, _ := a.KubeClientSet.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
  403. for _, pv := range pvs.Items {
  404. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  405. if !ok {
  406. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  407. }
  408. cacPv := &costAnalyzerCloud.PV{
  409. Class: pv.Spec.StorageClassName,
  410. Region: pv.Labels[v1.LabelZoneRegion],
  411. Parameters: parameters,
  412. }
  413. costModel.GetPVCost(cacPv, &pv, a.Cloud)
  414. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  415. a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
  416. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  417. pvSeen[labelKey] = true
  418. }
  419. containerUptime, _ := costModel.ComputeUptimes(a.PrometheusClient)
  420. for key, uptime := range containerUptime {
  421. container, _ := costModel.NewContainerMetricFromKey(key)
  422. a.ContainerUptimeRecorder.WithLabelValues(container.Namespace, container.PodName, container.ContainerName).Set(uptime)
  423. }
  424. }
  425. for labelString, seen := range nodeSeen {
  426. if !seen {
  427. labels := getLabelStringsFromKey(labelString)
  428. a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  429. a.CPUPriceRecorder.DeleteLabelValues(labels...)
  430. a.GPUPriceRecorder.DeleteLabelValues(labels...)
  431. a.RAMPriceRecorder.DeleteLabelValues(labels...)
  432. delete(nodeSeen, labelString)
  433. }
  434. nodeSeen[labelString] = false
  435. }
  436. for labelString, seen := range containerSeen {
  437. if !seen {
  438. labels := getLabelStringsFromKey(labelString)
  439. a.RAMAllocationRecorder.DeleteLabelValues(labels...)
  440. a.CPUAllocationRecorder.DeleteLabelValues(labels...)
  441. a.GPUAllocationRecorder.DeleteLabelValues(labels...)
  442. a.ContainerUptimeRecorder.DeleteLabelValues(labels...)
  443. delete(containerSeen, labelString)
  444. }
  445. containerSeen[labelString] = false
  446. }
  447. for labelString, seen := range pvSeen {
  448. if !seen {
  449. labels := getLabelStringsFromKey(labelString)
  450. a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  451. delete(pvSeen, labelString)
  452. }
  453. pvSeen[labelString] = false
  454. }
  455. time.Sleep(time.Minute)
  456. }
  457. }()
  458. }
  459. func main() {
  460. klog.InitFlags(nil)
  461. flag.Set("v", "3")
  462. flag.Parse()
  463. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  464. address := os.Getenv(prometheusServerEndpointEnvVar)
  465. if address == "" {
  466. klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
  467. }
  468. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  469. Proxy: http.ProxyFromEnvironment,
  470. DialContext: (&net.Dialer{
  471. Timeout: 120 * time.Second,
  472. KeepAlive: 120 * time.Second,
  473. }).DialContext,
  474. TLSHandshakeTimeout: 10 * time.Second,
  475. }
  476. pc := prometheusClient.Config{
  477. Address: address,
  478. RoundTripper: LongTimeoutRoundTripper,
  479. }
  480. promCli, _ := prometheusClient.NewClient(pc)
  481. api := prometheusAPI.NewAPI(promCli)
  482. _, err := api.Config(context.Background())
  483. if err != nil {
  484. klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  485. }
  486. klog.V(1).Info("Success: retrieved a prometheus config file from: " + address)
  487. _, err = costModel.ValidatePrometheus(promCli)
  488. if err != nil {
  489. klog.Fatalf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  490. }
  491. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  492. // Kubernetes API setup
  493. kc, err := rest.InClusterConfig()
  494. if err != nil {
  495. panic(err.Error())
  496. }
  497. kubeClientset, err := kubernetes.NewForConfig(kc)
  498. if err != nil {
  499. panic(err.Error())
  500. }
  501. cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
  502. cloudProvider, err := costAnalyzerCloud.NewProvider(kubeClientset, cloudProviderKey)
  503. if err != nil {
  504. panic(err.Error())
  505. }
  506. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  507. Name: "node_cpu_hourly_cost",
  508. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  509. }, []string{"instance", "node"})
  510. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  511. Name: "node_ram_hourly_cost",
  512. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  513. }, []string{"instance", "node"})
  514. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  515. Name: "node_gpu_hourly_cost",
  516. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  517. }, []string{"instance", "node"})
  518. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  519. Name: "node_total_hourly_cost",
  520. Help: "node_total_hourly_cost Total node cost per hour",
  521. }, []string{"instance", "node"})
  522. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  523. Name: "pv_hourly_cost",
  524. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  525. }, []string{"volumename", "persistentvolume"})
  526. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  527. Name: "container_memory_allocation_bytes",
  528. Help: "container_memory_allocation_bytes Bytes of RAM used",
  529. }, []string{"namespace", "pod", "container", "instance", "node"})
  530. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  531. Name: "container_cpu_allocation",
  532. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  533. }, []string{"namespace", "pod", "container", "instance", "node"})
  534. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  535. Name: "container_gpu_allocation",
  536. Help: "container_gpu_allocation GPU used",
  537. }, []string{"namespace", "pod", "container", "instance", "node"})
  538. ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  539. Name: "container_uptime_seconds",
  540. Help: "container_uptime_seconds Seconds a container has been running",
  541. }, []string{"namespace", "pod", "container"})
  542. NetworkZoneEgressRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  543. Name: "kubecost_network_zone_egress_cost",
  544. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  545. }, []string{"namespace", "pod"})
  546. NetworkRegionEgressRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  547. Name: "kubecost_network_region_egress_cost",
  548. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  549. }, []string{"namespace", "pod"})
  550. NetworkInternetEgressRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  551. Name: "kubecost_network_internet_egress_cost",
  552. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  553. }, []string{"namespace", "pod"})
  554. prometheus.MustRegister(cpuGv)
  555. prometheus.MustRegister(ramGv)
  556. prometheus.MustRegister(gpuGv)
  557. prometheus.MustRegister(totalGv)
  558. prometheus.MustRegister(pvGv)
  559. prometheus.MustRegister(RAMAllocation)
  560. prometheus.MustRegister(CPUAllocation)
  561. prometheus.MustRegister(ContainerUptimeRecorder)
  562. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  563. prometheus.MustRegister(costModel.ServiceCollector{
  564. KubeClientSet: kubeClientset,
  565. })
  566. prometheus.MustRegister(costModel.DeploymentCollector{
  567. KubeClientSet: kubeClientset,
  568. })
  569. podCache := cache.NewListWatchFromClient(kubeClientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
  570. a := Accesses{
  571. PrometheusClient: promCli,
  572. KubeClientSet: kubeClientset,
  573. Cloud: cloudProvider,
  574. CPUPriceRecorder: cpuGv,
  575. RAMPriceRecorder: ramGv,
  576. GPUPriceRecorder: gpuGv,
  577. NodeTotalPriceRecorder: totalGv,
  578. RAMAllocationRecorder: RAMAllocation,
  579. CPUAllocationRecorder: CPUAllocation,
  580. GPUAllocationRecorder: GPUAllocation,
  581. ContainerUptimeRecorder: ContainerUptimeRecorder,
  582. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  583. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  584. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  585. PersistentVolumePriceRecorder: pvGv,
  586. Model: costModel.NewCostModel(podCache),
  587. }
  588. remoteEnabled := os.Getenv(remoteEnabled)
  589. if remoteEnabled == "true" {
  590. info, err := cloudProvider.ClusterInfo()
  591. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  592. if err != nil {
  593. klog.Infof("Error saving cluster id %s", err.Error())
  594. }
  595. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  596. if err != nil {
  597. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  598. }
  599. }
  600. err = a.Cloud.DownloadPricingData()
  601. if err != nil {
  602. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  603. }
  604. a.recordPrices()
  605. router := httprouter.New()
  606. router.GET("/costDataModel", a.CostDataModel)
  607. router.GET("/costDataModelRange", a.CostDataModelRange)
  608. router.GET("/costDataModelRangeLarge", a.CostDataModelRangeLarge)
  609. router.GET("/outOfClusterCosts", a.OutofClusterCosts)
  610. router.GET("/allNodePricing", a.GetAllNodePricing)
  611. router.GET("/healthz", Healthz)
  612. router.GET("/getConfigs", a.GetConfigs)
  613. router.POST("/refreshPricing", a.RefreshPricingData)
  614. router.POST("/updateSpotInfoConfigs", a.UpdateSpotInfoConfigs)
  615. router.POST("/updateAthenaInfoConfigs", a.UpdateAthenaInfoConfigs)
  616. router.POST("/updateBigQueryInfoConfigs", a.UpdateBigQueryInfoConfigs)
  617. router.POST("/updateConfigByKey", a.UpdateConfigByKey)
  618. router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  619. router.GET("/clusterCosts", a.ClusterCosts)
  620. router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  621. router.GET("/managementPlatform", a.ManagementPlatform)
  622. router.GET("/clusterInfo", a.ClusterInfo)
  623. router.GET("/containerUptimes", a.ContainerUptimes)
  624. rootMux := http.NewServeMux()
  625. rootMux.Handle("/", router)
  626. rootMux.Handle("/metrics", promhttp.Handler())
  627. klog.Fatal(http.ListenAndServe(":9003", rootMux))
  628. }