router.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283
  1. package costmodel
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "net/http"
  7. "reflect"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/kubecost/cost-model/pkg/util/httputil"
  13. "github.com/kubecost/cost-model/pkg/util/timeutil"
  14. "k8s.io/klog"
  15. "github.com/julienschmidt/httprouter"
  16. sentry "github.com/getsentry/sentry-go"
  17. "github.com/kubecost/cost-model/pkg/cloud"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. cm "github.com/kubecost/cost-model/pkg/clustermanager"
  20. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  21. "github.com/kubecost/cost-model/pkg/env"
  22. "github.com/kubecost/cost-model/pkg/errors"
  23. "github.com/kubecost/cost-model/pkg/kubecost"
  24. "github.com/kubecost/cost-model/pkg/log"
  25. "github.com/kubecost/cost-model/pkg/prom"
  26. "github.com/kubecost/cost-model/pkg/thanos"
  27. "github.com/kubecost/cost-model/pkg/util/json"
  28. prometheus "github.com/prometheus/client_golang/api"
  29. prometheusClient "github.com/prometheus/client_golang/api"
  30. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  31. v1 "k8s.io/api/core/v1"
  32. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  33. "github.com/patrickmn/go-cache"
  34. "k8s.io/client-go/kubernetes"
  35. "k8s.io/client-go/rest"
  36. "k8s.io/client-go/tools/clientcmd"
  37. )
  // prometheusTroubleshootingEp is the documentation URL for troubleshooting
  // a custom Prometheus setup.
  const prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"

  // RFC3339Milli is a time layout with millisecond precision and a literal
  // "Z" suffix.
  const RFC3339Milli = "2006-01-02T15:04:05.000Z"

  // Cache lifetime limits, in minutes.
  // NOTE(review): presumably one per query window (1d, 2d, 7d, 30d) — confirm
  // against the code that consumes them.
  const (
  	maxCacheMinutes1d  = 11
  	maxCacheMinutes2d  = 17
  	maxCacheMinutes7d  = 37
  	maxCacheMinutes30d = 137
  )

  // Names of application settings.
  // NOTE(review): presumably used as keys for the settings cache and the
  // settings subscribers — confirm against callers.
  const (
  	CustomPricingSetting = "CustomPricing"
  	DiscountSetting      = "Discount"
  )
  // gitCommit is set by the build system.
  var gitCommit string
  52. // Accesses defines a singleton application instance, providing access to
  53. // Prometheus, Kubernetes, the cloud provider, and caches.
  54. type Accesses struct {
  55. Router *httprouter.Router
  56. PrometheusClient prometheusClient.Client
  57. ThanosClient prometheusClient.Client
  58. KubeClientSet kubernetes.Interface
  59. ClusterManager *cm.ClusterManager
  60. ClusterMap clusters.ClusterMap
  61. CloudProvider cloud.Provider
  62. Model *CostModel
  63. MetricsEmitter *CostModelMetricsEmitter
  64. OutOfClusterCache *cache.Cache
  65. AggregateCache *cache.Cache
  66. CostDataCache *cache.Cache
  67. ClusterCostsCache *cache.Cache
  68. CacheExpiration map[time.Duration]time.Duration
  69. AggAPI Aggregator
  70. // SettingsCache stores current state of app settings
  71. SettingsCache *cache.Cache
  72. // settingsSubscribers tracks channels through which changes to different
  73. // settings will be published in a pub/sub model
  74. settingsSubscribers map[string][]chan string
  75. settingsMutex sync.Mutex
  76. }
  77. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  78. // should be used.
  79. func (a *Accesses) GetPrometheusClient(remote bool) prometheusClient.Client {
  80. // Use Thanos Client if it exists (enabled) and remote flag set
  81. var pc prometheusClient.Client
  82. if remote && a.ThanosClient != nil {
  83. pc = a.ThanosClient
  84. } else {
  85. pc = a.PrometheusClient
  86. }
  87. return pc
  88. }
  89. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  90. // If one does not exists, it returns the default cache expiration, which is defined by
  91. // the particular cache.
  92. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  93. if expiration, ok := a.CacheExpiration[dur]; ok {
  94. return expiration
  95. }
  96. return cache.DefaultExpiration
  97. }
  98. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  99. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  100. // not found or is less than 2 minutes.
  101. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  102. expiry := a.GetCacheExpiration(dur).Minutes()
  103. if expiry <= 2.0 {
  104. return time.Minute
  105. }
  106. mins := time.Duration(expiry/2.0) * time.Minute
  107. return mins
  108. }
  109. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  110. w.Header().Set("Content-Type", "application/json")
  111. duration := 24 * time.Hour
  112. offset := time.Minute
  113. durationHrs := "24h"
  114. fmtOffset := "1m"
  115. pClient := a.GetPrometheusClient(true)
  116. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  117. if data, valid := a.ClusterCostsCache.Get(key); valid {
  118. clusterCosts := data.(map[string]*ClusterCosts)
  119. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  120. } else {
  121. data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
  122. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  123. }
  124. }
  // Response is the JSON envelope returned by the HTTP handlers. It is built by
  // WrapData and its variants.
  type Response struct {
  	// Code is an HTTP status code: http.StatusOK on success and
  	// http.StatusInternalServerError when an error is being reported.
  	Code int `json:"code"`
  	// Status is "success" or "error".
  	Status string `json:"status"`
  	// Data is the payload of the response.
  	Data interface{} `json:"data"`
  	// Message carries the error text on failure, or an optional informational
  	// message on success. Omitted from the JSON when empty.
  	Message string `json:"message,omitempty"`
  	// Warning is an optional warning. Omitted from the JSON when empty.
  	Warning string `json:"warning,omitempty"`
  }
  132. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  133. type FilterFunc func(*CostData) (bool, string)
  134. // FilterCostData allows through only CostData that matches all the given filter functions
  135. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  136. result := make(map[string]*CostData)
  137. filteredEnvironments := make(map[string]int)
  138. filteredContainers := 0
  139. DataLoop:
  140. for key, datum := range data {
  141. for _, rf := range retains {
  142. if ok, _ := rf(datum); ok {
  143. result[key] = datum
  144. // if any retain function passes, the data is retained and move on
  145. continue DataLoop
  146. }
  147. }
  148. for _, ff := range filters {
  149. if ok, environment := ff(datum); !ok {
  150. if environment != "" {
  151. filteredEnvironments[environment]++
  152. }
  153. filteredContainers++
  154. // if any filter function check fails, move on to the next datum
  155. continue DataLoop
  156. }
  157. }
  158. result[key] = datum
  159. }
  160. return result, filteredContainers, filteredEnvironments
  161. }
  162. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  163. fs := strings.Split(fields, ",")
  164. fmap := make(map[string]bool)
  165. for _, f := range fs {
  166. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  167. klog.V(1).Infof("to delete: %s", fieldNameLower)
  168. fmap[fieldNameLower] = true
  169. }
  170. filteredData := make(map[string]CostData)
  171. for cname, costdata := range data {
  172. s := reflect.TypeOf(*costdata)
  173. val := reflect.ValueOf(*costdata)
  174. costdata2 := CostData{}
  175. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  176. n := s.NumField()
  177. for i := 0; i < n; i++ {
  178. field := s.Field(i)
  179. value := val.Field(i)
  180. value2 := cd2.Field(i)
  181. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  182. value2.Set(reflect.Value(value))
  183. }
  184. }
  185. filteredData[cname] = cd2.Interface().(CostData)
  186. }
  187. return filteredData
  188. }
  // normalizeTimeParam converts a time parameter expressed in days (e.g. "2d")
  // into the equivalent number of hours (e.g. "48h"). Parameters that do not
  // end in "d" are returned unchanged. It returns an error for an empty
  // parameter or when the day count is not a valid base-10 integer.
  func normalizeTimeParam(param string) (string, error) {
  	if len(param) == 0 {
  		return "", fmt.Errorf("invalid time param")
  	}

  	last := len(param) - 1
  	if param[last] != 'd' {
  		return param, nil
  	}

  	// convert days to hours
  	days, err := strconv.ParseInt(param[:last], 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return strconv.FormatInt(days*24, 10) + "h", nil
  }
  // ParsePercentString parses a string of the form "N%" and returns N as a
  // fraction, i.e. N multiplied by 0.01. A single trailing "%" is optional.
  // The empty string is treated as "0%" and yields 0.0. An error is returned
  // when the numeric part cannot be parsed as a float.
  func ParsePercentString(percentStr string) (float64, error) {
  	n := len(percentStr)
  	if n == 0 {
  		return 0.0, nil
  	}

  	// Drop one trailing percent sign, if present.
  	if percentStr[n-1] == '%' {
  		percentStr = percentStr[:n-1]
  	}

  	percent, err := strconv.ParseFloat(percentStr, 64)
  	if err != nil {
  		return 0.0, err
  	}
  	return percent * 0.01, nil
  }
  222. func WrapData(data interface{}, err error) []byte {
  223. var resp []byte
  224. if err != nil {
  225. klog.V(1).Infof("Error returned to client: %s", err.Error())
  226. resp, _ = json.Marshal(&Response{
  227. Code: http.StatusInternalServerError,
  228. Status: "error",
  229. Message: err.Error(),
  230. Data: data,
  231. })
  232. } else {
  233. resp, _ = json.Marshal(&Response{
  234. Code: http.StatusOK,
  235. Status: "success",
  236. Data: data,
  237. })
  238. }
  239. return resp
  240. }
  241. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  242. var resp []byte
  243. if err != nil {
  244. klog.V(1).Infof("Error returned to client: %s", err.Error())
  245. resp, _ = json.Marshal(&Response{
  246. Code: http.StatusInternalServerError,
  247. Status: "error",
  248. Message: err.Error(),
  249. Data: data,
  250. })
  251. } else {
  252. resp, _ = json.Marshal(&Response{
  253. Code: http.StatusOK,
  254. Status: "success",
  255. Data: data,
  256. Message: message,
  257. })
  258. }
  259. return resp
  260. }
  261. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  262. var resp []byte
  263. if err != nil {
  264. klog.V(1).Infof("Error returned to client: %s", err.Error())
  265. resp, _ = json.Marshal(&Response{
  266. Code: http.StatusInternalServerError,
  267. Status: "error",
  268. Message: err.Error(),
  269. Warning: warning,
  270. Data: data,
  271. })
  272. } else {
  273. resp, _ = json.Marshal(&Response{
  274. Code: http.StatusOK,
  275. Status: "success",
  276. Data: data,
  277. Warning: warning,
  278. })
  279. }
  280. return resp
  281. }
  282. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  283. var resp []byte
  284. if err != nil {
  285. klog.V(1).Infof("Error returned to client: %s", err.Error())
  286. resp, _ = json.Marshal(&Response{
  287. Code: http.StatusInternalServerError,
  288. Status: "error",
  289. Message: err.Error(),
  290. Warning: warning,
  291. Data: data,
  292. })
  293. } else {
  294. resp, _ = json.Marshal(&Response{
  295. Code: http.StatusOK,
  296. Status: "success",
  297. Data: data,
  298. Message: message,
  299. Warning: warning,
  300. })
  301. }
  302. return resp
  303. }
  304. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  305. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  306. w.Header().Set("Content-Type", "application/json")
  307. w.Header().Set("Access-Control-Allow-Origin", "*")
  308. err := a.CloudProvider.DownloadPricingData()
  309. w.Write(WrapData(nil, err))
  310. }
  311. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  312. w.Header().Set("Content-Type", "application/json")
  313. w.Header().Set("Access-Control-Allow-Origin", "*")
  314. window := r.URL.Query().Get("timeWindow")
  315. offset := r.URL.Query().Get("offset")
  316. fields := r.URL.Query().Get("filterFields")
  317. namespace := r.URL.Query().Get("namespace")
  318. if offset != "" {
  319. offset = "offset " + offset
  320. }
  321. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  322. if fields != "" {
  323. filteredData := filterFields(fields, data)
  324. w.Write(WrapData(filteredData, err))
  325. } else {
  326. w.Write(WrapData(data, err))
  327. }
  328. }
  329. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  330. w.Header().Set("Content-Type", "application/json")
  331. w.Header().Set("Access-Control-Allow-Origin", "*")
  332. window := r.URL.Query().Get("window")
  333. offset := r.URL.Query().Get("offset")
  334. if window == "" {
  335. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  336. return
  337. }
  338. windowDur, err := timeutil.ParseDuration(window)
  339. if err != nil {
  340. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  341. return
  342. }
  343. // offset is not a required parameter
  344. var offsetDur time.Duration
  345. if offset != "" {
  346. offsetDur, err = timeutil.ParseDuration(offset)
  347. if err != nil {
  348. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  349. return
  350. }
  351. }
  352. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  353. if useThanos && !thanos.IsEnabled() {
  354. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  355. return
  356. }
  357. var client prometheusClient.Client
  358. if useThanos {
  359. client = a.ThanosClient
  360. offsetDur = thanos.OffsetDuration()
  361. } else {
  362. client = a.PrometheusClient
  363. }
  364. data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
  365. w.Write(WrapData(data, err))
  366. }
  367. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  368. w.Header().Set("Content-Type", "application/json")
  369. w.Header().Set("Access-Control-Allow-Origin", "*")
  370. start := r.URL.Query().Get("start")
  371. end := r.URL.Query().Get("end")
  372. window := r.URL.Query().Get("window")
  373. offset := r.URL.Query().Get("offset")
  374. if window == "" {
  375. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  376. return
  377. }
  378. windowDur, err := timeutil.ParseDuration(window)
  379. if err != nil {
  380. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  381. return
  382. }
  383. // offset is not a required parameter
  384. var offsetDur time.Duration
  385. if offset != "" {
  386. offsetDur, err = timeutil.ParseDuration(offset)
  387. if err != nil {
  388. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  389. return
  390. }
  391. }
  392. data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
  393. w.Write(WrapData(data, err))
  394. }
  395. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  396. w.Header().Set("Content-Type", "application/json")
  397. w.Header().Set("Access-Control-Allow-Origin", "*")
  398. startStr := r.URL.Query().Get("start")
  399. endStr := r.URL.Query().Get("end")
  400. windowStr := r.URL.Query().Get("window")
  401. fields := r.URL.Query().Get("filterFields")
  402. namespace := r.URL.Query().Get("namespace")
  403. cluster := r.URL.Query().Get("cluster")
  404. remote := r.URL.Query().Get("remote")
  405. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  406. layout := "2006-01-02T15:04:05.000Z"
  407. start, err := time.Parse(layout, startStr)
  408. if err != nil {
  409. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  410. return
  411. }
  412. end, err := time.Parse(layout, endStr)
  413. if err != nil {
  414. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  415. return
  416. }
  417. window := kubecost.NewWindow(&start, &end)
  418. if window.IsOpen() || window.IsEmpty() || window.IsNegative() {
  419. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  420. return
  421. }
  422. resolution := time.Hour
  423. if resDur, err := time.ParseDuration(windowStr); err == nil {
  424. resolution = resDur
  425. }
  426. // Use Thanos Client if it exists (enabled) and remote flag set
  427. var pClient prometheusClient.Client
  428. if remote != "false" && a.ThanosClient != nil {
  429. pClient = a.ThanosClient
  430. } else {
  431. pClient = a.PrometheusClient
  432. }
  433. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  434. if err != nil {
  435. w.Write(WrapData(nil, err))
  436. }
  437. if fields != "" {
  438. filteredData := filterFields(fields, data)
  439. w.Write(WrapData(filteredData, err))
  440. } else {
  441. w.Write(WrapData(data, err))
  442. }
  443. }
  // parseAggregations resolves the aggregation settings of a request and
  // returns, in order: the aggregation key, the list of aggregation fields, and
  // the filter name.
  //
  // When customAggregation is non-empty it is used verbatim: the key is the
  // raw string, the fields are its comma-separated parts, and filterType is
  // returned unchanged. Otherwise aggregator is split on commas, every part and
  // filterType are prefixed with "kubernetes_", and the key is the prefixed
  // parts re-joined with commas.
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"

  	fields := strings.Split(aggregator, ",")
  	for i := range fields {
  		fields[i] = prefix + fields[i]
  	}
  	return strings.Join(fields, ","), fields, prefix + filterType
  }
  463. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  464. w.Header().Set("Content-Type", "application/json")
  465. w.Header().Set("Access-Control-Allow-Origin", "*")
  466. start := r.URL.Query().Get("start")
  467. end := r.URL.Query().Get("end")
  468. aggregator := r.URL.Query().Get("aggregator")
  469. customAggregation := r.URL.Query().Get("customAggregation")
  470. filterType := r.URL.Query().Get("filterType")
  471. filterValue := r.URL.Query().Get("filterValue")
  472. var data []*cloud.OutOfClusterAllocation
  473. var err error
  474. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  475. data, err = a.CloudProvider.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  476. w.Write(WrapData(data, err))
  477. }
  478. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  479. w.Header().Set("Content-Type", "application/json")
  480. w.Header().Set("Access-Control-Allow-Origin", "*")
  481. // start date for which to query costs, inclusive; format YYYY-MM-DD
  482. start := r.URL.Query().Get("start")
  483. // end date for which to query costs, inclusive; format YYYY-MM-DD
  484. end := r.URL.Query().Get("end")
  485. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  486. kubernetesAggregation := r.URL.Query().Get("aggregator")
  487. // customAggregation allows full customization of aggregator w/o prepending
  488. customAggregation := r.URL.Query().Get("customAggregation")
  489. // disableCache, if set to "true", tells this function to recompute and
  490. // cache the requested data
  491. disableCache := r.URL.Query().Get("disableCache") == "true"
  492. // clearCache, if set to "true", tells this function to flush the cache,
  493. // then recompute and cache the requested data
  494. clearCache := r.URL.Query().Get("clearCache") == "true"
  495. filterType := r.URL.Query().Get("filterType")
  496. filterValue := r.URL.Query().Get("filterValue")
  497. aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
  498. // clear cache prior to checking the cache so that a clearCache=true
  499. // request always returns a freshly computed value
  500. if clearCache {
  501. a.OutOfClusterCache.Flush()
  502. }
  503. // attempt to retrieve cost data from cache
  504. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
  505. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  506. if data, ok := value.([]*cloud.OutOfClusterAllocation); ok {
  507. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  508. return
  509. }
  510. klog.Errorf("caching error: failed to type cast data: %s", key)
  511. }
  512. data, err := a.CloudProvider.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  513. if err == nil {
  514. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  515. }
  516. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  517. }
  518. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  519. w.Header().Set("Content-Type", "application/json")
  520. w.Header().Set("Access-Control-Allow-Origin", "*")
  521. data, err := a.CloudProvider.AllNodePricing()
  522. w.Write(WrapData(data, err))
  523. }
  524. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  525. w.Header().Set("Content-Type", "application/json")
  526. w.Header().Set("Access-Control-Allow-Origin", "*")
  527. data, err := a.CloudProvider.GetConfig()
  528. w.Write(WrapData(data, err))
  529. }
  530. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  531. w.Header().Set("Content-Type", "application/json")
  532. w.Header().Set("Access-Control-Allow-Origin", "*")
  533. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
  534. if err != nil {
  535. w.Write(WrapData(data, err))
  536. return
  537. }
  538. w.Write(WrapData(data, err))
  539. err = a.CloudProvider.DownloadPricingData()
  540. if err != nil {
  541. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  542. }
  543. return
  544. }
  545. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  546. w.Header().Set("Content-Type", "application/json")
  547. w.Header().Set("Access-Control-Allow-Origin", "*")
  548. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
  549. if err != nil {
  550. w.Write(WrapData(data, err))
  551. return
  552. }
  553. w.Write(WrapData(data, err))
  554. return
  555. }
  556. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  557. w.Header().Set("Content-Type", "application/json")
  558. w.Header().Set("Access-Control-Allow-Origin", "*")
  559. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
  560. if err != nil {
  561. w.Write(WrapData(data, err))
  562. return
  563. }
  564. w.Write(WrapData(data, err))
  565. return
  566. }
  567. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  568. w.Header().Set("Content-Type", "application/json")
  569. w.Header().Set("Access-Control-Allow-Origin", "*")
  570. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  571. if err != nil {
  572. w.Write(WrapData(data, err))
  573. return
  574. }
  575. w.Write(WrapData(data, err))
  576. return
  577. }
  578. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  579. w.Header().Set("Content-Type", "application/json")
  580. w.Header().Set("Access-Control-Allow-Origin", "*")
  581. data, err := a.CloudProvider.GetManagementPlatform()
  582. if err != nil {
  583. w.Write(WrapData(data, err))
  584. return
  585. }
  586. w.Write(WrapData(data, err))
  587. return
  588. }
  589. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  590. w.Header().Set("Content-Type", "application/json")
  591. w.Header().Set("Access-Control-Allow-Origin", "*")
  592. data := GetClusterInfo(a.KubeClientSet, a.CloudProvider)
  593. w.Write(WrapData(data, nil))
  594. }
  595. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  596. w.Header().Set("Content-Type", "application/json")
  597. w.Header().Set("Access-Control-Allow-Origin", "*")
  598. data := a.ClusterMap.AsMap()
  599. w.Write(WrapData(data, nil))
  600. }
  601. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  602. w.Header().Set("Content-Type", "application/json")
  603. w.Header().Set("Access-Control-Allow-Origin", "*")
  604. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  605. }
  606. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  607. w.Header().Set("Content-Type", "application/json")
  608. w.Header().Set("Access-Control-Allow-Origin", "*")
  609. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  610. }
  611. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  612. w.Header().Set("Content-Type", "application/json")
  613. w.Header().Set("Access-Control-Allow-Origin", "*")
  614. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  615. }
  616. func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  617. w.Header().Set("Content-Type", "application/json")
  618. w.Header().Set("Access-Control-Allow-Origin", "*")
  619. w.Write(WrapData(prom.Validate(a.PrometheusClient)))
  620. }
  621. func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  622. w.Header().Set("Content-Type", "application/json")
  623. w.Header().Set("Access-Control-Allow-Origin", "*")
  624. qp := httputil.NewQueryParams(r.URL.Query())
  625. query := qp.Get("query", "")
  626. if query == "" {
  627. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  628. return
  629. }
  630. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  631. body, err := ctx.RawQuery(query)
  632. if err != nil {
  633. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  634. return
  635. }
  636. w.Write(body)
  637. }
  638. func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  639. w.Header().Set("Content-Type", "application/json")
  640. w.Header().Set("Access-Control-Allow-Origin", "*")
  641. qp := httputil.NewQueryParams(r.URL.Query())
  642. query := qp.Get("query", "")
  643. if query == "" {
  644. fmt.Fprintf(w, "Error parsing query from request parameters.")
  645. return
  646. }
  647. start, end, duration, err := toStartEndStep(qp)
  648. if err != nil {
  649. fmt.Fprintf(w, err.Error())
  650. return
  651. }
  652. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  653. body, err := ctx.RawQueryRange(query, start, end, duration)
  654. if err != nil {
  655. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  656. return
  657. }
  658. w.Write(body)
  659. }
  660. func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  661. w.Header().Set("Content-Type", "application/json")
  662. w.Header().Set("Access-Control-Allow-Origin", "*")
  663. if !thanos.IsEnabled() {
  664. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  665. return
  666. }
  667. qp := httputil.NewQueryParams(r.URL.Query())
  668. query := qp.Get("query", "")
  669. if query == "" {
  670. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  671. return
  672. }
  673. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  674. body, err := ctx.RawQuery(query)
  675. if err != nil {
  676. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  677. return
  678. }
  679. w.Write(body)
  680. }
  681. func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  682. w.Header().Set("Content-Type", "application/json")
  683. w.Header().Set("Access-Control-Allow-Origin", "*")
  684. if !thanos.IsEnabled() {
  685. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  686. return
  687. }
  688. qp := httputil.NewQueryParams(r.URL.Query())
  689. query := qp.Get("query", "")
  690. if query == "" {
  691. fmt.Fprintf(w, "Error parsing query from request parameters.")
  692. return
  693. }
  694. start, end, duration, err := toStartEndStep(qp)
  695. if err != nil {
  696. fmt.Fprintf(w, err.Error())
  697. return
  698. }
  699. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  700. body, err := ctx.RawQueryRange(query, start, end, duration)
  701. if err != nil {
  702. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  703. return
  704. }
  705. w.Write(body)
  706. }
  707. // helper for query range proxy requests
  708. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  709. var e error
  710. ss := qp.Get("start", "")
  711. es := qp.Get("end", "")
  712. ds := qp.Get("duration", "")
  713. layout := "2006-01-02T15:04:05.000Z"
  714. start, e = time.Parse(layout, ss)
  715. if e != nil {
  716. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  717. return
  718. }
  719. end, e = time.Parse(layout, es)
  720. if e != nil {
  721. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  722. return
  723. }
  724. step, e = time.ParseDuration(ds)
  725. if e != nil {
  726. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  727. return
  728. }
  729. err = nil
  730. return
  731. }
  732. func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  733. w.Header().Set("Content-Type", "application/json")
  734. w.Header().Set("Access-Control-Allow-Origin", "*")
  735. promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
  736. if err != nil {
  737. w.Write(WrapData(nil, err))
  738. return
  739. }
  740. result := map[string]*prom.PrometheusQueueState{
  741. "prometheus": promQueueState,
  742. }
  743. if thanos.IsEnabled() {
  744. thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
  745. if err != nil {
  746. log.Warningf("Error getting Thanos queue state: %s", err)
  747. } else {
  748. result["thanos"] = thanosQueueState
  749. }
  750. }
  751. w.Write(WrapData(result, nil))
  752. }
  753. // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
  754. func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  755. w.Header().Set("Content-Type", "application/json")
  756. w.Header().Set("Access-Control-Allow-Origin", "*")
  757. promMetrics, err := prom.GetPrometheusMetrics(a.PrometheusClient, "")
  758. if err != nil {
  759. w.Write(WrapData(nil, err))
  760. return
  761. }
  762. result := map[string][]*prom.PrometheusDiagnostic{
  763. "prometheus": promMetrics,
  764. }
  765. if thanos.IsEnabled() {
  766. thanosMetrics, err := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
  767. if err != nil {
  768. log.Warningf("Error getting Thanos queue state: %s", err)
  769. } else {
  770. result["thanos"] = thanosMetrics
  771. }
  772. }
  773. w.Write(WrapData(result, nil))
  774. }
  775. // Creates a new ClusterManager instance using a boltdb storage. If that fails,
  776. // then we fall back to a memory-only storage.
  777. func newClusterManager() *cm.ClusterManager {
  778. clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
  779. // Return a memory-backed cluster manager populated by configmap
  780. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  781. // NOTE: The following should be used with a persistent disk store. Since the
  782. // NOTE: configmap approach is currently the "persistent" source (entries are read-only
  783. // NOTE: on the backend), we don't currently need to store on disk.
  784. /*
  785. path := env.GetConfigPath()
  786. db, err := bolt.Open(path+"costmodel.db", 0600, nil)
  787. if err != nil {
  788. klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
  789. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  790. }
  791. store, err := cm.NewBoltDBClusterStorage("clusters", db)
  792. if err != nil {
  793. klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
  794. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  795. }
  796. return cm.NewConfiguredClusterManager(store, clustersConfigFile)
  797. */
  798. }
  // ConfigWatchers pairs a configmap name with the callback that should run
  // when that configmap is seen. Initialize accepts any number of these in
  // addition to its built-in pricing configmap handling.
  type ConfigWatchers struct {
  	// ConfigmapName is the name of the configmap this watcher reacts to.
  	ConfigmapName string

  	// WatchFunc is invoked with the configmap's name and its data whenever a
  	// configmap whose name equals ConfigmapName is observed. A returned
  	// error is logged by the caller.
  	WatchFunc func(name string, data map[string]string) error
  }
  803. // captures the panic event in sentry
  804. func capturePanicEvent(err string, stack string) {
  805. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  806. klog.V(1).Infoln(msg)
  807. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  808. Level: sentry.LevelError,
  809. Message: msg,
  810. })
  811. sentry.Flush(5 * time.Second)
  812. }
  813. // handle any panics reported by the errors package
  814. func handlePanic(p errors.Panic) bool {
  815. err := p.Error
  816. if err != nil {
  817. if err, ok := err.(error); ok {
  818. capturePanicEvent(err.Error(), p.Stack)
  819. }
  820. if err, ok := err.(string); ok {
  821. capturePanicEvent(err, p.Stack)
  822. }
  823. }
  824. // Return true to recover iff the type is http, otherwise allow kubernetes
  825. // to recover.
  826. return p.Type == errors.PanicTypeHTTP
  827. }
  828. func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
  829. klog.InitFlags(nil)
  830. flag.Set("v", "3")
  831. flag.Parse()
  832. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", env.GetAppVersion())
  833. var err error
  834. if errorReportingEnabled {
  835. err = sentry.Init(sentry.ClientOptions{Release: env.GetAppVersion()})
  836. if err != nil {
  837. klog.Infof("Failed to initialize sentry for error reporting")
  838. } else {
  839. err = errors.SetPanicHandler(handlePanic)
  840. if err != nil {
  841. klog.Infof("Failed to set panic handler: %s", err)
  842. }
  843. }
  844. }
  845. address := env.GetPrometheusServerEndpoint()
  846. if address == "" {
  847. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  848. }
  849. queryConcurrency := env.GetMaxQueryConcurrency()
  850. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  851. timeout := 120 * time.Second
  852. keepAlive := 120 * time.Second
  853. scrapeInterval, _ := time.ParseDuration("1m")
  854. promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
  855. if err != nil {
  856. klog.Fatalf("Failed to create prometheus client, Error: %v", err)
  857. }
  858. api := prometheusAPI.NewAPI(promCli)
  859. pcfg, err := api.Config(context.Background())
  860. if err != nil {
  861. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
  862. } else {
  863. klog.V(1).Info("Retrieved a prometheus config file from: " + address)
  864. sc, err := GetPrometheusConfig(pcfg.YAML)
  865. if err != nil {
  866. klog.Infof("Fix YAML error %s", err)
  867. }
  868. for _, scrapeconfig := range sc.ScrapeConfigs {
  869. if scrapeconfig.JobName == GetKubecostJobName() {
  870. if scrapeconfig.ScrapeInterval != "" {
  871. si := scrapeconfig.ScrapeInterval
  872. sid, err := time.ParseDuration(si)
  873. if err != nil {
  874. klog.Infof("error parseing scrapeConfig for %s", scrapeconfig.JobName)
  875. } else {
  876. klog.Infof("Found Kubecost job scrape interval of: %s", si)
  877. scrapeInterval = sid
  878. }
  879. }
  880. }
  881. }
  882. }
  883. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  884. m, err := prom.Validate(promCli)
  885. if err != nil || m.Running == false {
  886. if err != nil {
  887. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  888. } else if m.Running == false {
  889. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
  890. }
  891. api := prometheusAPI.NewAPI(promCli)
  892. _, err = api.Config(context.Background())
  893. if err != nil {
  894. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
  895. } else {
  896. klog.V(1).Info("Retrieved a prometheus config file from: " + address)
  897. }
  898. } else {
  899. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  900. }
  901. // Kubernetes API setup
  902. var kc *rest.Config
  903. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  904. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  905. } else {
  906. kc, err = rest.InClusterConfig()
  907. }
  908. if err != nil {
  909. panic(err.Error())
  910. }
  911. kubeClientset, err := kubernetes.NewForConfig(kc)
  912. if err != nil {
  913. panic(err.Error())
  914. }
  915. // Create Kubernetes Cluster Cache + Watchers
  916. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  917. k8sCache.Run()
  918. cloudProviderKey := env.GetCloudProviderAPIKey()
  919. cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey)
  920. if err != nil {
  921. panic(err.Error())
  922. }
  923. watchConfigFunc := func(c interface{}) {
  924. conf := c.(*v1.ConfigMap)
  925. if conf.GetName() == env.GetPricingConfigmapName() {
  926. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  927. if err != nil {
  928. klog.Infof("ERROR UPDATING %s CONFIG: %s", "pricing-configs", err.Error())
  929. }
  930. }
  931. for _, cw := range additionalConfigWatchers {
  932. if conf.GetName() == cw.ConfigmapName {
  933. err := cw.WatchFunc(conf.GetName(), conf.Data)
  934. if err != nil {
  935. klog.Infof("ERROR UPDATING %s CONFIG: %s", cw.ConfigmapName, err.Error())
  936. }
  937. }
  938. }
  939. }
  940. kubecostNamespace := env.GetKubecostNamespace()
  941. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  942. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), env.GetPricingConfigmapName(), metav1.GetOptions{})
  943. if err != nil {
  944. klog.Infof("No %s configmap found at installtime, using existing configs: %s", env.GetPricingConfigmapName(), err.Error())
  945. } else {
  946. watchConfigFunc(configs)
  947. }
  948. for _, cw := range additionalConfigWatchers {
  949. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw.ConfigmapName, metav1.GetOptions{})
  950. if err != nil {
  951. klog.Infof("No %s configmap found at installtime, using existing configs: %s", cw.ConfigmapName, err.Error())
  952. } else {
  953. klog.Infof("Found configmap %s, watching...", configs.Name)
  954. watchConfigFunc(configs)
  955. }
  956. }
  957. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  958. // TODO: General Architecture Note: Several passes have been made to modularize a lot of
  959. // TODO: our code, but the router still continues to be the obvious entry point for new \
  960. // TODO: features. We should look to split out the actual "router" functionality and
  961. // TODO: implement a builder -> controller for stitching new features and other dependencies.
  962. clusterManager := newClusterManager()
  963. // Initialize metrics here
  964. remoteEnabled := env.IsRemoteEnabled()
  965. if remoteEnabled {
  966. info, err := cloudProvider.ClusterInfo()
  967. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  968. if err != nil {
  969. klog.Infof("Error saving cluster id %s", err.Error())
  970. }
  971. _, _, err = cloud.GetOrCreateClusterMeta(info["id"], info["name"])
  972. if err != nil {
  973. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  974. }
  975. }
  976. // Thanos Client
  977. var thanosClient prometheusClient.Client
  978. if thanos.IsEnabled() {
  979. thanosAddress := thanos.QueryURL()
  980. if thanosAddress != "" {
  981. thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
  982. _, err = prom.Validate(thanosCli)
  983. if err != nil {
  984. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  985. thanosClient = thanosCli
  986. } else {
  987. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  988. thanosClient = thanosCli
  989. }
  990. } else {
  991. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  992. }
  993. }
  994. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  995. var clusterMap clusters.ClusterMap
  996. localCIProvider := NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  997. if thanosClient != nil {
  998. clusterMap = clusters.NewClusterMap(thanosClient, localCIProvider, 10*time.Minute)
  999. } else {
  1000. clusterMap = clusters.NewClusterMap(promCli, localCIProvider, 5*time.Minute)
  1001. }
  1002. // cache responses from model and aggregation for a default of 10 minutes;
  1003. // clear expired responses every 20 minutes
  1004. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  1005. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  1006. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1007. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1008. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1009. // query durations that should be cached longer should be registered here
  1010. // use relatively prime numbers to minimize likelihood of synchronized
  1011. // attempts at cache warming
  1012. day := 24 * time.Hour
  1013. cacheExpiration := map[time.Duration]time.Duration{
  1014. day: maxCacheMinutes1d * time.Minute,
  1015. 2 * day: maxCacheMinutes2d * time.Minute,
  1016. 7 * day: maxCacheMinutes7d * time.Minute,
  1017. 30 * day: maxCacheMinutes30d * time.Minute,
  1018. }
  1019. var pc prometheus.Client
  1020. if thanosClient != nil {
  1021. pc = thanosClient
  1022. } else {
  1023. pc = promCli
  1024. }
  1025. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  1026. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
  1027. a := &Accesses{
  1028. Router: httprouter.New(),
  1029. PrometheusClient: promCli,
  1030. ThanosClient: thanosClient,
  1031. KubeClientSet: kubeClientset,
  1032. ClusterManager: clusterManager,
  1033. ClusterMap: clusterMap,
  1034. CloudProvider: cloudProvider,
  1035. Model: costModel,
  1036. MetricsEmitter: metricsEmitter,
  1037. AggregateCache: aggregateCache,
  1038. CostDataCache: costDataCache,
  1039. ClusterCostsCache: clusterCostsCache,
  1040. OutOfClusterCache: outOfClusterCache,
  1041. SettingsCache: settingsCache,
  1042. CacheExpiration: cacheExpiration,
  1043. }
  1044. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  1045. // confusing and unconventional, but necessary so that we can swap it
  1046. // out for the ETL-adapted version elsewhere.
  1047. // TODO clean this up once ETL is open-sourced.
  1048. a.AggAPI = a
  1049. // Initialize mechanism for subscribing to settings changes
  1050. a.InitializeSettingsPubSub()
  1051. err = a.CloudProvider.DownloadPricingData()
  1052. if err != nil {
  1053. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  1054. }
  1055. // Warm the aggregate cache unless explicitly set to false
  1056. if env.IsCacheWarmingEnabled() {
  1057. log.Infof("Init: AggregateCostModel cache warming enabled")
  1058. a.warmAggregateCostModelCache()
  1059. } else {
  1060. log.Infof("Init: AggregateCostModel cache warming disabled")
  1061. }
  1062. a.MetricsEmitter.Start()
  1063. managerEndpoints := cm.NewClusterManagerEndpoints(a.ClusterManager)
  1064. a.Router.GET("/costDataModel", a.CostDataModel)
  1065. a.Router.GET("/costDataModelRange", a.CostDataModelRange)
  1066. a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  1067. a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
  1068. a.Router.GET("/outOfClusterCosts", a.OutOfClusterCostsWithCache)
  1069. a.Router.GET("/allNodePricing", a.GetAllNodePricing)
  1070. a.Router.POST("/refreshPricing", a.RefreshPricingData)
  1071. a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  1072. a.Router.GET("/clusterCosts", a.ClusterCosts)
  1073. a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  1074. a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  1075. a.Router.GET("/managementPlatform", a.ManagementPlatform)
  1076. a.Router.GET("/clusterInfo", a.ClusterInfo)
  1077. a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  1078. a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  1079. a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  1080. a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  1081. // prom query proxies
  1082. a.Router.GET("/prometheusQuery", a.PrometheusQuery)
  1083. a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
  1084. a.Router.GET("/thanosQuery", a.ThanosQuery)
  1085. a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)
  1086. // diagnostics
  1087. a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
  1088. a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
  1089. // cluster manager endpoints
  1090. a.Router.GET("/clusters", managerEndpoints.GetAllClusters)
  1091. a.Router.PUT("/clusters", managerEndpoints.PutCluster)
  1092. a.Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
  1093. return a
  1094. }