router.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "math"
  8. "net"
  9. "net/http"
  10. "os"
  11. "reflect"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "k8s.io/klog"
  16. "github.com/julienschmidt/httprouter"
  17. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  18. "github.com/kubecost/cost-model/clustercache"
  19. "github.com/patrickmn/go-cache"
  20. prometheusClient "github.com/prometheus/client_golang/api"
  21. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  22. "github.com/prometheus/client_golang/prometheus"
  23. v1 "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/client-go/kubernetes"
  26. "k8s.io/client-go/rest"
  27. )
  // prometheusServerEndpointEnvVar is the environment variable that must hold
  // the Prometheus server address; Initialize aborts when it is unset.
  const prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"

  // prometheusTroubleshootingEp is the documentation link included in the
  // fatal message logged when the Prometheus config cannot be fetched at startup.
  const prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"

  // RFC3339Milli is a time layout with millisecond precision and a literal
  // trailing "Z", e.g. "2019-06-01T12:30:00.000Z".
  const RFC3339Milli = "2006-01-02T15:04:05.000Z"
  // gitCommit identifies the source revision of this build. It stays empty
  // unless the build system injects a value (presumably via -ldflags "-X");
  // Initialize logs it at startup.
  var gitCommit string
  37. var Router = httprouter.New()
  38. var A Accesses
  39. type Accesses struct {
  40. PrometheusClient prometheusClient.Client
  41. ThanosClient prometheusClient.Client
  42. KubeClientSet kubernetes.Interface
  43. Cloud costAnalyzerCloud.Provider
  44. CPUPriceRecorder *prometheus.GaugeVec
  45. RAMPriceRecorder *prometheus.GaugeVec
  46. PersistentVolumePriceRecorder *prometheus.GaugeVec
  47. GPUPriceRecorder *prometheus.GaugeVec
  48. NodeTotalPriceRecorder *prometheus.GaugeVec
  49. RAMAllocationRecorder *prometheus.GaugeVec
  50. CPUAllocationRecorder *prometheus.GaugeVec
  51. GPUAllocationRecorder *prometheus.GaugeVec
  52. PVAllocationRecorder *prometheus.GaugeVec
  53. ContainerUptimeRecorder *prometheus.GaugeVec
  54. NetworkZoneEgressRecorder prometheus.Gauge
  55. NetworkRegionEgressRecorder prometheus.Gauge
  56. NetworkInternetEgressRecorder prometheus.Gauge
  57. ServiceSelectorRecorder *prometheus.GaugeVec
  58. DeploymentSelectorRecorder *prometheus.GaugeVec
  59. Model *CostModel
  60. OutOfClusterCache *cache.Cache
  61. }
  // DataEnvelope is the JSON wrapper produced by WrapData and
  // WrapDataWithMessage for every response in this file.
  type DataEnvelope struct {
  	Code    int         `json:"code"`              // HTTP status code mirrored into the body
  	Status  string      `json:"status"`            // "success" or "error"
  	Data    interface{} `json:"data"`              // response payload; may be null
  	Message string      `json:"message,omitempty"` // optional detail, omitted when empty
  }
  68. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  69. type FilterFunc func(*CostData) (bool, string)
  70. // FilterCostData allows through only CostData that matches all the given filter functions
  71. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  72. result := make(map[string]*CostData)
  73. filteredEnvironments := make(map[string]int)
  74. filteredContainers := 0
  75. DataLoop:
  76. for key, datum := range data {
  77. for _, rf := range retains {
  78. if ok, _ := rf(datum); ok {
  79. result[key] = datum
  80. // if any retain function passes, the data is retained and move on
  81. continue DataLoop
  82. }
  83. }
  84. for _, ff := range filters {
  85. if ok, environment := ff(datum); !ok {
  86. if environment != "" {
  87. filteredEnvironments[environment]++
  88. }
  89. filteredContainers++
  90. // if any filter function check fails, move on to the next datum
  91. continue DataLoop
  92. }
  93. }
  94. result[key] = datum
  95. }
  96. return result, filteredContainers, filteredEnvironments
  97. }
  // filterFields returns a by-value copy of data in which every CostData field
  // named in fields is left at its zero value.
  //
  // fields is a comma-separated list of struct field names, matched
  // case-insensitively with surrounding whitespace ignored (e.g.
  // "namespace, podname"). Nil entries in data are skipped rather than
  // dereferenced. Unexported fields cannot be set through reflection, so they
  // are left at their zero value instead of panicking.
  func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  	excluded := make(map[string]bool)
  	for _, f := range strings.Split(fields, ",") {
  		name := strings.ToLower(strings.TrimSpace(f)) // normalize for case-insensitive matching
  		if name == "" {
  			continue
  		}
  		klog.V(1).Infof("to delete: %s", name)
  		excluded[name] = true
  	}

  	filtered := make(map[string]CostData, len(data))
  	for key, cd := range data {
  		if cd == nil {
  			continue
  		}
  		src := reflect.ValueOf(*cd)
  		typ := src.Type()
  		dst := reflect.New(typ).Elem() // addressable zero CostData
  		for i := 0; i < typ.NumField(); i++ {
  			field := dst.Field(i)
  			if excluded[strings.ToLower(typ.Field(i).Name)] || !field.CanSet() {
  				continue
  			}
  			field.Set(src.Field(i))
  		}
  		filtered[key] = dst.Interface().(CostData)
  	}
  	return filtered
  }
  // normalizeTimeParam rewrites a day-suffixed duration such as "2d" as the
  // equivalent number of hours ("48h"), because time.ParseDuration has no day
  // unit. Any other input, including the empty string, is returned unchanged.
  // An error is returned only when the "d" suffix follows something that is
  // not a base-10 integer.
  func normalizeTimeParam(param string) (string, error) {
  	if !strings.HasSuffix(param, "d") {
  		return param, nil
  	}
  	days, err := strconv.ParseInt(strings.TrimSuffix(param, "d"), 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString converts a percentage string of the form "N%" to the
  // fraction N/100, e.g. "15%" -> 0.15. The trailing "%" is optional ("15" is
  // also 0.15), and the empty string is treated as "0%" (returns 0). Any
  // other non-numeric input returns strconv.ParseFloat's error.
  func ParsePercentString(percentStr string) (float64, error) {
  	if percentStr == "" {
  		return 0.0, nil
  	}
  	percent, err := strconv.ParseFloat(strings.TrimSuffix(percentStr, "%"), 64)
  	if err != nil {
  		return 0.0, err
  	}
  	// Divide instead of multiplying by 0.01: 0.01 is not exactly representable
  	// in binary, whereas division yields the correctly rounded quotient.
  	return percent / 100, nil
  }
  // ParseDuration converts a Prometheus-style duration string into a Duration.
  //
  // The input must be a base-10 integer (optionally signed) followed by exactly
  // one unit: s (seconds), m (minutes), h (hours) or d (24-hour days), e.g.
  // "90s", "15m", "2h", "7d". Compound forms such as "1h30m" are not accepted.
  // An error is returned for empty input, an unknown unit, a non-integer
  // amount, or an amount whose Duration would overflow int64 nanoseconds.
  func ParseDuration(duration string) (*time.Duration, error) {
  	formatErr := func() error {
  		return fmt.Errorf("error parsing duration: %q did not match expected format [0-9]+(s|m|d|h)", duration)
  	}
  	if duration == "" {
  		return nil, formatErr()
  	}

  	var unit time.Duration
  	switch duration[len(duration)-1:] {
  	case "s":
  		unit = time.Second
  	case "m":
  		unit = time.Minute
  	case "h":
  		unit = time.Hour
  	case "d":
  		unit = 24 * time.Hour
  	default:
  		return nil, formatErr()
  	}

  	amount, err := strconv.ParseInt(duration[:len(duration)-1], 10, 64)
  	if err != nil {
  		return nil, formatErr()
  	}
  	// time.Duration multiplication wraps silently, so reject amounts that would overflow.
  	if amount > math.MaxInt64/int64(unit) || amount < math.MinInt64/int64(unit) {
  		return nil, fmt.Errorf("error parsing duration: %q is out of range", duration)
  	}
  	dur := time.Duration(amount) * unit
  	return &dur, nil
  }
  // ParseTimeRange returns the start and end, respectively, of the window
  // described by duration and offset.
  //
  // end is now shifted back by offset (parsed with ParseDuration), or simply
  // now when offset is empty. start is end minus duration. duration accepts
  // anything time.ParseDuration does, plus a whole-number "d" (days) suffix.
  // An empty duration is rejected with an error.
  func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  	if duration == "" {
  		return nil, nil, fmt.Errorf("error parsing duration: duration is empty")
  	}

  	endTime := time.Now()
  	if offset != "" {
  		o, err := ParseDuration(offset)
  		if err != nil {
  			return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
  		}
  		endTime = endTime.Add(-*o)
  	}

  	// time.ParseDuration has no day unit, so rewrite e.g. "2d" as "48h" first.
  	durationNorm, err := normalizeTimeParam(duration)
  	if err != nil {
  		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
  	}
  	dur, err := time.ParseDuration(durationNorm)
  	if err != nil {
  		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", durationNorm, err)
  	}

  	startTime := endTime.Add(-dur)
  	return &startTime, &endTime, nil
  }
  207. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  208. var resp []byte
  209. if err != nil {
  210. klog.V(1).Infof("Error returned to client: %s", err.Error())
  211. resp, _ = json.Marshal(&DataEnvelope{
  212. Code: http.StatusInternalServerError,
  213. Status: "error",
  214. Message: err.Error(),
  215. Data: data,
  216. })
  217. } else {
  218. resp, _ = json.Marshal(&DataEnvelope{
  219. Code: http.StatusOK,
  220. Status: "success",
  221. Data: data,
  222. Message: message,
  223. })
  224. }
  225. return resp
  226. }
  227. func WrapData(data interface{}, err error) []byte {
  228. var resp []byte
  229. if err != nil {
  230. klog.V(1).Infof("Error returned to client: %s", err.Error())
  231. resp, _ = json.Marshal(&DataEnvelope{
  232. Code: http.StatusInternalServerError,
  233. Status: "error",
  234. Message: err.Error(),
  235. Data: data,
  236. })
  237. } else {
  238. resp, _ = json.Marshal(&DataEnvelope{
  239. Code: http.StatusOK,
  240. Status: "success",
  241. Data: data,
  242. })
  243. }
  244. return resp
  245. }
  // RefreshPricingData re-downloads the cloud provider's pricing data and
  // responds with a success or error envelope (no payload). It needs to be
  // called when a new node joins the fleet, since only the relevant subsets of
  // pricing data are cached rather than the whole thing.
  func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	downloadErr := a.Cloud.DownloadPricingData()
  	w.Write(WrapData(nil, downloadErr))
  }
  253. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  254. w.Header().Set("Content-Type", "application/json")
  255. w.Header().Set("Access-Control-Allow-Origin", "*")
  256. window := r.URL.Query().Get("timeWindow")
  257. offset := r.URL.Query().Get("offset")
  258. fields := r.URL.Query().Get("filterFields")
  259. namespace := r.URL.Query().Get("namespace")
  260. if offset != "" {
  261. offset = "offset " + offset
  262. }
  263. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  264. if fields != "" {
  265. filteredData := filterFields(fields, data)
  266. w.Write(WrapData(filteredData, err))
  267. } else {
  268. w.Write(WrapData(data, err))
  269. }
  270. }
  // ClusterCosts responds with cluster-level costs computed for the "window"
  // and "offset" query parameters.
  func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	params := r.URL.Query()
  	data, err := ClusterCosts(a.PrometheusClient, a.Cloud, params.Get("window"), params.Get("offset"))
  	w.Write(WrapData(data, err))
  }
  // ClusterCostsOverTime responds with cluster-level costs between the "start"
  // and "end" query parameters, using the given "window" and "offset".
  func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	params := r.URL.Query()
  	data, err := ClusterCostsOverTime(
  		a.PrometheusClient,
  		a.Cloud,
  		params.Get("start"),
  		params.Get("end"),
  		params.Get("window"),
  		params.Get("offset"),
  	)
  	w.Write(WrapData(data, err))
  }
  289. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  290. w.Header().Set("Content-Type", "application/json")
  291. w.Header().Set("Access-Control-Allow-Origin", "*")
  292. start := r.URL.Query().Get("start")
  293. end := r.URL.Query().Get("end")
  294. window := r.URL.Query().Get("window")
  295. fields := r.URL.Query().Get("filterFields")
  296. namespace := r.URL.Query().Get("namespace")
  297. cluster := r.URL.Query().Get("cluster")
  298. remote := r.URL.Query().Get("remote")
  299. remoteAvailable := os.Getenv(remoteEnabled)
  300. remoteEnabled := false
  301. if remoteAvailable == "true" && remote != "false" {
  302. remoteEnabled = true
  303. }
  304. // Use Thanos Client if it exists (enabled) and remote flag set
  305. var pClient prometheusClient.Client
  306. if remote != "false" && a.ThanosClient != nil {
  307. pClient = a.ThanosClient
  308. } else {
  309. pClient = a.PrometheusClient
  310. }
  311. resolutionHours := 1.0
  312. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, resolutionHours, namespace, cluster, remoteEnabled)
  313. if err != nil {
  314. w.Write(WrapData(nil, err))
  315. }
  316. if fields != "" {
  317. filteredData := filterFields(fields, data)
  318. w.Write(WrapData(filteredData, err))
  319. } else {
  320. w.Write(WrapData(data, err))
  321. }
  322. }
  // CostDataModelRangeLarge is experimental multi-cluster and long-term data
  // storage in SQL support. It responds with cost data read via
  // CostDataRangeFromSQL.
  //
  // "start" and "end" use the RFC3339Milli layout (e.g.
  // 2019-06-01T12:30:00.000Z). "window" defaults to "1h" and is passed through
  // to CostDataRangeFromSQL. When "start" is omitted it defaults to two windows
  // before now; when "end" is omitted it defaults to now. Any parse error is
  // reported to the client and ends the request, so exactly one response body
  // is written.
  func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	q := r.URL.Query()
  	startString := q.Get("start")
  	endString := q.Get("end")
  	windowString := q.Get("window")
  	if windowString == "" {
  		windowString = "1h"
  	}

  	var start, end time.Time
  	var err error
  	if startString != "" {
  		start, err = time.Parse(RFC3339Milli, startString)
  		if err != nil {
  			klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
  			w.Write(WrapData(nil, err))
  			return
  		}
  	} else {
  		window, werr := time.ParseDuration(windowString)
  		if werr != nil {
  			w.Write(WrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  			return
  		}
  		start = time.Now().Add(-2 * window)
  	}

  	if endString != "" {
  		end, err = time.Parse(RFC3339Milli, endString)
  		if err != nil {
  			klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
  			w.Write(WrapData(nil, err))
  			return
  		}
  	} else {
  		end = time.Now()
  	}

  	// The layout ends in a literal "Z", so convert to UTC first; otherwise a
  	// local time.Now() would be mislabelled as UTC.
  	const remoteLayout = "2006-01-02T15:04:05Z"
  	remoteStartStr := start.UTC().Format(remoteLayout)
  	remoteEndStr := end.UTC().Format(remoteLayout)
  	klog.V(1).Infof("Using remote database for query from %s to %s with window %s", remoteStartStr, remoteEndStr, windowString)

  	data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  	w.Write(WrapData(data, err))
  }
  365. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  366. w.Header().Set("Content-Type", "application/json")
  367. w.Header().Set("Access-Control-Allow-Origin", "*")
  368. start := r.URL.Query().Get("start")
  369. end := r.URL.Query().Get("end")
  370. aggregator := r.URL.Query().Get("aggregator")
  371. customAggregation := r.URL.Query().Get("customAggregation")
  372. filterType := r.URL.Query().Get("filterType")
  373. filterValue := r.URL.Query().Get("filterValue")
  374. var data []*costAnalyzerCloud.OutOfClusterAllocation
  375. var err error
  376. if customAggregation != "" {
  377. data, err = a.Cloud.ExternalAllocations(start, end, customAggregation, filterType, filterValue)
  378. } else {
  379. data, err = a.Cloud.ExternalAllocations(start, end, "kubernetes_"+aggregator, "kubernetes_"+filterType, filterValue)
  380. }
  381. w.Write(WrapData(data, err))
  382. }
  // OutOfClusterCostsWithCache is the cached variant of OutofClusterCosts. It
  // responds with out-of-cluster cloud costs and serves them from
  // OutOfClusterCache when possible. The response message reports a cache hit
  // or miss together with the cache key.
  //
  // NOTE(review): unlike OutofClusterCosts, filterType is prefixed with
  // "kubernetes_" even when customAggregation is supplied; confirm whether
  // that difference is intended.
  func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	q := r.URL.Query()
  	// start date for which to query costs, inclusive; format YYYY-MM-DD
  	start := q.Get("start")
  	// end date for which to query costs, inclusive; format YYYY-MM-DD
  	end := q.Get("end")
  	// aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  	kubernetesAggregation := q.Get("aggregator")
  	// customAggregation allows full customization of aggregator w/o prepending
  	customAggregation := q.Get("customAggregation")
  	// disableCache, if set to "true", skips the cache lookup; the freshly
  	// computed result is still stored in the cache
  	disableCache := q.Get("disableCache") == "true"
  	// clearCache, if set to "true", tells this function to flush the cache,
  	// then recompute and cache the requested data
  	clearCache := q.Get("clearCache") == "true"
  	filterType := "kubernetes_" + q.Get("filterType")
  	filterValue := q.Get("filterValue")

  	aggregation := "kubernetes_" + kubernetesAggregation
  	if customAggregation != "" {
  		aggregation = customAggregation
  	}

  	// clear cache prior to checking the cache so that a clearCache=true
  	// request always returns a freshly computed value
  	if clearCache {
  		a.OutOfClusterCache.Flush()
  	}

  	key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregation, filterType, filterValue)
  	if !disableCache {
  		if value, found := a.OutOfClusterCache.Get(key); found {
  			if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
  				w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  				return
  			}
  			klog.Errorf("caching error: failed to type cast data: %s", key)
  		}
  	}

  	data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filterType, filterValue)
  	if err == nil {
  		a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  	}
  	w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluster cache miss: %s", key)))
  }
  // GetAllNodePricing responds with the cloud provider's pricing for all nodes.
  func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	pricing, err := a.Cloud.AllNodePricing()
  	w.Write(WrapData(pricing, err))
  }
  // GetConfigs responds with the cloud provider's current configuration.
  func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	config, err := a.Cloud.GetConfig()
  	w.Write(WrapData(config, err))
  }
  // UpdateSpotInfoConfigs applies a spot-info configuration update read from
  // the request body and responds with the result. After a successful update
  // it re-downloads pricing data; a failed re-download is only logged, since
  // the response has already been written.
  func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	updated, updateErr := a.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  	w.Write(WrapData(updated, updateErr))
  	if updateErr != nil {
  		return
  	}

  	if downloadErr := a.Cloud.DownloadPricingData(); downloadErr != nil {
  		klog.V(1).Infof("Error redownloading data on config update: %s", downloadErr.Error())
  	}
  }
  // UpdateAthenaInfoConfigs applies an Athena configuration update read from
  // the request body and responds with the result or the error.
  func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	updated, err := a.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  	w.Write(WrapData(updated, err))
  }
  // UpdateBigQueryInfoConfigs applies a BigQuery configuration update read from
  // the request body and responds with the result or the error.
  func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	updated, err := a.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  	w.Write(WrapData(updated, err))
  }
  // UpdateConfigByKey applies a generic configuration update (empty update
  // type) read from the request body and responds with the result or the error.
  func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	updated, err := a.Cloud.UpdateConfig(r.Body, "")
  	w.Write(WrapData(updated, err))
  }
  // ManagementPlatform responds with the management platform reported by the
  // cloud provider, or the error encountered while determining it.
  func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	platform, err := a.Cloud.GetManagementPlatform()
  	w.Write(WrapData(platform, err))
  }
  // ClusterInfo responds with the cluster information reported by the cloud provider.
  func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	info, err := a.Cloud.ClusterInfo()
  	w.Write(WrapData(info, err))
  }
  // GetPrometheusMetadata responds with the result of
  // ValidatePrometheus(PrometheusClient, false).
  func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	metadata, err := ValidatePrometheus(a.PrometheusClient, false)
  	w.Write(WrapData(metadata, err))
  }
  // ContainerUptimes responds with the per-container uptimes computed by ComputeUptimes.
  func (a *Accesses) ContainerUptimes(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	uptimes, err := ComputeUptimes(a.PrometheusClient)
  	w.Write(WrapData(uptimes, err))
  }
  // recordPrices starts a background goroutine that never exits. Once per
  // minute it publishes:
  //   - global network egress prices;
  //   - per-node CPU, RAM, GPU and total prices;
  //   - per-container RAM/CPU/GPU allocations and per-PVC allocations;
  //   - per-PV prices and per-container uptimes.
  //
  // Each gauge vector tracks which label sets were written during the current
  // pass. Label sets not refreshed in a pass are deleted, so series for
  // removed nodes, non-running containers, and vanished volumes disappear.
  func (a *Accesses) recordPrices() {
  	go func() {
  		containerSeen := make(map[string]bool)
  		nodeSeen := make(map[string]bool)
  		pvSeen := make(map[string]bool)
  		pvcSeen := make(map[string]bool)
  		uptimeSeen := make(map[string]bool)

  		getKeyFromLabelStrings := func(labels ...string) string {
  			return strings.Join(labels, ",")
  		}
  		getLabelStringsFromKey := func(key string) []string {
  			return strings.Split(key, ",")
  		}

  		// parseFinite parses s as a float64 and reports whether the result is
  		// neither NaN nor ±Inf. Unparseable input yields 0 (ParseFloat's value on
  		// syntax errors), which counts as finite.
  		parseFinite := func(s string) (float64, bool) {
  			v, _ := strconv.ParseFloat(s, 64)
  			return v, !math.IsNaN(v) && !math.IsInf(v, 0)
  		}
  		// finiteOr returns the parsed value of s, or def when it is NaN/±Inf.
  		finiteOr := func(s string, def float64) float64 {
  			if v, ok := parseFinite(s); ok {
  				return v
  			}
  			return def
  		}

  		// pruneUnseen deletes every label set not refreshed during this pass from
  		// the given vectors and forgets it. Surviving label sets are reset to
  		// unseen for the next pass.
  		pruneUnseen := func(seen map[string]bool, vecs ...*prometheus.GaugeVec) {
  			for key, wasSeen := range seen {
  				if !wasSeen {
  					labels := getLabelStringsFromKey(key)
  					for _, vec := range vecs {
  						vec.DeleteLabelValues(labels...)
  					}
  					delete(seen, key)
  					continue // do not re-insert the key we just deleted
  				}
  				seen[key] = false
  			}
  		}

  		for {
  			klog.V(4).Info("Recording prices...")

  			// Key by namespace and name: pod names are only unique within a namespace.
  			podStatus := make(map[string]v1.PodPhase)
  			for _, pod := range a.Model.Cache.GetAllPods() {
  				podStatus[pod.Namespace+"/"+pod.Name] = pod.Status.Phase
  			}

  			// NOTE(review): the GetConfig error is ignored; cfg is read only as a
  			// fallback when a node price is NaN/Inf — confirm cfg is never nil here.
  			cfg, _ := a.Cloud.GetConfig()

  			// Record network pricing at global scope
  			networkCosts, err := a.Cloud.NetworkPricing()
  			if err != nil {
  				klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  			} else {
  				a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  				a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  				a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  			}

  			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
  			if err != nil {
  				klog.V(1).Info("Error in price recording: " + err.Error())
  				// zero the for loop so the time.Sleep will still work
  				data = map[string]*CostData{}
  			}

  			nodes, err := a.Model.GetNodeCost(a.Cloud)
  			if err != nil {
  				klog.V(1).Infof("Error getting node costs: %s", err.Error())
  			}
  			for nodeName, node := range nodes {
  				// Emit costs, guarding against NaN inputs for custom pricing. The
  				// cfg fallback is consulted only when the node's own price is unusable.
  				cpuCost, ok := parseFinite(node.VCPUCost)
  				if !ok {
  					cpuCost = finiteOr(cfg.CPU, 0)
  				}
  				ramCost, ok := parseFinite(node.RAMCost)
  				if !ok {
  					ramCost = finiteOr(cfg.RAM, 0)
  				}
  				gpuCost, ok := parseFinite(node.GPUCost)
  				if !ok {
  					gpuCost = finiteOr(cfg.GPU, 0)
  				}
  				cpu := finiteOr(node.VCPU, 1) // Assume 1 CPU
  				ram := finiteOr(node.RAMBytes, 0)
  				gpu := finiteOr(node.GPU, 0)

  				// RAM is priced per GiB, so convert bytes first.
  				totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  				a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(cpuCost)
  				a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName).Set(ramCost)
  				a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(gpuCost)
  				a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName).Set(totalCost)
  				nodeSeen[getKeyFromLabelStrings(nodeName, nodeName)] = true
  			}

  			for _, costs := range data {
  				nodeName := costs.NodeName
  				namespace := costs.Namespace
  				podName := costs.PodName
  				containerName := costs.Name

  				for _, pvc := range costs.PVCData {
  					// Skip unbound claims and claims without samples (indexing an
  					// empty Values previously panicked).
  					if pvc.Volume == nil || len(pvc.Values) == 0 {
  						continue
  					}
  					a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
  					pvcSeen[getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)] = true
  				}

  				if len(costs.RAMAllocation) > 0 {
  					a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  				}
  				if len(costs.CPUAllocation) > 0 {
  					a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  				}
  				if len(costs.GPUReq) > 0 {
  					// allocation here is set to the request because shared GPU usage not yet supported.
  					a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  				}

  				// Only report data for current pods: non-running pods are pruned below.
  				labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  				containerSeen[labelKey] = podStatus[namespace+"/"+podName] == v1.PodRunning
  			}

  			// Storage classes, PV prices and uptimes are cluster-wide, so compute
  			// them once per pass rather than once per container.
  			storageClassMap := make(map[string]map[string]string)
  			for _, storageClass := range a.Model.Cache.GetAllStorageClasses() {
  				params := storageClass.Parameters
  				storageClassMap[storageClass.ObjectMeta.Name] = params
  				annotations := storageClass.GetAnnotations()
  				if annotations["storageclass.kubernetes.io/is-default-class"] == "true" || annotations["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  					storageClassMap["default"] = params
  					storageClassMap[""] = params
  				}
  			}

  			for _, pv := range a.Model.Cache.GetAllPersistentVolumes() {
  				parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  				if !ok {
  					klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  				}
  				cacPv := &costAnalyzerCloud.PV{
  					Class:      pv.Spec.StorageClassName,
  					Region:     pv.Labels[v1.LabelZoneRegion],
  					Parameters: parameters,
  				}
  				GetPVCost(cacPv, pv, a.Cloud)
  				c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  				a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
  				pvSeen[getKeyFromLabelStrings(pv.Name, pv.Name)] = true
  			}

  			containerUptime, err := ComputeUptimes(a.PrometheusClient)
  			if err != nil {
  				klog.V(4).Infof("Failed to compute container uptimes: %s", err.Error())
  			}
  			for key, uptime := range containerUptime {
  				container, err := NewContainerMetricFromKey(key)
  				if err != nil {
  					continue
  				}
  				a.ContainerUptimeRecorder.WithLabelValues(container.Namespace, container.PodName, container.ContainerName).Set(uptime)
  				// Tracked separately: the uptime vector has three labels, so the
  				// five-label container keys could never delete its series.
  				uptimeSeen[getKeyFromLabelStrings(container.Namespace, container.PodName, container.ContainerName)] = true
  			}

  			pruneUnseen(nodeSeen, a.NodeTotalPriceRecorder, a.CPUPriceRecorder, a.GPUPriceRecorder, a.RAMPriceRecorder)
  			pruneUnseen(containerSeen, a.RAMAllocationRecorder, a.CPUAllocationRecorder, a.GPUAllocationRecorder)
  			pruneUnseen(uptimeSeen, a.ContainerUptimeRecorder)
  			pruneUnseen(pvSeen, a.PersistentVolumePriceRecorder)
  			pruneUnseen(pvcSeen, a.PVAllocationRecorder)

  			time.Sleep(time.Minute)
  		}
  	}()
  }
  699. func Initialize() {
  700. klog.InitFlags(nil)
  701. flag.Set("v", "3")
  702. flag.Parse()
  703. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  704. address := os.Getenv(prometheusServerEndpointEnvVar)
  705. if address == "" {
  706. klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
  707. }
  708. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  709. Proxy: http.ProxyFromEnvironment,
  710. DialContext: (&net.Dialer{
  711. Timeout: 120 * time.Second,
  712. KeepAlive: 120 * time.Second,
  713. }).DialContext,
  714. TLSHandshakeTimeout: 10 * time.Second,
  715. }
  716. pc := prometheusClient.Config{
  717. Address: address,
  718. RoundTripper: LongTimeoutRoundTripper,
  719. }
  720. promCli, _ := prometheusClient.NewClient(pc)
  721. api := prometheusAPI.NewAPI(promCli)
  722. _, err := api.Config(context.Background())
  723. if err != nil {
  724. klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  725. }
  726. klog.V(1).Info("Success: retrieved a prometheus config file from: " + address)
  727. _, err = ValidatePrometheus(promCli, false)
  728. if err != nil {
  729. klog.Fatalf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  730. }
  731. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  732. // Kubernetes API setup
  733. kc, err := rest.InClusterConfig()
  734. if err != nil {
  735. panic(err.Error())
  736. }
  737. kubeClientset, err := kubernetes.NewForConfig(kc)
  738. if err != nil {
  739. panic(err.Error())
  740. }
  741. // Create Kubernetes Cluster Cache + Watchers
  742. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  743. k8sCache.Run()
  744. cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
  745. cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
  746. if err != nil {
  747. panic(err.Error())
  748. }
  749. watchConfigFunc := func(c interface{}) {
  750. conf := c.(*v1.ConfigMap)
  751. if conf.GetName() == "pricing-configs" {
  752. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  753. if err != nil {
  754. klog.Infof("ERROR UPDATING CONFIG: %s", err.Error())
  755. }
  756. }
  757. }
  758. kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
  759. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  760. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
  761. if err != nil {
  762. klog.Infof("No configs found at installtime, using existing configs: %s", err.Error())
  763. } else {
  764. watchConfigFunc(configs)
  765. }
  766. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  767. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  768. Name: "node_cpu_hourly_cost",
  769. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  770. }, []string{"instance", "node"})
  771. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  772. Name: "node_ram_hourly_cost",
  773. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  774. }, []string{"instance", "node"})
  775. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  776. Name: "node_gpu_hourly_cost",
  777. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  778. }, []string{"instance", "node"})
  779. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  780. Name: "node_total_hourly_cost",
  781. Help: "node_total_hourly_cost Total node cost per hour",
  782. }, []string{"instance", "node"})
  783. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  784. Name: "pv_hourly_cost",
  785. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  786. }, []string{"volumename", "persistentvolume"})
  787. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  788. Name: "container_memory_allocation_bytes",
  789. Help: "container_memory_allocation_bytes Bytes of RAM used",
  790. }, []string{"namespace", "pod", "container", "instance", "node"})
  791. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  792. Name: "container_cpu_allocation",
  793. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  794. }, []string{"namespace", "pod", "container", "instance", "node"})
  795. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  796. Name: "container_gpu_allocation",
  797. Help: "container_gpu_allocation GPU used",
  798. }, []string{"namespace", "pod", "container", "instance", "node"})
  799. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  800. Name: "pod_pvc_allocation",
  801. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  802. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  803. ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  804. Name: "container_uptime_seconds",
  805. Help: "container_uptime_seconds Seconds a container has been running",
  806. }, []string{"namespace", "pod", "container"})
  807. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  808. Name: "kubecost_network_zone_egress_cost",
  809. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  810. })
  811. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  812. Name: "kubecost_network_region_egress_cost",
  813. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  814. })
  815. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  816. Name: "kubecost_network_internet_egress_cost",
  817. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  818. })
  819. prometheus.MustRegister(cpuGv)
  820. prometheus.MustRegister(ramGv)
  821. prometheus.MustRegister(gpuGv)
  822. prometheus.MustRegister(totalGv)
  823. prometheus.MustRegister(pvGv)
  824. prometheus.MustRegister(RAMAllocation)
  825. prometheus.MustRegister(CPUAllocation)
  826. prometheus.MustRegister(ContainerUptimeRecorder)
  827. prometheus.MustRegister(PVAllocation)
  828. prometheus.MustRegister(GPUAllocation)
  829. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  830. prometheus.MustRegister(ServiceCollector{
  831. KubeClientSet: kubeClientset,
  832. })
  833. prometheus.MustRegister(DeploymentCollector{
  834. KubeClientSet: kubeClientset,
  835. })
  836. prometheus.MustRegister(StatefulsetCollector{
  837. KubeClientSet: kubeClientset,
  838. })
  839. // cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
  840. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  841. A = Accesses{
  842. PrometheusClient: promCli,
  843. KubeClientSet: kubeClientset,
  844. Cloud: cloudProvider,
  845. CPUPriceRecorder: cpuGv,
  846. RAMPriceRecorder: ramGv,
  847. GPUPriceRecorder: gpuGv,
  848. NodeTotalPriceRecorder: totalGv,
  849. RAMAllocationRecorder: RAMAllocation,
  850. CPUAllocationRecorder: CPUAllocation,
  851. GPUAllocationRecorder: GPUAllocation,
  852. PVAllocationRecorder: PVAllocation,
  853. ContainerUptimeRecorder: ContainerUptimeRecorder,
  854. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  855. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  856. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  857. PersistentVolumePriceRecorder: pvGv,
  858. Model: NewCostModel(k8sCache),
  859. OutOfClusterCache: outOfClusterCache,
  860. }
  861. remoteEnabled := os.Getenv(remoteEnabled)
  862. if remoteEnabled == "true" {
  863. info, err := cloudProvider.ClusterInfo()
  864. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  865. if err != nil {
  866. klog.Infof("Error saving cluster id %s", err.Error())
  867. }
  868. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  869. if err != nil {
  870. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  871. }
  872. }
  873. // Thanos Client
  874. if os.Getenv(thanosEnabled) == "true" {
  875. thanosUrl := os.Getenv(thanosQueryUrl)
  876. if thanosUrl != "" {
  877. var thanosRT http.RoundTripper = &http.Transport{
  878. Proxy: http.ProxyFromEnvironment,
  879. DialContext: (&net.Dialer{
  880. Timeout: 120 * time.Second,
  881. KeepAlive: 120 * time.Second,
  882. }).DialContext,
  883. TLSHandshakeTimeout: 10 * time.Second,
  884. }
  885. thanosConfig := prometheusClient.Config{
  886. Address: thanosUrl,
  887. RoundTripper: thanosRT,
  888. }
  889. thanosCli, _ := prometheusClient.NewClient(thanosConfig)
  890. _, err = ValidatePrometheus(thanosCli, true)
  891. if err != nil {
  892. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
  893. A.ThanosClient = thanosCli
  894. } else {
  895. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
  896. A.ThanosClient = thanosCli
  897. }
  898. } else {
  899. klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
  900. }
  901. }
  902. err = A.Cloud.DownloadPricingData()
  903. if err != nil {
  904. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  905. }
  906. A.recordPrices()
  907. Router.GET("/costDataModel", A.CostDataModel)
  908. Router.GET("/costDataModelRange", A.CostDataModelRange)
  909. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  910. Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
  911. Router.GET("/allNodePricing", A.GetAllNodePricing)
  912. Router.POST("/refreshPricing", A.RefreshPricingData)
  913. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  914. Router.GET("/clusterCosts", A.ClusterCosts)
  915. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  916. Router.GET("/managementPlatform", A.ManagementPlatform)
  917. Router.GET("/clusterInfo", A.ClusterInfo)
  918. Router.GET("/containerUptimes", A.ContainerUptimes)
  919. }