router.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/kubecost/cost-model/pkg/config"
  14. "github.com/kubecost/cost-model/pkg/metrics"
  15. "github.com/kubecost/cost-model/pkg/services"
  16. "github.com/kubecost/cost-model/pkg/util/httputil"
  17. "github.com/kubecost/cost-model/pkg/util/timeutil"
  18. "github.com/kubecost/cost-model/pkg/util/watcher"
  19. "github.com/microcosm-cc/bluemonday"
  20. v1 "k8s.io/api/core/v1"
  21. "k8s.io/klog"
  22. "github.com/julienschmidt/httprouter"
  23. sentry "github.com/getsentry/sentry-go"
  24. "github.com/kubecost/cost-model/pkg/cloud"
  25. "github.com/kubecost/cost-model/pkg/clustercache"
  26. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  27. "github.com/kubecost/cost-model/pkg/env"
  28. "github.com/kubecost/cost-model/pkg/errors"
  29. "github.com/kubecost/cost-model/pkg/kubecost"
  30. "github.com/kubecost/cost-model/pkg/log"
  31. "github.com/kubecost/cost-model/pkg/prom"
  32. "github.com/kubecost/cost-model/pkg/thanos"
  33. "github.com/kubecost/cost-model/pkg/util/json"
  34. prometheus "github.com/prometheus/client_golang/api"
  35. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  36. appsv1 "k8s.io/api/apps/v1"
  37. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  38. "github.com/patrickmn/go-cache"
  39. "k8s.io/client-go/kubernetes"
  40. "k8s.io/client-go/rest"
  41. "k8s.io/client-go/tools/clientcmd"
  42. )
  43. var sanitizePolicy = bluemonday.UGCPolicy() // package-wide bluemonday policy built from the library's UGC preset
  44. const (
  45. RFC3339Milli = "2006-01-02T15:04:05.000Z" // Go time layout: RFC 3339 style with millisecond precision and a literal trailing "Z"
  46. maxCacheMinutes1d = 11 // NOTE(review): the maxCacheMinutes* names suggest per-window cache lifetimes in minutes; usage is not visible in this part of the file - confirm
  47. maxCacheMinutes2d = 17
  48. maxCacheMinutes7d = 37
  49. maxCacheMinutes30d = 137
  50. CustomPricingSetting = "CustomPricing"
  51. DiscountSetting = "Discount"
  52. epRules = apiPrefix + "/rules" // apiPrefix is declared outside this part of the file
  53. LogSeparator = "+-------------------------------------------------------------------------------------"
  54. )
  55. var (
  56. // gitCommit is set by the build system
  57. gitCommit string
  58. )
  59. // Accesses defines a singleton application instance, providing access to
  60. // Prometheus, Kubernetes, the cloud provider, and caches.
  61. type Accesses struct {
  62. Router *httprouter.Router
  63. PrometheusClient prometheus.Client // default query client; GetPrometheusClient falls back to it whenever Thanos is not selected
  64. ThanosClient prometheus.Client // may be nil; GetPrometheusClient returns it only when remote is requested and it is non-nil
  65. KubeClientSet kubernetes.Interface
  66. ClusterCache clustercache.ClusterCache
  67. ClusterMap clusters.ClusterMap
  68. CloudProvider cloud.Provider
  69. ConfigFileManager *config.ConfigFileManager
  70. ClusterInfoProvider clusters.ClusterInfoProvider
  71. Model *CostModel
  72. MetricsEmitter *CostModelMetricsEmitter
  73. OutOfClusterCache *cache.Cache
  74. AggregateCache *cache.Cache
  75. CostDataCache *cache.Cache
  76. ClusterCostsCache *cache.Cache // ClusterCostsFromCacheHandler reads it with "<duration>:<offset>" keys and expects map[string]*ClusterCosts values
  77. CacheExpiration map[time.Duration]time.Duration // custom cache expirations keyed by duration; consulted by GetCacheExpiration
  78. AggAPI Aggregator
  79. // SettingsCache stores current state of app settings
  80. SettingsCache *cache.Cache
  81. // settingsSubscribers tracks channels through which changes to different
  82. // settings will be published in a pub/sub model
  83. settingsSubscribers map[string][]chan string
  84. settingsMutex sync.Mutex // NOTE(review): presumably guards settingsSubscribers - confirm against the code that locks it
  85. // registered http service instances
  86. httpServices services.HTTPServices
  87. }
  88. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  89. // should be used.
  90. func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
  91. // Use Thanos Client if it exists (enabled) and remote flag set
  92. var pc prometheus.Client
  93. if remote && a.ThanosClient != nil {
  94. pc = a.ThanosClient
  95. } else {
  96. pc = a.PrometheusClient
  97. }
  98. return pc
  99. }
  100. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  101. // If one does not exists, it returns the default cache expiration, which is defined by
  102. // the particular cache.
  103. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  104. if expiration, ok := a.CacheExpiration[dur]; ok {
  105. return expiration
  106. }
  107. return cache.DefaultExpiration
  108. }
  109. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  110. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  111. // not found or is less than 2 minutes.
  112. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  113. expiry := a.GetCacheExpiration(dur).Minutes()
  114. if expiry <= 2.0 {
  115. return time.Minute
  116. }
  117. mins := time.Duration(expiry/2.0) * time.Minute
  118. return mins
  119. }
  120. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  121. w.Header().Set("Content-Type", "application/json")
  122. duration := 24 * time.Hour
  123. offset := time.Minute
  124. durationHrs := "24h"
  125. fmtOffset := "1m"
  126. pClient := a.GetPrometheusClient(true)
  127. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  128. if data, valid := a.ClusterCostsCache.Get(key); valid {
  129. clusterCosts := data.(map[string]*ClusterCosts)
  130. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  131. } else {
  132. data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
  133. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  134. }
  135. }
  // Response is the JSON envelope marshalled by the WrapData* helpers in this file.
  136. type Response struct {
  137. Code int `json:"code"` // the helpers set http.StatusOK on success and http.StatusInternalServerError on error
  138. Status string `json:"status"` // "success" or "error" as set by the helpers
  139. Data interface{} `json:"data"` // payload; always emitted (no omitempty)
  140. Message string `json:"message,omitempty"` // caller-supplied message on success, err.Error() on error
  141. Warning string `json:"warning,omitempty"` // caller-supplied warning, omitted when empty
  142. }
  143. // FilterFunc is a predicate over CostData. As consumed by FilterCostData, a true result means the given CostData passes (is kept) and false means it is filtered out; the string is the environment to count against when the datum is filtered out, or "" if there is none
  144. type FilterFunc func(*CostData) (bool, string)
  145. // FilterCostData allows through only CostData that matches all the given filter functions
  146. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  147. result := make(map[string]*CostData)
  148. filteredEnvironments := make(map[string]int)
  149. filteredContainers := 0
  150. DataLoop:
  151. for key, datum := range data {
  152. for _, rf := range retains {
  153. if ok, _ := rf(datum); ok {
  154. result[key] = datum
  155. // if any retain function passes, the data is retained and move on
  156. continue DataLoop
  157. }
  158. }
  159. for _, ff := range filters {
  160. if ok, environment := ff(datum); !ok {
  161. if environment != "" {
  162. filteredEnvironments[environment]++
  163. }
  164. filteredContainers++
  165. // if any filter function check fails, move on to the next datum
  166. continue DataLoop
  167. }
  168. }
  169. result[key] = datum
  170. }
  171. return result, filteredContainers, filteredEnvironments
  172. }
  173. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  174. fs := strings.Split(fields, ",")
  175. fmap := make(map[string]bool)
  176. for _, f := range fs {
  177. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  178. klog.V(1).Infof("to delete: %s", fieldNameLower)
  179. fmap[fieldNameLower] = true
  180. }
  181. filteredData := make(map[string]CostData)
  182. for cname, costdata := range data {
  183. s := reflect.TypeOf(*costdata)
  184. val := reflect.ValueOf(*costdata)
  185. costdata2 := CostData{}
  186. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  187. n := s.NumField()
  188. for i := 0; i < n; i++ {
  189. field := s.Field(i)
  190. value := val.Field(i)
  191. value2 := cd2.Field(i)
  192. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  193. value2.Set(reflect.Value(value))
  194. }
  195. }
  196. filteredData[cname] = cd2.Interface().(CostData)
  197. }
  198. return filteredData
  199. }
  // normalizeTimeParam converts a duration string expressed in days ("Nd") into
  // the equivalent number of hours ("<N*24>h"). Strings that do not end in "d"
  // are returned unchanged. An empty string, or a day count that is not a base-10
  // integer, yields an error.
  func normalizeTimeParam(param string) (string, error) {
  	if len(param) == 0 {
  		return "", fmt.Errorf("invalid time param")
  	}

  	// Anything that is not expressed in days passes through untouched.
  	if !strings.HasSuffix(param, "d") {
  		return param, nil
  	}

  	days, err := strconv.ParseInt(strings.TrimSuffix(param, "d"), 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString parses a percentage such as "N%" and returns it as a
  // fraction (N multiplied by 0.01). A single trailing "%" is optional. The
  // empty string is interpreted as "0%" and yields 0.0. A value that is not a
  // valid float yields 0.0 together with the parse error.
  func ParsePercentString(percentStr string) (float64, error) {
  	if percentStr == "" {
  		return 0.0, nil
  	}

  	numeric := strings.TrimSuffix(percentStr, "%")
  	pct, err := strconv.ParseFloat(numeric, 64)
  	if err != nil {
  		return 0.0, err
  	}
  	return pct * 0.01, nil
  }
  233. func WrapData(data interface{}, err error) []byte {
  234. var resp []byte
  235. if err != nil {
  236. klog.V(1).Infof("Error returned to client: %s", err.Error())
  237. resp, _ = json.Marshal(&Response{
  238. Code: http.StatusInternalServerError,
  239. Status: "error",
  240. Message: err.Error(),
  241. Data: data,
  242. })
  243. } else {
  244. resp, _ = json.Marshal(&Response{
  245. Code: http.StatusOK,
  246. Status: "success",
  247. Data: data,
  248. })
  249. }
  250. return resp
  251. }
  252. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  253. var resp []byte
  254. if err != nil {
  255. klog.V(1).Infof("Error returned to client: %s", err.Error())
  256. resp, _ = json.Marshal(&Response{
  257. Code: http.StatusInternalServerError,
  258. Status: "error",
  259. Message: err.Error(),
  260. Data: data,
  261. })
  262. } else {
  263. resp, _ = json.Marshal(&Response{
  264. Code: http.StatusOK,
  265. Status: "success",
  266. Data: data,
  267. Message: message,
  268. })
  269. }
  270. return resp
  271. }
  272. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  273. var resp []byte
  274. if err != nil {
  275. klog.V(1).Infof("Error returned to client: %s", err.Error())
  276. resp, _ = json.Marshal(&Response{
  277. Code: http.StatusInternalServerError,
  278. Status: "error",
  279. Message: err.Error(),
  280. Warning: warning,
  281. Data: data,
  282. })
  283. } else {
  284. resp, _ = json.Marshal(&Response{
  285. Code: http.StatusOK,
  286. Status: "success",
  287. Data: data,
  288. Warning: warning,
  289. })
  290. }
  291. return resp
  292. }
  293. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  294. var resp []byte
  295. if err != nil {
  296. klog.V(1).Infof("Error returned to client: %s", err.Error())
  297. resp, _ = json.Marshal(&Response{
  298. Code: http.StatusInternalServerError,
  299. Status: "error",
  300. Message: err.Error(),
  301. Warning: warning,
  302. Data: data,
  303. })
  304. } else {
  305. resp, _ = json.Marshal(&Response{
  306. Code: http.StatusOK,
  307. Status: "success",
  308. Data: data,
  309. Message: message,
  310. Warning: warning,
  311. })
  312. }
  313. return resp
  314. }
  // wrapAsObjectItems places items under the single key "items" of a new map,
  // so that our k8s proxy methods can emulate the shape of a List() response
  // from the k8s API.
  func wrapAsObjectItems(items interface{}) map[string]interface{} {
  	wrapped := make(map[string]interface{}, 1)
  	wrapped["items"] = items
  	return wrapped
  }
  322. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  323. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  324. w.Header().Set("Content-Type", "application/json")
  325. w.Header().Set("Access-Control-Allow-Origin", "*")
  326. err := a.CloudProvider.DownloadPricingData()
  327. if err != nil {
  328. klog.V(1).Infof("Error refreshing pricing data: %s", err.Error())
  329. }
  330. w.Write(WrapData(nil, err))
  331. }
  332. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  333. w.Header().Set("Content-Type", "application/json")
  334. w.Header().Set("Access-Control-Allow-Origin", "*")
  335. window := r.URL.Query().Get("timeWindow")
  336. offset := r.URL.Query().Get("offset")
  337. fields := r.URL.Query().Get("filterFields")
  338. namespace := r.URL.Query().Get("namespace")
  339. if offset != "" {
  340. offset = "offset " + offset
  341. }
  342. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  343. if fields != "" {
  344. filteredData := filterFields(fields, data)
  345. w.Write(WrapData(filteredData, err))
  346. } else {
  347. w.Write(WrapData(data, err))
  348. }
  349. }
  350. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  351. w.Header().Set("Content-Type", "application/json")
  352. w.Header().Set("Access-Control-Allow-Origin", "*")
  353. window := r.URL.Query().Get("window")
  354. offset := r.URL.Query().Get("offset")
  355. if window == "" {
  356. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  357. return
  358. }
  359. windowDur, err := timeutil.ParseDuration(window)
  360. if err != nil {
  361. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  362. return
  363. }
  364. // offset is not a required parameter
  365. var offsetDur time.Duration
  366. if offset != "" {
  367. offsetDur, err = timeutil.ParseDuration(offset)
  368. if err != nil {
  369. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  370. return
  371. }
  372. }
  373. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  374. if useThanos && !thanos.IsEnabled() {
  375. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  376. return
  377. }
  378. var client prometheus.Client
  379. if useThanos {
  380. client = a.ThanosClient
  381. offsetDur = thanos.OffsetDuration()
  382. } else {
  383. client = a.PrometheusClient
  384. }
  385. data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
  386. w.Write(WrapData(data, err))
  387. }
  388. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  389. w.Header().Set("Content-Type", "application/json")
  390. w.Header().Set("Access-Control-Allow-Origin", "*")
  391. start := r.URL.Query().Get("start")
  392. end := r.URL.Query().Get("end")
  393. window := r.URL.Query().Get("window")
  394. offset := r.URL.Query().Get("offset")
  395. if window == "" {
  396. w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
  397. return
  398. }
  399. windowDur, err := timeutil.ParseDuration(window)
  400. if err != nil {
  401. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  402. return
  403. }
  404. // offset is not a required parameter
  405. var offsetDur time.Duration
  406. if offset != "" {
  407. offsetDur, err = timeutil.ParseDuration(offset)
  408. if err != nil {
  409. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  410. return
  411. }
  412. }
  413. data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
  414. w.Write(WrapData(data, err))
  415. }
  416. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  417. w.Header().Set("Content-Type", "application/json")
  418. w.Header().Set("Access-Control-Allow-Origin", "*")
  419. startStr := r.URL.Query().Get("start")
  420. endStr := r.URL.Query().Get("end")
  421. windowStr := r.URL.Query().Get("window")
  422. fields := r.URL.Query().Get("filterFields")
  423. namespace := r.URL.Query().Get("namespace")
  424. cluster := r.URL.Query().Get("cluster")
  425. remote := r.URL.Query().Get("remote")
  426. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  427. layout := "2006-01-02T15:04:05.000Z"
  428. start, err := time.Parse(layout, startStr)
  429. if err != nil {
  430. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  431. return
  432. }
  433. end, err := time.Parse(layout, endStr)
  434. if err != nil {
  435. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  436. return
  437. }
  438. window := kubecost.NewWindow(&start, &end)
  439. if window.IsOpen() || window.IsEmpty() || window.IsNegative() {
  440. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  441. return
  442. }
  443. resolution := time.Hour
  444. if resDur, err := time.ParseDuration(windowStr); err == nil {
  445. resolution = resDur
  446. }
  447. // Use Thanos Client if it exists (enabled) and remote flag set
  448. var pClient prometheus.Client
  449. if remote != "false" && a.ThanosClient != nil {
  450. pClient = a.ThanosClient
  451. } else {
  452. pClient = a.PrometheusClient
  453. }
  454. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  455. if err != nil {
  456. w.Write(WrapData(nil, err))
  457. }
  458. if fields != "" {
  459. filteredData := filterFields(fields, data)
  460. w.Write(WrapData(filteredData, err))
  461. } else {
  462. w.Write(WrapData(data, err))
  463. }
  464. }
  // parseAggregations derives the aggregation key, the individual aggregation
  // values, and the filter name from the request parameters.
  //
  // When customAggregation is non-empty it is used verbatim: the key is the raw
  // string, the values are its comma-separated parts, and the filter is
  // filterType unchanged. Otherwise each comma-separated part of aggregator,
  // and filterType, is prefixed with "kubernetes_", and the key is the prefixed
  // parts joined by commas.
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"

  	names := strings.Split(aggregator, ",")
  	prefixed := make([]string, 0, len(names))
  	for _, name := range names {
  		prefixed = append(prefixed, prefix+name)
  	}

  	return strings.Join(prefixed, ","), prefixed, prefix + filterType
  }
  484. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  485. w.Header().Set("Content-Type", "application/json")
  486. w.Header().Set("Access-Control-Allow-Origin", "*")
  487. data, err := a.CloudProvider.AllNodePricing()
  488. w.Write(WrapData(data, err))
  489. }
  490. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  491. w.Header().Set("Content-Type", "application/json")
  492. w.Header().Set("Access-Control-Allow-Origin", "*")
  493. data, err := a.CloudProvider.GetConfig()
  494. w.Write(WrapData(data, err))
  495. }
  496. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  497. w.Header().Set("Content-Type", "application/json")
  498. w.Header().Set("Access-Control-Allow-Origin", "*")
  499. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
  500. if err != nil {
  501. w.Write(WrapData(data, err))
  502. return
  503. }
  504. w.Write(WrapData(data, err))
  505. err = a.CloudProvider.DownloadPricingData()
  506. if err != nil {
  507. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  508. }
  509. return
  510. }
  511. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  512. w.Header().Set("Content-Type", "application/json")
  513. w.Header().Set("Access-Control-Allow-Origin", "*")
  514. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
  515. if err != nil {
  516. w.Write(WrapData(data, err))
  517. return
  518. }
  519. w.Write(WrapData(data, err))
  520. return
  521. }
  522. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  523. w.Header().Set("Content-Type", "application/json")
  524. w.Header().Set("Access-Control-Allow-Origin", "*")
  525. data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
  526. if err != nil {
  527. w.Write(WrapData(data, err))
  528. return
  529. }
  530. w.Write(WrapData(data, err))
  531. return
  532. }
  533. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  534. w.Header().Set("Content-Type", "application/json")
  535. w.Header().Set("Access-Control-Allow-Origin", "*")
  536. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  537. if err != nil {
  538. w.Write(WrapData(data, err))
  539. return
  540. }
  541. w.Write(WrapData(data, err))
  542. return
  543. }
  544. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  545. w.Header().Set("Content-Type", "application/json")
  546. w.Header().Set("Access-Control-Allow-Origin", "*")
  547. data, err := a.CloudProvider.GetManagementPlatform()
  548. if err != nil {
  549. w.Write(WrapData(data, err))
  550. return
  551. }
  552. w.Write(WrapData(data, err))
  553. return
  554. }
  555. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  556. w.Header().Set("Content-Type", "application/json")
  557. w.Header().Set("Access-Control-Allow-Origin", "*")
  558. data := a.ClusterInfoProvider.GetClusterInfo()
  559. w.Write(WrapData(data, nil))
  560. }
  561. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  562. w.Header().Set("Content-Type", "application/json")
  563. w.Header().Set("Access-Control-Allow-Origin", "*")
  564. data := a.ClusterMap.AsMap()
  565. w.Write(WrapData(data, nil))
  566. }
  567. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  568. w.Header().Set("Content-Type", "application/json")
  569. w.Header().Set("Access-Control-Allow-Origin", "*")
  570. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  571. }
  572. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  573. w.Header().Set("Content-Type", "application/json")
  574. w.Header().Set("Access-Control-Allow-Origin", "*")
  575. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  576. }
  577. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  578. w.Header().Set("Content-Type", "application/json")
  579. w.Header().Set("Access-Control-Allow-Origin", "*")
  580. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  581. }
  582. func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  583. w.Header().Set("Content-Type", "application/json")
  584. w.Header().Set("Access-Control-Allow-Origin", "*")
  585. w.Write(WrapData(prom.Validate(a.PrometheusClient)))
  586. }
  587. func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  588. w.Header().Set("Content-Type", "application/json")
  589. w.Header().Set("Access-Control-Allow-Origin", "*")
  590. qp := httputil.NewQueryParams(r.URL.Query())
  591. query := qp.Get("query", "")
  592. if query == "" {
  593. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  594. return
  595. }
  596. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  597. var timeVal time.Time
  598. timeStr := qp.Get("time", "")
  599. if len(timeStr) > 0 {
  600. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  601. timeVal = time.Unix(t, 0)
  602. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  603. timeVal = t
  604. }
  605. // If time is given, but not parse-able, return an error
  606. if timeVal.IsZero() {
  607. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  608. }
  609. }
  610. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  611. body, err := ctx.RawQuery(query, timeVal)
  612. if err != nil {
  613. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  614. return
  615. }
  616. w.Write(body)
  617. }
  618. func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  619. w.Header().Set("Content-Type", "application/json")
  620. w.Header().Set("Access-Control-Allow-Origin", "*")
  621. qp := httputil.NewQueryParams(r.URL.Query())
  622. query := qp.Get("query", "")
  623. if query == "" {
  624. fmt.Fprintf(w, "Error parsing query from request parameters.")
  625. return
  626. }
  627. start, end, duration, err := toStartEndStep(qp)
  628. if err != nil {
  629. fmt.Fprintf(w, err.Error())
  630. return
  631. }
  632. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  633. body, err := ctx.RawQueryRange(query, start, end, duration)
  634. if err != nil {
  635. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  636. return
  637. }
  638. w.Write(body)
  639. }
  640. func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  641. w.Header().Set("Content-Type", "application/json")
  642. w.Header().Set("Access-Control-Allow-Origin", "*")
  643. if !thanos.IsEnabled() {
  644. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  645. return
  646. }
  647. qp := httputil.NewQueryParams(r.URL.Query())
  648. query := qp.Get("query", "")
  649. if query == "" {
  650. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  651. return
  652. }
  653. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  654. var timeVal time.Time
  655. timeStr := qp.Get("time", "")
  656. if len(timeStr) > 0 {
  657. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  658. timeVal = time.Unix(t, 0)
  659. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  660. timeVal = t
  661. }
  662. // If time is given, but not parse-able, return an error
  663. if timeVal.IsZero() {
  664. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  665. }
  666. }
  667. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  668. body, err := ctx.RawQuery(query, timeVal)
  669. if err != nil {
  670. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  671. return
  672. }
  673. w.Write(body)
  674. }
  675. func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  676. w.Header().Set("Content-Type", "application/json")
  677. w.Header().Set("Access-Control-Allow-Origin", "*")
  678. if !thanos.IsEnabled() {
  679. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  680. return
  681. }
  682. qp := httputil.NewQueryParams(r.URL.Query())
  683. query := qp.Get("query", "")
  684. if query == "" {
  685. fmt.Fprintf(w, "Error parsing query from request parameters.")
  686. return
  687. }
  688. start, end, duration, err := toStartEndStep(qp)
  689. if err != nil {
  690. fmt.Fprintf(w, err.Error())
  691. return
  692. }
  693. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  694. body, err := ctx.RawQueryRange(query, start, end, duration)
  695. if err != nil {
  696. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  697. return
  698. }
  699. w.Write(body)
  700. }
  701. // helper for query range proxy requests
  702. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  703. var e error
  704. ss := qp.Get("start", "")
  705. es := qp.Get("end", "")
  706. ds := qp.Get("duration", "")
  707. layout := "2006-01-02T15:04:05.000Z"
  708. start, e = time.Parse(layout, ss)
  709. if e != nil {
  710. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  711. return
  712. }
  713. end, e = time.Parse(layout, es)
  714. if e != nil {
  715. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  716. return
  717. }
  718. step, e = time.ParseDuration(ds)
  719. if e != nil {
  720. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  721. return
  722. }
  723. err = nil
  724. return
  725. }
  726. func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  727. w.Header().Set("Content-Type", "application/json")
  728. w.Header().Set("Access-Control-Allow-Origin", "*")
  729. promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
  730. if err != nil {
  731. w.Write(WrapData(nil, err))
  732. return
  733. }
  734. result := map[string]*prom.PrometheusQueueState{
  735. "prometheus": promQueueState,
  736. }
  737. if thanos.IsEnabled() {
  738. thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
  739. if err != nil {
  740. log.Warningf("Error getting Thanos queue state: %s", err)
  741. } else {
  742. result["thanos"] = thanosQueueState
  743. }
  744. }
  745. w.Write(WrapData(result, nil))
  746. }
  747. // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
  748. func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  749. w.Header().Set("Content-Type", "application/json")
  750. w.Header().Set("Access-Control-Allow-Origin", "*")
  751. promMetrics, err := prom.GetPrometheusMetrics(a.PrometheusClient, "")
  752. if err != nil {
  753. w.Write(WrapData(nil, err))
  754. return
  755. }
  756. result := map[string][]*prom.PrometheusDiagnostic{
  757. "prometheus": promMetrics,
  758. }
  759. if thanos.IsEnabled() {
  760. thanosMetrics, err := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
  761. if err != nil {
  762. log.Warningf("Error getting Thanos queue state: %s", err)
  763. } else {
  764. result["thanos"] = thanosMetrics
  765. }
  766. }
  767. w.Write(WrapData(result, nil))
  768. }
  769. func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  770. w.Header().Set("Content-Type", "application/json")
  771. w.Header().Set("Access-Control-Allow-Origin", "*")
  772. pvList := a.ClusterCache.GetAllPersistentVolumes()
  773. body, err := json.Marshal(wrapAsObjectItems(pvList))
  774. if err != nil {
  775. fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
  776. } else {
  777. w.Write(body)
  778. }
  779. }
  780. func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  781. w.Header().Set("Content-Type", "application/json")
  782. w.Header().Set("Access-Control-Allow-Origin", "*")
  783. qp := httputil.NewQueryParams(r.URL.Query())
  784. namespace := qp.Get("namespace", "")
  785. deploymentsList := a.ClusterCache.GetAllDeployments()
  786. // filter for provided namespace
  787. var deployments []*appsv1.Deployment
  788. if namespace == "" {
  789. deployments = deploymentsList
  790. } else {
  791. deployments = []*appsv1.Deployment{}
  792. for _, d := range deploymentsList {
  793. if d.Namespace == namespace {
  794. deployments = append(deployments, d)
  795. }
  796. }
  797. }
  798. body, err := json.Marshal(wrapAsObjectItems(deployments))
  799. if err != nil {
  800. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  801. } else {
  802. w.Write(body)
  803. }
  804. }
  805. func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  806. w.Header().Set("Content-Type", "application/json")
  807. w.Header().Set("Access-Control-Allow-Origin", "*")
  808. scList := a.ClusterCache.GetAllStorageClasses()
  809. body, err := json.Marshal(wrapAsObjectItems(scList))
  810. if err != nil {
  811. fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
  812. } else {
  813. w.Write(body)
  814. }
  815. }
  816. func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  817. w.Header().Set("Content-Type", "application/json")
  818. w.Header().Set("Access-Control-Allow-Origin", "*")
  819. qp := httputil.NewQueryParams(r.URL.Query())
  820. namespace := qp.Get("namespace", "")
  821. statefulSetsList := a.ClusterCache.GetAllStatefulSets()
  822. // filter for provided namespace
  823. var statefulSets []*appsv1.StatefulSet
  824. if namespace == "" {
  825. statefulSets = statefulSetsList
  826. } else {
  827. statefulSets = []*appsv1.StatefulSet{}
  828. for _, ss := range statefulSetsList {
  829. if ss.Namespace == namespace {
  830. statefulSets = append(statefulSets, ss)
  831. }
  832. }
  833. }
  834. body, err := json.Marshal(wrapAsObjectItems(statefulSets))
  835. if err != nil {
  836. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  837. } else {
  838. w.Write(body)
  839. }
  840. }
  841. func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  842. w.Header().Set("Content-Type", "application/json")
  843. w.Header().Set("Access-Control-Allow-Origin", "*")
  844. nodeList := a.ClusterCache.GetAllNodes()
  845. body, err := json.Marshal(wrapAsObjectItems(nodeList))
  846. if err != nil {
  847. fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
  848. } else {
  849. w.Write(body)
  850. }
  851. }
  852. func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  853. w.Header().Set("Content-Type", "application/json")
  854. w.Header().Set("Access-Control-Allow-Origin", "*")
  855. podlist := a.ClusterCache.GetAllPods()
  856. body, err := json.Marshal(wrapAsObjectItems(podlist))
  857. if err != nil {
  858. fmt.Fprintf(w, "Error decoding pods: "+err.Error())
  859. } else {
  860. w.Write(body)
  861. }
  862. }
  863. func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  864. w.Header().Set("Content-Type", "application/json")
  865. w.Header().Set("Access-Control-Allow-Origin", "*")
  866. namespaces := a.ClusterCache.GetAllNamespaces()
  867. body, err := json.Marshal(wrapAsObjectItems(namespaces))
  868. if err != nil {
  869. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  870. } else {
  871. w.Write(body)
  872. }
  873. }
  874. func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  875. w.Header().Set("Content-Type", "application/json")
  876. w.Header().Set("Access-Control-Allow-Origin", "*")
  877. daemonSets := a.ClusterCache.GetAllDaemonSets()
  878. body, err := json.Marshal(wrapAsObjectItems(daemonSets))
  879. if err != nil {
  880. fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
  881. } else {
  882. w.Write(body)
  883. }
  884. }
  885. func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  886. w.Header().Set("Content-Type", "application/json")
  887. w.Header().Set("Access-Control-Allow-Origin", "*")
  888. podName := ps.ByName("name")
  889. podNamespace := ps.ByName("namespace")
  890. // TODO: ClusterCache API could probably afford to have some better filtering
  891. allPods := a.ClusterCache.GetAllPods()
  892. for _, pod := range allPods {
  893. for _, container := range pod.Spec.Containers {
  894. container.Env = make([]v1.EnvVar, 0)
  895. }
  896. if pod.Namespace == podNamespace && pod.Name == podName {
  897. body, err := json.Marshal(pod)
  898. if err != nil {
  899. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  900. } else {
  901. w.Write(body)
  902. }
  903. return
  904. }
  905. }
  906. fmt.Fprintf(w, "Pod not found\n")
  907. }
  908. func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  909. w.Header().Set("Content-Type", "application/json")
  910. w.Header().Set("Access-Control-Allow-Origin", "*")
  911. u := a.PrometheusClient.URL(epRules, nil)
  912. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  913. if err != nil {
  914. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  915. }
  916. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  917. if err != nil {
  918. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  919. } else {
  920. w.Write(body)
  921. }
  922. }
  923. func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  924. w.Header().Set("Content-Type", "application/json")
  925. w.Header().Set("Access-Control-Allow-Origin", "*")
  926. pConfig := map[string]string{
  927. "address": env.GetPrometheusServerEndpoint(),
  928. }
  929. body, err := json.Marshal(pConfig)
  930. if err != nil {
  931. fmt.Fprintf(w, "Error marshalling prometheus config")
  932. } else {
  933. w.Write(body)
  934. }
  935. }
  936. func (a *Accesses) PrometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  937. w.Header().Set("Content-Type", "application/json")
  938. w.Header().Set("Access-Control-Allow-Origin", "*")
  939. u := a.PrometheusClient.URL(epTargets, nil)
  940. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  941. if err != nil {
  942. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  943. }
  944. _, body, _, err := a.PrometheusClient.Do(r.Context(), req)
  945. if err != nil {
  946. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  947. } else {
  948. w.Write(body)
  949. }
  950. }
  951. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  952. w.Header().Set("Content-Type", "application/json")
  953. w.Header().Set("Access-Control-Allow-Origin", "*")
  954. podlist := a.ClusterCache.GetAllPods()
  955. var lonePods []*v1.Pod
  956. for _, pod := range podlist {
  957. if len(pod.OwnerReferences) == 0 {
  958. lonePods = append(lonePods, pod)
  959. }
  960. }
  961. body, err := json.Marshal(lonePods)
  962. if err != nil {
  963. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  964. } else {
  965. w.Write(body)
  966. }
  967. }
  968. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  969. w.Header().Set("Content-Type", "application/json")
  970. w.Header().Set("Access-Control-Allow-Origin", "*")
  971. ns := env.GetKubecostNamespace()
  972. w.Write([]byte(ns))
  973. }
  974. // logsFor pulls the logs for a specific pod, namespace, and container
  975. func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
  976. since := time.Now().UTC().Add(-dur)
  977. logOpts := v1.PodLogOptions{
  978. SinceTime: &metav1.Time{Time: since},
  979. }
  980. if container != "" {
  981. logOpts.Container = container
  982. }
  983. req := c.CoreV1().Pods(namespace).GetLogs(pod, &logOpts)
  984. reader, err := req.Stream(ctx)
  985. if err != nil {
  986. return "", err
  987. }
  988. podLogs, err := ioutil.ReadAll(reader)
  989. if err != nil {
  990. return "", err
  991. }
  992. return string(podLogs), nil
  993. }
  994. func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  995. w.Header().Set("Content-Type", "application/json")
  996. w.Header().Set("Access-Control-Allow-Origin", "*")
  997. qp := httputil.NewQueryParams(r.URL.Query())
  998. ns := qp.Get("namespace", env.GetKubecostNamespace())
  999. pod := qp.Get("pod", "")
  1000. selector := qp.Get("selector", "")
  1001. container := qp.Get("container", "")
  1002. since := qp.Get("since", "24h")
  1003. sinceDuration, err := time.ParseDuration(since)
  1004. if err != nil {
  1005. fmt.Fprintf(w, "Invalid Duration String: "+err.Error())
  1006. return
  1007. }
  1008. var logResult string
  1009. appendLog := func(ns string, pod string, container string, l string) {
  1010. if l == "" {
  1011. return
  1012. }
  1013. logResult += fmt.Sprintf("%s\n| %s:%s:%s\n%s\n%s\n\n", LogSeparator, ns, pod, container, LogSeparator, l)
  1014. }
  1015. if pod != "" {
  1016. pd, err := a.KubeClientSet.CoreV1().Pods(ns).Get(r.Context(), pod, metav1.GetOptions{})
  1017. if err != nil {
  1018. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1019. return
  1020. }
  1021. if container != "" {
  1022. var foundContainer bool
  1023. for _, cont := range pd.Spec.Containers {
  1024. if strings.EqualFold(cont.Name, container) {
  1025. foundContainer = true
  1026. break
  1027. }
  1028. }
  1029. if !foundContainer {
  1030. fmt.Fprintf(w, "Could not find container: "+container)
  1031. return
  1032. }
  1033. }
  1034. logs, err := logsFor(a.KubeClientSet, ns, pod, container, sinceDuration, r.Context())
  1035. if err != nil {
  1036. fmt.Fprintf(w, "Error Getting Logs: "+err.Error())
  1037. return
  1038. }
  1039. appendLog(ns, pod, container, logs)
  1040. w.Write([]byte(logResult))
  1041. return
  1042. }
  1043. if selector != "" {
  1044. pods, err := a.KubeClientSet.CoreV1().Pods(ns).List(r.Context(), metav1.ListOptions{LabelSelector: selector})
  1045. if err != nil {
  1046. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1047. return
  1048. }
  1049. for _, pd := range pods.Items {
  1050. for _, cont := range pd.Spec.Containers {
  1051. logs, err := logsFor(a.KubeClientSet, ns, pd.Name, cont.Name, sinceDuration, r.Context())
  1052. if err != nil {
  1053. continue
  1054. }
  1055. appendLog(ns, pd.Name, cont.Name, logs)
  1056. }
  1057. }
  1058. }
  1059. w.Write([]byte(logResult))
  1060. }
  1061. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1062. w.Header().Set("Content-Type", "application/json")
  1063. w.Header().Set("Access-Control-Allow-Origin", "*")
  1064. r.ParseForm()
  1065. key := r.PostForm.Get("key")
  1066. k := []byte(key)
  1067. err := ioutil.WriteFile("/var/configs/key.json", k, 0644)
  1068. if err != nil {
  1069. fmt.Fprintf(w, "Error writing service key: "+err.Error())
  1070. }
  1071. w.WriteHeader(http.StatusOK)
  1072. }
  1073. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1074. w.Header().Set("Content-Type", "application/json")
  1075. w.Header().Set("Access-Control-Allow-Origin", "*")
  1076. encodedValues := env.Get("HELM_VALUES", "")
  1077. if encodedValues == "" {
  1078. fmt.Fprintf(w, "Values reporting disabled")
  1079. return
  1080. }
  1081. result, err := base64.StdEncoding.DecodeString(encodedValues)
  1082. if err != nil {
  1083. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  1084. return
  1085. }
  1086. w.Write(result)
  1087. }
  1088. func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1089. w.Header().Set("Content-Type", "application/json")
  1090. w.Header().Set("Access-Control-Allow-Origin", "*")
  1091. promServer := env.GetPrometheusServerEndpoint()
  1092. api := prometheusAPI.NewAPI(a.PrometheusClient)
  1093. result, err := api.Config(r.Context())
  1094. if err != nil {
  1095. fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
  1096. } else {
  1097. fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
  1098. }
  1099. }
  1100. // captures the panic event in sentry
  1101. func capturePanicEvent(err string, stack string) {
  1102. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  1103. klog.V(1).Infoln(msg)
  1104. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  1105. Level: sentry.LevelError,
  1106. Message: msg,
  1107. })
  1108. sentry.Flush(5 * time.Second)
  1109. }
  1110. // handle any panics reported by the errors package
  1111. func handlePanic(p errors.Panic) bool {
  1112. err := p.Error
  1113. if err != nil {
  1114. if err, ok := err.(error); ok {
  1115. capturePanicEvent(err.Error(), p.Stack)
  1116. }
  1117. if err, ok := err.(string); ok {
  1118. capturePanicEvent(err, p.Stack)
  1119. }
  1120. }
  1121. // Return true to recover iff the type is http, otherwise allow kubernetes
  1122. // to recover.
  1123. return p.Type == errors.PanicTypeHTTP
  1124. }
  1125. func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  1126. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", env.GetAppVersion())
  1127. configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)
  1128. var err error
  1129. if errorReportingEnabled {
  1130. err = sentry.Init(sentry.ClientOptions{Release: env.GetAppVersion()})
  1131. if err != nil {
  1132. klog.Infof("Failed to initialize sentry for error reporting")
  1133. } else {
  1134. err = errors.SetPanicHandler(handlePanic)
  1135. if err != nil {
  1136. klog.Infof("Failed to set panic handler: %s", err)
  1137. }
  1138. }
  1139. }
  1140. address := env.GetPrometheusServerEndpoint()
  1141. if address == "" {
  1142. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  1143. }
  1144. queryConcurrency := env.GetMaxQueryConcurrency()
  1145. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  1146. timeout := 120 * time.Second
  1147. keepAlive := 120 * time.Second
  1148. tlsHandshakeTimeout := 10 * time.Second
  1149. scrapeInterval := time.Minute
  1150. var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
  1151. if env.IsPrometheusRetryOnRateLimitResponse() {
  1152. rateLimitRetryOpts = &prom.RateLimitRetryOpts{
  1153. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  1154. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  1155. }
  1156. }
  1157. promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
  1158. Timeout: timeout,
  1159. KeepAlive: keepAlive,
  1160. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1161. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1162. RateLimitRetryOpts: rateLimitRetryOpts,
  1163. Auth: &prom.ClientAuth{
  1164. Username: env.GetDBBasicAuthUsername(),
  1165. Password: env.GetDBBasicAuthUserPassword(),
  1166. BearerToken: env.GetDBBearerToken(),
  1167. },
  1168. QueryConcurrency: queryConcurrency,
  1169. QueryLogFile: "",
  1170. })
  1171. if err != nil {
  1172. klog.Fatalf("Failed to create prometheus client, Error: %v", err)
  1173. }
  1174. m, err := prom.Validate(promCli)
  1175. if err != nil || !m.Running {
  1176. if err != nil {
  1177. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1178. } else if !m.Running {
  1179. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  1180. }
  1181. } else {
  1182. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  1183. }
  1184. api := prometheusAPI.NewAPI(promCli)
  1185. _, err = api.Config(context.Background())
  1186. if err != nil {
  1187. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1188. } else {
  1189. klog.Infof("Retrieved a prometheus config file from: %s", address)
  1190. }
  1191. // Lookup scrape interval for kubecost job, update if found
  1192. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  1193. if err == nil {
  1194. scrapeInterval = si
  1195. }
  1196. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  1197. // Kubernetes API setup
  1198. var kc *rest.Config
  1199. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  1200. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  1201. } else {
  1202. kc, err = rest.InClusterConfig()
  1203. }
  1204. if err != nil {
  1205. panic(err.Error())
  1206. }
  1207. kubeClientset, err := kubernetes.NewForConfig(kc)
  1208. if err != nil {
  1209. panic(err.Error())
  1210. }
  1211. // Create ConfigFileManager for synchronization of shared configuration
  1212. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  1213. BucketStoreConfig: env.GetKubecostConfigBucket(),
  1214. LocalConfigPath: "/",
  1215. })
  1216. // Create Kubernetes Cluster Cache + Watchers
  1217. var k8sCache clustercache.ClusterCache
  1218. if env.IsClusterCacheFileEnabled() {
  1219. importLocation := confManager.ConfigFileAt("/var/configs/cluster-cache.json")
  1220. k8sCache = clustercache.NewClusterImporter(importLocation)
  1221. } else {
  1222. k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
  1223. }
  1224. k8sCache.Run()
  1225. cloudProviderKey := env.GetCloudProviderAPIKey()
  1226. cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey, confManager)
  1227. if err != nil {
  1228. panic(err.Error())
  1229. }
  1230. // Append the pricing config watcher
  1231. configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
  1232. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  1233. watchConfigFunc := configWatchers.ToWatchFunc()
  1234. watchedConfigs := configWatchers.GetWatchedConfigs()
  1235. kubecostNamespace := env.GetKubecostNamespace()
  1236. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  1237. for _, cw := range watchedConfigs {
  1238. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
  1239. if err != nil {
  1240. klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  1241. } else {
  1242. klog.Infof("Found configmap %s, watching...", configs.Name)
  1243. watchConfigFunc(configs)
  1244. }
  1245. }
  1246. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  1247. remoteEnabled := env.IsRemoteEnabled()
  1248. if remoteEnabled {
  1249. info, err := cloudProvider.ClusterInfo()
  1250. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1251. if err != nil {
  1252. klog.Infof("Error saving cluster id %s", err.Error())
  1253. }
  1254. _, _, err = cloud.GetOrCreateClusterMeta(info["id"], info["name"])
  1255. if err != nil {
  1256. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1257. }
  1258. }
  1259. // Thanos Client
  1260. var thanosClient prometheus.Client
  1261. if thanos.IsEnabled() {
  1262. thanosAddress := thanos.QueryURL()
  1263. if thanosAddress != "" {
  1264. thanosCli, _ := thanos.NewThanosClient(thanosAddress, &prom.PrometheusClientConfig{
  1265. Timeout: timeout,
  1266. KeepAlive: keepAlive,
  1267. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1268. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1269. RateLimitRetryOpts: rateLimitRetryOpts,
  1270. Auth: &prom.ClientAuth{
  1271. Username: env.GetMultiClusterBasicAuthUsername(),
  1272. Password: env.GetMultiClusterBasicAuthPassword(),
  1273. BearerToken: env.GetMultiClusterBearerToken(),
  1274. },
  1275. QueryConcurrency: queryConcurrency,
  1276. QueryLogFile: env.GetQueryLoggingFile(),
  1277. })
  1278. _, err = prom.Validate(thanosCli)
  1279. if err != nil {
  1280. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  1281. thanosClient = thanosCli
  1282. } else {
  1283. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  1284. thanosClient = thanosCli
  1285. }
  1286. } else {
  1287. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  1288. }
  1289. }
  1290. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  1291. var clusterInfoProvider clusters.ClusterInfoProvider
  1292. if env.IsClusterInfoFileEnabled() {
  1293. clusterInfoFile := confManager.ConfigFileAt("/var/configs/cluster-info.json")
  1294. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  1295. } else {
  1296. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  1297. }
  1298. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  1299. var clusterMap clusters.ClusterMap
  1300. if thanosClient != nil {
  1301. clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
  1302. } else {
  1303. clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
  1304. }
  1305. // cache responses from model and aggregation for a default of 10 minutes;
  1306. // clear expired responses every 20 minutes
  1307. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  1308. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  1309. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1310. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1311. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1312. // query durations that should be cached longer should be registered here
  1313. // use relatively prime numbers to minimize likelihood of synchronized
  1314. // attempts at cache warming
  1315. day := 24 * time.Hour
  1316. cacheExpiration := map[time.Duration]time.Duration{
  1317. day: maxCacheMinutes1d * time.Minute,
  1318. 2 * day: maxCacheMinutes2d * time.Minute,
  1319. 7 * day: maxCacheMinutes7d * time.Minute,
  1320. 30 * day: maxCacheMinutes30d * time.Minute,
  1321. }
  1322. var pc prometheus.Client
  1323. if thanosClient != nil {
  1324. pc = thanosClient
  1325. } else {
  1326. pc = promCli
  1327. }
  1328. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  1329. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
  1330. a := &Accesses{
  1331. Router: httprouter.New(),
  1332. PrometheusClient: promCli,
  1333. ThanosClient: thanosClient,
  1334. KubeClientSet: kubeClientset,
  1335. ClusterCache: k8sCache,
  1336. ClusterMap: clusterMap,
  1337. CloudProvider: cloudProvider,
  1338. ConfigFileManager: confManager,
  1339. ClusterInfoProvider: clusterInfoProvider,
  1340. Model: costModel,
  1341. MetricsEmitter: metricsEmitter,
  1342. AggregateCache: aggregateCache,
  1343. CostDataCache: costDataCache,
  1344. ClusterCostsCache: clusterCostsCache,
  1345. OutOfClusterCache: outOfClusterCache,
  1346. SettingsCache: settingsCache,
  1347. CacheExpiration: cacheExpiration,
  1348. httpServices: services.NewCostModelServices(),
  1349. }
  1350. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  1351. // confusing and unconventional, but necessary so that we can swap it
  1352. // out for the ETL-adapted version elsewhere.
  1353. // TODO clean this up once ETL is open-sourced.
  1354. a.AggAPI = a
  1355. // Initialize mechanism for subscribing to settings changes
  1356. a.InitializeSettingsPubSub()
  1357. err = a.CloudProvider.DownloadPricingData()
  1358. if err != nil {
  1359. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  1360. }
  1361. // Warm the aggregate cache unless explicitly set to false
  1362. if env.IsCacheWarmingEnabled() {
  1363. log.Infof("Init: AggregateCostModel cache warming enabled")
  1364. a.warmAggregateCostModelCache()
  1365. } else {
  1366. log.Infof("Init: AggregateCostModel cache warming disabled")
  1367. }
  1368. if !env.IsKubecostMetricsPodEnabled() {
  1369. a.MetricsEmitter.Start()
  1370. }
  1371. a.Router.GET("/costDataModel", a.CostDataModel)
  1372. a.Router.GET("/costDataModelRange", a.CostDataModelRange)
  1373. a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  1374. a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
  1375. a.Router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  1376. a.Router.GET("/allNodePricing", a.GetAllNodePricing)
  1377. a.Router.POST("/refreshPricing", a.RefreshPricingData)
  1378. a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  1379. a.Router.GET("/clusterCosts", a.ClusterCosts)
  1380. a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  1381. a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  1382. a.Router.GET("/managementPlatform", a.ManagementPlatform)
  1383. a.Router.GET("/clusterInfo", a.ClusterInfo)
  1384. a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  1385. a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  1386. a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  1387. a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  1388. // endpoints migrated from server
  1389. a.Router.GET("/allPersistentVolumes", a.GetAllPersistentVolumes)
  1390. a.Router.GET("/allDeployments", a.GetAllDeployments)
  1391. a.Router.GET("/allStorageClasses", a.GetAllStorageClasses)
  1392. a.Router.GET("/allStatefulSets", a.GetAllStatefulSets)
  1393. a.Router.GET("/allNodes", a.GetAllNodes)
  1394. a.Router.GET("/allPods", a.GetAllPods)
  1395. a.Router.GET("/allNamespaces", a.GetAllNamespaces)
  1396. a.Router.GET("/allDaemonSets", a.GetAllDaemonSets)
  1397. a.Router.GET("/pod/:namespace/:name", a.GetPod)
  1398. a.Router.GET("/prometheusRecordingRules", a.PrometheusRecordingRules)
  1399. a.Router.GET("/prometheusConfig", a.PrometheusConfig)
  1400. a.Router.GET("/prometheusTargets", a.PrometheusTargets)
  1401. a.Router.GET("/orphanedPods", a.GetOrphanedPods)
  1402. a.Router.GET("/installNamespace", a.GetInstallNamespace)
  1403. a.Router.GET("/podLogs", a.GetPodLogs)
  1404. a.Router.POST("/serviceKey", a.AddServiceKey)
  1405. a.Router.GET("/helmValues", a.GetHelmValues)
  1406. a.Router.GET("/status", a.Status)
  1407. // prom query proxies
  1408. a.Router.GET("/prometheusQuery", a.PrometheusQuery)
  1409. a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
  1410. a.Router.GET("/thanosQuery", a.ThanosQuery)
  1411. a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)
  1412. // diagnostics
  1413. a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
  1414. a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
  1415. a.httpServices.RegisterAll(a.Router)
  1416. return a
  1417. }