router.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "path"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/microcosm-cc/bluemonday"
  15. "github.com/opencost/opencost/core/pkg/opencost"
  16. "github.com/opencost/opencost/core/pkg/util/httputil"
  17. "github.com/opencost/opencost/core/pkg/util/timeutil"
  18. "github.com/opencost/opencost/core/pkg/version"
  19. "github.com/opencost/opencost/pkg/cloud/aws"
  20. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  21. "github.com/opencost/opencost/pkg/cloud/gcp"
  22. "github.com/opencost/opencost/pkg/cloud/provider"
  23. "github.com/opencost/opencost/pkg/cloudcost"
  24. "github.com/opencost/opencost/pkg/config"
  25. clustermap "github.com/opencost/opencost/pkg/costmodel/clusters"
  26. "github.com/opencost/opencost/pkg/customcost"
  27. "github.com/opencost/opencost/pkg/kubeconfig"
  28. "github.com/opencost/opencost/pkg/metrics"
  29. "github.com/opencost/opencost/pkg/services"
  30. "github.com/opencost/opencost/pkg/util/watcher"
  31. "github.com/julienschmidt/httprouter"
  32. "github.com/getsentry/sentry-go"
  33. "github.com/opencost/opencost/core/pkg/clusters"
  34. sysenv "github.com/opencost/opencost/core/pkg/env"
  35. "github.com/opencost/opencost/core/pkg/log"
  36. "github.com/opencost/opencost/core/pkg/util/json"
  37. "github.com/opencost/opencost/pkg/cloud/azure"
  38. "github.com/opencost/opencost/pkg/cloud/models"
  39. "github.com/opencost/opencost/pkg/cloud/utils"
  40. "github.com/opencost/opencost/pkg/clustercache"
  41. "github.com/opencost/opencost/pkg/env"
  42. "github.com/opencost/opencost/pkg/errors"
  43. "github.com/opencost/opencost/pkg/prom"
  44. "github.com/opencost/opencost/pkg/thanos"
  45. prometheus "github.com/prometheus/client_golang/api"
  46. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  47. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  48. "github.com/patrickmn/go-cache"
  49. "k8s.io/client-go/kubernetes"
  50. )
  51. var sanitizePolicy = bluemonday.UGCPolicy()
  52. const (
  53. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  54. maxCacheMinutes1d = 11
  55. maxCacheMinutes2d = 17
  56. maxCacheMinutes7d = 37
  57. maxCacheMinutes30d = 137
  58. CustomPricingSetting = "CustomPricing"
  59. DiscountSetting = "Discount"
  60. epRules = apiPrefix + "/rules"
  61. )
  62. var (
  63. // gitCommit is set by the build system
  64. gitCommit string
  65. )
// Accesses defines a singleton application instance, providing access to
// Prometheus, Kubernetes, the cloud provider, and caches.
type Accesses struct {
	// PrometheusClient is the default query client. GetPrometheusClient returns
	// it whenever remote querying is not requested or ThanosClient is nil.
	PrometheusClient prometheus.Client
	// ThanosClient may be nil; GetPrometheusClient only selects it when non-nil.
	ThanosClient        prometheus.Client
	KubeClientSet       kubernetes.Interface
	ClusterCache        clustercache.ClusterCache
	ClusterMap          clusters.ClusterMap
	CloudProvider       models.Provider
	ConfigFileManager   *config.ConfigFileManager
	ClusterInfoProvider clusters.ClusterInfoProvider
	Model               *CostModel
	MetricsEmitter      *CostModelMetricsEmitter
	OutOfClusterCache   *cache.Cache
	AggregateCache      *cache.Cache
	CostDataCache       *cache.Cache
	// ClusterCostsCache is read by ClusterCostsFromCacheHandler using
	// "<duration>:<offset>" keys (e.g. "24h:1m").
	ClusterCostsCache *cache.Cache
	// CacheExpiration maps a query window to a custom cache expiration; it is
	// consulted by GetCacheExpiration (and therefore GetCacheRefresh).
	CacheExpiration map[time.Duration]time.Duration
	AggAPI          Aggregator
	// SettingsCache stores current state of app settings
	SettingsCache *cache.Cache
	// settingsSubscribers tracks channels through which changes to different
	// settings will be published in a pub/sub model
	settingsSubscribers map[string][]chan string
	// settingsMutex presumably guards settingsSubscribers — confirm at call sites.
	settingsMutex sync.Mutex
	// registered http service instances
	httpServices services.HTTPServices
}
  94. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  95. // should be used.
  96. func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
  97. // Use Thanos Client if it exists (enabled) and remote flag set
  98. var pc prometheus.Client
  99. if remote && a.ThanosClient != nil {
  100. pc = a.ThanosClient
  101. } else {
  102. pc = a.PrometheusClient
  103. }
  104. return pc
  105. }
  106. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  107. // If one does not exists, it returns the default cache expiration, which is defined by
  108. // the particular cache.
  109. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  110. if expiration, ok := a.CacheExpiration[dur]; ok {
  111. return expiration
  112. }
  113. return cache.DefaultExpiration
  114. }
  115. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  116. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  117. // not found or is less than 2 minutes.
  118. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  119. expiry := a.GetCacheExpiration(dur).Minutes()
  120. if expiry <= 2.0 {
  121. return time.Minute
  122. }
  123. mins := time.Duration(expiry/2.0) * time.Minute
  124. return mins
  125. }
  126. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  127. w.Header().Set("Content-Type", "application/json")
  128. duration := 24 * time.Hour
  129. offset := time.Minute
  130. durationHrs := "24h"
  131. fmtOffset := "1m"
  132. pClient := a.GetPrometheusClient(true)
  133. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  134. if data, valid := a.ClusterCostsCache.Get(key); valid {
  135. clusterCosts := data.(map[string]*ClusterCosts)
  136. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  137. } else {
  138. data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
  139. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  140. }
  141. }
// Response is the JSON envelope produced by the Wrap* helpers in this file.
// Code carries an HTTP status value (the helpers use http.StatusOK or
// http.StatusInternalServerError) and Status is "success" or "error". The
// helpers only place Code in the body; they do not set the HTTP status line.
// Message and Warning are omitted from the JSON output when empty.
type Response struct {
	Code    int         `json:"code"`
	Status  string      `json:"status"`
	Data    interface{} `json:"data"`
	Message string      `json:"message,omitempty"`
	Warning string      `json:"warning,omitempty"`
}
  149. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  150. type FilterFunc func(*CostData) (bool, string)
  151. // FilterCostData allows through only CostData that matches all the given filter functions
  152. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  153. result := make(map[string]*CostData)
  154. filteredEnvironments := make(map[string]int)
  155. filteredContainers := 0
  156. DataLoop:
  157. for key, datum := range data {
  158. for _, rf := range retains {
  159. if ok, _ := rf(datum); ok {
  160. result[key] = datum
  161. // if any retain function passes, the data is retained and move on
  162. continue DataLoop
  163. }
  164. }
  165. for _, ff := range filters {
  166. if ok, environment := ff(datum); !ok {
  167. if environment != "" {
  168. filteredEnvironments[environment]++
  169. }
  170. filteredContainers++
  171. // if any filter function check fails, move on to the next datum
  172. continue DataLoop
  173. }
  174. }
  175. result[key] = datum
  176. }
  177. return result, filteredContainers, filteredEnvironments
  178. }
  179. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  180. fs := strings.Split(fields, ",")
  181. fmap := make(map[string]bool)
  182. for _, f := range fs {
  183. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  184. log.Debugf("to delete: %s", fieldNameLower)
  185. fmap[fieldNameLower] = true
  186. }
  187. filteredData := make(map[string]CostData)
  188. for cname, costdata := range data {
  189. s := reflect.TypeOf(*costdata)
  190. val := reflect.ValueOf(*costdata)
  191. costdata2 := CostData{}
  192. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  193. n := s.NumField()
  194. for i := 0; i < n; i++ {
  195. field := s.Field(i)
  196. value := val.Field(i)
  197. value2 := cd2.Field(i)
  198. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  199. value2.Set(reflect.Value(value))
  200. }
  201. }
  202. filteredData[cname] = cd2.Interface().(CostData)
  203. }
  204. return filteredData
  205. }
  206. func normalizeTimeParam(param string) (string, error) {
  207. if param == "" {
  208. return "", fmt.Errorf("invalid time param")
  209. }
  210. // convert days to hours
  211. if param[len(param)-1:] == "d" {
  212. count := param[:len(param)-1]
  213. val, err := strconv.ParseInt(count, 10, 64)
  214. if err != nil {
  215. return "", err
  216. }
  217. val = val * 24
  218. param = fmt.Sprintf("%dh", val)
  219. }
  220. return param, nil
  221. }
  222. // ParsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
  223. // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
  224. // return 0.0.
  225. func ParsePercentString(percentStr string) (float64, error) {
  226. if len(percentStr) == 0 {
  227. return 0.0, nil
  228. }
  229. if percentStr[len(percentStr)-1:] == "%" {
  230. percentStr = percentStr[:len(percentStr)-1]
  231. }
  232. discount, err := strconv.ParseFloat(percentStr, 64)
  233. if err != nil {
  234. return 0.0, err
  235. }
  236. discount *= 0.01
  237. return discount, nil
  238. }
  239. func WrapData(data interface{}, err error) []byte {
  240. var resp []byte
  241. if err != nil {
  242. log.Errorf("Error returned to client: %s", err.Error())
  243. resp, _ = json.Marshal(&Response{
  244. Code: http.StatusInternalServerError,
  245. Status: "error",
  246. Message: err.Error(),
  247. Data: data,
  248. })
  249. } else {
  250. resp, err = json.Marshal(&Response{
  251. Code: http.StatusOK,
  252. Status: "success",
  253. Data: data,
  254. })
  255. if err != nil {
  256. log.Errorf("error marshaling response json: %s", err.Error())
  257. }
  258. }
  259. return resp
  260. }
  261. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  262. var resp []byte
  263. if err != nil {
  264. log.Errorf("Error returned to client: %s", err.Error())
  265. resp, _ = json.Marshal(&Response{
  266. Code: http.StatusInternalServerError,
  267. Status: "error",
  268. Message: err.Error(),
  269. Data: data,
  270. })
  271. } else {
  272. resp, _ = json.Marshal(&Response{
  273. Code: http.StatusOK,
  274. Status: "success",
  275. Data: data,
  276. Message: message,
  277. })
  278. }
  279. return resp
  280. }
  281. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  282. var resp []byte
  283. if err != nil {
  284. log.Errorf("Error returned to client: %s", err.Error())
  285. resp, _ = json.Marshal(&Response{
  286. Code: http.StatusInternalServerError,
  287. Status: "error",
  288. Message: err.Error(),
  289. Warning: warning,
  290. Data: data,
  291. })
  292. } else {
  293. resp, _ = json.Marshal(&Response{
  294. Code: http.StatusOK,
  295. Status: "success",
  296. Data: data,
  297. Warning: warning,
  298. })
  299. }
  300. return resp
  301. }
  302. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  303. var resp []byte
  304. if err != nil {
  305. log.Errorf("Error returned to client: %s", err.Error())
  306. resp, _ = json.Marshal(&Response{
  307. Code: http.StatusInternalServerError,
  308. Status: "error",
  309. Message: err.Error(),
  310. Warning: warning,
  311. Data: data,
  312. })
  313. } else {
  314. resp, _ = json.Marshal(&Response{
  315. Code: http.StatusOK,
  316. Status: "success",
  317. Data: data,
  318. Message: message,
  319. Warning: warning,
  320. })
  321. }
  322. return resp
  323. }
  324. // wrapAsObjectItems wraps a slice of items into an object containing a single items list
  325. // allows our k8s proxy methods to emulate a List() request to k8s API
  326. func wrapAsObjectItems(items interface{}) map[string]interface{} {
  327. return map[string]interface{}{
  328. "items": items,
  329. }
  330. }
  331. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  332. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  333. w.Header().Set("Content-Type", "application/json")
  334. w.Header().Set("Access-Control-Allow-Origin", "*")
  335. err := a.CloudProvider.DownloadPricingData()
  336. if err != nil {
  337. log.Errorf("Error refreshing pricing data: %s", err.Error())
  338. }
  339. w.Write(WrapData(nil, err))
  340. }
  341. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  342. w.Header().Set("Content-Type", "application/json")
  343. w.Header().Set("Access-Control-Allow-Origin", "*")
  344. window := r.URL.Query().Get("timeWindow")
  345. offset := r.URL.Query().Get("offset")
  346. fields := r.URL.Query().Get("filterFields")
  347. namespace := r.URL.Query().Get("namespace")
  348. if offset != "" {
  349. offset = "offset " + offset
  350. }
  351. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  352. if fields != "" {
  353. filteredData := filterFields(fields, data)
  354. w.Write(WrapData(filteredData, err))
  355. } else {
  356. w.Write(WrapData(data, err))
  357. }
  358. }
  359. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  360. w.Header().Set("Content-Type", "application/json")
  361. w.Header().Set("Access-Control-Allow-Origin", "*")
  362. window := r.URL.Query().Get("window")
  363. offset := r.URL.Query().Get("offset")
  364. if window == "" {
  365. w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
  366. return
  367. }
  368. windowDur, err := timeutil.ParseDuration(window)
  369. if err != nil {
  370. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  371. return
  372. }
  373. // offset is not a required parameter
  374. var offsetDur time.Duration
  375. if offset != "" {
  376. offsetDur, err = timeutil.ParseDuration(offset)
  377. if err != nil {
  378. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  379. return
  380. }
  381. }
  382. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  383. if useThanos && !thanos.IsEnabled() {
  384. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  385. return
  386. }
  387. var client prometheus.Client
  388. if useThanos {
  389. client = a.ThanosClient
  390. offsetDur = thanos.OffsetDuration()
  391. } else {
  392. client = a.PrometheusClient
  393. }
  394. data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
  395. w.Write(WrapData(data, err))
  396. }
  397. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  398. w.Header().Set("Content-Type", "application/json")
  399. w.Header().Set("Access-Control-Allow-Origin", "*")
  400. start := r.URL.Query().Get("start")
  401. end := r.URL.Query().Get("end")
  402. window := r.URL.Query().Get("window")
  403. offset := r.URL.Query().Get("offset")
  404. if window == "" {
  405. w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
  406. return
  407. }
  408. windowDur, err := timeutil.ParseDuration(window)
  409. if err != nil {
  410. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  411. return
  412. }
  413. // offset is not a required parameter
  414. var offsetDur time.Duration
  415. if offset != "" {
  416. offsetDur, err = timeutil.ParseDuration(offset)
  417. if err != nil {
  418. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  419. return
  420. }
  421. }
  422. data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
  423. w.Write(WrapData(data, err))
  424. }
  425. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  426. w.Header().Set("Content-Type", "application/json")
  427. w.Header().Set("Access-Control-Allow-Origin", "*")
  428. startStr := r.URL.Query().Get("start")
  429. endStr := r.URL.Query().Get("end")
  430. windowStr := r.URL.Query().Get("window")
  431. fields := r.URL.Query().Get("filterFields")
  432. namespace := r.URL.Query().Get("namespace")
  433. cluster := r.URL.Query().Get("cluster")
  434. remote := r.URL.Query().Get("remote")
  435. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  436. layout := "2006-01-02T15:04:05.000Z"
  437. start, err := time.Parse(layout, startStr)
  438. if err != nil {
  439. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  440. return
  441. }
  442. end, err := time.Parse(layout, endStr)
  443. if err != nil {
  444. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  445. return
  446. }
  447. window := opencost.NewWindow(&start, &end)
  448. if window.IsOpen() || !window.HasDuration() || window.IsNegative() {
  449. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  450. return
  451. }
  452. resolution := time.Hour
  453. if resDur, err := time.ParseDuration(windowStr); err == nil {
  454. resolution = resDur
  455. }
  456. // Use Thanos Client if it exists (enabled) and remote flag set
  457. var pClient prometheus.Client
  458. if remote != "false" && a.ThanosClient != nil {
  459. pClient = a.ThanosClient
  460. } else {
  461. pClient = a.PrometheusClient
  462. }
  463. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  464. if err != nil {
  465. w.Write(WrapData(nil, err))
  466. }
  467. if fields != "" {
  468. filteredData := filterFields(fields, data)
  469. w.Write(WrapData(filteredData, err))
  470. } else {
  471. w.Write(WrapData(data, err))
  472. }
  473. }
  474. func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  475. var key string
  476. var filter string
  477. var val []string
  478. if customAggregation != "" {
  479. key = customAggregation
  480. filter = filterType
  481. val = strings.Split(customAggregation, ",")
  482. } else {
  483. aggregations := strings.Split(aggregator, ",")
  484. for i, agg := range aggregations {
  485. aggregations[i] = "kubernetes_" + agg
  486. }
  487. key = strings.Join(aggregations, ",")
  488. filter = "kubernetes_" + filterType
  489. val = aggregations
  490. }
  491. return key, val, filter
  492. }
  493. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  494. w.Header().Set("Content-Type", "application/json")
  495. w.Header().Set("Access-Control-Allow-Origin", "*")
  496. data, err := a.CloudProvider.AllNodePricing()
  497. w.Write(WrapData(data, err))
  498. }
  499. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  500. w.Header().Set("Content-Type", "application/json")
  501. w.Header().Set("Access-Control-Allow-Origin", "*")
  502. data, err := a.CloudProvider.GetConfig()
  503. w.Write(WrapData(data, err))
  504. }
  505. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  506. w.Header().Set("Content-Type", "application/json")
  507. w.Header().Set("Access-Control-Allow-Origin", "*")
  508. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
  509. if err != nil {
  510. w.Write(WrapData(data, err))
  511. return
  512. }
  513. w.Write(WrapData(data, err))
  514. err = a.CloudProvider.DownloadPricingData()
  515. if err != nil {
  516. log.Errorf("Error redownloading data on config update: %s", err.Error())
  517. }
  518. return
  519. }
  520. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  521. w.Header().Set("Content-Type", "application/json")
  522. w.Header().Set("Access-Control-Allow-Origin", "*")
  523. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.AthenaInfoUpdateType)
  524. if err != nil {
  525. w.Write(WrapData(data, err))
  526. return
  527. }
  528. w.Write(WrapData(data, err))
  529. return
  530. }
  531. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  532. w.Header().Set("Content-Type", "application/json")
  533. w.Header().Set("Access-Control-Allow-Origin", "*")
  534. data, err := a.CloudProvider.UpdateConfig(r.Body, gcp.BigqueryUpdateType)
  535. if err != nil {
  536. w.Write(WrapData(data, err))
  537. return
  538. }
  539. w.Write(WrapData(data, err))
  540. return
  541. }
  542. func (a *Accesses) UpdateAzureStorageConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  543. w.Header().Set("Content-Type", "application/json")
  544. w.Header().Set("Access-Control-Allow-Origin", "*")
  545. data, err := a.CloudProvider.UpdateConfig(r.Body, azure.AzureStorageUpdateType)
  546. if err != nil {
  547. w.Write(WrapData(data, err))
  548. return
  549. }
  550. w.Write(WrapData(data, err))
  551. return
  552. }
  553. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  554. w.Header().Set("Content-Type", "application/json")
  555. w.Header().Set("Access-Control-Allow-Origin", "*")
  556. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  557. if err != nil {
  558. w.Write(WrapData(data, err))
  559. return
  560. }
  561. w.Write(WrapData(data, err))
  562. return
  563. }
  564. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  565. w.Header().Set("Content-Type", "application/json")
  566. w.Header().Set("Access-Control-Allow-Origin", "*")
  567. data, err := a.CloudProvider.GetManagementPlatform()
  568. if err != nil {
  569. w.Write(WrapData(data, err))
  570. return
  571. }
  572. w.Write(WrapData(data, err))
  573. return
  574. }
  575. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  576. w.Header().Set("Content-Type", "application/json")
  577. w.Header().Set("Access-Control-Allow-Origin", "*")
  578. data := a.ClusterInfoProvider.GetClusterInfo()
  579. w.Write(WrapData(data, nil))
  580. }
  581. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  582. w.Header().Set("Content-Type", "application/json")
  583. w.Header().Set("Access-Control-Allow-Origin", "*")
  584. data := a.ClusterMap.AsMap()
  585. w.Write(WrapData(data, nil))
  586. }
  587. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  588. w.Header().Set("Content-Type", "application/json")
  589. w.Header().Set("Access-Control-Allow-Origin", "*")
  590. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  591. }
  592. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  593. w.Header().Set("Content-Type", "application/json")
  594. w.Header().Set("Access-Control-Allow-Origin", "*")
  595. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  596. }
  597. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  598. w.Header().Set("Content-Type", "application/json")
  599. w.Header().Set("Access-Control-Allow-Origin", "*")
  600. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  601. }
  602. func (a *Accesses) GetPricingSourceSummary(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
  603. w.Header().Set("Content-Type", "application/json")
  604. w.Header().Set("Access-Control-Allow-Origin", "*")
  605. data := a.CloudProvider.PricingSourceSummary()
  606. w.Write(WrapData(data, nil))
  607. }
  608. func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  609. w.Header().Set("Content-Type", "application/json")
  610. w.Header().Set("Access-Control-Allow-Origin", "*")
  611. w.Write(WrapData(prom.Validate(a.PrometheusClient)))
  612. }
  613. func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  614. w.Header().Set("Content-Type", "application/json")
  615. w.Header().Set("Access-Control-Allow-Origin", "*")
  616. qp := httputil.NewQueryParams(r.URL.Query())
  617. query := qp.Get("query", "")
  618. if query == "" {
  619. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  620. return
  621. }
  622. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  623. var timeVal time.Time
  624. timeStr := qp.Get("time", "")
  625. if len(timeStr) > 0 {
  626. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  627. timeVal = time.Unix(t, 0)
  628. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  629. timeVal = t
  630. }
  631. // If time is given, but not parse-able, return an error
  632. if timeVal.IsZero() {
  633. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  634. }
  635. }
  636. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  637. body, err := ctx.RawQuery(query, timeVal)
  638. if err != nil {
  639. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  640. return
  641. }
  642. w.Write(body)
  643. }
  644. func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  645. w.Header().Set("Content-Type", "application/json")
  646. w.Header().Set("Access-Control-Allow-Origin", "*")
  647. qp := httputil.NewQueryParams(r.URL.Query())
  648. query := qp.Get("query", "")
  649. if query == "" {
  650. fmt.Fprintf(w, "Error parsing query from request parameters.")
  651. return
  652. }
  653. start, end, duration, err := toStartEndStep(qp)
  654. if err != nil {
  655. fmt.Fprintf(w, err.Error())
  656. return
  657. }
  658. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  659. body, err := ctx.RawQueryRange(query, start, end, duration)
  660. if err != nil {
  661. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  662. return
  663. }
  664. w.Write(body)
  665. }
  666. func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  667. w.Header().Set("Content-Type", "application/json")
  668. w.Header().Set("Access-Control-Allow-Origin", "*")
  669. if !thanos.IsEnabled() {
  670. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  671. return
  672. }
  673. qp := httputil.NewQueryParams(r.URL.Query())
  674. query := qp.Get("query", "")
  675. if query == "" {
  676. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  677. return
  678. }
  679. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  680. var timeVal time.Time
  681. timeStr := qp.Get("time", "")
  682. if len(timeStr) > 0 {
  683. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  684. timeVal = time.Unix(t, 0)
  685. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  686. timeVal = t
  687. }
  688. // If time is given, but not parse-able, return an error
  689. if timeVal.IsZero() {
  690. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  691. }
  692. }
  693. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  694. body, err := ctx.RawQuery(query, timeVal)
  695. if err != nil {
  696. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  697. return
  698. }
  699. w.Write(body)
  700. }
  701. func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  702. w.Header().Set("Content-Type", "application/json")
  703. w.Header().Set("Access-Control-Allow-Origin", "*")
  704. if !thanos.IsEnabled() {
  705. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  706. return
  707. }
  708. qp := httputil.NewQueryParams(r.URL.Query())
  709. query := qp.Get("query", "")
  710. if query == "" {
  711. fmt.Fprintf(w, "Error parsing query from request parameters.")
  712. return
  713. }
  714. start, end, duration, err := toStartEndStep(qp)
  715. if err != nil {
  716. fmt.Fprintf(w, err.Error())
  717. return
  718. }
  719. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  720. body, err := ctx.RawQueryRange(query, start, end, duration)
  721. if err != nil {
  722. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  723. return
  724. }
  725. w.Write(body)
  726. }
  727. // helper for query range proxy requests
  728. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  729. var e error
  730. ss := qp.Get("start", "")
  731. es := qp.Get("end", "")
  732. ds := qp.Get("duration", "")
  733. layout := "2006-01-02T15:04:05.000Z"
  734. start, e = time.Parse(layout, ss)
  735. if e != nil {
  736. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  737. return
  738. }
  739. end, e = time.Parse(layout, es)
  740. if e != nil {
  741. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  742. return
  743. }
  744. step, e = time.ParseDuration(ds)
  745. if e != nil {
  746. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  747. return
  748. }
  749. err = nil
  750. return
  751. }
  752. func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  753. w.Header().Set("Content-Type", "application/json")
  754. w.Header().Set("Access-Control-Allow-Origin", "*")
  755. promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
  756. if err != nil {
  757. w.Write(WrapData(nil, err))
  758. return
  759. }
  760. result := map[string]*prom.PrometheusQueueState{
  761. "prometheus": promQueueState,
  762. }
  763. if thanos.IsEnabled() {
  764. thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
  765. if err != nil {
  766. log.Warnf("Error getting Thanos queue state: %s", err)
  767. } else {
  768. result["thanos"] = thanosQueueState
  769. }
  770. }
  771. w.Write(WrapData(result, nil))
  772. }
  773. // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
  774. func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  775. w.Header().Set("Content-Type", "application/json")
  776. w.Header().Set("Access-Control-Allow-Origin", "*")
  777. promMetrics := prom.GetPrometheusMetrics(a.PrometheusClient, "")
  778. result := map[string][]*prom.PrometheusDiagnostic{
  779. "prometheus": promMetrics,
  780. }
  781. if thanos.IsEnabled() {
  782. thanosMetrics := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
  783. result["thanos"] = thanosMetrics
  784. }
  785. w.Write(WrapData(result, nil))
  786. }
  787. func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  788. w.Header().Set("Content-Type", "application/json")
  789. w.Header().Set("Access-Control-Allow-Origin", "*")
  790. u := a.PrometheusClient.URL(epRules, nil)
  791. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  792. if err != nil {
  793. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  794. }
  795. _, body, err := a.PrometheusClient.Do(r.Context(), req)
  796. if err != nil {
  797. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  798. } else {
  799. w.Write(body)
  800. }
  801. }
  802. func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  803. w.Header().Set("Content-Type", "application/json")
  804. w.Header().Set("Access-Control-Allow-Origin", "*")
  805. pConfig := map[string]string{
  806. "address": env.GetPrometheusServerEndpoint(),
  807. }
  808. body, err := json.Marshal(pConfig)
  809. if err != nil {
  810. fmt.Fprintf(w, "Error marshalling prometheus config")
  811. } else {
  812. w.Write(body)
  813. }
  814. }
  815. func (a *Accesses) PrometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  816. w.Header().Set("Content-Type", "application/json")
  817. w.Header().Set("Access-Control-Allow-Origin", "*")
  818. u := a.PrometheusClient.URL(epTargets, nil)
  819. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  820. if err != nil {
  821. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  822. }
  823. _, body, err := a.PrometheusClient.Do(r.Context(), req)
  824. if err != nil {
  825. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  826. } else {
  827. w.Write(body)
  828. }
  829. }
  830. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  831. w.Header().Set("Content-Type", "application/json")
  832. w.Header().Set("Access-Control-Allow-Origin", "*")
  833. podlist := a.ClusterCache.GetAllPods()
  834. var lonePods []*clustercache.Pod
  835. for _, pod := range podlist {
  836. if len(pod.OwnerReferences) == 0 {
  837. lonePods = append(lonePods, pod)
  838. }
  839. }
  840. body, err := json.Marshal(lonePods)
  841. if err != nil {
  842. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  843. } else {
  844. w.Write(body)
  845. }
  846. }
  847. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  848. w.Header().Set("Content-Type", "application/json")
  849. w.Header().Set("Access-Control-Allow-Origin", "*")
  850. ns := env.GetKubecostNamespace()
  851. w.Write([]byte(ns))
  852. }
  // InstallInfo is the JSON payload produced by GetInstallInfo.
type InstallInfo struct {
	// Containers lists containers of a running Kubecost pod, if any were found.
	Containers []ContainerInfo `json:"containers"`
	// ClusterInfo holds string-valued cluster statistics such as "nodeCount".
	ClusterInfo map[string]string `json:"clusterInfo"`
	// Version is the friendly build version string.
	Version string `json:"version"`
}

// ContainerInfo describes one container of a Kubecost pod.
type ContainerInfo struct {
	ContainerName string `json:"containerName"` // container name from the pod spec
	Image         string `json:"image"`         // image reference from the pod spec
	StartTime     string `json:"startTime"`     // string form of the pod's start time
}
  863. func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  864. w.Header().Set("Content-Type", "application/json")
  865. w.Header().Set("Access-Control-Allow-Origin", "*")
  866. containers, err := GetKubecostContainers(a.KubeClientSet)
  867. if err != nil {
  868. writeErrorResponse(w, 500, fmt.Sprintf("Unable to list pods: %s", err.Error()))
  869. return
  870. }
  871. info := InstallInfo{
  872. Containers: containers,
  873. ClusterInfo: make(map[string]string),
  874. Version: version.FriendlyVersion(),
  875. }
  876. nodes := a.ClusterCache.GetAllNodes()
  877. cachePods := a.ClusterCache.GetAllPods()
  878. info.ClusterInfo["nodeCount"] = strconv.Itoa(len(nodes))
  879. info.ClusterInfo["podCount"] = strconv.Itoa(len(cachePods))
  880. body, err := json.Marshal(info)
  881. if err != nil {
  882. writeErrorResponse(w, 500, fmt.Sprintf("Error decoding pod: %s", err.Error()))
  883. return
  884. }
  885. w.Write(body)
  886. }
  887. func GetKubecostContainers(kubeClientSet kubernetes.Interface) ([]ContainerInfo, error) {
  888. pods, err := kubeClientSet.CoreV1().Pods(env.GetKubecostNamespace()).List(context.Background(), metav1.ListOptions{
  889. LabelSelector: "app=cost-analyzer",
  890. FieldSelector: "status.phase=Running",
  891. Limit: 1,
  892. })
  893. if err != nil {
  894. return nil, fmt.Errorf("failed to query kubernetes client for kubecost pods: %s", err)
  895. }
  896. // If we have zero pods either something is weird with the install since the app selector is not exposed in the helm
  897. // chart or more likely we are running locally - in either case Images field will return as null
  898. var containers []ContainerInfo
  899. if len(pods.Items) > 0 {
  900. for _, pod := range pods.Items {
  901. for _, container := range pod.Spec.Containers {
  902. c := ContainerInfo{
  903. ContainerName: container.Name,
  904. Image: container.Image,
  905. StartTime: pod.Status.StartTime.String(),
  906. }
  907. containers = append(containers, c)
  908. }
  909. }
  910. }
  911. return containers, nil
  912. }
  913. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  914. w.Header().Set("Content-Type", "application/json")
  915. w.Header().Set("Access-Control-Allow-Origin", "*")
  916. r.ParseForm()
  917. key := r.PostForm.Get("key")
  918. k := []byte(key)
  919. err := os.WriteFile(path.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "key.json"), k, 0644)
  920. if err != nil {
  921. fmt.Fprintf(w, "Error writing service key: "+err.Error())
  922. }
  923. w.WriteHeader(http.StatusOK)
  924. }
  925. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  926. w.Header().Set("Content-Type", "application/json")
  927. w.Header().Set("Access-Control-Allow-Origin", "*")
  928. encodedValues := sysenv.Get("HELM_VALUES", "")
  929. if encodedValues == "" {
  930. fmt.Fprintf(w, "Values reporting disabled")
  931. return
  932. }
  933. result, err := base64.StdEncoding.DecodeString(encodedValues)
  934. if err != nil {
  935. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  936. return
  937. }
  938. w.Write(result)
  939. }
  940. func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  941. w.Header().Set("Content-Type", "application/json")
  942. w.Header().Set("Access-Control-Allow-Origin", "*")
  943. promServer := env.GetPrometheusServerEndpoint()
  944. api := prometheusAPI.NewAPI(a.PrometheusClient)
  945. result, err := api.Buildinfo(r.Context())
  946. if err != nil {
  947. fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
  948. } else {
  949. fmt.Fprintf(w, "Using Prometheus at "+promServer+". Version: "+result.Version)
  950. }
  951. }
  952. // captures the panic event in sentry
  953. func capturePanicEvent(err string, stack string) {
  954. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  955. log.Infof(msg)
  956. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  957. Level: sentry.LevelError,
  958. Message: msg,
  959. })
  960. sentry.Flush(5 * time.Second)
  961. }
  962. // handle any panics reported by the errors package
  963. func handlePanic(p errors.Panic) bool {
  964. err := p.Error
  965. if err != nil {
  966. if err, ok := err.(error); ok {
  967. capturePanicEvent(err.Error(), p.Stack)
  968. }
  969. if err, ok := err.(string); ok {
  970. capturePanicEvent(err, p.Stack)
  971. }
  972. }
  973. // Return true to recover iff the type is http, otherwise allow kubernetes
  974. // to recover.
  975. return p.Type == errors.PanicTypeHTTP
  976. }
  977. func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  978. var err error
  979. if errorReportingEnabled {
  980. err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
  981. if err != nil {
  982. log.Infof("Failed to initialize sentry for error reporting")
  983. } else {
  984. err = errors.SetPanicHandler(handlePanic)
  985. if err != nil {
  986. log.Infof("Failed to set panic handler: %s", err)
  987. }
  988. }
  989. }
  990. address := env.GetPrometheusServerEndpoint()
  991. if address == "" {
  992. log.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  993. }
  994. queryConcurrency := env.GetMaxQueryConcurrency()
  995. log.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  996. timeout := 120 * time.Second
  997. keepAlive := 120 * time.Second
  998. tlsHandshakeTimeout := 10 * time.Second
  999. scrapeInterval := env.GetKubecostScrapeInterval()
  1000. var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
  1001. if env.IsPrometheusRetryOnRateLimitResponse() {
  1002. rateLimitRetryOpts = &prom.RateLimitRetryOpts{
  1003. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  1004. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  1005. }
  1006. }
  1007. promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
  1008. Timeout: timeout,
  1009. KeepAlive: keepAlive,
  1010. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1011. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1012. RateLimitRetryOpts: rateLimitRetryOpts,
  1013. Auth: &prom.ClientAuth{
  1014. Username: env.GetDBBasicAuthUsername(),
  1015. Password: env.GetDBBasicAuthUserPassword(),
  1016. BearerToken: env.GetDBBearerToken(),
  1017. },
  1018. QueryConcurrency: queryConcurrency,
  1019. QueryLogFile: "",
  1020. HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
  1021. })
  1022. if err != nil {
  1023. log.Fatalf("Failed to create prometheus client, Error: %v", err)
  1024. }
  1025. m, err := prom.Validate(promCli)
  1026. if err != nil || !m.Running {
  1027. if err != nil {
  1028. log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1029. } else if !m.Running {
  1030. log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  1031. }
  1032. } else {
  1033. log.Infof("Success: retrieved the 'up' query against prometheus at: " + address)
  1034. }
  1035. api := prometheusAPI.NewAPI(promCli)
  1036. _, err = api.Buildinfo(context.Background())
  1037. if err != nil {
  1038. log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1039. } else {
  1040. log.Infof("Retrieved a prometheus config file from: %s", address)
  1041. }
  1042. if scrapeInterval == 0 {
  1043. scrapeInterval = time.Minute
  1044. // Lookup scrape interval for kubecost job, update if found
  1045. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  1046. if err == nil {
  1047. scrapeInterval = si
  1048. }
  1049. }
  1050. log.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  1051. // Kubernetes API setup
  1052. kubeClientset, err := kubeconfig.LoadKubeClient("")
  1053. if err != nil {
  1054. log.Fatalf("Failed to build Kubernetes client: %s", err.Error())
  1055. }
  1056. // Create ConfigFileManager for synchronization of shared configuration
  1057. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  1058. BucketStoreConfig: env.GetKubecostConfigBucket(),
  1059. LocalConfigPath: "/",
  1060. })
  1061. configPrefix := env.GetConfigPathWithDefault("/var/configs/")
  1062. // Create Kubernetes Cluster Cache + Watchers
  1063. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  1064. k8sCache.Run()
  1065. cloudProviderKey := env.GetCloudProviderAPIKey()
  1066. cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
  1067. if err != nil {
  1068. panic(err.Error())
  1069. }
  1070. // Append the pricing config watcher
  1071. kubecostNamespace := env.GetKubecostNamespace()
  1072. configWatchers := watcher.NewConfigMapWatchers(kubeClientset, kubecostNamespace, additionalConfigWatchers...)
  1073. configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
  1074. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  1075. configWatchers.Watch()
  1076. remoteEnabled := env.IsRemoteEnabled()
  1077. if remoteEnabled {
  1078. info, err := cloudProvider.ClusterInfo()
  1079. log.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1080. if err != nil {
  1081. log.Infof("Error saving cluster id %s", err.Error())
  1082. }
  1083. _, _, err = utils.GetOrCreateClusterMeta(info["id"], info["name"])
  1084. if err != nil {
  1085. log.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1086. }
  1087. }
  1088. // Thanos Client
  1089. var thanosClient prometheus.Client
  1090. if thanos.IsEnabled() {
  1091. thanosAddress := thanos.QueryURL()
  1092. if thanosAddress != "" {
  1093. thanosCli, _ := thanos.NewThanosClient(thanosAddress, &prom.PrometheusClientConfig{
  1094. Timeout: timeout,
  1095. KeepAlive: keepAlive,
  1096. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1097. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1098. RateLimitRetryOpts: rateLimitRetryOpts,
  1099. Auth: &prom.ClientAuth{
  1100. Username: env.GetMultiClusterBasicAuthUsername(),
  1101. Password: env.GetMultiClusterBasicAuthPassword(),
  1102. BearerToken: env.GetMultiClusterBearerToken(),
  1103. },
  1104. QueryConcurrency: queryConcurrency,
  1105. QueryLogFile: env.GetQueryLoggingFile(),
  1106. })
  1107. _, err = prom.Validate(thanosCli)
  1108. if err != nil {
  1109. log.Warnf("Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  1110. thanosClient = thanosCli
  1111. } else {
  1112. log.Infof("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  1113. thanosClient = thanosCli
  1114. }
  1115. } else {
  1116. log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  1117. }
  1118. }
  1119. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  1120. var clusterInfoProvider clusters.ClusterInfoProvider
  1121. if env.IsClusterInfoFileEnabled() {
  1122. clusterInfoFile := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-info.json"))
  1123. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  1124. } else {
  1125. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  1126. }
  1127. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  1128. var clusterMap clusters.ClusterMap
  1129. if thanosClient != nil {
  1130. clusterMap = clustermap.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
  1131. } else {
  1132. clusterMap = clustermap.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
  1133. }
  1134. // cache responses from model and aggregation for a default of 10 minutes;
  1135. // clear expired responses every 20 minutes
  1136. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  1137. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  1138. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1139. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1140. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1141. // query durations that should be cached longer should be registered here
  1142. // use relatively prime numbers to minimize likelihood of synchronized
  1143. // attempts at cache warming
  1144. day := 24 * time.Hour
  1145. cacheExpiration := map[time.Duration]time.Duration{
  1146. day: maxCacheMinutes1d * time.Minute,
  1147. 2 * day: maxCacheMinutes2d * time.Minute,
  1148. 7 * day: maxCacheMinutes7d * time.Minute,
  1149. 30 * day: maxCacheMinutes30d * time.Minute,
  1150. }
  1151. var pc prometheus.Client
  1152. if thanosClient != nil {
  1153. pc = thanosClient
  1154. } else {
  1155. pc = promCli
  1156. }
  1157. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  1158. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
  1159. a := &Accesses{
  1160. httpServices: services.NewCostModelServices(),
  1161. PrometheusClient: promCli,
  1162. ThanosClient: thanosClient,
  1163. KubeClientSet: kubeClientset,
  1164. ClusterCache: k8sCache,
  1165. ClusterMap: clusterMap,
  1166. CloudProvider: cloudProvider,
  1167. ConfigFileManager: confManager,
  1168. ClusterInfoProvider: clusterInfoProvider,
  1169. Model: costModel,
  1170. MetricsEmitter: metricsEmitter,
  1171. AggregateCache: aggregateCache,
  1172. CostDataCache: costDataCache,
  1173. ClusterCostsCache: clusterCostsCache,
  1174. OutOfClusterCache: outOfClusterCache,
  1175. SettingsCache: settingsCache,
  1176. CacheExpiration: cacheExpiration,
  1177. }
  1178. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  1179. // confusing and unconventional, but necessary so that we can swap it
  1180. // out for the ETL-adapted version elsewhere.
  1181. // TODO clean this up once ETL is open-sourced.
  1182. a.AggAPI = a
  1183. // Initialize mechanism for subscribing to settings changes
  1184. a.InitializeSettingsPubSub()
  1185. err = a.CloudProvider.DownloadPricingData()
  1186. if err != nil {
  1187. log.Infof("Failed to download pricing data: " + err.Error())
  1188. }
  1189. // Warm the aggregate cache unless explicitly set to false
  1190. if env.IsCacheWarmingEnabled() {
  1191. log.Infof("Init: AggregateCostModel cache warming enabled")
  1192. a.warmAggregateCostModelCache()
  1193. } else {
  1194. log.Infof("Init: AggregateCostModel cache warming disabled")
  1195. }
  1196. if !env.IsKubecostMetricsPodEnabled() {
  1197. a.MetricsEmitter.Start()
  1198. }
  1199. a.httpServices.RegisterAll(router)
  1200. router.GET("/costDataModel", a.CostDataModel)
  1201. router.GET("/costDataModelRange", a.CostDataModelRange)
  1202. router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  1203. router.GET("/allocation/compute", a.ComputeAllocationHandler)
  1204. router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  1205. router.GET("/allNodePricing", a.GetAllNodePricing)
  1206. router.POST("/refreshPricing", a.RefreshPricingData)
  1207. router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  1208. router.GET("/clusterCosts", a.ClusterCosts)
  1209. router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  1210. router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  1211. router.GET("/managementPlatform", a.ManagementPlatform)
  1212. router.GET("/clusterInfo", a.ClusterInfo)
  1213. router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  1214. router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  1215. router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  1216. router.GET("/pricingSourceSummary", a.GetPricingSourceSummary)
  1217. router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  1218. // endpoints migrated from server
  1219. router.GET("/prometheusRecordingRules", a.PrometheusRecordingRules)
  1220. router.GET("/prometheusConfig", a.PrometheusConfig)
  1221. router.GET("/prometheusTargets", a.PrometheusTargets)
  1222. router.GET("/orphanedPods", a.GetOrphanedPods)
  1223. router.GET("/installNamespace", a.GetInstallNamespace)
  1224. router.GET("/installInfo", a.GetInstallInfo)
  1225. router.POST("/serviceKey", a.AddServiceKey)
  1226. router.GET("/helmValues", a.GetHelmValues)
  1227. router.GET("/status", a.Status)
  1228. // prom query proxies
  1229. router.GET("/prometheusQuery", a.PrometheusQuery)
  1230. router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
  1231. router.GET("/thanosQuery", a.ThanosQuery)
  1232. router.GET("/thanosQueryRange", a.ThanosQueryRange)
  1233. // diagnostics
  1234. router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
  1235. router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
  1236. return a
  1237. }
  1238. // InitializeCloudCost Initializes Cloud Cost pipeline and querier and registers endpoints
  1239. func InitializeCloudCost(router *httprouter.Router, providerConfig models.ProviderConfig) {
  1240. log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())
  1241. cloudConfigController := cloudconfig.NewMemoryController(providerConfig)
  1242. repo := cloudcost.NewMemoryRepository()
  1243. cloudCostPipelineService := cloudcost.NewPipelineService(repo, cloudConfigController, cloudcost.DefaultIngestorConfiguration())
  1244. repoQuerier := cloudcost.NewRepositoryQuerier(repo)
  1245. cloudCostQueryService := cloudcost.NewQueryService(repoQuerier, repoQuerier)
  1246. router.GET("/cloud/config/export", cloudConfigController.GetExportConfigHandler())
  1247. router.GET("/cloud/config/enable", cloudConfigController.GetEnableConfigHandler())
  1248. router.GET("/cloud/config/disable", cloudConfigController.GetDisableConfigHandler())
  1249. router.GET("/cloud/config/delete", cloudConfigController.GetDeleteConfigHandler())
  1250. router.GET("/cloudCost", cloudCostQueryService.GetCloudCostHandler())
  1251. router.GET("/cloudCost/view/graph", cloudCostQueryService.GetCloudCostViewGraphHandler())
  1252. router.GET("/cloudCost/view/totals", cloudCostQueryService.GetCloudCostViewTotalsHandler())
  1253. router.GET("/cloudCost/view/table", cloudCostQueryService.GetCloudCostViewTableHandler())
  1254. router.GET("/cloudCost/status", cloudCostPipelineService.GetCloudCostStatusHandler())
  1255. router.GET("/cloudCost/rebuild", cloudCostPipelineService.GetCloudCostRebuildHandler())
  1256. router.GET("/cloudCost/repair", cloudCostPipelineService.GetCloudCostRepairHandler())
  1257. }
  1258. func InitializeCustomCost(router *httprouter.Router) *customcost.PipelineService {
  1259. hourlyRepo := customcost.NewMemoryRepository()
  1260. dailyRepo := customcost.NewMemoryRepository()
  1261. ingConfig := customcost.DefaultIngestorConfiguration()
  1262. var err error
  1263. customCostPipelineService, err := customcost.NewPipelineService(hourlyRepo, dailyRepo, ingConfig)
  1264. if err != nil {
  1265. log.Errorf("error instantiating custom cost pipeline service: %v", err)
  1266. return nil
  1267. }
  1268. customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
  1269. customCostQueryService := customcost.NewQueryService(customCostQuerier)
  1270. router.GET("/customCost/total", customCostQueryService.GetCustomCostTotalHandler())
  1271. router.GET("/customCost/timeseries", customCostQueryService.GetCustomCostTimeseriesHandler())
  1272. return customCostPipelineService
  1273. }
  1274. func writeErrorResponse(w http.ResponseWriter, code int, message string) {
  1275. out := map[string]string{
  1276. "message": message,
  1277. }
  1278. bytes, err := json.Marshal(out)
  1279. if err != nil {
  1280. w.Header().Set("Content-Type", "text/plain")
  1281. w.WriteHeader(500)
  1282. fmt.Fprint(w, "unable to marshall json for error")
  1283. log.Warnf("Failed to marshall JSON for error response: %s", err.Error())
  1284. return
  1285. }
  1286. w.WriteHeader(code)
  1287. fmt.Fprint(w, string(bytes))
  1288. }