router.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035
  1. package costmodel
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/json"
  6. "flag"
  7. "fmt"
  8. "net"
  9. "net/http"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "k8s.io/klog"
  15. "github.com/julienschmidt/httprouter"
  16. sentry "github.com/getsentry/sentry-go"
  17. costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. cm "github.com/kubecost/cost-model/pkg/clustermanager"
  20. "github.com/kubecost/cost-model/pkg/env"
  21. "github.com/kubecost/cost-model/pkg/errors"
  22. "github.com/kubecost/cost-model/pkg/prom"
  23. "github.com/kubecost/cost-model/pkg/thanos"
  24. prometheusClient "github.com/prometheus/client_golang/api"
  25. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  26. v1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "github.com/patrickmn/go-cache"
  29. "github.com/prometheus/client_golang/prometheus"
  30. "k8s.io/client-go/kubernetes"
  31. "k8s.io/client-go/rest"
  32. "k8s.io/client-go/tools/clientcmd"
  33. )
  // Package-level constants shared by the handlers in this file.
  const (
  	// prometheusTroubleshootingEp is the troubleshooting guide linked from the
  	// log messages emitted when Prometheus cannot be queried at startup.
  	prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"

  	// RFC3339Milli is the time.Parse layout for the "start" and "end" query
  	// parameters of CostDataModelRangeLarge. The trailing "Z" is a literal
  	// character in Go layouts (not a zone directive), so inputs are expected to
  	// look like "2006-01-02T15:04:05.000Z".
  	RFC3339Milli = "2006-01-02T15:04:05.000Z"
  )
  38. var (
  39. // gitCommit is set by the build system
  40. gitCommit string
  41. dbBasicAuthUsername string = env.GetDBBasicAuthUsername()
  42. dbBasicAuthPW string = env.GetDBBasicAuthUserPassword()
  43. dbBearerToken string = env.GetDBBearerToken()
  44. multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
  45. multiclusterDBBasicAuthPW string = env.GetMultiClusterBasicAuthPassword()
  46. multiClusterBearerToken string = env.GetMultiClusterBearerToken()
  47. )
  48. var Router = httprouter.New()
  49. var A Accesses
  50. type Accesses struct {
  51. PrometheusClient prometheusClient.Client
  52. ThanosClient prometheusClient.Client
  53. KubeClientSet kubernetes.Interface
  54. ClusterManager *cm.ClusterManager
  55. Cloud costAnalyzerCloud.Provider
  56. CPUPriceRecorder *prometheus.GaugeVec
  57. RAMPriceRecorder *prometheus.GaugeVec
  58. PersistentVolumePriceRecorder *prometheus.GaugeVec
  59. GPUPriceRecorder *prometheus.GaugeVec
  60. NodeTotalPriceRecorder *prometheus.GaugeVec
  61. NodeSpotRecorder *prometheus.GaugeVec
  62. RAMAllocationRecorder *prometheus.GaugeVec
  63. CPUAllocationRecorder *prometheus.GaugeVec
  64. GPUAllocationRecorder *prometheus.GaugeVec
  65. PVAllocationRecorder *prometheus.GaugeVec
  66. ClusterManagementCostRecorder *prometheus.GaugeVec
  67. LBCostRecorder *prometheus.GaugeVec
  68. NetworkZoneEgressRecorder prometheus.Gauge
  69. NetworkRegionEgressRecorder prometheus.Gauge
  70. NetworkInternetEgressRecorder prometheus.Gauge
  71. ServiceSelectorRecorder *prometheus.GaugeVec
  72. DeploymentSelectorRecorder *prometheus.GaugeVec
  73. Model *CostModel
  74. OutOfClusterCache *cache.Cache
  75. }
  // DataEnvelope is the JSON wrapper written by the handlers in this file.
  // Code mirrors an HTTP status (WrapData produces 200 or 500), Status is
  // "success" or "error", Data carries the payload, and Message is an optional
  // note that is omitted from the JSON when empty.
  type DataEnvelope struct {
  	Code    int         `json:"code"`
  	Status  string      `json:"status"`
  	Data    interface{} `json:"data"`
  	Message string      `json:"message,omitempty"`
  }
  82. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  83. type FilterFunc func(*CostData) (bool, string)
  84. // FilterCostData allows through only CostData that matches all the given filter functions
  85. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  86. result := make(map[string]*CostData)
  87. filteredEnvironments := make(map[string]int)
  88. filteredContainers := 0
  89. DataLoop:
  90. for key, datum := range data {
  91. for _, rf := range retains {
  92. if ok, _ := rf(datum); ok {
  93. result[key] = datum
  94. // if any retain function passes, the data is retained and move on
  95. continue DataLoop
  96. }
  97. }
  98. for _, ff := range filters {
  99. if ok, environment := ff(datum); !ok {
  100. if environment != "" {
  101. filteredEnvironments[environment]++
  102. }
  103. filteredContainers++
  104. // if any filter function check fails, move on to the next datum
  105. continue DataLoop
  106. }
  107. }
  108. result[key] = datum
  109. }
  110. return result, filteredContainers, filteredEnvironments
  111. }
  112. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  113. fs := strings.Split(fields, ",")
  114. fmap := make(map[string]bool)
  115. for _, f := range fs {
  116. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  117. klog.V(1).Infof("to delete: %s", fieldNameLower)
  118. fmap[fieldNameLower] = true
  119. }
  120. filteredData := make(map[string]CostData)
  121. for cname, costdata := range data {
  122. s := reflect.TypeOf(*costdata)
  123. val := reflect.ValueOf(*costdata)
  124. costdata2 := CostData{}
  125. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  126. n := s.NumField()
  127. for i := 0; i < n; i++ {
  128. field := s.Field(i)
  129. value := val.Field(i)
  130. value2 := cd2.Field(i)
  131. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  132. value2.Set(reflect.Value(value))
  133. }
  134. }
  135. filteredData[cname] = cd2.Interface().(CostData)
  136. }
  137. return filteredData
  138. }
  // normalizeTimeParam rewrites a day-based duration such as "2d" into the
  // equivalent hour-based form ("48h"), which time.ParseDuration understands.
  // Any other non-empty value is returned unchanged; an empty value is an error,
  // as is a "d"-suffixed value whose prefix is not an integer.
  func normalizeTimeParam(param string) (string, error) {
  	n := len(param)
  	switch {
  	case n == 0:
  		return "", fmt.Errorf("invalid time param")
  	case param[n-1] != 'd':
  		return param, nil
  	}
  	days, err := strconv.ParseInt(param[:n-1], 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString converts a percentage string into a fraction, e.g.
  // "5%" -> 0.05 and "12.5" -> 0.125. The trailing "%" is optional and
  // surrounding whitespace is ignored. An empty or all-whitespace string is
  // treated as "0%" and yields 0.0; any other unparsable input (including a
  // bare "%") returns the strconv.ParseFloat error.
  func ParsePercentString(percentStr string) (float64, error) {
  	s := strings.TrimSpace(percentStr)
  	if s == "" {
  		return 0.0, nil
  	}
  	s = strings.TrimSpace(strings.TrimSuffix(s, "%"))
  	discount, err := strconv.ParseFloat(s, 64)
  	if err != nil {
  		return 0.0, err
  	}
  	// Divide rather than multiply by 0.01: 0.01 is not exactly representable,
  	// while division yields the correctly rounded fraction.
  	return discount / 100, nil
  }
  // ParseDuration converts a Prometheus-style duration made of an integer amount
  // followed by one unit suffix — s (seconds), m (minutes), h (hours) or
  // d (days of 24h) — into a time.Duration, e.g. "90m" or "7d". Compound forms
  // such as "1h30m" are not supported. Empty or malformed input returns an
  // error (and a nil duration) instead of panicking.
  func ParseDuration(duration string) (*time.Duration, error) {
  	badFormat := func() error {
  		return fmt.Errorf("error parsing duration: %s did not match expected format [0-9]+(s|m|d|h)", duration)
  	}
  	if duration == "" {
  		// Indexing the last byte of an empty string would panic.
  		return nil, badFormat()
  	}

  	var unit time.Duration
  	switch duration[len(duration)-1] {
  	case 's':
  		unit = time.Second
  	case 'm':
  		unit = time.Minute
  	case 'h':
  		unit = time.Hour
  	case 'd':
  		unit = 24 * time.Hour
  	default:
  		return nil, badFormat()
  	}

  	amount, err := strconv.ParseInt(duration[:len(duration)-1], 10, 64)
  	if err != nil {
  		return nil, badFormat()
  	}
  	dur := time.Duration(amount) * unit
  	return &dur, nil
  }
  196. // ParseTimeRange returns a start and end time, respectively, which are converted from
  197. // a duration and offset, defined as strings with Prometheus-style syntax.
  198. func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  199. // endTime defaults to the current time, unless an offset is explicity declared,
  200. // in which case it shifts endTime back by given duration
  201. endTime := time.Now()
  202. if offset != "" {
  203. o, err := ParseDuration(offset)
  204. if err != nil {
  205. return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
  206. }
  207. endTime = endTime.Add(-1 * *o)
  208. }
  209. // if duration is defined in terms of days, convert to hours
  210. // e.g. convert "2d" to "48h"
  211. durationNorm, err := normalizeTimeParam(duration)
  212. if err != nil {
  213. return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
  214. }
  215. // convert time duration into start and end times, formatted
  216. // as ISO datetime strings
  217. dur, err := time.ParseDuration(durationNorm)
  218. if err != nil {
  219. return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
  220. }
  221. startTime := endTime.Add(-1 * dur)
  222. return &startTime, &endTime, nil
  223. }
  224. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  225. var resp []byte
  226. if err != nil {
  227. klog.V(1).Infof("Error returned to client: %s", err.Error())
  228. resp, _ = json.Marshal(&DataEnvelope{
  229. Code: http.StatusInternalServerError,
  230. Status: "error",
  231. Message: err.Error(),
  232. Data: data,
  233. })
  234. } else {
  235. resp, _ = json.Marshal(&DataEnvelope{
  236. Code: http.StatusOK,
  237. Status: "success",
  238. Data: data,
  239. Message: message,
  240. })
  241. }
  242. return resp
  243. }
  244. func WrapData(data interface{}, err error) []byte {
  245. var resp []byte
  246. if err != nil {
  247. klog.V(1).Infof("Error returned to client: %s", err.Error())
  248. resp, _ = json.Marshal(&DataEnvelope{
  249. Code: http.StatusInternalServerError,
  250. Status: "error",
  251. Message: err.Error(),
  252. Data: data,
  253. })
  254. } else {
  255. resp, _ = json.Marshal(&DataEnvelope{
  256. Code: http.StatusOK,
  257. Status: "success",
  258. Data: data,
  259. })
  260. }
  261. return resp
  262. }
  263. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  264. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  265. w.Header().Set("Content-Type", "application/json")
  266. w.Header().Set("Access-Control-Allow-Origin", "*")
  267. err := a.Cloud.DownloadPricingData()
  268. w.Write(WrapData(nil, err))
  269. }
  270. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  271. w.Header().Set("Content-Type", "application/json")
  272. w.Header().Set("Access-Control-Allow-Origin", "*")
  273. window := r.URL.Query().Get("timeWindow")
  274. offset := r.URL.Query().Get("offset")
  275. fields := r.URL.Query().Get("filterFields")
  276. namespace := r.URL.Query().Get("namespace")
  277. if offset != "" {
  278. offset = "offset " + offset
  279. }
  280. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  281. if fields != "" {
  282. filteredData := filterFields(fields, data)
  283. w.Write(WrapData(filteredData, err))
  284. } else {
  285. w.Write(WrapData(data, err))
  286. }
  287. }
  288. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  289. w.Header().Set("Content-Type", "application/json")
  290. w.Header().Set("Access-Control-Allow-Origin", "*")
  291. window := r.URL.Query().Get("window")
  292. offset := r.URL.Query().Get("offset")
  293. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  294. if useThanos && !thanos.IsEnabled() {
  295. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  296. return
  297. }
  298. var client prometheusClient.Client
  299. if useThanos {
  300. client = a.ThanosClient
  301. offset = thanos.Offset()
  302. } else {
  303. client = a.PrometheusClient
  304. }
  305. data, err := ComputeClusterCosts(client, a.Cloud, window, offset, true)
  306. w.Write(WrapData(data, err))
  307. }
  308. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  309. w.Header().Set("Content-Type", "application/json")
  310. w.Header().Set("Access-Control-Allow-Origin", "*")
  311. start := r.URL.Query().Get("start")
  312. end := r.URL.Query().Get("end")
  313. window := r.URL.Query().Get("window")
  314. offset := r.URL.Query().Get("offset")
  315. data, err := ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  316. w.Write(WrapData(data, err))
  317. }
  318. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  319. w.Header().Set("Content-Type", "application/json")
  320. w.Header().Set("Access-Control-Allow-Origin", "*")
  321. start := r.URL.Query().Get("start")
  322. end := r.URL.Query().Get("end")
  323. window := r.URL.Query().Get("window")
  324. fields := r.URL.Query().Get("filterFields")
  325. namespace := r.URL.Query().Get("namespace")
  326. cluster := r.URL.Query().Get("cluster")
  327. remote := r.URL.Query().Get("remote")
  328. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  329. // Use Thanos Client if it exists (enabled) and remote flag set
  330. var pClient prometheusClient.Client
  331. if remote != "false" && a.ThanosClient != nil {
  332. pClient = a.ThanosClient
  333. } else {
  334. pClient = a.PrometheusClient
  335. }
  336. resolutionHours := 1.0
  337. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, resolutionHours, namespace, cluster, remoteEnabled)
  338. if err != nil {
  339. w.Write(WrapData(nil, err))
  340. }
  341. if fields != "" {
  342. filteredData := filterFields(fields, data)
  343. w.Write(WrapData(filteredData, err))
  344. } else {
  345. w.Write(WrapData(data, err))
  346. }
  347. }
  348. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  349. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  350. w.Header().Set("Content-Type", "application/json")
  351. w.Header().Set("Access-Control-Allow-Origin", "*")
  352. startString := r.URL.Query().Get("start")
  353. endString := r.URL.Query().Get("end")
  354. windowString := r.URL.Query().Get("window")
  355. var start time.Time
  356. var end time.Time
  357. var err error
  358. if windowString == "" {
  359. windowString = "1h"
  360. }
  361. if startString != "" {
  362. start, err = time.Parse(RFC3339Milli, startString)
  363. if err != nil {
  364. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  365. w.Write(WrapData(nil, err))
  366. }
  367. } else {
  368. window, err := time.ParseDuration(windowString)
  369. if err != nil {
  370. w.Write(WrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  371. }
  372. start = time.Now().Add(-2 * window)
  373. }
  374. if endString != "" {
  375. end, err = time.Parse(RFC3339Milli, endString)
  376. if err != nil {
  377. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  378. w.Write(WrapData(nil, err))
  379. }
  380. } else {
  381. end = time.Now()
  382. }
  383. remoteLayout := "2006-01-02T15:04:05Z"
  384. remoteStartStr := start.Format(remoteLayout)
  385. remoteEndStr := end.Format(remoteLayout)
  386. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  387. data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  388. w.Write(WrapData(data, err))
  389. }
  // parseAggregations resolves the aggregation settings for out-of-cluster
  // queries and returns (cache key, aggregation fields, filter field).
  //
  // A non-empty customAggregation is used verbatim: the key is the raw string,
  // the fields are its comma-separated parts, and filterType is returned as-is.
  // Otherwise every comma-separated part of aggregator, and filterType, is
  // prefixed with "kubernetes_"; the key is the prefixed parts re-joined with
  // commas.
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"
  	parts := strings.Split(aggregator, ",")
  	prefixed := make([]string, len(parts))
  	for idx := range parts {
  		prefixed[idx] = prefix + parts[idx]
  	}
  	return strings.Join(prefixed, ","), prefixed, prefix + filterType
  }
  409. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  410. w.Header().Set("Content-Type", "application/json")
  411. w.Header().Set("Access-Control-Allow-Origin", "*")
  412. start := r.URL.Query().Get("start")
  413. end := r.URL.Query().Get("end")
  414. aggregator := r.URL.Query().Get("aggregator")
  415. customAggregation := r.URL.Query().Get("customAggregation")
  416. filterType := r.URL.Query().Get("filterType")
  417. filterValue := r.URL.Query().Get("filterValue")
  418. var data []*costAnalyzerCloud.OutOfClusterAllocation
  419. var err error
  420. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  421. data, err = a.Cloud.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  422. w.Write(WrapData(data, err))
  423. }
  424. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  425. w.Header().Set("Content-Type", "application/json")
  426. w.Header().Set("Access-Control-Allow-Origin", "*")
  427. // start date for which to query costs, inclusive; format YYYY-MM-DD
  428. start := r.URL.Query().Get("start")
  429. // end date for which to query costs, inclusive; format YYYY-MM-DD
  430. end := r.URL.Query().Get("end")
  431. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  432. kubernetesAggregation := r.URL.Query().Get("aggregator")
  433. // customAggregation allows full customization of aggregator w/o prepending
  434. customAggregation := r.URL.Query().Get("customAggregation")
  435. // disableCache, if set to "true", tells this function to recompute and
  436. // cache the requested data
  437. disableCache := r.URL.Query().Get("disableCache") == "true"
  438. // clearCache, if set to "true", tells this function to flush the cache,
  439. // then recompute and cache the requested data
  440. clearCache := r.URL.Query().Get("clearCache") == "true"
  441. filterType := r.URL.Query().Get("filterType")
  442. filterValue := r.URL.Query().Get("filterValue")
  443. aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
  444. // clear cache prior to checking the cache so that a clearCache=true
  445. // request always returns a freshly computed value
  446. if clearCache {
  447. a.OutOfClusterCache.Flush()
  448. }
  449. // attempt to retrieve cost data from cache
  450. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
  451. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  452. if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
  453. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  454. return
  455. }
  456. klog.Errorf("caching error: failed to type cast data: %s", key)
  457. }
  458. data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  459. if err == nil {
  460. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  461. }
  462. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  463. }
  464. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  465. w.Header().Set("Content-Type", "application/json")
  466. w.Header().Set("Access-Control-Allow-Origin", "*")
  467. data, err := p.Cloud.AllNodePricing()
  468. w.Write(WrapData(data, err))
  469. }
  470. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  471. w.Header().Set("Content-Type", "application/json")
  472. w.Header().Set("Access-Control-Allow-Origin", "*")
  473. data, err := p.Cloud.GetConfig()
  474. w.Write(WrapData(data, err))
  475. }
  476. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  477. w.Header().Set("Content-Type", "application/json")
  478. w.Header().Set("Access-Control-Allow-Origin", "*")
  479. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  480. if err != nil {
  481. w.Write(WrapData(data, err))
  482. return
  483. }
  484. w.Write(WrapData(data, err))
  485. err = p.Cloud.DownloadPricingData()
  486. if err != nil {
  487. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  488. }
  489. return
  490. }
  491. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  492. w.Header().Set("Content-Type", "application/json")
  493. w.Header().Set("Access-Control-Allow-Origin", "*")
  494. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  495. if err != nil {
  496. w.Write(WrapData(data, err))
  497. return
  498. }
  499. w.Write(WrapData(data, err))
  500. return
  501. }
  502. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  503. w.Header().Set("Content-Type", "application/json")
  504. w.Header().Set("Access-Control-Allow-Origin", "*")
  505. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  506. if err != nil {
  507. w.Write(WrapData(data, err))
  508. return
  509. }
  510. w.Write(WrapData(data, err))
  511. return
  512. }
  513. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  514. w.Header().Set("Content-Type", "application/json")
  515. w.Header().Set("Access-Control-Allow-Origin", "*")
  516. data, err := p.Cloud.UpdateConfig(r.Body, "")
  517. if err != nil {
  518. w.Write(WrapData(data, err))
  519. return
  520. }
  521. w.Write(WrapData(data, err))
  522. return
  523. }
  524. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  525. w.Header().Set("Content-Type", "application/json")
  526. w.Header().Set("Access-Control-Allow-Origin", "*")
  527. data, err := p.Cloud.GetManagementPlatform()
  528. if err != nil {
  529. w.Write(WrapData(data, err))
  530. return
  531. }
  532. w.Write(WrapData(data, err))
  533. return
  534. }
  535. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  536. w.Header().Set("Content-Type", "application/json")
  537. w.Header().Set("Access-Control-Allow-Origin", "*")
  538. data := GetClusterInfo(p.KubeClientSet, p.Cloud)
  539. w.Write(WrapData(data, nil))
  540. }
  541. func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  542. w.Header().Set("Content-Type", "application/json")
  543. w.Header().Set("Access-Control-Allow-Origin", "*")
  544. w.Write(WrapData(A.Cloud.ServiceAccountStatus(), nil))
  545. }
  546. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  547. w.Header().Set("Content-Type", "application/json")
  548. w.Header().Set("Access-Control-Allow-Origin", "*")
  549. w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
  550. }
  551. // Creates a new ClusterManager instance using a boltdb storage. If that fails,
  552. // then we fall back to a memory-only storage.
  553. func newClusterManager() *cm.ClusterManager {
  554. clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
  555. // Return a memory-backed cluster manager populated by configmap
  556. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  557. // NOTE: The following should be used with a persistent disk store. Since the
  558. // NOTE: configmap approach is currently the "persistent" source (entries are read-only
  559. // NOTE: on the backend), we don't currently need to store on disk.
  560. /*
  561. path := env.GetConfigPath()
  562. db, err := bolt.Open(path+"costmodel.db", 0600, nil)
  563. if err != nil {
  564. klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
  565. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  566. }
  567. store, err := cm.NewBoltDBClusterStorage("clusters", db)
  568. if err != nil {
  569. klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
  570. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  571. }
  572. return cm.NewConfiguredClusterManager(store, clustersConfigFile)
  573. */
  574. }
  // ConfigWatchers pairs a ConfigMap name with a callback that Initialize
  // invokes with the ConfigMap's name and data. It is called once at startup if
  // the ConfigMap exists in the kubecost namespace, and again on ConfigMap
  // updates reported by the cluster cache.
  type ConfigWatchers struct {
  	// ConfigmapName is the name of the ConfigMap to watch.
  	ConfigmapName string
  	// WatchFunc receives the ConfigMap name and its Data; a returned error is
  	// logged, not propagated.
  	WatchFunc func(string, map[string]string) error
  }
  579. // captures the panic event in sentry
  580. func capturePanicEvent(err string, stack string) {
  581. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  582. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  583. Level: sentry.LevelError,
  584. Message: msg,
  585. })
  586. sentry.Flush(5 * time.Second)
  587. }
  588. // handle any panics reported by the errors package
  589. func handlePanic(p errors.Panic) bool {
  590. err := p.Error
  591. if err != nil {
  592. if err, ok := err.(error); ok {
  593. capturePanicEvent(err.Error(), p.Stack)
  594. }
  595. if err, ok := err.(string); ok {
  596. capturePanicEvent(err, p.Stack)
  597. }
  598. }
  599. // Return true to recover iff the type is http, otherwise allow kubernetes
  600. // to recover.
  601. return p.Type == errors.PanicTypeHTTP
  602. }
  603. func Initialize(additionalConfigWatchers ...ConfigWatchers) {
  604. klog.InitFlags(nil)
  605. flag.Set("v", "3")
  606. flag.Parse()
  607. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  608. var err error
  609. if errorReportingEnabled {
  610. err = sentry.Init(sentry.ClientOptions{Release: gitCommit})
  611. if err != nil {
  612. klog.Infof("Failed to initialize sentry for error reporting")
  613. } else {
  614. err = errors.SetPanicHandler(handlePanic)
  615. if err != nil {
  616. klog.Infof("Failed to set panic handler: %s", err)
  617. }
  618. }
  619. }
  620. address := env.GetPrometheusServerEndpoint()
  621. if address == "" {
  622. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  623. }
  624. queryConcurrency := env.GetMaxQueryConcurrency()
  625. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  626. tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
  627. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  628. Proxy: http.ProxyFromEnvironment,
  629. DialContext: (&net.Dialer{
  630. Timeout: 120 * time.Second,
  631. KeepAlive: 120 * time.Second,
  632. }).DialContext,
  633. TLSHandshakeTimeout: 10 * time.Second,
  634. TLSClientConfig: tlsConfig,
  635. }
  636. pc := prometheusClient.Config{
  637. Address: address,
  638. RoundTripper: LongTimeoutRoundTripper,
  639. }
  640. promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, dbBasicAuthUsername, dbBasicAuthPW, dbBearerToken)
  641. m, err := ValidatePrometheus(promCli, false)
  642. if err != nil || m.Running == false {
  643. if err != nil {
  644. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  645. } else if m.Running == false {
  646. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
  647. }
  648. api := prometheusAPI.NewAPI(promCli)
  649. _, err = api.Config(context.Background())
  650. if err != nil {
  651. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
  652. } else {
  653. klog.V(1).Info("Retrieved a prometheus config file from: " + address)
  654. }
  655. } else {
  656. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  657. }
  658. // Kubernetes API setup
  659. var kc *rest.Config
  660. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  661. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  662. } else {
  663. kc, err = rest.InClusterConfig()
  664. }
  665. if err != nil {
  666. panic(err.Error())
  667. }
  668. kubeClientset, err := kubernetes.NewForConfig(kc)
  669. if err != nil {
  670. panic(err.Error())
  671. }
  672. // Create Kubernetes Cluster Cache + Watchers
  673. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  674. k8sCache.Run()
  675. cloudProviderKey := env.GetCloudProviderAPIKey()
  676. cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
  677. if err != nil {
  678. panic(err.Error())
  679. }
  680. watchConfigFunc := func(c interface{}) {
  681. conf := c.(*v1.ConfigMap)
  682. if conf.GetName() == "pricing-configs" {
  683. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  684. if err != nil {
  685. klog.Infof("ERROR UPDATING %s CONFIG: %s", "pricing-configs", err.Error())
  686. }
  687. }
  688. for _, cw := range additionalConfigWatchers {
  689. if conf.GetName() == cw.ConfigmapName {
  690. err := cw.WatchFunc(conf.GetName(), conf.Data)
  691. if err != nil {
  692. klog.Infof("ERROR UPDATING %s CONFIG: %s", cw.ConfigmapName, err.Error())
  693. }
  694. }
  695. }
  696. }
  697. kubecostNamespace := env.GetKubecostNamespace()
  698. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  699. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
  700. if err != nil {
  701. klog.Infof("No %s configmap found at installtime, using existing configs: %s", "pricing-configs", err.Error())
  702. } else {
  703. watchConfigFunc(configs)
  704. }
  705. for _, cw := range additionalConfigWatchers {
  706. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(cw.ConfigmapName, metav1.GetOptions{})
  707. if err != nil {
  708. klog.Infof("No %s configmap found at installtime, using existing configs: %s", cw.ConfigmapName, err.Error())
  709. } else {
  710. watchConfigFunc(configs)
  711. }
  712. }
  713. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  714. // TODO: General Architecture Note: Several passes have been made to modularize a lot of
  715. // TODO: our code, but the router still continues to be the obvious entry point for new \
  716. // TODO: features. We should look to split out the actual "router" functionality and
  717. // TODO: implement a builder -> controller for stitching new features and other dependencies.
  718. clusterManager := newClusterManager()
  719. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  720. Name: "node_cpu_hourly_cost",
  721. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  722. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  723. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  724. Name: "node_ram_hourly_cost",
  725. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  726. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  727. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  728. Name: "node_gpu_hourly_cost",
  729. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  730. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  731. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  732. Name: "node_total_hourly_cost",
  733. Help: "node_total_hourly_cost Total node cost per hour",
  734. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  735. spotGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  736. Name: "kubecost_node_is_spot",
  737. Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
  738. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  739. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  740. Name: "pv_hourly_cost",
  741. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  742. }, []string{"volumename", "persistentvolume"})
  743. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  744. Name: "container_memory_allocation_bytes",
  745. Help: "container_memory_allocation_bytes Bytes of RAM used",
  746. }, []string{"namespace", "pod", "container", "instance", "node"})
  747. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  748. Name: "container_cpu_allocation",
  749. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  750. }, []string{"namespace", "pod", "container", "instance", "node"})
  751. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  752. Name: "container_gpu_allocation",
  753. Help: "container_gpu_allocation GPU used",
  754. }, []string{"namespace", "pod", "container", "instance", "node"})
  755. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  756. Name: "pod_pvc_allocation",
  757. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  758. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  759. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  760. Name: "kubecost_network_zone_egress_cost",
  761. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  762. })
  763. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  764. Name: "kubecost_network_region_egress_cost",
  765. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  766. })
  767. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  768. Name: "kubecost_network_internet_egress_cost",
  769. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  770. })
  771. ClusterManagementCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  772. Name: "kubecost_cluster_management_cost",
  773. Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
  774. }, []string{"provisioner_name"})
  775. LBCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
  776. Name: "kubecost_load_balancer_cost",
  777. Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
  778. }, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
  779. prometheus.MustRegister(cpuGv)
  780. prometheus.MustRegister(ramGv)
  781. prometheus.MustRegister(gpuGv)
  782. prometheus.MustRegister(totalGv)
  783. prometheus.MustRegister(pvGv)
  784. prometheus.MustRegister(spotGv)
  785. prometheus.MustRegister(RAMAllocation)
  786. prometheus.MustRegister(CPUAllocation)
  787. prometheus.MustRegister(PVAllocation)
  788. prometheus.MustRegister(GPUAllocation)
  789. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  790. prometheus.MustRegister(ClusterManagementCostRecorder)
  791. prometheus.MustRegister(LBCostRecorder)
  792. prometheus.MustRegister(ServiceCollector{
  793. KubeClientSet: kubeClientset,
  794. })
  795. prometheus.MustRegister(DeploymentCollector{
  796. KubeClientSet: kubeClientset,
  797. })
  798. prometheus.MustRegister(StatefulsetCollector{
  799. KubeClientSet: kubeClientset,
  800. })
  801. prometheus.MustRegister(ClusterInfoCollector{
  802. KubeClientSet: kubeClientset,
  803. Cloud: cloudProvider,
  804. })
  805. // cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
  806. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  807. A = Accesses{
  808. PrometheusClient: promCli,
  809. KubeClientSet: kubeClientset,
  810. ClusterManager: clusterManager,
  811. Cloud: cloudProvider,
  812. CPUPriceRecorder: cpuGv,
  813. RAMPriceRecorder: ramGv,
  814. GPUPriceRecorder: gpuGv,
  815. NodeTotalPriceRecorder: totalGv,
  816. NodeSpotRecorder: spotGv,
  817. RAMAllocationRecorder: RAMAllocation,
  818. CPUAllocationRecorder: CPUAllocation,
  819. GPUAllocationRecorder: GPUAllocation,
  820. PVAllocationRecorder: PVAllocation,
  821. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  822. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  823. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  824. PersistentVolumePriceRecorder: pvGv,
  825. ClusterManagementCostRecorder: ClusterManagementCostRecorder,
  826. LBCostRecorder: LBCostRecorder,
  827. Model: NewCostModel(k8sCache),
  828. OutOfClusterCache: outOfClusterCache,
  829. }
  830. remoteEnabled := env.IsRemoteEnabled()
  831. if remoteEnabled {
  832. info, err := cloudProvider.ClusterInfo()
  833. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  834. if err != nil {
  835. klog.Infof("Error saving cluster id %s", err.Error())
  836. }
  837. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  838. if err != nil {
  839. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  840. }
  841. }
  842. // Thanos Client
  843. if thanos.IsEnabled() {
  844. thanosUrl := thanos.QueryURL()
  845. if thanosUrl != "" {
  846. var thanosRT http.RoundTripper = &http.Transport{
  847. Proxy: http.ProxyFromEnvironment,
  848. DialContext: (&net.Dialer{
  849. Timeout: 120 * time.Second,
  850. KeepAlive: 120 * time.Second,
  851. }).DialContext,
  852. TLSHandshakeTimeout: 10 * time.Second,
  853. TLSClientConfig: tlsConfig,
  854. }
  855. thanosConfig := prometheusClient.Config{
  856. Address: thanosUrl,
  857. RoundTripper: thanosRT,
  858. }
  859. thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW, multiClusterBearerToken)
  860. _, err = ValidatePrometheus(thanosCli, true)
  861. if err != nil {
  862. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
  863. A.ThanosClient = thanosCli
  864. } else {
  865. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
  866. A.ThanosClient = thanosCli
  867. }
  868. } else {
  869. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  870. }
  871. }
  872. err = A.Cloud.DownloadPricingData()
  873. if err != nil {
  874. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  875. }
  876. StartCostModelMetricRecording(&A)
  877. managerEndpoints := cm.NewClusterManagerEndpoints(A.ClusterManager)
  878. Router.GET("/costDataModel", A.CostDataModel)
  879. Router.GET("/costDataModelRange", A.CostDataModelRange)
  880. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  881. Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
  882. Router.GET("/allNodePricing", A.GetAllNodePricing)
  883. Router.POST("/refreshPricing", A.RefreshPricingData)
  884. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  885. Router.GET("/clusterCosts", A.ClusterCosts)
  886. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  887. Router.GET("/managementPlatform", A.ManagementPlatform)
  888. Router.GET("/clusterInfo", A.ClusterInfo)
  889. Router.GET("/clusters", managerEndpoints.GetAllClusters)
  890. Router.GET("/serviceAccountStatus", A.GetServiceAccountStatus)
  891. Router.PUT("/clusters", managerEndpoints.PutCluster)
  892. Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
  893. }