router.go 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "os"
  9. "path"
  10. "reflect"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/microcosm-cc/bluemonday"
  17. "github.com/opencost/opencost/core/pkg/opencost"
  18. "github.com/opencost/opencost/core/pkg/util/httputil"
  19. "github.com/opencost/opencost/core/pkg/util/timeutil"
  20. "github.com/opencost/opencost/core/pkg/util/watcher"
  21. "github.com/opencost/opencost/core/pkg/version"
  22. "github.com/opencost/opencost/pkg/cloud/aws"
  23. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  24. "github.com/opencost/opencost/pkg/cloud/gcp"
  25. "github.com/opencost/opencost/pkg/cloud/provider"
  26. "github.com/opencost/opencost/pkg/cloudcost"
  27. "github.com/opencost/opencost/pkg/config"
  28. clustermap "github.com/opencost/opencost/pkg/costmodel/clusters"
  29. "github.com/opencost/opencost/pkg/customcost"
  30. "github.com/opencost/opencost/pkg/kubeconfig"
  31. "github.com/opencost/opencost/pkg/metrics"
  32. "github.com/opencost/opencost/pkg/services"
  33. "github.com/spf13/viper"
  34. v1 "k8s.io/api/core/v1"
  35. "github.com/julienschmidt/httprouter"
  36. "github.com/getsentry/sentry-go"
  37. "github.com/opencost/opencost/core/pkg/clusters"
  38. sysenv "github.com/opencost/opencost/core/pkg/env"
  39. "github.com/opencost/opencost/core/pkg/log"
  40. "github.com/opencost/opencost/core/pkg/util/json"
  41. "github.com/opencost/opencost/pkg/cloud/azure"
  42. "github.com/opencost/opencost/pkg/cloud/models"
  43. "github.com/opencost/opencost/pkg/cloud/utils"
  44. "github.com/opencost/opencost/pkg/clustercache"
  45. "github.com/opencost/opencost/pkg/env"
  46. "github.com/opencost/opencost/pkg/errors"
  47. "github.com/opencost/opencost/pkg/prom"
  48. "github.com/opencost/opencost/pkg/thanos"
  49. prometheus "github.com/prometheus/client_golang/api"
  50. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  51. appsv1 "k8s.io/api/apps/v1"
  52. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  53. "github.com/patrickmn/go-cache"
  54. "k8s.io/client-go/kubernetes"
  55. )
  56. var sanitizePolicy = bluemonday.UGCPolicy()
  57. const (
  58. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  59. maxCacheMinutes1d = 11
  60. maxCacheMinutes2d = 17
  61. maxCacheMinutes7d = 37
  62. maxCacheMinutes30d = 137
  63. CustomPricingSetting = "CustomPricing"
  64. DiscountSetting = "Discount"
  65. epRules = apiPrefix + "/rules"
  66. LogSeparator = "+-------------------------------------------------------------------------------------"
  67. )
  68. var (
  69. // gitCommit is set by the build system
  70. gitCommit string
  71. // ANSIRegex matches ANSI escape and colors https://en.wikipedia.org/wiki/ANSI_escape_code
  72. ANSIRegex = regexp.MustCompile("\x1b\\[[0-9;]*m")
  73. )
  74. // Accesses defines a singleton application instance, providing access to
  75. // Prometheus, Kubernetes, the cloud provider, and caches.
  76. type Accesses struct {
  77. Router *httprouter.Router
  78. PrometheusClient prometheus.Client
  79. ThanosClient prometheus.Client
  80. KubeClientSet kubernetes.Interface
  81. ClusterCache clustercache.ClusterCache
  82. ClusterMap clusters.ClusterMap
  83. CloudProvider models.Provider
  84. ConfigFileManager *config.ConfigFileManager
  85. CloudConfigController *cloudconfig.Controller
  86. CloudCostPipelineService *cloudcost.PipelineService
  87. CloudCostQueryService *cloudcost.QueryService
  88. CustomCostQueryService *customcost.QueryService
  89. CustomCostPipelineService *customcost.PipelineService
  90. ClusterInfoProvider clusters.ClusterInfoProvider
  91. Model *CostModel
  92. MetricsEmitter *CostModelMetricsEmitter
  93. OutOfClusterCache *cache.Cache
  94. AggregateCache *cache.Cache
  95. CostDataCache *cache.Cache
  96. ClusterCostsCache *cache.Cache
  97. CacheExpiration map[time.Duration]time.Duration
  98. AggAPI Aggregator
  99. // SettingsCache stores current state of app settings
  100. SettingsCache *cache.Cache
  101. // settingsSubscribers tracks channels through which changes to different
  102. // settings will be published in a pub/sub model
  103. settingsSubscribers map[string][]chan string
  104. settingsMutex sync.Mutex
  105. // registered http service instances
  106. httpServices services.HTTPServices
  107. }
  108. // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
  109. // should be used.
  110. func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
  111. // Use Thanos Client if it exists (enabled) and remote flag set
  112. var pc prometheus.Client
  113. if remote && a.ThanosClient != nil {
  114. pc = a.ThanosClient
  115. } else {
  116. pc = a.PrometheusClient
  117. }
  118. return pc
  119. }
  120. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  121. // If one does not exists, it returns the default cache expiration, which is defined by
  122. // the particular cache.
  123. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  124. if expiration, ok := a.CacheExpiration[dur]; ok {
  125. return expiration
  126. }
  127. return cache.DefaultExpiration
  128. }
  129. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  130. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  131. // not found or is less than 2 minutes.
  132. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  133. expiry := a.GetCacheExpiration(dur).Minutes()
  134. if expiry <= 2.0 {
  135. return time.Minute
  136. }
  137. mins := time.Duration(expiry/2.0) * time.Minute
  138. return mins
  139. }
  140. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  141. w.Header().Set("Content-Type", "application/json")
  142. duration := 24 * time.Hour
  143. offset := time.Minute
  144. durationHrs := "24h"
  145. fmtOffset := "1m"
  146. pClient := a.GetPrometheusClient(true)
  147. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  148. if data, valid := a.ClusterCostsCache.Get(key); valid {
  149. clusterCosts := data.(map[string]*ClusterCosts)
  150. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  151. } else {
  152. data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
  153. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  154. }
  155. }
  // Response is the JSON envelope written by the WrapData* helpers.
  type Response struct {
  	// Code is an HTTP status code carried inside the body.
  	Code int `json:"code"`
  	// Status is "success" or "error" as set by the WrapData* helpers.
  	Status string `json:"status"`
  	// Data is the payload; it is always emitted, even when nil.
  	Data interface{} `json:"data"`
  	// Message is omitted from the JSON when empty.
  	Message string `json:"message,omitempty"`
  	// Warning is omitted from the JSON when empty.
  	Warning string `json:"warning,omitempty"`
  }
  163. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  164. type FilterFunc func(*CostData) (bool, string)
  165. // FilterCostData allows through only CostData that matches all the given filter functions
  166. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  167. result := make(map[string]*CostData)
  168. filteredEnvironments := make(map[string]int)
  169. filteredContainers := 0
  170. DataLoop:
  171. for key, datum := range data {
  172. for _, rf := range retains {
  173. if ok, _ := rf(datum); ok {
  174. result[key] = datum
  175. // if any retain function passes, the data is retained and move on
  176. continue DataLoop
  177. }
  178. }
  179. for _, ff := range filters {
  180. if ok, environment := ff(datum); !ok {
  181. if environment != "" {
  182. filteredEnvironments[environment]++
  183. }
  184. filteredContainers++
  185. // if any filter function check fails, move on to the next datum
  186. continue DataLoop
  187. }
  188. }
  189. result[key] = datum
  190. }
  191. return result, filteredContainers, filteredEnvironments
  192. }
  193. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  194. fs := strings.Split(fields, ",")
  195. fmap := make(map[string]bool)
  196. for _, f := range fs {
  197. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  198. log.Debugf("to delete: %s", fieldNameLower)
  199. fmap[fieldNameLower] = true
  200. }
  201. filteredData := make(map[string]CostData)
  202. for cname, costdata := range data {
  203. s := reflect.TypeOf(*costdata)
  204. val := reflect.ValueOf(*costdata)
  205. costdata2 := CostData{}
  206. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  207. n := s.NumField()
  208. for i := 0; i < n; i++ {
  209. field := s.Field(i)
  210. value := val.Field(i)
  211. value2 := cd2.Field(i)
  212. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  213. value2.Set(reflect.Value(value))
  214. }
  215. }
  216. filteredData[cname] = cd2.Interface().(CostData)
  217. }
  218. return filteredData
  219. }
  // normalizeTimeParam rewrites a duration expressed in days ("Nd") as the
  // equivalent number of hours ("<N*24>h"). Any other non-empty value is
  // returned unchanged. An empty param, or a day count that is not a base-10
  // integer, yields an error.
  func normalizeTimeParam(param string) (string, error) {
  	if param == "" {
  		return "", fmt.Errorf("invalid time param")
  	}
  	if !strings.HasSuffix(param, "d") {
  		return param, nil
  	}

  	days, err := strconv.ParseInt(strings.TrimSuffix(param, "d"), 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString converts a percentage such as "N%" into the fraction
  // N*0.01. A single trailing "%" is optional. The empty string is treated as
  // "0%" and yields 0.0. A value that does not parse as a float yields an error.
  func ParsePercentString(percentStr string) (float64, error) {
  	if percentStr == "" {
  		return 0.0, nil
  	}

  	percent, err := strconv.ParseFloat(strings.TrimSuffix(percentStr, "%"), 64)
  	if err != nil {
  		return 0.0, err
  	}
  	return percent * 0.01, nil
  }
  253. func WrapData(data interface{}, err error) []byte {
  254. var resp []byte
  255. if err != nil {
  256. log.Errorf("Error returned to client: %s", err.Error())
  257. resp, _ = json.Marshal(&Response{
  258. Code: http.StatusInternalServerError,
  259. Status: "error",
  260. Message: err.Error(),
  261. Data: data,
  262. })
  263. } else {
  264. resp, err = json.Marshal(&Response{
  265. Code: http.StatusOK,
  266. Status: "success",
  267. Data: data,
  268. })
  269. if err != nil {
  270. log.Errorf("error marshaling response json: %s", err.Error())
  271. }
  272. }
  273. return resp
  274. }
  275. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  276. var resp []byte
  277. if err != nil {
  278. log.Errorf("Error returned to client: %s", err.Error())
  279. resp, _ = json.Marshal(&Response{
  280. Code: http.StatusInternalServerError,
  281. Status: "error",
  282. Message: err.Error(),
  283. Data: data,
  284. })
  285. } else {
  286. resp, _ = json.Marshal(&Response{
  287. Code: http.StatusOK,
  288. Status: "success",
  289. Data: data,
  290. Message: message,
  291. })
  292. }
  293. return resp
  294. }
  295. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  296. var resp []byte
  297. if err != nil {
  298. log.Errorf("Error returned to client: %s", err.Error())
  299. resp, _ = json.Marshal(&Response{
  300. Code: http.StatusInternalServerError,
  301. Status: "error",
  302. Message: err.Error(),
  303. Warning: warning,
  304. Data: data,
  305. })
  306. } else {
  307. resp, _ = json.Marshal(&Response{
  308. Code: http.StatusOK,
  309. Status: "success",
  310. Data: data,
  311. Warning: warning,
  312. })
  313. }
  314. return resp
  315. }
  316. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  317. var resp []byte
  318. if err != nil {
  319. log.Errorf("Error returned to client: %s", err.Error())
  320. resp, _ = json.Marshal(&Response{
  321. Code: http.StatusInternalServerError,
  322. Status: "error",
  323. Message: err.Error(),
  324. Warning: warning,
  325. Data: data,
  326. })
  327. } else {
  328. resp, _ = json.Marshal(&Response{
  329. Code: http.StatusOK,
  330. Status: "success",
  331. Data: data,
  332. Message: message,
  333. Warning: warning,
  334. })
  335. }
  336. return resp
  337. }
  // wrapAsObjectItems places items under the single key "items" of a new map,
  // so that the k8s proxy methods can emulate the shape of a List() response
  // from the k8s API.
  func wrapAsObjectItems(items interface{}) map[string]interface{} {
  	wrapped := make(map[string]interface{}, 1)
  	wrapped["items"] = items
  	return wrapped
  }
  345. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  346. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  347. w.Header().Set("Content-Type", "application/json")
  348. w.Header().Set("Access-Control-Allow-Origin", "*")
  349. err := a.CloudProvider.DownloadPricingData()
  350. if err != nil {
  351. log.Errorf("Error refreshing pricing data: %s", err.Error())
  352. }
  353. w.Write(WrapData(nil, err))
  354. }
  355. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  356. w.Header().Set("Content-Type", "application/json")
  357. w.Header().Set("Access-Control-Allow-Origin", "*")
  358. window := r.URL.Query().Get("timeWindow")
  359. offset := r.URL.Query().Get("offset")
  360. fields := r.URL.Query().Get("filterFields")
  361. namespace := r.URL.Query().Get("namespace")
  362. if offset != "" {
  363. offset = "offset " + offset
  364. }
  365. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
  366. if fields != "" {
  367. filteredData := filterFields(fields, data)
  368. w.Write(WrapData(filteredData, err))
  369. } else {
  370. w.Write(WrapData(data, err))
  371. }
  372. }
  373. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  374. w.Header().Set("Content-Type", "application/json")
  375. w.Header().Set("Access-Control-Allow-Origin", "*")
  376. window := r.URL.Query().Get("window")
  377. offset := r.URL.Query().Get("offset")
  378. if window == "" {
  379. w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
  380. return
  381. }
  382. windowDur, err := timeutil.ParseDuration(window)
  383. if err != nil {
  384. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  385. return
  386. }
  387. // offset is not a required parameter
  388. var offsetDur time.Duration
  389. if offset != "" {
  390. offsetDur, err = timeutil.ParseDuration(offset)
  391. if err != nil {
  392. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  393. return
  394. }
  395. }
  396. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  397. if useThanos && !thanos.IsEnabled() {
  398. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  399. return
  400. }
  401. var client prometheus.Client
  402. if useThanos {
  403. client = a.ThanosClient
  404. offsetDur = thanos.OffsetDuration()
  405. } else {
  406. client = a.PrometheusClient
  407. }
  408. data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
  409. w.Write(WrapData(data, err))
  410. }
  411. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  412. w.Header().Set("Content-Type", "application/json")
  413. w.Header().Set("Access-Control-Allow-Origin", "*")
  414. start := r.URL.Query().Get("start")
  415. end := r.URL.Query().Get("end")
  416. window := r.URL.Query().Get("window")
  417. offset := r.URL.Query().Get("offset")
  418. if window == "" {
  419. w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
  420. return
  421. }
  422. windowDur, err := timeutil.ParseDuration(window)
  423. if err != nil {
  424. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  425. return
  426. }
  427. // offset is not a required parameter
  428. var offsetDur time.Duration
  429. if offset != "" {
  430. offsetDur, err = timeutil.ParseDuration(offset)
  431. if err != nil {
  432. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  433. return
  434. }
  435. }
  436. data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
  437. w.Write(WrapData(data, err))
  438. }
  439. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  440. w.Header().Set("Content-Type", "application/json")
  441. w.Header().Set("Access-Control-Allow-Origin", "*")
  442. startStr := r.URL.Query().Get("start")
  443. endStr := r.URL.Query().Get("end")
  444. windowStr := r.URL.Query().Get("window")
  445. fields := r.URL.Query().Get("filterFields")
  446. namespace := r.URL.Query().Get("namespace")
  447. cluster := r.URL.Query().Get("cluster")
  448. remote := r.URL.Query().Get("remote")
  449. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  450. layout := "2006-01-02T15:04:05.000Z"
  451. start, err := time.Parse(layout, startStr)
  452. if err != nil {
  453. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
  454. return
  455. }
  456. end, err := time.Parse(layout, endStr)
  457. if err != nil {
  458. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
  459. return
  460. }
  461. window := opencost.NewWindow(&start, &end)
  462. if window.IsOpen() || !window.HasDuration() || window.IsNegative() {
  463. w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
  464. return
  465. }
  466. resolution := time.Hour
  467. if resDur, err := time.ParseDuration(windowStr); err == nil {
  468. resolution = resDur
  469. }
  470. // Use Thanos Client if it exists (enabled) and remote flag set
  471. var pClient prometheus.Client
  472. if remote != "false" && a.ThanosClient != nil {
  473. pClient = a.ThanosClient
  474. } else {
  475. pClient = a.PrometheusClient
  476. }
  477. data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
  478. if err != nil {
  479. w.Write(WrapData(nil, err))
  480. }
  481. if fields != "" {
  482. filteredData := filterFields(fields, data)
  483. w.Write(WrapData(filteredData, err))
  484. } else {
  485. w.Write(WrapData(data, err))
  486. }
  487. }
  // parseAggregations derives the aggregation key, the list of aggregation
  // fields and the filter name.
  //
  // With a non-empty customAggregation, the key is customAggregation verbatim,
  // the fields are its comma-separated parts, and the filter is filterType
  // unchanged. Otherwise aggregator is split on commas, every part (and
  // filterType) is prefixed with "kubernetes_", and the key is the prefixed
  // parts re-joined with commas.
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"
  	parts := strings.Split(aggregator, ",")
  	for i := range parts {
  		parts[i] = prefix + parts[i]
  	}
  	return strings.Join(parts, ","), parts, prefix + filterType
  }
  507. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  508. w.Header().Set("Content-Type", "application/json")
  509. w.Header().Set("Access-Control-Allow-Origin", "*")
  510. data, err := a.CloudProvider.AllNodePricing()
  511. w.Write(WrapData(data, err))
  512. }
  513. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  514. w.Header().Set("Content-Type", "application/json")
  515. w.Header().Set("Access-Control-Allow-Origin", "*")
  516. data, err := a.CloudProvider.GetConfig()
  517. w.Write(WrapData(data, err))
  518. }
  519. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  520. w.Header().Set("Content-Type", "application/json")
  521. w.Header().Set("Access-Control-Allow-Origin", "*")
  522. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
  523. if err != nil {
  524. w.Write(WrapData(data, err))
  525. return
  526. }
  527. w.Write(WrapData(data, err))
  528. err = a.CloudProvider.DownloadPricingData()
  529. if err != nil {
  530. log.Errorf("Error redownloading data on config update: %s", err.Error())
  531. }
  532. return
  533. }
  534. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  535. w.Header().Set("Content-Type", "application/json")
  536. w.Header().Set("Access-Control-Allow-Origin", "*")
  537. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.AthenaInfoUpdateType)
  538. if err != nil {
  539. w.Write(WrapData(data, err))
  540. return
  541. }
  542. w.Write(WrapData(data, err))
  543. return
  544. }
  545. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  546. w.Header().Set("Content-Type", "application/json")
  547. w.Header().Set("Access-Control-Allow-Origin", "*")
  548. data, err := a.CloudProvider.UpdateConfig(r.Body, gcp.BigqueryUpdateType)
  549. if err != nil {
  550. w.Write(WrapData(data, err))
  551. return
  552. }
  553. w.Write(WrapData(data, err))
  554. return
  555. }
  556. func (a *Accesses) UpdateAzureStorageConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  557. w.Header().Set("Content-Type", "application/json")
  558. w.Header().Set("Access-Control-Allow-Origin", "*")
  559. data, err := a.CloudProvider.UpdateConfig(r.Body, azure.AzureStorageUpdateType)
  560. if err != nil {
  561. w.Write(WrapData(data, err))
  562. return
  563. }
  564. w.Write(WrapData(data, err))
  565. return
  566. }
  567. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  568. w.Header().Set("Content-Type", "application/json")
  569. w.Header().Set("Access-Control-Allow-Origin", "*")
  570. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  571. if err != nil {
  572. w.Write(WrapData(data, err))
  573. return
  574. }
  575. w.Write(WrapData(data, err))
  576. return
  577. }
  578. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  579. w.Header().Set("Content-Type", "application/json")
  580. w.Header().Set("Access-Control-Allow-Origin", "*")
  581. data, err := a.CloudProvider.GetManagementPlatform()
  582. if err != nil {
  583. w.Write(WrapData(data, err))
  584. return
  585. }
  586. w.Write(WrapData(data, err))
  587. return
  588. }
  589. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  590. w.Header().Set("Content-Type", "application/json")
  591. w.Header().Set("Access-Control-Allow-Origin", "*")
  592. data := a.ClusterInfoProvider.GetClusterInfo()
  593. w.Write(WrapData(data, nil))
  594. }
  595. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  596. w.Header().Set("Content-Type", "application/json")
  597. w.Header().Set("Access-Control-Allow-Origin", "*")
  598. data := a.ClusterMap.AsMap()
  599. w.Write(WrapData(data, nil))
  600. }
  601. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  602. w.Header().Set("Content-Type", "application/json")
  603. w.Header().Set("Access-Control-Allow-Origin", "*")
  604. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  605. }
  606. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  607. w.Header().Set("Content-Type", "application/json")
  608. w.Header().Set("Access-Control-Allow-Origin", "*")
  609. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  610. }
  611. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  612. w.Header().Set("Content-Type", "application/json")
  613. w.Header().Set("Access-Control-Allow-Origin", "*")
  614. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  615. }
  616. func (a *Accesses) GetPricingSourceSummary(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
  617. w.Header().Set("Content-Type", "application/json")
  618. w.Header().Set("Access-Control-Allow-Origin", "*")
  619. data := a.CloudProvider.PricingSourceSummary()
  620. w.Write(WrapData(data, nil))
  621. }
  622. func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  623. w.Header().Set("Content-Type", "application/json")
  624. w.Header().Set("Access-Control-Allow-Origin", "*")
  625. w.Write(WrapData(prom.Validate(a.PrometheusClient)))
  626. }
  627. func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  628. w.Header().Set("Content-Type", "application/json")
  629. w.Header().Set("Access-Control-Allow-Origin", "*")
  630. qp := httputil.NewQueryParams(r.URL.Query())
  631. query := qp.Get("query", "")
  632. if query == "" {
  633. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  634. return
  635. }
  636. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  637. var timeVal time.Time
  638. timeStr := qp.Get("time", "")
  639. if len(timeStr) > 0 {
  640. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  641. timeVal = time.Unix(t, 0)
  642. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  643. timeVal = t
  644. }
  645. // If time is given, but not parse-able, return an error
  646. if timeVal.IsZero() {
  647. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  648. }
  649. }
  650. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  651. body, err := ctx.RawQuery(query, timeVal)
  652. if err != nil {
  653. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  654. return
  655. }
  656. w.Write(body)
  657. }
  658. func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  659. w.Header().Set("Content-Type", "application/json")
  660. w.Header().Set("Access-Control-Allow-Origin", "*")
  661. qp := httputil.NewQueryParams(r.URL.Query())
  662. query := qp.Get("query", "")
  663. if query == "" {
  664. fmt.Fprintf(w, "Error parsing query from request parameters.")
  665. return
  666. }
  667. start, end, duration, err := toStartEndStep(qp)
  668. if err != nil {
  669. fmt.Fprintf(w, err.Error())
  670. return
  671. }
  672. ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
  673. body, err := ctx.RawQueryRange(query, start, end, duration)
  674. if err != nil {
  675. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  676. return
  677. }
  678. w.Write(body)
  679. }
  680. func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  681. w.Header().Set("Content-Type", "application/json")
  682. w.Header().Set("Access-Control-Allow-Origin", "*")
  683. if !thanos.IsEnabled() {
  684. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  685. return
  686. }
  687. qp := httputil.NewQueryParams(r.URL.Query())
  688. query := qp.Get("query", "")
  689. if query == "" {
  690. w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
  691. return
  692. }
  693. // Attempt to parse time as either a unix timestamp or as an RFC3339 value
  694. var timeVal time.Time
  695. timeStr := qp.Get("time", "")
  696. if len(timeStr) > 0 {
  697. if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
  698. timeVal = time.Unix(t, 0)
  699. } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
  700. timeVal = t
  701. }
  702. // If time is given, but not parse-able, return an error
  703. if timeVal.IsZero() {
  704. http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
  705. }
  706. }
  707. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  708. body, err := ctx.RawQuery(query, timeVal)
  709. if err != nil {
  710. w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
  711. return
  712. }
  713. w.Write(body)
  714. }
  715. func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  716. w.Header().Set("Content-Type", "application/json")
  717. w.Header().Set("Access-Control-Allow-Origin", "*")
  718. if !thanos.IsEnabled() {
  719. w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
  720. return
  721. }
  722. qp := httputil.NewQueryParams(r.URL.Query())
  723. query := qp.Get("query", "")
  724. if query == "" {
  725. fmt.Fprintf(w, "Error parsing query from request parameters.")
  726. return
  727. }
  728. start, end, duration, err := toStartEndStep(qp)
  729. if err != nil {
  730. fmt.Fprintf(w, err.Error())
  731. return
  732. }
  733. ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
  734. body, err := ctx.RawQueryRange(query, start, end, duration)
  735. if err != nil {
  736. fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
  737. return
  738. }
  739. w.Write(body)
  740. }
  741. // helper for query range proxy requests
  742. func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
  743. var e error
  744. ss := qp.Get("start", "")
  745. es := qp.Get("end", "")
  746. ds := qp.Get("duration", "")
  747. layout := "2006-01-02T15:04:05.000Z"
  748. start, e = time.Parse(layout, ss)
  749. if e != nil {
  750. err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
  751. return
  752. }
  753. end, e = time.Parse(layout, es)
  754. if e != nil {
  755. err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
  756. return
  757. }
  758. step, e = time.ParseDuration(ds)
  759. if e != nil {
  760. err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
  761. return
  762. }
  763. err = nil
  764. return
  765. }
  766. func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  767. w.Header().Set("Content-Type", "application/json")
  768. w.Header().Set("Access-Control-Allow-Origin", "*")
  769. promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
  770. if err != nil {
  771. w.Write(WrapData(nil, err))
  772. return
  773. }
  774. result := map[string]*prom.PrometheusQueueState{
  775. "prometheus": promQueueState,
  776. }
  777. if thanos.IsEnabled() {
  778. thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
  779. if err != nil {
  780. log.Warnf("Error getting Thanos queue state: %s", err)
  781. } else {
  782. result["thanos"] = thanosQueueState
  783. }
  784. }
  785. w.Write(WrapData(result, nil))
  786. }
  787. // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
  788. func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  789. w.Header().Set("Content-Type", "application/json")
  790. w.Header().Set("Access-Control-Allow-Origin", "*")
  791. promMetrics := prom.GetPrometheusMetrics(a.PrometheusClient, "")
  792. result := map[string][]*prom.PrometheusDiagnostic{
  793. "prometheus": promMetrics,
  794. }
  795. if thanos.IsEnabled() {
  796. thanosMetrics := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
  797. result["thanos"] = thanosMetrics
  798. }
  799. w.Write(WrapData(result, nil))
  800. }
  801. func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  802. w.Header().Set("Content-Type", "application/json")
  803. w.Header().Set("Access-Control-Allow-Origin", "*")
  804. pvList := a.ClusterCache.GetAllPersistentVolumes()
  805. body, err := json.Marshal(wrapAsObjectItems(pvList))
  806. if err != nil {
  807. fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
  808. } else {
  809. w.Write(body)
  810. }
  811. }
  812. func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  813. w.Header().Set("Content-Type", "application/json")
  814. w.Header().Set("Access-Control-Allow-Origin", "*")
  815. qp := httputil.NewQueryParams(r.URL.Query())
  816. namespace := qp.Get("namespace", "")
  817. deploymentsList := a.ClusterCache.GetAllDeployments()
  818. // filter for provided namespace
  819. var deployments []*appsv1.Deployment
  820. if namespace == "" {
  821. deployments = deploymentsList
  822. } else {
  823. deployments = []*appsv1.Deployment{}
  824. for _, d := range deploymentsList {
  825. if d.Namespace == namespace {
  826. deployments = append(deployments, d)
  827. }
  828. }
  829. }
  830. body, err := json.Marshal(wrapAsObjectItems(deployments))
  831. if err != nil {
  832. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  833. } else {
  834. w.Write(body)
  835. }
  836. }
  837. func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  838. w.Header().Set("Content-Type", "application/json")
  839. w.Header().Set("Access-Control-Allow-Origin", "*")
  840. scList := a.ClusterCache.GetAllStorageClasses()
  841. body, err := json.Marshal(wrapAsObjectItems(scList))
  842. if err != nil {
  843. fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
  844. } else {
  845. w.Write(body)
  846. }
  847. }
  848. func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  849. w.Header().Set("Content-Type", "application/json")
  850. w.Header().Set("Access-Control-Allow-Origin", "*")
  851. qp := httputil.NewQueryParams(r.URL.Query())
  852. namespace := qp.Get("namespace", "")
  853. statefulSetsList := a.ClusterCache.GetAllStatefulSets()
  854. // filter for provided namespace
  855. var statefulSets []*appsv1.StatefulSet
  856. if namespace == "" {
  857. statefulSets = statefulSetsList
  858. } else {
  859. statefulSets = []*appsv1.StatefulSet{}
  860. for _, ss := range statefulSetsList {
  861. if ss.Namespace == namespace {
  862. statefulSets = append(statefulSets, ss)
  863. }
  864. }
  865. }
  866. body, err := json.Marshal(wrapAsObjectItems(statefulSets))
  867. if err != nil {
  868. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  869. } else {
  870. w.Write(body)
  871. }
  872. }
  873. func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  874. w.Header().Set("Content-Type", "application/json")
  875. w.Header().Set("Access-Control-Allow-Origin", "*")
  876. nodeList := a.ClusterCache.GetAllNodes()
  877. body, err := json.Marshal(wrapAsObjectItems(nodeList))
  878. if err != nil {
  879. fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
  880. } else {
  881. w.Write(body)
  882. }
  883. }
  884. func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  885. w.Header().Set("Content-Type", "application/json")
  886. w.Header().Set("Access-Control-Allow-Origin", "*")
  887. podlist := a.ClusterCache.GetAllPods()
  888. body, err := json.Marshal(wrapAsObjectItems(podlist))
  889. if err != nil {
  890. fmt.Fprintf(w, "Error decoding pods: "+err.Error())
  891. } else {
  892. w.Write(body)
  893. }
  894. }
  895. func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  896. w.Header().Set("Content-Type", "application/json")
  897. w.Header().Set("Access-Control-Allow-Origin", "*")
  898. namespaces := a.ClusterCache.GetAllNamespaces()
  899. body, err := json.Marshal(wrapAsObjectItems(namespaces))
  900. if err != nil {
  901. fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
  902. } else {
  903. w.Write(body)
  904. }
  905. }
  906. func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  907. w.Header().Set("Content-Type", "application/json")
  908. w.Header().Set("Access-Control-Allow-Origin", "*")
  909. daemonSets := a.ClusterCache.GetAllDaemonSets()
  910. body, err := json.Marshal(wrapAsObjectItems(daemonSets))
  911. if err != nil {
  912. fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
  913. } else {
  914. w.Write(body)
  915. }
  916. }
  917. func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  918. w.Header().Set("Content-Type", "application/json")
  919. w.Header().Set("Access-Control-Allow-Origin", "*")
  920. podName := ps.ByName("name")
  921. podNamespace := ps.ByName("namespace")
  922. // TODO: ClusterCache API could probably afford to have some better filtering
  923. allPods := a.ClusterCache.GetAllPods()
  924. for _, pod := range allPods {
  925. for _, container := range pod.Spec.Containers {
  926. container.Env = make([]v1.EnvVar, 0)
  927. }
  928. if pod.Namespace == podNamespace && pod.Name == podName {
  929. body, err := json.Marshal(pod)
  930. if err != nil {
  931. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  932. } else {
  933. w.Write(body)
  934. }
  935. return
  936. }
  937. }
  938. fmt.Fprintf(w, "Pod not found\n")
  939. }
  940. func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  941. w.Header().Set("Content-Type", "application/json")
  942. w.Header().Set("Access-Control-Allow-Origin", "*")
  943. u := a.PrometheusClient.URL(epRules, nil)
  944. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  945. if err != nil {
  946. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  947. }
  948. _, body, err := a.PrometheusClient.Do(r.Context(), req)
  949. if err != nil {
  950. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  951. } else {
  952. w.Write(body)
  953. }
  954. }
  955. func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  956. w.Header().Set("Content-Type", "application/json")
  957. w.Header().Set("Access-Control-Allow-Origin", "*")
  958. pConfig := map[string]string{
  959. "address": env.GetPrometheusServerEndpoint(),
  960. }
  961. body, err := json.Marshal(pConfig)
  962. if err != nil {
  963. fmt.Fprintf(w, "Error marshalling prometheus config")
  964. } else {
  965. w.Write(body)
  966. }
  967. }
  968. func (a *Accesses) PrometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  969. w.Header().Set("Content-Type", "application/json")
  970. w.Header().Set("Access-Control-Allow-Origin", "*")
  971. u := a.PrometheusClient.URL(epTargets, nil)
  972. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  973. if err != nil {
  974. fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
  975. }
  976. _, body, err := a.PrometheusClient.Do(r.Context(), req)
  977. if err != nil {
  978. fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
  979. } else {
  980. w.Write(body)
  981. }
  982. }
  983. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  984. w.Header().Set("Content-Type", "application/json")
  985. w.Header().Set("Access-Control-Allow-Origin", "*")
  986. podlist := a.ClusterCache.GetAllPods()
  987. var lonePods []*v1.Pod
  988. for _, pod := range podlist {
  989. if len(pod.OwnerReferences) == 0 {
  990. lonePods = append(lonePods, pod)
  991. }
  992. }
  993. body, err := json.Marshal(lonePods)
  994. if err != nil {
  995. fmt.Fprintf(w, "Error decoding pod: "+err.Error())
  996. } else {
  997. w.Write(body)
  998. }
  999. }
  1000. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1001. w.Header().Set("Content-Type", "application/json")
  1002. w.Header().Set("Access-Control-Allow-Origin", "*")
  1003. ns := env.GetKubecostNamespace()
  1004. w.Write([]byte(ns))
  1005. }
  1006. type InstallInfo struct {
  1007. Containers []ContainerInfo `json:"containers"`
  1008. ClusterInfo map[string]string `json:"clusterInfo"`
  1009. Version string `json:"version"`
  1010. }
  1011. type ContainerInfo struct {
  1012. ContainerName string `json:"containerName"`
  1013. Image string `json:"image"`
  1014. StartTime string `json:"startTime"`
  1015. }
  1016. func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1017. w.Header().Set("Content-Type", "application/json")
  1018. w.Header().Set("Access-Control-Allow-Origin", "*")
  1019. containers, err := GetKubecostContainers(a.KubeClientSet)
  1020. if err != nil {
  1021. writeErrorResponse(w, 500, fmt.Sprintf("Unable to list pods: %s", err.Error()))
  1022. return
  1023. }
  1024. info := InstallInfo{
  1025. Containers: containers,
  1026. ClusterInfo: make(map[string]string),
  1027. Version: version.FriendlyVersion(),
  1028. }
  1029. nodes := a.ClusterCache.GetAllNodes()
  1030. cachePods := a.ClusterCache.GetAllPods()
  1031. info.ClusterInfo["nodeCount"] = strconv.Itoa(len(nodes))
  1032. info.ClusterInfo["podCount"] = strconv.Itoa(len(cachePods))
  1033. body, err := json.Marshal(info)
  1034. if err != nil {
  1035. writeErrorResponse(w, 500, fmt.Sprintf("Error decoding pod: %s", err.Error()))
  1036. return
  1037. }
  1038. w.Write(body)
  1039. }
  1040. func GetKubecostContainers(kubeClientSet kubernetes.Interface) ([]ContainerInfo, error) {
  1041. pods, err := kubeClientSet.CoreV1().Pods(env.GetKubecostNamespace()).List(context.Background(), metav1.ListOptions{
  1042. LabelSelector: "app=cost-analyzer",
  1043. FieldSelector: "status.phase=Running",
  1044. Limit: 1,
  1045. })
  1046. if err != nil {
  1047. return nil, fmt.Errorf("failed to query kubernetes client for kubecost pods: %s", err)
  1048. }
  1049. // If we have zero pods either something is weird with the install since the app selector is not exposed in the helm
  1050. // chart or more likely we are running locally - in either case Images field will return as null
  1051. var containers []ContainerInfo
  1052. if len(pods.Items) > 0 {
  1053. for _, pod := range pods.Items {
  1054. for _, container := range pod.Spec.Containers {
  1055. c := ContainerInfo{
  1056. ContainerName: container.Name,
  1057. Image: container.Image,
  1058. StartTime: pod.Status.StartTime.String(),
  1059. }
  1060. containers = append(containers, c)
  1061. }
  1062. }
  1063. }
  1064. return containers, nil
  1065. }
  1066. // logsFor pulls the logs for a specific pod, namespace, and container
  1067. func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
  1068. since := time.Now().UTC().Add(-dur)
  1069. logOpts := v1.PodLogOptions{
  1070. SinceTime: &metav1.Time{Time: since},
  1071. }
  1072. if container != "" {
  1073. logOpts.Container = container
  1074. }
  1075. req := c.CoreV1().Pods(namespace).GetLogs(pod, &logOpts)
  1076. reader, err := req.Stream(ctx)
  1077. if err != nil {
  1078. return "", err
  1079. }
  1080. podLogs, err := io.ReadAll(reader)
  1081. if err != nil {
  1082. return "", err
  1083. }
  1084. // If color is already disabled then we don't need to process the logs
  1085. // to drop ANSI colors
  1086. if !viper.GetBool("disable-log-color") {
  1087. podLogs = ANSIRegex.ReplaceAll(podLogs, []byte{})
  1088. }
  1089. return string(podLogs), nil
  1090. }
  1091. func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1092. w.Header().Set("Content-Type", "application/json")
  1093. w.Header().Set("Access-Control-Allow-Origin", "*")
  1094. qp := httputil.NewQueryParams(r.URL.Query())
  1095. ns := qp.Get("namespace", env.GetKubecostNamespace())
  1096. pod := qp.Get("pod", "")
  1097. selector := qp.Get("selector", "")
  1098. container := qp.Get("container", "")
  1099. since := qp.Get("since", "24h")
  1100. sinceDuration, err := time.ParseDuration(since)
  1101. if err != nil {
  1102. fmt.Fprintf(w, "Invalid Duration String: "+err.Error())
  1103. return
  1104. }
  1105. var logResult string
  1106. appendLog := func(ns string, pod string, container string, l string) {
  1107. if l == "" {
  1108. return
  1109. }
  1110. logResult += fmt.Sprintf("%s\n| %s:%s:%s\n%s\n%s\n\n", LogSeparator, ns, pod, container, LogSeparator, l)
  1111. }
  1112. if pod != "" {
  1113. pd, err := a.KubeClientSet.CoreV1().Pods(ns).Get(r.Context(), pod, metav1.GetOptions{})
  1114. if err != nil {
  1115. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1116. return
  1117. }
  1118. if container != "" {
  1119. var foundContainer bool
  1120. for _, cont := range pd.Spec.Containers {
  1121. if strings.EqualFold(cont.Name, container) {
  1122. foundContainer = true
  1123. break
  1124. }
  1125. }
  1126. if !foundContainer {
  1127. fmt.Fprintf(w, "Could not find container: "+container)
  1128. return
  1129. }
  1130. }
  1131. logs, err := logsFor(a.KubeClientSet, ns, pod, container, sinceDuration, r.Context())
  1132. if err != nil {
  1133. fmt.Fprintf(w, "Error Getting Logs: "+err.Error())
  1134. return
  1135. }
  1136. appendLog(ns, pod, container, logs)
  1137. w.Write([]byte(logResult))
  1138. return
  1139. }
  1140. if selector != "" {
  1141. pods, err := a.KubeClientSet.CoreV1().Pods(ns).List(r.Context(), metav1.ListOptions{LabelSelector: selector})
  1142. if err != nil {
  1143. fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
  1144. return
  1145. }
  1146. for _, pd := range pods.Items {
  1147. for _, cont := range pd.Spec.Containers {
  1148. logs, err := logsFor(a.KubeClientSet, ns, pd.Name, cont.Name, sinceDuration, r.Context())
  1149. if err != nil {
  1150. continue
  1151. }
  1152. appendLog(ns, pd.Name, cont.Name, logs)
  1153. }
  1154. }
  1155. }
  1156. w.Write([]byte(logResult))
  1157. }
  1158. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1159. w.Header().Set("Content-Type", "application/json")
  1160. w.Header().Set("Access-Control-Allow-Origin", "*")
  1161. r.ParseForm()
  1162. key := r.PostForm.Get("key")
  1163. k := []byte(key)
  1164. err := os.WriteFile(path.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "key.json"), k, 0644)
  1165. if err != nil {
  1166. fmt.Fprintf(w, "Error writing service key: "+err.Error())
  1167. }
  1168. w.WriteHeader(http.StatusOK)
  1169. }
  1170. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1171. w.Header().Set("Content-Type", "application/json")
  1172. w.Header().Set("Access-Control-Allow-Origin", "*")
  1173. encodedValues := sysenv.Get("HELM_VALUES", "")
  1174. if encodedValues == "" {
  1175. fmt.Fprintf(w, "Values reporting disabled")
  1176. return
  1177. }
  1178. result, err := base64.StdEncoding.DecodeString(encodedValues)
  1179. if err != nil {
  1180. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  1181. return
  1182. }
  1183. w.Write(result)
  1184. }
  1185. func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1186. w.Header().Set("Content-Type", "application/json")
  1187. w.Header().Set("Access-Control-Allow-Origin", "*")
  1188. promServer := env.GetPrometheusServerEndpoint()
  1189. api := prometheusAPI.NewAPI(a.PrometheusClient)
  1190. result, err := api.Buildinfo(r.Context())
  1191. if err != nil {
  1192. fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
  1193. } else {
  1194. fmt.Fprintf(w, "Using Prometheus at "+promServer+". Version: "+result.Version)
  1195. }
  1196. }
  1197. type LogLevelRequestResponse struct {
  1198. Level string `json:"level"`
  1199. }
  1200. func (a *Accesses) GetLogLevel(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  1201. w.Header().Set("Content-Type", "application/json")
  1202. w.Header().Set("Access-Control-Allow-Origin", "*")
  1203. level := log.GetLogLevel()
  1204. llrr := LogLevelRequestResponse{
  1205. Level: level,
  1206. }
  1207. body, err := json.Marshal(llrr)
  1208. if err != nil {
  1209. http.Error(w, fmt.Sprintf("unable to retrive log level"), http.StatusInternalServerError)
  1210. return
  1211. }
  1212. _, err = w.Write(body)
  1213. if err != nil {
  1214. http.Error(w, fmt.Sprintf("unable to write response: %s", body), http.StatusInternalServerError)
  1215. return
  1216. }
  1217. }
  1218. func (a *Accesses) SetLogLevel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1219. params := LogLevelRequestResponse{}
  1220. err := json.NewDecoder(r.Body).Decode(&params)
  1221. if err != nil {
  1222. http.Error(w, fmt.Sprintf("unable to decode request body, error: %s", err), http.StatusBadRequest)
  1223. return
  1224. }
  1225. err = log.SetLogLevel(params.Level)
  1226. if err != nil {
  1227. http.Error(w, fmt.Sprintf("level must be a valid log level according to zerolog; level given: %s, error: %s", params.Level, err), http.StatusBadRequest)
  1228. return
  1229. }
  1230. w.WriteHeader(http.StatusOK)
  1231. }
  1232. // captures the panic event in sentry
  1233. func capturePanicEvent(err string, stack string) {
  1234. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  1235. log.Infof(msg)
  1236. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  1237. Level: sentry.LevelError,
  1238. Message: msg,
  1239. })
  1240. sentry.Flush(5 * time.Second)
  1241. }
  1242. // handle any panics reported by the errors package
  1243. func handlePanic(p errors.Panic) bool {
  1244. err := p.Error
  1245. if err != nil {
  1246. if err, ok := err.(error); ok {
  1247. capturePanicEvent(err.Error(), p.Stack)
  1248. }
  1249. if err, ok := err.(string); ok {
  1250. capturePanicEvent(err, p.Stack)
  1251. }
  1252. }
  1253. // Return true to recover iff the type is http, otherwise allow kubernetes
  1254. // to recover.
  1255. return p.Type == errors.PanicTypeHTTP
  1256. }
  1257. func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  1258. configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)
  1259. var err error
  1260. if errorReportingEnabled {
  1261. err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
  1262. if err != nil {
  1263. log.Infof("Failed to initialize sentry for error reporting")
  1264. } else {
  1265. err = errors.SetPanicHandler(handlePanic)
  1266. if err != nil {
  1267. log.Infof("Failed to set panic handler: %s", err)
  1268. }
  1269. }
  1270. }
  1271. address := env.GetPrometheusServerEndpoint()
  1272. if address == "" {
  1273. log.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  1274. }
  1275. queryConcurrency := env.GetMaxQueryConcurrency()
  1276. log.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  1277. timeout := 120 * time.Second
  1278. keepAlive := 120 * time.Second
  1279. tlsHandshakeTimeout := 10 * time.Second
  1280. scrapeInterval := env.GetKubecostScrapeInterval()
  1281. var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
  1282. if env.IsPrometheusRetryOnRateLimitResponse() {
  1283. rateLimitRetryOpts = &prom.RateLimitRetryOpts{
  1284. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  1285. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  1286. }
  1287. }
  1288. promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
  1289. Timeout: timeout,
  1290. KeepAlive: keepAlive,
  1291. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1292. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1293. RateLimitRetryOpts: rateLimitRetryOpts,
  1294. Auth: &prom.ClientAuth{
  1295. Username: env.GetDBBasicAuthUsername(),
  1296. Password: env.GetDBBasicAuthUserPassword(),
  1297. BearerToken: env.GetDBBearerToken(),
  1298. },
  1299. QueryConcurrency: queryConcurrency,
  1300. QueryLogFile: "",
  1301. HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
  1302. })
  1303. if err != nil {
  1304. log.Fatalf("Failed to create prometheus client, Error: %v", err)
  1305. }
  1306. m, err := prom.Validate(promCli)
  1307. if err != nil || !m.Running {
  1308. if err != nil {
  1309. log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1310. } else if !m.Running {
  1311. log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  1312. }
  1313. } else {
  1314. log.Infof("Success: retrieved the 'up' query against prometheus at: " + address)
  1315. }
  1316. api := prometheusAPI.NewAPI(promCli)
  1317. _, err = api.Buildinfo(context.Background())
  1318. if err != nil {
  1319. log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  1320. } else {
  1321. log.Infof("Retrieved a prometheus config file from: %s", address)
  1322. }
  1323. if scrapeInterval == 0 {
  1324. scrapeInterval = time.Minute
  1325. // Lookup scrape interval for kubecost job, update if found
  1326. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  1327. if err == nil {
  1328. scrapeInterval = si
  1329. }
  1330. }
  1331. log.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  1332. // Kubernetes API setup
  1333. kubeClientset, err := kubeconfig.LoadKubeClient("")
  1334. if err != nil {
  1335. log.Fatalf("Failed to build Kubernetes client: %s", err.Error())
  1336. }
  1337. // Create ConfigFileManager for synchronization of shared configuration
  1338. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  1339. BucketStoreConfig: env.GetKubecostConfigBucket(),
  1340. LocalConfigPath: "/",
  1341. })
  1342. configPrefix := env.GetConfigPathWithDefault("/var/configs/")
  1343. // Create Kubernetes Cluster Cache + Watchers
  1344. var k8sCache clustercache.ClusterCache
  1345. if env.IsClusterCacheFileEnabled() {
  1346. importLocation := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-cache.json"))
  1347. k8sCache = clustercache.NewClusterImporter(importLocation)
  1348. } else {
  1349. k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
  1350. }
  1351. k8sCache.Run()
  1352. cloudProviderKey := env.GetCloudProviderAPIKey()
  1353. cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
  1354. if err != nil {
  1355. panic(err.Error())
  1356. }
  1357. // Append the pricing config watcher
  1358. configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
  1359. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  1360. watchConfigFunc := configWatchers.ToWatchFunc()
  1361. watchedConfigs := configWatchers.GetWatchedConfigs()
  1362. kubecostNamespace := env.GetKubecostNamespace()
  1363. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  1364. for _, cw := range watchedConfigs {
  1365. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
  1366. if err != nil {
  1367. log.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  1368. } else {
  1369. log.Infof("Found configmap %s, watching...", configs.Name)
  1370. watchConfigFunc(configs)
  1371. }
  1372. }
  1373. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  1374. remoteEnabled := env.IsRemoteEnabled()
  1375. if remoteEnabled {
  1376. info, err := cloudProvider.ClusterInfo()
  1377. log.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1378. if err != nil {
  1379. log.Infof("Error saving cluster id %s", err.Error())
  1380. }
  1381. _, _, err = utils.GetOrCreateClusterMeta(info["id"], info["name"])
  1382. if err != nil {
  1383. log.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1384. }
  1385. }
  1386. // Thanos Client
  1387. var thanosClient prometheus.Client
  1388. if thanos.IsEnabled() {
  1389. thanosAddress := thanos.QueryURL()
  1390. if thanosAddress != "" {
  1391. thanosCli, _ := thanos.NewThanosClient(thanosAddress, &prom.PrometheusClientConfig{
  1392. Timeout: timeout,
  1393. KeepAlive: keepAlive,
  1394. TLSHandshakeTimeout: tlsHandshakeTimeout,
  1395. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  1396. RateLimitRetryOpts: rateLimitRetryOpts,
  1397. Auth: &prom.ClientAuth{
  1398. Username: env.GetMultiClusterBasicAuthUsername(),
  1399. Password: env.GetMultiClusterBasicAuthPassword(),
  1400. BearerToken: env.GetMultiClusterBearerToken(),
  1401. },
  1402. QueryConcurrency: queryConcurrency,
  1403. QueryLogFile: env.GetQueryLoggingFile(),
  1404. })
  1405. _, err = prom.Validate(thanosCli)
  1406. if err != nil {
  1407. log.Warnf("Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  1408. thanosClient = thanosCli
  1409. } else {
  1410. log.Infof("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  1411. thanosClient = thanosCli
  1412. }
  1413. } else {
  1414. log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  1415. }
  1416. }
  1417. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  1418. var clusterInfoProvider clusters.ClusterInfoProvider
  1419. if env.IsClusterInfoFileEnabled() {
  1420. clusterInfoFile := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-info.json"))
  1421. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  1422. } else {
  1423. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  1424. }
  1425. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  1426. var clusterMap clusters.ClusterMap
  1427. if thanosClient != nil {
  1428. clusterMap = clustermap.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
  1429. } else {
  1430. clusterMap = clustermap.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
  1431. }
  1432. // cache responses from model and aggregation for a default of 10 minutes;
  1433. // clear expired responses every 20 minutes
  1434. aggregateCache := cache.New(time.Minute*10, time.Minute*20)
  1435. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  1436. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1437. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1438. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  1439. // query durations that should be cached longer should be registered here
  1440. // use relatively prime numbers to minimize likelihood of synchronized
  1441. // attempts at cache warming
  1442. day := 24 * time.Hour
  1443. cacheExpiration := map[time.Duration]time.Duration{
  1444. day: maxCacheMinutes1d * time.Minute,
  1445. 2 * day: maxCacheMinutes2d * time.Minute,
  1446. 7 * day: maxCacheMinutes7d * time.Minute,
  1447. 30 * day: maxCacheMinutes30d * time.Minute,
  1448. }
  1449. var pc prometheus.Client
  1450. if thanosClient != nil {
  1451. pc = thanosClient
  1452. } else {
  1453. pc = promCli
  1454. }
  1455. costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
  1456. metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
  1457. a := &Accesses{
  1458. Router: httprouter.New(),
  1459. PrometheusClient: promCli,
  1460. ThanosClient: thanosClient,
  1461. KubeClientSet: kubeClientset,
  1462. ClusterCache: k8sCache,
  1463. ClusterMap: clusterMap,
  1464. CloudProvider: cloudProvider,
  1465. CloudConfigController: cloudconfig.NewController(cloudProvider),
  1466. ConfigFileManager: confManager,
  1467. ClusterInfoProvider: clusterInfoProvider,
  1468. Model: costModel,
  1469. MetricsEmitter: metricsEmitter,
  1470. AggregateCache: aggregateCache,
  1471. CostDataCache: costDataCache,
  1472. ClusterCostsCache: clusterCostsCache,
  1473. OutOfClusterCache: outOfClusterCache,
  1474. SettingsCache: settingsCache,
  1475. CacheExpiration: cacheExpiration,
  1476. httpServices: services.NewCostModelServices(),
  1477. }
  1478. // Use the Accesses instance, itself, as the CostModelAggregator. This is
  1479. // confusing and unconventional, but necessary so that we can swap it
  1480. // out for the ETL-adapted version elsewhere.
  1481. // TODO clean this up once ETL is open-sourced.
  1482. a.AggAPI = a
  1483. // Initialize mechanism for subscribing to settings changes
  1484. a.InitializeSettingsPubSub()
  1485. err = a.CloudProvider.DownloadPricingData()
  1486. if err != nil {
  1487. log.Infof("Failed to download pricing data: " + err.Error())
  1488. }
  1489. // Warm the aggregate cache unless explicitly set to false
  1490. if env.IsCacheWarmingEnabled() {
  1491. log.Infof("Init: AggregateCostModel cache warming enabled")
  1492. a.warmAggregateCostModelCache()
  1493. } else {
  1494. log.Infof("Init: AggregateCostModel cache warming disabled")
  1495. }
  1496. if !env.IsKubecostMetricsPodEnabled() {
  1497. a.MetricsEmitter.Start()
  1498. }
  1499. log.Infof("Custom Costs enabled: %t", env.IsCustomCostEnabled())
  1500. if env.IsCustomCostEnabled() {
  1501. hourlyRepo := customcost.NewMemoryRepository()
  1502. dailyRepo := customcost.NewMemoryRepository()
  1503. ingConfig := customcost.DefaultIngestorConfiguration()
  1504. var err error
  1505. a.CustomCostPipelineService, err = customcost.NewPipelineService(hourlyRepo, dailyRepo, ingConfig)
  1506. if err != nil {
  1507. log.Errorf("error instantiating custom cost pipeline service: %v", err)
  1508. return nil
  1509. }
  1510. customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
  1511. a.CustomCostQueryService = customcost.NewQueryService(customCostQuerier)
  1512. }
  1513. a.Router.GET("/costDataModel", a.CostDataModel)
  1514. a.Router.GET("/costDataModelRange", a.CostDataModelRange)
  1515. a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
  1516. a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
  1517. a.Router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  1518. a.Router.GET("/allNodePricing", a.GetAllNodePricing)
  1519. a.Router.POST("/refreshPricing", a.RefreshPricingData)
  1520. a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  1521. a.Router.GET("/clusterCosts", a.ClusterCosts)
  1522. a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  1523. a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
  1524. a.Router.GET("/managementPlatform", a.ManagementPlatform)
  1525. a.Router.GET("/clusterInfo", a.ClusterInfo)
  1526. a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  1527. a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  1528. a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  1529. a.Router.GET("/pricingSourceSummary", a.GetPricingSourceSummary)
  1530. a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  1531. // endpoints migrated from server
  1532. a.Router.GET("/allPersistentVolumes", a.GetAllPersistentVolumes)
  1533. a.Router.GET("/allDeployments", a.GetAllDeployments)
  1534. a.Router.GET("/allStorageClasses", a.GetAllStorageClasses)
  1535. a.Router.GET("/allStatefulSets", a.GetAllStatefulSets)
  1536. a.Router.GET("/allNodes", a.GetAllNodes)
  1537. a.Router.GET("/allPods", a.GetAllPods)
  1538. a.Router.GET("/allNamespaces", a.GetAllNamespaces)
  1539. a.Router.GET("/allDaemonSets", a.GetAllDaemonSets)
  1540. a.Router.GET("/pod/:namespace/:name", a.GetPod)
  1541. a.Router.GET("/prometheusRecordingRules", a.PrometheusRecordingRules)
  1542. a.Router.GET("/prometheusConfig", a.PrometheusConfig)
  1543. a.Router.GET("/prometheusTargets", a.PrometheusTargets)
  1544. a.Router.GET("/orphanedPods", a.GetOrphanedPods)
  1545. a.Router.GET("/installNamespace", a.GetInstallNamespace)
  1546. a.Router.GET("/installInfo", a.GetInstallInfo)
  1547. a.Router.GET("/podLogs", a.GetPodLogs)
  1548. a.Router.POST("/serviceKey", a.AddServiceKey)
  1549. a.Router.GET("/helmValues", a.GetHelmValues)
  1550. a.Router.GET("/status", a.Status)
  1551. // prom query proxies
  1552. a.Router.GET("/prometheusQuery", a.PrometheusQuery)
  1553. a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
  1554. a.Router.GET("/thanosQuery", a.ThanosQuery)
  1555. a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)
  1556. // diagnostics
  1557. a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
  1558. a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
  1559. a.Router.GET("/logs/level", a.GetLogLevel)
  1560. a.Router.POST("/logs/level", a.SetLogLevel)
  1561. a.Router.GET("/cloud/config/export", a.CloudConfigController.GetExportConfigHandler())
  1562. a.Router.GET("/cloud/config/enable", a.CloudConfigController.GetEnableConfigHandler())
  1563. a.Router.GET("/cloud/config/disable", a.CloudConfigController.GetDisableConfigHandler())
  1564. a.Router.GET("/cloud/config/delete", a.CloudConfigController.GetDeleteConfigHandler())
  1565. if env.IsCustomCostEnabled() {
  1566. a.Router.GET("/customCost/total", a.CustomCostQueryService.GetCustomCostTotalHandler())
  1567. a.Router.GET("/customCost/timeseries", a.CustomCostQueryService.GetCustomCostTimeseriesHandler())
  1568. }
  1569. // this endpoint is intentionally left out of the "if env.IsCustomCostEnabled()" conditional; in the handler, it is
  1570. // valid for CustomCostPipelineService to be nil
  1571. a.Router.GET("/customCost/status", a.CustomCostPipelineService.GetCustomCostStatusHandler())
  1572. a.httpServices.RegisterAll(a.Router)
  1573. return a
  1574. }
  1575. func InitializeWithoutKubernetes() *Accesses {
  1576. var err error
  1577. if errorReportingEnabled {
  1578. err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
  1579. if err != nil {
  1580. log.Infof("Failed to initialize sentry for error reporting")
  1581. } else {
  1582. err = errors.SetPanicHandler(handlePanic)
  1583. if err != nil {
  1584. log.Infof("Failed to set panic handler: %s", err)
  1585. }
  1586. }
  1587. }
  1588. a := &Accesses{
  1589. Router: httprouter.New(),
  1590. CloudConfigController: cloudconfig.NewController(nil),
  1591. httpServices: services.NewCostModelServices(),
  1592. }
  1593. a.Router.GET("/logs/level", a.GetLogLevel)
  1594. a.Router.POST("/logs/level", a.SetLogLevel)
  1595. a.httpServices.RegisterAll(a.Router)
  1596. return a
  1597. }
  1598. func writeErrorResponse(w http.ResponseWriter, code int, message string) {
  1599. out := map[string]string{
  1600. "message": message,
  1601. }
  1602. bytes, err := json.Marshal(out)
  1603. if err != nil {
  1604. w.Header().Set("Content-Type", "text/plain")
  1605. w.WriteHeader(500)
  1606. fmt.Fprint(w, "unable to marshall json for error")
  1607. log.Warnf("Failed to marshall JSON for error response: %s", err.Error())
  1608. return
  1609. }
  1610. w.WriteHeader(code)
  1611. fmt.Fprint(w, string(bytes))
  1612. }