router.go 43 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "os"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "k8s.io/klog"
  15. "github.com/julienschmidt/httprouter"
  16. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  17. "github.com/patrickmn/go-cache"
  18. prometheusClient "github.com/prometheus/client_golang/api"
  19. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  20. "github.com/prometheus/client_golang/prometheus"
  21. v1 "k8s.io/api/core/v1"
  22. "k8s.io/client-go/kubernetes"
  23. "k8s.io/client-go/rest"
  24. )
  25. const (
  26. prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
  27. prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
  28. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  29. )
  30. var (
  31. // gitCommit is set by the build system
  32. gitCommit string
  33. )
  34. var Router = httprouter.New()
  35. var A Accesses
  36. type Accesses struct {
  37. PrometheusClient prometheusClient.Client
  38. ThanosClient prometheusClient.Client
  39. KubeClientSet kubernetes.Interface
  40. Cloud costAnalyzerCloud.Provider
  41. CPUPriceRecorder *prometheus.GaugeVec
  42. RAMPriceRecorder *prometheus.GaugeVec
  43. PersistentVolumePriceRecorder *prometheus.GaugeVec
  44. GPUPriceRecorder *prometheus.GaugeVec
  45. NodeTotalPriceRecorder *prometheus.GaugeVec
  46. RAMAllocationRecorder *prometheus.GaugeVec
  47. CPUAllocationRecorder *prometheus.GaugeVec
  48. GPUAllocationRecorder *prometheus.GaugeVec
  49. PVAllocationRecorder *prometheus.GaugeVec
  50. ContainerUptimeRecorder *prometheus.GaugeVec
  51. NetworkZoneEgressRecorder prometheus.Gauge
  52. NetworkRegionEgressRecorder prometheus.Gauge
  53. NetworkInternetEgressRecorder prometheus.Gauge
  54. ServiceSelectorRecorder *prometheus.GaugeVec
  55. DeploymentSelectorRecorder *prometheus.GaugeVec
  56. Model *CostModel
  57. AggregateCache *cache.Cache
  58. CostDataCache *cache.Cache
  59. OutOfClusterCache *cache.Cache
  60. }
  61. type DataEnvelope struct {
  62. Code int `json:"code"`
  63. Status string `json:"status"`
  64. Data interface{} `json:"data"`
  65. Message string `json:"message,omitempty"`
  66. }
  67. // filterCostData allows through only CostData that matches the given filters for namespace and clusterId
  68. func filterCostData(data map[string]*CostData, namespace, clusterId string) map[string]*CostData {
  69. result := make(map[string]*CostData)
  70. for key, datum := range data {
  71. if costDataPassesFilters(datum, namespace, clusterId) {
  72. result[key] = datum
  73. }
  74. }
  75. return result
  76. }
  77. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  78. fs := strings.Split(fields, ",")
  79. fmap := make(map[string]bool)
  80. for _, f := range fs {
  81. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  82. klog.V(1).Infof("to delete: %s", fieldNameLower)
  83. fmap[fieldNameLower] = true
  84. }
  85. filteredData := make(map[string]CostData)
  86. for cname, costdata := range data {
  87. s := reflect.TypeOf(*costdata)
  88. val := reflect.ValueOf(*costdata)
  89. costdata2 := CostData{}
  90. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  91. n := s.NumField()
  92. for i := 0; i < n; i++ {
  93. field := s.Field(i)
  94. value := val.Field(i)
  95. value2 := cd2.Field(i)
  96. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  97. value2.Set(reflect.Value(value))
  98. }
  99. }
  100. filteredData[cname] = cd2.Interface().(CostData)
  101. }
  102. return filteredData
  103. }
  104. func normalizeTimeParam(param string) (string, error) {
  105. // convert days to hours
  106. if param[len(param)-1:] == "d" {
  107. count := param[:len(param)-1]
  108. val, err := strconv.ParseInt(count, 10, 64)
  109. if err != nil {
  110. return "", err
  111. }
  112. val = val * 24
  113. param = fmt.Sprintf("%dh", val)
  114. }
  115. return param, nil
  116. }
  117. // parseDuration converts a Prometheus-style duration string into a Duration
  118. func parseDuration(duration string) (*time.Duration, error) {
  119. unitStr := duration[len(duration)-1:]
  120. var unit time.Duration
  121. switch unitStr {
  122. case "s":
  123. unit = time.Second
  124. case "m":
  125. unit = time.Minute
  126. case "h":
  127. unit = time.Hour
  128. case "d":
  129. unit = 24.0 * time.Hour
  130. default:
  131. return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  132. }
  133. amountStr := duration[:len(duration)-1]
  134. amount, err := strconv.ParseInt(amountStr, 10, 64)
  135. if err != nil {
  136. return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  137. }
  138. dur := time.Duration(amount) * unit
  139. return &dur, nil
  140. }
  141. // parseTimeRange returns a start and end time, respectively, which are converted from
  142. // a duration and offset, defined as strings with Prometheus-style syntax.
  143. func parseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  144. // endTime defaults to the current time, unless an offset is explicity declared,
  145. // in which case it shifts endTime back by given duration
  146. endTime := time.Now()
  147. if offset != "" {
  148. o, err := time.ParseDuration(offset)
  149. if err != nil {
  150. return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
  151. }
  152. endTime = endTime.Add(-1 * o)
  153. }
  154. // if duration is defined in terms of days, convert to hours
  155. // e.g. convert "2d" to "48h"
  156. durationNorm, err := normalizeTimeParam(duration)
  157. if err != nil {
  158. return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
  159. }
  160. // convert time duration into start and end times, formatted
  161. // as ISO datetime strings
  162. dur, err := time.ParseDuration(durationNorm)
  163. if err != nil {
  164. return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
  165. }
  166. startTime := endTime.Add(-1 * dur)
  167. return &startTime, &endTime, nil
  168. }
  169. func wrapDataWithMessage(data interface{}, err error, message string) []byte {
  170. var resp []byte
  171. if err != nil {
  172. klog.V(1).Infof("Error returned to client: %s", err.Error())
  173. resp, _ = json.Marshal(&DataEnvelope{
  174. Code: http.StatusInternalServerError,
  175. Status: "error",
  176. Message: err.Error(),
  177. Data: data,
  178. })
  179. } else {
  180. resp, _ = json.Marshal(&DataEnvelope{
  181. Code: http.StatusOK,
  182. Status: "success",
  183. Data: data,
  184. Message: message,
  185. })
  186. }
  187. return resp
  188. }
  189. func wrapData(data interface{}, err error) []byte {
  190. var resp []byte
  191. if err != nil {
  192. klog.V(1).Infof("Error returned to client: %s", err.Error())
  193. resp, _ = json.Marshal(&DataEnvelope{
  194. Code: http.StatusInternalServerError,
  195. Status: "error",
  196. Message: err.Error(),
  197. Data: data,
  198. })
  199. } else {
  200. resp, _ = json.Marshal(&DataEnvelope{
  201. Code: http.StatusOK,
  202. Status: "success",
  203. Data: data,
  204. })
  205. }
  206. return resp
  207. }
  208. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  209. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  210. w.Header().Set("Content-Type", "application/json")
  211. w.Header().Set("Access-Control-Allow-Origin", "*")
  212. err := a.Cloud.DownloadPricingData()
  213. w.Write(wrapData(nil, err))
  214. }
  215. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  216. w.Header().Set("Content-Type", "application/json")
  217. w.Header().Set("Access-Control-Allow-Origin", "*")
  218. window := r.URL.Query().Get("timeWindow")
  219. offset := r.URL.Query().Get("offset")
  220. fields := r.URL.Query().Get("filterFields")
  221. namespace := r.URL.Query().Get("namespace")
  222. aggregationField := r.URL.Query().Get("aggregation")
  223. subfields := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  224. if offset != "" {
  225. offset = "offset " + offset
  226. }
  227. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  228. if aggregationField != "" {
  229. c, err := a.Cloud.GetConfig()
  230. if err != nil {
  231. w.Write(wrapData(nil, err))
  232. return
  233. }
  234. discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
  235. if err != nil {
  236. w.Write(wrapData(nil, err))
  237. return
  238. }
  239. discount = discount * 0.01
  240. dur, err := time.ParseDuration(window)
  241. if err != nil {
  242. w.Write(wrapData(nil, err))
  243. return
  244. }
  245. // dataCount is the number of time series data expected for the given interval,
  246. // which we compute because Prometheus time series vectors omit zero values.
  247. // This assumes hourly data, incremented by one to capture the 0th data point.
  248. dataCount := int(dur.Hours())
  249. opts := &AggregationOptions{
  250. DataCount: dataCount,
  251. Discount: discount,
  252. IdleCoefficients: make(map[string]float64),
  253. }
  254. agg := AggregateCostData(data, aggregationField, subfields, a.Cloud, opts)
  255. w.Write(wrapData(agg, nil))
  256. } else {
  257. if fields != "" {
  258. filteredData := filterFields(fields, data)
  259. w.Write(wrapData(filteredData, err))
  260. } else {
  261. w.Write(wrapData(data, err))
  262. }
  263. }
  264. }
  265. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  266. w.Header().Set("Content-Type", "application/json")
  267. w.Header().Set("Access-Control-Allow-Origin", "*")
  268. window := r.URL.Query().Get("window")
  269. offset := r.URL.Query().Get("offset")
  270. data, err := ClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
  271. w.Write(wrapData(data, err))
  272. }
  273. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  274. w.Header().Set("Content-Type", "application/json")
  275. w.Header().Set("Access-Control-Allow-Origin", "*")
  276. start := r.URL.Query().Get("start")
  277. end := r.URL.Query().Get("end")
  278. window := r.URL.Query().Get("window")
  279. offset := r.URL.Query().Get("offset")
  280. data, err := ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  281. w.Write(wrapData(data, err))
  282. }
  283. // AggregateCostModel handles HTTP requests to the aggregated cost model API, which can be parametrized
  284. // by time period using window and offset, aggregation field and subfield (e.g. grouping by label.app
  285. // using aggregation=label, aggregationSubfield=app), and filtered by namespace and cluster.
  286. func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  287. w.Header().Set("Content-Type", "application/json")
  288. w.Header().Set("Access-Control-Allow-Origin", "*")
  289. duration := r.URL.Query().Get("window")
  290. offset := r.URL.Query().Get("offset")
  291. namespace := r.URL.Query().Get("namespace")
  292. cluster := r.URL.Query().Get("cluster")
  293. field := r.URL.Query().Get("aggregation")
  294. subfieldStr := r.URL.Query().Get("aggregationSubfield")
  295. rate := r.URL.Query().Get("rate")
  296. allocateIdle := r.URL.Query().Get("allocateIdle") == "true"
  297. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  298. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  299. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  300. remote := r.URL.Query().Get("remote") != "false"
  301. subfields := []string{}
  302. if len(subfieldStr) > 0 {
  303. subfields = strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  304. }
  305. // timeSeries == true maintains the time series dimension of the data,
  306. // which by default gets summed over the entire interval
  307. includeTimeSeries := r.URL.Query().Get("timeSeries") == "true"
  308. // efficiency == true aggregates and returns usage and efficiency data
  309. includeEfficiency := r.URL.Query().Get("efficiency") == "true"
  310. // disableCache, if set to "true", tells this function to recompute and
  311. // cache the requested data
  312. disableCache := r.URL.Query().Get("disableCache") == "true"
  313. // clearCache, if set to "true", tells this function to flush the cache,
  314. // then recompute and cache the requested data
  315. clearCache := r.URL.Query().Get("clearCache") == "true"
  316. // aggregation field is required
  317. if field == "" {
  318. w.WriteHeader(http.StatusBadRequest)
  319. w.Write(wrapData(nil, fmt.Errorf("Missing aggregation field parameter")))
  320. return
  321. }
  322. // aggregation subfield is required when aggregation field is "label"
  323. if field == "label" && len(subfields) == 0 {
  324. w.WriteHeader(http.StatusBadRequest)
  325. w.Write(wrapData(nil, fmt.Errorf("Missing aggregation subfield parameter for aggregation by label")))
  326. return
  327. }
  328. // enforce one of four available rate options
  329. if rate != "" && rate != "hourly" && rate != "daily" && rate != "monthly" {
  330. w.WriteHeader(http.StatusBadRequest)
  331. w.Write(wrapData(nil, fmt.Errorf("If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'")))
  332. return
  333. }
  334. // clear cache prior to checking the cache so that a clearCache=true
  335. // request always returns a freshly computed value
  336. if clearCache {
  337. a.AggregateCache.Flush()
  338. a.CostDataCache.Flush()
  339. }
  340. // parametrize cache key by all request parameters
  341. aggKey := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
  342. duration, offset, namespace, cluster, field, strings.Join(subfields, ","), rate,
  343. allocateIdle, includeTimeSeries, includeEfficiency)
  344. // check the cache for aggregated response; if cache is hit and not disabled, return response
  345. if result, found := a.AggregateCache.Get(aggKey); found && !disableCache {
  346. w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache hit: %s", aggKey)))
  347. return
  348. }
  349. // enable remote if it is available and not disabled
  350. remoteAvailable := os.Getenv(remoteEnabled) == "true"
  351. remoteEnabled := remote && remoteAvailable
  352. // Use Thanos Client if it exists (enabled) and remote flag set
  353. var pClient prometheusClient.Client
  354. if remote && a.ThanosClient != nil {
  355. pClient = a.ThanosClient
  356. } else {
  357. pClient = a.PrometheusClient
  358. }
  359. // convert duration and offset to start and end times
  360. startTime, endTime, err := parseTimeRange(duration, offset)
  361. if err != nil {
  362. w.WriteHeader(http.StatusBadRequest)
  363. w.Write(wrapData(nil, fmt.Errorf("Error parsing duration (%s) and offset (%s)", duration, offset)))
  364. return
  365. }
  366. durationHours := endTime.Sub(*startTime).Hours()
  367. threeHoursAgo := time.Now().Add(-3 * time.Hour)
  368. if a.ThanosClient != nil && endTime.After(threeHoursAgo) {
  369. klog.Infof("Setting end time backwards to first present data")
  370. *endTime = time.Now().Add(-3 * time.Hour)
  371. }
  372. // determine resolution by size of duration
  373. resolution := "1h"
  374. if durationHours >= 2160 {
  375. // 90 days
  376. resolution = "72h"
  377. } else if durationHours >= 720 {
  378. // 30 days
  379. resolution = "24h"
  380. } else if durationHours >= 168 {
  381. // 7 days
  382. resolution = "6h"
  383. } else if durationHours >= 48 {
  384. // 2 days
  385. resolution = "2h"
  386. }
  387. resolutionDuration, err := parseDuration(resolution)
  388. resolutionHours := resolutionDuration.Hours()
  389. if err != nil {
  390. w.WriteHeader(http.StatusBadRequest)
  391. w.Write(wrapData(nil, fmt.Errorf("Error parsing resolution (%s)", resolution)))
  392. return
  393. }
  394. // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
  395. // e.g. requesting duration=2d, offset=1d, resolution=1h on Jan 4 12:00:00 should provide data for Jan 1 12:00 - Jan 3 12:00
  396. // which has the equivalent start and end times of Jan 1 1:00 and Jan 3 12:00, respectively.
  397. *startTime = startTime.Add(1 * *resolutionDuration)
  398. // attempt to retrieve cost data from cache
  399. var costData map[string]*CostData
  400. key := fmt.Sprintf(`%s:%s:%s:%t`, duration, offset, resolution, remoteEnabled)
  401. cacheData, found := a.CostDataCache.Get(key)
  402. if found && !disableCache {
  403. ok := false
  404. costData, ok = cacheData.(map[string]*CostData)
  405. if !ok {
  406. klog.Errorf("caching error: failed to cast cost data to struct: %s", key)
  407. }
  408. } else {
  409. start := startTime.Format(RFC3339Milli)
  410. end := endTime.Format(RFC3339Milli)
  411. costData, err = a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, resolution, "", "", remoteEnabled)
  412. if err != nil {
  413. w.Write(wrapData(nil, err))
  414. return
  415. }
  416. a.CostDataCache.Set(key, costData, cache.DefaultExpiration)
  417. }
  418. // filter cost data by namespace and cluster after caching for maximal cache hits
  419. costData = filterCostData(costData, namespace, cluster)
  420. c, err := a.Cloud.GetConfig()
  421. if err != nil {
  422. w.Write(wrapData(nil, err))
  423. return
  424. }
  425. discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
  426. if err != nil {
  427. w.Write(wrapData(nil, err))
  428. return
  429. }
  430. discount = discount * 0.01
  431. idleCoefficients := make(map[string]float64)
  432. if allocateIdle {
  433. windowStr := fmt.Sprintf("%dh", int(durationHours))
  434. if a.ThanosClient != nil {
  435. klog.Infof("Setting offset to 3h")
  436. offset = "3h"
  437. }
  438. idleCoefficients, err = ComputeIdleCoefficient(costData, pClient, a.Cloud, discount, windowStr, offset, resolution)
  439. if err != nil {
  440. klog.Errorf("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
  441. w.Write(wrapData(nil, err))
  442. return
  443. }
  444. }
  445. sn := []string{}
  446. sln := []string{}
  447. slv := []string{}
  448. if sharedNamespaces != "" {
  449. sn = strings.Split(sharedNamespaces, ",")
  450. }
  451. if sharedLabelNames != "" {
  452. sln = strings.Split(sharedLabelNames, ",")
  453. slv = strings.Split(sharedLabelValues, ",")
  454. if len(sln) != len(slv) || slv[0] == "" {
  455. w.Write(wrapData(nil, fmt.Errorf("Supply exacly one label value per label name")))
  456. return
  457. }
  458. }
  459. var sr *SharedResourceInfo
  460. if len(sn) > 0 || len(sln) > 0 {
  461. sr = NewSharedResourceInfo(true, sn, sln, slv)
  462. }
  463. for cid, idleCoefficient := range idleCoefficients {
  464. klog.Infof("Idle Coeff: %s: %f", cid, idleCoefficient)
  465. }
  466. dataCount := int(durationHours / resolutionHours)
  467. klog.V(1).Infof("data count = %d for duration (%fh) resolution (%fh)", dataCount, durationHours, resolutionHours)
  468. // aggregate cost model data by given fields and cache the result for the default expiration
  469. opts := &AggregationOptions{
  470. DataCount: dataCount,
  471. Discount: discount,
  472. IdleCoefficients: idleCoefficients,
  473. IncludeEfficiency: includeEfficiency,
  474. IncludeTimeSeries: includeTimeSeries,
  475. Rate: rate,
  476. ResolutionCoefficient: resolutionHours,
  477. SharedResourceInfo: sr,
  478. }
  479. result := AggregateCostData(costData, field, subfields, a.Cloud, opts)
  480. a.AggregateCache.Set(aggKey, result, cache.DefaultExpiration)
  481. w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache miss: %s", aggKey)))
  482. }
  483. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  484. w.Header().Set("Content-Type", "application/json")
  485. w.Header().Set("Access-Control-Allow-Origin", "*")
  486. start := r.URL.Query().Get("start")
  487. end := r.URL.Query().Get("end")
  488. window := r.URL.Query().Get("window")
  489. fields := r.URL.Query().Get("filterFields")
  490. namespace := r.URL.Query().Get("namespace")
  491. cluster := r.URL.Query().Get("cluster")
  492. aggregationField := r.URL.Query().Get("aggregation")
  493. subfields := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  494. remote := r.URL.Query().Get("remote")
  495. remoteAvailable := os.Getenv(remoteEnabled)
  496. remoteEnabled := false
  497. if remoteAvailable == "true" && remote != "false" {
  498. remoteEnabled = true
  499. }
  500. // Use Thanos Client if it exists (enabled) and remote flag set
  501. var pClient prometheusClient.Client
  502. if remote != "false" && a.ThanosClient != nil {
  503. pClient = a.ThanosClient
  504. } else {
  505. pClient = a.PrometheusClient
  506. }
  507. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, namespace, cluster, remoteEnabled)
  508. if err != nil {
  509. w.Write(wrapData(nil, err))
  510. }
  511. if aggregationField != "" {
  512. c, err := a.Cloud.GetConfig()
  513. if err != nil {
  514. w.Write(wrapData(nil, err))
  515. return
  516. }
  517. discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
  518. if err != nil {
  519. w.Write(wrapData(nil, err))
  520. }
  521. discount = discount * 0.01
  522. opts := &AggregationOptions{
  523. Discount: discount,
  524. IdleCoefficients: make(map[string]float64),
  525. }
  526. agg := AggregateCostData(data, aggregationField, subfields, a.Cloud, opts)
  527. w.Write(wrapData(agg, nil))
  528. } else {
  529. if fields != "" {
  530. filteredData := filterFields(fields, data)
  531. w.Write(wrapData(filteredData, err))
  532. } else {
  533. w.Write(wrapData(data, err))
  534. }
  535. }
  536. }
  537. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  538. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  539. w.Header().Set("Content-Type", "application/json")
  540. w.Header().Set("Access-Control-Allow-Origin", "*")
  541. startString := r.URL.Query().Get("start")
  542. endString := r.URL.Query().Get("end")
  543. windowString := r.URL.Query().Get("window")
  544. var start time.Time
  545. var end time.Time
  546. var err error
  547. if windowString == "" {
  548. windowString = "1h"
  549. }
  550. if startString != "" {
  551. start, err = time.Parse(RFC3339Milli, startString)
  552. if err != nil {
  553. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  554. w.Write(wrapData(nil, err))
  555. }
  556. } else {
  557. window, err := time.ParseDuration(windowString)
  558. if err != nil {
  559. w.Write(wrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  560. }
  561. start = time.Now().Add(-2 * window)
  562. }
  563. if endString != "" {
  564. end, err = time.Parse(RFC3339Milli, endString)
  565. if err != nil {
  566. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  567. w.Write(wrapData(nil, err))
  568. }
  569. } else {
  570. end = time.Now()
  571. }
  572. remoteLayout := "2006-01-02T15:04:05Z"
  573. remoteStartStr := start.Format(remoteLayout)
  574. remoteEndStr := end.Format(remoteLayout)
  575. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  576. data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  577. w.Write(wrapData(data, err))
  578. }
  579. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  580. w.Header().Set("Content-Type", "application/json")
  581. w.Header().Set("Access-Control-Allow-Origin", "*")
  582. start := r.URL.Query().Get("start")
  583. end := r.URL.Query().Get("end")
  584. aggregator := r.URL.Query().Get("aggregator")
  585. customAggregation := r.URL.Query().Get("customAggregation")
  586. var data []*costAnalyzerCloud.OutOfClusterAllocation
  587. var err error
  588. if customAggregation != "" {
  589. data, err = a.Cloud.ExternalAllocations(start, end, customAggregation)
  590. } else {
  591. data, err = a.Cloud.ExternalAllocations(start, end, "kubernetes_"+aggregator)
  592. }
  593. w.Write(wrapData(data, err))
  594. }
  595. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  596. w.Header().Set("Content-Type", "application/json")
  597. w.Header().Set("Access-Control-Allow-Origin", "*")
  598. // start date for which to query costs, inclusive; format YYYY-MM-DD
  599. start := r.URL.Query().Get("start")
  600. // end date for which to query costs, inclusive; format YYYY-MM-DD
  601. end := r.URL.Query().Get("end")
  602. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  603. kubernetesAggregation := r.URL.Query().Get("aggregator")
  604. // customAggregation allows full customization of aggregator w/o prepending
  605. customAggregation := r.URL.Query().Get("customAggregation")
  606. // disableCache, if set to "true", tells this function to recompute and
  607. // cache the requested data
  608. disableCache := r.URL.Query().Get("disableCache") == "true"
  609. // clearCache, if set to "true", tells this function to flush the cache,
  610. // then recompute and cache the requested data
  611. clearCache := r.URL.Query().Get("clearCache") == "true"
  612. aggregation := "kubernetes_" + kubernetesAggregation
  613. if customAggregation != "" {
  614. aggregation = customAggregation
  615. }
  616. // clear cache prior to checking the cache so that a clearCache=true
  617. // request always returns a freshly computed value
  618. if clearCache {
  619. a.OutOfClusterCache.Flush()
  620. }
  621. // attempt to retrieve cost data from cache
  622. key := fmt.Sprintf(`%s:%s:%s`, start, end, aggregation)
  623. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  624. if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
  625. w.Write(wrapDataWithMessage(data, nil, fmt.Sprintf("out of cluser cache hit: %s", key)))
  626. return
  627. }
  628. klog.Errorf("caching error: failed to type cast data: %s", key)
  629. }
  630. data, err := a.Cloud.ExternalAllocations(start, end, aggregation)
  631. if err == nil {
  632. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  633. }
  634. w.Write(wrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  635. }
  636. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  637. w.Header().Set("Content-Type", "application/json")
  638. w.Header().Set("Access-Control-Allow-Origin", "*")
  639. data, err := p.Cloud.AllNodePricing()
  640. w.Write(wrapData(data, err))
  641. }
  642. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  643. w.Header().Set("Content-Type", "application/json")
  644. w.Header().Set("Access-Control-Allow-Origin", "*")
  645. data, err := p.Cloud.GetConfig()
  646. w.Write(wrapData(data, err))
  647. }
  648. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  649. w.Header().Set("Content-Type", "application/json")
  650. w.Header().Set("Access-Control-Allow-Origin", "*")
  651. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  652. if err != nil {
  653. w.Write(wrapData(data, err))
  654. return
  655. }
  656. w.Write(wrapData(data, err))
  657. err = p.Cloud.DownloadPricingData()
  658. if err != nil {
  659. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  660. }
  661. return
  662. }
  663. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  664. w.Header().Set("Content-Type", "application/json")
  665. w.Header().Set("Access-Control-Allow-Origin", "*")
  666. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  667. if err != nil {
  668. w.Write(wrapData(data, err))
  669. return
  670. }
  671. w.Write(wrapData(data, err))
  672. return
  673. }
  674. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  675. w.Header().Set("Content-Type", "application/json")
  676. w.Header().Set("Access-Control-Allow-Origin", "*")
  677. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  678. if err != nil {
  679. w.Write(wrapData(data, err))
  680. return
  681. }
  682. w.Write(wrapData(data, err))
  683. return
  684. }
  685. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  686. w.Header().Set("Content-Type", "application/json")
  687. w.Header().Set("Access-Control-Allow-Origin", "*")
  688. data, err := p.Cloud.UpdateConfig(r.Body, "")
  689. if err != nil {
  690. w.Write(wrapData(data, err))
  691. return
  692. }
  693. w.Write(wrapData(data, err))
  694. return
  695. }
  696. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  697. w.Header().Set("Content-Type", "application/json")
  698. w.Header().Set("Access-Control-Allow-Origin", "*")
  699. data, err := p.Cloud.GetManagementPlatform()
  700. if err != nil {
  701. w.Write(wrapData(data, err))
  702. return
  703. }
  704. w.Write(wrapData(data, err))
  705. return
  706. }
  707. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  708. w.Header().Set("Content-Type", "application/json")
  709. w.Header().Set("Access-Control-Allow-Origin", "*")
  710. data, err := p.Cloud.ClusterInfo()
  711. w.Write(wrapData(data, err))
  712. }
  713. func Healthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  714. w.WriteHeader(200)
  715. w.Header().Set("Content-Length", "0")
  716. w.Header().Set("Content-Type", "text/plain")
  717. }
  718. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  719. w.Header().Set("Content-Type", "application/json")
  720. w.Header().Set("Access-Control-Allow-Origin", "*")
  721. w.Write(wrapData(ValidatePrometheus(p.PrometheusClient, false)))
  722. }
  723. func (p *Accesses) ContainerUptimes(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  724. w.Header().Set("Content-Type", "application/json")
  725. w.Header().Set("Access-Control-Allow-Origin", "*")
  726. res, err := ComputeUptimes(p.PrometheusClient)
  727. w.Write(wrapData(res, err))
  728. }
  729. func (a *Accesses) recordPrices() {
  730. go func() {
  731. containerSeen := make(map[string]bool)
  732. nodeSeen := make(map[string]bool)
  733. pvSeen := make(map[string]bool)
  734. pvcSeen := make(map[string]bool)
  735. getKeyFromLabelStrings := func(labels ...string) string {
  736. return strings.Join(labels, ",")
  737. }
  738. getLabelStringsFromKey := func(key string) []string {
  739. return strings.Split(key, ",")
  740. }
  741. for {
  742. klog.V(4).Info("Recording prices...")
  743. podlist := a.Model.Cache.GetAllPods()
  744. podStatus := make(map[string]v1.PodPhase)
  745. for _, pod := range podlist {
  746. podStatus[pod.Name] = pod.Status.Phase
  747. }
  748. // Record network pricing at global scope
  749. networkCosts, err := a.Cloud.NetworkPricing()
  750. if err != nil {
  751. klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  752. } else {
  753. a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  754. a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  755. a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  756. }
  757. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
  758. if err != nil {
  759. klog.V(1).Info("Error in price recording: " + err.Error())
  760. // zero the for loop so the time.Sleep will still work
  761. data = map[string]*CostData{}
  762. }
  763. for _, costs := range data {
  764. nodeName := costs.NodeName
  765. node := costs.NodeData
  766. if node == nil {
  767. klog.V(4).Infof("Skipping Node \"%s\" due to missing Node Data costs", nodeName)
  768. continue
  769. }
  770. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  771. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  772. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  773. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  774. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  775. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  776. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  777. namespace := costs.Namespace
  778. podName := costs.PodName
  779. containerName := costs.Name
  780. if costs.PVCData != nil {
  781. for _, pvc := range costs.PVCData {
  782. if pvc.Volume != nil {
  783. a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
  784. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  785. pvcSeen[labelKey] = true
  786. }
  787. }
  788. }
  789. a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(cpuCost)
  790. a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName).Set(ramCost)
  791. a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName).Set(gpuCost)
  792. a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName).Set(totalCost)
  793. labelKey := getKeyFromLabelStrings(nodeName, nodeName)
  794. nodeSeen[labelKey] = true
  795. if len(costs.RAMAllocation) > 0 {
  796. a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  797. }
  798. if len(costs.CPUAllocation) > 0 {
  799. a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  800. }
  801. if len(costs.GPUReq) > 0 {
  802. // allocation here is set to the request because shared GPU usage not yet supported.
  803. a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  804. }
  805. labelKey = getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  806. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  807. containerSeen[labelKey] = true
  808. } else {
  809. containerSeen[labelKey] = false
  810. }
  811. storageClasses := a.Model.Cache.GetAllStorageClasses()
  812. storageClassMap := make(map[string]map[string]string)
  813. for _, storageClass := range storageClasses {
  814. params := storageClass.Parameters
  815. storageClassMap[storageClass.ObjectMeta.Name] = params
  816. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  817. storageClassMap["default"] = params
  818. storageClassMap[""] = params
  819. }
  820. }
  821. pvs := a.Model.Cache.GetAllPersistentVolumes()
  822. for _, pv := range pvs {
  823. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  824. if !ok {
  825. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  826. }
  827. cacPv := &costAnalyzerCloud.PV{
  828. Class: pv.Spec.StorageClassName,
  829. Region: pv.Labels[v1.LabelZoneRegion],
  830. Parameters: parameters,
  831. }
  832. GetPVCost(cacPv, pv, a.Cloud)
  833. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  834. a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
  835. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  836. pvSeen[labelKey] = true
  837. }
  838. containerUptime, _ := ComputeUptimes(a.PrometheusClient)
  839. for key, uptime := range containerUptime {
  840. container, _ := NewContainerMetricFromKey(key)
  841. a.ContainerUptimeRecorder.WithLabelValues(container.Namespace, container.PodName, container.ContainerName).Set(uptime)
  842. }
  843. }
  844. for labelString, seen := range nodeSeen {
  845. if !seen {
  846. labels := getLabelStringsFromKey(labelString)
  847. a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  848. a.CPUPriceRecorder.DeleteLabelValues(labels...)
  849. a.GPUPriceRecorder.DeleteLabelValues(labels...)
  850. a.RAMPriceRecorder.DeleteLabelValues(labels...)
  851. delete(nodeSeen, labelString)
  852. }
  853. nodeSeen[labelString] = false
  854. }
  855. for labelString, seen := range containerSeen {
  856. if !seen {
  857. labels := getLabelStringsFromKey(labelString)
  858. a.RAMAllocationRecorder.DeleteLabelValues(labels...)
  859. a.CPUAllocationRecorder.DeleteLabelValues(labels...)
  860. a.GPUAllocationRecorder.DeleteLabelValues(labels...)
  861. a.ContainerUptimeRecorder.DeleteLabelValues(labels...)
  862. delete(containerSeen, labelString)
  863. }
  864. containerSeen[labelString] = false
  865. }
  866. for labelString, seen := range pvSeen {
  867. if !seen {
  868. labels := getLabelStringsFromKey(labelString)
  869. a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  870. delete(pvSeen, labelString)
  871. }
  872. pvSeen[labelString] = false
  873. }
  874. for labelString, seen := range pvcSeen {
  875. if !seen {
  876. labels := getLabelStringsFromKey(labelString)
  877. a.PVAllocationRecorder.DeleteLabelValues(labels...)
  878. delete(pvcSeen, labelString)
  879. }
  880. pvcSeen[labelString] = false
  881. }
  882. time.Sleep(time.Minute)
  883. }
  884. }()
  885. }
  886. func init() {
  887. klog.InitFlags(nil)
  888. flag.Set("v", "3")
  889. flag.Parse()
  890. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  891. address := os.Getenv(prometheusServerEndpointEnvVar)
  892. if address == "" {
  893. klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
  894. }
  895. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  896. Proxy: http.ProxyFromEnvironment,
  897. DialContext: (&net.Dialer{
  898. Timeout: 120 * time.Second,
  899. KeepAlive: 120 * time.Second,
  900. }).DialContext,
  901. TLSHandshakeTimeout: 10 * time.Second,
  902. }
  903. pc := prometheusClient.Config{
  904. Address: address,
  905. RoundTripper: LongTimeoutRoundTripper,
  906. }
  907. promCli, _ := prometheusClient.NewClient(pc)
  908. api := prometheusAPI.NewAPI(promCli)
  909. _, err := api.Config(context.Background())
  910. if err != nil {
  911. klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  912. }
  913. klog.V(1).Info("Success: retrieved a prometheus config file from: " + address)
  914. _, err = ValidatePrometheus(promCli, false)
  915. if err != nil {
  916. klog.Fatalf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  917. }
  918. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  919. // Kubernetes API setup
  920. kc, err := rest.InClusterConfig()
  921. if err != nil {
  922. panic(err.Error())
  923. }
  924. kubeClientset, err := kubernetes.NewForConfig(kc)
  925. if err != nil {
  926. panic(err.Error())
  927. }
  928. cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
  929. cloudProvider, err := costAnalyzerCloud.NewProvider(kubeClientset, cloudProviderKey)
  930. if err != nil {
  931. panic(err.Error())
  932. }
  933. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  934. Name: "node_cpu_hourly_cost",
  935. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  936. }, []string{"instance", "node"})
  937. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  938. Name: "node_ram_hourly_cost",
  939. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  940. }, []string{"instance", "node"})
  941. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  942. Name: "node_gpu_hourly_cost",
  943. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  944. }, []string{"instance", "node"})
  945. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  946. Name: "node_total_hourly_cost",
  947. Help: "node_total_hourly_cost Total node cost per hour",
  948. }, []string{"instance", "node"})
  949. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  950. Name: "pv_hourly_cost",
  951. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  952. }, []string{"volumename", "persistentvolume"})
  953. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  954. Name: "container_memory_allocation_bytes",
  955. Help: "container_memory_allocation_bytes Bytes of RAM used",
  956. }, []string{"namespace", "pod", "container", "instance", "node"})
  957. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  958. Name: "container_cpu_allocation",
  959. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  960. }, []string{"namespace", "pod", "container", "instance", "node"})
  961. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  962. Name: "container_gpu_allocation",
  963. Help: "container_gpu_allocation GPU used",
  964. }, []string{"namespace", "pod", "container", "instance", "node"})
  965. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  966. Name: "pod_pvc_allocation",
  967. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  968. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  969. ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  970. Name: "container_uptime_seconds",
  971. Help: "container_uptime_seconds Seconds a container has been running",
  972. }, []string{"namespace", "pod", "container"})
  973. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  974. Name: "kubecost_network_zone_egress_cost",
  975. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  976. })
  977. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  978. Name: "kubecost_network_region_egress_cost",
  979. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  980. })
  981. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  982. Name: "kubecost_network_internet_egress_cost",
  983. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  984. })
  985. prometheus.MustRegister(cpuGv)
  986. prometheus.MustRegister(ramGv)
  987. prometheus.MustRegister(gpuGv)
  988. prometheus.MustRegister(totalGv)
  989. prometheus.MustRegister(pvGv)
  990. prometheus.MustRegister(RAMAllocation)
  991. prometheus.MustRegister(CPUAllocation)
  992. prometheus.MustRegister(ContainerUptimeRecorder)
  993. prometheus.MustRegister(PVAllocation)
  994. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  995. prometheus.MustRegister(ServiceCollector{
  996. KubeClientSet: kubeClientset,
  997. })
  998. prometheus.MustRegister(DeploymentCollector{
  999. KubeClientSet: kubeClientset,
  1000. })
  1001. // cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
  1002. aggregateCache := cache.New(time.Minute*5, time.Minute*10)
  1003. costDataCache := cache.New(time.Minute*5, time.Minute*10)
  1004. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  1005. A = Accesses{
  1006. PrometheusClient: promCli,
  1007. KubeClientSet: kubeClientset,
  1008. Cloud: cloudProvider,
  1009. CPUPriceRecorder: cpuGv,
  1010. RAMPriceRecorder: ramGv,
  1011. GPUPriceRecorder: gpuGv,
  1012. NodeTotalPriceRecorder: totalGv,
  1013. RAMAllocationRecorder: RAMAllocation,
  1014. CPUAllocationRecorder: CPUAllocation,
  1015. GPUAllocationRecorder: GPUAllocation,
  1016. PVAllocationRecorder: PVAllocation,
  1017. ContainerUptimeRecorder: ContainerUptimeRecorder,
  1018. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  1019. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  1020. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  1021. PersistentVolumePriceRecorder: pvGv,
  1022. Model: NewCostModel(kubeClientset),
  1023. AggregateCache: aggregateCache,
  1024. CostDataCache: costDataCache,
  1025. OutOfClusterCache: outOfClusterCache,
  1026. }
  1027. remoteEnabled := os.Getenv(remoteEnabled)
  1028. if remoteEnabled == "true" {
  1029. info, err := cloudProvider.ClusterInfo()
  1030. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1031. if err != nil {
  1032. klog.Infof("Error saving cluster id %s", err.Error())
  1033. }
  1034. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  1035. if err != nil {
  1036. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1037. }
  1038. }
  1039. // Thanos Client
  1040. if os.Getenv(thanosEnabled) == "true" {
  1041. thanosUrl := os.Getenv(thanosQueryUrl)
  1042. if thanosUrl != "" {
  1043. var thanosRT http.RoundTripper = &http.Transport{
  1044. Proxy: http.ProxyFromEnvironment,
  1045. DialContext: (&net.Dialer{
  1046. Timeout: 120 * time.Second,
  1047. KeepAlive: 120 * time.Second,
  1048. }).DialContext,
  1049. TLSHandshakeTimeout: 10 * time.Second,
  1050. }
  1051. thanosConfig := prometheusClient.Config{
  1052. Address: thanosUrl,
  1053. RoundTripper: thanosRT,
  1054. }
  1055. thanosCli, _ := prometheusClient.NewClient(thanosConfig)
  1056. _, err = ValidatePrometheus(thanosCli, true)
  1057. if err != nil {
  1058. klog.Fatalf("Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
  1059. } else {
  1060. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
  1061. A.ThanosClient = thanosCli
  1062. }
  1063. } else {
  1064. klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
  1065. }
  1066. }
  1067. err = A.Cloud.DownloadPricingData()
  1068. if err != nil {
  1069. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  1070. }
  1071. A.recordPrices()
  1072. Router.GET("/costDataModel", A.CostDataModel)
  1073. Router.GET("/costDataModelRange", A.CostDataModelRange)
  1074. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  1075. Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
  1076. Router.GET("/allNodePricing", A.GetAllNodePricing)
  1077. Router.GET("/healthz", Healthz)
  1078. Router.GET("/getConfigs", A.GetConfigs)
  1079. Router.POST("/refreshPricing", A.RefreshPricingData)
  1080. Router.POST("/updateSpotInfoConfigs", A.UpdateSpotInfoConfigs)
  1081. Router.POST("/updateAthenaInfoConfigs", A.UpdateAthenaInfoConfigs)
  1082. Router.POST("/updateBigQueryInfoConfigs", A.UpdateBigQueryInfoConfigs)
  1083. Router.POST("/updateConfigByKey", A.UpdateConfigByKey)
  1084. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  1085. Router.GET("/clusterCosts", A.ClusterCosts)
  1086. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  1087. Router.GET("/managementPlatform", A.ManagementPlatform)
  1088. Router.GET("/clusterInfo", A.ClusterInfo)
  1089. Router.GET("/containerUptimes", A.ContainerUptimes)
  1090. Router.GET("/aggregatedCostModel", A.AggregateCostModel)
  1091. }