router.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "os"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "k8s.io/klog"
  15. "github.com/julienschmidt/httprouter"
  16. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  17. "github.com/patrickmn/go-cache"
  18. prometheusClient "github.com/prometheus/client_golang/api"
  19. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  20. "github.com/prometheus/client_golang/prometheus"
  21. v1 "k8s.io/api/core/v1"
  22. "k8s.io/client-go/kubernetes"
  23. "k8s.io/client-go/rest"
  24. )
  25. const (
  26. prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
  27. prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
  28. )
  29. var (
  30. // gitCommit is set by the build system
  31. gitCommit string
  32. )
  33. var Router = httprouter.New()
  34. var A Accesses
// Accesses bundles the clients, pricing provider, Prometheus gauges and caches
// that the HTTP handlers and recordPrices in this file operate on.
type Accesses struct {
	// PrometheusClient is the client for the Prometheus server queried by
	// most handlers and by recordPrices.
	PrometheusClient prometheusClient.Client
	// ThanosClient, when non-nil, is used instead of PrometheusClient by the
	// range handlers unless the request passes remote=false.
	ThanosClient prometheusClient.Client
	// KubeClientSet is passed through to the cost computations.
	KubeClientSet kubernetes.Interface
	// Cloud supplies pricing data, configuration and cluster information.
	Cloud costAnalyzerCloud.Provider
	// Per-node price gauges, set by recordPrices with (node, node) labels.
	CPUPriceRecorder *prometheus.GaugeVec
	RAMPriceRecorder *prometheus.GaugeVec
	// PersistentVolumePriceRecorder is set by recordPrices per PV.
	PersistentVolumePriceRecorder *prometheus.GaugeVec
	GPUPriceRecorder              *prometheus.GaugeVec
	NodeTotalPriceRecorder        *prometheus.GaugeVec
	// Per-container allocation gauges, set by recordPrices with
	// (namespace, pod, container, node, node) labels.
	RAMAllocationRecorder *prometheus.GaugeVec
	CPUAllocationRecorder *prometheus.GaugeVec
	GPUAllocationRecorder *prometheus.GaugeVec
	// PVAllocationRecorder is set by recordPrices with
	// (namespace, pod, claim, volume) labels.
	PVAllocationRecorder *prometheus.GaugeVec
	// ContainerUptimeRecorder is set by recordPrices with
	// (namespace, pod, container) labels.
	ContainerUptimeRecorder *prometheus.GaugeVec
	// Cluster-wide network egress prices, set by recordPrices.
	NetworkZoneEgressRecorder     prometheus.Gauge
	NetworkRegionEgressRecorder   prometheus.Gauge
	NetworkInternetEgressRecorder prometheus.Gauge
	// NOTE(review): the selector recorders are not written anywhere in this
	// part of the file — confirm where they are populated.
	ServiceSelectorRecorder    *prometheus.GaugeVec
	DeploymentSelectorRecorder *prometheus.GaugeVec
	// Model computes cost data and exposes a cache of cluster objects.
	Model *CostModel
	// Cache memoizes the AggregateCostModel handler's responses.
	Cache *cache.Cache
}
// DataEnvelope is the JSON wrapper produced by wrapData and
// wrapDataWithMessage for handler responses.
type DataEnvelope struct {
	// Code is 200 for success envelopes and 500 for error envelopes. The
	// actual HTTP status of the response may differ (most handlers do not
	// call WriteHeader on errors).
	Code int `json:"code"`
	// Status is "success" or "error".
	Status string `json:"status"`
	// Data is the payload; it may also be populated on error.
	Data interface{} `json:"data"`
	// Message holds the error text, or an optional informational message on
	// success; omitted from the JSON when empty.
	Message string `json:"message,omitempty"`
}
  64. func normalizeTimeParam(param string) (string, error) {
  65. // convert days to hours
  66. if param[len(param)-1:] == "d" {
  67. count := param[:len(param)-1]
  68. val, err := strconv.ParseInt(count, 10, 64)
  69. if err != nil {
  70. return "", err
  71. }
  72. val = val * 24
  73. param = fmt.Sprintf("%dh", val)
  74. }
  75. return param, nil
  76. }
  77. func wrapDataWithMessage(data interface{}, err error, message string) []byte {
  78. var resp []byte
  79. if err != nil {
  80. klog.V(1).Infof("Error returned to client: %s", err.Error())
  81. resp, _ = json.Marshal(&DataEnvelope{
  82. Code: http.StatusInternalServerError,
  83. Status: "error",
  84. Message: err.Error(),
  85. Data: data,
  86. })
  87. } else {
  88. resp, _ = json.Marshal(&DataEnvelope{
  89. Code: http.StatusOK,
  90. Status: "success",
  91. Data: data,
  92. Message: message,
  93. })
  94. }
  95. return resp
  96. }
  97. func wrapData(data interface{}, err error) []byte {
  98. var resp []byte
  99. if err != nil {
  100. klog.V(1).Infof("Error returned to client: %s", err.Error())
  101. resp, _ = json.Marshal(&DataEnvelope{
  102. Code: http.StatusInternalServerError,
  103. Status: "error",
  104. Message: err.Error(),
  105. Data: data,
  106. })
  107. } else {
  108. resp, _ = json.Marshal(&DataEnvelope{
  109. Code: http.StatusOK,
  110. Status: "success",
  111. Data: data,
  112. })
  113. }
  114. return resp
  115. }
  116. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  117. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  118. w.Header().Set("Content-Type", "application/json")
  119. w.Header().Set("Access-Control-Allow-Origin", "*")
  120. err := a.Cloud.DownloadPricingData()
  121. w.Write(wrapData(nil, err))
  122. }
  123. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  124. fs := strings.Split(fields, ",")
  125. fmap := make(map[string]bool)
  126. for _, f := range fs {
  127. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  128. klog.V(1).Infof("to delete: %s", fieldNameLower)
  129. fmap[fieldNameLower] = true
  130. }
  131. filteredData := make(map[string]CostData)
  132. for cname, costdata := range data {
  133. s := reflect.TypeOf(*costdata)
  134. val := reflect.ValueOf(*costdata)
  135. costdata2 := CostData{}
  136. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  137. n := s.NumField()
  138. for i := 0; i < n; i++ {
  139. field := s.Field(i)
  140. value := val.Field(i)
  141. value2 := cd2.Field(i)
  142. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  143. value2.Set(reflect.Value(value))
  144. }
  145. }
  146. filteredData[cname] = cd2.Interface().(CostData)
  147. }
  148. return filteredData
  149. }
  150. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  151. w.Header().Set("Content-Type", "application/json")
  152. w.Header().Set("Access-Control-Allow-Origin", "*")
  153. window := r.URL.Query().Get("timeWindow")
  154. offset := r.URL.Query().Get("offset")
  155. fields := r.URL.Query().Get("filterFields")
  156. namespace := r.URL.Query().Get("namespace")
  157. aggregationField := r.URL.Query().Get("aggregation")
  158. aggregationSubField := r.URL.Query().Get("aggregationSubfield")
  159. if offset != "" {
  160. offset = "offset " + offset
  161. }
  162. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  163. if aggregationField != "" {
  164. c, err := a.Cloud.GetConfig()
  165. if err != nil {
  166. w.Write(wrapData(nil, err))
  167. }
  168. discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
  169. if err != nil {
  170. w.Write(wrapData(nil, err))
  171. }
  172. discount = discount * 0.01
  173. agg := AggregateCostModel(a.Cloud, data, aggregationField, aggregationSubField, false, discount, 1.0, nil)
  174. w.Write(wrapData(agg, nil))
  175. } else {
  176. if fields != "" {
  177. filteredData := filterFields(fields, data)
  178. w.Write(wrapData(filteredData, err))
  179. } else {
  180. w.Write(wrapData(data, err))
  181. }
  182. }
  183. }
  184. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  185. w.Header().Set("Content-Type", "application/json")
  186. w.Header().Set("Access-Control-Allow-Origin", "*")
  187. window := r.URL.Query().Get("window")
  188. offset := r.URL.Query().Get("offset")
  189. if offset != "" {
  190. offset = "offset " + offset
  191. }
  192. data, err := ClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
  193. w.Write(wrapData(data, err))
  194. }
  195. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  196. w.Header().Set("Content-Type", "application/json")
  197. w.Header().Set("Access-Control-Allow-Origin", "*")
  198. start := r.URL.Query().Get("start")
  199. end := r.URL.Query().Get("end")
  200. window := r.URL.Query().Get("window")
  201. offset := r.URL.Query().Get("offset")
  202. if offset != "" {
  203. offset = "offset " + offset
  204. }
  205. data, err := ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  206. w.Write(wrapData(data, err))
  207. }
  208. // AggregateCostModel handles HTTP requests to the aggregated cost model API, which can be parametrized
  209. // by time period using window and offset, aggregation field using field and subfield (in cases like
  210. // field=label, subfield=app for grouping by label.app), and filtered by namespace.
  211. func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  212. w.Header().Set("Content-Type", "application/json")
  213. w.Header().Set("Access-Control-Allow-Origin", "*")
  214. window := r.URL.Query().Get("window")
  215. offset := r.URL.Query().Get("offset")
  216. namespace := r.URL.Query().Get("namespace")
  217. cluster := r.URL.Query().Get("cluster")
  218. field := r.URL.Query().Get("aggregation")
  219. subfield := r.URL.Query().Get("aggregationSubfield")
  220. allocateIdle := r.URL.Query().Get("allocateIdle")
  221. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  222. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  223. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  224. remote := r.URL.Query().Get("remote")
  225. // timeSeries == true maintains the time series dimension of the data,
  226. // which by default gets summed over the entire interval
  227. timeSeries := r.URL.Query().Get("timeSeries") == "true"
  228. // disableCache, if set to "true", tells this function to recompute and
  229. // cache the requested data
  230. disableCache := r.URL.Query().Get("disableCache") == "true"
  231. // clearCache, if set to "true", tells this function to flush the cache,
  232. // then recompute and cache the requested data
  233. clearCache := r.URL.Query().Get("clearCache") == "true"
  234. // aggregation field is required
  235. if field == "" {
  236. w.WriteHeader(http.StatusBadRequest)
  237. w.Write(wrapData(nil, fmt.Errorf("Missing aggregation field parameter")))
  238. return
  239. }
  240. // aggregation subfield is required when aggregation field is "label"
  241. if field == "label" && subfield == "" {
  242. w.WriteHeader(http.StatusBadRequest)
  243. w.Write(wrapData(nil, fmt.Errorf("Missing aggregation subfield parameter for aggregation by label")))
  244. return
  245. }
  246. // endTime defaults to the current time, unless an offset is explicity declared,
  247. // in which case it shifts endTime back by given duration
  248. endTime := time.Now()
  249. if offset != "" {
  250. o, err := time.ParseDuration(offset)
  251. if err != nil {
  252. w.Write(wrapData(nil, err))
  253. return
  254. }
  255. endTime = endTime.Add(-1 * o)
  256. }
  257. // if window is defined in terms of days, convert to hours
  258. // e.g. convert "2d" to "48h"
  259. window, err := normalizeTimeParam(window)
  260. if err != nil {
  261. w.Write(wrapData(nil, err))
  262. return
  263. }
  264. // convert time window into start and end times, formatted
  265. // as ISO datetime strings
  266. d, err := time.ParseDuration(window)
  267. if err != nil {
  268. w.Write(wrapData(nil, err))
  269. return
  270. }
  271. startTime := endTime.Add(-1 * d)
  272. layout := "2006-01-02T15:04:05.000Z"
  273. start := startTime.Format(layout)
  274. end := endTime.Format(layout)
  275. // clear cache prior to checking the cache so that a clearCache=true
  276. // request always returns a freshly computed value
  277. if clearCache {
  278. a.Cache.Flush()
  279. }
  280. aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s:%s:%t", window, offset, namespace, cluster, field, subfield, timeSeries)
  281. // check the cache for aggregated response; if cache is hit and not disabled, return response
  282. if result, found := a.Cache.Get(aggKey); found && !disableCache {
  283. w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache hit: %s", aggKey)))
  284. return
  285. }
  286. remoteAvailable := os.Getenv(remoteEnabled)
  287. remoteEnabled := false
  288. if remoteAvailable == "true" && remote != "false" {
  289. remoteEnabled = true
  290. }
  291. klog.Infof("REMOTE ENABLED: %t", remoteEnabled)
  292. // Use Thanos Client if it exists (enabled) and remote flag set
  293. var pClient prometheusClient.Client
  294. if remote != "false" && a.ThanosClient != nil {
  295. pClient = a.ThanosClient
  296. } else {
  297. pClient = a.PrometheusClient
  298. }
  299. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, cluster, remoteEnabled)
  300. if err != nil {
  301. w.Write(wrapData(nil, err))
  302. return
  303. }
  304. c, err := a.Cloud.GetConfig()
  305. if err != nil {
  306. w.Write(wrapData(nil, err))
  307. return
  308. }
  309. discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
  310. if err != nil {
  311. w.Write(wrapData(nil, err))
  312. return
  313. }
  314. discount = discount * 0.01
  315. idleCoefficient := 1.0
  316. if allocateIdle == "true" {
  317. idleCoefficient, err = ComputeIdleCoefficient(data, a.PrometheusClient, a.Cloud, discount, fmt.Sprintf("%dh", int(d.Hours())), offset)
  318. if err != nil {
  319. w.Write(wrapData(nil, err))
  320. }
  321. }
  322. sn := []string{}
  323. sln := []string{}
  324. slv := []string{}
  325. if sharedNamespaces != "" {
  326. sn = strings.Split(sharedNamespaces, ",")
  327. }
  328. if sharedLabelNames != "" {
  329. sln = strings.Split(sharedLabelNames, ",")
  330. slv = strings.Split(sharedLabelValues, ",")
  331. if len(sln) != len(slv) || slv[0] == "" {
  332. w.Write(wrapData(nil, fmt.Errorf("Supply exacly one label value per label name")))
  333. return
  334. }
  335. }
  336. var sr *SharedResourceInfo
  337. if len(sn) > 0 || len(sln) > 0 {
  338. sr = NewSharedResourceInfo(true, sn, sln, slv)
  339. }
  340. // aggregate cost model data by given fields and cache the result for the default expiration
  341. result := AggregateCostModel(a.Cloud, data, field, subfield, timeSeries, discount, idleCoefficient, sr)
  342. a.Cache.Set(aggKey, result, cache.DefaultExpiration)
  343. w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss: %s", aggKey)))
  344. }
  345. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  346. w.Header().Set("Content-Type", "application/json")
  347. w.Header().Set("Access-Control-Allow-Origin", "*")
  348. start := r.URL.Query().Get("start")
  349. end := r.URL.Query().Get("end")
  350. window := r.URL.Query().Get("window")
  351. fields := r.URL.Query().Get("filterFields")
  352. namespace := r.URL.Query().Get("namespace")
  353. cluster := r.URL.Query().Get("cluster")
  354. aggregationField := r.URL.Query().Get("aggregation")
  355. aggregationSubField := r.URL.Query().Get("aggregationSubfield")
  356. remote := r.URL.Query().Get("remote")
  357. remoteAvailable := os.Getenv(remoteEnabled)
  358. remoteEnabled := false
  359. if remoteAvailable == "true" && remote != "false" {
  360. remoteEnabled = true
  361. }
  362. // Use Thanos Client if it exists (enabled) and remote flag set
  363. var pClient prometheusClient.Client
  364. if remote != "false" && a.ThanosClient != nil {
  365. pClient = a.ThanosClient
  366. } else {
  367. pClient = a.PrometheusClient
  368. }
  369. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, namespace, cluster, remoteEnabled)
  370. if err != nil {
  371. w.Write(wrapData(nil, err))
  372. }
  373. if aggregationField != "" {
  374. c, err := a.Cloud.GetConfig()
  375. if err != nil {
  376. w.Write(wrapData(nil, err))
  377. }
  378. discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
  379. if err != nil {
  380. w.Write(wrapData(nil, err))
  381. }
  382. discount = discount * 0.01
  383. agg := AggregateCostModel(a.Cloud, data, aggregationField, aggregationSubField, false, discount, 1.0, nil)
  384. w.Write(wrapData(agg, nil))
  385. } else {
  386. if fields != "" {
  387. filteredData := filterFields(fields, data)
  388. w.Write(wrapData(filteredData, err))
  389. } else {
  390. w.Write(wrapData(data, err))
  391. }
  392. }
  393. }
  394. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  395. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  396. w.Header().Set("Content-Type", "application/json")
  397. w.Header().Set("Access-Control-Allow-Origin", "*")
  398. startString := r.URL.Query().Get("start")
  399. endString := r.URL.Query().Get("end")
  400. windowString := r.URL.Query().Get("window")
  401. layout := "2006-01-02T15:04:05.000Z"
  402. var start time.Time
  403. var end time.Time
  404. var err error
  405. if windowString == "" {
  406. windowString = "1h"
  407. }
  408. if startString != "" {
  409. start, err = time.Parse(layout, startString)
  410. if err != nil {
  411. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  412. w.Write(wrapData(nil, err))
  413. }
  414. } else {
  415. window, err := time.ParseDuration(windowString)
  416. if err != nil {
  417. w.Write(wrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  418. }
  419. start = time.Now().Add(-2 * window)
  420. }
  421. if endString != "" {
  422. end, err = time.Parse(layout, endString)
  423. if err != nil {
  424. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  425. w.Write(wrapData(nil, err))
  426. }
  427. } else {
  428. end = time.Now()
  429. }
  430. remoteLayout := "2006-01-02T15:04:05Z"
  431. remoteStartStr := start.Format(remoteLayout)
  432. remoteEndStr := end.Format(remoteLayout)
  433. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  434. data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  435. w.Write(wrapData(data, err))
  436. }
  437. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  438. w.Header().Set("Content-Type", "application/json")
  439. w.Header().Set("Access-Control-Allow-Origin", "*")
  440. start := r.URL.Query().Get("start")
  441. end := r.URL.Query().Get("end")
  442. aggregator := r.URL.Query().Get("aggregator")
  443. customAggregation := r.URL.Query().Get("customAggregation")
  444. var data []*costAnalyzerCloud.OutOfClusterAllocation
  445. var err error
  446. if customAggregation != "" {
  447. data, err = a.Cloud.ExternalAllocations(start, end, customAggregation)
  448. } else {
  449. data, err = a.Cloud.ExternalAllocations(start, end, "kubernetes_"+aggregator)
  450. }
  451. w.Write(wrapData(data, err))
  452. }
  453. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  454. w.Header().Set("Content-Type", "application/json")
  455. w.Header().Set("Access-Control-Allow-Origin", "*")
  456. data, err := p.Cloud.AllNodePricing()
  457. w.Write(wrapData(data, err))
  458. }
  459. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  460. w.Header().Set("Content-Type", "application/json")
  461. w.Header().Set("Access-Control-Allow-Origin", "*")
  462. data, err := p.Cloud.GetConfig()
  463. w.Write(wrapData(data, err))
  464. }
  465. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  466. w.Header().Set("Content-Type", "application/json")
  467. w.Header().Set("Access-Control-Allow-Origin", "*")
  468. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  469. if err != nil {
  470. w.Write(wrapData(data, err))
  471. return
  472. }
  473. w.Write(wrapData(data, err))
  474. err = p.Cloud.DownloadPricingData()
  475. if err != nil {
  476. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  477. }
  478. return
  479. }
  480. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  481. w.Header().Set("Content-Type", "application/json")
  482. w.Header().Set("Access-Control-Allow-Origin", "*")
  483. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  484. if err != nil {
  485. w.Write(wrapData(data, err))
  486. return
  487. }
  488. w.Write(wrapData(data, err))
  489. return
  490. }
  491. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  492. w.Header().Set("Content-Type", "application/json")
  493. w.Header().Set("Access-Control-Allow-Origin", "*")
  494. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  495. if err != nil {
  496. w.Write(wrapData(data, err))
  497. return
  498. }
  499. w.Write(wrapData(data, err))
  500. return
  501. }
  502. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  503. w.Header().Set("Content-Type", "application/json")
  504. w.Header().Set("Access-Control-Allow-Origin", "*")
  505. data, err := p.Cloud.UpdateConfig(r.Body, "")
  506. if err != nil {
  507. w.Write(wrapData(data, err))
  508. return
  509. }
  510. w.Write(wrapData(data, err))
  511. return
  512. }
  513. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  514. w.Header().Set("Content-Type", "application/json")
  515. w.Header().Set("Access-Control-Allow-Origin", "*")
  516. data, err := p.Cloud.GetManagementPlatform()
  517. if err != nil {
  518. w.Write(wrapData(data, err))
  519. return
  520. }
  521. w.Write(wrapData(data, err))
  522. return
  523. }
  524. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  525. w.Header().Set("Content-Type", "application/json")
  526. w.Header().Set("Access-Control-Allow-Origin", "*")
  527. data, err := p.Cloud.ClusterInfo()
  528. w.Write(wrapData(data, err))
  529. }
  530. func Healthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  531. w.WriteHeader(200)
  532. w.Header().Set("Content-Length", "0")
  533. w.Header().Set("Content-Type", "text/plain")
  534. }
  535. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  536. w.Header().Set("Content-Type", "application/json")
  537. w.Header().Set("Access-Control-Allow-Origin", "*")
  538. w.Write(wrapData(ValidatePrometheus(p.PrometheusClient)))
  539. }
  540. func (p *Accesses) ContainerUptimes(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  541. w.Header().Set("Content-Type", "application/json")
  542. w.Header().Set("Access-Control-Allow-Origin", "*")
  543. res, err := ComputeUptimes(p.PrometheusClient)
  544. w.Write(wrapData(res, err))
  545. }
  546. func (a *Accesses) recordPrices() {
  547. go func() {
  548. containerSeen := make(map[string]bool)
  549. nodeSeen := make(map[string]bool)
  550. pvSeen := make(map[string]bool)
  551. pvcSeen := make(map[string]bool)
  552. getKeyFromLabelStrings := func(labels ...string) string {
  553. return strings.Join(labels, ",")
  554. }
  555. getLabelStringsFromKey := func(key string) []string {
  556. return strings.Split(key, ",")
  557. }
  558. for {
  559. klog.V(4).Info("Recording prices...")
  560. podlist := a.Model.Cache.GetAllPods()
  561. podStatus := make(map[string]v1.PodPhase)
  562. for _, pod := range podlist {
  563. podStatus[pod.Name] = pod.Status.Phase
  564. }
  565. // Record network pricing at global scope
  566. networkCosts, err := a.Cloud.NetworkPricing()
  567. if err != nil {
  568. klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  569. } else {
  570. a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  571. a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  572. a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  573. }
  574. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
  575. if err != nil {
  576. klog.V(1).Info("Error in price recording: " + err.Error())
  577. // zero the for loop so the time.Sleep will still work
  578. data = map[string]*CostData{}
  579. }
  580. for _, costs := range data {
  581. nodeName := costs.NodeName
  582. node := costs.NodeData
  583. if node == nil {
  584. klog.V(4).Infof("Skipping Node \"%s\" due to missing Node Data costs", nodeName)
  585. continue
  586. }
  587. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  588. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  589. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  590. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  591. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  592. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  593. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  594. namespace := costs.Namespace
  595. podName := costs.PodName
  596. containerName := costs.Name
  597. if costs.PVCData != nil {
  598. for _, pvc := range costs.PVCData {
  599. if pvc.Volume != nil {
  600. a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
  601. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  602. pvcSeen[labelKey] = true
  603. }
  604. }
  605. }
  606. a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(cpuCost)
  607. a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName).Set(ramCost)
  608. a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(gpuCost)
  609. a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName).Set(totalCost)
  610. labelKey := getKeyFromLabelStrings(nodeName, nodeName)
  611. nodeSeen[labelKey] = true
  612. if len(costs.RAMAllocation) > 0 {
  613. a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  614. }
  615. if len(costs.CPUAllocation) > 0 {
  616. a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  617. }
  618. if len(costs.GPUReq) > 0 {
  619. // allocation here is set to the request because shared GPU usage not yet supported.
  620. a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  621. }
  622. labelKey = getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  623. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  624. containerSeen[labelKey] = true
  625. } else {
  626. containerSeen[labelKey] = false
  627. }
  628. storageClasses := a.Model.Cache.GetAllStorageClasses()
  629. storageClassMap := make(map[string]map[string]string)
  630. for _, storageClass := range storageClasses {
  631. params := storageClass.Parameters
  632. storageClassMap[storageClass.ObjectMeta.Name] = params
  633. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  634. storageClassMap["default"] = params
  635. storageClassMap[""] = params
  636. }
  637. }
  638. pvs := a.Model.Cache.GetAllPersistentVolumes()
  639. for _, pv := range pvs {
  640. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  641. if !ok {
  642. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  643. }
  644. cacPv := &costAnalyzerCloud.PV{
  645. Class: pv.Spec.StorageClassName,
  646. Region: pv.Labels[v1.LabelZoneRegion],
  647. Parameters: parameters,
  648. }
  649. GetPVCost(cacPv, pv, a.Cloud)
  650. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  651. a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
  652. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  653. pvSeen[labelKey] = true
  654. }
  655. containerUptime, _ := ComputeUptimes(a.PrometheusClient)
  656. for key, uptime := range containerUptime {
  657. container, _ := NewContainerMetricFromKey(key)
  658. a.ContainerUptimeRecorder.WithLabelValues(container.Namespace, container.PodName, container.ContainerName).Set(uptime)
  659. }
  660. }
  661. for labelString, seen := range nodeSeen {
  662. if !seen {
  663. labels := getLabelStringsFromKey(labelString)
  664. a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  665. a.CPUPriceRecorder.DeleteLabelValues(labels...)
  666. a.GPUPriceRecorder.DeleteLabelValues(labels...)
  667. a.RAMPriceRecorder.DeleteLabelValues(labels...)
  668. delete(nodeSeen, labelString)
  669. }
  670. nodeSeen[labelString] = false
  671. }
  672. for labelString, seen := range containerSeen {
  673. if !seen {
  674. labels := getLabelStringsFromKey(labelString)
  675. a.RAMAllocationRecorder.DeleteLabelValues(labels...)
  676. a.CPUAllocationRecorder.DeleteLabelValues(labels...)
  677. a.GPUAllocationRecorder.DeleteLabelValues(labels...)
  678. a.ContainerUptimeRecorder.DeleteLabelValues(labels...)
  679. delete(containerSeen, labelString)
  680. }
  681. containerSeen[labelString] = false
  682. }
  683. for labelString, seen := range pvSeen {
  684. if !seen {
  685. labels := getLabelStringsFromKey(labelString)
  686. a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  687. delete(pvSeen, labelString)
  688. }
  689. pvSeen[labelString] = false
  690. }
  691. for labelString, seen := range pvcSeen {
  692. if !seen {
  693. labels := getLabelStringsFromKey(labelString)
  694. a.PVAllocationRecorder.DeleteLabelValues(labels...)
  695. delete(pvcSeen, labelString)
  696. }
  697. pvcSeen[labelString] = false
  698. }
  699. time.Sleep(time.Minute)
  700. }
  701. }()
  702. }
  703. func init() {
  704. klog.InitFlags(nil)
  705. flag.Set("v", "3")
  706. flag.Parse()
  707. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  708. address := os.Getenv(prometheusServerEndpointEnvVar)
  709. if address == "" {
  710. klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
  711. }
  712. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  713. Proxy: http.ProxyFromEnvironment,
  714. DialContext: (&net.Dialer{
  715. Timeout: 120 * time.Second,
  716. KeepAlive: 120 * time.Second,
  717. }).DialContext,
  718. TLSHandshakeTimeout: 10 * time.Second,
  719. }
  720. pc := prometheusClient.Config{
  721. Address: address,
  722. RoundTripper: LongTimeoutRoundTripper,
  723. }
  724. promCli, _ := prometheusClient.NewClient(pc)
  725. api := prometheusAPI.NewAPI(promCli)
  726. _, err := api.Config(context.Background())
  727. if err != nil {
  728. klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  729. }
  730. klog.V(1).Info("Success: retrieved a prometheus config file from: " + address)
  731. _, err = ValidatePrometheus(promCli)
  732. if err != nil {
  733. klog.Fatalf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  734. }
  735. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  736. // Kubernetes API setup
  737. kc, err := rest.InClusterConfig()
  738. if err != nil {
  739. panic(err.Error())
  740. }
  741. kubeClientset, err := kubernetes.NewForConfig(kc)
  742. if err != nil {
  743. panic(err.Error())
  744. }
  745. cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
  746. cloudProvider, err := costAnalyzerCloud.NewProvider(kubeClientset, cloudProviderKey)
  747. if err != nil {
  748. panic(err.Error())
  749. }
  750. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  751. Name: "node_cpu_hourly_cost",
  752. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  753. }, []string{"instance", "node"})
  754. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  755. Name: "node_ram_hourly_cost",
  756. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  757. }, []string{"instance", "node"})
  758. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  759. Name: "node_gpu_hourly_cost",
  760. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  761. }, []string{"instance", "node"})
  762. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  763. Name: "node_total_hourly_cost",
  764. Help: "node_total_hourly_cost Total node cost per hour",
  765. }, []string{"instance", "node"})
  766. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  767. Name: "pv_hourly_cost",
  768. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  769. }, []string{"volumename", "persistentvolume"})
  770. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  771. Name: "container_memory_allocation_bytes",
  772. Help: "container_memory_allocation_bytes Bytes of RAM used",
  773. }, []string{"namespace", "pod", "container", "instance", "node"})
  774. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  775. Name: "container_cpu_allocation",
  776. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  777. }, []string{"namespace", "pod", "container", "instance", "node"})
  778. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  779. Name: "container_gpu_allocation",
  780. Help: "container_gpu_allocation GPU used",
  781. }, []string{"namespace", "pod", "container", "instance", "node"})
  782. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  783. Name: "pod_pvc_allocation",
  784. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  785. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  786. ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  787. Name: "container_uptime_seconds",
  788. Help: "container_uptime_seconds Seconds a container has been running",
  789. }, []string{"namespace", "pod", "container"})
  790. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  791. Name: "kubecost_network_zone_egress_cost",
  792. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  793. })
  794. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  795. Name: "kubecost_network_region_egress_cost",
  796. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  797. })
  798. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  799. Name: "kubecost_network_internet_egress_cost",
  800. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  801. })
  802. prometheus.MustRegister(cpuGv)
  803. prometheus.MustRegister(ramGv)
  804. prometheus.MustRegister(gpuGv)
  805. prometheus.MustRegister(totalGv)
  806. prometheus.MustRegister(pvGv)
  807. prometheus.MustRegister(RAMAllocation)
  808. prometheus.MustRegister(CPUAllocation)
  809. prometheus.MustRegister(ContainerUptimeRecorder)
  810. prometheus.MustRegister(PVAllocation)
  811. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  812. prometheus.MustRegister(ServiceCollector{
  813. KubeClientSet: kubeClientset,
  814. })
  815. prometheus.MustRegister(DeploymentCollector{
  816. KubeClientSet: kubeClientset,
  817. })
  818. // cache responses from model for a default of 2 minutes; clear expired responses every 10 minutes
  819. modelCache := cache.New(time.Minute*2, time.Minute*10)
  820. A = Accesses{
  821. PrometheusClient: promCli,
  822. KubeClientSet: kubeClientset,
  823. Cloud: cloudProvider,
  824. CPUPriceRecorder: cpuGv,
  825. RAMPriceRecorder: ramGv,
  826. GPUPriceRecorder: gpuGv,
  827. NodeTotalPriceRecorder: totalGv,
  828. RAMAllocationRecorder: RAMAllocation,
  829. CPUAllocationRecorder: CPUAllocation,
  830. GPUAllocationRecorder: GPUAllocation,
  831. PVAllocationRecorder: PVAllocation,
  832. ContainerUptimeRecorder: ContainerUptimeRecorder,
  833. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  834. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  835. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  836. PersistentVolumePriceRecorder: pvGv,
  837. Model: NewCostModel(kubeClientset),
  838. Cache: modelCache,
  839. }
  840. remoteEnabled := os.Getenv(remoteEnabled)
  841. if remoteEnabled == "true" {
  842. info, err := cloudProvider.ClusterInfo()
  843. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  844. if err != nil {
  845. klog.Infof("Error saving cluster id %s", err.Error())
  846. }
  847. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  848. if err != nil {
  849. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  850. }
  851. }
  852. // Thanos Client
  853. if os.Getenv(thanosEnabled) == "true" {
  854. thanosUrl := os.Getenv(thanosQueryUrl)
  855. if thanosUrl != "" {
  856. var thanosRT http.RoundTripper = &http.Transport{
  857. Proxy: http.ProxyFromEnvironment,
  858. DialContext: (&net.Dialer{
  859. Timeout: 120 * time.Second,
  860. KeepAlive: 120 * time.Second,
  861. }).DialContext,
  862. TLSHandshakeTimeout: 10 * time.Second,
  863. }
  864. thanosConfig := prometheusClient.Config{
  865. Address: thanosUrl,
  866. RoundTripper: thanosRT,
  867. }
  868. thanosCli, _ := prometheusClient.NewClient(thanosConfig)
  869. _, err = ValidatePrometheus(thanosCli)
  870. if err != nil {
  871. klog.Fatalf("Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
  872. } else {
  873. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
  874. A.ThanosClient = thanosCli
  875. }
  876. } else {
  877. klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
  878. }
  879. }
  880. err = A.Cloud.DownloadPricingData()
  881. if err != nil {
  882. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  883. }
  884. A.recordPrices()
  885. Router.GET("/costDataModel", A.CostDataModel)
  886. Router.GET("/costDataModelRange", A.CostDataModelRange)
  887. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  888. Router.GET("/outOfClusterCosts", A.OutofClusterCosts)
  889. Router.GET("/allNodePricing", A.GetAllNodePricing)
  890. Router.GET("/healthz", Healthz)
  891. Router.GET("/getConfigs", A.GetConfigs)
  892. Router.POST("/refreshPricing", A.RefreshPricingData)
  893. Router.POST("/updateSpotInfoConfigs", A.UpdateSpotInfoConfigs)
  894. Router.POST("/updateAthenaInfoConfigs", A.UpdateAthenaInfoConfigs)
  895. Router.POST("/updateBigQueryInfoConfigs", A.UpdateBigQueryInfoConfigs)
  896. Router.POST("/updateConfigByKey", A.UpdateConfigByKey)
  897. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  898. Router.GET("/clusterCosts", A.ClusterCosts)
  899. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  900. Router.GET("/managementPlatform", A.ManagementPlatform)
  901. Router.GET("/clusterInfo", A.ClusterInfo)
  902. Router.GET("/containerUptimes", A.ContainerUptimes)
  903. Router.GET("/aggregatedCostModel", A.AggregateCostModel)
  904. }