router.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "math"
  8. "net"
  9. "net/http"
  10. "os"
  11. "reflect"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "k8s.io/klog"
  16. "github.com/julienschmidt/httprouter"
  17. costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. cm "github.com/kubecost/cost-model/pkg/clustermanager"
  20. prometheusClient "github.com/prometheus/client_golang/api"
  21. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  22. v1 "k8s.io/api/core/v1"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. bolt "github.com/etcd-io/bbolt"
  25. "github.com/patrickmn/go-cache"
  26. "github.com/prometheus/client_golang/prometheus"
  27. "k8s.io/client-go/kubernetes"
  28. "k8s.io/client-go/rest"
  29. )
  // Package-level string constants.
  30. const (
  // prometheusServerEndpointEnvVar is the name of an environment variable;
  // presumably it holds the Prometheus server address (the place where it is
  // read is not in this part of the file).
  31. prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
  // prometheusTroubleshootingEp is a documentation URL; presumably surfaced
  // when Prometheus cannot be reached (usage not visible here).
  32. prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
  // RFC3339Milli is the time layout, with millisecond precision and a literal
  // "Z" suffix, that CostDataModelRangeLarge uses to parse its start and end
  // query parameters.
  33. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  34. )
  35. var (
  36. // gitCommit is set by the build system
  37. gitCommit string
  38. )
  // Router is the package-level httprouter instance. Route registration is
  // not visible in this part of the file.
  39. var Router = httprouter.New()
  // A is the package-level Accesses value.
  // NOTE(review): where it is populated is not visible here - confirm.
  40. var A Accesses
  // Accesses bundles the clients, cloud provider, Prometheus recorders, cost
  // model and cache used by the HTTP handlers and the price-recording loop
  // defined on it in this file.
  41. type Accesses struct {
  // Prometheus query client used by the cost and uptime computations.
  42. PrometheusClient prometheusClient.Client
  // Optional; CostDataModelRange uses it instead of PrometheusClient when it is
  // non-nil and the request does not set remote=false.
  43. ThanosClient prometheusClient.Client
  // Kubernetes API client; ClusterInfo type-asserts it to *kubernetes.Clientset
  // to read the server version.
  44. KubeClientSet kubernetes.Interface
  45. ClusterManager *cm.ClusterManager
  // Cloud provider used for pricing data, configuration and external allocations.
  46. Cloud costAnalyzerCloud.Provider
  // Per-node price gauges, set by recordPrices.
  47. CPUPriceRecorder *prometheus.GaugeVec
  48. RAMPriceRecorder *prometheus.GaugeVec
  49. PersistentVolumePriceRecorder *prometheus.GaugeVec
  50. GPUPriceRecorder *prometheus.GaugeVec
  51. NodeTotalPriceRecorder *prometheus.GaugeVec
  // Per-container allocation and uptime gauges, set by recordPrices.
  52. RAMAllocationRecorder *prometheus.GaugeVec
  53. CPUAllocationRecorder *prometheus.GaugeVec
  54. GPUAllocationRecorder *prometheus.GaugeVec
  55. PVAllocationRecorder *prometheus.GaugeVec
  56. ContainerUptimeRecorder *prometheus.GaugeVec
  // Network egress price gauges, set by recordPrices from Cloud.NetworkPricing.
  57. NetworkZoneEgressRecorder prometheus.Gauge
  58. NetworkRegionEgressRecorder prometheus.Gauge
  59. NetworkInternetEgressRecorder prometheus.Gauge
  // NOTE(review): the selector recorders are not written in the visible part
  // of this file - confirm where they are updated.
  60. ServiceSelectorRecorder *prometheus.GaugeVec
  61. DeploymentSelectorRecorder *prometheus.GaugeVec
  // Cost model that computes cost data and owns the cluster cache read by
  // recordPrices.
  62. Model *CostModel
  // Cache of ExternalAllocations results used by OutOfClusterCostsWithCache.
  63. OutOfClusterCache *cache.Cache
  64. }
  // DataEnvelope is the JSON response wrapper produced by WrapData and
  // WrapDataWithMessage.
  65. type DataEnvelope struct {
  // Code is http.StatusOK on success and http.StatusInternalServerError on error.
  66. Code int `json:"code"`
  // Status is "success" or "error".
  67. Status string `json:"status"`
  // Data is the payload handed to the wrapper; it is included on error too.
  68. Data interface{} `json:"data"`
  // Message carries the error text on failure, or an optional note on
  // success; it is omitted from the JSON when empty.
  69. Message string `json:"message,omitempty"`
  70. }
  71. // FilterFunc reports whether the given CostData passes the check: FilterCostData keeps a datum outright when a retain function returns true, and drops it when a filter function returns false. The string is the environment that was used as the filter criteria, if it was an aggregate.
  72. type FilterFunc func(*CostData) (bool, string)
  73. // FilterCostData allows through only CostData that matches all the given filter functions
  74. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  75. result := make(map[string]*CostData)
  76. filteredEnvironments := make(map[string]int)
  77. filteredContainers := 0
  78. DataLoop:
  79. for key, datum := range data {
  80. for _, rf := range retains {
  81. if ok, _ := rf(datum); ok {
  82. result[key] = datum
  83. // if any retain function passes, the data is retained and move on
  84. continue DataLoop
  85. }
  86. }
  87. for _, ff := range filters {
  88. if ok, environment := ff(datum); !ok {
  89. if environment != "" {
  90. filteredEnvironments[environment]++
  91. }
  92. filteredContainers++
  93. // if any filter function check fails, move on to the next datum
  94. continue DataLoop
  95. }
  96. }
  97. result[key] = datum
  98. }
  99. return result, filteredContainers, filteredEnvironments
  100. }
  101. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  102. fs := strings.Split(fields, ",")
  103. fmap := make(map[string]bool)
  104. for _, f := range fs {
  105. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  106. klog.V(1).Infof("to delete: %s", fieldNameLower)
  107. fmap[fieldNameLower] = true
  108. }
  109. filteredData := make(map[string]CostData)
  110. for cname, costdata := range data {
  111. s := reflect.TypeOf(*costdata)
  112. val := reflect.ValueOf(*costdata)
  113. costdata2 := CostData{}
  114. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  115. n := s.NumField()
  116. for i := 0; i < n; i++ {
  117. field := s.Field(i)
  118. value := val.Field(i)
  119. value2 := cd2.Field(i)
  120. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  121. value2.Set(reflect.Value(value))
  122. }
  123. }
  124. filteredData[cname] = cd2.Interface().(CostData)
  125. }
  126. return filteredData
  127. }
  // normalizeTimeParam rewrites a duration given in days ("Nd") as the
  // equivalent number of hours ("<24*N>h"), because time.ParseDuration has no
  // day unit. Any other input, including the empty string, is returned
  // unchanged. It returns an error when the day count is not a base-10 integer.
  func normalizeTimeParam(param string) (string, error) {
  	// HasSuffix is safe on the empty string, where slicing off the last byte
  	// would panic.
  	if !strings.HasSuffix(param, "d") {
  		return param, nil
  	}

  	days, err := strconv.ParseInt(strings.TrimSuffix(param, "d"), 10, 64)
  	if err != nil {
  		return "", err
  	}
  	return fmt.Sprintf("%dh", days*24), nil
  }
  // ParsePercentString converts a percentage written as "N%" into the fraction
  // N/100. A missing "%" sign is tolerated, and the empty string is read as
  // "0%" and yields 0.0. It returns an error when N is not a valid float.
  func ParsePercentString(percentStr string) (float64, error) {
  	if percentStr == "" {
  		return 0.0, nil
  	}

  	percent, err := strconv.ParseFloat(strings.TrimSuffix(percentStr, "%"), 64)
  	if err != nil {
  		return 0.0, err
  	}
  	return percent * 0.01, nil
  }
  // ParseDuration converts a Prometheus-style duration string into a Duration.
  // The final byte selects the unit (s, m, h or d, with d meaning 24 hours) and
  // everything before it must be a base-10 integer. An empty string, an unknown
  // unit, or a non-integer amount yields an error and a nil result.
  func ParseDuration(duration string) (*time.Duration, error) {
  	// Guard first: slicing the final byte off an empty string would panic.
  	if duration == "" {
  		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  	}

  	split := len(duration) - 1

  	var unit time.Duration
  	switch duration[split:] {
  	case "s":
  		unit = time.Second
  	case "m":
  		unit = time.Minute
  	case "h":
  		unit = time.Hour
  	case "d":
  		unit = 24 * time.Hour
  	default:
  		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  	}

  	amount, err := strconv.ParseInt(duration[:split], 10, 64)
  	if err != nil {
  		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  	}

  	dur := time.Duration(amount) * unit
  	return &dur, nil
  }
  182. // ParseTimeRange returns a start and end time, respectively, which are converted from
  183. // a duration and offset, defined as strings with Prometheus-style syntax.
  184. func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  185. // endTime defaults to the current time, unless an offset is explicity declared,
  186. // in which case it shifts endTime back by given duration
  187. endTime := time.Now()
  188. if offset != "" {
  189. o, err := ParseDuration(offset)
  190. if err != nil {
  191. return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
  192. }
  193. endTime = endTime.Add(-1 * *o)
  194. }
  195. // if duration is defined in terms of days, convert to hours
  196. // e.g. convert "2d" to "48h"
  197. durationNorm, err := normalizeTimeParam(duration)
  198. if err != nil {
  199. return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
  200. }
  201. // convert time duration into start and end times, formatted
  202. // as ISO datetime strings
  203. dur, err := time.ParseDuration(durationNorm)
  204. if err != nil {
  205. return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
  206. }
  207. startTime := endTime.Add(-1 * dur)
  208. return &startTime, &endTime, nil
  209. }
  210. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  211. var resp []byte
  212. if err != nil {
  213. klog.V(1).Infof("Error returned to client: %s", err.Error())
  214. resp, _ = json.Marshal(&DataEnvelope{
  215. Code: http.StatusInternalServerError,
  216. Status: "error",
  217. Message: err.Error(),
  218. Data: data,
  219. })
  220. } else {
  221. resp, _ = json.Marshal(&DataEnvelope{
  222. Code: http.StatusOK,
  223. Status: "success",
  224. Data: data,
  225. Message: message,
  226. })
  227. }
  228. return resp
  229. }
  230. func WrapData(data interface{}, err error) []byte {
  231. var resp []byte
  232. if err != nil {
  233. klog.V(1).Infof("Error returned to client: %s", err.Error())
  234. resp, _ = json.Marshal(&DataEnvelope{
  235. Code: http.StatusInternalServerError,
  236. Status: "error",
  237. Message: err.Error(),
  238. Data: data,
  239. })
  240. } else {
  241. resp, _ = json.Marshal(&DataEnvelope{
  242. Code: http.StatusOK,
  243. Status: "success",
  244. Data: data,
  245. })
  246. }
  247. return resp
  248. }
  249. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  250. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  251. w.Header().Set("Content-Type", "application/json")
  252. w.Header().Set("Access-Control-Allow-Origin", "*")
  253. err := a.Cloud.DownloadPricingData()
  254. w.Write(WrapData(nil, err))
  255. }
  256. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  257. w.Header().Set("Content-Type", "application/json")
  258. w.Header().Set("Access-Control-Allow-Origin", "*")
  259. window := r.URL.Query().Get("timeWindow")
  260. offset := r.URL.Query().Get("offset")
  261. fields := r.URL.Query().Get("filterFields")
  262. namespace := r.URL.Query().Get("namespace")
  263. if offset != "" {
  264. offset = "offset " + offset
  265. }
  266. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  267. if fields != "" {
  268. filteredData := filterFields(fields, data)
  269. w.Write(WrapData(filteredData, err))
  270. } else {
  271. w.Write(WrapData(data, err))
  272. }
  273. }
  274. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  275. w.Header().Set("Content-Type", "application/json")
  276. w.Header().Set("Access-Control-Allow-Origin", "*")
  277. window := r.URL.Query().Get("window")
  278. offset := r.URL.Query().Get("offset")
  279. data, err := ComputeClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
  280. w.Write(WrapData(data, err))
  281. }
  282. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  283. w.Header().Set("Content-Type", "application/json")
  284. w.Header().Set("Access-Control-Allow-Origin", "*")
  285. start := r.URL.Query().Get("start")
  286. end := r.URL.Query().Get("end")
  287. window := r.URL.Query().Get("window")
  288. offset := r.URL.Query().Get("offset")
  289. data, err := ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  290. w.Write(WrapData(data, err))
  291. }
  292. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  293. w.Header().Set("Content-Type", "application/json")
  294. w.Header().Set("Access-Control-Allow-Origin", "*")
  295. start := r.URL.Query().Get("start")
  296. end := r.URL.Query().Get("end")
  297. window := r.URL.Query().Get("window")
  298. fields := r.URL.Query().Get("filterFields")
  299. namespace := r.URL.Query().Get("namespace")
  300. cluster := r.URL.Query().Get("cluster")
  301. remote := r.URL.Query().Get("remote")
  302. remoteAvailable := os.Getenv(remoteEnabled)
  303. remoteEnabled := false
  304. if remoteAvailable == "true" && remote != "false" {
  305. remoteEnabled = true
  306. }
  307. // Use Thanos Client if it exists (enabled) and remote flag set
  308. var pClient prometheusClient.Client
  309. if remote != "false" && a.ThanosClient != nil {
  310. pClient = a.ThanosClient
  311. } else {
  312. pClient = a.PrometheusClient
  313. }
  314. resolutionHours := 1.0
  315. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, resolutionHours, namespace, cluster, remoteEnabled)
  316. if err != nil {
  317. w.Write(WrapData(nil, err))
  318. }
  319. if fields != "" {
  320. filteredData := filterFields(fields, data)
  321. w.Write(WrapData(filteredData, err))
  322. } else {
  323. w.Write(WrapData(data, err))
  324. }
  325. }
  326. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  327. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  328. w.Header().Set("Content-Type", "application/json")
  329. w.Header().Set("Access-Control-Allow-Origin", "*")
  330. startString := r.URL.Query().Get("start")
  331. endString := r.URL.Query().Get("end")
  332. windowString := r.URL.Query().Get("window")
  333. var start time.Time
  334. var end time.Time
  335. var err error
  336. if windowString == "" {
  337. windowString = "1h"
  338. }
  339. if startString != "" {
  340. start, err = time.Parse(RFC3339Milli, startString)
  341. if err != nil {
  342. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  343. w.Write(WrapData(nil, err))
  344. }
  345. } else {
  346. window, err := time.ParseDuration(windowString)
  347. if err != nil {
  348. w.Write(WrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  349. }
  350. start = time.Now().Add(-2 * window)
  351. }
  352. if endString != "" {
  353. end, err = time.Parse(RFC3339Milli, endString)
  354. if err != nil {
  355. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  356. w.Write(WrapData(nil, err))
  357. }
  358. } else {
  359. end = time.Now()
  360. }
  361. remoteLayout := "2006-01-02T15:04:05Z"
  362. remoteStartStr := start.Format(remoteLayout)
  363. remoteEndStr := end.Format(remoteLayout)
  364. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  365. data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  366. w.Write(WrapData(data, err))
  367. }
  // parseAggregations derives the aggregation key, the list of aggregation
  // fields and the filter name for an out-of-cluster cost query. A non-empty
  // customAggregation is used verbatim (key as given, fields split on commas,
  // filterType unchanged). Otherwise every comma-separated field of aggregator,
  // and filterType, is prefixed with "kubernetes_".
  func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  	if customAggregation != "" {
  		return customAggregation, strings.Split(customAggregation, ","), filterType
  	}

  	const prefix = "kubernetes_"
  	fields := strings.Split(aggregator, ",")
  	for i := range fields {
  		fields[i] = prefix + fields[i]
  	}
  	return strings.Join(fields, ","), fields, prefix + filterType
  }
  387. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  388. w.Header().Set("Content-Type", "application/json")
  389. w.Header().Set("Access-Control-Allow-Origin", "*")
  390. start := r.URL.Query().Get("start")
  391. end := r.URL.Query().Get("end")
  392. aggregator := r.URL.Query().Get("aggregator")
  393. customAggregation := r.URL.Query().Get("customAggregation")
  394. filterType := r.URL.Query().Get("filterType")
  395. filterValue := r.URL.Query().Get("filterValue")
  396. var data []*costAnalyzerCloud.OutOfClusterAllocation
  397. var err error
  398. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  399. data, err = a.Cloud.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  400. w.Write(WrapData(data, err))
  401. }
  402. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  403. w.Header().Set("Content-Type", "application/json")
  404. w.Header().Set("Access-Control-Allow-Origin", "*")
  405. // start date for which to query costs, inclusive; format YYYY-MM-DD
  406. start := r.URL.Query().Get("start")
  407. // end date for which to query costs, inclusive; format YYYY-MM-DD
  408. end := r.URL.Query().Get("end")
  409. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  410. kubernetesAggregation := r.URL.Query().Get("aggregator")
  411. // customAggregation allows full customization of aggregator w/o prepending
  412. customAggregation := r.URL.Query().Get("customAggregation")
  413. // disableCache, if set to "true", tells this function to recompute and
  414. // cache the requested data
  415. disableCache := r.URL.Query().Get("disableCache") == "true"
  416. // clearCache, if set to "true", tells this function to flush the cache,
  417. // then recompute and cache the requested data
  418. clearCache := r.URL.Query().Get("clearCache") == "true"
  419. filterType := r.URL.Query().Get("filterType")
  420. filterValue := r.URL.Query().Get("filterValue")
  421. aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
  422. // clear cache prior to checking the cache so that a clearCache=true
  423. // request always returns a freshly computed value
  424. if clearCache {
  425. a.OutOfClusterCache.Flush()
  426. }
  427. // attempt to retrieve cost data from cache
  428. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
  429. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  430. if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
  431. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  432. return
  433. }
  434. klog.Errorf("caching error: failed to type cast data: %s", key)
  435. }
  436. data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  437. if err == nil {
  438. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  439. }
  440. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  441. }
  442. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  443. w.Header().Set("Content-Type", "application/json")
  444. w.Header().Set("Access-Control-Allow-Origin", "*")
  445. data, err := p.Cloud.AllNodePricing()
  446. w.Write(WrapData(data, err))
  447. }
  448. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  449. w.Header().Set("Content-Type", "application/json")
  450. w.Header().Set("Access-Control-Allow-Origin", "*")
  451. data, err := p.Cloud.GetConfig()
  452. w.Write(WrapData(data, err))
  453. }
  454. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  455. w.Header().Set("Content-Type", "application/json")
  456. w.Header().Set("Access-Control-Allow-Origin", "*")
  457. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  458. if err != nil {
  459. w.Write(WrapData(data, err))
  460. return
  461. }
  462. w.Write(WrapData(data, err))
  463. err = p.Cloud.DownloadPricingData()
  464. if err != nil {
  465. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  466. }
  467. return
  468. }
  469. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  470. w.Header().Set("Content-Type", "application/json")
  471. w.Header().Set("Access-Control-Allow-Origin", "*")
  472. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  473. if err != nil {
  474. w.Write(WrapData(data, err))
  475. return
  476. }
  477. w.Write(WrapData(data, err))
  478. return
  479. }
  480. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  481. w.Header().Set("Content-Type", "application/json")
  482. w.Header().Set("Access-Control-Allow-Origin", "*")
  483. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  484. if err != nil {
  485. w.Write(WrapData(data, err))
  486. return
  487. }
  488. w.Write(WrapData(data, err))
  489. return
  490. }
  491. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  492. w.Header().Set("Content-Type", "application/json")
  493. w.Header().Set("Access-Control-Allow-Origin", "*")
  494. data, err := p.Cloud.UpdateConfig(r.Body, "")
  495. if err != nil {
  496. w.Write(WrapData(data, err))
  497. return
  498. }
  499. w.Write(WrapData(data, err))
  500. return
  501. }
  502. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  503. w.Header().Set("Content-Type", "application/json")
  504. w.Header().Set("Access-Control-Allow-Origin", "*")
  505. data, err := p.Cloud.GetManagementPlatform()
  506. if err != nil {
  507. w.Write(WrapData(data, err))
  508. return
  509. }
  510. w.Write(WrapData(data, err))
  511. return
  512. }
  513. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  514. w.Header().Set("Content-Type", "application/json")
  515. w.Header().Set("Access-Control-Allow-Origin", "*")
  516. data, err := p.Cloud.ClusterInfo()
  517. kc, ok := p.KubeClientSet.(*kubernetes.Clientset)
  518. if ok && data != nil {
  519. v, err := kc.ServerVersion()
  520. if err != nil {
  521. klog.Infof("Could not get k8s version info: %s", err.Error())
  522. } else if v != nil {
  523. data["version"] = v.Major + "." + v.Minor
  524. }
  525. } else {
  526. klog.Infof("Could not get k8s version info: %s", err.Error())
  527. }
  528. w.Write(WrapData(data, err))
  529. }
  530. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  531. w.Header().Set("Content-Type", "application/json")
  532. w.Header().Set("Access-Control-Allow-Origin", "*")
  533. w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
  534. }
  535. func (p *Accesses) ContainerUptimes(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  536. w.Header().Set("Content-Type", "application/json")
  537. w.Header().Set("Access-Control-Allow-Origin", "*")
  538. res, err := ComputeUptimes(p.PrometheusClient)
  539. w.Write(WrapData(res, err))
  540. }
  541. func (a *Accesses) recordPrices() {
  542. go func() {
  543. containerSeen := make(map[string]bool)
  544. nodeSeen := make(map[string]bool)
  545. pvSeen := make(map[string]bool)
  546. pvcSeen := make(map[string]bool)
  547. getKeyFromLabelStrings := func(labels ...string) string {
  548. return strings.Join(labels, ",")
  549. }
  550. getLabelStringsFromKey := func(key string) []string {
  551. return strings.Split(key, ",")
  552. }
  553. var defaultRegion string = ""
  554. nodeList := a.Model.Cache.GetAllNodes()
  555. if len(nodeList) > 0 {
  556. defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
  557. }
  558. for {
  559. klog.V(4).Info("Recording prices...")
  560. podlist := a.Model.Cache.GetAllPods()
  561. podStatus := make(map[string]v1.PodPhase)
  562. for _, pod := range podlist {
  563. podStatus[pod.Name] = pod.Status.Phase
  564. }
  565. cfg, _ := a.Cloud.GetConfig()
  566. // Record network pricing at global scope
  567. networkCosts, err := a.Cloud.NetworkPricing()
  568. if err != nil {
  569. klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  570. } else {
  571. a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  572. a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  573. a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  574. }
  575. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
  576. if err != nil {
  577. klog.V(1).Info("Error in price recording: " + err.Error())
  578. // zero the for loop so the time.Sleep will still work
  579. data = map[string]*CostData{}
  580. }
  581. nodes, err := a.Model.GetNodeCost(a.Cloud)
  582. for nodeName, node := range nodes {
  583. // Emit costs, guarding against NaN inputs for custom pricing.
  584. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  585. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  586. cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
  587. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  588. cpuCost = 0
  589. }
  590. }
  591. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  592. if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
  593. cpu = 1 // Assume 1 CPU
  594. }
  595. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  596. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  597. ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
  598. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  599. ramCost = 0
  600. }
  601. }
  602. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  603. if math.IsNaN(ram) || math.IsInf(ram, 0) {
  604. ram = 0
  605. }
  606. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  607. if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
  608. gpu = 0
  609. }
  610. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  611. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  612. gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
  613. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  614. gpuCost = 0
  615. }
  616. }
  617. nodeType := node.InstanceType
  618. nodeRegion := node.Region
  619. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  620. a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(cpuCost)
  621. a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(ramCost)
  622. a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(gpuCost)
  623. a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(totalCost)
  624. labelKey := getKeyFromLabelStrings(nodeName, nodeName)
  625. nodeSeen[labelKey] = true
  626. }
  627. for _, costs := range data {
  628. nodeName := costs.NodeName
  629. namespace := costs.Namespace
  630. podName := costs.PodName
  631. containerName := costs.Name
  632. if costs.PVCData != nil {
  633. for _, pvc := range costs.PVCData {
  634. if pvc.Volume != nil {
  635. a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
  636. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  637. pvcSeen[labelKey] = true
  638. }
  639. }
  640. }
  641. if len(costs.RAMAllocation) > 0 {
  642. a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  643. }
  644. if len(costs.CPUAllocation) > 0 {
  645. a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  646. }
  647. if len(costs.GPUReq) > 0 {
  648. // allocation here is set to the request because shared GPU usage not yet supported.
  649. a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  650. }
  651. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  652. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  653. containerSeen[labelKey] = true
  654. } else {
  655. containerSeen[labelKey] = false
  656. }
  657. storageClasses := a.Model.Cache.GetAllStorageClasses()
  658. storageClassMap := make(map[string]map[string]string)
  659. for _, storageClass := range storageClasses {
  660. params := storageClass.Parameters
  661. storageClassMap[storageClass.ObjectMeta.Name] = params
  662. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  663. storageClassMap["default"] = params
  664. storageClassMap[""] = params
  665. }
  666. }
  667. pvs := a.Model.Cache.GetAllPersistentVolumes()
  668. for _, pv := range pvs {
  669. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  670. if !ok {
  671. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  672. }
  673. var region string
  674. if r, ok := pv.Labels[v1.LabelZoneRegion]; ok {
  675. region = r
  676. } else {
  677. region = defaultRegion
  678. }
  679. cacPv := &costAnalyzerCloud.PV{
  680. Class: pv.Spec.StorageClassName,
  681. Region: region,
  682. Parameters: parameters,
  683. }
  684. GetPVCost(cacPv, pv, a.Cloud, region)
  685. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  686. a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
  687. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  688. pvSeen[labelKey] = true
  689. }
  690. containerUptime, _ := ComputeUptimes(a.PrometheusClient)
  691. for key, uptime := range containerUptime {
  692. container, _ := NewContainerMetricFromKey(key)
  693. a.ContainerUptimeRecorder.WithLabelValues(container.Namespace, container.PodName, container.ContainerName).Set(uptime)
  694. }
  695. }
  696. for labelString, seen := range nodeSeen {
  697. if !seen {
  698. labels := getLabelStringsFromKey(labelString)
  699. a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  700. a.CPUPriceRecorder.DeleteLabelValues(labels...)
  701. a.GPUPriceRecorder.DeleteLabelValues(labels...)
  702. a.RAMPriceRecorder.DeleteLabelValues(labels...)
  703. delete(nodeSeen, labelString)
  704. }
  705. nodeSeen[labelString] = false
  706. }
  707. for labelString, seen := range containerSeen {
  708. if !seen {
  709. labels := getLabelStringsFromKey(labelString)
  710. a.RAMAllocationRecorder.DeleteLabelValues(labels...)
  711. a.CPUAllocationRecorder.DeleteLabelValues(labels...)
  712. a.GPUAllocationRecorder.DeleteLabelValues(labels...)
  713. a.ContainerUptimeRecorder.DeleteLabelValues(labels...)
  714. delete(containerSeen, labelString)
  715. }
  716. containerSeen[labelString] = false
  717. }
  718. for labelString, seen := range pvSeen {
  719. if !seen {
  720. labels := getLabelStringsFromKey(labelString)
  721. a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  722. delete(pvSeen, labelString)
  723. }
  724. pvSeen[labelString] = false
  725. }
  726. for labelString, seen := range pvcSeen {
  727. if !seen {
  728. labels := getLabelStringsFromKey(labelString)
  729. a.PVAllocationRecorder.DeleteLabelValues(labels...)
  730. delete(pvcSeen, labelString)
  731. }
  732. pvcSeen[labelString] = false
  733. }
  734. time.Sleep(time.Minute)
  735. }
  736. }()
  737. }
  738. // Creates a new ClusterManager instance using a boltdb storage. If that fails,
  739. // then we fall back to a memory-only storage.
  740. func newClusterManager() *cm.ClusterManager {
  741. clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
  742. path := os.Getenv("CONFIG_PATH")
  743. db, err := bolt.Open(path+"costmodel.db", 0600, nil)
  744. if err != nil {
  745. klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
  746. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  747. }
  748. store, err := cm.NewBoltDBClusterStorage("clusters", db)
  749. if err != nil {
  750. klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
  751. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  752. }
  753. return cm.NewConfiguredClusterManager(store, clustersConfigFile)
  754. }
  755. type ConfigWatchers struct {
  756. ConfigmapName string
  757. WatchFunc func(string, map[string]string) error
  758. }
  759. func Initialize(additionalConfigWatchers ...ConfigWatchers) {
  760. klog.InitFlags(nil)
  761. flag.Set("v", "3")
  762. flag.Parse()
  763. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  764. address := os.Getenv(prometheusServerEndpointEnvVar)
  765. if address == "" {
  766. klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
  767. }
  768. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  769. Proxy: http.ProxyFromEnvironment,
  770. DialContext: (&net.Dialer{
  771. Timeout: 120 * time.Second,
  772. KeepAlive: 120 * time.Second,
  773. }).DialContext,
  774. TLSHandshakeTimeout: 10 * time.Second,
  775. }
  776. pc := prometheusClient.Config{
  777. Address: address,
  778. RoundTripper: LongTimeoutRoundTripper,
  779. }
  780. promCli, _ := prometheusClient.NewClient(pc)
  781. api := prometheusAPI.NewAPI(promCli)
  782. _, err := api.Config(context.Background())
  783. if err != nil {
  784. klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  785. }
  786. klog.V(1).Info("Success: retrieved a prometheus config file from: " + address)
  787. _, err = ValidatePrometheus(promCli, false)
  788. if err != nil {
  789. klog.Fatalf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  790. }
  791. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  792. // Kubernetes API setup
  793. kc, err := rest.InClusterConfig()
  794. if err != nil {
  795. panic(err.Error())
  796. }
  797. kubeClientset, err := kubernetes.NewForConfig(kc)
  798. if err != nil {
  799. panic(err.Error())
  800. }
  801. // Create Kubernetes Cluster Cache + Watchers
  802. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  803. k8sCache.Run()
  804. cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
  805. cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
  806. if err != nil {
  807. panic(err.Error())
  808. }
  809. watchConfigFunc := func(c interface{}) {
  810. conf := c.(*v1.ConfigMap)
  811. if conf.GetName() == "pricing-configs" {
  812. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  813. if err != nil {
  814. klog.Infof("ERROR UPDATING %s CONFIG: %s", "pricing-configs", err.Error())
  815. }
  816. }
  817. for _, cw := range additionalConfigWatchers {
  818. if conf.GetName() == cw.ConfigmapName {
  819. err := cw.WatchFunc(conf.GetName(), conf.Data)
  820. if err != nil {
  821. klog.Infof("ERROR UPDATING %s CONFIG: %s", cw.ConfigmapName, err.Error())
  822. }
  823. }
  824. }
  825. }
  826. kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
  827. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  828. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
  829. if err != nil {
  830. klog.Infof("No %s configmap found at installtime, using existing configs: %s", "pricing-configs", err.Error())
  831. } else {
  832. watchConfigFunc(configs)
  833. }
  834. for _, cw := range additionalConfigWatchers {
  835. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(cw.ConfigmapName, metav1.GetOptions{})
  836. if err != nil {
  837. klog.Infof("No %s configmap found at installtime, using existing configs: %s", cw.ConfigmapName, err.Error())
  838. } else {
  839. watchConfigFunc(configs)
  840. }
  841. }
  842. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  843. // TODO: General Architecture Note: Several passes have been made to modularize a lot of
  844. // TODO: our code, but the router still continues to be the obvious entry point for new \
  845. // TODO: features. We should look to spliting out the actual "router" functionality and
  846. // TODO: implement a builder -> controller for stitching new features and other dependencies.
  847. clusterManager := newClusterManager()
  848. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  849. Name: "node_cpu_hourly_cost",
  850. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  851. }, []string{"instance", "node", "instance_type", "region"})
  852. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  853. Name: "node_ram_hourly_cost",
  854. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  855. }, []string{"instance", "node", "instance_type", "region"})
  856. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  857. Name: "node_gpu_hourly_cost",
  858. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  859. }, []string{"instance", "node", "instance_type", "region"})
  860. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  861. Name: "node_total_hourly_cost",
  862. Help: "node_total_hourly_cost Total node cost per hour",
  863. }, []string{"instance", "node", "instance_type", "region"})
  864. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  865. Name: "pv_hourly_cost",
  866. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  867. }, []string{"volumename", "persistentvolume"})
  868. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  869. Name: "container_memory_allocation_bytes",
  870. Help: "container_memory_allocation_bytes Bytes of RAM used",
  871. }, []string{"namespace", "pod", "container", "instance", "node"})
  872. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  873. Name: "container_cpu_allocation",
  874. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  875. }, []string{"namespace", "pod", "container", "instance", "node"})
  876. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  877. Name: "container_gpu_allocation",
  878. Help: "container_gpu_allocation GPU used",
  879. }, []string{"namespace", "pod", "container", "instance", "node"})
  880. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  881. Name: "pod_pvc_allocation",
  882. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  883. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  884. ContainerUptimeRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  885. Name: "container_uptime_seconds",
  886. Help: "container_uptime_seconds Seconds a container has been running",
  887. }, []string{"namespace", "pod", "container"})
  888. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  889. Name: "kubecost_network_zone_egress_cost",
  890. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  891. })
  892. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  893. Name: "kubecost_network_region_egress_cost",
  894. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  895. })
  896. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  897. Name: "kubecost_network_internet_egress_cost",
  898. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  899. })
  900. prometheus.MustRegister(cpuGv)
  901. prometheus.MustRegister(ramGv)
  902. prometheus.MustRegister(gpuGv)
  903. prometheus.MustRegister(totalGv)
  904. prometheus.MustRegister(pvGv)
  905. prometheus.MustRegister(RAMAllocation)
  906. prometheus.MustRegister(CPUAllocation)
  907. prometheus.MustRegister(ContainerUptimeRecorder)
  908. prometheus.MustRegister(PVAllocation)
  909. prometheus.MustRegister(GPUAllocation)
  910. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  911. prometheus.MustRegister(ServiceCollector{
  912. KubeClientSet: kubeClientset,
  913. })
  914. prometheus.MustRegister(DeploymentCollector{
  915. KubeClientSet: kubeClientset,
  916. })
  917. prometheus.MustRegister(StatefulsetCollector{
  918. KubeClientSet: kubeClientset,
  919. })
  920. // cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
  921. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  922. A = Accesses{
  923. PrometheusClient: promCli,
  924. KubeClientSet: kubeClientset,
  925. ClusterManager: clusterManager,
  926. Cloud: cloudProvider,
  927. CPUPriceRecorder: cpuGv,
  928. RAMPriceRecorder: ramGv,
  929. GPUPriceRecorder: gpuGv,
  930. NodeTotalPriceRecorder: totalGv,
  931. RAMAllocationRecorder: RAMAllocation,
  932. CPUAllocationRecorder: CPUAllocation,
  933. GPUAllocationRecorder: GPUAllocation,
  934. PVAllocationRecorder: PVAllocation,
  935. ContainerUptimeRecorder: ContainerUptimeRecorder,
  936. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  937. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  938. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  939. PersistentVolumePriceRecorder: pvGv,
  940. Model: NewCostModel(k8sCache),
  941. OutOfClusterCache: outOfClusterCache,
  942. }
  943. remoteEnabled := os.Getenv(remoteEnabled)
  944. if remoteEnabled == "true" {
  945. info, err := cloudProvider.ClusterInfo()
  946. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  947. if err != nil {
  948. klog.Infof("Error saving cluster id %s", err.Error())
  949. }
  950. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  951. if err != nil {
  952. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  953. }
  954. }
  955. // Thanos Client
  956. if os.Getenv(thanosEnabled) == "true" {
  957. thanosUrl := os.Getenv(thanosQueryUrl)
  958. if thanosUrl != "" {
  959. var thanosRT http.RoundTripper = &http.Transport{
  960. Proxy: http.ProxyFromEnvironment,
  961. DialContext: (&net.Dialer{
  962. Timeout: 120 * time.Second,
  963. KeepAlive: 120 * time.Second,
  964. }).DialContext,
  965. TLSHandshakeTimeout: 10 * time.Second,
  966. }
  967. thanosConfig := prometheusClient.Config{
  968. Address: thanosUrl,
  969. RoundTripper: thanosRT,
  970. }
  971. thanosCli, _ := prometheusClient.NewClient(thanosConfig)
  972. _, err = ValidatePrometheus(thanosCli, true)
  973. if err != nil {
  974. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
  975. A.ThanosClient = thanosCli
  976. } else {
  977. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
  978. A.ThanosClient = thanosCli
  979. }
  980. } else {
  981. klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
  982. }
  983. }
  984. err = A.Cloud.DownloadPricingData()
  985. if err != nil {
  986. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  987. }
  988. A.recordPrices()
  989. managerEndpoints := cm.NewClusterManagerEndpoints(A.ClusterManager)
  990. Router.GET("/costDataModel", A.CostDataModel)
  991. Router.GET("/costDataModelRange", A.CostDataModelRange)
  992. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  993. Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
  994. Router.GET("/allNodePricing", A.GetAllNodePricing)
  995. Router.POST("/refreshPricing", A.RefreshPricingData)
  996. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  997. Router.GET("/clusterCosts", A.ClusterCosts)
  998. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  999. Router.GET("/managementPlatform", A.ManagementPlatform)
  1000. Router.GET("/clusterInfo", A.ClusterInfo)
  1001. Router.GET("/containerUptimes", A.ContainerUptimes)
  1002. Router.GET("/clusters", managerEndpoints.GetAllClusters)
  1003. Router.PUT("/clusters", managerEndpoints.PutCluster)
  1004. Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
  1005. }