router.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "os"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "k8s.io/klog"
  15. "github.com/julienschmidt/httprouter"
  16. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  17. "github.com/kubecost/cost-model/clustercache"
  18. "github.com/patrickmn/go-cache"
  19. prometheusClient "github.com/prometheus/client_golang/api"
  20. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  21. "github.com/prometheus/client_golang/prometheus"
  22. v1 "k8s.io/api/core/v1"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/client-go/kubernetes"
  25. "k8s.io/client-go/rest"
  26. )
  27. const (
  28. prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
  29. prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
  30. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  31. )
  32. var (
  33. // gitCommit is set by the build system
  34. gitCommit string
  35. )
  36. var Router = httprouter.New()
  37. var A Accesses
  38. type Accesses struct {
  39. PrometheusClient prometheusClient.Client
  40. ThanosClient prometheusClient.Client
  41. KubeClientSet kubernetes.Interface
  42. Cloud costAnalyzerCloud.Provider
  43. CPUPriceRecorder *prometheus.GaugeVec
  44. RAMPriceRecorder *prometheus.GaugeVec
  45. PersistentVolumePriceRecorder *prometheus.GaugeVec
  46. GPUPriceRecorder *prometheus.GaugeVec
  47. NodeTotalPriceRecorder *prometheus.GaugeVec
  48. RAMAllocationRecorder *prometheus.GaugeVec
  49. CPUAllocationRecorder *prometheus.GaugeVec
  50. GPUAllocationRecorder *prometheus.GaugeVec
  51. PVAllocationRecorder *prometheus.GaugeVec
  52. ContainerUptimeRecorder *prometheus.GaugeVec
  53. NetworkZoneEgressRecorder prometheus.Gauge
  54. NetworkRegionEgressRecorder prometheus.Gauge
  55. NetworkInternetEgressRecorder prometheus.Gauge
  56. ServiceSelectorRecorder *prometheus.GaugeVec
  57. DeploymentSelectorRecorder *prometheus.GaugeVec
  58. Model *CostModel
  59. OutOfClusterCache *cache.Cache
  60. }
  61. type DataEnvelope struct {
  62. Code int `json:"code"`
  63. Status string `json:"status"`
  64. Data interface{} `json:"data"`
  65. Message string `json:"message,omitempty"`
  66. }
  67. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  68. type FilterFunc func(*CostData) (bool, string)
  69. // FilterCostData allows through only CostData that matches all the given filter functions
  70. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  71. result := make(map[string]*CostData)
  72. filteredEnvironments := make(map[string]int)
  73. filteredContainers := 0
  74. DataLoop:
  75. for key, datum := range data {
  76. for _, rf := range retains {
  77. if ok, _ := rf(datum); ok {
  78. result[key] = datum
  79. // if any retain function passes, the data is retained and move on
  80. continue DataLoop
  81. }
  82. }
  83. for _, ff := range filters {
  84. if ok, environment := ff(datum); !ok {
  85. if environment != "" {
  86. filteredEnvironments[environment]++
  87. }
  88. filteredContainers++
  89. // if any filter function check fails, move on to the next datum
  90. continue DataLoop
  91. }
  92. }
  93. result[key] = datum
  94. }
  95. return result, filteredContainers, filteredEnvironments
  96. }
  97. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  98. fs := strings.Split(fields, ",")
  99. fmap := make(map[string]bool)
  100. for _, f := range fs {
  101. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  102. klog.V(1).Infof("to delete: %s", fieldNameLower)
  103. fmap[fieldNameLower] = true
  104. }
  105. filteredData := make(map[string]CostData)
  106. for cname, costdata := range data {
  107. s := reflect.TypeOf(*costdata)
  108. val := reflect.ValueOf(*costdata)
  109. costdata2 := CostData{}
  110. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  111. n := s.NumField()
  112. for i := 0; i < n; i++ {
  113. field := s.Field(i)
  114. value := val.Field(i)
  115. value2 := cd2.Field(i)
  116. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  117. value2.Set(reflect.Value(value))
  118. }
  119. }
  120. filteredData[cname] = cd2.Interface().(CostData)
  121. }
  122. return filteredData
  123. }
  124. func normalizeTimeParam(param string) (string, error) {
  125. // convert days to hours
  126. if param[len(param)-1:] == "d" {
  127. count := param[:len(param)-1]
  128. val, err := strconv.ParseInt(count, 10, 64)
  129. if err != nil {
  130. return "", err
  131. }
  132. val = val * 24
  133. param = fmt.Sprintf("%dh", val)
  134. }
  135. return param, nil
  136. }
  137. // parsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
  138. // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
  139. // return 0.0.
  140. func ParsePercentString(percentStr string) (float64, error) {
  141. if len(percentStr) == 0 {
  142. return 0.0, nil
  143. }
  144. if percentStr[len(percentStr)-1:] == "%" {
  145. percentStr = percentStr[:len(percentStr)-1]
  146. }
  147. discount, err := strconv.ParseFloat(percentStr, 64)
  148. if err != nil {
  149. return 0.0, err
  150. }
  151. discount *= 0.01
  152. return discount, nil
  153. }
  154. // parseDuration converts a Prometheus-style duration string into a Duration
  155. func ParseDuration(duration string) (*time.Duration, error) {
  156. unitStr := duration[len(duration)-1:]
  157. var unit time.Duration
  158. switch unitStr {
  159. case "s":
  160. unit = time.Second
  161. case "m":
  162. unit = time.Minute
  163. case "h":
  164. unit = time.Hour
  165. case "d":
  166. unit = 24.0 * time.Hour
  167. default:
  168. return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  169. }
  170. amountStr := duration[:len(duration)-1]
  171. amount, err := strconv.ParseInt(amountStr, 10, 64)
  172. if err != nil {
  173. return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  174. }
  175. dur := time.Duration(amount) * unit
  176. return &dur, nil
  177. }
  178. // ParseTimeRange returns a start and end time, respectively, which are converted from
  179. // a duration and offset, defined as strings with Prometheus-style syntax.
  180. func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  181. // endTime defaults to the current time, unless an offset is explicity declared,
  182. // in which case it shifts endTime back by given duration
  183. endTime := time.Now()
  184. if offset != "" {
  185. o, err := ParseDuration(offset)
  186. if err != nil {
  187. return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
  188. }
  189. endTime = endTime.Add(-1 * *o)
  190. }
  191. // if duration is defined in terms of days, convert to hours
  192. // e.g. convert "2d" to "48h"
  193. durationNorm, err := normalizeTimeParam(duration)
  194. if err != nil {
  195. return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
  196. }
  197. // convert time duration into start and end times, formatted
  198. // as ISO datetime strings
  199. dur, err := time.ParseDuration(durationNorm)
  200. if err != nil {
  201. return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
  202. }
  203. startTime := endTime.Add(-1 * dur)
  204. return &startTime, &endTime, nil
  205. }
  206. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  207. var resp []byte
  208. if err != nil {
  209. klog.V(1).Infof("Error returned to client: %s", err.Error())
  210. resp, _ = json.Marshal(&DataEnvelope{
  211. Code: http.StatusInternalServerError,
  212. Status: "error",
  213. Message: err.Error(),
  214. Data: data,
  215. })
  216. } else {
  217. resp, _ = json.Marshal(&DataEnvelope{
  218. Code: http.StatusOK,
  219. Status: "success",
  220. Data: data,
  221. Message: message,
  222. })
  223. }
  224. return resp
  225. }
  226. func WrapData(data interface{}, err error) []byte {
  227. var resp []byte
  228. if err != nil {
  229. klog.V(1).Infof("Error returned to client: %s", err.Error())
  230. resp, _ = json.Marshal(&DataEnvelope{
  231. Code: http.StatusInternalServerError,
  232. Status: "error",
  233. Message: err.Error(),
  234. Data: data,
  235. })
  236. } else {
  237. resp, _ = json.Marshal(&DataEnvelope{
  238. Code: http.StatusOK,
  239. Status: "success",
  240. Data: data,
  241. })
  242. }
  243. return resp
  244. }
  245. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  246. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  247. w.Header().Set("Content-Type", "application/json")
  248. w.Header().Set("Access-Control-Allow-Origin", "*")
  249. err := a.Cloud.DownloadPricingData()
  250. w.Write(WrapData(nil, err))
  251. }
  252. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  253. w.Header().Set("Content-Type", "application/json")
  254. w.Header().Set("Access-Control-Allow-Origin", "*")
  255. window := r.URL.Query().Get("timeWindow")
  256. offset := r.URL.Query().Get("offset")
  257. fields := r.URL.Query().Get("filterFields")
  258. namespace := r.URL.Query().Get("namespace")
  259. if offset != "" {
  260. offset = "offset " + offset
  261. }
  262. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  263. if fields != "" {
  264. filteredData := filterFields(fields, data)
  265. w.Write(WrapData(filteredData, err))
  266. } else {
  267. w.Write(WrapData(data, err))
  268. }
  269. }
  270. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  271. w.Header().Set("Content-Type", "application/json")
  272. w.Header().Set("Access-Control-Allow-Origin", "*")
  273. window := r.URL.Query().Get("window")
  274. offset := r.URL.Query().Get("offset")
  275. data, err := ClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
  276. w.Write(WrapData(data, err))
  277. }
  278. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  279. w.Header().Set("Content-Type", "application/json")
  280. w.Header().Set("Access-Control-Allow-Origin", "*")
  281. start := r.URL.Query().Get("start")
  282. end := r.URL.Query().Get("end")
  283. window := r.URL.Query().Get("window")
  284. offset := r.URL.Query().Get("offset")
  285. data, err := ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  286. w.Write(WrapData(data, err))
  287. }
  288. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  289. w.Header().Set("Content-Type", "application/json")
  290. w.Header().Set("Access-Control-Allow-Origin", "*")
  291. start := r.URL.Query().Get("start")
  292. end := r.URL.Query().Get("end")
  293. window := r.URL.Query().Get("window")
  294. fields := r.URL.Query().Get("filterFields")
  295. namespace := r.URL.Query().Get("namespace")
  296. cluster := r.URL.Query().Get("cluster")
  297. remote := r.URL.Query().Get("remote")
  298. remoteAvailable := os.Getenv(remoteEnabled)
  299. remoteEnabled := false
  300. if remoteAvailable == "true" && remote != "false" {
  301. remoteEnabled = true
  302. }
  303. // Use Thanos Client if it exists (enabled) and remote flag set
  304. var pClient prometheusClient.Client
  305. if remote != "false" && a.ThanosClient != nil {
  306. pClient = a.ThanosClient
  307. } else {
  308. pClient = a.PrometheusClient
  309. }
  310. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, namespace, cluster, remoteEnabled)
  311. if err != nil {
  312. w.Write(WrapData(nil, err))
  313. }
  314. if fields != "" {
  315. filteredData := filterFields(fields, data)
  316. w.Write(WrapData(filteredData, err))
  317. } else {
  318. w.Write(WrapData(data, err))
  319. }
  320. }
  321. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  322. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  323. w.Header().Set("Content-Type", "application/json")
  324. w.Header().Set("Access-Control-Allow-Origin", "*")
  325. startString := r.URL.Query().Get("start")
  326. endString := r.URL.Query().Get("end")
  327. windowString := r.URL.Query().Get("window")
  328. var start time.Time
  329. var end time.Time
  330. var err error
  331. if windowString == "" {
  332. windowString = "1h"
  333. }
  334. if startString != "" {
  335. start, err = time.Parse(RFC3339Milli, startString)
  336. if err != nil {
  337. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  338. w.Write(WrapData(nil, err))
  339. }
  340. } else {
  341. window, err := time.ParseDuration(windowString)
  342. if err != nil {
  343. w.Write(WrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  344. }
  345. start = time.Now().Add(-2 * window)
  346. }
  347. if endString != "" {
  348. end, err = time.Parse(RFC3339Milli, endString)
  349. if err != nil {
  350. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  351. w.Write(WrapData(nil, err))
  352. }
  353. } else {
  354. end = time.Now()
  355. }
  356. remoteLayout := "2006-01-02T15:04:05Z"
  357. remoteStartStr := start.Format(remoteLayout)
  358. remoteEndStr := end.Format(remoteLayout)
  359. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  360. data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  361. w.Write(WrapData(data, err))
  362. }
  363. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  364. w.Header().Set("Content-Type", "application/json")
  365. w.Header().Set("Access-Control-Allow-Origin", "*")
  366. start := r.URL.Query().Get("start")
  367. end := r.URL.Query().Get("end")
  368. aggregator := r.URL.Query().Get("aggregator")
  369. customAggregation := r.URL.Query().Get("customAggregation")
  370. filterType := r.URL.Query().Get("filterType")
  371. filterValue := r.URL.Query().Get("filterValue")
  372. var data []*costAnalyzerCloud.OutOfClusterAllocation
  373. var err error
  374. if customAggregation != "" {
  375. data, err = a.Cloud.ExternalAllocations(start, end, customAggregation, filterType, filterValue)
  376. } else {
  377. data, err = a.Cloud.ExternalAllocations(start, end, "kubernetes_"+aggregator, "kubernetes_"+filterType, filterValue)
  378. }
  379. w.Write(WrapData(data, err))
  380. }
  381. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  382. w.Header().Set("Content-Type", "application/json")
  383. w.Header().Set("Access-Control-Allow-Origin", "*")
  384. // start date for which to query costs, inclusive; format YYYY-MM-DD
  385. start := r.URL.Query().Get("start")
  386. // end date for which to query costs, inclusive; format YYYY-MM-DD
  387. end := r.URL.Query().Get("end")
  388. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  389. kubernetesAggregation := r.URL.Query().Get("aggregator")
  390. // customAggregation allows full customization of aggregator w/o prepending
  391. customAggregation := r.URL.Query().Get("customAggregation")
  392. // disableCache, if set to "true", tells this function to recompute and
  393. // cache the requested data
  394. disableCache := r.URL.Query().Get("disableCache") == "true"
  395. // clearCache, if set to "true", tells this function to flush the cache,
  396. // then recompute and cache the requested data
  397. clearCache := r.URL.Query().Get("clearCache") == "true"
  398. filterType := r.URL.Query().Get("filterType")
  399. filterValue := r.URL.Query().Get("filterValue")
  400. aggregation := "kubernetes_" + kubernetesAggregation
  401. filterType = "kubernetes_" + filterType
  402. if customAggregation != "" {
  403. aggregation = customAggregation
  404. }
  405. // clear cache prior to checking the cache so that a clearCache=true
  406. // request always returns a freshly computed value
  407. if clearCache {
  408. a.OutOfClusterCache.Flush()
  409. }
  410. // attempt to retrieve cost data from cache
  411. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregation, filterType, filterValue)
  412. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  413. if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
  414. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluser cache hit: %s", key)))
  415. return
  416. }
  417. klog.Errorf("caching error: failed to type cast data: %s", key)
  418. }
  419. data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filterType, filterValue)
  420. if err == nil {
  421. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  422. }
  423. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  424. }
  425. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  426. w.Header().Set("Content-Type", "application/json")
  427. w.Header().Set("Access-Control-Allow-Origin", "*")
  428. data, err := p.Cloud.AllNodePricing()
  429. w.Write(WrapData(data, err))
  430. }
  431. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  432. w.Header().Set("Content-Type", "application/json")
  433. w.Header().Set("Access-Control-Allow-Origin", "*")
  434. data, err := p.Cloud.GetConfig()
  435. w.Write(WrapData(data, err))
  436. }
  437. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  438. w.Header().Set("Content-Type", "application/json")
  439. w.Header().Set("Access-Control-Allow-Origin", "*")
  440. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  441. if err != nil {
  442. w.Write(WrapData(data, err))
  443. return
  444. }
  445. w.Write(WrapData(data, err))
  446. err = p.Cloud.DownloadPricingData()
  447. if err != nil {
  448. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  449. }
  450. return
  451. }
  452. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  453. w.Header().Set("Content-Type", "application/json")
  454. w.Header().Set("Access-Control-Allow-Origin", "*")
  455. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  456. if err != nil {
  457. w.Write(WrapData(data, err))
  458. return
  459. }
  460. w.Write(WrapData(data, err))
  461. return
  462. }
  463. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  464. w.Header().Set("Content-Type", "application/json")
  465. w.Header().Set("Access-Control-Allow-Origin", "*")
  466. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  467. if err != nil {
  468. w.Write(WrapData(data, err))
  469. return
  470. }
  471. w.Write(WrapData(data, err))
  472. return
  473. }
  474. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  475. w.Header().Set("Content-Type", "application/json")
  476. w.Header().Set("Access-Control-Allow-Origin", "*")
  477. data, err := p.Cloud.UpdateConfig(r.Body, "")
  478. if err != nil {
  479. w.Write(WrapData(data, err))
  480. return
  481. }
  482. w.Write(WrapData(data, err))
  483. return
  484. }
  485. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  486. w.Header().Set("Content-Type", "application/json")
  487. w.Header().Set("Access-Control-Allow-Origin", "*")
  488. data, err := p.Cloud.GetManagementPlatform()
  489. if err != nil {
  490. w.Write(WrapData(data, err))
  491. return
  492. }
  493. w.Write(WrapData(data, err))
  494. return
  495. }
  496. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  497. w.Header().Set("Content-Type", "application/json")
  498. w.Header().Set("Access-Control-Allow-Origin", "*")
  499. data, err := p.Cloud.ClusterInfo()
  500. w.Write(WrapData(data, err))
  501. }
  502. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  503. w.Header().Set("Content-Type", "application/json")
  504. w.Header().Set("Access-Control-Allow-Origin", "*")
  505. w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
  506. }
  507. func (p *Accesses) ContainerUptimes(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  508. w.Header().Set("Content-Type", "application/json")
  509. w.Header().Set("Access-Control-Allow-Origin", "*")
  510. res, err := ComputeUptimes(p.PrometheusClient)
  511. w.Write(WrapData(res, err))
  512. }
  513. func (a *Accesses) recordPrices() {
  514. go func() {
  515. containerSeen := make(map[string]bool)
  516. nodeSeen := make(map[string]bool)
  517. pvSeen := make(map[string]bool)
  518. pvcSeen := make(map[string]bool)
  519. getKeyFromLabelStrings := func(labels ...string) string {
  520. return strings.Join(labels, ",")
  521. }
  522. getLabelStringsFromKey := func(key string) []string {
  523. return strings.Split(key, ",")
  524. }
  525. for {
  526. klog.V(4).Info("Recording prices...")
  527. podlist := a.Model.Cache.GetAllPods()
  528. podStatus := make(map[string]v1.PodPhase)
  529. for _, pod := range podlist {
  530. podStatus[pod.Name] = pod.Status.Phase
  531. }
  532. // Record network pricing at global scope
  533. networkCosts, err := a.Cloud.NetworkPricing()
  534. if err != nil {
  535. klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  536. } else {
  537. a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  538. a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  539. a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  540. }
  541. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
  542. if err != nil {
  543. klog.V(1).Info("Error in price recording: " + err.Error())
  544. // zero the for loop so the time.Sleep will still work
  545. data = map[string]*CostData{}
  546. }
  547. nodes, err := a.Model.GetNodeCost(a.Cloud)
  548. for nodeName, node := range nodes {
  549. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  550. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  551. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  552. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  553. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  554. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  555. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  556. a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(cpuCost)
  557. a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName).Set(ramCost)
  558. a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(gpuCost)
  559. a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName).Set(totalCost)
  560. labelKey := getKeyFromLabelStrings(nodeName, nodeName)
  561. nodeSeen[labelKey] = true
  562. }
  563. for _, costs := range data {
  564. nodeName := costs.NodeName
  565. namespace := costs.Namespace
  566. podName := costs.PodName
  567. containerName := costs.Name
  568. if costs.PVCData != nil {
  569. for _, pvc := range costs.PVCData {
  570. if pvc.Volume != nil {
  571. a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
  572. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  573. pvcSeen[labelKey] = true
  574. }
  575. }
  576. }
  577. if len(costs.RAMAllocation) > 0 {
  578. a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  579. }
  580. if len(costs.CPUAllocation) > 0 {
  581. a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  582. }
  583. if len(costs.GPUReq) > 0 {
  584. // allocation here is set to the request because shared GPU usage not yet supported.
  585. a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  586. }
  587. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  588. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  589. containerSeen[labelKey] = true
  590. } else {
  591. containerSeen[labelKey] = false
  592. }
  593. storageClasses := a.Model.Cache.GetAllStorageClasses()
  594. storageClassMap := make(map[string]map[string]string)
  595. for _, storageClass := range storageClasses {
  596. params := storageClass.Parameters
  597. storageClassMap[storageClass.ObjectMeta.Name] = params
  598. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  599. storageClassMap["default"] = params
  600. storageClassMap[""] = params
  601. }
  602. }
  603. pvs := a.Model.Cache.GetAllPersistentVolumes()
  604. for _, pv := range pvs {
  605. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  606. if !ok {
  607. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  608. }
  609. cacPv := &costAnalyzerCloud.PV{
  610. Class: pv.Spec.StorageClassName,
  611. Region: pv.Labels[v1.LabelZoneRegion],
  612. Parameters: parameters,
  613. }
  614. GetPVCost(cacPv, pv, a.Cloud)
  615. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  616. a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
  617. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  618. pvSeen[labelKey] = true
  619. }
  620. containerUptime, _ := ComputeUptimes(a.PrometheusClient)
  621. for key, uptime := range containerUptime {
  622. container, _ := NewContainerMetricFromKey(key)
  623. a.ContainerUptimeRecorder.WithLabelValues(container.Namespace, container.PodName, container.ContainerName).Set(uptime)
  624. }
  625. }
  626. for labelString, seen := range nodeSeen {
  627. if !seen {
  628. labels := getLabelStringsFromKey(labelString)
  629. a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  630. a.CPUPriceRecorder.DeleteLabelValues(labels...)
  631. a.GPUPriceRecorder.DeleteLabelValues(labels...)
  632. a.RAMPriceRecorder.DeleteLabelValues(labels...)
  633. delete(nodeSeen, labelString)
  634. }
  635. nodeSeen[labelString] = false
  636. }
  637. for labelString, seen := range containerSeen {
  638. if !seen {
  639. labels := getLabelStringsFromKey(labelString)
  640. a.RAMAllocationRecorder.DeleteLabelValues(labels...)
  641. a.CPUAllocationRecorder.DeleteLabelValues(labels...)
  642. a.GPUAllocationRecorder.DeleteLabelValues(labels...)
  643. a.ContainerUptimeRecorder.DeleteLabelValues(labels...)
  644. delete(containerSeen, labelString)
  645. }
  646. containerSeen[labelString] = false
  647. }
  648. for labelString, seen := range pvSeen {
  649. if !seen {
  650. labels := getLabelStringsFromKey(labelString)
  651. a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  652. delete(pvSeen, labelString)
  653. }
  654. pvSeen[labelString] = false
  655. }
  656. for labelString, seen := range pvcSeen {
  657. if !seen {
  658. labels := getLabelStringsFromKey(labelString)
  659. a.PVAllocationRecorder.DeleteLabelValues(labels...)
  660. delete(pvcSeen, labelString)
  661. }
  662. pvcSeen[labelString] = false
  663. }
  664. time.Sleep(time.Minute)
  665. }
  666. }()
  667. }
  668. func Initialize() {
  669. klog.InitFlags(nil)
  670. flag.Set("v", "3")
  671. flag.Parse()
  672. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  673. address := os.Getenv(prometheusServerEndpointEnvVar)
  674. if address == "" {
  675. klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
  676. }
  677. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  678. Proxy: http.ProxyFromEnvironment,
  679. DialContext: (&net.Dialer{
  680. Timeout: 120 * time.Second,
  681. KeepAlive: 120 * time.Second,
  682. }).DialContext,
  683. TLSHandshakeTimeout: 10 * time.Second,
  684. }
  685. pc := prometheusClient.Config{
  686. Address: address,
  687. RoundTripper: LongTimeoutRoundTripper,
  688. }
  689. promCli, _ := prometheusClient.NewClient(pc)
  690. api := prometheusAPI.NewAPI(promCli)
  691. _, err := api.Config(context.Background())
  692. if err != nil {
  693. klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  694. }
  695. klog.V(1).Info("Success: retrieved a prometheus config file from: " + address)
  696. _, err = ValidatePrometheus(promCli, false)
  697. if err != nil {
  698. klog.Fatalf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  699. }
  700. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  701. // Kubernetes API setup
  702. kc, err := rest.InClusterConfig()
  703. if err != nil {
  704. panic(err.Error())
  705. }
  706. kubeClientset, err := kubernetes.NewForConfig(kc)
  707. if err != nil {
  708. panic(err.Error())
  709. }
  710. // Create Kubernetes Cluster Cache + Watchers
  711. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  712. k8sCache.Run()
  713. cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
  714. cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
  715. if err != nil {
  716. panic(err.Error())
  717. }
  718. watchConfigFunc := func(c interface{}) {
  719. conf := c.(*v1.ConfigMap)
  720. if conf.GetName() == "pricing-configs" {
  721. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  722. if err != nil {
  723. klog.Infof("ERROR UPDATING CONFIG: %s", err.Error())
  724. }
  725. }
  726. }
  727. kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
  728. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  729. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
  730. if err != nil {
  731. klog.Infof("ERROR FETCHING configmap: %s", err.Error())
  732. }
  733. watchConfigFunc(configs)
  734. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  735. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  736. Name: "node_cpu_hourly_cost",
  737. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  738. }, []string{"instance", "node"})
  739. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  740. Name: "node_ram_hourly_cost",
  741. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  742. }, []string{"instance", "node"})
  743. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  744. Name: "node_gpu_hourly_cost",
  745. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  746. }, []string{"instance", "node"})
  747. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  748. Name: "node_total_hourly_cost",
  749. Help: "node_total_hourly_cost Total node cost per hour",
  750. }, []string{"instance", "node"})
  751. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  752. Name: "pv_hourly_cost",
  753. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  754. }, []string{"volumename", "persistentvolume"})
  755. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  756. Name: "container_memory_allocation_bytes",
  757. Help: "container_memory_allocation_bytes Bytes of RAM used",
  758. }, []string{"namespace", "pod", "container", "instance", "node"})
  759. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  760. Name: "container_cpu_allocation",
  761. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  762. }, []string{"namespace", "pod", "container", "instance", "node"})
  763. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  764. Name: "container_gpu_allocation",
  765. Help: "container_gpu_allocation GPU used",
  766. }, []string{"namespace", "pod", "container", "instance", "node"})
  767. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  768. Name: "pod_pvc_allocation",
  769. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  770. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  771. ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  772. Name: "container_uptime_seconds",
  773. Help: "container_uptime_seconds Seconds a container has been running",
  774. }, []string{"namespace", "pod", "container"})
  775. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  776. Name: "kubecost_network_zone_egress_cost",
  777. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  778. })
  779. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  780. Name: "kubecost_network_region_egress_cost",
  781. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  782. })
  783. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  784. Name: "kubecost_network_internet_egress_cost",
  785. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  786. })
  787. prometheus.MustRegister(cpuGv)
  788. prometheus.MustRegister(ramGv)
  789. prometheus.MustRegister(gpuGv)
  790. prometheus.MustRegister(totalGv)
  791. prometheus.MustRegister(pvGv)
  792. prometheus.MustRegister(RAMAllocation)
  793. prometheus.MustRegister(CPUAllocation)
  794. prometheus.MustRegister(ContainerUptimeRecorder)
  795. prometheus.MustRegister(PVAllocation)
  796. prometheus.MustRegister(GPUAllocation)
  797. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  798. prometheus.MustRegister(ServiceCollector{
  799. KubeClientSet: kubeClientset,
  800. })
  801. prometheus.MustRegister(DeploymentCollector{
  802. KubeClientSet: kubeClientset,
  803. })
  804. prometheus.MustRegister(StatefulsetCollector{
  805. KubeClientSet: kubeClientset,
  806. })
  807. // cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
  808. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  809. A = Accesses{
  810. PrometheusClient: promCli,
  811. KubeClientSet: kubeClientset,
  812. Cloud: cloudProvider,
  813. CPUPriceRecorder: cpuGv,
  814. RAMPriceRecorder: ramGv,
  815. GPUPriceRecorder: gpuGv,
  816. NodeTotalPriceRecorder: totalGv,
  817. RAMAllocationRecorder: RAMAllocation,
  818. CPUAllocationRecorder: CPUAllocation,
  819. GPUAllocationRecorder: GPUAllocation,
  820. PVAllocationRecorder: PVAllocation,
  821. ContainerUptimeRecorder: ContainerUptimeRecorder,
  822. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  823. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  824. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  825. PersistentVolumePriceRecorder: pvGv,
  826. Model: NewCostModel(k8sCache),
  827. OutOfClusterCache: outOfClusterCache,
  828. }
  829. remoteEnabled := os.Getenv(remoteEnabled)
  830. if remoteEnabled == "true" {
  831. info, err := cloudProvider.ClusterInfo()
  832. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  833. if err != nil {
  834. klog.Infof("Error saving cluster id %s", err.Error())
  835. }
  836. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  837. if err != nil {
  838. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  839. }
  840. }
  841. // Thanos Client
  842. if os.Getenv(thanosEnabled) == "true" {
  843. thanosUrl := os.Getenv(thanosQueryUrl)
  844. if thanosUrl != "" {
  845. var thanosRT http.RoundTripper = &http.Transport{
  846. Proxy: http.ProxyFromEnvironment,
  847. DialContext: (&net.Dialer{
  848. Timeout: 120 * time.Second,
  849. KeepAlive: 120 * time.Second,
  850. }).DialContext,
  851. TLSHandshakeTimeout: 10 * time.Second,
  852. }
  853. thanosConfig := prometheusClient.Config{
  854. Address: thanosUrl,
  855. RoundTripper: thanosRT,
  856. }
  857. thanosCli, _ := prometheusClient.NewClient(thanosConfig)
  858. _, err = ValidatePrometheus(thanosCli, true)
  859. if err != nil {
  860. klog.V(1).Infof("Warning: Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
  861. A.ThanosClient = thanosCli
  862. } else {
  863. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
  864. A.ThanosClient = thanosCli
  865. }
  866. } else {
  867. klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
  868. }
  869. }
  870. err = A.Cloud.DownloadPricingData()
  871. if err != nil {
  872. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  873. }
  874. A.recordPrices()
  875. Router.GET("/costDataModel", A.CostDataModel)
  876. Router.GET("/costDataModelRange", A.CostDataModelRange)
  877. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  878. Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
  879. Router.GET("/allNodePricing", A.GetAllNodePricing)
  880. Router.GET("/getConfigs", A.GetConfigs)
  881. Router.POST("/refreshPricing", A.RefreshPricingData)
  882. Router.POST("/updateSpotInfoConfigs", A.UpdateSpotInfoConfigs)
  883. Router.POST("/updateAthenaInfoConfigs", A.UpdateAthenaInfoConfigs)
  884. Router.POST("/updateBigQueryInfoConfigs", A.UpdateBigQueryInfoConfigs)
  885. Router.POST("/updateConfigByKey", A.UpdateConfigByKey)
  886. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  887. Router.GET("/clusterCosts", A.ClusterCosts)
  888. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  889. Router.GET("/managementPlatform", A.ManagementPlatform)
  890. Router.GET("/clusterInfo", A.ClusterInfo)
  891. Router.GET("/containerUptimes", A.ContainerUptimes)
  892. }