router.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "net/http"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "k8s.io/klog"
  13. "github.com/julienschmidt/httprouter"
  14. sentry "github.com/getsentry/sentry-go"
  15. costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
  16. "github.com/kubecost/cost-model/pkg/clustercache"
  17. cm "github.com/kubecost/cost-model/pkg/clustermanager"
  18. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  19. "github.com/kubecost/cost-model/pkg/env"
  20. "github.com/kubecost/cost-model/pkg/errors"
  21. "github.com/kubecost/cost-model/pkg/prom"
  22. "github.com/kubecost/cost-model/pkg/thanos"
  23. prometheusClient "github.com/prometheus/client_golang/api"
  24. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "github.com/patrickmn/go-cache"
  28. "github.com/prometheus/client_golang/prometheus"
  29. "k8s.io/client-go/kubernetes"
  30. "k8s.io/client-go/rest"
  31. "k8s.io/client-go/tools/clientcmd"
  32. )
  // Package-level string constants.
  const (
  	// prometheusTroubleshootingEp is the documentation URL appended to the
  	// Prometheus connectivity log messages written by Initialize.
  	prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
  	// RFC3339Milli is the time layout (millisecond precision, literal "Z"
  	// suffix) that CostDataModelRangeLarge uses to parse its "start" and
  	// "end" query parameters.
  	RFC3339Milli = "2006-01-02T15:04:05.000Z"
  )
  var (
  	// gitCommit is set by the build system
  	gitCommit string
  )

  // Router is the package-level httprouter instance.
  var Router = httprouter.New()

  // A is the package-level Accesses value.
  var A Accesses
  // Accesses bundles the clients, cloud provider, cost model, cache and
  // Prometheus recorders that the HTTP handler methods in this file operate on.
  type Accesses struct {
  	// PrometheusClient is the Prometheus API client handed to the cost
  	// computations (e.g. ComputeCostData, ComputeClusterCosts).
  	PrometheusClient prometheusClient.Client
  	// ThanosClient is used instead of PrometheusClient by ClusterCosts when
  	// multi=true and by CostDataModelRange when remote is not "false"; it may
  	// be nil (CostDataModelRange checks for that).
  	ThanosClient prometheusClient.Client
  	// KubeClientSet is the Kubernetes client passed to the cost model and to
  	// GetClusterInfo.
  	KubeClientSet kubernetes.Interface
  	// ClusterManager is presumably the value built by newClusterManager; the
  	// assignment is not visible in this part of the file (TODO confirm).
  	ClusterManager *cm.ClusterManager
  	// ClusterMap is served, via AsMap, by GetClusterInfoMap.
  	ClusterMap clusters.ClusterMap
  	// Cloud is the cloud provider used for pricing data, configuration
  	// updates and external (out-of-cluster) allocations.
  	Cloud costAnalyzerCloud.Provider

  	// Prometheus gauges. Their names suggest node pricing, resource
  	// allocation, network egress and selector metrics; where they are
  	// created and updated is not visible in this part of the file.
  	CPUPriceRecorder              *prometheus.GaugeVec
  	RAMPriceRecorder              *prometheus.GaugeVec
  	PersistentVolumePriceRecorder *prometheus.GaugeVec
  	GPUPriceRecorder              *prometheus.GaugeVec
  	NodeTotalPriceRecorder        *prometheus.GaugeVec
  	NodeSpotRecorder              *prometheus.GaugeVec
  	RAMAllocationRecorder         *prometheus.GaugeVec
  	CPUAllocationRecorder         *prometheus.GaugeVec
  	GPUAllocationRecorder         *prometheus.GaugeVec
  	PVAllocationRecorder          *prometheus.GaugeVec
  	ClusterManagementCostRecorder *prometheus.GaugeVec
  	LBCostRecorder                *prometheus.GaugeVec
  	NetworkZoneEgressRecorder     prometheus.Gauge
  	NetworkRegionEgressRecorder   prometheus.Gauge
  	NetworkInternetEgressRecorder prometheus.Gauge
  	ServiceSelectorRecorder       *prometheus.GaugeVec
  	DeploymentSelectorRecorder    *prometheus.GaugeVec

  	// Model is the cost model whose ComputeCostData and ComputeCostDataRange
  	// methods back the CostDataModel and CostDataModelRange handlers.
  	Model *CostModel
  	// OutOfClusterCache stores ExternalAllocations results for
  	// OutOfClusterCostsWithCache.
  	OutOfClusterCache *cache.Cache
  }
  // DataEnvelope is the JSON wrapper that WrapData and WrapDataWithMessage
  // serialize around a handler's payload.
  type DataEnvelope struct {
  	// Code is http.StatusOK on success and http.StatusInternalServerError
  	// when the wrapped error is non-nil.
  	Code int `json:"code"`
  	// Status is "success" or "error".
  	Status string `json:"status"`
  	// Data is the handler payload.
  	Data interface{} `json:"data"`
  	// Message carries the error text or an informational message; it is
  	// omitted from the JSON when empty.
  	Message string `json:"message,omitempty"`
  }
  // FilterFunc is a predicate over a CostData. As consumed by FilterCostData, a
  // true result means the datum passes (it is kept, or retained when used as a
  // retain function) and a false result means it is filtered out. The string
  // result is the environment that was used as the filter criteria, if it was an
  // aggregate; FilterCostData counts the non-empty environments of filtered-out
  // data.
  type FilterFunc func(*CostData) (bool, string)
  78. // FilterCostData allows through only CostData that matches all the given filter functions
  79. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  80. result := make(map[string]*CostData)
  81. filteredEnvironments := make(map[string]int)
  82. filteredContainers := 0
  83. DataLoop:
  84. for key, datum := range data {
  85. for _, rf := range retains {
  86. if ok, _ := rf(datum); ok {
  87. result[key] = datum
  88. // if any retain function passes, the data is retained and move on
  89. continue DataLoop
  90. }
  91. }
  92. for _, ff := range filters {
  93. if ok, environment := ff(datum); !ok {
  94. if environment != "" {
  95. filteredEnvironments[environment]++
  96. }
  97. filteredContainers++
  98. // if any filter function check fails, move on to the next datum
  99. continue DataLoop
  100. }
  101. }
  102. result[key] = datum
  103. }
  104. return result, filteredContainers, filteredEnvironments
  105. }
  106. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  107. fs := strings.Split(fields, ",")
  108. fmap := make(map[string]bool)
  109. for _, f := range fs {
  110. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  111. klog.V(1).Infof("to delete: %s", fieldNameLower)
  112. fmap[fieldNameLower] = true
  113. }
  114. filteredData := make(map[string]CostData)
  115. for cname, costdata := range data {
  116. s := reflect.TypeOf(*costdata)
  117. val := reflect.ValueOf(*costdata)
  118. costdata2 := CostData{}
  119. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  120. n := s.NumField()
  121. for i := 0; i < n; i++ {
  122. field := s.Field(i)
  123. value := val.Field(i)
  124. value2 := cd2.Field(i)
  125. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  126. value2.Set(reflect.Value(value))
  127. }
  128. }
  129. filteredData[cname] = cd2.Interface().(CostData)
  130. }
  131. return filteredData
  132. }
  // normalizeTimeParam rewrites a day-based time parameter in hours, e.g. "2d"
  // becomes "48h". Parameters that do not end in "d" are returned unchanged.
  // It returns an error for the empty string and when the part before a
  // trailing "d" is not a base-10 integer.
  func normalizeTimeParam(param string) (string, error) {
  	if len(param) == 0 {
  		return "", fmt.Errorf("invalid time param")
  	}
  	if !strings.HasSuffix(param, "d") {
  		return param, nil
  	}

  	days, err := strconv.ParseInt(strings.TrimSuffix(param, "d"), 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString converts a percentage written as "N%" into the fraction
  // N/100. A single trailing "%" is optional, so "N" yields the same result.
  // The empty string is treated as "0%" and yields 0.0. An error is returned
  // when the numeric part cannot be parsed as a float.
  func ParsePercentString(percentStr string) (float64, error) {
  	if percentStr == "" {
  		return 0.0, nil
  	}

  	percent, err := strconv.ParseFloat(strings.TrimSuffix(percentStr, "%"), 64)
  	if err != nil {
  		return 0.0, err
  	}
  	return percent * 0.01, nil
  }
  // ParseDuration converts a Prometheus-style duration string into a Duration.
  //
  // The accepted form is a base-10 integer followed by exactly one unit
  // character: "s" (seconds), "m" (minutes), "h" (hours) or "d" (24 hours).
  // Any other input, including the empty string, yields a nil Duration and an
  // error.
  func ParseDuration(duration string) (*time.Duration, error) {
  	formatErr := func() error {
  		return fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  	}

  	// An empty string has no unit character to slice off; reject it up front
  	// instead of panicking on an out-of-range slice.
  	if duration == "" {
  		return nil, formatErr()
  	}

  	split := len(duration) - 1
  	var unit time.Duration
  	switch duration[split:] {
  	case "s":
  		unit = time.Second
  	case "m":
  		unit = time.Minute
  	case "h":
  		unit = time.Hour
  	case "d":
  		unit = 24 * time.Hour
  	default:
  		return nil, formatErr()
  	}

  	amount, err := strconv.ParseInt(duration[:split], 10, 64)
  	if err != nil {
  		return nil, formatErr()
  	}

  	dur := time.Duration(amount) * unit
  	return &dur, nil
  }
  190. // ParseTimeRange returns a start and end time, respectively, which are converted from
  191. // a duration and offset, defined as strings with Prometheus-style syntax.
  192. func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  193. // endTime defaults to the current time, unless an offset is explicity declared,
  194. // in which case it shifts endTime back by given duration
  195. endTime := time.Now()
  196. if offset != "" {
  197. o, err := ParseDuration(offset)
  198. if err != nil {
  199. return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
  200. }
  201. endTime = endTime.Add(-1 * *o)
  202. }
  203. // if duration is defined in terms of days, convert to hours
  204. // e.g. convert "2d" to "48h"
  205. durationNorm, err := normalizeTimeParam(duration)
  206. if err != nil {
  207. return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
  208. }
  209. // convert time duration into start and end times, formatted
  210. // as ISO datetime strings
  211. dur, err := time.ParseDuration(durationNorm)
  212. if err != nil {
  213. return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
  214. }
  215. startTime := endTime.Add(-1 * dur)
  216. return &startTime, &endTime, nil
  217. }
  218. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  219. var resp []byte
  220. if err != nil {
  221. klog.V(1).Infof("Error returned to client: %s", err.Error())
  222. resp, _ = json.Marshal(&DataEnvelope{
  223. Code: http.StatusInternalServerError,
  224. Status: "error",
  225. Message: err.Error(),
  226. Data: data,
  227. })
  228. } else {
  229. resp, _ = json.Marshal(&DataEnvelope{
  230. Code: http.StatusOK,
  231. Status: "success",
  232. Data: data,
  233. Message: message,
  234. })
  235. }
  236. return resp
  237. }
  238. func WrapData(data interface{}, err error) []byte {
  239. var resp []byte
  240. if err != nil {
  241. klog.V(1).Infof("Error returned to client: %s", err.Error())
  242. resp, _ = json.Marshal(&DataEnvelope{
  243. Code: http.StatusInternalServerError,
  244. Status: "error",
  245. Message: err.Error(),
  246. Data: data,
  247. })
  248. } else {
  249. resp, _ = json.Marshal(&DataEnvelope{
  250. Code: http.StatusOK,
  251. Status: "success",
  252. Data: data,
  253. })
  254. }
  255. return resp
  256. }
  257. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  258. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  259. w.Header().Set("Content-Type", "application/json")
  260. w.Header().Set("Access-Control-Allow-Origin", "*")
  261. err := a.Cloud.DownloadPricingData()
  262. w.Write(WrapData(nil, err))
  263. }
  264. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  265. w.Header().Set("Content-Type", "application/json")
  266. w.Header().Set("Access-Control-Allow-Origin", "*")
  267. window := r.URL.Query().Get("timeWindow")
  268. offset := r.URL.Query().Get("offset")
  269. fields := r.URL.Query().Get("filterFields")
  270. namespace := r.URL.Query().Get("namespace")
  271. if offset != "" {
  272. offset = "offset " + offset
  273. }
  274. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  275. if fields != "" {
  276. filteredData := filterFields(fields, data)
  277. w.Write(WrapData(filteredData, err))
  278. } else {
  279. w.Write(WrapData(data, err))
  280. }
  281. }
  282. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  283. w.Header().Set("Content-Type", "application/json")
  284. w.Header().Set("Access-Control-Allow-Origin", "*")
  285. window := r.URL.Query().Get("window")
  286. offset := r.URL.Query().Get("offset")
  287. useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
  288. if useThanos && !thanos.IsEnabled() {
  289. w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
  290. return
  291. }
  292. var client prometheusClient.Client
  293. if useThanos {
  294. client = a.ThanosClient
  295. offset = thanos.Offset()
  296. } else {
  297. client = a.PrometheusClient
  298. }
  299. data, err := ComputeClusterCosts(client, a.Cloud, window, offset, true)
  300. w.Write(WrapData(data, err))
  301. }
  302. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  303. w.Header().Set("Content-Type", "application/json")
  304. w.Header().Set("Access-Control-Allow-Origin", "*")
  305. start := r.URL.Query().Get("start")
  306. end := r.URL.Query().Get("end")
  307. window := r.URL.Query().Get("window")
  308. offset := r.URL.Query().Get("offset")
  309. data, err := ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  310. w.Write(WrapData(data, err))
  311. }
  312. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  313. w.Header().Set("Content-Type", "application/json")
  314. w.Header().Set("Access-Control-Allow-Origin", "*")
  315. start := r.URL.Query().Get("start")
  316. end := r.URL.Query().Get("end")
  317. window := r.URL.Query().Get("window")
  318. fields := r.URL.Query().Get("filterFields")
  319. namespace := r.URL.Query().Get("namespace")
  320. cluster := r.URL.Query().Get("cluster")
  321. remote := r.URL.Query().Get("remote")
  322. remoteEnabled := env.IsRemoteEnabled() && remote != "false"
  323. // Use Thanos Client if it exists (enabled) and remote flag set
  324. var pClient prometheusClient.Client
  325. if remote != "false" && a.ThanosClient != nil {
  326. pClient = a.ThanosClient
  327. } else {
  328. pClient = a.PrometheusClient
  329. }
  330. resolutionHours := 1.0
  331. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, resolutionHours, namespace, cluster, remoteEnabled)
  332. if err != nil {
  333. w.Write(WrapData(nil, err))
  334. }
  335. if fields != "" {
  336. filteredData := filterFields(fields, data)
  337. w.Write(WrapData(filteredData, err))
  338. } else {
  339. w.Write(WrapData(data, err))
  340. }
  341. }
  342. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  343. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  344. w.Header().Set("Content-Type", "application/json")
  345. w.Header().Set("Access-Control-Allow-Origin", "*")
  346. startString := r.URL.Query().Get("start")
  347. endString := r.URL.Query().Get("end")
  348. windowString := r.URL.Query().Get("window")
  349. var start time.Time
  350. var end time.Time
  351. var err error
  352. if windowString == "" {
  353. windowString = "1h"
  354. }
  355. if startString != "" {
  356. start, err = time.Parse(RFC3339Milli, startString)
  357. if err != nil {
  358. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  359. w.Write(WrapData(nil, err))
  360. }
  361. } else {
  362. window, err := time.ParseDuration(windowString)
  363. if err != nil {
  364. w.Write(WrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  365. }
  366. start = time.Now().Add(-2 * window)
  367. }
  368. if endString != "" {
  369. end, err = time.Parse(RFC3339Milli, endString)
  370. if err != nil {
  371. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  372. w.Write(WrapData(nil, err))
  373. }
  374. } else {
  375. end = time.Now()
  376. }
  377. remoteLayout := "2006-01-02T15:04:05Z"
  378. remoteStartStr := start.Format(remoteLayout)
  379. remoteEndStr := end.Format(remoteLayout)
  380. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  381. data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  382. w.Write(WrapData(data, err))
  383. }
  // parseAggregations derives the aggregation key, the list of aggregation
  // fields and the filter name for an out-of-cluster cost query.
  //
  // When customAggregation is non-empty it is used verbatim: the key is the
  // string itself, the fields are its comma-separated parts, and filterType is
  // returned unchanged. Otherwise every comma-separated part of aggregator, and
  // filterType, is prefixed with "kubernetes_", and the key is the prefixed
  // parts joined with commas.
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"
  	parts := strings.Split(aggregator, ",")
  	prefixed := make([]string, 0, len(parts))
  	for _, part := range parts {
  		prefixed = append(prefixed, prefix+part)
  	}
  	return strings.Join(prefixed, ","), prefixed, prefix + filterType
  }
  403. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  404. w.Header().Set("Content-Type", "application/json")
  405. w.Header().Set("Access-Control-Allow-Origin", "*")
  406. start := r.URL.Query().Get("start")
  407. end := r.URL.Query().Get("end")
  408. aggregator := r.URL.Query().Get("aggregator")
  409. customAggregation := r.URL.Query().Get("customAggregation")
  410. filterType := r.URL.Query().Get("filterType")
  411. filterValue := r.URL.Query().Get("filterValue")
  412. var data []*costAnalyzerCloud.OutOfClusterAllocation
  413. var err error
  414. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  415. data, err = a.Cloud.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  416. w.Write(WrapData(data, err))
  417. }
  418. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  419. w.Header().Set("Content-Type", "application/json")
  420. w.Header().Set("Access-Control-Allow-Origin", "*")
  421. // start date for which to query costs, inclusive; format YYYY-MM-DD
  422. start := r.URL.Query().Get("start")
  423. // end date for which to query costs, inclusive; format YYYY-MM-DD
  424. end := r.URL.Query().Get("end")
  425. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  426. kubernetesAggregation := r.URL.Query().Get("aggregator")
  427. // customAggregation allows full customization of aggregator w/o prepending
  428. customAggregation := r.URL.Query().Get("customAggregation")
  429. // disableCache, if set to "true", tells this function to recompute and
  430. // cache the requested data
  431. disableCache := r.URL.Query().Get("disableCache") == "true"
  432. // clearCache, if set to "true", tells this function to flush the cache,
  433. // then recompute and cache the requested data
  434. clearCache := r.URL.Query().Get("clearCache") == "true"
  435. filterType := r.URL.Query().Get("filterType")
  436. filterValue := r.URL.Query().Get("filterValue")
  437. aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
  438. // clear cache prior to checking the cache so that a clearCache=true
  439. // request always returns a freshly computed value
  440. if clearCache {
  441. a.OutOfClusterCache.Flush()
  442. }
  443. // attempt to retrieve cost data from cache
  444. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
  445. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  446. if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
  447. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  448. return
  449. }
  450. klog.Errorf("caching error: failed to type cast data: %s", key)
  451. }
  452. data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  453. if err == nil {
  454. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  455. }
  456. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  457. }
  458. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  459. w.Header().Set("Content-Type", "application/json")
  460. w.Header().Set("Access-Control-Allow-Origin", "*")
  461. data, err := p.Cloud.AllNodePricing()
  462. w.Write(WrapData(data, err))
  463. }
  464. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  465. w.Header().Set("Content-Type", "application/json")
  466. w.Header().Set("Access-Control-Allow-Origin", "*")
  467. data, err := p.Cloud.GetConfig()
  468. w.Write(WrapData(data, err))
  469. }
  470. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  471. w.Header().Set("Content-Type", "application/json")
  472. w.Header().Set("Access-Control-Allow-Origin", "*")
  473. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  474. if err != nil {
  475. w.Write(WrapData(data, err))
  476. return
  477. }
  478. w.Write(WrapData(data, err))
  479. err = p.Cloud.DownloadPricingData()
  480. if err != nil {
  481. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  482. }
  483. return
  484. }
  485. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  486. w.Header().Set("Content-Type", "application/json")
  487. w.Header().Set("Access-Control-Allow-Origin", "*")
  488. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  489. if err != nil {
  490. w.Write(WrapData(data, err))
  491. return
  492. }
  493. w.Write(WrapData(data, err))
  494. return
  495. }
  496. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  497. w.Header().Set("Content-Type", "application/json")
  498. w.Header().Set("Access-Control-Allow-Origin", "*")
  499. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  500. if err != nil {
  501. w.Write(WrapData(data, err))
  502. return
  503. }
  504. w.Write(WrapData(data, err))
  505. return
  506. }
  507. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  508. w.Header().Set("Content-Type", "application/json")
  509. w.Header().Set("Access-Control-Allow-Origin", "*")
  510. data, err := p.Cloud.UpdateConfig(r.Body, "")
  511. if err != nil {
  512. w.Write(WrapData(data, err))
  513. return
  514. }
  515. w.Write(WrapData(data, err))
  516. return
  517. }
  518. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  519. w.Header().Set("Content-Type", "application/json")
  520. w.Header().Set("Access-Control-Allow-Origin", "*")
  521. data, err := p.Cloud.GetManagementPlatform()
  522. if err != nil {
  523. w.Write(WrapData(data, err))
  524. return
  525. }
  526. w.Write(WrapData(data, err))
  527. return
  528. }
  529. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  530. w.Header().Set("Content-Type", "application/json")
  531. w.Header().Set("Access-Control-Allow-Origin", "*")
  532. data := GetClusterInfo(p.KubeClientSet, p.Cloud)
  533. w.Write(WrapData(data, nil))
  534. }
  535. func (p *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  536. w.Header().Set("Content-Type", "application/json")
  537. w.Header().Set("Access-Control-Allow-Origin", "*")
  538. data := p.ClusterMap.AsMap()
  539. w.Write(WrapData(data, nil))
  540. }
  541. func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  542. w.Header().Set("Content-Type", "application/json")
  543. w.Header().Set("Access-Control-Allow-Origin", "*")
  544. w.Write(WrapData(A.Cloud.ServiceAccountStatus(), nil))
  545. }
  546. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  547. w.Header().Set("Content-Type", "application/json")
  548. w.Header().Set("Access-Control-Allow-Origin", "*")
  549. w.Write(WrapData(prom.Validate(p.PrometheusClient)))
  550. }
  551. // Creates a new ClusterManager instance using a boltdb storage. If that fails,
  552. // then we fall back to a memory-only storage.
  553. func newClusterManager() *cm.ClusterManager {
  554. clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
  555. // Return a memory-backed cluster manager populated by configmap
  556. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  557. // NOTE: The following should be used with a persistent disk store. Since the
  558. // NOTE: configmap approach is currently the "persistent" source (entries are read-only
  559. // NOTE: on the backend), we don't currently need to store on disk.
  560. /*
  561. path := env.GetConfigPath()
  562. db, err := bolt.Open(path+"costmodel.db", 0600, nil)
  563. if err != nil {
  564. klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
  565. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  566. }
  567. store, err := cm.NewBoltDBClusterStorage("clusters", db)
  568. if err != nil {
  569. klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
  570. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  571. }
  572. return cm.NewConfiguredClusterManager(store, clustersConfigFile)
  573. */
  574. }
  // ConfigWatchers pairs a ConfigMap name with the callback Initialize invokes
  // for ConfigMaps of that name.
  type ConfigWatchers struct {
  	// ConfigmapName is compared against the name of each observed ConfigMap.
  	ConfigmapName string
  	// WatchFunc is called with the ConfigMap's name and its Data; a returned
  	// error is logged by the caller.
  	WatchFunc func(string, map[string]string) error
  }
  579. // captures the panic event in sentry
  580. func capturePanicEvent(err string, stack string) {
  581. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  582. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  583. Level: sentry.LevelError,
  584. Message: msg,
  585. })
  586. sentry.Flush(5 * time.Second)
  587. }
  588. // handle any panics reported by the errors package
  589. func handlePanic(p errors.Panic) bool {
  590. err := p.Error
  591. if err != nil {
  592. if err, ok := err.(error); ok {
  593. capturePanicEvent(err.Error(), p.Stack)
  594. }
  595. if err, ok := err.(string); ok {
  596. capturePanicEvent(err, p.Stack)
  597. }
  598. }
  599. // Return true to recover iff the type is http, otherwise allow kubernetes
  600. // to recover.
  601. return p.Type == errors.PanicTypeHTTP
  602. }
  603. func Initialize(additionalConfigWatchers ...ConfigWatchers) {
  604. klog.InitFlags(nil)
  605. flag.Set("v", "3")
  606. flag.Parse()
  607. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  608. var err error
  609. if errorReportingEnabled {
  610. err = sentry.Init(sentry.ClientOptions{Release: gitCommit})
  611. if err != nil {
  612. klog.Infof("Failed to initialize sentry for error reporting")
  613. } else {
  614. err = errors.SetPanicHandler(handlePanic)
  615. if err != nil {
  616. klog.Infof("Failed to set panic handler: %s", err)
  617. }
  618. }
  619. }
  620. address := env.GetPrometheusServerEndpoint()
  621. if address == "" {
  622. klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  623. }
  624. queryConcurrency := env.GetMaxQueryConcurrency()
  625. klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
  626. timeout := 120 * time.Second
  627. keepAlive := 120 * time.Second
  628. promCli, _ := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
  629. m, err := prom.Validate(promCli)
  630. if err != nil || m.Running == false {
  631. if err != nil {
  632. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  633. } else if m.Running == false {
  634. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
  635. }
  636. api := prometheusAPI.NewAPI(promCli)
  637. _, err = api.Config(context.Background())
  638. if err != nil {
  639. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
  640. } else {
  641. klog.V(1).Info("Retrieved a prometheus config file from: " + address)
  642. }
  643. } else {
  644. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  645. }
  646. // Kubernetes API setup
  647. var kc *rest.Config
  648. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  649. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  650. } else {
  651. kc, err = rest.InClusterConfig()
  652. }
  653. if err != nil {
  654. panic(err.Error())
  655. }
  656. kubeClientset, err := kubernetes.NewForConfig(kc)
  657. if err != nil {
  658. panic(err.Error())
  659. }
  660. // Create Kubernetes Cluster Cache + Watchers
  661. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  662. k8sCache.Run()
  663. cloudProviderKey := env.GetCloudProviderAPIKey()
  664. cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
  665. if err != nil {
  666. panic(err.Error())
  667. }
  668. watchConfigFunc := func(c interface{}) {
  669. conf := c.(*v1.ConfigMap)
  670. if conf.GetName() == "pricing-configs" {
  671. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  672. if err != nil {
  673. klog.Infof("ERROR UPDATING %s CONFIG: %s", "pricing-configs", err.Error())
  674. }
  675. }
  676. for _, cw := range additionalConfigWatchers {
  677. if conf.GetName() == cw.ConfigmapName {
  678. err := cw.WatchFunc(conf.GetName(), conf.Data)
  679. if err != nil {
  680. klog.Infof("ERROR UPDATING %s CONFIG: %s", cw.ConfigmapName, err.Error())
  681. }
  682. }
  683. }
  684. }
  685. kubecostNamespace := env.GetKubecostNamespace()
  686. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  687. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
  688. if err != nil {
  689. klog.Infof("No %s configmap found at installtime, using existing configs: %s", "pricing-configs", err.Error())
  690. } else {
  691. watchConfigFunc(configs)
  692. }
  693. for _, cw := range additionalConfigWatchers {
  694. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(cw.ConfigmapName, metav1.GetOptions{})
  695. if err != nil {
  696. klog.Infof("No %s configmap found at installtime, using existing configs: %s", cw.ConfigmapName, err.Error())
  697. } else {
  698. watchConfigFunc(configs)
  699. }
  700. }
  701. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  702. // TODO: General Architecture Note: Several passes have been made to modularize a lot of
  703. // TODO: our code, but the router still continues to be the obvious entry point for new
  704. // TODO: features. We should look to split out the actual "router" functionality and
  705. // TODO: implement a builder -> controller for stitching new features and other dependencies.
  706. clusterManager := newClusterManager()
  707. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  708. Name: "node_cpu_hourly_cost",
  709. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  710. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  711. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  712. Name: "node_ram_hourly_cost",
  713. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  714. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  715. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  716. Name: "node_gpu_hourly_cost",
  717. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  718. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  719. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  720. Name: "node_total_hourly_cost",
  721. Help: "node_total_hourly_cost Total node cost per hour",
  722. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  723. spotGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  724. Name: "kubecost_node_is_spot",
  725. Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
  726. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  727. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  728. Name: "pv_hourly_cost",
  729. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  730. }, []string{"volumename", "persistentvolume", "provider_id"})
  731. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  732. Name: "container_memory_allocation_bytes",
  733. Help: "container_memory_allocation_bytes Bytes of RAM used",
  734. }, []string{"namespace", "pod", "container", "instance", "node"})
  735. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  736. Name: "container_cpu_allocation",
  737. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  738. }, []string{"namespace", "pod", "container", "instance", "node"})
  739. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  740. Name: "container_gpu_allocation",
  741. Help: "container_gpu_allocation GPU used",
  742. }, []string{"namespace", "pod", "container", "instance", "node"})
  743. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  744. Name: "pod_pvc_allocation",
  745. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  746. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  747. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  748. Name: "kubecost_network_zone_egress_cost",
  749. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  750. })
  751. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  752. Name: "kubecost_network_region_egress_cost",
  753. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  754. })
  755. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  756. Name: "kubecost_network_internet_egress_cost",
  757. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  758. })
  759. ClusterManagementCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  760. Name: "kubecost_cluster_management_cost",
  761. Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
  762. }, []string{"provisioner_name"})
  763. LBCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
  764. Name: "kubecost_load_balancer_cost",
  765. Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
  766. }, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
  767. prometheus.MustRegister(cpuGv)
  768. prometheus.MustRegister(ramGv)
  769. prometheus.MustRegister(gpuGv)
  770. prometheus.MustRegister(totalGv)
  771. prometheus.MustRegister(pvGv)
  772. prometheus.MustRegister(spotGv)
  773. prometheus.MustRegister(RAMAllocation)
  774. prometheus.MustRegister(CPUAllocation)
  775. prometheus.MustRegister(PVAllocation)
  776. prometheus.MustRegister(GPUAllocation)
  777. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  778. prometheus.MustRegister(ClusterManagementCostRecorder)
  779. prometheus.MustRegister(LBCostRecorder)
  780. prometheus.MustRegister(ServiceCollector{
  781. KubeClientSet: kubeClientset,
  782. })
  783. prometheus.MustRegister(DeploymentCollector{
  784. KubeClientSet: kubeClientset,
  785. })
  786. prometheus.MustRegister(StatefulsetCollector{
  787. KubeClientSet: kubeClientset,
  788. })
  789. prometheus.MustRegister(ClusterInfoCollector{
  790. KubeClientSet: kubeClientset,
  791. Cloud: cloudProvider,
  792. })
  793. // cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
  794. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  795. remoteEnabled := env.IsRemoteEnabled()
  796. if remoteEnabled {
  797. info, err := cloudProvider.ClusterInfo()
  798. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  799. if err != nil {
  800. klog.Infof("Error saving cluster id %s", err.Error())
  801. }
  802. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  803. if err != nil {
  804. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  805. }
  806. }
  807. // Thanos Client
  808. var thanosClient prometheusClient.Client
  809. if thanos.IsEnabled() {
  810. thanosAddress := thanos.QueryURL()
  811. if thanosAddress != "" {
  812. thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
  813. _, err = prom.Validate(thanosCli)
  814. if err != nil {
  815. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
  816. thanosClient = thanosCli
  817. } else {
  818. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
  819. thanosClient = thanosCli
  820. }
  821. } else {
  822. klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
  823. }
  824. }
  825. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  826. var clusterMap clusters.ClusterMap
  827. if thanosClient != nil {
  828. clusterMap = clusters.NewClusterMap(thanosClient, 10*time.Minute)
  829. } else {
  830. clusterMap = clusters.NewClusterMap(promCli, 5*time.Minute)
  831. }
  832. A = Accesses{
  833. PrometheusClient: promCli,
  834. ThanosClient: thanosClient,
  835. KubeClientSet: kubeClientset,
  836. ClusterManager: clusterManager,
  837. ClusterMap: clusterMap,
  838. Cloud: cloudProvider,
  839. CPUPriceRecorder: cpuGv,
  840. RAMPriceRecorder: ramGv,
  841. GPUPriceRecorder: gpuGv,
  842. NodeTotalPriceRecorder: totalGv,
  843. NodeSpotRecorder: spotGv,
  844. RAMAllocationRecorder: RAMAllocation,
  845. CPUAllocationRecorder: CPUAllocation,
  846. GPUAllocationRecorder: GPUAllocation,
  847. PVAllocationRecorder: PVAllocation,
  848. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  849. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  850. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  851. PersistentVolumePriceRecorder: pvGv,
  852. ClusterManagementCostRecorder: ClusterManagementCostRecorder,
  853. LBCostRecorder: LBCostRecorder,
  854. Model: NewCostModel(k8sCache, clusterMap),
  855. OutOfClusterCache: outOfClusterCache,
  856. }
  857. err = A.Cloud.DownloadPricingData()
  858. if err != nil {
  859. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  860. }
  861. StartCostModelMetricRecording(&A)
  862. managerEndpoints := cm.NewClusterManagerEndpoints(A.ClusterManager)
  863. Router.GET("/costDataModel", A.CostDataModel)
  864. Router.GET("/costDataModelRange", A.CostDataModelRange)
  865. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  866. Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
  867. Router.GET("/allNodePricing", A.GetAllNodePricing)
  868. Router.POST("/refreshPricing", A.RefreshPricingData)
  869. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  870. Router.GET("/clusterCosts", A.ClusterCosts)
  871. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  872. Router.GET("/managementPlatform", A.ManagementPlatform)
  873. Router.GET("/clusterInfo", A.ClusterInfo)
  874. Router.GET("/clusterInfoMap", A.GetClusterInfoMap)
  875. Router.GET("/serviceAccountStatus", A.GetServiceAccountStatus)
  876. // cluster manager endpoints
  877. Router.GET("/clusters", managerEndpoints.GetAllClusters)
  878. Router.PUT("/clusters", managerEndpoints.PutCluster)
  879. Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
  880. }