router.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/kubecost/cost-model/pkg/config"
  14. "github.com/kubecost/cost-model/pkg/services"
  15. "github.com/kubecost/cost-model/pkg/util/httputil"
  16. "github.com/kubecost/cost-model/pkg/util/timeutil"
  17. "github.com/kubecost/cost-model/pkg/util/watcher"
  18. "github.com/microcosm-cc/bluemonday"
  19. v1 "k8s.io/api/core/v1"
  20. "k8s.io/klog"
  21. "github.com/julienschmidt/httprouter"
  22. sentry "github.com/getsentry/sentry-go"
  23. "github.com/kubecost/cost-model/pkg/cloud"
  24. "github.com/kubecost/cost-model/pkg/clustercache"
  25. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  26. "github.com/kubecost/cost-model/pkg/env"
  27. "github.com/kubecost/cost-model/pkg/errors"
  28. "github.com/kubecost/cost-model/pkg/kubecost"
  29. "github.com/kubecost/cost-model/pkg/log"
  30. "github.com/kubecost/cost-model/pkg/prom"
  31. "github.com/kubecost/cost-model/pkg/thanos"
  32. "github.com/kubecost/cost-model/pkg/util/json"
  33. prometheus "github.com/prometheus/client_golang/api"
  34. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  35. appsv1 "k8s.io/api/apps/v1"
  36. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  37. "github.com/patrickmn/go-cache"
  38. "k8s.io/client-go/kubernetes"
  39. "k8s.io/client-go/rest"
  40. "k8s.io/client-go/tools/clientcmd"
  41. )
  42. var sanitizePolicy = bluemonday.UGCPolicy()
  43. const (
  44. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  45. maxCacheMinutes1d = 11
  46. maxCacheMinutes2d = 17
  47. maxCacheMinutes7d = 37
  48. maxCacheMinutes30d = 137
  49. CustomPricingSetting = "CustomPricing"
  50. DiscountSetting = "Discount"
  51. epRules = apiPrefix + "/rules"
  52. LogSeparator = "+-------------------------------------------------------------------------------------"
  53. )
  54. var (
  55. // gitCommit is set by the build system
  56. gitCommit string
  57. )
  58. // Accesses defines a singleton application instance, providing access to
  59. // Prometheus, Kubernetes, the cloud provider, and caches.
  60. type Accesses struct {
  61. Router *httprouter.Router
  62. PrometheusClient prometheus.Client
  63. ThanosClient prometheus.Client
  64. KubeClientSet kubernetes.Interface
  65. ClusterCache clustercache.ClusterCache
  66. ClusterMap clusters.ClusterMap
  67. CloudProvider cloud.Provider
  68. ConfigFileManager *config.ConfigFileManager
  69. ClusterInfoProvider clusters.ClusterInfoProvider
  70. Model *CostModel
  71. MetricsEmitter *CostModelMetricsEmitter
  72. OutOfClusterCache *cache.Cache
  73. AggregateCache *cache.Cache
  74. CostDataCache *cache.Cache
  75. ClusterCostsCache *cache.Cache
  76. CacheExpiration map[time.Duration]time.Duration
  77. AggAPI Aggregator
  78. // SettingsCache stores current state of app settings
  79. SettingsCache *cache.Cache
  80. // settingsSubscribers tracks channels through which changes to different
  81. // settings will be published in a pub/sub model
  82. settingsSubscribers map[string][]chan string
  83. settingsMutex sync.Mutex
  84. // registered http service instances
  85. httpServices services.HTTPServices
  86. }
  87. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  88. // should be used.
  89. func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
  90. // Use Thanos Client if it exists (enabled) and remote flag set
  91. var pc prometheus.Client
  92. if remote && a.ThanosClient != nil {
  93. pc = a.ThanosClient
  94. } else {
  95. pc = a.PrometheusClient
  96. }
  97. return pc
  98. }
  99. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  100. // If one does not exists, it returns the default cache expiration, which is defined by
  101. // the particular cache.
  102. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  103. if expiration, ok := a.CacheExpiration[dur]; ok {
  104. return expiration
  105. }
  106. return cache.DefaultExpiration
  107. }
  108. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  109. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  110. // not found or is less than 2 minutes.
  111. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  112. expiry := a.GetCacheExpiration(dur).Minutes()
  113. if expiry <= 2.0 {
  114. return time.Minute
  115. }
  116. mins := time.Duration(expiry/2.0) * time.Minute
  117. return mins
  118. }
  119. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  120. w.Header().Set("Content-Type", "application/json")
  121. duration := 24 * time.Hour
  122. offset := time.Minute
  123. durationHrs := "24h"
  124. fmtOffset := "1m"
  125. pClient := a.GetPrometheusClient(true)
  126. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  127. if data, valid := a.ClusterCostsCache.Get(key); valid {
  128. clusterCosts := data.(map[string]*ClusterCosts)
  129. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  130. } else {
  131. data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
  132. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  133. }
  134. }
  // Response is the JSON envelope produced by the WrapData* helpers.
  type Response struct {
  	// Code is an HTTP status code (the helpers use 200 or 500).
  	Code int `json:"code"`
  	// Status is "success" or "error" as set by the helpers.
  	Status string `json:"status"`
  	// Data is the payload being returned.
  	Data interface{} `json:"data"`
  	// Message is an optional message; on errors it carries the error text.
  	Message string `json:"message,omitempty"`
  	// Warning is an optional warning.
  	Warning string `json:"warning,omitempty"`
  }
  142. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  143. type FilterFunc func(*CostData) (bool, string)
  144. // FilterCostData allows through only CostData that matches all the given filter functions
  145. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  146. result := make(map[string]*CostData)
  147. filteredEnvironments := make(map[string]int)
  148. filteredContainers := 0
  149. DataLoop:
  150. for key, datum := range data {
  151. for _, rf := range retains {
  152. if ok, _ := rf(datum); ok {
  153. result[key] = datum
  154. // if any retain function passes, the data is retained and move on
  155. continue DataLoop
  156. }
  157. }
  158. for _, ff := range filters {
  159. if ok, environment := ff(datum); !ok {
  160. if environment != "" {
  161. filteredEnvironments[environment]++
  162. }
  163. filteredContainers++
  164. // if any filter function check fails, move on to the next datum
  165. continue DataLoop
  166. }
  167. }
  168. result[key] = datum
  169. }
  170. return result, filteredContainers, filteredEnvironments
  171. }
  172. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  173. fs := strings.Split(fields, ",")
  174. fmap := make(map[string]bool)
  175. for _, f := range fs {
  176. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  177. klog.V(1).Infof("to delete: %s", fieldNameLower)
  178. fmap[fieldNameLower] = true
  179. }
  180. filteredData := make(map[string]CostData)
  181. for cname, costdata := range data {
  182. s := reflect.TypeOf(*costdata)
  183. val := reflect.ValueOf(*costdata)
  184. costdata2 := CostData{}
  185. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  186. n := s.NumField()
  187. for i := 0; i < n; i++ {
  188. field := s.Field(i)
  189. value := val.Field(i)
  190. value2 := cd2.Field(i)
  191. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  192. value2.Set(reflect.Value(value))
  193. }
  194. }
  195. filteredData[cname] = cd2.Interface().(CostData)
  196. }
  197. return filteredData
  198. }
  // normalizeTimeParam converts a day-based time parameter such as "2d" into the
  // equivalent number of hours ("48h"). Parameters that do not end in "d" are
  // returned unchanged. An empty parameter, or a "d" parameter whose count is
  // not a base-10 integer, yields an error.
  func normalizeTimeParam(param string) (string, error) {
  	n := len(param)
  	if n == 0 {
  		return "", fmt.Errorf("invalid time param")
  	}

  	// only day-suffixed values need converting
  	if param[n-1] != 'd' {
  		return param, nil
  	}

  	days, err := strconv.ParseInt(param[:n-1], 10, 64)
  	if err != nil {
  		return "", err
  	}

  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString parses a percentage given as "N%" (the trailing "%" is
  // optional) and returns it as a fraction, i.e. N multiplied by 0.01. The empty
  // string is treated as "0%" and yields 0.0. A value that is not a valid
  // floating point number yields 0.0 and the parse error.
  func ParsePercentString(percentStr string) (float64, error) {
  	n := len(percentStr)
  	if n == 0 {
  		return 0.0, nil
  	}

  	numeric := percentStr
  	if percentStr[n-1] == '%' {
  		numeric = percentStr[:n-1]
  	}

  	pct, err := strconv.ParseFloat(numeric, 64)
  	if err != nil {
  		return 0.0, err
  	}

  	return pct * 0.01, nil
  }
  232. func WrapData(data interface{}, err error) []byte {
  233. var resp []byte
  234. if err != nil {
  235. klog.V(1).Infof("Error returned to client: %s", err.Error())
  236. resp, _ = json.Marshal(&Response{
  237. Code: http.StatusInternalServerError,
  238. Status: "error",
  239. Message: err.Error(),
  240. Data: data,
  241. })
  242. } else {
  243. resp, _ = json.Marshal(&Response{
  244. Code: http.StatusOK,
  245. Status: "success",
  246. Data: data,
  247. })
  248. }
  249. return resp
  250. }
  251. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  252. var resp []byte
  253. if err != nil {
  254. klog.V(1).Infof("Error returned to client: %s", err.Error())
  255. resp, _ = json.Marshal(&Response{
  256. Code: http.StatusInternalServerError,
  257. Status: "error",
  258. Message: err.Error(),
  259. Data: data,
  260. })
  261. } else {
  262. resp, _ = json.Marshal(&Response{
  263. Code: http.StatusOK,
  264. Status: "success",
  265. Data: data,
  266. Message: message,
  267. })
  268. }
  269. return resp
  270. }
  271. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  272. var resp []byte
  273. if err != nil {
  274. klog.V(1).Infof("Error returned to client: %s", err.Error())
  275. resp, _ = json.Marshal(&Response{
  276. Code: http.StatusInternalServerError,
  277. Status: "error",
  278. Message: err.Error(),
  279. Warning: warning,
  280. Data: data,
  281. })
  282. } else {
  283. resp, _ = json.Marshal(&Response{
  284. Code: http.StatusOK,
  285. Status: "success",
  286. Data: data,
  287. Warning: warning,
  288. })
  289. }
  290. return resp
  291. }
  292. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  293. var resp []byte
  294. if err != nil {
  295. klog.V(1).Infof("Error returned to client: %s", err.Error())
  296. resp, _ = json.Marshal(&Response{
  297. Code: http.StatusInternalServerError,
  298. Status: "error",
  299. Message: err.Error(),
  300. Warning: warning,
  301. Data: data,
  302. })
  303. } else {
  304. resp, _ = json.Marshal(&Response{
  305. Code: http.StatusOK,
  306. Status: "success",
  307. Data: data,
  308. Message: message,
  309. Warning: warning,
  310. })
  311. }
  312. return resp
  313. }
  // wrapAsObjectItems places the given items under a single "items" key, so that
  // our k8s proxy methods can emulate the shape of a List() response from the
  // k8s API.
  func wrapAsObjectItems(items interface{}) map[string]interface{} {
  	wrapped := make(map[string]interface{}, 1)
  	wrapped["items"] = items
  	return wrapped
  }
  321. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  322. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  323. w.Header().Set("Content-Type", "application/json")
  324. w.Header().Set("Access-Control-Allow-Origin", "*")
  325. err := a.CloudProvider.DownloadPricingData()
  326. if err != nil {
  327. klog.V(1).Infof("Error refreshing pricing data: %s", err.Error())
  328. }
  329. w.Write(WrapData(nil, err))
  330. }
  331. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  332. w.Header().Set("Content-Type", "application/json")
  333. w.Header().Set("Access-Control-Allow-Origin", "*")
  334. window := r.URL.Query().Get("timeWindow")
  335. offset := r.URL.Query().Get("offset")
  336. fields := r.URL.Query().Get("filterFields")
  337. namespace := r.URL.Query().Get("namespace")
  338. if offset != "" {
  339. offset = "offset " + offset
  340. }
  341. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  342. if fields != "" {
  343. filteredData := filterFields(fields, data)
  344. w.Write(WrapData(filteredData, err))
  345. } else {
  346. w.Write(WrapData(data, err))
  347. }
  348. }
  349. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  350. w.Header().Set("Content-Type", "application/json")
  351. w.Header().Set("Access-Control-Allow-Origin", "*")
  352. window := r.URL.Query().Get("window")
  353. offset := r.URL.Query().Get("offset")
  354. if window == "" {
  355. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  356. return
  357. }
  358. windowDur, err := timeutil.ParseDuration(window)
  359. if err != nil {
  360. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  361. return
  362. }
  363. // offset is not a required parameter
  364. var offsetDur time.Duration
  365. if offset != "" {
  366. offsetDur, err = timeutil.ParseDuration(offset)
  367. if err != nil {
  368. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  369. return
  370. }
  371. }
  372. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  373. if useThanos && !thanos.IsEnabled() {
  374. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  375. return
  376. }
  377. var client prometheus.Client
  378. if useThanos {
  379. client = a.ThanosClient
  380. offsetDur = thanos.OffsetDuration()
  381. } else {
  382. client = a.PrometheusClient
  383. }
  384. data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
  385. w.Write(WrapData(data, err))
  386. }
  387. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  388. w.Header().Set("Content-Type", "application/json")
  389. w.Header().Set("Access-Control-Allow-Origin", "*")
  390. start := r.URL.Query().Get("start")
  391. end := r.URL.Query().Get("end")
  392. window := r.URL.Query().Get("window")
  393. offset := r.URL.Query().Get("offset")
  394. if window == "" {
  395. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  396. return
  397. }
  398. windowDur, err := timeutil.ParseDuration(window)
  399. if err != nil {
  400. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  401. return
  402. }
  403. // offset is not a required parameter
  404. var offsetDur time.Duration
  405. if offset != "" {
  406. offsetDur, err = timeutil.ParseDuration(offset)
  407. if err != nil {
  408. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  409. return
  410. }
  411. }
  412. data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
  413. w.Write(WrapData(data, err))
  414. }
  415. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  416. w.Header().Set("Content-Type", "application/json")
  417. w.Header().Set("Access-Control-Allow-Origin", "*")
  418. startStr := r.URL.Query().Get("start")
  419. endStr := r.URL.Query().Get("end")
  420. windowStr := r.URL.Query().Get("window")
  421. fields := r.URL.Query().Get("filterFields")
  422. namespace := r.URL.Query().Get("namespace")
  423. cluster := r.URL.Query().Get("cluster")
  424. remote := r.URL.Query().Get("remote")
  425. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  426. layout := "2006-01-02T15:04:05.000Z"
  427. start, err := time.Parse(layout, startStr)
  428. if err != nil {
  429. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  430. return
  431. }
  432. end, err := time.Parse(layout, endStr)
  433. if err != nil {
  434. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  435. return
  436. }
  437. window := kubecost.NewWindow(&start, &end)
  438. if window.IsOpen() || window.IsEmpty() || window.IsNegative() {
  439. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  440. return
  441. }
  442. resolution := time.Hour
  443. if resDur, err := time.ParseDuration(windowStr); err == nil {
  444. resolution = resDur
  445. }
  446. // Use Thanos Client if it exists (enabled) and remote flag set
  447. var pClient prometheus.Client
  448. if remote != "false" && a.ThanosClient != nil {
  449. pClient = a.ThanosClient
  450. } else {
  451. pClient = a.PrometheusClient
  452. }
  453. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  454. if err != nil {
  455. w.Write(WrapData(nil, err))
  456. }
  457. if fields != "" {
  458. filteredData := filterFields(fields, data)
  459. w.Write(WrapData(filteredData, err))
  460. } else {
  461. w.Write(WrapData(data, err))
  462. }
  463. }
  // parseAggregations derives the aggregation key, the list of aggregation
  // fields and the filter name for an out-of-cluster cost query.
  //
  // When customAggregation is non-empty it is used verbatim: the key is the raw
  // string, the fields are its comma-separated parts and the filter is
  // filterType unchanged. Otherwise each comma-separated part of aggregator, and
  // filterType, is prefixed with "kubernetes_", and the key is the prefixed
  // parts joined with commas.
  //
  // Returns (key, fields, filter).
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"

  	parts := strings.Split(aggregator, ",")
  	prefixed := make([]string, 0, len(parts))
  	for _, part := range parts {
  		prefixed = append(prefixed, prefix+part)
  	}

  	return strings.Join(prefixed, ","), prefixed, prefix + filterType
  }
  483. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  484. w.Header().Set("Content-Type", "application/json")
  485. w.Header().Set("Access-Control-Allow-Origin", "*")
  486. start := r.URL.Query().Get("start")
  487. end := r.URL.Query().Get("end")
  488. aggregator := r.URL.Query().Get("aggregator")
  489. customAggregation := r.URL.Query().Get("customAggregation")
  490. filterType := r.URL.Query().Get("filterType")
  491. filterValue := r.URL.Query().Get("filterValue")
  492. var data []*cloud.OutOfClusterAllocation
  493. var err error
  494. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  495. data, err = a.CloudProvider.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  496. w.Write(WrapData(data, err))
  497. }
  498. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  499. w.Header().Set("Content-Type", "application/json")
  500. w.Header().Set("Access-Control-Allow-Origin", "*")
  501. // start date for which to query costs, inclusive; format YYYY-MM-DD
  502. start := r.URL.Query().Get("start")
  503. // end date for which to query costs, inclusive; format YYYY-MM-DD
  504. end := r.URL.Query().Get("end")
  505. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  506. kubernetesAggregation := r.URL.Query().Get("aggregator")
  507. // customAggregation allows full customization of aggregator w/o prepending
  508. customAggregation := r.URL.Query().Get("customAggregation")
  509. // disableCache, if set to "true", tells this function to recompute and
  510. // cache the requested data
  511. disableCache := r.URL.Query().Get("disableCache") == "true"
  512. // clearCache, if set to "true", tells this function to flush the cache,
  513. // then recompute and cache the requested data
  514. clearCache := r.URL.Query().Get("clearCache") == "true"
  515. filterType := r.URL.Query().Get("filterType")
  516. filterValue := r.URL.Query().Get("filterValue")
  517. aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
  518. // clear cache prior to checking the cache so that a clearCache=true
  519. // request always returns a freshly computed value
  520. if clearCache {
  521. a.OutOfClusterCache.Flush()
  522. }
  523. // attempt to retrieve cost data from cache
  524. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
  525. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  526. if data, ok := value.([]*cloud.OutOfClusterAllocation); ok {
  527. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  528. return
  529. }
  530. klog.Errorf("caching error: failed to type cast data: %s", key)
  531. }
  532. data, err := a.CloudProvider.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  533. if err == nil {
  534. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  535. }
  536. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  537. }
  538. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  539. w.Header().Set("Content-Type", "application/json")
  540. w.Header().Set("Access-Control-Allow-Origin", "*")
  541. data, err := a.CloudProvider.AllNodePricing()
  542. w.Write(WrapData(data, err))
  543. }
  544. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  545. w.Header().Set("Content-Type", "application/json")
  546. w.Header().Set("Access-Control-Allow-Origin", "*")
  547. data, err := a.CloudProvider.GetConfig()
  548. w.Write(WrapData(data, err))
  549. }
  550. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  551. w.Header().Set("Content-Type", "application/json")
  552. w.Header().Set("Access-Control-Allow-Origin", "*")
  553. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
  554. if err != nil {
  555. w.Write(WrapData(data, err))
  556. return
  557. }
  558. w.Write(WrapData(data, err))
  559. err = a.CloudProvider.DownloadPricingData()
  560. if err != nil {
  561. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  562. }
  563. return
  564. }
  565. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  566. w.Header().Set("Content-Type", "application/json")
  567. w.Header().Set("Access-Control-Allow-Origin", "*")
  568. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
  569. if err != nil {
  570. w.Write(WrapData(data, err))
  571. return
  572. }
  573. w.Write(WrapData(data, err))
  574. return
  575. }
  576. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  577. w.Header().Set("Content-Type", "application/json")
  578. w.Header().Set("Access-Control-Allow-Origin", "*")
  579. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
  580. if err != nil {
  581. w.Write(WrapData(data, err))
  582. return
  583. }
  584. w.Write(WrapData(data, err))
  585. return
  586. }
  587. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  588. w.Header().Set("Content-Type", "application/json")
  589. w.Header().Set("Access-Control-Allow-Origin", "*")
  590. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  591. if err != nil {
  592. w.Write(WrapData(data, err))
  593. return
  594. }
  595. w.Write(WrapData(data, err))
  596. return
  597. }
  598. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  599. w.Header().Set("Content-Type", "application/json")
  600. w.Header().Set("Access-Control-Allow-Origin", "*")
  601. data, err := a.CloudProvider.GetManagementPlatform()
  602. if err != nil {
  603. w.Write(WrapData(data, err))
  604. return
  605. }
  606. w.Write(WrapData(data, err))
  607. return
  608. }
  609. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  610. w.Header().Set("Content-Type", "application/json")
  611. w.Header().Set("Access-Control-Allow-Origin", "*")
  612. data := a.ClusterInfoProvider.GetClusterInfo()
  613. w.Write(WrapData(data, nil))
  614. }
  615. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  616. w.Header().Set("Content-Type", "application/json")
  617. w.Header().Set("Access-Control-Allow-Origin", "*")
  618. data := a.ClusterMap.AsMap()
  619. w.Write(WrapData(data, nil))
  620. }
  621. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  622. w.Header().Set("Content-Type", "application/json")
  623. w.Header().Set("Access-Control-Allow-Origin", "*")
  624. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  625. }
  626. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  627. w.Header().Set("Content-Type", "application/json")
  628. w.Header().Set("Access-Control-Allow-Origin", "*")
  629. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  630. }
  631. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  632. w.Header().Set("Content-Type", "application/json")
  633. w.Header().Set("Access-Control-Allow-Origin", "*")
  634. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  635. }
  636. func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  637. w.Header().Set("Content-Type", "application/json")
  638. w.Header().Set("Access-Control-Allow-Origin", "*")
  639. w.Write(WrapData(prom.Validate(a.PrometheusClient)))
  640. }
  641. func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  642. w.Header().Set("Content-Type", "application/json")
  643. w.Header().Set("Access-Control-Allow-Origin", "*")
  644. qp := httputil.NewQueryParams(r.URL.Query())
  645. query := qp.Get("query", "")
  646. if query == "" {
  647. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  648. return
  649. }
  650. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  651. body, err := ctx.RawQuery(query)
  652. if err != nil {
  653. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  654. return
  655. }
  656. w.Write(body)
  657. }
  658. func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  659. w.Header().Set("Content-Type", "application/json")
  660. w.Header().Set("Access-Control-Allow-Origin", "*")
  661. qp := httputil.NewQueryParams(r.URL.Query())
  662. query := qp.Get("query", "")
  663. if query == "" {
  664. fmt.Fprintf(w, "Error parsing query from request parameters.")
  665. return
  666. }
  667. start, end, duration, err := toStartEndStep(qp)
  668. if err != nil {
  669. fmt.Fprintf(w, err.Error())
  670. return
  671. }
  672. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  673. body, err := ctx.RawQueryRange(query, start, end, duration)
  674. if err != nil {
  675. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  676. return
  677. }
  678. w.Write(body)
  679. }
  680. func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  681. w.Header().Set("Content-Type", "application/json")
  682. w.Header().Set("Access-Control-Allow-Origin", "*")
  683. if !thanos.IsEnabled() {
  684. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  685. return
  686. }
  687. qp := httputil.NewQueryParams(r.URL.Query())
  688. query := qp.Get("query", "")
  689. if query == "" {
  690. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  691. return
  692. }
  693. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  694. body, err := ctx.RawQuery(query)
  695. if err != nil {
  696. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  697. return
  698. }
  699. w.Write(body)
  700. }
  701. func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  702. w.Header().Set("Content-Type", "application/json")
  703. w.Header().Set("Access-Control-Allow-Origin", "*")
  704. if !thanos.IsEnabled() {
  705. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  706. return
  707. }
  708. qp := httputil.NewQueryParams(r.URL.Query())
  709. query := qp.Get("query", "")
  710. if query == "" {
  711. fmt.Fprintf(w, "Error parsing query from request parameters.")
  712. return
  713. }
  714. start, end, duration, err := toStartEndStep(qp)
  715. if err != nil {
  716. fmt.Fprintf(w, err.Error())
  717. return
  718. }
  719. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  720. body, err := ctx.RawQueryRange(query, start, end, duration)
  721. if err != nil {
  722. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  723. return
  724. }
  725. w.Write(body)
  726. }
  727. // helper for query range proxy requests
  728. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  729. var e error
  730. ss := qp.Get("start", "")
  731. es := qp.Get("end", "")
  732. ds := qp.Get("duration", "")
  733. layout := "2006-01-02T15:04:05.000Z"
  734. start, e = time.Parse(layout, ss)
  735. if e != nil {
  736. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  737. return
  738. }
  739. end, e = time.Parse(layout, es)
  740. if e != nil {
  741. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  742. return
  743. }
  744. step, e = time.ParseDuration(ds)
  745. if e != nil {
  746. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  747. return
  748. }
  749. err = nil
  750. return
  751. }
  752. func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  753. w.Header().Set("Content-Type", "application/json")
  754. w.Header().Set("Access-Control-Allow-Origin", "*")
  755. promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
  756. if err != nil {
  757. w.Write(WrapData(nil, err))
  758. return
  759. }
  760. result := map[string]*prom.PrometheusQueueState{
  761. "prometheus": promQueueState,
  762. }
  763. if thanos.IsEnabled() {
  764. thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
  765. if err != nil {
  766. log.Warningf("Error getting Thanos queue state: %s", err)
  767. } else {
  768. result["thanos"] = thanosQueueState
  769. }
  770. }
  771. w.Write(WrapData(result, nil))
  772. }
  773. // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
  774. func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  775. w.Header().Set("Content-Type", "application/json")
  776. w.Header().Set("Access-Control-Allow-Origin", "*")
  777. promMetrics, err := prom.GetPrometheusMetrics(a.PrometheusClient, "")
  778. if err != nil {
  779. w.Write(WrapData(nil, err))
  780. return
  781. }
  782. result := map[string][]*prom.PrometheusDiagnostic{
  783. "prometheus": promMetrics,
  784. }
  785. if thanos.IsEnabled() {
  786. thanosMetrics, err := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
  787. if err != nil {
  788. log.Warningf("Error getting Thanos queue state: %s", err)
  789. } else {
  790. result["thanos"] = thanosMetrics
  791. }
  792. }
  793. w.Write(WrapData(result, nil))
  794. }
  795. func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  796. w.Header().Set("Content-Type", "application/json")
  797. w.Header().Set("Access-Control-Allow-Origin", "*")
  798. pvList := a.ClusterCache.GetAllPersistentVolumes()
  799. body, err := json.Marshal(wrapAsObjectItems(pvList))
  800. if err != nil {
  801. fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
  802. } else {
  803. w.Write(body)
  804. }
  805. }
  806. func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  807. w.Header().Set("Content-Type", "application/json")
  808. w.Header().Set("Access-Control-Allow-Origin", "*")
  809. qp := httputil.NewQueryParams(r.URL.Query())
  810. namespace := qp.Get("namespace", "")
  811. deploymentsList := a.ClusterCache.GetAllDeployments()
  812. // filter for provided namespace
  813. var deployments []*appsv1.Deployment
  814. if namespace == "" {
  815. deployments = deploymentsList
  816. } else {
  817. deployments = []*appsv1.Deployment{}
  818. for _, d := range deploymentsList {
  819. if d.Namespace == namespace {
  820. deployments = append(deployments, d)
  821. }
  822. }
  823. }
  824. body, err := json.Marshal(wrapAsObjectItems(deployments))
  825. if err != nil {
  826. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  827. } else {
  828. w.Write(body)
  829. }
  830. }
  831. func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  832. w.Header().Set("Content-Type", "application/json")
  833. w.Header().Set("Access-Control-Allow-Origin", "*")
  834. scList := a.ClusterCache.GetAllStorageClasses()
  835. body, err := json.Marshal(wrapAsObjectItems(scList))
  836. if err != nil {
  837. fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
  838. } else {
  839. w.Write(body)
  840. }
  841. }
  842. func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  843. w.Header().Set("Content-Type", "application/json")
  844. w.Header().Set("Access-Control-Allow-Origin", "*")
  845. qp := httputil.NewQueryParams(r.URL.Query())
  846. namespace := qp.Get("namespace", "")
  847. statefulSetsList := a.ClusterCache.GetAllStatefulSets()
  848. // filter for provided namespace
  849. var statefulSets []*appsv1.StatefulSet
  850. if namespace == "" {
  851. statefulSets = statefulSetsList
  852. } else {
  853. statefulSets = []*appsv1.StatefulSet{}
  854. for _, ss := range statefulSetsList {
  855. if ss.Namespace == namespace {
  856. statefulSets = append(statefulSets, ss)
  857. }
  858. }
  859. }
  860. body, err := json.Marshal(wrapAsObjectItems(statefulSets))
  861. if err != nil {
  862. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  863. } else {
  864. w.Write(body)
  865. }
  866. }
  867. func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  868. w.Header().Set("Content-Type", "application/json")
  869. w.Header().Set("Access-Control-Allow-Origin", "*")
  870. nodeList := a.ClusterCache.GetAllNodes()
  871. body, err := json.Marshal(wrapAsObjectItems(nodeList))
  872. if err != nil {
  873. fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
  874. } else {
  875. w.Write(body)
  876. }
  877. }
  878. func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  879. w.Header().Set("Content-Type", "application/json")
  880. w.Header().Set("Access-Control-Allow-Origin", "*")
  881. podlist := a.ClusterCache.GetAllPods()
  882. body, err := json.Marshal(wrapAsObjectItems(podlist))
  883. if err != nil {
  884. fmt.Fprintf(w, "Error decoding pods: "+err.Error())
  885. } else {
  886. w.Write(body)
  887. }
  888. }
  889. func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  890. w.Header().Set("Content-Type", "application/json")
  891. w.Header().Set("Access-Control-Allow-Origin", "*")
  892. namespaces := a.ClusterCache.GetAllNamespaces()
  893. body, err := json.Marshal(wrapAsObjectItems(namespaces))
  894. if err != nil {
  895. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  896. } else {
  897. w.Write(body)
  898. }
  899. }
  900. func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  901. w.Header().Set("Content-Type", "application/json")
  902. w.Header().Set("Access-Control-Allow-Origin", "*")
  903. daemonSets := a.ClusterCache.GetAllDaemonSets()
  904. body, err := json.Marshal(wrapAsObjectItems(daemonSets))
  905. if err != nil {
  906. fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
  907. } else {
  908. w.Write(body)
  909. }
  910. }
  911. func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  912. w.Header().Set("Content-Type", "application/json")
  913. w.Header().Set("Access-Control-Allow-Origin", "*")
  914. podName := ps.ByName("name")
  915. podNamespace := ps.ByName("namespace")
  916. // TODO: ClusterCache API could probably afford to have some better filtering
  917. allPods := a.ClusterCache.GetAllPods()
  918. for _, pod := range allPods {
  919. if pod.Namespace == podNamespace && pod.Name == podName {
  920. body, err := json.Marshal(pod)
  921. if err != nil {
  922. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  923. } else {
  924. w.Write(body)
  925. }
  926. return
  927. }
  928. }
  929. fmt.Fprintf(w, "Pod not found\n")
  930. }
  931. func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  932. w.Header().Set("Content-Type", "application/json")
  933. w.Header().Set("Access-Control-Allow-Origin", "*")
  934. u := a.PrometheusClient.URL(epRules, nil)
  935. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  936. if err != nil {
  937. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  938. }
  939. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  940. if err != nil {
  941. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  942. } else {
  943. w.Write(body)
  944. }
  945. }
  946. func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  947. w.Header().Set("Content-Type", "application/json")
  948. w.Header().Set("Access-Control-Allow-Origin", "*")
  949. pConfig := map[string]string{
  950. "address": env.GetPrometheusServerEndpoint(),
  951. }
  952. body, err := json.Marshal(pConfig)
  953. if err != nil {
  954. fmt.Fprintf(w, "Error marshalling prometheus config")
  955. } else {
  956. w.Write(body)
  957. }
  958. }
  959. func (a *Accesses) PrometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  960. w.Header().Set("Content-Type", "application/json")
  961. w.Header().Set("Access-Control-Allow-Origin", "*")
  962. u := a.PrometheusClient.URL(epTargets, nil)
  963. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  964. if err != nil {
  965. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  966. }
  967. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  968. if err != nil {
  969. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  970. } else {
  971. w.Write(body)
  972. }
  973. }
  974. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  975. w.Header().Set("Content-Type", "application/json")
  976. w.Header().Set("Access-Control-Allow-Origin", "*")
  977. podlist := a.ClusterCache.GetAllPods()
  978. var lonePods []*v1.Pod
  979. for _, pod := range podlist {
  980. if len(pod.OwnerReferences) == 0 {
  981. lonePods = append(lonePods, pod)
  982. }
  983. }
  984. body, err := json.Marshal(lonePods)
  985. if err != nil {
  986. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  987. } else {
  988. w.Write(body)
  989. }
  990. }
  991. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  992. w.Header().Set("Content-Type", "application/json")
  993. w.Header().Set("Access-Control-Allow-Origin", "*")
  994. ns := env.GetKubecostNamespace()
  995. w.Write([]byte(ns))
  996. }
  997. // logsFor pulls the logs for a specific pod, namespace, and container
  998. func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
  999. since := time.Now().UTC().Add(-dur)
  1000. logOpts := v1.PodLogOptions{
  1001. SinceTime: &metav1.Time{Time: since},
  1002. }
  1003. if container != "" {
  1004. logOpts.Container = container
  1005. }
  1006. req := c.CoreV1().Pods(namespace).GetLogs(pod, &logOpts)
  1007. reader, err := req.Stream(ctx)
  1008. if err != nil {
  1009. return "", err
  1010. }
  1011. podLogs, err := ioutil.ReadAll(reader)
  1012. if err != nil {
  1013. return "", err
  1014. }
  1015. return string(podLogs), nil
  1016. }
  1017. func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1018. w.Header().Set("Content-Type", "application/json")
  1019. w.Header().Set("Access-Control-Allow-Origin", "*")
  1020. qp := httputil.NewQueryParams(r.URL.Query())
  1021. ns := qp.Get("namespace", env.GetKubecostNamespace())
  1022. pod := qp.Get("pod", "")
  1023. selector := qp.Get("selector", "")
  1024. container := qp.Get("container", "")
  1025. since := qp.Get("since", "24h")
  1026. sinceDuration, err := time.ParseDuration(since)
  1027. if err != nil {
  1028. fmt.Fprintf(w, "Invalid Duration String: "+err.Error())
  1029. return
  1030. }
  1031. var logResult string
  1032. appendLog := func(ns string, pod string, container string, l string) {
  1033. if l == "" {
  1034. return
  1035. }
  1036. logResult += fmt.Sprintf("%s\n| %s:%s:%s\n%s\n%s\n\n", LogSeparator, ns, pod, container, LogSeparator, l)
  1037. }
  1038. if pod != "" {
  1039. pd, err := a.KubeClientSet.CoreV1().Pods(ns).Get(r.Context(), pod, metav1.GetOptions{})
  1040. if err != nil {
  1041. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1042. return
  1043. }
  1044. if container != "" {
  1045. var foundContainer bool
  1046. for _, cont := range pd.Spec.Containers {
  1047. if strings.EqualFold(cont.Name, container) {
  1048. foundContainer = true
  1049. break
  1050. }
  1051. }
  1052. if !foundContainer {
  1053. fmt.Fprintf(w, "Could not find container: "+container)
  1054. return
  1055. }
  1056. }
  1057. logs, err := logsFor(a.KubeClientSet, ns, pod, container, sinceDuration, r.Context())
  1058. if err != nil {
  1059. fmt.Fprintf(w, "Error Getting Logs: "+err.Error())
  1060. return
  1061. }
  1062. appendLog(ns, pod, container, logs)
  1063. w.Write([]byte(logResult))
  1064. return
  1065. }
  1066. if selector != "" {
  1067. pods, err := a.KubeClientSet.CoreV1().Pods(ns).List(r.Context(), metav1.ListOptions{LabelSelector: selector})
  1068. if err != nil {
  1069. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1070. return
  1071. }
  1072. for _, pd := range pods.Items {
  1073. for _, cont := range pd.Spec.Containers {
  1074. logs, err := logsFor(a.KubeClientSet, ns, pd.Name, cont.Name, sinceDuration, r.Context())
  1075. if err != nil {
  1076. continue
  1077. }
  1078. appendLog(ns, pd.Name, cont.Name, logs)
  1079. }
  1080. }
  1081. }
  1082. w.Write([]byte(logResult))
  1083. }
  1084. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1085. w.Header().Set("Content-Type", "application/json")
  1086. w.Header().Set("Access-Control-Allow-Origin", "*")
  1087. r.ParseForm()
  1088. key := r.PostForm.Get("key")
  1089. k := []byte(key)
  1090. err := ioutil.WriteFile("/var/configs/key.json", k, 0644)
  1091. if err != nil {
  1092. fmt.Fprintf(w, "Error writing service key: "+err.Error())
  1093. }
  1094. w.WriteHeader(http.StatusOK)
  1095. }
  1096. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1097. w.Header().Set("Content-Type", "application/json")
  1098. w.Header().Set("Access-Control-Allow-Origin", "*")
  1099. encodedValues := env.Get("HELM_VALUES", "")
  1100. if encodedValues == "" {
  1101. fmt.Fprintf(w, "Values reporting disabled")
  1102. return
  1103. }
  1104. result, err := base64.StdEncoding.DecodeString(encodedValues)
  1105. if err != nil {
  1106. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  1107. return
  1108. }
  1109. w.Write(result)
  1110. }
  1111. func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1112. w.Header().Set("Content-Type", "application/json")
  1113. w.Header().Set("Access-Control-Allow-Origin", "*")
  1114. promServer := env.GetPrometheusServerEndpoint()
  1115. api := prometheusAPI.NewAPI(a.PrometheusClient)
  1116. result, err := api.Config(r.Context())
  1117. if err != nil {
  1118. fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
  1119. } else {
  1120. fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
  1121. }
  1122. }
  1123. // captures the panic event in sentry
  1124. func capturePanicEvent(err string, stack string) {
  1125. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  1126. klog.V(1).Infoln(msg)
  1127. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  1128. Level: sentry.LevelError,
  1129. Message: msg,
  1130. })
  1131. sentry.Flush(5 * time.Second)
  1132. }
  1133. // handle any panics reported by the errors package
  1134. func handlePanic(p errors.Panic) bool {
  1135. err := p.Error
  1136. if err != nil {
  1137. if err, ok := err.(error); ok {
  1138. capturePanicEvent(err.Error(), p.Stack)
  1139. }
  1140. if err, ok := err.(string); ok {
  1141. capturePanicEvent(err, p.Stack)
  1142. }
  1143. }
  1144. // Return true to recover iff the type is http, otherwise allow kubernetes
  1145. // to recover.
  1146. return p.Type == errors.PanicTypeHTTP
  1147. }
  1148. func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  1149. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", env.GetAppVersion())
  1150. configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)
  1151. var err error
  1152. if errorReportingEnabled {
  1153. err = sentry.Init(sentry.ClientOptions{Release: env.GetAppVersion()})
  1154. if err != nil {
  1155. klog.Infof("Failed to initialize sentry for error reporting")
  1156. } else {
  1157. err = errors.SetPanicHandler(handlePanic)
  1158. if err != nil {
  1159. klog.Infof("Failed to set panic handler: %s", err)
  1160. }
  1161. }
  1162. }
  1163. address := env.GetPrometheusServerEndpoint()
  1164. if address == "" {
  1165. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  1166. }
  1167. queryConcurrency := env.GetMaxQueryConcurrency()
  1168. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  1169. timeout := 120 * time.Second
  1170. keepAlive := 120 * time.Second
  1171. scrapeInterval := time.Minute
  1172. promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
  1173. if err != nil {
  1174. klog.Fatalf("Failed to create prometheus client, Error: %v", err)
  1175. }
  1176. m, err := prom.Validate(promCli)
  1177. if err != nil || !m.Running {
  1178. if err != nil {
  1179. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1180. } else if !m.Running {
  1181. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  1182. }
  1183. } else {
  1184. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  1185. }
  1186. api := prometheusAPI.NewAPI(promCli)
  1187. _, err = api.Config(context.Background())
  1188. if err != nil {
  1189. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1190. } else {
  1191. klog.Infof("Retrieved a prometheus config file from: %s", address)
  1192. }
  1193. // Lookup scrape interval for kubecost job, update if found
  1194. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  1195. if err == nil {
  1196. scrapeInterval = si
  1197. }
  1198. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  1199. // Kubernetes API setup
  1200. var kc *rest.Config
  1201. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  1202. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  1203. } else {
  1204. kc, err = rest.InClusterConfig()
  1205. }
  1206. if err != nil {
  1207. panic(err.Error())
  1208. }
  1209. kubeClientset, err := kubernetes.NewForConfig(kc)
  1210. if err != nil {
  1211. panic(err.Error())
  1212. }
  1213. // Create ConfigFileManager for synchronization of shared configuration
  1214. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  1215. BucketStoreConfig: env.GetKubecostConfigBucket(),
  1216. LocalConfigPath: "/",
  1217. })
  1218. // Create Kubernetes Cluster Cache + Watchers
  1219. var k8sCache clustercache.ClusterCache
  1220. if env.IsClusterCacheFileEnabled() {
  1221. importLocation := confManager.ConfigFileAt("/var/configs/cluster-cache.json")
  1222. k8sCache = clustercache.NewClusterImporter(importLocation)
  1223. } else {
  1224. k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
  1225. }
  1226. k8sCache.Run()
  1227. cloudProviderKey := env.GetCloudProviderAPIKey()
  1228. cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey, confManager)
  1229. if err != nil {
  1230. panic(err.Error())
  1231. }
  1232. // Append the pricing config watcher
  1233. configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
  1234. watchConfigFunc := configWatchers.ToWatchFunc()
  1235. watchedConfigs := configWatchers.GetWatchedConfigs()
  1236. kubecostNamespace := env.GetKubecostNamespace()
  1237. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  1238. for _, cw := range watchedConfigs {
  1239. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
  1240. if err != nil {
  1241. klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  1242. } else {
  1243. klog.Infof("Found configmap %s, watching...", configs.Name)
  1244. watchConfigFunc(configs)
  1245. }
  1246. }
  1247. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  1248. remoteEnabled := env.IsRemoteEnabled()
  1249. if remoteEnabled {
  1250. info, err := cloudProvider.ClusterInfo()
  1251. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1252. if err != nil {
  1253. klog.Infof("Error saving cluster id %s", err.Error())
  1254. }
  1255. _, _, err = cloud.GetOrCreateClusterMeta(info["id"], info["name"])
  1256. if err != nil {
  1257. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1258. }
  1259. }
  1260. // Thanos Client
  1261. var thanosClient prometheus.Client
  1262. if thanos.IsEnabled() {
  1263. thanosAddress := thanos.QueryURL()
  1264. if thanosAddress != "" {
  1265. thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
  1266. _, err = prom.Validate(thanosCli)
  1267. if err != nil {
  1268. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  1269. thanosClient = thanosCli
  1270. } else {
  1271. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  1272. thanosClient = thanosCli
  1273. }
  1274. } else {
  1275. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  1276. }
  1277. }
  1278. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  1279. var clusterInfoProvider clusters.ClusterInfoProvider
  1280. if env.IsClusterInfoFileEnabled() {
  1281. clusterInfoFile := confManager.ConfigFileAt("/var/configs/cluster-info.json")
  1282. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  1283. } else {
  1284. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  1285. }
  1286. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  1287. var clusterMap clusters.ClusterMap
  1288. if thanosClient != nil {
  1289. clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
  1290. } else {
  1291. clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
  1292. }
  1293. // cache responses from model and aggregation for a default of 10 minutes;
  1294. // clear expired responses every 20 minutes
  1295. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  1296. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  1297. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1298. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1299. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1300. // query durations that should be cached longer should be registered here
  1301. // use relatively prime numbers to minimize likelihood of synchronized
  1302. // attempts at cache warming
  1303. day := 24 * time.Hour
  1304. cacheExpiration := map[time.Duration]time.Duration{
  1305. day: maxCacheMinutes1d * time.Minute,
  1306. 2 * day: maxCacheMinutes2d * time.Minute,
  1307. 7 * day: maxCacheMinutes7d * time.Minute,
  1308. 30 * day: maxCacheMinutes30d * time.Minute,
  1309. }
  1310. var pc prometheus.Client
  1311. if thanosClient != nil {
  1312. pc = thanosClient
  1313. } else {
  1314. pc = promCli
  1315. }
  1316. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  1317. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
  1318. a := &Accesses{
  1319. Router: httprouter.New(),
  1320. PrometheusClient: promCli,
  1321. ThanosClient: thanosClient,
  1322. KubeClientSet: kubeClientset,
  1323. ClusterCache: k8sCache,
  1324. ClusterMap: clusterMap,
  1325. CloudProvider: cloudProvider,
  1326. ConfigFileManager: confManager,
  1327. ClusterInfoProvider: clusterInfoProvider,
  1328. Model: costModel,
  1329. MetricsEmitter: metricsEmitter,
  1330. AggregateCache: aggregateCache,
  1331. CostDataCache: costDataCache,
  1332. ClusterCostsCache: clusterCostsCache,
  1333. OutOfClusterCache: outOfClusterCache,
  1334. SettingsCache: settingsCache,
  1335. CacheExpiration: cacheExpiration,
  1336. httpServices: services.NewCostModelServices(),
  1337. }
  1338. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  1339. // confusing and unconventional, but necessary so that we can swap it
  1340. // out for the ETL-adapted version elsewhere.
  1341. // TODO clean this up once ETL is open-sourced.
  1342. a.AggAPI = a
  1343. // Initialize mechanism for subscribing to settings changes
  1344. a.InitializeSettingsPubSub()
  1345. err = a.CloudProvider.DownloadPricingData()
  1346. if err != nil {
  1347. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  1348. }
  1349. // Warm the aggregate cache unless explicitly set to false
  1350. if env.IsCacheWarmingEnabled() {
  1351. log.Infof("Init: AggregateCostModel cache warming enabled")
  1352. a.warmAggregateCostModelCache()
  1353. } else {
  1354. log.Infof("Init: AggregateCostModel cache warming disabled")
  1355. }
  1356. if !env.IsKubecostMetricsPodEnabled() {
  1357. a.MetricsEmitter.Start()
  1358. }
  1359. a.Router.GET("/costDataModel", a.CostDataModel)
  1360. a.Router.GET("/costDataModelRange", a.CostDataModelRange)
  1361. a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  1362. a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
  1363. a.Router.GET("/outOfClusterCosts", a.OutOfClusterCostsWithCache)
  1364. a.Router.GET("/allNodePricing", a.GetAllNodePricing)
  1365. a.Router.POST("/refreshPricing", a.RefreshPricingData)
  1366. a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  1367. a.Router.GET("/clusterCosts", a.ClusterCosts)
  1368. a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  1369. a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  1370. a.Router.GET("/managementPlatform", a.ManagementPlatform)
  1371. a.Router.GET("/clusterInfo", a.ClusterInfo)
  1372. a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  1373. a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  1374. a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  1375. a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  1376. // endpoints migrated from server
  1377. a.Router.GET("/allPersistentVolumes", a.GetAllPersistentVolumes)
  1378. a.Router.GET("/allDeployments", a.GetAllDeployments)
  1379. a.Router.GET("/allStorageClasses", a.GetAllStorageClasses)
  1380. a.Router.GET("/allStatefulSets", a.GetAllStatefulSets)
  1381. a.Router.GET("/allNodes", a.GetAllNodes)
  1382. a.Router.GET("/allPods", a.GetAllPods)
  1383. a.Router.GET("/allNamespaces", a.GetAllNamespaces)
  1384. a.Router.GET("/allDaemonSets", a.GetAllDaemonSets)
  1385. a.Router.GET("/pod/:namespace/:name", a.GetPod)
  1386. a.Router.GET("/prometheusRecordingRules", a.PrometheusRecordingRules)
  1387. a.Router.GET("/prometheusConfig", a.PrometheusConfig)
  1388. a.Router.GET("/prometheusTargets", a.PrometheusTargets)
  1389. a.Router.GET("/orphanedPods", a.GetOrphanedPods)
  1390. a.Router.GET("/installNamespace", a.GetInstallNamespace)
  1391. a.Router.GET("/podLogs", a.GetPodLogs)
  1392. a.Router.POST("/serviceKey", a.AddServiceKey)
  1393. a.Router.GET("/helmValues", a.GetHelmValues)
  1394. a.Router.GET("/status", a.Status)
  1395. // prom query proxies
  1396. a.Router.GET("/prometheusQuery", a.PrometheusQuery)
  1397. a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
  1398. a.Router.GET("/thanosQuery", a.ThanosQuery)
  1399. a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)
  1400. // diagnostics
  1401. a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
  1402. a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
  1403. a.httpServices.RegisterAll(a.Router)
  1404. return a
  1405. }