router.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "path"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/opencost/opencost/core/pkg/source"
  15. "github.com/opencost/opencost/core/pkg/util/retry"
  16. "github.com/opencost/opencost/core/pkg/util/timeutil"
  17. "github.com/opencost/opencost/core/pkg/version"
  18. "github.com/opencost/opencost/pkg/cloud/aws"
  19. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  20. "github.com/opencost/opencost/pkg/cloud/gcp"
  21. "github.com/opencost/opencost/pkg/cloud/provider"
  22. "github.com/opencost/opencost/pkg/cloudcost"
  23. "github.com/opencost/opencost/pkg/config"
  24. "github.com/opencost/opencost/pkg/customcost"
  25. "github.com/opencost/opencost/pkg/kubeconfig"
  26. "github.com/opencost/opencost/pkg/metrics"
  27. "github.com/opencost/opencost/pkg/services"
  28. "github.com/opencost/opencost/pkg/util/watcher"
  29. "github.com/julienschmidt/httprouter"
  30. "github.com/getsentry/sentry-go"
  31. "github.com/opencost/opencost/core/pkg/clusters"
  32. sysenv "github.com/opencost/opencost/core/pkg/env"
  33. "github.com/opencost/opencost/core/pkg/log"
  34. "github.com/opencost/opencost/core/pkg/util/json"
  35. "github.com/opencost/opencost/modules/prometheus-source/pkg/prom"
  36. "github.com/opencost/opencost/pkg/cloud/azure"
  37. "github.com/opencost/opencost/pkg/cloud/models"
  38. "github.com/opencost/opencost/pkg/clustercache"
  39. "github.com/opencost/opencost/pkg/env"
  40. "github.com/opencost/opencost/pkg/errors"
  41. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  42. "github.com/patrickmn/go-cache"
  43. "k8s.io/client-go/kubernetes"
  44. )
  // RFC3339Milli is a time layout with millisecond precision and a literal
  // trailing "Z".
  const RFC3339Milli = "2006-01-02T15:04:05.000Z"

  // Cache lifetimes, in minutes, for the 1, 2, 7 and 30 day query windows.
  // The values are relatively prime so that cache warming for different
  // windows is unlikely to fire at the same moment.
  const (
  	maxCacheMinutes1d  = 11
  	maxCacheMinutes2d  = 17
  	maxCacheMinutes7d  = 37
  	maxCacheMinutes30d = 137
  )

  // Names of application settings.
  // NOTE(review): presumably used as keys for the settings cache and its
  // subscribers; confirm against the callers.
  const (
  	CustomPricingSetting = "CustomPricing"
  	DiscountSetting      = "Discount"
  )
  // gitCommit is set by the build system; it is the empty string when the
  // build does not inject a value.
  var gitCommit string
  58. // Accesses defines a singleton application instance, providing access to
  59. // Prometheus, Kubernetes, the cloud provider, and caches.
  60. type Accesses struct {
  61. DataSource source.OpenCostDataSource
  62. KubeClientSet kubernetes.Interface
  63. ClusterCache clustercache.ClusterCache
  64. ClusterMap clusters.ClusterMap
  65. CloudProvider models.Provider
  66. ConfigFileManager *config.ConfigFileManager
  67. ClusterInfoProvider clusters.ClusterInfoProvider
  68. Model *CostModel
  69. MetricsEmitter *CostModelMetricsEmitter
  70. OutOfClusterCache *cache.Cache
  71. CostDataCache *cache.Cache
  72. ClusterCostsCache *cache.Cache
  73. CacheExpiration map[time.Duration]time.Duration
  74. // SettingsCache stores current state of app settings
  75. SettingsCache *cache.Cache
  76. // settingsSubscribers tracks channels through which changes to different
  77. // settings will be published in a pub/sub model
  78. settingsSubscribers map[string][]chan string
  79. settingsMutex sync.Mutex
  80. // registered http service instances
  81. httpServices services.HTTPServices
  82. }
  83. // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
  84. // If one does not exists, it returns the default cache expiration, which is defined by
  85. // the particular cache.
  86. func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
  87. if expiration, ok := a.CacheExpiration[dur]; ok {
  88. return expiration
  89. }
  90. return cache.DefaultExpiration
  91. }
  92. // GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
  93. // which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
  94. // not found or is less than 2 minutes.
  95. func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
  96. expiry := a.GetCacheExpiration(dur).Minutes()
  97. if expiry <= 2.0 {
  98. return time.Minute
  99. }
  100. mins := time.Duration(expiry/2.0) * time.Minute
  101. return mins
  102. }
  103. func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  104. w.Header().Set("Content-Type", "application/json")
  105. duration := 24 * time.Hour
  106. offset := time.Minute
  107. durationHrs := "24h"
  108. fmtOffset := "1m"
  109. dataSource := a.DataSource
  110. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  111. if data, valid := a.ClusterCostsCache.Get(key); valid {
  112. clusterCosts := data.(map[string]*ClusterCosts)
  113. w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
  114. } else {
  115. data, err := a.ComputeClusterCosts(dataSource, a.CloudProvider, duration, offset, true)
  116. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
  117. }
  118. }
  // Response is the JSON envelope written by the WrapData* helpers.
  type Response struct {
  	// Code is an HTTP-style status code carried inside the body.
  	Code int `json:"code"`
  	// Status is "success" or "error".
  	Status string `json:"status"`
  	// Data is the payload; it is always present in the JSON (possibly null).
  	Data interface{} `json:"data"`
  	// Message and Warning are omitted from the JSON when empty.
  	Message string `json:"message,omitempty"`
  	Warning string `json:"warning,omitempty"`
  }
  126. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  127. type FilterFunc func(*CostData) (bool, string)
  128. // FilterCostData allows through only CostData that matches all the given filter functions
  129. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  130. result := make(map[string]*CostData)
  131. filteredEnvironments := make(map[string]int)
  132. filteredContainers := 0
  133. DataLoop:
  134. for key, datum := range data {
  135. for _, rf := range retains {
  136. if ok, _ := rf(datum); ok {
  137. result[key] = datum
  138. // if any retain function passes, the data is retained and move on
  139. continue DataLoop
  140. }
  141. }
  142. for _, ff := range filters {
  143. if ok, environment := ff(datum); !ok {
  144. if environment != "" {
  145. filteredEnvironments[environment]++
  146. }
  147. filteredContainers++
  148. // if any filter function check fails, move on to the next datum
  149. continue DataLoop
  150. }
  151. }
  152. result[key] = datum
  153. }
  154. return result, filteredContainers, filteredEnvironments
  155. }
  156. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  157. fs := strings.Split(fields, ",")
  158. fmap := make(map[string]bool)
  159. for _, f := range fs {
  160. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  161. log.Debugf("to delete: %s", fieldNameLower)
  162. fmap[fieldNameLower] = true
  163. }
  164. filteredData := make(map[string]CostData)
  165. for cname, costdata := range data {
  166. s := reflect.TypeOf(*costdata)
  167. val := reflect.ValueOf(*costdata)
  168. costdata2 := CostData{}
  169. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  170. n := s.NumField()
  171. for i := 0; i < n; i++ {
  172. field := s.Field(i)
  173. value := val.Field(i)
  174. value2 := cd2.Field(i)
  175. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  176. value2.Set(reflect.Value(value))
  177. }
  178. }
  179. filteredData[cname] = cd2.Interface().(CostData)
  180. }
  181. return filteredData
  182. }
  // ParsePercentString converts a percentage such as "30%" into the fraction
  // 0.30. A single trailing "%" is optional, so "30" also yields 0.30. The
  // empty string is interpreted as "0%" and yields 0.0. Any input whose numeric
  // part cannot be parsed as a float returns 0.0 and the parse error.
  func ParsePercentString(percentStr string) (float64, error) {
  	if percentStr == "" {
  		return 0.0, nil
  	}

  	number := strings.TrimSuffix(percentStr, "%")
  	percent, err := strconv.ParseFloat(number, 64)
  	if err != nil {
  		return 0.0, err
  	}

  	percent *= 0.01
  	return percent, nil
  }
  200. func WrapData(data interface{}, err error) []byte {
  201. var resp []byte
  202. if err != nil {
  203. log.Errorf("Error returned to client: %s", err.Error())
  204. resp, _ = json.Marshal(&Response{
  205. Code: http.StatusInternalServerError,
  206. Status: "error",
  207. Message: err.Error(),
  208. Data: data,
  209. })
  210. } else {
  211. resp, err = json.Marshal(&Response{
  212. Code: http.StatusOK,
  213. Status: "success",
  214. Data: data,
  215. })
  216. if err != nil {
  217. log.Errorf("error marshaling response json: %s", err.Error())
  218. }
  219. }
  220. return resp
  221. }
  222. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  223. var resp []byte
  224. if err != nil {
  225. log.Errorf("Error returned to client: %s", err.Error())
  226. resp, _ = json.Marshal(&Response{
  227. Code: http.StatusInternalServerError,
  228. Status: "error",
  229. Message: err.Error(),
  230. Data: data,
  231. })
  232. } else {
  233. resp, _ = json.Marshal(&Response{
  234. Code: http.StatusOK,
  235. Status: "success",
  236. Data: data,
  237. Message: message,
  238. })
  239. }
  240. return resp
  241. }
  242. func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
  243. var resp []byte
  244. if err != nil {
  245. log.Errorf("Error returned to client: %s", err.Error())
  246. resp, _ = json.Marshal(&Response{
  247. Code: http.StatusInternalServerError,
  248. Status: "error",
  249. Message: err.Error(),
  250. Warning: warning,
  251. Data: data,
  252. })
  253. } else {
  254. resp, _ = json.Marshal(&Response{
  255. Code: http.StatusOK,
  256. Status: "success",
  257. Data: data,
  258. Warning: warning,
  259. })
  260. }
  261. return resp
  262. }
  263. func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
  264. var resp []byte
  265. if err != nil {
  266. log.Errorf("Error returned to client: %s", err.Error())
  267. resp, _ = json.Marshal(&Response{
  268. Code: http.StatusInternalServerError,
  269. Status: "error",
  270. Message: err.Error(),
  271. Warning: warning,
  272. Data: data,
  273. })
  274. } else {
  275. resp, _ = json.Marshal(&Response{
  276. Code: http.StatusOK,
  277. Status: "success",
  278. Data: data,
  279. Message: message,
  280. Warning: warning,
  281. })
  282. }
  283. return resp
  284. }
  285. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  286. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  287. w.Header().Set("Content-Type", "application/json")
  288. w.Header().Set("Access-Control-Allow-Origin", "*")
  289. err := a.CloudProvider.DownloadPricingData()
  290. if err != nil {
  291. log.Errorf("Error refreshing pricing data: %s", err.Error())
  292. }
  293. w.Write(WrapData(nil, err))
  294. }
  295. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  296. w.Header().Set("Content-Type", "application/json")
  297. w.Header().Set("Access-Control-Allow-Origin", "*")
  298. window := r.URL.Query().Get("timeWindow")
  299. offset := r.URL.Query().Get("offset")
  300. fields := r.URL.Query().Get("filterFields")
  301. namespace := r.URL.Query().Get("namespace")
  302. duration, err := timeutil.ParseDuration(window)
  303. if err != nil {
  304. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  305. return
  306. }
  307. end := time.Now()
  308. if offset != "" {
  309. offsetDur, err := timeutil.ParseDuration(offset)
  310. if err != nil {
  311. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  312. return
  313. }
  314. end = end.Add(-offsetDur)
  315. }
  316. start := end.Add(-duration)
  317. data, err := a.Model.ComputeCostData(start, end)
  318. // apply filter by removing if != namespace
  319. if namespace != "" {
  320. for key, costData := range data {
  321. if costData.Namespace != namespace {
  322. delete(data, key)
  323. }
  324. }
  325. }
  326. if fields != "" {
  327. filteredData := filterFields(fields, data)
  328. w.Write(WrapData(filteredData, err))
  329. } else {
  330. w.Write(WrapData(data, err))
  331. }
  332. }
  333. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  334. w.Header().Set("Content-Type", "application/json")
  335. w.Header().Set("Access-Control-Allow-Origin", "*")
  336. window := r.URL.Query().Get("window")
  337. offset := r.URL.Query().Get("offset")
  338. if window == "" {
  339. w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
  340. return
  341. }
  342. windowDur, err := timeutil.ParseDuration(window)
  343. if err != nil {
  344. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  345. return
  346. }
  347. // offset is not a required parameter
  348. var offsetDur time.Duration
  349. if offset != "" {
  350. offsetDur, err = timeutil.ParseDuration(offset)
  351. if err != nil {
  352. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  353. return
  354. }
  355. }
  356. /*
  357. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  358. if useThanos && !thanos.IsEnabled() {
  359. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  360. return
  361. }
  362. var client prometheus.Client
  363. if useThanos {
  364. client = a.ThanosClient
  365. offsetDur = thanos.OffsetDuration()
  366. } else {
  367. client = a.PrometheusClient
  368. }
  369. */
  370. data, err := a.ComputeClusterCosts(a.DataSource, a.CloudProvider, windowDur, offsetDur, true)
  371. w.Write(WrapData(data, err))
  372. }
  373. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  374. w.Header().Set("Content-Type", "application/json")
  375. w.Header().Set("Access-Control-Allow-Origin", "*")
  376. startString := r.URL.Query().Get("start")
  377. endString := r.URL.Query().Get("end")
  378. window := r.URL.Query().Get("window")
  379. offset := r.URL.Query().Get("offset")
  380. if window == "" {
  381. w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
  382. return
  383. }
  384. windowDur, err := timeutil.ParseDuration(window)
  385. if err != nil {
  386. w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
  387. return
  388. }
  389. // offset is not a required parameter
  390. var offsetDur time.Duration
  391. if offset != "" {
  392. offsetDur, err = timeutil.ParseDuration(offset)
  393. if err != nil {
  394. w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
  395. return
  396. }
  397. }
  398. const layout = "2006-01-02T15:04:05.000Z"
  399. start, err := time.Parse(layout, startString)
  400. if err != nil {
  401. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  402. w.Write(WrapData(nil, fmt.Errorf("error parsing 'start': %s: %w", startString, err)))
  403. return
  404. }
  405. end, err := time.Parse(layout, endString)
  406. if err != nil {
  407. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  408. w.Write(WrapData(nil, fmt.Errorf("error parsing 'end': %s: %w", endString, err)))
  409. return
  410. }
  411. data, err := ClusterCostsOverTime(a.DataSource, a.CloudProvider, start, end, windowDur, offsetDur)
  412. w.Write(WrapData(data, err))
  413. }
  414. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  415. w.Header().Set("Content-Type", "application/json")
  416. w.Header().Set("Access-Control-Allow-Origin", "*")
  417. data, err := a.CloudProvider.AllNodePricing()
  418. w.Write(WrapData(data, err))
  419. }
  420. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  421. w.Header().Set("Content-Type", "application/json")
  422. w.Header().Set("Access-Control-Allow-Origin", "*")
  423. data, err := a.CloudProvider.GetConfig()
  424. w.Write(WrapData(data, err))
  425. }
  426. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  427. w.Header().Set("Content-Type", "application/json")
  428. w.Header().Set("Access-Control-Allow-Origin", "*")
  429. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
  430. if err != nil {
  431. w.Write(WrapData(data, err))
  432. return
  433. }
  434. w.Write(WrapData(data, err))
  435. err = a.CloudProvider.DownloadPricingData()
  436. if err != nil {
  437. log.Errorf("Error redownloading data on config update: %s", err.Error())
  438. }
  439. }
  440. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  441. w.Header().Set("Content-Type", "application/json")
  442. w.Header().Set("Access-Control-Allow-Origin", "*")
  443. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.AthenaInfoUpdateType)
  444. if err != nil {
  445. w.Write(WrapData(data, err))
  446. return
  447. }
  448. w.Write(WrapData(data, err))
  449. }
  450. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  451. w.Header().Set("Content-Type", "application/json")
  452. w.Header().Set("Access-Control-Allow-Origin", "*")
  453. data, err := a.CloudProvider.UpdateConfig(r.Body, gcp.BigqueryUpdateType)
  454. if err != nil {
  455. w.Write(WrapData(data, err))
  456. return
  457. }
  458. w.Write(WrapData(data, err))
  459. }
  460. func (a *Accesses) UpdateAzureStorageConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  461. w.Header().Set("Content-Type", "application/json")
  462. w.Header().Set("Access-Control-Allow-Origin", "*")
  463. data, err := a.CloudProvider.UpdateConfig(r.Body, azure.AzureStorageUpdateType)
  464. if err != nil {
  465. w.Write(WrapData(data, err))
  466. return
  467. }
  468. w.Write(WrapData(data, err))
  469. }
  470. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  471. w.Header().Set("Content-Type", "application/json")
  472. w.Header().Set("Access-Control-Allow-Origin", "*")
  473. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  474. if err != nil {
  475. w.Write(WrapData(data, err))
  476. return
  477. }
  478. w.Write(WrapData(data, err))
  479. }
  480. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  481. w.Header().Set("Content-Type", "application/json")
  482. w.Header().Set("Access-Control-Allow-Origin", "*")
  483. data, err := a.CloudProvider.GetManagementPlatform()
  484. if err != nil {
  485. w.Write(WrapData(data, err))
  486. return
  487. }
  488. w.Write(WrapData(data, err))
  489. }
  490. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  491. w.Header().Set("Content-Type", "application/json")
  492. w.Header().Set("Access-Control-Allow-Origin", "*")
  493. data := a.ClusterInfoProvider.GetClusterInfo()
  494. w.Write(WrapData(data, nil))
  495. }
  496. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  497. w.Header().Set("Content-Type", "application/json")
  498. w.Header().Set("Access-Control-Allow-Origin", "*")
  499. data := a.ClusterMap.AsMap()
  500. w.Write(WrapData(data, nil))
  501. }
  502. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  503. w.Header().Set("Content-Type", "application/json")
  504. w.Header().Set("Access-Control-Allow-Origin", "*")
  505. w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
  506. }
  507. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  508. w.Header().Set("Content-Type", "application/json")
  509. w.Header().Set("Access-Control-Allow-Origin", "*")
  510. w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
  511. }
  512. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  513. w.Header().Set("Content-Type", "application/json")
  514. w.Header().Set("Access-Control-Allow-Origin", "*")
  515. w.Write(WrapData(a.Model.GetPricingSourceCounts()))
  516. }
  517. func (a *Accesses) GetPricingSourceSummary(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
  518. w.Header().Set("Content-Type", "application/json")
  519. w.Header().Set("Access-Control-Allow-Origin", "*")
  520. data := a.CloudProvider.PricingSourceSummary()
  521. w.Write(WrapData(data, nil))
  522. }
  523. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  524. w.Header().Set("Content-Type", "application/json")
  525. w.Header().Set("Access-Control-Allow-Origin", "*")
  526. podlist := a.ClusterCache.GetAllPods()
  527. var lonePods []*clustercache.Pod
  528. for _, pod := range podlist {
  529. if len(pod.OwnerReferences) == 0 {
  530. lonePods = append(lonePods, pod)
  531. }
  532. }
  533. body, err := json.Marshal(lonePods)
  534. if err != nil {
  535. fmt.Fprintf(w, "Error decoding pod: %s", err)
  536. } else {
  537. w.Write(body)
  538. }
  539. }
  540. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  541. w.Header().Set("Content-Type", "application/json")
  542. w.Header().Set("Access-Control-Allow-Origin", "*")
  543. ns := env.GetKubecostNamespace()
  544. w.Write([]byte(ns))
  545. }
  546. type InstallInfo struct {
  547. Containers []ContainerInfo `json:"containers"`
  548. ClusterInfo map[string]string `json:"clusterInfo"`
  549. Version string `json:"version"`
  550. }
  // ContainerInfo describes a single container of an install pod.
  type ContainerInfo struct {
  	// ContainerName is the container's name from the pod spec.
  	ContainerName string `json:"containerName"`
  	// Image is the container image reference from the pod spec.
  	Image string `json:"image"`
  	// StartTime is the owning pod's start time, rendered as a string.
  	StartTime string `json:"startTime"`
  }
  556. func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  557. w.Header().Set("Content-Type", "application/json")
  558. w.Header().Set("Access-Control-Allow-Origin", "*")
  559. containers, err := GetKubecostContainers(a.KubeClientSet)
  560. if err != nil {
  561. writeErrorResponse(w, 500, fmt.Sprintf("Unable to list pods: %s", err.Error()))
  562. return
  563. }
  564. info := InstallInfo{
  565. Containers: containers,
  566. ClusterInfo: make(map[string]string),
  567. Version: version.FriendlyVersion(),
  568. }
  569. nodes := a.ClusterCache.GetAllNodes()
  570. cachePods := a.ClusterCache.GetAllPods()
  571. info.ClusterInfo["nodeCount"] = strconv.Itoa(len(nodes))
  572. info.ClusterInfo["podCount"] = strconv.Itoa(len(cachePods))
  573. body, err := json.Marshal(info)
  574. if err != nil {
  575. writeErrorResponse(w, 500, fmt.Sprintf("Error decoding pod: %s", err.Error()))
  576. return
  577. }
  578. w.Write(body)
  579. }
  580. func GetKubecostContainers(kubeClientSet kubernetes.Interface) ([]ContainerInfo, error) {
  581. pods, err := kubeClientSet.CoreV1().Pods(env.GetKubecostNamespace()).List(context.Background(), metav1.ListOptions{
  582. LabelSelector: "app=cost-analyzer",
  583. FieldSelector: "status.phase=Running",
  584. Limit: 1,
  585. })
  586. if err != nil {
  587. return nil, fmt.Errorf("failed to query kubernetes client for kubecost pods: %s", err)
  588. }
  589. // If we have zero pods either something is weird with the install since the app selector is not exposed in the helm
  590. // chart or more likely we are running locally - in either case Images field will return as null
  591. var containers []ContainerInfo
  592. if len(pods.Items) > 0 {
  593. for _, pod := range pods.Items {
  594. for _, container := range pod.Spec.Containers {
  595. c := ContainerInfo{
  596. ContainerName: container.Name,
  597. Image: container.Image,
  598. StartTime: pod.Status.StartTime.String(),
  599. }
  600. containers = append(containers, c)
  601. }
  602. }
  603. }
  604. return containers, nil
  605. }
  606. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  607. w.Header().Set("Content-Type", "application/json")
  608. w.Header().Set("Access-Control-Allow-Origin", "*")
  609. r.ParseForm()
  610. key := r.PostForm.Get("key")
  611. k := []byte(key)
  612. err := os.WriteFile(path.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "key.json"), k, 0644)
  613. if err != nil {
  614. fmt.Fprintf(w, "Error writing service key: %s", err)
  615. }
  616. w.WriteHeader(http.StatusOK)
  617. }
  618. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  619. w.Header().Set("Content-Type", "application/json")
  620. w.Header().Set("Access-Control-Allow-Origin", "*")
  621. encodedValues := sysenv.Get("HELM_VALUES", "")
  622. if encodedValues == "" {
  623. fmt.Fprintf(w, "Values reporting disabled")
  624. return
  625. }
  626. result, err := base64.StdEncoding.DecodeString(encodedValues)
  627. if err != nil {
  628. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  629. return
  630. }
  631. w.Write(result)
  632. }
  633. // captures the panic event in sentry
  634. func capturePanicEvent(err string, stack string) {
  635. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  636. log.Infof("%s", msg)
  637. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  638. Level: sentry.LevelError,
  639. Message: msg,
  640. })
  641. sentry.Flush(5 * time.Second)
  642. }
  643. // handle any panics reported by the errors package
  644. func handlePanic(p errors.Panic) bool {
  645. err := p.Error
  646. if err != nil {
  647. if err, ok := err.(error); ok {
  648. capturePanicEvent(err.Error(), p.Stack)
  649. }
  650. if err, ok := err.(string); ok {
  651. capturePanicEvent(err, p.Stack)
  652. }
  653. }
  654. // Return true to recover iff the type is http, otherwise allow kubernetes
  655. // to recover.
  656. return p.Type == errors.PanicTypeHTTP
  657. }
  658. func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  659. var err error
  660. if errorReportingEnabled {
  661. err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
  662. if err != nil {
  663. log.Infof("Failed to initialize sentry for error reporting")
  664. } else {
  665. err = errors.SetPanicHandler(handlePanic)
  666. if err != nil {
  667. log.Infof("Failed to set panic handler: %s", err)
  668. }
  669. }
  670. }
  671. const maxRetries = 10
  672. const retryInterval = 10 * time.Second
  673. var fatalErr error
  674. ctx, cancel := context.WithCancel(context.Background())
  675. dataSource, _ := retry.Retry(
  676. ctx,
  677. func() (source.OpenCostDataSource, error) {
  678. ds, e := prom.NewDefaultPrometheusDataSource()
  679. if e != nil {
  680. if source.IsRetryable(e) {
  681. return nil, e
  682. }
  683. fatalErr = e
  684. cancel()
  685. }
  686. return ds, e
  687. },
  688. maxRetries,
  689. retryInterval,
  690. )
  691. if fatalErr != nil {
  692. log.Fatalf("Failed to create Prometheus data source: %s", fatalErr)
  693. panic(fatalErr)
  694. }
  695. // Kubernetes API setup
  696. kubeClientset, err := kubeconfig.LoadKubeClient("")
  697. if err != nil {
  698. log.Fatalf("Failed to build Kubernetes client: %s", err.Error())
  699. }
  700. // Create ConfigFileManager for synchronization of shared configuration
  701. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  702. BucketStoreConfig: env.GetKubecostConfigBucket(),
  703. LocalConfigPath: "/",
  704. })
  705. configPrefix := env.GetConfigPathWithDefault("/var/configs/")
  706. // Create Kubernetes Cluster Cache + Watchers
  707. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  708. k8sCache.Run()
  709. cloudProviderKey := env.GetCloudProviderAPIKey()
  710. cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
  711. if err != nil {
  712. panic(err.Error())
  713. }
  714. // Append the pricing config watcher
  715. kubecostNamespace := env.GetKubecostNamespace()
  716. configWatchers := watcher.NewConfigMapWatchers(kubeClientset, kubecostNamespace, additionalConfigWatchers...)
  717. configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
  718. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  719. configWatchers.Watch()
  720. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  721. var clusterInfoProvider clusters.ClusterInfoProvider
  722. if env.IsClusterInfoFileEnabled() {
  723. clusterInfoFile := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-info.json"))
  724. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  725. } else {
  726. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, dataSource, cloudProvider)
  727. }
  728. clusterMap := dataSource.NewClusterMap(clusterInfoProvider)
  729. // cache responses from model and aggregation for a default of 10 minutes;
  730. // clear expired responses every 20 minutes
  731. costDataCache := cache.New(time.Minute*10, time.Minute*20)
  732. clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  733. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  734. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  735. // query durations that should be cached longer should be registered here
  736. // use relatively prime numbers to minimize likelihood of synchronized
  737. // attempts at cache warming
  738. day := 24 * time.Hour
  739. cacheExpiration := map[time.Duration]time.Duration{
  740. day: maxCacheMinutes1d * time.Minute,
  741. 2 * day: maxCacheMinutes2d * time.Minute,
  742. 7 * day: maxCacheMinutes7d * time.Minute,
  743. 30 * day: maxCacheMinutes30d * time.Minute,
  744. }
  745. costModel := NewCostModel(dataSource, cloudProvider, k8sCache, clusterMap, dataSource.BatchDuration())
  746. metricsEmitter := NewCostModelMetricsEmitter(k8sCache, cloudProvider, clusterInfoProvider, costModel)
  747. a := &Accesses{
  748. httpServices: services.NewCostModelServices(),
  749. DataSource: dataSource,
  750. KubeClientSet: kubeClientset,
  751. ClusterCache: k8sCache,
  752. ClusterMap: clusterMap,
  753. CloudProvider: cloudProvider,
  754. ConfigFileManager: confManager,
  755. ClusterInfoProvider: clusterInfoProvider,
  756. Model: costModel,
  757. MetricsEmitter: metricsEmitter,
  758. CostDataCache: costDataCache,
  759. ClusterCostsCache: clusterCostsCache,
  760. OutOfClusterCache: outOfClusterCache,
  761. SettingsCache: settingsCache,
  762. CacheExpiration: cacheExpiration,
  763. }
  764. // Initialize mechanism for subscribing to settings changes
  765. a.InitializeSettingsPubSub()
  766. err = a.CloudProvider.DownloadPricingData()
  767. if err != nil {
  768. log.Infof("Failed to download pricing data: %s", err)
  769. }
  770. // NOTE: (bolt) this only warms the cache for cluster costs.
  771. if env.IsCacheWarmingEnabled() {
  772. log.Infof("Init: ClusterCosts cache warming enabled")
  773. a.warmAggregateCostModelCache()
  774. } else {
  775. log.Infof("Init: ClusterCosts cache warming disabled")
  776. }
  777. if !env.IsKubecostMetricsPodEnabled() {
  778. a.MetricsEmitter.Start()
  779. }
  780. a.httpServices.RegisterAll(router)
  781. a.DataSource.RegisterEndPoints(router)
  782. router.GET("/costDataModel", a.CostDataModel)
  783. router.GET("/allocation/compute", a.ComputeAllocationHandler)
  784. router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  785. router.GET("/allNodePricing", a.GetAllNodePricing)
  786. router.POST("/refreshPricing", a.RefreshPricingData)
  787. router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
  788. router.GET("/clusterCosts", a.ClusterCosts)
  789. router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
  790. router.GET("/managementPlatform", a.ManagementPlatform)
  791. router.GET("/clusterInfo", a.ClusterInfo)
  792. router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  793. router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  794. router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  795. router.GET("/pricingSourceSummary", a.GetPricingSourceSummary)
  796. router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  797. router.GET("/orphanedPods", a.GetOrphanedPods)
  798. router.GET("/installNamespace", a.GetInstallNamespace)
  799. router.GET("/installInfo", a.GetInstallInfo)
  800. router.POST("/serviceKey", a.AddServiceKey)
  801. router.GET("/helmValues", a.GetHelmValues)
  802. return a
  803. }
  804. // InitializeCloudCost Initializes Cloud Cost pipeline and querier and registers endpoints
  805. func InitializeCloudCost(router *httprouter.Router, providerConfig models.ProviderConfig) {
  806. log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())
  807. cloudConfigController := cloudconfig.NewMemoryController(providerConfig)
  808. repo := cloudcost.NewMemoryRepository()
  809. cloudCostPipelineService := cloudcost.NewPipelineService(repo, cloudConfigController, cloudcost.DefaultIngestorConfiguration())
  810. repoQuerier := cloudcost.NewRepositoryQuerier(repo)
  811. cloudCostQueryService := cloudcost.NewQueryService(repoQuerier, repoQuerier)
  812. router.GET("/cloud/config/export", cloudConfigController.GetExportConfigHandler())
  813. router.GET("/cloud/config/enable", cloudConfigController.GetEnableConfigHandler())
  814. router.GET("/cloud/config/disable", cloudConfigController.GetDisableConfigHandler())
  815. router.GET("/cloud/config/delete", cloudConfigController.GetDeleteConfigHandler())
  816. router.GET("/cloudCost", cloudCostQueryService.GetCloudCostHandler())
  817. router.GET("/cloudCost/view/graph", cloudCostQueryService.GetCloudCostViewGraphHandler())
  818. router.GET("/cloudCost/view/totals", cloudCostQueryService.GetCloudCostViewTotalsHandler())
  819. router.GET("/cloudCost/view/table", cloudCostQueryService.GetCloudCostViewTableHandler())
  820. router.GET("/cloudCost/status", cloudCostPipelineService.GetCloudCostStatusHandler())
  821. router.GET("/cloudCost/rebuild", cloudCostPipelineService.GetCloudCostRebuildHandler())
  822. router.GET("/cloudCost/repair", cloudCostPipelineService.GetCloudCostRepairHandler())
  823. }
  824. func InitializeCustomCost(router *httprouter.Router) *customcost.PipelineService {
  825. hourlyRepo := customcost.NewMemoryRepository()
  826. dailyRepo := customcost.NewMemoryRepository()
  827. ingConfig := customcost.DefaultIngestorConfiguration()
  828. var err error
  829. customCostPipelineService, err := customcost.NewPipelineService(hourlyRepo, dailyRepo, ingConfig)
  830. if err != nil {
  831. log.Errorf("error instantiating custom cost pipeline service: %v", err)
  832. return nil
  833. }
  834. customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
  835. customCostQueryService := customcost.NewQueryService(customCostQuerier)
  836. router.GET("/customCost/total", customCostQueryService.GetCustomCostTotalHandler())
  837. router.GET("/customCost/timeseries", customCostQueryService.GetCustomCostTimeseriesHandler())
  838. return customCostPipelineService
  839. }
  840. func writeErrorResponse(w http.ResponseWriter, code int, message string) {
  841. out := map[string]string{
  842. "message": message,
  843. }
  844. bytes, err := json.Marshal(out)
  845. if err != nil {
  846. w.Header().Set("Content-Type", "text/plain")
  847. w.WriteHeader(500)
  848. fmt.Fprint(w, "unable to marshall json for error")
  849. log.Warnf("Failed to marshall JSON for error response: %s", err.Error())
  850. return
  851. }
  852. w.WriteHeader(code)
  853. fmt.Fprint(w, string(bytes))
  854. }