router.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/kubecost/cost-model/pkg/config"
  14. "github.com/kubecost/cost-model/pkg/metrics"
  15. "github.com/kubecost/cost-model/pkg/services"
  16. "github.com/kubecost/cost-model/pkg/util/httputil"
  17. "github.com/kubecost/cost-model/pkg/util/timeutil"
  18. "github.com/kubecost/cost-model/pkg/util/watcher"
  19. "github.com/microcosm-cc/bluemonday"
  20. v1 "k8s.io/api/core/v1"
  21. "k8s.io/klog"
  22. "github.com/julienschmidt/httprouter"
  23. sentry "github.com/getsentry/sentry-go"
  24. "github.com/kubecost/cost-model/pkg/cloud"
  25. "github.com/kubecost/cost-model/pkg/clustercache"
  26. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  27. "github.com/kubecost/cost-model/pkg/env"
  28. "github.com/kubecost/cost-model/pkg/errors"
  29. "github.com/kubecost/cost-model/pkg/kubecost"
  30. "github.com/kubecost/cost-model/pkg/log"
  31. "github.com/kubecost/cost-model/pkg/prom"
  32. "github.com/kubecost/cost-model/pkg/thanos"
  33. "github.com/kubecost/cost-model/pkg/util/json"
  34. prometheus "github.com/prometheus/client_golang/api"
  35. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  36. appsv1 "k8s.io/api/apps/v1"
  37. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  38. "github.com/patrickmn/go-cache"
  39. "k8s.io/client-go/kubernetes"
  40. "k8s.io/client-go/rest"
  41. "k8s.io/client-go/tools/clientcmd"
  42. )
  43. var sanitizePolicy = bluemonday.UGCPolicy()
  44. const (
  45. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  46. maxCacheMinutes1d = 11
  47. maxCacheMinutes2d = 17
  48. maxCacheMinutes7d = 37
  49. maxCacheMinutes30d = 137
  50. CustomPricingSetting = "CustomPricing"
  51. DiscountSetting = "Discount"
  52. epRules = apiPrefix + "/rules"
  53. LogSeparator = "+-------------------------------------------------------------------------------------"
  54. )
  // gitCommit records the commit the binary was built from. It is empty unless the
// build system sets it (presumably via `-ldflags "-X ..."` — confirm against the build).
var gitCommit string
  59. // Accesses defines a singleton application instance, providing access to
  60. // Prometheus, Kubernetes, the cloud provider, and caches.
  61. type Accesses struct {
  62. Router *httprouter.Router
  63. PrometheusClient prometheus.Client
  64. ThanosClient prometheus.Client
  65. KubeClientSet kubernetes.Interface
  66. ClusterCache clustercache.ClusterCache
  67. ClusterMap clusters.ClusterMap
  68. CloudProvider cloud.Provider
  69. ConfigFileManager *config.ConfigFileManager
  70. ClusterInfoProvider clusters.ClusterInfoProvider
  71. Model *CostModel
  72. MetricsEmitter *CostModelMetricsEmitter
  73. OutOfClusterCache *cache.Cache
  74. AggregateCache *cache.Cache
  75. CostDataCache *cache.Cache
  76. ClusterCostsCache *cache.Cache
  77. CacheExpiration map[time.Duration]time.Duration
  78. AggAPI Aggregator
  79. // SettingsCache stores current state of app settings
  80. SettingsCache *cache.Cache
  81. // settingsSubscribers tracks channels through which changes to different
  82. // settings will be published in a pub/sub model
  83. settingsSubscribers map[string][]chan string
  84. settingsMutex sync.Mutex
  85. // registered http service instances
  86. httpServices services.HTTPServices
  87. }
  88. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  89. // should be used.
  90. func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
  91. // Use Thanos Client if it exists (enabled) and remote flag set
  92. var pc prometheus.Client
  93. if remote && a.ThanosClient != nil {
  94. pc = a.ThanosClient
  95. } else {
  96. pc = a.PrometheusClient
  97. }
  98. return pc
  99. }
  100. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  101. // If one does not exists, it returns the default cache expiration, which is defined by
  102. // the particular cache.
  103. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  104. if expiration, ok := a.CacheExpiration[dur]; ok {
  105. return expiration
  106. }
  107. return cache.DefaultExpiration
  108. }
  109. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  110. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  111. // not found or is less than 2 minutes.
  112. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  113. expiry := a.GetCacheExpiration(dur).Minutes()
  114. if expiry <= 2.0 {
  115. return time.Minute
  116. }
  117. mins := time.Duration(expiry/2.0) * time.Minute
  118. return mins
  119. }
  120. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  121. w.Header().Set("Content-Type", "application/json")
  122. duration := 24 * time.Hour
  123. offset := time.Minute
  124. durationHrs := "24h"
  125. fmtOffset := "1m"
  126. pClient := a.GetPrometheusClient(true)
  127. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  128. if data, valid := a.ClusterCostsCache.Get(key); valid {
  129. clusterCosts := data.(map[string]*ClusterCosts)
  130. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  131. } else {
  132. data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
  133. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  134. }
  135. }
  // Response is the JSON envelope produced by the WrapData* helpers in this file.
// Those helpers set Status to "success" (Code 200) or "error" (Code 500). Message
// and Warning are omitted from the JSON when empty.
type Response struct {
	Code    int         `json:"code"`
	Status  string      `json:"status"`
	Data    interface{} `json:"data"`
	Message string      `json:"message,omitempty"`
	Warning string      `json:"warning,omitempty"`
}
  143. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  144. type FilterFunc func(*CostData) (bool, string)
  145. // FilterCostData allows through only CostData that matches all the given filter functions
  146. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  147. result := make(map[string]*CostData)
  148. filteredEnvironments := make(map[string]int)
  149. filteredContainers := 0
  150. DataLoop:
  151. for key, datum := range data {
  152. for _, rf := range retains {
  153. if ok, _ := rf(datum); ok {
  154. result[key] = datum
  155. // if any retain function passes, the data is retained and move on
  156. continue DataLoop
  157. }
  158. }
  159. for _, ff := range filters {
  160. if ok, environment := ff(datum); !ok {
  161. if environment != "" {
  162. filteredEnvironments[environment]++
  163. }
  164. filteredContainers++
  165. // if any filter function check fails, move on to the next datum
  166. continue DataLoop
  167. }
  168. }
  169. result[key] = datum
  170. }
  171. return result, filteredContainers, filteredEnvironments
  172. }
  173. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  174. fs := strings.Split(fields, ",")
  175. fmap := make(map[string]bool)
  176. for _, f := range fs {
  177. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  178. klog.V(1).Infof("to delete: %s", fieldNameLower)
  179. fmap[fieldNameLower] = true
  180. }
  181. filteredData := make(map[string]CostData)
  182. for cname, costdata := range data {
  183. s := reflect.TypeOf(*costdata)
  184. val := reflect.ValueOf(*costdata)
  185. costdata2 := CostData{}
  186. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  187. n := s.NumField()
  188. for i := 0; i < n; i++ {
  189. field := s.Field(i)
  190. value := val.Field(i)
  191. value2 := cd2.Field(i)
  192. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  193. value2.Set(reflect.Value(value))
  194. }
  195. }
  196. filteredData[cname] = cd2.Interface().(CostData)
  197. }
  198. return filteredData
  199. }
  // normalizeTimeParam converts a day-suffixed duration such as "7d" into the
// equivalent number of hours ("168h"). Any other non-empty value is returned
// unchanged. An empty param, or a day count that is not an integer, is an error.
func normalizeTimeParam(param string) (string, error) {
	n := len(param)
	if n == 0 {
		return "", fmt.Errorf("invalid time param")
	}
	if param[n-1] != 'd' {
		return param, nil
	}
	days, err := strconv.ParseInt(param[:n-1], 10, 64)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%dh", days*24), nil
}
  // ParsePercentString converts a percentage string such as "15%" into a fraction
// (0.15). The trailing "%" is optional, so "15" also yields 0.15. An empty string
// yields 0. Text that strconv.ParseFloat rejects is returned as an error.
func ParsePercentString(percentStr string) (float64, error) {
	if percentStr == "" {
		return 0.0, nil
	}
	number := percentStr
	if number[len(number)-1] == '%' {
		number = number[:len(number)-1]
	}
	parsed, err := strconv.ParseFloat(number, 64)
	if err != nil {
		return 0.0, err
	}
	return parsed * 0.01, nil
}
  233. func WrapData(data interface{}, err error) []byte {
  234. var resp []byte
  235. if err != nil {
  236. klog.V(1).Infof("Error returned to client: %s", err.Error())
  237. resp, _ = json.Marshal(&Response{
  238. Code: http.StatusInternalServerError,
  239. Status: "error",
  240. Message: err.Error(),
  241. Data: data,
  242. })
  243. } else {
  244. resp, _ = json.Marshal(&Response{
  245. Code: http.StatusOK,
  246. Status: "success",
  247. Data: data,
  248. })
  249. }
  250. return resp
  251. }
  252. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  253. var resp []byte
  254. if err != nil {
  255. klog.V(1).Infof("Error returned to client: %s", err.Error())
  256. resp, _ = json.Marshal(&Response{
  257. Code: http.StatusInternalServerError,
  258. Status: "error",
  259. Message: err.Error(),
  260. Data: data,
  261. })
  262. } else {
  263. resp, _ = json.Marshal(&Response{
  264. Code: http.StatusOK,
  265. Status: "success",
  266. Data: data,
  267. Message: message,
  268. })
  269. }
  270. return resp
  271. }
  272. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  273. var resp []byte
  274. if err != nil {
  275. klog.V(1).Infof("Error returned to client: %s", err.Error())
  276. resp, _ = json.Marshal(&Response{
  277. Code: http.StatusInternalServerError,
  278. Status: "error",
  279. Message: err.Error(),
  280. Warning: warning,
  281. Data: data,
  282. })
  283. } else {
  284. resp, _ = json.Marshal(&Response{
  285. Code: http.StatusOK,
  286. Status: "success",
  287. Data: data,
  288. Warning: warning,
  289. })
  290. }
  291. return resp
  292. }
  293. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  294. var resp []byte
  295. if err != nil {
  296. klog.V(1).Infof("Error returned to client: %s", err.Error())
  297. resp, _ = json.Marshal(&Response{
  298. Code: http.StatusInternalServerError,
  299. Status: "error",
  300. Message: err.Error(),
  301. Warning: warning,
  302. Data: data,
  303. })
  304. } else {
  305. resp, _ = json.Marshal(&Response{
  306. Code: http.StatusOK,
  307. Status: "success",
  308. Data: data,
  309. Message: message,
  310. Warning: warning,
  311. })
  312. }
  313. return resp
  314. }
  // wrapAsObjectItems returns an object with a single "items" key holding items.
// This lets the Kubernetes proxy handlers mimic the shape of a Kubernetes List()
// response.
func wrapAsObjectItems(items interface{}) map[string]interface{} {
	wrapped := make(map[string]interface{}, 1)
	wrapped["items"] = items
	return wrapped
}
  322. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  323. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  324. w.Header().Set("Content-Type", "application/json")
  325. w.Header().Set("Access-Control-Allow-Origin", "*")
  326. err := a.CloudProvider.DownloadPricingData()
  327. if err != nil {
  328. klog.V(1).Infof("Error refreshing pricing data: %s", err.Error())
  329. }
  330. w.Write(WrapData(nil, err))
  331. }
  332. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  333. w.Header().Set("Content-Type", "application/json")
  334. w.Header().Set("Access-Control-Allow-Origin", "*")
  335. window := r.URL.Query().Get("timeWindow")
  336. offset := r.URL.Query().Get("offset")
  337. fields := r.URL.Query().Get("filterFields")
  338. namespace := r.URL.Query().Get("namespace")
  339. if offset != "" {
  340. offset = "offset " + offset
  341. }
  342. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  343. if fields != "" {
  344. filteredData := filterFields(fields, data)
  345. w.Write(WrapData(filteredData, err))
  346. } else {
  347. w.Write(WrapData(data, err))
  348. }
  349. }
  350. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  351. w.Header().Set("Content-Type", "application/json")
  352. w.Header().Set("Access-Control-Allow-Origin", "*")
  353. window := r.URL.Query().Get("window")
  354. offset := r.URL.Query().Get("offset")
  355. if window == "" {
  356. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  357. return
  358. }
  359. windowDur, err := timeutil.ParseDuration(window)
  360. if err != nil {
  361. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  362. return
  363. }
  364. // offset is not a required parameter
  365. var offsetDur time.Duration
  366. if offset != "" {
  367. offsetDur, err = timeutil.ParseDuration(offset)
  368. if err != nil {
  369. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  370. return
  371. }
  372. }
  373. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  374. if useThanos && !thanos.IsEnabled() {
  375. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  376. return
  377. }
  378. var client prometheus.Client
  379. if useThanos {
  380. client = a.ThanosClient
  381. offsetDur = thanos.OffsetDuration()
  382. } else {
  383. client = a.PrometheusClient
  384. }
  385. data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
  386. w.Write(WrapData(data, err))
  387. }
  388. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  389. w.Header().Set("Content-Type", "application/json")
  390. w.Header().Set("Access-Control-Allow-Origin", "*")
  391. start := r.URL.Query().Get("start")
  392. end := r.URL.Query().Get("end")
  393. window := r.URL.Query().Get("window")
  394. offset := r.URL.Query().Get("offset")
  395. if window == "" {
  396. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  397. return
  398. }
  399. windowDur, err := timeutil.ParseDuration(window)
  400. if err != nil {
  401. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  402. return
  403. }
  404. // offset is not a required parameter
  405. var offsetDur time.Duration
  406. if offset != "" {
  407. offsetDur, err = timeutil.ParseDuration(offset)
  408. if err != nil {
  409. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  410. return
  411. }
  412. }
  413. data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
  414. w.Write(WrapData(data, err))
  415. }
  416. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  417. w.Header().Set("Content-Type", "application/json")
  418. w.Header().Set("Access-Control-Allow-Origin", "*")
  419. startStr := r.URL.Query().Get("start")
  420. endStr := r.URL.Query().Get("end")
  421. windowStr := r.URL.Query().Get("window")
  422. fields := r.URL.Query().Get("filterFields")
  423. namespace := r.URL.Query().Get("namespace")
  424. cluster := r.URL.Query().Get("cluster")
  425. remote := r.URL.Query().Get("remote")
  426. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  427. layout := "2006-01-02T15:04:05.000Z"
  428. start, err := time.Parse(layout, startStr)
  429. if err != nil {
  430. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  431. return
  432. }
  433. end, err := time.Parse(layout, endStr)
  434. if err != nil {
  435. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  436. return
  437. }
  438. window := kubecost.NewWindow(&start, &end)
  439. if window.IsOpen() || window.IsEmpty() || window.IsNegative() {
  440. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  441. return
  442. }
  443. resolution := time.Hour
  444. if resDur, err := time.ParseDuration(windowStr); err == nil {
  445. resolution = resDur
  446. }
  447. // Use Thanos Client if it exists (enabled) and remote flag set
  448. var pClient prometheus.Client
  449. if remote != "false" && a.ThanosClient != nil {
  450. pClient = a.ThanosClient
  451. } else {
  452. pClient = a.PrometheusClient
  453. }
  454. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  455. if err != nil {
  456. w.Write(WrapData(nil, err))
  457. }
  458. if fields != "" {
  459. filteredData := filterFields(fields, data)
  460. w.Write(WrapData(filteredData, err))
  461. } else {
  462. w.Write(WrapData(data, err))
  463. }
  464. }
  // parseAggregations returns the cache key, the aggregation fields and the filter
// field for an aggregation request.
//
// A non-empty customAggregation is used as-is: the key is the raw string, the
// fields are its comma-separated parts, and filterType is returned unchanged.
// Otherwise each comma-separated name in aggregator is prefixed with
// "kubernetes_", the key is those prefixed names re-joined with commas, and
// filterType is prefixed with "kubernetes_" too.
func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
	if customAggregation != "" {
		return customAggregation, strings.Split(customAggregation, ","), filterType
	}

	names := strings.Split(aggregator, ",")
	prefixed := make([]string, len(names))
	for i := range names {
		prefixed[i] = "kubernetes_" + names[i]
	}
	return strings.Join(prefixed, ","), prefixed, "kubernetes_" + filterType
}
  484. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  485. w.Header().Set("Content-Type", "application/json")
  486. w.Header().Set("Access-Control-Allow-Origin", "*")
  487. data, err := a.CloudProvider.AllNodePricing()
  488. w.Write(WrapData(data, err))
  489. }
  490. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  491. w.Header().Set("Content-Type", "application/json")
  492. w.Header().Set("Access-Control-Allow-Origin", "*")
  493. data, err := a.CloudProvider.GetConfig()
  494. w.Write(WrapData(data, err))
  495. }
  496. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  497. w.Header().Set("Content-Type", "application/json")
  498. w.Header().Set("Access-Control-Allow-Origin", "*")
  499. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
  500. if err != nil {
  501. w.Write(WrapData(data, err))
  502. return
  503. }
  504. w.Write(WrapData(data, err))
  505. err = a.CloudProvider.DownloadPricingData()
  506. if err != nil {
  507. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  508. }
  509. return
  510. }
  511. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  512. w.Header().Set("Content-Type", "application/json")
  513. w.Header().Set("Access-Control-Allow-Origin", "*")
  514. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
  515. if err != nil {
  516. w.Write(WrapData(data, err))
  517. return
  518. }
  519. w.Write(WrapData(data, err))
  520. return
  521. }
  522. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  523. w.Header().Set("Content-Type", "application/json")
  524. w.Header().Set("Access-Control-Allow-Origin", "*")
  525. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
  526. if err != nil {
  527. w.Write(WrapData(data, err))
  528. return
  529. }
  530. w.Write(WrapData(data, err))
  531. return
  532. }
  533. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  534. w.Header().Set("Content-Type", "application/json")
  535. w.Header().Set("Access-Control-Allow-Origin", "*")
  536. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  537. if err != nil {
  538. w.Write(WrapData(data, err))
  539. return
  540. }
  541. w.Write(WrapData(data, err))
  542. return
  543. }
  544. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  545. w.Header().Set("Content-Type", "application/json")
  546. w.Header().Set("Access-Control-Allow-Origin", "*")
  547. data, err := a.CloudProvider.GetManagementPlatform()
  548. if err != nil {
  549. w.Write(WrapData(data, err))
  550. return
  551. }
  552. w.Write(WrapData(data, err))
  553. return
  554. }
  555. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  556. w.Header().Set("Content-Type", "application/json")
  557. w.Header().Set("Access-Control-Allow-Origin", "*")
  558. data := a.ClusterInfoProvider.GetClusterInfo()
  559. w.Write(WrapData(data, nil))
  560. }
  561. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  562. w.Header().Set("Content-Type", "application/json")
  563. w.Header().Set("Access-Control-Allow-Origin", "*")
  564. data := a.ClusterMap.AsMap()
  565. w.Write(WrapData(data, nil))
  566. }
  567. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  568. w.Header().Set("Content-Type", "application/json")
  569. w.Header().Set("Access-Control-Allow-Origin", "*")
  570. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  571. }
  572. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  573. w.Header().Set("Content-Type", "application/json")
  574. w.Header().Set("Access-Control-Allow-Origin", "*")
  575. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  576. }
  577. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  578. w.Header().Set("Content-Type", "application/json")
  579. w.Header().Set("Access-Control-Allow-Origin", "*")
  580. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  581. }
  582. func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  583. w.Header().Set("Content-Type", "application/json")
  584. w.Header().Set("Access-Control-Allow-Origin", "*")
  585. w.Write(WrapData(prom.Validate(a.PrometheusClient)))
  586. }
  587. func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  588. w.Header().Set("Content-Type", "application/json")
  589. w.Header().Set("Access-Control-Allow-Origin", "*")
  590. qp := httputil.NewQueryParams(r.URL.Query())
  591. query := qp.Get("query", "")
  592. if query == "" {
  593. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  594. return
  595. }
  596. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  597. body, err := ctx.RawQuery(query)
  598. if err != nil {
  599. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  600. return
  601. }
  602. w.Write(body)
  603. }
  604. func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  605. w.Header().Set("Content-Type", "application/json")
  606. w.Header().Set("Access-Control-Allow-Origin", "*")
  607. qp := httputil.NewQueryParams(r.URL.Query())
  608. query := qp.Get("query", "")
  609. if query == "" {
  610. fmt.Fprintf(w, "Error parsing query from request parameters.")
  611. return
  612. }
  613. start, end, duration, err := toStartEndStep(qp)
  614. if err != nil {
  615. fmt.Fprintf(w, err.Error())
  616. return
  617. }
  618. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  619. body, err := ctx.RawQueryRange(query, start, end, duration)
  620. if err != nil {
  621. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  622. return
  623. }
  624. w.Write(body)
  625. }
  626. func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  627. w.Header().Set("Content-Type", "application/json")
  628. w.Header().Set("Access-Control-Allow-Origin", "*")
  629. if !thanos.IsEnabled() {
  630. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  631. return
  632. }
  633. qp := httputil.NewQueryParams(r.URL.Query())
  634. query := qp.Get("query", "")
  635. if query == "" {
  636. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  637. return
  638. }
  639. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  640. body, err := ctx.RawQuery(query)
  641. if err != nil {
  642. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  643. return
  644. }
  645. w.Write(body)
  646. }
  647. func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  648. w.Header().Set("Content-Type", "application/json")
  649. w.Header().Set("Access-Control-Allow-Origin", "*")
  650. if !thanos.IsEnabled() {
  651. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  652. return
  653. }
  654. qp := httputil.NewQueryParams(r.URL.Query())
  655. query := qp.Get("query", "")
  656. if query == "" {
  657. fmt.Fprintf(w, "Error parsing query from request parameters.")
  658. return
  659. }
  660. start, end, duration, err := toStartEndStep(qp)
  661. if err != nil {
  662. fmt.Fprintf(w, err.Error())
  663. return
  664. }
  665. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  666. body, err := ctx.RawQueryRange(query, start, end, duration)
  667. if err != nil {
  668. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  669. return
  670. }
  671. w.Write(body)
  672. }
  673. // helper for query range proxy requests
  674. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  675. var e error
  676. ss := qp.Get("start", "")
  677. es := qp.Get("end", "")
  678. ds := qp.Get("duration", "")
  679. layout := "2006-01-02T15:04:05.000Z"
  680. start, e = time.Parse(layout, ss)
  681. if e != nil {
  682. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  683. return
  684. }
  685. end, e = time.Parse(layout, es)
  686. if e != nil {
  687. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  688. return
  689. }
  690. step, e = time.ParseDuration(ds)
  691. if e != nil {
  692. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  693. return
  694. }
  695. err = nil
  696. return
  697. }
  698. func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  699. w.Header().Set("Content-Type", "application/json")
  700. w.Header().Set("Access-Control-Allow-Origin", "*")
  701. promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
  702. if err != nil {
  703. w.Write(WrapData(nil, err))
  704. return
  705. }
  706. result := map[string]*prom.PrometheusQueueState{
  707. "prometheus": promQueueState,
  708. }
  709. if thanos.IsEnabled() {
  710. thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
  711. if err != nil {
  712. log.Warningf("Error getting Thanos queue state: %s", err)
  713. } else {
  714. result["thanos"] = thanosQueueState
  715. }
  716. }
  717. w.Write(WrapData(result, nil))
  718. }
  719. // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
  720. func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  721. w.Header().Set("Content-Type", "application/json")
  722. w.Header().Set("Access-Control-Allow-Origin", "*")
  723. promMetrics, err := prom.GetPrometheusMetrics(a.PrometheusClient, "")
  724. if err != nil {
  725. w.Write(WrapData(nil, err))
  726. return
  727. }
  728. result := map[string][]*prom.PrometheusDiagnostic{
  729. "prometheus": promMetrics,
  730. }
  731. if thanos.IsEnabled() {
  732. thanosMetrics, err := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
  733. if err != nil {
  734. log.Warningf("Error getting Thanos queue state: %s", err)
  735. } else {
  736. result["thanos"] = thanosMetrics
  737. }
  738. }
  739. w.Write(WrapData(result, nil))
  740. }
  741. func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  742. w.Header().Set("Content-Type", "application/json")
  743. w.Header().Set("Access-Control-Allow-Origin", "*")
  744. pvList := a.ClusterCache.GetAllPersistentVolumes()
  745. body, err := json.Marshal(wrapAsObjectItems(pvList))
  746. if err != nil {
  747. fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
  748. } else {
  749. w.Write(body)
  750. }
  751. }
  752. func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  753. w.Header().Set("Content-Type", "application/json")
  754. w.Header().Set("Access-Control-Allow-Origin", "*")
  755. qp := httputil.NewQueryParams(r.URL.Query())
  756. namespace := qp.Get("namespace", "")
  757. deploymentsList := a.ClusterCache.GetAllDeployments()
  758. // filter for provided namespace
  759. var deployments []*appsv1.Deployment
  760. if namespace == "" {
  761. deployments = deploymentsList
  762. } else {
  763. deployments = []*appsv1.Deployment{}
  764. for _, d := range deploymentsList {
  765. if d.Namespace == namespace {
  766. deployments = append(deployments, d)
  767. }
  768. }
  769. }
  770. body, err := json.Marshal(wrapAsObjectItems(deployments))
  771. if err != nil {
  772. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  773. } else {
  774. w.Write(body)
  775. }
  776. }
  777. func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  778. w.Header().Set("Content-Type", "application/json")
  779. w.Header().Set("Access-Control-Allow-Origin", "*")
  780. scList := a.ClusterCache.GetAllStorageClasses()
  781. body, err := json.Marshal(wrapAsObjectItems(scList))
  782. if err != nil {
  783. fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
  784. } else {
  785. w.Write(body)
  786. }
  787. }
  788. func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  789. w.Header().Set("Content-Type", "application/json")
  790. w.Header().Set("Access-Control-Allow-Origin", "*")
  791. qp := httputil.NewQueryParams(r.URL.Query())
  792. namespace := qp.Get("namespace", "")
  793. statefulSetsList := a.ClusterCache.GetAllStatefulSets()
  794. // filter for provided namespace
  795. var statefulSets []*appsv1.StatefulSet
  796. if namespace == "" {
  797. statefulSets = statefulSetsList
  798. } else {
  799. statefulSets = []*appsv1.StatefulSet{}
  800. for _, ss := range statefulSetsList {
  801. if ss.Namespace == namespace {
  802. statefulSets = append(statefulSets, ss)
  803. }
  804. }
  805. }
  806. body, err := json.Marshal(wrapAsObjectItems(statefulSets))
  807. if err != nil {
  808. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  809. } else {
  810. w.Write(body)
  811. }
  812. }
  813. func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  814. w.Header().Set("Content-Type", "application/json")
  815. w.Header().Set("Access-Control-Allow-Origin", "*")
  816. nodeList := a.ClusterCache.GetAllNodes()
  817. body, err := json.Marshal(wrapAsObjectItems(nodeList))
  818. if err != nil {
  819. fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
  820. } else {
  821. w.Write(body)
  822. }
  823. }
  824. func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  825. w.Header().Set("Content-Type", "application/json")
  826. w.Header().Set("Access-Control-Allow-Origin", "*")
  827. podlist := a.ClusterCache.GetAllPods()
  828. body, err := json.Marshal(wrapAsObjectItems(podlist))
  829. if err != nil {
  830. fmt.Fprintf(w, "Error decoding pods: "+err.Error())
  831. } else {
  832. w.Write(body)
  833. }
  834. }
  835. func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  836. w.Header().Set("Content-Type", "application/json")
  837. w.Header().Set("Access-Control-Allow-Origin", "*")
  838. namespaces := a.ClusterCache.GetAllNamespaces()
  839. body, err := json.Marshal(wrapAsObjectItems(namespaces))
  840. if err != nil {
  841. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  842. } else {
  843. w.Write(body)
  844. }
  845. }
  846. func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  847. w.Header().Set("Content-Type", "application/json")
  848. w.Header().Set("Access-Control-Allow-Origin", "*")
  849. daemonSets := a.ClusterCache.GetAllDaemonSets()
  850. body, err := json.Marshal(wrapAsObjectItems(daemonSets))
  851. if err != nil {
  852. fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
  853. } else {
  854. w.Write(body)
  855. }
  856. }
  857. func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  858. w.Header().Set("Content-Type", "application/json")
  859. w.Header().Set("Access-Control-Allow-Origin", "*")
  860. podName := ps.ByName("name")
  861. podNamespace := ps.ByName("namespace")
  862. // TODO: ClusterCache API could probably afford to have some better filtering
  863. allPods := a.ClusterCache.GetAllPods()
  864. for _, pod := range allPods {
  865. for _, container := range pod.Spec.Containers {
  866. container.Env = make([]v1.EnvVar, 0)
  867. }
  868. if pod.Namespace == podNamespace && pod.Name == podName {
  869. body, err := json.Marshal(pod)
  870. if err != nil {
  871. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  872. } else {
  873. w.Write(body)
  874. }
  875. return
  876. }
  877. }
  878. fmt.Fprintf(w, "Pod not found\n")
  879. }
  880. func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  881. w.Header().Set("Content-Type", "application/json")
  882. w.Header().Set("Access-Control-Allow-Origin", "*")
  883. u := a.PrometheusClient.URL(epRules, nil)
  884. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  885. if err != nil {
  886. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  887. }
  888. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  889. if err != nil {
  890. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  891. } else {
  892. w.Write(body)
  893. }
  894. }
  895. func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  896. w.Header().Set("Content-Type", "application/json")
  897. w.Header().Set("Access-Control-Allow-Origin", "*")
  898. pConfig := map[string]string{
  899. "address": env.GetPrometheusServerEndpoint(),
  900. }
  901. body, err := json.Marshal(pConfig)
  902. if err != nil {
  903. fmt.Fprintf(w, "Error marshalling prometheus config")
  904. } else {
  905. w.Write(body)
  906. }
  907. }
  908. func (a *Accesses) PrometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  909. w.Header().Set("Content-Type", "application/json")
  910. w.Header().Set("Access-Control-Allow-Origin", "*")
  911. u := a.PrometheusClient.URL(epTargets, nil)
  912. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  913. if err != nil {
  914. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  915. }
  916. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  917. if err != nil {
  918. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  919. } else {
  920. w.Write(body)
  921. }
  922. }
  923. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  924. w.Header().Set("Content-Type", "application/json")
  925. w.Header().Set("Access-Control-Allow-Origin", "*")
  926. podlist := a.ClusterCache.GetAllPods()
  927. var lonePods []*v1.Pod
  928. for _, pod := range podlist {
  929. if len(pod.OwnerReferences) == 0 {
  930. lonePods = append(lonePods, pod)
  931. }
  932. }
  933. body, err := json.Marshal(lonePods)
  934. if err != nil {
  935. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  936. } else {
  937. w.Write(body)
  938. }
  939. }
  940. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  941. w.Header().Set("Content-Type", "application/json")
  942. w.Header().Set("Access-Control-Allow-Origin", "*")
  943. ns := env.GetKubecostNamespace()
  944. w.Write([]byte(ns))
  945. }
  946. // logsFor pulls the logs for a specific pod, namespace, and container
  947. func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
  948. since := time.Now().UTC().Add(-dur)
  949. logOpts := v1.PodLogOptions{
  950. SinceTime: &metav1.Time{Time: since},
  951. }
  952. if container != "" {
  953. logOpts.Container = container
  954. }
  955. req := c.CoreV1().Pods(namespace).GetLogs(pod, &logOpts)
  956. reader, err := req.Stream(ctx)
  957. if err != nil {
  958. return "", err
  959. }
  960. podLogs, err := ioutil.ReadAll(reader)
  961. if err != nil {
  962. return "", err
  963. }
  964. return string(podLogs), nil
  965. }
  966. func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  967. w.Header().Set("Content-Type", "application/json")
  968. w.Header().Set("Access-Control-Allow-Origin", "*")
  969. qp := httputil.NewQueryParams(r.URL.Query())
  970. ns := qp.Get("namespace", env.GetKubecostNamespace())
  971. pod := qp.Get("pod", "")
  972. selector := qp.Get("selector", "")
  973. container := qp.Get("container", "")
  974. since := qp.Get("since", "24h")
  975. sinceDuration, err := time.ParseDuration(since)
  976. if err != nil {
  977. fmt.Fprintf(w, "Invalid Duration String: "+err.Error())
  978. return
  979. }
  980. var logResult string
  981. appendLog := func(ns string, pod string, container string, l string) {
  982. if l == "" {
  983. return
  984. }
  985. logResult += fmt.Sprintf("%s\n| %s:%s:%s\n%s\n%s\n\n", LogSeparator, ns, pod, container, LogSeparator, l)
  986. }
  987. if pod != "" {
  988. pd, err := a.KubeClientSet.CoreV1().Pods(ns).Get(r.Context(), pod, metav1.GetOptions{})
  989. if err != nil {
  990. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  991. return
  992. }
  993. if container != "" {
  994. var foundContainer bool
  995. for _, cont := range pd.Spec.Containers {
  996. if strings.EqualFold(cont.Name, container) {
  997. foundContainer = true
  998. break
  999. }
  1000. }
  1001. if !foundContainer {
  1002. fmt.Fprintf(w, "Could not find container: "+container)
  1003. return
  1004. }
  1005. }
  1006. logs, err := logsFor(a.KubeClientSet, ns, pod, container, sinceDuration, r.Context())
  1007. if err != nil {
  1008. fmt.Fprintf(w, "Error Getting Logs: "+err.Error())
  1009. return
  1010. }
  1011. appendLog(ns, pod, container, logs)
  1012. w.Write([]byte(logResult))
  1013. return
  1014. }
  1015. if selector != "" {
  1016. pods, err := a.KubeClientSet.CoreV1().Pods(ns).List(r.Context(), metav1.ListOptions{LabelSelector: selector})
  1017. if err != nil {
  1018. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1019. return
  1020. }
  1021. for _, pd := range pods.Items {
  1022. for _, cont := range pd.Spec.Containers {
  1023. logs, err := logsFor(a.KubeClientSet, ns, pd.Name, cont.Name, sinceDuration, r.Context())
  1024. if err != nil {
  1025. continue
  1026. }
  1027. appendLog(ns, pd.Name, cont.Name, logs)
  1028. }
  1029. }
  1030. }
  1031. w.Write([]byte(logResult))
  1032. }
  1033. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1034. w.Header().Set("Content-Type", "application/json")
  1035. w.Header().Set("Access-Control-Allow-Origin", "*")
  1036. r.ParseForm()
  1037. key := r.PostForm.Get("key")
  1038. k := []byte(key)
  1039. err := ioutil.WriteFile("/var/configs/key.json", k, 0644)
  1040. if err != nil {
  1041. fmt.Fprintf(w, "Error writing service key: "+err.Error())
  1042. }
  1043. w.WriteHeader(http.StatusOK)
  1044. }
  1045. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1046. w.Header().Set("Content-Type", "application/json")
  1047. w.Header().Set("Access-Control-Allow-Origin", "*")
  1048. encodedValues := env.Get("HELM_VALUES", "")
  1049. if encodedValues == "" {
  1050. fmt.Fprintf(w, "Values reporting disabled")
  1051. return
  1052. }
  1053. result, err := base64.StdEncoding.DecodeString(encodedValues)
  1054. if err != nil {
  1055. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  1056. return
  1057. }
  1058. w.Write(result)
  1059. }
  1060. func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1061. w.Header().Set("Content-Type", "application/json")
  1062. w.Header().Set("Access-Control-Allow-Origin", "*")
  1063. promServer := env.GetPrometheusServerEndpoint()
  1064. api := prometheusAPI.NewAPI(a.PrometheusClient)
  1065. result, err := api.Config(r.Context())
  1066. if err != nil {
  1067. fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
  1068. } else {
  1069. fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
  1070. }
  1071. }
  1072. // captures the panic event in sentry
  1073. func capturePanicEvent(err string, stack string) {
  1074. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  1075. klog.V(1).Infoln(msg)
  1076. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  1077. Level: sentry.LevelError,
  1078. Message: msg,
  1079. })
  1080. sentry.Flush(5 * time.Second)
  1081. }
  1082. // handle any panics reported by the errors package
  1083. func handlePanic(p errors.Panic) bool {
  1084. err := p.Error
  1085. if err != nil {
  1086. if err, ok := err.(error); ok {
  1087. capturePanicEvent(err.Error(), p.Stack)
  1088. }
  1089. if err, ok := err.(string); ok {
  1090. capturePanicEvent(err, p.Stack)
  1091. }
  1092. }
  1093. // Return true to recover iff the type is http, otherwise allow kubernetes
  1094. // to recover.
  1095. return p.Type == errors.PanicTypeHTTP
  1096. }
  1097. func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  1098. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", env.GetAppVersion())
  1099. configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)
  1100. var err error
  1101. if errorReportingEnabled {
  1102. err = sentry.Init(sentry.ClientOptions{Release: env.GetAppVersion()})
  1103. if err != nil {
  1104. klog.Infof("Failed to initialize sentry for error reporting")
  1105. } else {
  1106. err = errors.SetPanicHandler(handlePanic)
  1107. if err != nil {
  1108. klog.Infof("Failed to set panic handler: %s", err)
  1109. }
  1110. }
  1111. }
  1112. address := env.GetPrometheusServerEndpoint()
  1113. if address == "" {
  1114. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  1115. }
  1116. queryConcurrency := env.GetMaxQueryConcurrency()
  1117. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  1118. timeout := 120 * time.Second
  1119. keepAlive := 120 * time.Second
  1120. tlsHandshakeTimeout := 10 * time.Second
  1121. scrapeInterval := time.Minute
  1122. var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
  1123. if env.IsPrometheusRetryOnRateLimitResponse() {
  1124. rateLimitRetryOpts = &prom.RateLimitRetryOpts{
  1125. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  1126. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  1127. }
  1128. }
  1129. promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
  1130. Timeout: timeout,
  1131. KeepAlive: keepAlive,
  1132. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1133. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1134. RateLimitRetryOpts: rateLimitRetryOpts,
  1135. Auth: &prom.ClientAuth{
  1136. Username: env.GetDBBasicAuthUsername(),
  1137. Password: env.GetDBBasicAuthUserPassword(),
  1138. BearerToken: env.GetDBBearerToken(),
  1139. },
  1140. QueryConcurrency: queryConcurrency,
  1141. QueryLogFile: "",
  1142. })
  1143. if err != nil {
  1144. klog.Fatalf("Failed to create prometheus client, Error: %v", err)
  1145. }
  1146. m, err := prom.Validate(promCli)
  1147. if err != nil || !m.Running {
  1148. if err != nil {
  1149. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1150. } else if !m.Running {
  1151. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  1152. }
  1153. } else {
  1154. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  1155. }
  1156. api := prometheusAPI.NewAPI(promCli)
  1157. _, err = api.Config(context.Background())
  1158. if err != nil {
  1159. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1160. } else {
  1161. klog.Infof("Retrieved a prometheus config file from: %s", address)
  1162. }
  1163. // Lookup scrape interval for kubecost job, update if found
  1164. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  1165. if err == nil {
  1166. scrapeInterval = si
  1167. }
  1168. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  1169. // Kubernetes API setup
  1170. var kc *rest.Config
  1171. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  1172. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  1173. } else {
  1174. kc, err = rest.InClusterConfig()
  1175. }
  1176. if err != nil {
  1177. panic(err.Error())
  1178. }
  1179. kubeClientset, err := kubernetes.NewForConfig(kc)
  1180. if err != nil {
  1181. panic(err.Error())
  1182. }
  1183. // Create ConfigFileManager for synchronization of shared configuration
  1184. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  1185. BucketStoreConfig: env.GetKubecostConfigBucket(),
  1186. LocalConfigPath: "/",
  1187. })
  1188. // Create Kubernetes Cluster Cache + Watchers
  1189. var k8sCache clustercache.ClusterCache
  1190. if env.IsClusterCacheFileEnabled() {
  1191. importLocation := confManager.ConfigFileAt("/var/configs/cluster-cache.json")
  1192. k8sCache = clustercache.NewClusterImporter(importLocation)
  1193. } else {
  1194. k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
  1195. }
  1196. k8sCache.Run()
  1197. cloudProviderKey := env.GetCloudProviderAPIKey()
  1198. cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey, confManager)
  1199. if err != nil {
  1200. panic(err.Error())
  1201. }
  1202. // Append the pricing config watcher
  1203. configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
  1204. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  1205. watchConfigFunc := configWatchers.ToWatchFunc()
  1206. watchedConfigs := configWatchers.GetWatchedConfigs()
  1207. kubecostNamespace := env.GetKubecostNamespace()
  1208. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  1209. for _, cw := range watchedConfigs {
  1210. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
  1211. if err != nil {
  1212. klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  1213. } else {
  1214. klog.Infof("Found configmap %s, watching...", configs.Name)
  1215. watchConfigFunc(configs)
  1216. }
  1217. }
  1218. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  1219. remoteEnabled := env.IsRemoteEnabled()
  1220. if remoteEnabled {
  1221. info, err := cloudProvider.ClusterInfo()
  1222. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1223. if err != nil {
  1224. klog.Infof("Error saving cluster id %s", err.Error())
  1225. }
  1226. _, _, err = cloud.GetOrCreateClusterMeta(info["id"], info["name"])
  1227. if err != nil {
  1228. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1229. }
  1230. }
  1231. // Thanos Client
  1232. var thanosClient prometheus.Client
  1233. if thanos.IsEnabled() {
  1234. thanosAddress := thanos.QueryURL()
  1235. if thanosAddress != "" {
  1236. thanosCli, _ := thanos.NewThanosClient(thanosAddress, &prom.PrometheusClientConfig{
  1237. Timeout: timeout,
  1238. KeepAlive: keepAlive,
  1239. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1240. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1241. RateLimitRetryOpts: rateLimitRetryOpts,
  1242. Auth: &prom.ClientAuth{
  1243. Username: env.GetMultiClusterBasicAuthUsername(),
  1244. Password: env.GetMultiClusterBasicAuthPassword(),
  1245. BearerToken: env.GetMultiClusterBearerToken(),
  1246. },
  1247. QueryConcurrency: queryConcurrency,
  1248. QueryLogFile: env.GetQueryLoggingFile(),
  1249. })
  1250. _, err = prom.Validate(thanosCli)
  1251. if err != nil {
  1252. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  1253. thanosClient = thanosCli
  1254. } else {
  1255. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  1256. thanosClient = thanosCli
  1257. }
  1258. } else {
  1259. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  1260. }
  1261. }
  1262. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  1263. var clusterInfoProvider clusters.ClusterInfoProvider
  1264. if env.IsClusterInfoFileEnabled() {
  1265. clusterInfoFile := confManager.ConfigFileAt("/var/configs/cluster-info.json")
  1266. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  1267. } else {
  1268. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  1269. }
  1270. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  1271. var clusterMap clusters.ClusterMap
  1272. if thanosClient != nil {
  1273. clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
  1274. } else {
  1275. clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
  1276. }
  1277. // cache responses from model and aggregation for a default of 10 minutes;
  1278. // clear expired responses every 20 minutes
  1279. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  1280. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  1281. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1282. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1283. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1284. // query durations that should be cached longer should be registered here
  1285. // use relatively prime numbers to minimize likelihood of synchronized
  1286. // attempts at cache warming
  1287. day := 24 * time.Hour
  1288. cacheExpiration := map[time.Duration]time.Duration{
  1289. day: maxCacheMinutes1d * time.Minute,
  1290. 2 * day: maxCacheMinutes2d * time.Minute,
  1291. 7 * day: maxCacheMinutes7d * time.Minute,
  1292. 30 * day: maxCacheMinutes30d * time.Minute,
  1293. }
  1294. var pc prometheus.Client
  1295. if thanosClient != nil {
  1296. pc = thanosClient
  1297. } else {
  1298. pc = promCli
  1299. }
  1300. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  1301. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
  1302. a := &Accesses{
  1303. Router: httprouter.New(),
  1304. PrometheusClient: promCli,
  1305. ThanosClient: thanosClient,
  1306. KubeClientSet: kubeClientset,
  1307. ClusterCache: k8sCache,
  1308. ClusterMap: clusterMap,
  1309. CloudProvider: cloudProvider,
  1310. ConfigFileManager: confManager,
  1311. ClusterInfoProvider: clusterInfoProvider,
  1312. Model: costModel,
  1313. MetricsEmitter: metricsEmitter,
  1314. AggregateCache: aggregateCache,
  1315. CostDataCache: costDataCache,
  1316. ClusterCostsCache: clusterCostsCache,
  1317. OutOfClusterCache: outOfClusterCache,
  1318. SettingsCache: settingsCache,
  1319. CacheExpiration: cacheExpiration,
  1320. httpServices: services.NewCostModelServices(),
  1321. }
  1322. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  1323. // confusing and unconventional, but necessary so that we can swap it
  1324. // out for the ETL-adapted version elsewhere.
  1325. // TODO clean this up once ETL is open-sourced.
  1326. a.AggAPI = a
  1327. // Initialize mechanism for subscribing to settings changes
  1328. a.InitializeSettingsPubSub()
  1329. err = a.CloudProvider.DownloadPricingData()
  1330. if err != nil {
  1331. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  1332. }
  1333. // Warm the aggregate cache unless explicitly set to false
  1334. if env.IsCacheWarmingEnabled() {
  1335. log.Infof("Init: AggregateCostModel cache warming enabled")
  1336. a.warmAggregateCostModelCache()
  1337. } else {
  1338. log.Infof("Init: AggregateCostModel cache warming disabled")
  1339. }
  1340. if !env.IsKubecostMetricsPodEnabled() {
  1341. a.MetricsEmitter.Start()
  1342. }
  1343. a.Router.GET("/costDataModel", a.CostDataModel)
  1344. a.Router.GET("/costDataModelRange", a.CostDataModelRange)
  1345. a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  1346. a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
  1347. a.Router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  1348. a.Router.GET("/allNodePricing", a.GetAllNodePricing)
  1349. a.Router.POST("/refreshPricing", a.RefreshPricingData)
  1350. a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  1351. a.Router.GET("/clusterCosts", a.ClusterCosts)
  1352. a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  1353. a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  1354. a.Router.GET("/managementPlatform", a.ManagementPlatform)
  1355. a.Router.GET("/clusterInfo", a.ClusterInfo)
  1356. a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  1357. a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  1358. a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  1359. a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  1360. // endpoints migrated from server
  1361. a.Router.GET("/allPersistentVolumes", a.GetAllPersistentVolumes)
  1362. a.Router.GET("/allDeployments", a.GetAllDeployments)
  1363. a.Router.GET("/allStorageClasses", a.GetAllStorageClasses)
  1364. a.Router.GET("/allStatefulSets", a.GetAllStatefulSets)
  1365. a.Router.GET("/allNodes", a.GetAllNodes)
  1366. a.Router.GET("/allPods", a.GetAllPods)
  1367. a.Router.GET("/allNamespaces", a.GetAllNamespaces)
  1368. a.Router.GET("/allDaemonSets", a.GetAllDaemonSets)
  1369. a.Router.GET("/pod/:namespace/:name", a.GetPod)
  1370. a.Router.GET("/prometheusRecordingRules", a.PrometheusRecordingRules)
  1371. a.Router.GET("/prometheusConfig", a.PrometheusConfig)
  1372. a.Router.GET("/prometheusTargets", a.PrometheusTargets)
  1373. a.Router.GET("/orphanedPods", a.GetOrphanedPods)
  1374. a.Router.GET("/installNamespace", a.GetInstallNamespace)
  1375. a.Router.GET("/podLogs", a.GetPodLogs)
  1376. a.Router.POST("/serviceKey", a.AddServiceKey)
  1377. a.Router.GET("/helmValues", a.GetHelmValues)
  1378. a.Router.GET("/status", a.Status)
  1379. // prom query proxies
  1380. a.Router.GET("/prometheusQuery", a.PrometheusQuery)
  1381. a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
  1382. a.Router.GET("/thanosQuery", a.ThanosQuery)
  1383. a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)
  1384. // diagnostics
  1385. a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
  1386. a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
  1387. a.httpServices.RegisterAll(a.Router)
  1388. return a
  1389. }