router.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094
  1. package costmodel
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "net/http"
  7. "reflect"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "k8s.io/klog"
  13. "github.com/julienschmidt/httprouter"
  14. sentry "github.com/getsentry/sentry-go"
  15. "github.com/kubecost/cost-model/pkg/cloud"
  16. "github.com/kubecost/cost-model/pkg/clustercache"
  17. cm "github.com/kubecost/cost-model/pkg/clustermanager"
  18. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  19. "github.com/kubecost/cost-model/pkg/env"
  20. "github.com/kubecost/cost-model/pkg/errors"
  21. "github.com/kubecost/cost-model/pkg/kubecost"
  22. "github.com/kubecost/cost-model/pkg/log"
  23. "github.com/kubecost/cost-model/pkg/prom"
  24. "github.com/kubecost/cost-model/pkg/thanos"
  25. "github.com/kubecost/cost-model/pkg/util/json"
  26. prometheus "github.com/prometheus/client_golang/api"
  27. prometheusClient "github.com/prometheus/client_golang/api"
  28. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  29. v1 "k8s.io/api/core/v1"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "github.com/patrickmn/go-cache"
  32. "k8s.io/client-go/kubernetes"
  33. "k8s.io/client-go/rest"
  34. "k8s.io/client-go/tools/clientcmd"
  35. )
  // prometheusTroubleshootingEp is the documentation link included in the log
  // message emitted when no Prometheus config can be retrieved at startup.
  const prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"

  // RFC3339Milli is a time layout for RFC 3339 timestamps with millisecond
  // precision and a literal "Z" suffix.
  const RFC3339Milli = "2006-01-02T15:04:05.000Z"

  // Cache lifetimes, in minutes, associated with 1d/2d/7d/30d query windows.
  // NOTE(review): their consumers are not in this section — confirm usage.
  const (
  	maxCacheMinutes1d  = 11
  	maxCacheMinutes2d  = 17
  	maxCacheMinutes7d  = 37
  	maxCacheMinutes30d = 137
  )

  // Names of application settings; presumably keys for the settings cache and
  // its pub/sub subscribers (TODO confirm).
  const (
  	CustomPricingSetting = "CustomPricing"
  	DiscountSetting      = "Discount"
  )

  // gitCommit is set by the build system.
  var gitCommit string
  50. // Accesses defines a singleton application instance, providing access to
  51. // Prometheus, Kubernetes, the cloud provider, and caches.
  52. type Accesses struct {
  53. Router *httprouter.Router
  54. PrometheusClient prometheusClient.Client
  55. ThanosClient prometheusClient.Client
  56. KubeClientSet kubernetes.Interface
  57. ClusterManager *cm.ClusterManager
  58. ClusterMap clusters.ClusterMap
  59. CloudProvider cloud.Provider
  60. Model *CostModel
  61. MetricsEmitter *CostModelMetricsEmitter
  62. OutOfClusterCache *cache.Cache
  63. AggregateCache *cache.Cache
  64. CostDataCache *cache.Cache
  65. ClusterCostsCache *cache.Cache
  66. CacheExpiration map[time.Duration]time.Duration
  67. AggAPI Aggregator
  68. // SettingsCache stores current state of app settings
  69. SettingsCache *cache.Cache
  70. // settingsSubscribers tracks channels through which changes to different
  71. // settings will be published in a pub/sub model
  72. settingsSubscribers map[string][]chan string
  73. settingsMutex sync.Mutex
  74. }
  75. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  76. // should be used.
  77. func (a *Accesses) GetPrometheusClient(remote bool) prometheusClient.Client {
  78. // Use Thanos Client if it exists (enabled) and remote flag set
  79. var pc prometheusClient.Client
  80. if remote && a.ThanosClient != nil {
  81. pc = a.ThanosClient
  82. } else {
  83. pc = a.PrometheusClient
  84. }
  85. return pc
  86. }
  // GetCacheExpiration returns the custom cache expiration configured for dur in
  // CacheExpiration. When none is configured it returns cache.DefaultExpiration,
  // leaving the choice of lifetime to the particular cache.
  func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  	expiration, found := a.CacheExpiration[dur]
  	if !found {
  		return cache.DefaultExpiration
  	}
  	return expiration
  }
  // GetCacheRefresh returns how long to wait before refreshing the cache entry
  // for the given query duration.
  //
  // The interval is half of the expiration reported by GetCacheExpiration,
  // truncated to whole minutes. If that expiration is two minutes or less it
  // returns one minute. This also covers the case where no custom expiration is
  // configured and GetCacheExpiration falls back to cache.DefaultExpiration
  // (NOTE: assumes go-cache's sentinel value of 0).
  func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  	expiryMinutes := a.GetCacheExpiration(dur).Minutes()
  	if expiryMinutes <= 2.0 {
  		return time.Minute
  	}
  	// The halved minute count is converted to an integer before scaling, so any
  	// fractional minute is dropped (e.g. an 11m expiry refreshes every 5m).
  	return time.Duration(expiryMinutes/2.0) * time.Minute
  }
  // ClusterCostsFromCacheHandler serves cluster costs for a 24h window with a 1m
  // offset.
  //
  // It prefers the value stored in ClusterCostsCache under the key "24h:1m". On
  // a miss, or if the cached value has an unexpected type, the costs are
  // computed on demand using the remote-capable Prometheus client. The computed
  // result is not written back to the cache here.
  func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")

  	durationHrs := "24h"
  	offset := "1m"
  	key := fmt.Sprintf("%s:%s", durationHrs, offset)

  	if data, found := a.ClusterCostsCache.Get(key); found {
  		// Checked assertion: a wrong-typed (or nil) cache entry must not panic
  		// the handler; log it and fall through to recompute instead.
  		if clusterCosts, ok := data.(map[string]*ClusterCosts); ok {
  			w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  			return
  		}
  		klog.Errorf("caching error: failed to type cast data: %s", key)
  	}

  	pClient := a.GetPrometheusClient(true)
  	data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, durationHrs, offset, true)
  	w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  }
  // Response is the JSON envelope produced by the WrapData* helpers and written
  // by the HTTP handlers. Message and Warning are omitted from the output when
  // empty; Data carries no omitempty option and is always emitted.
  type Response struct {
  	Code    int         `json:"code"`
  	Status  string      `json:"status"`
  	Data    interface{} `json:"data"`
  	Message string      `json:"message,omitempty"`
  	Warning string      `json:"warning,omitempty"`
  }
  128. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  129. type FilterFunc func(*CostData) (bool, string)
  130. // FilterCostData allows through only CostData that matches all the given filter functions
  131. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  132. result := make(map[string]*CostData)
  133. filteredEnvironments := make(map[string]int)
  134. filteredContainers := 0
  135. DataLoop:
  136. for key, datum := range data {
  137. for _, rf := range retains {
  138. if ok, _ := rf(datum); ok {
  139. result[key] = datum
  140. // if any retain function passes, the data is retained and move on
  141. continue DataLoop
  142. }
  143. }
  144. for _, ff := range filters {
  145. if ok, environment := ff(datum); !ok {
  146. if environment != "" {
  147. filteredEnvironments[environment]++
  148. }
  149. filteredContainers++
  150. // if any filter function check fails, move on to the next datum
  151. continue DataLoop
  152. }
  153. }
  154. result[key] = datum
  155. }
  156. return result, filteredContainers, filteredEnvironments
  157. }
  // filterFields returns a copy of data in which every CostData field named in
  // fields is left at its zero value, while all other fields are copied over.
  //
  // fields is a comma-separated list matched case-insensitively against the Go
  // struct field names; whitespace around each name is ignored. Nil entries in
  // data are skipped, since there is nothing to copy from them.
  func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  	// Lower-cased names of the fields to blank out.
  	excluded := make(map[string]bool)
  	for _, f := range strings.Split(fields, ",") {
  		name := strings.ToLower(strings.TrimSpace(f))
  		klog.V(1).Infof("to delete: %s", name)
  		excluded[name] = true
  	}

  	// The struct layout is identical for every entry, so decide once per field
  	// index whether it is copied.
  	costDataType := reflect.TypeOf(CostData{})
  	keep := make([]bool, costDataType.NumField())
  	for i := range keep {
  		keep[i] = !excluded[strings.ToLower(costDataType.Field(i).Name)]
  	}

  	filteredData := make(map[string]CostData, len(data))
  	for cname, costdata := range data {
  		if costdata == nil {
  			// Dereferencing a nil entry would panic.
  			continue
  		}
  		src := reflect.ValueOf(*costdata)
  		dst := reflect.New(costDataType).Elem()
  		for i, copyField := range keep {
  			if copyField {
  				dst.Field(i).Set(src.Field(i))
  			}
  		}
  		filteredData[cname] = dst.Interface().(CostData)
  	}
  	return filteredData
  }
  // normalizeTimeParam rewrites a day-based duration such as "2d" into the
  // equivalent hour-based form ("48h"), which time.ParseDuration understands.
  //
  // Any other non-empty value is returned unchanged. An empty value is an
  // error, as is a "d" suffix not preceded by a base-10 integer.
  func normalizeTimeParam(param string) (string, error) {
  	if len(param) == 0 {
  		return "", fmt.Errorf("invalid time param")
  	}
  	if !strings.HasSuffix(param, "d") {
  		return param, nil
  	}
  	days, err := strconv.ParseInt(strings.TrimSuffix(param, "d"), 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString converts a percentage string such as "15%" into the
  // fraction 0.15.
  //
  // The trailing "%" is optional ("15" also yields 0.15), and an empty string is
  // treated as "0%". An error is returned if what remains after removing one
  // trailing "%" is not a valid float.
  func ParsePercentString(percentStr string) (float64, error) {
  	if percentStr == "" {
  		return 0.0, nil
  	}
  	percent, err := strconv.ParseFloat(strings.TrimSuffix(percentStr, "%"), 64)
  	if err != nil {
  		return 0.0, err
  	}
  	return percent * 0.01, nil
  }
  // ParseDuration converts a Prometheus-style duration string into a
  // time.Duration.
  //
  // The input must be a base-10 integer followed by a single unit: s (seconds),
  // m (minutes), h (hours) or d (24-hour days), e.g. "30s" or "2d". Any other
  // input, including the empty string, yields an error.
  //
  // TODO:CLEANUP delete this. do it now.
  func ParseDuration(duration string) (*time.Duration, error) {
  	// Reject empty input up front: taking the last character below would
  	// otherwise panic with an index out of range.
  	if duration == "" {
  		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  	}

  	var unit time.Duration
  	switch duration[len(duration)-1:] {
  	case "s":
  		unit = time.Second
  	case "m":
  		unit = time.Minute
  	case "h":
  		unit = time.Hour
  	case "d":
  		unit = 24 * time.Hour
  	default:
  		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  	}

  	amount, err := strconv.ParseInt(duration[:len(duration)-1], 10, 64)
  	if err != nil {
  		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  	}
  	dur := time.Duration(amount) * unit
  	return &dur, nil
  }
  // ParseTimeRange converts a duration and an optional offset, both written in
  // Prometheus-style syntax (e.g. "2d", "1h"), into a start and an end time.
  //
  // The end time is the current time, shifted back by offset when offset is
  // non-empty. The start time is the end time shifted back by duration.
  // Underlying parse errors are wrapped, so callers can inspect them.
  func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  	endTime := time.Now()
  	if offset != "" {
  		o, err := ParseDuration(offset)
  		if err != nil {
  			return nil, nil, fmt.Errorf("error parsing offset (%s): %w", offset, err)
  		}
  		endTime = endTime.Add(-1 * *o)
  	}

  	// time.ParseDuration has no day unit, so rewrite e.g. "2d" as "48h" first.
  	durationNorm, err := normalizeTimeParam(duration)
  	if err != nil {
  		return nil, nil, fmt.Errorf("error parsing duration (%s): %w", duration, err)
  	}

  	dur, err := time.ParseDuration(durationNorm)
  	if err != nil {
  		return nil, nil, fmt.Errorf("error parsing duration (%s): %w", durationNorm, err)
  	}

  	startTime := endTime.Add(-1 * dur)
  	return &startTime, &endTime, nil
  }
  271. func WrapData(data interface{}, err error) []byte {
  272. var resp []byte
  273. if err != nil {
  274. klog.V(1).Infof("Error returned to client: %s", err.Error())
  275. resp, _ = json.Marshal(&Response{
  276. Code: http.StatusInternalServerError,
  277. Status: "error",
  278. Message: err.Error(),
  279. Data: data,
  280. })
  281. } else {
  282. resp, _ = json.Marshal(&Response{
  283. Code: http.StatusOK,
  284. Status: "success",
  285. Data: data,
  286. })
  287. }
  288. return resp
  289. }
  290. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  291. var resp []byte
  292. if err != nil {
  293. klog.V(1).Infof("Error returned to client: %s", err.Error())
  294. resp, _ = json.Marshal(&Response{
  295. Code: http.StatusInternalServerError,
  296. Status: "error",
  297. Message: err.Error(),
  298. Data: data,
  299. })
  300. } else {
  301. resp, _ = json.Marshal(&Response{
  302. Code: http.StatusOK,
  303. Status: "success",
  304. Data: data,
  305. Message: message,
  306. })
  307. }
  308. return resp
  309. }
  310. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  311. var resp []byte
  312. if err != nil {
  313. klog.V(1).Infof("Error returned to client: %s", err.Error())
  314. resp, _ = json.Marshal(&Response{
  315. Code: http.StatusInternalServerError,
  316. Status: "error",
  317. Message: err.Error(),
  318. Warning: warning,
  319. Data: data,
  320. })
  321. } else {
  322. resp, _ = json.Marshal(&Response{
  323. Code: http.StatusOK,
  324. Status: "success",
  325. Data: data,
  326. Warning: warning,
  327. })
  328. }
  329. return resp
  330. }
  // WrapDataWithMessageAndWarning serializes data into the standard Response
  // envelope.
  //
  // When err is nil the envelope has code 200 and status "success", and carries
  // message and warning (each omitted from the JSON when empty). When err is
  // non-nil the envelope has code 500 and status "error", with err's text as
  // its message (the message argument is not used) plus the warning; the error
  // is also logged.
  //
  // If the envelope cannot be marshaled, the failure is logged and a data-less
  // error envelope describing it is returned instead of an empty body.
  func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  	var resp *Response
  	if err != nil {
  		klog.V(1).Infof("Error returned to client: %s", err.Error())
  		resp = &Response{
  			Code:    http.StatusInternalServerError,
  			Status:  "error",
  			Message: err.Error(),
  			Warning: warning,
  			Data:    data,
  		}
  	} else {
  		resp = &Response{
  			Code:    http.StatusOK,
  			Status:  "success",
  			Data:    data,
  			Message: message,
  			Warning: warning,
  		}
  	}

  	body, marshalErr := json.Marshal(resp)
  	if marshalErr == nil {
  		return body
  	}

  	klog.Errorf("Error marshaling response: %s", marshalErr.Error())
  	// Drop the unserializable data but keep the reply well-formed, so the
  	// client sees an explicit error rather than an empty body.
  	fallback, fallbackErr := json.Marshal(&Response{
  		Code:    http.StatusInternalServerError,
  		Status:  "error",
  		Message: fmt.Sprintf("failed to marshal response: %s", marshalErr.Error()),
  		Warning: warning,
  	})
  	if fallbackErr != nil {
  		return nil
  	}
  	return fallback
  }
  // RefreshPricingData re-downloads the cloud provider's pricing data and reports
  // any failure. It needs to be called when a new node joins the fleet, since
  // only the relevant subsets of pricing data are cached rather than the whole
  // thing.
  func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	header := w.Header()
  	header.Set("Content-Type", "application/json")
  	header.Set("Access-Control-Allow-Origin", "*")

  	w.Write(WrapData(nil, a.CloudProvider.DownloadPricingData()))
  }
  // CostDataModel responds with cost data computed from the local Prometheus.
  //
  // Query parameters: timeWindow, offset, namespace, and filterFields (a
  // comma-separated list of CostData fields to blank out).
  func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	query := r.URL.Query()
  	window := query.Get("timeWindow")
  	offset := query.Get("offset")
  	fields := query.Get("filterFields")
  	namespace := query.Get("namespace")

  	// A non-empty offset is passed on as "offset <value>", presumably spliced
  	// into PromQL as an offset modifier.
  	if offset != "" {
  		offset = "offset " + offset
  	}

  	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  	if fields == "" {
  		w.Write(WrapData(data, err))
  		return
  	}
  	w.Write(WrapData(filterFields(fields, data), err))
  }
  378. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  379. w.Header().Set("Content-Type", "application/json")
  380. w.Header().Set("Access-Control-Allow-Origin", "*")
  381. window := r.URL.Query().Get("window")
  382. offset := r.URL.Query().Get("offset")
  383. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  384. if useThanos && !thanos.IsEnabled() {
  385. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  386. return
  387. }
  388. var client prometheusClient.Client
  389. if useThanos {
  390. client = a.ThanosClient
  391. offset = thanos.Offset()
  392. } else {
  393. client = a.PrometheusClient
  394. }
  395. data, err := a.ComputeClusterCosts(client, a.CloudProvider, window, offset, true)
  396. w.Write(WrapData(data, err))
  397. }
  // ClusterCostsOverTime responds with cluster costs between the start and end
  // query parameters, using the given window and offset, queried from the local
  // Prometheus.
  func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	q := r.URL.Query()
  	data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, q.Get("start"), q.Get("end"), q.Get("window"), q.Get("offset"))
  	w.Write(WrapData(data, err))
  }
  408. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  409. w.Header().Set("Content-Type", "application/json")
  410. w.Header().Set("Access-Control-Allow-Origin", "*")
  411. startStr := r.URL.Query().Get("start")
  412. endStr := r.URL.Query().Get("end")
  413. windowStr := r.URL.Query().Get("window")
  414. fields := r.URL.Query().Get("filterFields")
  415. namespace := r.URL.Query().Get("namespace")
  416. cluster := r.URL.Query().Get("cluster")
  417. remote := r.URL.Query().Get("remote")
  418. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  419. layout := "2006-01-02T15:04:05.000Z"
  420. start, err := time.Parse(layout, startStr)
  421. if err != nil {
  422. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  423. return
  424. }
  425. end, err := time.Parse(layout, endStr)
  426. if err != nil {
  427. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  428. return
  429. }
  430. window := kubecost.NewWindow(&start, &end)
  431. if window.IsOpen() || window.IsEmpty() || window.IsNegative() {
  432. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  433. return
  434. }
  435. resolution := time.Hour
  436. if resDur, err := time.ParseDuration(windowStr); err == nil {
  437. resolution = resDur
  438. }
  439. // Use Thanos Client if it exists (enabled) and remote flag set
  440. var pClient prometheusClient.Client
  441. if remote != "false" && a.ThanosClient != nil {
  442. pClient = a.ThanosClient
  443. } else {
  444. pClient = a.PrometheusClient
  445. }
  446. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  447. if err != nil {
  448. w.Write(WrapData(nil, err))
  449. }
  450. if fields != "" {
  451. filteredData := filterFields(fields, data)
  452. w.Write(WrapData(filteredData, err))
  453. } else {
  454. w.Write(WrapData(data, err))
  455. }
  456. }
  // parseAggregations resolves an aggregation request into a key string, the
  // list of aggregation fields, and the filter field name.
  //
  // A non-empty customAggregation is used verbatim: it is the key, its
  // comma-separated parts are the fields, and filterType is the filter.
  // Otherwise each comma-separated part of aggregator is prefixed with
  // "kubernetes_", the prefixed parts joined with "," form the key, and the
  // filter is "kubernetes_" + filterType.
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"
  	parts := strings.Split(aggregator, ",")
  	prefixed := make([]string, len(parts))
  	for i, part := range parts {
  		prefixed[i] = prefix + part
  	}
  	return strings.Join(prefixed, ","), prefixed, prefix + filterType
  }
  476. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  477. w.Header().Set("Content-Type", "application/json")
  478. w.Header().Set("Access-Control-Allow-Origin", "*")
  479. start := r.URL.Query().Get("start")
  480. end := r.URL.Query().Get("end")
  481. aggregator := r.URL.Query().Get("aggregator")
  482. customAggregation := r.URL.Query().Get("customAggregation")
  483. filterType := r.URL.Query().Get("filterType")
  484. filterValue := r.URL.Query().Get("filterValue")
  485. var data []*cloud.OutOfClusterAllocation
  486. var err error
  487. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  488. data, err = a.CloudProvider.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  489. w.Write(WrapData(data, err))
  490. }
  // OutOfClusterCostsWithCache responds with out-of-cluster (external) cost
  // allocations, storing successful results in OutOfClusterCache.
  //
  // Query parameters:
  //   - start, end: dates for which to query costs, inclusive; format YYYY-MM-DD
  //   - aggregator: field(s) to aggregate by, prepended with "kubernetes_"
  //   - customAggregation: aggregation field(s) used verbatim (takes precedence)
  //   - filterType, filterValue: optional filter on the results
  //   - disableCache=true: skip the cache lookup and recompute; the fresh result
  //     is still cached
  //   - clearCache=true: flush the whole cache first, then recompute and cache
  func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	query := r.URL.Query()
  	start := query.Get("start")
  	end := query.Get("end")
  	kubernetesAggregation := query.Get("aggregator")
  	customAggregation := query.Get("customAggregation")
  	disableCache := query.Get("disableCache") == "true"
  	clearCache := query.Get("clearCache") == "true"
  	filterType := query.Get("filterType")
  	filterValue := query.Get("filterValue")

  	aggregationKey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)

  	// Flush before the lookup so a clearCache=true request always returns a
  	// freshly computed value.
  	if clearCache {
  		a.OutOfClusterCache.Flush()
  	}

  	key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationKey, filter, filterValue)

  	// Consult the cache only when it has not been disabled for this request.
  	if !disableCache {
  		if value, found := a.OutOfClusterCache.Get(key); found {
  			if data, ok := value.([]*cloud.OutOfClusterAllocation); ok {
  				w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  				return
  			}
  			klog.Errorf("caching error: failed to type cast data: %s", key)
  		}
  	}

  	data, err := a.CloudProvider.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  	if err == nil {
  		a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  	}
  	w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluster cache miss: %s", key)))
  }
  // GetAllNodePricing responds with the result of CloudProvider.AllNodePricing.
  func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	pricing, err := a.CloudProvider.AllNodePricing()
  	w.Write(WrapData(pricing, err))
  }
  // GetConfigs responds with the cloud provider's current configuration.
  func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	cfg, err := a.CloudProvider.GetConfig()
  	w.Write(WrapData(cfg, err))
  }
  // UpdateSpotInfoConfigs applies the spot-info settings in the request body to
  // the cloud provider configuration and responds with the result.
  //
  // After a successful update and response, pricing data is re-downloaded;
  // a failure there is only logged.
  func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	cfg, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
  	w.Write(WrapData(cfg, err))
  	if err != nil {
  		return
  	}

  	if dlErr := a.CloudProvider.DownloadPricingData(); dlErr != nil {
  		klog.V(1).Infof("Error redownloading data on config update: %s", dlErr.Error())
  	}
  }
  // UpdateAthenaInfoConfigs applies the Athena settings in the request body to
  // the cloud provider configuration and responds with the result; WrapData
  // reports success or failure based on the returned error.
  func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	cfg, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
  	w.Write(WrapData(cfg, err))
  }
  // UpdateBigQueryInfoConfigs applies the BigQuery settings in the request body
  // to the cloud provider configuration and responds with the result; WrapData
  // reports success or failure based on the returned error.
  func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	cfg, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
  	w.Write(WrapData(cfg, err))
  }
  // UpdateConfigByKey applies the settings in the request body to the cloud
  // provider configuration with an empty update type, and responds with the
  // result; WrapData reports success or failure based on the returned error.
  func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	cfg, err := a.CloudProvider.UpdateConfig(r.Body, "")
  	w.Write(WrapData(cfg, err))
  }
  // ManagementPlatform responds with the result of
  // CloudProvider.GetManagementPlatform.
  func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	w.Header().Set("Content-Type", "application/json")
  	w.Header().Set("Access-Control-Allow-Origin", "*")

  	platform, err := a.CloudProvider.GetManagementPlatform()
  	w.Write(WrapData(platform, err))
  }
  // ClusterInfo responds with GetClusterInfo for the Kubernetes client set and
  // cloud provider; it never reports an error.
  func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	w.Write(WrapData(GetClusterInfo(a.KubeClientSet, a.CloudProvider), nil))
  }
  // GetClusterInfoMap responds with ClusterMap rendered as a map; it never
  // reports an error.
  func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	w.Write(WrapData(a.ClusterMap.AsMap(), nil))
  }
  // GetServiceAccountStatus responds with the cloud provider's service-account
  // status; it never reports an error.
  func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	status := a.CloudProvider.ServiceAccountStatus()
  	w.Write(WrapData(status, nil))
  }
  // GetPricingSourceStatus responds with the cloud provider's pricing-source
  // status; it never reports an error.
  func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	status := a.CloudProvider.PricingSourceStatus()
  	w.Write(WrapData(status, nil))
  }
  // GetPricingSourceCounts responds with the cost model's pricing-source counts
  // and any error from computing them.
  func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	counts, err := a.Model.GetPricingSourceCounts()
  	w.Write(WrapData(counts, err))
  }
  // GetPrometheusMetadata responds with the result of validating the local
  // Prometheus client via prom.Validate.
  func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  	h := w.Header()
  	h.Set("Content-Type", "application/json")
  	h.Set("Access-Control-Allow-Origin", "*")

  	metadata, err := prom.Validate(a.PrometheusClient)
  	w.Write(WrapData(metadata, err))
  }
  634. // Creates a new ClusterManager instance using a boltdb storage. If that fails,
  635. // then we fall back to a memory-only storage.
  636. func newClusterManager() *cm.ClusterManager {
  637. clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
  638. // Return a memory-backed cluster manager populated by configmap
  639. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  640. // NOTE: The following should be used with a persistent disk store. Since the
  641. // NOTE: configmap approach is currently the "persistent" source (entries are read-only
  642. // NOTE: on the backend), we don't currently need to store on disk.
  643. /*
  644. path := env.GetConfigPath()
  645. db, err := bolt.Open(path+"costmodel.db", 0600, nil)
  646. if err != nil {
  647. klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
  648. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  649. }
  650. store, err := cm.NewBoltDBClusterStorage("clusters", db)
  651. if err != nil {
  652. klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
  653. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  654. }
  655. return cm.NewConfiguredClusterManager(store, clustersConfigFile)
  656. */
  657. }
  // ConfigWatchers pairs a config map name with a callback. Judging by its
  // signature, WatchFunc receives a config map name and its string data
  // (TODO confirm against where watchers are registered).
  type ConfigWatchers struct {
  	ConfigmapName string
  	WatchFunc     func(string, map[string]string) error
  }
  // capturePanicEvent logs a panic message with its stack trace and reports it to
  // Sentry as an error-level event. It then calls sentry.Flush with a 5 second
  // timeout.
  func capturePanicEvent(err string, stack string) {
  	report := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  	klog.V(1).Infoln(report)

  	event := &sentry.Event{
  		Level:   sentry.LevelError,
  		Message: report,
  	}
  	sentry.CurrentHub().CaptureEvent(event)
  	sentry.Flush(5 * time.Second)
  }
  672. // handle any panics reported by the errors package
  673. func handlePanic(p errors.Panic) bool {
  674. err := p.Error
  675. if err != nil {
  676. if err, ok := err.(error); ok {
  677. capturePanicEvent(err.Error(), p.Stack)
  678. }
  679. if err, ok := err.(string); ok {
  680. capturePanicEvent(err, p.Stack)
  681. }
  682. }
  683. // Return true to recover iff the type is http, otherwise allow kubernetes
  684. // to recover.
  685. return p.Type == errors.PanicTypeHTTP
  686. }
  687. func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
  688. klog.InitFlags(nil)
  689. flag.Set("v", "3")
  690. flag.Parse()
  691. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", env.GetAppVersion())
  692. var err error
  693. if errorReportingEnabled {
  694. err = sentry.Init(sentry.ClientOptions{Release: env.GetAppVersion()})
  695. if err != nil {
  696. klog.Infof("Failed to initialize sentry for error reporting")
  697. } else {
  698. err = errors.SetPanicHandler(handlePanic)
  699. if err != nil {
  700. klog.Infof("Failed to set panic handler: %s", err)
  701. }
  702. }
  703. }
  704. address := env.GetPrometheusServerEndpoint()
  705. if address == "" {
  706. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  707. }
  708. queryConcurrency := env.GetMaxQueryConcurrency()
  709. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  710. timeout := 120 * time.Second
  711. keepAlive := 120 * time.Second
  712. scrapeInterval, _ := time.ParseDuration("1m")
  713. promCli, _ := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
  714. api := prometheusAPI.NewAPI(promCli)
  715. pcfg, err := api.Config(context.Background())
  716. if err != nil {
  717. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
  718. } else {
  719. klog.V(1).Info("Retrieved a prometheus config file from: " + address)
  720. sc, err := GetPrometheusConfig(pcfg.YAML)
  721. if err != nil {
  722. klog.Infof("Fix YAML error %s", err)
  723. }
  724. for _, scrapeconfig := range sc.ScrapeConfigs {
  725. if scrapeconfig.JobName == GetKubecostJobName() {
  726. if scrapeconfig.ScrapeInterval != "" {
  727. si := scrapeconfig.ScrapeInterval
  728. sid, err := time.ParseDuration(si)
  729. if err != nil {
  730. klog.Infof("error parseing scrapeConfig for %s", scrapeconfig.JobName)
  731. } else {
  732. klog.Infof("Found Kubecost job scrape interval of: %s", si)
  733. scrapeInterval = sid
  734. }
  735. }
  736. }
  737. }
  738. }
  739. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  740. m, err := prom.Validate(promCli)
  741. if err != nil || m.Running == false {
  742. if err != nil {
  743. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  744. } else if m.Running == false {
  745. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
  746. }
  747. api := prometheusAPI.NewAPI(promCli)
  748. _, err = api.Config(context.Background())
  749. if err != nil {
  750. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
  751. } else {
  752. klog.V(1).Info("Retrieved a prometheus config file from: " + address)
  753. }
  754. } else {
  755. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  756. }
  757. // Kubernetes API setup
  758. var kc *rest.Config
  759. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  760. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  761. } else {
  762. kc, err = rest.InClusterConfig()
  763. }
  764. if err != nil {
  765. panic(err.Error())
  766. }
  767. kubeClientset, err := kubernetes.NewForConfig(kc)
  768. if err != nil {
  769. panic(err.Error())
  770. }
  771. // Create Kubernetes Cluster Cache + Watchers
  772. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  773. k8sCache.Run()
  774. cloudProviderKey := env.GetCloudProviderAPIKey()
  775. cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey)
  776. if err != nil {
  777. panic(err.Error())
  778. }
  779. watchConfigFunc := func(c interface{}) {
  780. conf := c.(*v1.ConfigMap)
  781. if conf.GetName() == "pricing-configs" {
  782. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  783. if err != nil {
  784. klog.Infof("ERROR UPDATING %s CONFIG: %s", "pricing-configs", err.Error())
  785. }
  786. }
  787. for _, cw := range additionalConfigWatchers {
  788. if conf.GetName() == cw.ConfigmapName {
  789. err := cw.WatchFunc(conf.GetName(), conf.Data)
  790. if err != nil {
  791. klog.Infof("ERROR UPDATING %s CONFIG: %s", cw.ConfigmapName, err.Error())
  792. }
  793. }
  794. }
  795. }
  796. kubecostNamespace := env.GetKubecostNamespace()
  797. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  798. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), "pricing-configs", metav1.GetOptions{})
  799. if err != nil {
  800. klog.Infof("No %s configmap found at installtime, using existing configs: %s", "pricing-configs", err.Error())
  801. } else {
  802. watchConfigFunc(configs)
  803. }
  804. for _, cw := range additionalConfigWatchers {
  805. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw.ConfigmapName, metav1.GetOptions{})
  806. if err != nil {
  807. klog.Infof("No %s configmap found at installtime, using existing configs: %s", cw.ConfigmapName, err.Error())
  808. } else {
  809. watchConfigFunc(configs)
  810. }
  811. }
  812. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  813. // TODO: General Architecture Note: Several passes have been made to modularize a lot of
  814. // TODO: our code, but the router still continues to be the obvious entry point for new \
  815. // TODO: features. We should look to split out the actual "router" functionality and
  816. // TODO: implement a builder -> controller for stitching new features and other dependencies.
  817. clusterManager := newClusterManager()
  818. // Initialize metrics here
  819. remoteEnabled := env.IsRemoteEnabled()
  820. if remoteEnabled {
  821. info, err := cloudProvider.ClusterInfo()
  822. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  823. if err != nil {
  824. klog.Infof("Error saving cluster id %s", err.Error())
  825. }
  826. _, _, err = cloud.GetOrCreateClusterMeta(info["id"], info["name"])
  827. if err != nil {
  828. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  829. }
  830. }
  831. // Thanos Client
  832. var thanosClient prometheusClient.Client
  833. if thanos.IsEnabled() {
  834. thanosAddress := thanos.QueryURL()
  835. if thanosAddress != "" {
  836. thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
  837. _, err = prom.Validate(thanosCli)
  838. if err != nil {
  839. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  840. thanosClient = thanosCli
  841. } else {
  842. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  843. thanosClient = thanosCli
  844. }
  845. } else {
  846. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  847. }
  848. }
  849. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  850. var clusterMap clusters.ClusterMap
  851. if thanosClient != nil {
  852. clusterMap = clusters.NewClusterMap(thanosClient, 10*time.Minute)
  853. } else {
  854. clusterMap = clusters.NewClusterMap(promCli, 5*time.Minute)
  855. }
  856. // cache responses from model and aggregation for a default of 10 minutes;
  857. // clear expired responses every 20 minutes
  858. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  859. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  860. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  861. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  862. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  863. // query durations that should be cached longer should be registered here
  864. // use relatively prime numbers to minimize likelihood of synchronized
  865. // attempts at cache warming
  866. day := 24 * time.Hour
  867. cacheExpiration := map[time.Duration]time.Duration{
  868. day: maxCacheMinutes1d * time.Minute,
  869. 2 * day: maxCacheMinutes2d * time.Minute,
  870. 7 * day: maxCacheMinutes7d * time.Minute,
  871. 30 * day: maxCacheMinutes30d * time.Minute,
  872. }
  873. var pc prometheus.Client
  874. if thanosClient != nil {
  875. pc = thanosClient
  876. } else {
  877. pc = promCli
  878. }
  879. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  880. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
  881. a := &Accesses{
  882. Router: httprouter.New(),
  883. PrometheusClient: promCli,
  884. ThanosClient: thanosClient,
  885. KubeClientSet: kubeClientset,
  886. ClusterManager: clusterManager,
  887. ClusterMap: clusterMap,
  888. CloudProvider: cloudProvider,
  889. Model: costModel,
  890. MetricsEmitter: metricsEmitter,
  891. AggregateCache: aggregateCache,
  892. CostDataCache: costDataCache,
  893. ClusterCostsCache: clusterCostsCache,
  894. OutOfClusterCache: outOfClusterCache,
  895. SettingsCache: settingsCache,
  896. CacheExpiration: cacheExpiration,
  897. }
  898. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  899. // confusing and unconventional, but necessary so that we can swap it
  900. // out for the ETL-adapted version elsewhere.
  901. // TODO clean this up once ETL is open-sourced.
  902. a.AggAPI = a
  903. // Initialize mechanism for subscribing to settings changes
  904. a.InitializeSettingsPubSub()
  905. err = a.CloudProvider.DownloadPricingData()
  906. if err != nil {
  907. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  908. }
  909. // Warm the aggregate cache unless explicitly set to false
  910. if env.IsCacheWarmingEnabled() {
  911. log.Infof("Init: AggregateCostModel cache warming enabled")
  912. a.warmAggregateCostModelCache()
  913. } else {
  914. log.Infof("Init: AggregateCostModel cache warming disabled")
  915. }
  916. a.MetricsEmitter.Start()
  917. managerEndpoints := cm.NewClusterManagerEndpoints(a.ClusterManager)
  918. a.Router.GET("/costDataModel", a.CostDataModel)
  919. a.Router.GET("/costDataModelRange", a.CostDataModelRange)
  920. a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  921. a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
  922. a.Router.GET("/outOfClusterCosts", a.OutOfClusterCostsWithCache)
  923. a.Router.GET("/allNodePricing", a.GetAllNodePricing)
  924. a.Router.POST("/refreshPricing", a.RefreshPricingData)
  925. a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  926. a.Router.GET("/clusterCosts", a.ClusterCosts)
  927. a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  928. a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  929. a.Router.GET("/managementPlatform", a.ManagementPlatform)
  930. a.Router.GET("/clusterInfo", a.ClusterInfo)
  931. a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  932. a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  933. a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  934. a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  935. // cluster manager endpoints
  936. a.Router.GET("/clusters", managerEndpoints.GetAllClusters)
  937. a.Router.PUT("/clusters", managerEndpoints.PutCluster)
  938. a.Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
  939. return a
  940. }