router.go 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "flag"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "os"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/kubecost/cost-model/pkg/services"
  16. "github.com/kubecost/cost-model/pkg/util/httputil"
  17. "github.com/kubecost/cost-model/pkg/util/timeutil"
  18. "github.com/kubecost/cost-model/pkg/util/watcher"
  19. "github.com/microcosm-cc/bluemonday"
  20. v1 "k8s.io/api/core/v1"
  21. "k8s.io/klog"
  22. k8serrors "k8s.io/apimachinery/pkg/api/errors"
  23. "github.com/julienschmidt/httprouter"
  24. sentry "github.com/getsentry/sentry-go"
  25. "github.com/kubecost/cost-model/pkg/cloud"
  26. "github.com/kubecost/cost-model/pkg/clustercache"
  27. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  28. "github.com/kubecost/cost-model/pkg/env"
  29. "github.com/kubecost/cost-model/pkg/errors"
  30. "github.com/kubecost/cost-model/pkg/kubecost"
  31. "github.com/kubecost/cost-model/pkg/log"
  32. "github.com/kubecost/cost-model/pkg/prom"
  33. "github.com/kubecost/cost-model/pkg/thanos"
  34. "github.com/kubecost/cost-model/pkg/util/json"
  35. prometheus "github.com/prometheus/client_golang/api"
  36. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  37. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  38. "github.com/patrickmn/go-cache"
  39. "k8s.io/client-go/kubernetes"
  40. "k8s.io/client-go/rest"
  41. "k8s.io/client-go/tools/clientcmd"
  42. )
  43. var sanitizePolicy = bluemonday.UGCPolicy()
  44. const (
  45. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  46. maxCacheMinutes1d = 11
  47. maxCacheMinutes2d = 17
  48. maxCacheMinutes7d = 37
  49. maxCacheMinutes30d = 137
  50. CustomPricingSetting = "CustomPricing"
  51. DiscountSetting = "Discount"
  52. epRules = apiPrefix + "/rules"
  53. LogSeparator = "+-------------------------------------------------------------------------------------"
  54. )
  55. var (
  56. // gitCommit is set by the build system
  57. gitCommit string
  58. )
  59. // Accesses defines a singleton application instance, providing access to
  60. // Prometheus, Kubernetes, the cloud provider, and caches.
  61. type Accesses struct {
  62. Router *httprouter.Router
  63. PrometheusClient prometheus.Client
  64. ThanosClient prometheus.Client
  65. KubeClientSet kubernetes.Interface
  66. ClusterMap clusters.ClusterMap
  67. CloudProvider cloud.Provider
  68. Model *CostModel
  69. MetricsEmitter *CostModelMetricsEmitter
  70. OutOfClusterCache *cache.Cache
  71. AggregateCache *cache.Cache
  72. CostDataCache *cache.Cache
  73. ClusterCostsCache *cache.Cache
  74. CacheExpiration map[time.Duration]time.Duration
  75. AggAPI Aggregator
  76. // SettingsCache stores current state of app settings
  77. SettingsCache *cache.Cache
  78. // settingsSubscribers tracks channels through which changes to different
  79. // settings will be published in a pub/sub model
  80. settingsSubscribers map[string][]chan string
  81. settingsMutex sync.Mutex
  82. // registered http service instances
  83. httpServices services.HTTPServices
  84. }
  85. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  86. // should be used.
  87. func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
  88. // Use Thanos Client if it exists (enabled) and remote flag set
  89. var pc prometheus.Client
  90. if remote && a.ThanosClient != nil {
  91. pc = a.ThanosClient
  92. } else {
  93. pc = a.PrometheusClient
  94. }
  95. return pc
  96. }
  97. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  98. // If one does not exists, it returns the default cache expiration, which is defined by
  99. // the particular cache.
  100. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  101. if expiration, ok := a.CacheExpiration[dur]; ok {
  102. return expiration
  103. }
  104. return cache.DefaultExpiration
  105. }
  106. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  107. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  108. // not found or is less than 2 minutes.
  109. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  110. expiry := a.GetCacheExpiration(dur).Minutes()
  111. if expiry <= 2.0 {
  112. return time.Minute
  113. }
  114. mins := time.Duration(expiry/2.0) * time.Minute
  115. return mins
  116. }
  117. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  118. w.Header().Set("Content-Type", "application/json")
  119. duration := 24 * time.Hour
  120. offset := time.Minute
  121. durationHrs := "24h"
  122. fmtOffset := "1m"
  123. pClient := a.GetPrometheusClient(true)
  124. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  125. if data, valid := a.ClusterCostsCache.Get(key); valid {
  126. clusterCosts := data.(map[string]*ClusterCosts)
  127. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  128. } else {
  129. data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
  130. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  131. }
  132. }
  // Response is the JSON envelope written by the WrapData* helpers.
  type Response struct {
      // Code is an HTTP status code carried in the body.
      Code int `json:"code"`
      // Status is "success" or "error".
      Status string `json:"status"`
      // Data is the payload; it is serialized even on error.
      Data interface{} `json:"data"`
      // Message is omitted from the JSON when empty.
      Message string `json:"message,omitempty"`
      // Warning is omitted from the JSON when empty.
      Warning string `json:"warning,omitempty"`
  }
  140. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  141. type FilterFunc func(*CostData) (bool, string)
  142. // FilterCostData allows through only CostData that matches all the given filter functions
  143. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  144. result := make(map[string]*CostData)
  145. filteredEnvironments := make(map[string]int)
  146. filteredContainers := 0
  147. DataLoop:
  148. for key, datum := range data {
  149. for _, rf := range retains {
  150. if ok, _ := rf(datum); ok {
  151. result[key] = datum
  152. // if any retain function passes, the data is retained and move on
  153. continue DataLoop
  154. }
  155. }
  156. for _, ff := range filters {
  157. if ok, environment := ff(datum); !ok {
  158. if environment != "" {
  159. filteredEnvironments[environment]++
  160. }
  161. filteredContainers++
  162. // if any filter function check fails, move on to the next datum
  163. continue DataLoop
  164. }
  165. }
  166. result[key] = datum
  167. }
  168. return result, filteredContainers, filteredEnvironments
  169. }
  170. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  171. fs := strings.Split(fields, ",")
  172. fmap := make(map[string]bool)
  173. for _, f := range fs {
  174. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  175. klog.V(1).Infof("to delete: %s", fieldNameLower)
  176. fmap[fieldNameLower] = true
  177. }
  178. filteredData := make(map[string]CostData)
  179. for cname, costdata := range data {
  180. s := reflect.TypeOf(*costdata)
  181. val := reflect.ValueOf(*costdata)
  182. costdata2 := CostData{}
  183. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  184. n := s.NumField()
  185. for i := 0; i < n; i++ {
  186. field := s.Field(i)
  187. value := val.Field(i)
  188. value2 := cd2.Field(i)
  189. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  190. value2.Set(reflect.Value(value))
  191. }
  192. }
  193. filteredData[cname] = cd2.Interface().(CostData)
  194. }
  195. return filteredData
  196. }
  // normalizeTimeParam converts a duration string expressed in days ("Nd") to
  // the equivalent number of hours ("(N*24)h"). Strings that do not end in "d"
  // are returned unchanged. An empty string, or a day count that is not a
  // base-10 integer, yields an error.
  func normalizeTimeParam(param string) (string, error) {
      if param == "" {
          return "", fmt.Errorf("invalid time param")
      }

      last := len(param) - 1
      if param[last] != 'd' {
          return param, nil
      }

      // convert days to hours
      days, err := strconv.ParseInt(param[:last], 10, 64)
      if err != nil {
          return "", err
      }
      return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString parses a string of the form "N%" and returns N scaled by
  // 0.01 (e.g. "25%" yields 0.25). The trailing "%" is optional. The empty
  // string is interpreted as "0%" and yields 0.0. A non-numeric N yields an
  // error.
  func ParsePercentString(percentStr string) (float64, error) {
      n := len(percentStr)
      if n == 0 {
          return 0.0, nil
      }

      numeric := percentStr
      if percentStr[n-1] == '%' {
          numeric = percentStr[:n-1]
      }

      value, err := strconv.ParseFloat(numeric, 64)
      if err != nil {
          return 0.0, err
      }
      return value * 0.01, nil
  }
  230. func WrapData(data interface{}, err error) []byte {
  231. var resp []byte
  232. if err != nil {
  233. klog.V(1).Infof("Error returned to client: %s", err.Error())
  234. resp, _ = json.Marshal(&Response{
  235. Code: http.StatusInternalServerError,
  236. Status: "error",
  237. Message: err.Error(),
  238. Data: data,
  239. })
  240. } else {
  241. resp, _ = json.Marshal(&Response{
  242. Code: http.StatusOK,
  243. Status: "success",
  244. Data: data,
  245. })
  246. }
  247. return resp
  248. }
  249. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  250. var resp []byte
  251. if err != nil {
  252. klog.V(1).Infof("Error returned to client: %s", err.Error())
  253. resp, _ = json.Marshal(&Response{
  254. Code: http.StatusInternalServerError,
  255. Status: "error",
  256. Message: err.Error(),
  257. Data: data,
  258. })
  259. } else {
  260. resp, _ = json.Marshal(&Response{
  261. Code: http.StatusOK,
  262. Status: "success",
  263. Data: data,
  264. Message: message,
  265. })
  266. }
  267. return resp
  268. }
  269. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  270. var resp []byte
  271. if err != nil {
  272. klog.V(1).Infof("Error returned to client: %s", err.Error())
  273. resp, _ = json.Marshal(&Response{
  274. Code: http.StatusInternalServerError,
  275. Status: "error",
  276. Message: err.Error(),
  277. Warning: warning,
  278. Data: data,
  279. })
  280. } else {
  281. resp, _ = json.Marshal(&Response{
  282. Code: http.StatusOK,
  283. Status: "success",
  284. Data: data,
  285. Warning: warning,
  286. })
  287. }
  288. return resp
  289. }
  290. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  291. var resp []byte
  292. if err != nil {
  293. klog.V(1).Infof("Error returned to client: %s", err.Error())
  294. resp, _ = json.Marshal(&Response{
  295. Code: http.StatusInternalServerError,
  296. Status: "error",
  297. Message: err.Error(),
  298. Warning: warning,
  299. Data: data,
  300. })
  301. } else {
  302. resp, _ = json.Marshal(&Response{
  303. Code: http.StatusOK,
  304. Status: "success",
  305. Data: data,
  306. Message: message,
  307. Warning: warning,
  308. })
  309. }
  310. return resp
  311. }
  312. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  313. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  314. w.Header().Set("Content-Type", "application/json")
  315. w.Header().Set("Access-Control-Allow-Origin", "*")
  316. err := a.CloudProvider.DownloadPricingData()
  317. if err != nil {
  318. klog.V(1).Infof("Error refreshing pricing data: %s", err.Error())
  319. }
  320. w.Write(WrapData(nil, err))
  321. }
  322. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  323. w.Header().Set("Content-Type", "application/json")
  324. w.Header().Set("Access-Control-Allow-Origin", "*")
  325. window := r.URL.Query().Get("timeWindow")
  326. offset := r.URL.Query().Get("offset")
  327. fields := r.URL.Query().Get("filterFields")
  328. namespace := r.URL.Query().Get("namespace")
  329. if offset != "" {
  330. offset = "offset " + offset
  331. }
  332. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  333. if fields != "" {
  334. filteredData := filterFields(fields, data)
  335. w.Write(WrapData(filteredData, err))
  336. } else {
  337. w.Write(WrapData(data, err))
  338. }
  339. }
  340. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  341. w.Header().Set("Content-Type", "application/json")
  342. w.Header().Set("Access-Control-Allow-Origin", "*")
  343. window := r.URL.Query().Get("window")
  344. offset := r.URL.Query().Get("offset")
  345. if window == "" {
  346. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  347. return
  348. }
  349. windowDur, err := timeutil.ParseDuration(window)
  350. if err != nil {
  351. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  352. return
  353. }
  354. // offset is not a required parameter
  355. var offsetDur time.Duration
  356. if offset != "" {
  357. offsetDur, err = timeutil.ParseDuration(offset)
  358. if err != nil {
  359. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  360. return
  361. }
  362. }
  363. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  364. if useThanos && !thanos.IsEnabled() {
  365. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  366. return
  367. }
  368. var client prometheus.Client
  369. if useThanos {
  370. client = a.ThanosClient
  371. offsetDur = thanos.OffsetDuration()
  372. } else {
  373. client = a.PrometheusClient
  374. }
  375. data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
  376. w.Write(WrapData(data, err))
  377. }
  378. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  379. w.Header().Set("Content-Type", "application/json")
  380. w.Header().Set("Access-Control-Allow-Origin", "*")
  381. start := r.URL.Query().Get("start")
  382. end := r.URL.Query().Get("end")
  383. window := r.URL.Query().Get("window")
  384. offset := r.URL.Query().Get("offset")
  385. if window == "" {
  386. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  387. return
  388. }
  389. windowDur, err := timeutil.ParseDuration(window)
  390. if err != nil {
  391. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  392. return
  393. }
  394. // offset is not a required parameter
  395. var offsetDur time.Duration
  396. if offset != "" {
  397. offsetDur, err = timeutil.ParseDuration(offset)
  398. if err != nil {
  399. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  400. return
  401. }
  402. }
  403. data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
  404. w.Write(WrapData(data, err))
  405. }
  406. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  407. w.Header().Set("Content-Type", "application/json")
  408. w.Header().Set("Access-Control-Allow-Origin", "*")
  409. startStr := r.URL.Query().Get("start")
  410. endStr := r.URL.Query().Get("end")
  411. windowStr := r.URL.Query().Get("window")
  412. fields := r.URL.Query().Get("filterFields")
  413. namespace := r.URL.Query().Get("namespace")
  414. cluster := r.URL.Query().Get("cluster")
  415. remote := r.URL.Query().Get("remote")
  416. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  417. layout := "2006-01-02T15:04:05.000Z"
  418. start, err := time.Parse(layout, startStr)
  419. if err != nil {
  420. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  421. return
  422. }
  423. end, err := time.Parse(layout, endStr)
  424. if err != nil {
  425. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  426. return
  427. }
  428. window := kubecost.NewWindow(&start, &end)
  429. if window.IsOpen() || window.IsEmpty() || window.IsNegative() {
  430. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  431. return
  432. }
  433. resolution := time.Hour
  434. if resDur, err := time.ParseDuration(windowStr); err == nil {
  435. resolution = resDur
  436. }
  437. // Use Thanos Client if it exists (enabled) and remote flag set
  438. var pClient prometheus.Client
  439. if remote != "false" && a.ThanosClient != nil {
  440. pClient = a.ThanosClient
  441. } else {
  442. pClient = a.PrometheusClient
  443. }
  444. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  445. if err != nil {
  446. w.Write(WrapData(nil, err))
  447. }
  448. if fields != "" {
  449. filteredData := filterFields(fields, data)
  450. w.Write(WrapData(filteredData, err))
  451. } else {
  452. w.Write(WrapData(data, err))
  453. }
  454. }
  // parseAggregations derives the aggregation key, the list of aggregation
  // fields, and the filter name for an out-of-cluster cost query.
  //
  // When customAggregation is non-empty it is used verbatim: the key is the
  // raw string, the list is its comma-split parts, and filterType is returned
  // unchanged. Otherwise aggregator is comma-split, each part (and filterType)
  // is prefixed with "kubernetes_", and the key is the prefixed parts re-joined
  // with commas.
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
      if customAggregation != "" {
          return customAggregation, strings.Split(customAggregation, ","), filterType
      }

      const prefix = "kubernetes_"

      parts := strings.Split(aggregator, ",")
      prefixed := make([]string, len(parts))
      for i, part := range parts {
          prefixed[i] = prefix + part
      }

      return strings.Join(prefixed, ","), prefixed, prefix + filterType
  }
  474. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  475. w.Header().Set("Content-Type", "application/json")
  476. w.Header().Set("Access-Control-Allow-Origin", "*")
  477. start := r.URL.Query().Get("start")
  478. end := r.URL.Query().Get("end")
  479. aggregator := r.URL.Query().Get("aggregator")
  480. customAggregation := r.URL.Query().Get("customAggregation")
  481. filterType := r.URL.Query().Get("filterType")
  482. filterValue := r.URL.Query().Get("filterValue")
  483. var data []*cloud.OutOfClusterAllocation
  484. var err error
  485. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  486. data, err = a.CloudProvider.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  487. w.Write(WrapData(data, err))
  488. }
  489. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  490. w.Header().Set("Content-Type", "application/json")
  491. w.Header().Set("Access-Control-Allow-Origin", "*")
  492. // start date for which to query costs, inclusive; format YYYY-MM-DD
  493. start := r.URL.Query().Get("start")
  494. // end date for which to query costs, inclusive; format YYYY-MM-DD
  495. end := r.URL.Query().Get("end")
  496. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  497. kubernetesAggregation := r.URL.Query().Get("aggregator")
  498. // customAggregation allows full customization of aggregator w/o prepending
  499. customAggregation := r.URL.Query().Get("customAggregation")
  500. // disableCache, if set to "true", tells this function to recompute and
  501. // cache the requested data
  502. disableCache := r.URL.Query().Get("disableCache") == "true"
  503. // clearCache, if set to "true", tells this function to flush the cache,
  504. // then recompute and cache the requested data
  505. clearCache := r.URL.Query().Get("clearCache") == "true"
  506. filterType := r.URL.Query().Get("filterType")
  507. filterValue := r.URL.Query().Get("filterValue")
  508. aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
  509. // clear cache prior to checking the cache so that a clearCache=true
  510. // request always returns a freshly computed value
  511. if clearCache {
  512. a.OutOfClusterCache.Flush()
  513. }
  514. // attempt to retrieve cost data from cache
  515. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
  516. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  517. if data, ok := value.([]*cloud.OutOfClusterAllocation); ok {
  518. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  519. return
  520. }
  521. klog.Errorf("caching error: failed to type cast data: %s", key)
  522. }
  523. data, err := a.CloudProvider.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  524. if err == nil {
  525. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  526. }
  527. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  528. }
  529. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  530. w.Header().Set("Content-Type", "application/json")
  531. w.Header().Set("Access-Control-Allow-Origin", "*")
  532. data, err := a.CloudProvider.AllNodePricing()
  533. w.Write(WrapData(data, err))
  534. }
  535. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  536. w.Header().Set("Content-Type", "application/json")
  537. w.Header().Set("Access-Control-Allow-Origin", "*")
  538. data, err := a.CloudProvider.GetConfig()
  539. w.Write(WrapData(data, err))
  540. }
  541. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  542. w.Header().Set("Content-Type", "application/json")
  543. w.Header().Set("Access-Control-Allow-Origin", "*")
  544. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
  545. if err != nil {
  546. w.Write(WrapData(data, err))
  547. return
  548. }
  549. w.Write(WrapData(data, err))
  550. err = a.CloudProvider.DownloadPricingData()
  551. if err != nil {
  552. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  553. }
  554. return
  555. }
  556. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  557. w.Header().Set("Content-Type", "application/json")
  558. w.Header().Set("Access-Control-Allow-Origin", "*")
  559. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
  560. if err != nil {
  561. w.Write(WrapData(data, err))
  562. return
  563. }
  564. w.Write(WrapData(data, err))
  565. return
  566. }
  567. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  568. w.Header().Set("Content-Type", "application/json")
  569. w.Header().Set("Access-Control-Allow-Origin", "*")
  570. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
  571. if err != nil {
  572. w.Write(WrapData(data, err))
  573. return
  574. }
  575. w.Write(WrapData(data, err))
  576. return
  577. }
  578. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  579. w.Header().Set("Content-Type", "application/json")
  580. w.Header().Set("Access-Control-Allow-Origin", "*")
  581. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  582. if err != nil {
  583. w.Write(WrapData(data, err))
  584. return
  585. }
  586. w.Write(WrapData(data, err))
  587. return
  588. }
  589. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  590. w.Header().Set("Content-Type", "application/json")
  591. w.Header().Set("Access-Control-Allow-Origin", "*")
  592. data, err := a.CloudProvider.GetManagementPlatform()
  593. if err != nil {
  594. w.Write(WrapData(data, err))
  595. return
  596. }
  597. w.Write(WrapData(data, err))
  598. return
  599. }
  600. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  601. w.Header().Set("Content-Type", "application/json")
  602. w.Header().Set("Access-Control-Allow-Origin", "*")
  603. data := GetClusterInfo(a.KubeClientSet, a.CloudProvider)
  604. w.Write(WrapData(data, nil))
  605. }
  606. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  607. w.Header().Set("Content-Type", "application/json")
  608. w.Header().Set("Access-Control-Allow-Origin", "*")
  609. data := a.ClusterMap.AsMap()
  610. w.Write(WrapData(data, nil))
  611. }
  612. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  613. w.Header().Set("Content-Type", "application/json")
  614. w.Header().Set("Access-Control-Allow-Origin", "*")
  615. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  616. }
  617. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  618. w.Header().Set("Content-Type", "application/json")
  619. w.Header().Set("Access-Control-Allow-Origin", "*")
  620. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  621. }
  622. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  623. w.Header().Set("Content-Type", "application/json")
  624. w.Header().Set("Access-Control-Allow-Origin", "*")
  625. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  626. }
  627. func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  628. w.Header().Set("Content-Type", "application/json")
  629. w.Header().Set("Access-Control-Allow-Origin", "*")
  630. w.Write(WrapData(prom.Validate(a.PrometheusClient)))
  631. }
  632. func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  633. w.Header().Set("Content-Type", "application/json")
  634. w.Header().Set("Access-Control-Allow-Origin", "*")
  635. qp := httputil.NewQueryParams(r.URL.Query())
  636. query := qp.Get("query", "")
  637. if query == "" {
  638. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  639. return
  640. }
  641. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  642. body, err := ctx.RawQuery(query)
  643. if err != nil {
  644. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  645. return
  646. }
  647. w.Write(body)
  648. }
  649. func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  650. w.Header().Set("Content-Type", "application/json")
  651. w.Header().Set("Access-Control-Allow-Origin", "*")
  652. qp := httputil.NewQueryParams(r.URL.Query())
  653. query := qp.Get("query", "")
  654. if query == "" {
  655. fmt.Fprintf(w, "Error parsing query from request parameters.")
  656. return
  657. }
  658. start, end, duration, err := toStartEndStep(qp)
  659. if err != nil {
  660. fmt.Fprintf(w, err.Error())
  661. return
  662. }
  663. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  664. body, err := ctx.RawQueryRange(query, start, end, duration)
  665. if err != nil {
  666. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  667. return
  668. }
  669. w.Write(body)
  670. }
  671. func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  672. w.Header().Set("Content-Type", "application/json")
  673. w.Header().Set("Access-Control-Allow-Origin", "*")
  674. if !thanos.IsEnabled() {
  675. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  676. return
  677. }
  678. qp := httputil.NewQueryParams(r.URL.Query())
  679. query := qp.Get("query", "")
  680. if query == "" {
  681. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  682. return
  683. }
  684. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  685. body, err := ctx.RawQuery(query)
  686. if err != nil {
  687. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  688. return
  689. }
  690. w.Write(body)
  691. }
  692. func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  693. w.Header().Set("Content-Type", "application/json")
  694. w.Header().Set("Access-Control-Allow-Origin", "*")
  695. if !thanos.IsEnabled() {
  696. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  697. return
  698. }
  699. qp := httputil.NewQueryParams(r.URL.Query())
  700. query := qp.Get("query", "")
  701. if query == "" {
  702. fmt.Fprintf(w, "Error parsing query from request parameters.")
  703. return
  704. }
  705. start, end, duration, err := toStartEndStep(qp)
  706. if err != nil {
  707. fmt.Fprintf(w, err.Error())
  708. return
  709. }
  710. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  711. body, err := ctx.RawQueryRange(query, start, end, duration)
  712. if err != nil {
  713. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  714. return
  715. }
  716. w.Write(body)
  717. }
  718. // helper for query range proxy requests
  719. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  720. var e error
  721. ss := qp.Get("start", "")
  722. es := qp.Get("end", "")
  723. ds := qp.Get("duration", "")
  724. layout := "2006-01-02T15:04:05.000Z"
  725. start, e = time.Parse(layout, ss)
  726. if e != nil {
  727. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  728. return
  729. }
  730. end, e = time.Parse(layout, es)
  731. if e != nil {
  732. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  733. return
  734. }
  735. step, e = time.ParseDuration(ds)
  736. if e != nil {
  737. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  738. return
  739. }
  740. err = nil
  741. return
  742. }
  743. func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  744. w.Header().Set("Content-Type", "application/json")
  745. w.Header().Set("Access-Control-Allow-Origin", "*")
  746. promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
  747. if err != nil {
  748. w.Write(WrapData(nil, err))
  749. return
  750. }
  751. result := map[string]*prom.PrometheusQueueState{
  752. "prometheus": promQueueState,
  753. }
  754. if thanos.IsEnabled() {
  755. thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
  756. if err != nil {
  757. log.Warningf("Error getting Thanos queue state: %s", err)
  758. } else {
  759. result["thanos"] = thanosQueueState
  760. }
  761. }
  762. w.Write(WrapData(result, nil))
  763. }
  764. // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
  765. func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  766. w.Header().Set("Content-Type", "application/json")
  767. w.Header().Set("Access-Control-Allow-Origin", "*")
  768. promMetrics, err := prom.GetPrometheusMetrics(a.PrometheusClient, "")
  769. if err != nil {
  770. w.Write(WrapData(nil, err))
  771. return
  772. }
  773. result := map[string][]*prom.PrometheusDiagnostic{
  774. "prometheus": promMetrics,
  775. }
  776. if thanos.IsEnabled() {
  777. thanosMetrics, err := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
  778. if err != nil {
  779. log.Warningf("Error getting Thanos queue state: %s", err)
  780. } else {
  781. result["thanos"] = thanosMetrics
  782. }
  783. }
  784. w.Write(WrapData(result, nil))
  785. }
  786. func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  787. w.Header().Set("Content-Type", "application/json")
  788. w.Header().Set("Access-Control-Allow-Origin", "*")
  789. pvList, err := a.KubeClientSet.CoreV1().PersistentVolumes().List(r.Context(), metav1.ListOptions{})
  790. if err != nil {
  791. fmt.Fprintf(w, "Error getting persistent volume %v\n", err)
  792. }
  793. body, err := json.Marshal(pvList)
  794. if err != nil {
  795. fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
  796. } else {
  797. w.Write(body)
  798. }
  799. }
  800. func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  801. w.Header().Set("Content-Type", "application/json")
  802. w.Header().Set("Access-Control-Allow-Origin", "*")
  803. namespace := r.URL.Query().Get("namespace")
  804. deploymentsList, err := a.KubeClientSet.AppsV1().Deployments(namespace).List(r.Context(), metav1.ListOptions{})
  805. if err != nil {
  806. fmt.Fprintf(w, "Error getting deployments %v\n", err)
  807. }
  808. body, err := json.Marshal(deploymentsList)
  809. if err != nil {
  810. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  811. } else {
  812. w.Write(body)
  813. }
  814. }
  815. func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  816. w.Header().Set("Content-Type", "application/json")
  817. w.Header().Set("Access-Control-Allow-Origin", "*")
  818. scList, err := a.KubeClientSet.StorageV1().StorageClasses().List(r.Context(), metav1.ListOptions{})
  819. if err != nil {
  820. fmt.Fprintf(w, "Error getting storageclasses: "+err.Error())
  821. }
  822. body, err := json.Marshal(scList)
  823. if err != nil {
  824. fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
  825. } else {
  826. w.Write(body)
  827. }
  828. }
  829. func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  830. w.Header().Set("Content-Type", "application/json")
  831. w.Header().Set("Access-Control-Allow-Origin", "*")
  832. namespace := r.URL.Query().Get("namespace")
  833. deploymentsList, err := a.KubeClientSet.AppsV1().StatefulSets(namespace).List(r.Context(), metav1.ListOptions{})
  834. if err != nil {
  835. fmt.Fprintf(w, "Error getting deployments %v\n", err)
  836. }
  837. body, err := json.Marshal(deploymentsList)
  838. if err != nil {
  839. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  840. } else {
  841. w.Write(body)
  842. }
  843. }
  844. func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  845. w.Header().Set("Content-Type", "application/json")
  846. w.Header().Set("Access-Control-Allow-Origin", "*")
  847. nodeList, err := a.KubeClientSet.CoreV1().Nodes().List(r.Context(), metav1.ListOptions{})
  848. if err != nil {
  849. fmt.Fprintf(w, "Error getting node %v\n", err)
  850. }
  851. body, err := json.Marshal(nodeList)
  852. if err != nil {
  853. fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
  854. } else {
  855. w.Write(body)
  856. }
  857. }
  858. func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  859. w.Header().Set("Content-Type", "application/json")
  860. w.Header().Set("Access-Control-Allow-Origin", "*")
  861. podlist, err := a.KubeClientSet.CoreV1().Pods("").List(r.Context(), metav1.ListOptions{})
  862. if err != nil {
  863. fmt.Fprintf(w, "Error getting pod %v\n", err)
  864. }
  865. body, err := json.Marshal(podlist)
  866. if err != nil {
  867. fmt.Fprintf(w, "Error decoding pods: "+err.Error())
  868. } else {
  869. w.Write(body)
  870. }
  871. }
  872. func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  873. w.Header().Set("Content-Type", "application/json")
  874. w.Header().Set("Access-Control-Allow-Origin", "*")
  875. namespaces, err := a.KubeClientSet.CoreV1().Namespaces().List(r.Context(), metav1.ListOptions{})
  876. if err != nil {
  877. fmt.Fprintf(w, "Error getting namespaces %v\n", err)
  878. }
  879. body, err := json.Marshal(namespaces)
  880. if err != nil {
  881. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  882. } else {
  883. w.Write(body)
  884. }
  885. }
  886. func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  887. w.Header().Set("Content-Type", "application/json")
  888. w.Header().Set("Access-Control-Allow-Origin", "*")
  889. daemonSets, err := a.KubeClientSet.AppsV1().DaemonSets("").List(r.Context(), metav1.ListOptions{})
  890. if err != nil {
  891. fmt.Fprintf(w, "Error getting daemon sets %v\n", err)
  892. }
  893. body, err := json.Marshal(daemonSets)
  894. if err != nil {
  895. fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
  896. } else {
  897. w.Write(body)
  898. }
  899. }
  900. func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  901. w.Header().Set("Content-Type", "application/json")
  902. w.Header().Set("Access-Control-Allow-Origin", "*")
  903. podName := ps.ByName("name")
  904. podNamespace := ps.ByName("namespace")
  905. // Examples for error handling:
  906. // - Use helper functions like e.g. errors.IsNotFound()
  907. // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
  908. pod, err := a.KubeClientSet.CoreV1().Pods(podNamespace).Get(r.Context(), podName, metav1.GetOptions{})
  909. if k8serrors.IsNotFound(err) {
  910. fmt.Fprintf(w, "Pod not found\n")
  911. } else if statusError, isStatus := err.(*k8serrors.StatusError); isStatus {
  912. fmt.Fprintf(w, "Error getting pod %v\n", statusError.ErrStatus.Message)
  913. } else if err != nil {
  914. fmt.Fprintf(w, "Error getting pod: "+err.Error())
  915. } else {
  916. body, err := json.Marshal(pod)
  917. if err != nil {
  918. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  919. } else {
  920. w.Write(body)
  921. }
  922. }
  923. }
  924. func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  925. w.Header().Set("Content-Type", "application/json")
  926. w.Header().Set("Access-Control-Allow-Origin", "*")
  927. u := a.PrometheusClient.URL(epRules, nil)
  928. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  929. if err != nil {
  930. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  931. }
  932. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  933. if err != nil {
  934. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  935. } else {
  936. w.Write(body)
  937. }
  938. }
  939. func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  940. w.Header().Set("Content-Type", "application/json")
  941. w.Header().Set("Access-Control-Allow-Origin", "*")
  942. pConfig := make(map[string]string)
  943. pConfig["address"] = os.Getenv("PROMETHEUS_SERVER_ENDPOINT")
  944. body, err := json.Marshal(pConfig)
  945. if err != nil {
  946. fmt.Fprintf(w, "Error marshalling prometheus config")
  947. } else {
  948. w.Write(body)
  949. }
  950. }
  951. func (a *Accesses) PrometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  952. w.Header().Set("Content-Type", "application/json")
  953. w.Header().Set("Access-Control-Allow-Origin", "*")
  954. u := a.PrometheusClient.URL(epTargets, nil)
  955. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  956. if err != nil {
  957. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  958. }
  959. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  960. if err != nil {
  961. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  962. } else {
  963. w.Write(body)
  964. }
  965. }
  966. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  967. w.Header().Set("Content-Type", "application/json")
  968. w.Header().Set("Access-Control-Allow-Origin", "*")
  969. podlist, err := a.KubeClientSet.CoreV1().Pods("").List(r.Context(), metav1.ListOptions{})
  970. if err != nil {
  971. fmt.Fprintf(w, "Error getting pod %v\n", err)
  972. }
  973. var lonePods []v1.Pod
  974. for _, pod := range podlist.Items {
  975. if len(pod.OwnerReferences) == 0 {
  976. lonePods = append(lonePods, pod)
  977. }
  978. }
  979. body, err := json.Marshal(lonePods)
  980. if err != nil {
  981. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  982. } else {
  983. w.Write(body)
  984. }
  985. }
  986. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  987. w.Header().Set("Content-Type", "application/json")
  988. w.Header().Set("Access-Control-Allow-Origin", "*")
  989. ns := os.Getenv("KUBECOST_NAMESPACE")
  990. w.Write([]byte(ns))
  991. }
  992. func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
  993. since := time.Now().UTC().Add(-dur)
  994. logOpts := v1.PodLogOptions{
  995. SinceTime: &metav1.Time{Time: since},
  996. }
  997. if container != "" {
  998. logOpts.Container = container
  999. }
  1000. req := c.CoreV1().Pods(namespace).GetLogs(pod, &logOpts)
  1001. reader, err := req.Stream(ctx)
  1002. if err != nil {
  1003. return "", err
  1004. }
  1005. podLogs, err := ioutil.ReadAll(reader)
  1006. if err != nil {
  1007. return "", err
  1008. }
  1009. return string(podLogs), nil
  1010. }
  1011. func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1012. w.Header().Set("Content-Type", "application/json")
  1013. w.Header().Set("Access-Control-Allow-Origin", "*")
  1014. q := r.URL.Query()
  1015. ns := q.Get("namespace")
  1016. if ns == "" {
  1017. ns = os.Getenv("KUBECOST_NAMESPACE")
  1018. }
  1019. pod := q.Get("pod")
  1020. selector := q.Get("selector")
  1021. container := q.Get("container")
  1022. since := q.Get("since")
  1023. if since == "" {
  1024. since = "24h"
  1025. }
  1026. sinceDuration, err := time.ParseDuration(since)
  1027. if err != nil {
  1028. fmt.Fprintf(w, "Invalid Duration String: "+err.Error())
  1029. return
  1030. }
  1031. var logResult string
  1032. appendLog := func(ns string, pod string, container string, l string) {
  1033. if l == "" {
  1034. return
  1035. }
  1036. logResult += fmt.Sprintf("%s\n| %s:%s:%s\n%s\n%s\n\n", LogSeparator, ns, pod, container, LogSeparator, l)
  1037. }
  1038. if pod != "" {
  1039. pd, err := a.KubeClientSet.CoreV1().Pods(ns).Get(r.Context(), pod, metav1.GetOptions{})
  1040. if err != nil {
  1041. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1042. return
  1043. }
  1044. if container != "" {
  1045. var foundContainer bool
  1046. for _, cont := range pd.Spec.Containers {
  1047. if strings.EqualFold(cont.Name, container) {
  1048. foundContainer = true
  1049. break
  1050. }
  1051. }
  1052. if !foundContainer {
  1053. fmt.Fprintf(w, "Could not find container: "+container)
  1054. return
  1055. }
  1056. }
  1057. logs, err := logsFor(a.KubeClientSet, ns, pod, container, sinceDuration, r.Context())
  1058. if err != nil {
  1059. fmt.Fprintf(w, "Error Getting Logs: "+err.Error())
  1060. return
  1061. }
  1062. appendLog(ns, pod, container, logs)
  1063. w.Write([]byte(logResult))
  1064. return
  1065. }
  1066. if selector != "" {
  1067. pods, err := a.KubeClientSet.CoreV1().Pods(ns).List(r.Context(), metav1.ListOptions{LabelSelector: selector})
  1068. if err != nil {
  1069. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1070. return
  1071. }
  1072. for _, pd := range pods.Items {
  1073. for _, cont := range pd.Spec.Containers {
  1074. logs, err := logsFor(a.KubeClientSet, ns, pd.Name, cont.Name, sinceDuration, r.Context())
  1075. if err != nil {
  1076. continue
  1077. }
  1078. appendLog(ns, pd.Name, cont.Name, logs)
  1079. }
  1080. }
  1081. }
  1082. w.Write([]byte(logResult))
  1083. }
  1084. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1085. w.Header().Set("Content-Type", "application/json")
  1086. w.Header().Set("Access-Control-Allow-Origin", "*")
  1087. r.ParseForm()
  1088. //p.CloudProvider.AddServiceKey(r.PostForm)
  1089. key := r.PostForm.Get("key")
  1090. k := []byte(key)
  1091. err := ioutil.WriteFile("/var/configs/key.json", k, 0644)
  1092. if err != nil {
  1093. fmt.Fprintf(w, "Error writing service key: "+err.Error())
  1094. }
  1095. w.WriteHeader(http.StatusOK)
  1096. }
  1097. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1098. w.Header().Set("Content-Type", "application/json")
  1099. w.Header().Set("Access-Control-Allow-Origin", "*")
  1100. encodedValues := os.Getenv("HELM_VALUES")
  1101. if encodedValues == "" {
  1102. fmt.Fprintf(w, "Values reporting disabled")
  1103. return
  1104. }
  1105. result, err := base64.StdEncoding.DecodeString(encodedValues)
  1106. if err != nil {
  1107. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  1108. return
  1109. }
  1110. w.Write(result)
  1111. }
  1112. func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1113. w.Header().Set("Content-Type", "application/json")
  1114. w.Header().Set("Access-Control-Allow-Origin", "*")
  1115. api := prometheusAPI.NewAPI(a.PrometheusClient)
  1116. result, err := api.Config(r.Context())
  1117. if err != nil {
  1118. fmt.Fprintf(w, "Using Prometheus at "+os.Getenv("PROMETHEUS_SERVER_ENDPOINT")+". Error: "+err.Error())
  1119. } else {
  1120. fmt.Fprintf(w, "Using Prometheus at "+os.Getenv("PROMETHEUS_SERVER_ENDPOINT")+". PrometheusConfig: "+result.YAML)
  1121. }
  1122. }
  1123. // captures the panic event in sentry
  1124. func capturePanicEvent(err string, stack string) {
  1125. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  1126. klog.V(1).Infoln(msg)
  1127. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  1128. Level: sentry.LevelError,
  1129. Message: msg,
  1130. })
  1131. sentry.Flush(5 * time.Second)
  1132. }
  1133. // handle any panics reported by the errors package
  1134. func handlePanic(p errors.Panic) bool {
  1135. err := p.Error
  1136. if err != nil {
  1137. if err, ok := err.(error); ok {
  1138. capturePanicEvent(err.Error(), p.Stack)
  1139. }
  1140. if err, ok := err.(string); ok {
  1141. capturePanicEvent(err, p.Stack)
  1142. }
  1143. }
  1144. // Return true to recover iff the type is http, otherwise allow kubernetes
  1145. // to recover.
  1146. return p.Type == errors.PanicTypeHTTP
  1147. }
  1148. func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  1149. klog.InitFlags(nil)
  1150. flag.Set("v", "3")
  1151. flag.Parse()
  1152. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", env.GetAppVersion())
  1153. configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)
  1154. var err error
  1155. if errorReportingEnabled {
  1156. err = sentry.Init(sentry.ClientOptions{Release: env.GetAppVersion()})
  1157. if err != nil {
  1158. klog.Infof("Failed to initialize sentry for error reporting")
  1159. } else {
  1160. err = errors.SetPanicHandler(handlePanic)
  1161. if err != nil {
  1162. klog.Infof("Failed to set panic handler: %s", err)
  1163. }
  1164. }
  1165. }
  1166. address := env.GetPrometheusServerEndpoint()
  1167. if address == "" {
  1168. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  1169. }
  1170. queryConcurrency := env.GetMaxQueryConcurrency()
  1171. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  1172. timeout := 120 * time.Second
  1173. keepAlive := 120 * time.Second
  1174. scrapeInterval := time.Minute
  1175. promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
  1176. if err != nil {
  1177. klog.Fatalf("Failed to create prometheus client, Error: %v", err)
  1178. }
  1179. m, err := prom.Validate(promCli)
  1180. if err != nil || !m.Running {
  1181. if err != nil {
  1182. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1183. } else if !m.Running {
  1184. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  1185. }
  1186. } else {
  1187. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  1188. }
  1189. api := prometheusAPI.NewAPI(promCli)
  1190. _, err = api.Config(context.Background())
  1191. if err != nil {
  1192. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1193. } else {
  1194. klog.Infof("Retrieved a prometheus config file from: %s", address)
  1195. }
  1196. // Lookup scrape interval for kubecost job, update if found
  1197. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  1198. if err == nil {
  1199. scrapeInterval = si
  1200. }
  1201. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  1202. // Kubernetes API setup
  1203. var kc *rest.Config
  1204. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  1205. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  1206. } else {
  1207. kc, err = rest.InClusterConfig()
  1208. }
  1209. if err != nil {
  1210. panic(err.Error())
  1211. }
  1212. kubeClientset, err := kubernetes.NewForConfig(kc)
  1213. if err != nil {
  1214. panic(err.Error())
  1215. }
  1216. // Create Kubernetes Cluster Cache + Watchers
  1217. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  1218. k8sCache.Run()
  1219. cloudProviderKey := env.GetCloudProviderAPIKey()
  1220. cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey)
  1221. if err != nil {
  1222. panic(err.Error())
  1223. }
  1224. // Append the pricing config watcher
  1225. configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
  1226. watchConfigFunc := configWatchers.ToWatchFunc()
  1227. watchedConfigs := configWatchers.GetWatchedConfigs()
  1228. kubecostNamespace := env.GetKubecostNamespace()
  1229. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  1230. for _, cw := range watchedConfigs {
  1231. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
  1232. if err != nil {
  1233. klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  1234. } else {
  1235. klog.Infof("Found configmap %s, watching...", configs.Name)
  1236. watchConfigFunc(configs)
  1237. }
  1238. }
  1239. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  1240. remoteEnabled := env.IsRemoteEnabled()
  1241. if remoteEnabled {
  1242. info, err := cloudProvider.ClusterInfo()
  1243. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1244. if err != nil {
  1245. klog.Infof("Error saving cluster id %s", err.Error())
  1246. }
  1247. _, _, err = cloud.GetOrCreateClusterMeta(info["id"], info["name"])
  1248. if err != nil {
  1249. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1250. }
  1251. }
  1252. // Thanos Client
  1253. var thanosClient prometheus.Client
  1254. if thanos.IsEnabled() {
  1255. thanosAddress := thanos.QueryURL()
  1256. if thanosAddress != "" {
  1257. thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
  1258. _, err = prom.Validate(thanosCli)
  1259. if err != nil {
  1260. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  1261. thanosClient = thanosCli
  1262. } else {
  1263. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  1264. thanosClient = thanosCli
  1265. }
  1266. } else {
  1267. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  1268. }
  1269. }
  1270. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  1271. var clusterMap clusters.ClusterMap
  1272. localCIProvider := NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  1273. if thanosClient != nil {
  1274. clusterMap = clusters.NewClusterMap(thanosClient, localCIProvider, 10*time.Minute)
  1275. } else {
  1276. clusterMap = clusters.NewClusterMap(promCli, localCIProvider, 5*time.Minute)
  1277. }
  1278. // cache responses from model and aggregation for a default of 10 minutes;
  1279. // clear expired responses every 20 minutes
  1280. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  1281. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  1282. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1283. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1284. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1285. // query durations that should be cached longer should be registered here
  1286. // use relatively prime numbers to minimize likelihood of synchronized
  1287. // attempts at cache warming
  1288. day := 24 * time.Hour
  1289. cacheExpiration := map[time.Duration]time.Duration{
  1290. day: maxCacheMinutes1d * time.Minute,
  1291. 2 * day: maxCacheMinutes2d * time.Minute,
  1292. 7 * day: maxCacheMinutes7d * time.Minute,
  1293. 30 * day: maxCacheMinutes30d * time.Minute,
  1294. }
  1295. var pc prometheus.Client
  1296. if thanosClient != nil {
  1297. pc = thanosClient
  1298. } else {
  1299. pc = promCli
  1300. }
  1301. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  1302. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
  1303. a := &Accesses{
  1304. Router: httprouter.New(),
  1305. PrometheusClient: promCli,
  1306. ThanosClient: thanosClient,
  1307. KubeClientSet: kubeClientset,
  1308. ClusterMap: clusterMap,
  1309. CloudProvider: cloudProvider,
  1310. Model: costModel,
  1311. MetricsEmitter: metricsEmitter,
  1312. AggregateCache: aggregateCache,
  1313. CostDataCache: costDataCache,
  1314. ClusterCostsCache: clusterCostsCache,
  1315. OutOfClusterCache: outOfClusterCache,
  1316. SettingsCache: settingsCache,
  1317. CacheExpiration: cacheExpiration,
  1318. httpServices: services.NewCostModelServices(),
  1319. }
  1320. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  1321. // confusing and unconventional, but necessary so that we can swap it
  1322. // out for the ETL-adapted version elsewhere.
  1323. // TODO clean this up once ETL is open-sourced.
  1324. a.AggAPI = a
  1325. // Initialize mechanism for subscribing to settings changes
  1326. a.InitializeSettingsPubSub()
  1327. err = a.CloudProvider.DownloadPricingData()
  1328. if err != nil {
  1329. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  1330. }
  1331. // Warm the aggregate cache unless explicitly set to false
  1332. if env.IsCacheWarmingEnabled() {
  1333. log.Infof("Init: AggregateCostModel cache warming enabled")
  1334. a.warmAggregateCostModelCache()
  1335. } else {
  1336. log.Infof("Init: AggregateCostModel cache warming disabled")
  1337. }
  1338. if !env.IsKubecostMetricsPodEnabled() {
  1339. a.MetricsEmitter.Start()
  1340. }
  1341. a.Router.GET("/costDataModel", a.CostDataModel)
  1342. a.Router.GET("/costDataModelRange", a.CostDataModelRange)
  1343. a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  1344. a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
  1345. a.Router.GET("/outOfClusterCosts", a.OutOfClusterCostsWithCache)
  1346. a.Router.GET("/allNodePricing", a.GetAllNodePricing)
  1347. a.Router.POST("/refreshPricing", a.RefreshPricingData)
  1348. a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  1349. a.Router.GET("/clusterCosts", a.ClusterCosts)
  1350. a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  1351. a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  1352. a.Router.GET("/managementPlatform", a.ManagementPlatform)
  1353. a.Router.GET("/clusterInfo", a.ClusterInfo)
  1354. a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  1355. a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  1356. a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  1357. a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  1358. // endpoints migrated from server
  1359. a.Router.GET("/allPersistentVolumes", a.GetAllPersistentVolumes)
  1360. a.Router.GET("/allDeployments", a.GetAllDeployments)
  1361. a.Router.GET("/allStorageClasses", a.GetAllStorageClasses)
  1362. a.Router.GET("/allStatefulSets", a.GetAllStatefulSets)
  1363. a.Router.GET("/allNodes", a.GetAllNodes)
  1364. a.Router.GET("/allPods", a.GetAllPods)
  1365. a.Router.GET("/allNamespaces", a.GetAllNamespaces)
  1366. a.Router.GET("/allDaemonSets", a.GetAllDaemonSets)
  1367. a.Router.GET("/pod/:namespace/:name", a.GetPod)
  1368. a.Router.GET("/prometheusRecordingRules", a.PrometheusRecordingRules)
  1369. a.Router.GET("/prometheusConfig", a.PrometheusConfig)
  1370. a.Router.GET("/prometheusTargets", a.PrometheusTargets)
  1371. a.Router.GET("/orphanedPods", a.GetOrphanedPods)
  1372. a.Router.GET("/installNamespace", a.GetInstallNamespace)
  1373. a.Router.GET("/podLogs", a.GetPodLogs)
  1374. a.Router.POST("/serviceKey", a.AddServiceKey)
  1375. a.Router.GET("/helmValues", a.GetHelmValues)
  1376. a.Router.GET("/status", a.Status)
  1377. // prom query proxies
  1378. a.Router.GET("/prometheusQuery", a.PrometheusQuery)
  1379. a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
  1380. a.Router.GET("/thanosQuery", a.ThanosQuery)
  1381. a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)
  1382. // diagnostics
  1383. a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
  1384. a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
  1385. a.httpServices.RegisterAll(a.Router)
  1386. return a
  1387. }