router.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "math"
  8. "net"
  9. "net/http"
  10. "os"
  11. "reflect"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "k8s.io/klog"
  16. "github.com/julienschmidt/httprouter"
  17. sentry "github.com/getsentry/sentry-go"
  18. costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
  19. "github.com/kubecost/cost-model/pkg/clustercache"
  20. cm "github.com/kubecost/cost-model/pkg/clustermanager"
  21. "github.com/kubecost/cost-model/pkg/errors"
  22. "github.com/kubecost/cost-model/pkg/log"
  23. "github.com/kubecost/cost-model/pkg/prom"
  24. prometheusClient "github.com/prometheus/client_golang/api"
  25. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  26. v1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "github.com/patrickmn/go-cache"
  29. "github.com/prometheus/client_golang/prometheus"
  30. "k8s.io/client-go/kubernetes"
  31. "k8s.io/client-go/rest"
  32. )
  33. const (
  34. logCollectionEnvVar = "LOG_COLLECTION_ENABLED"
  35. productAnalyticsEnvVar = "PRODUCT_ANALYTICS_ENABLED"
  36. errorReportingEnvVar = "ERROR_REPORTING_ENABLED"
  37. maxQueryConcurrencyEnvVar = "MAX_QUERY_CONCURRENCY"
  38. prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
  39. prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
  40. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  41. )
  42. var (
  43. // gitCommit is set by the build system
  44. gitCommit string
  45. logCollectionEnabled bool = strings.EqualFold(os.Getenv(logCollectionEnvVar), "true")
  46. productAnalyticsEnabled bool = strings.EqualFold(os.Getenv(productAnalyticsEnvVar), "true")
  47. errorReportingEnabled bool = strings.EqualFold(os.Getenv(errorReportingEnvVar), "true")
  48. )
  49. var Router = httprouter.New()
  50. var A Accesses
  51. type Accesses struct {
  52. PrometheusClient prometheusClient.Client
  53. ThanosClient prometheusClient.Client
  54. KubeClientSet kubernetes.Interface
  55. ClusterManager *cm.ClusterManager
  56. Cloud costAnalyzerCloud.Provider
  57. CPUPriceRecorder *prometheus.GaugeVec
  58. RAMPriceRecorder *prometheus.GaugeVec
  59. PersistentVolumePriceRecorder *prometheus.GaugeVec
  60. GPUPriceRecorder *prometheus.GaugeVec
  61. NodeTotalPriceRecorder *prometheus.GaugeVec
  62. RAMAllocationRecorder *prometheus.GaugeVec
  63. CPUAllocationRecorder *prometheus.GaugeVec
  64. GPUAllocationRecorder *prometheus.GaugeVec
  65. PVAllocationRecorder *prometheus.GaugeVec
  66. NetworkZoneEgressRecorder prometheus.Gauge
  67. NetworkRegionEgressRecorder prometheus.Gauge
  68. NetworkInternetEgressRecorder prometheus.Gauge
  69. ServiceSelectorRecorder *prometheus.GaugeVec
  70. DeploymentSelectorRecorder *prometheus.GaugeVec
  71. Model *CostModel
  72. OutOfClusterCache *cache.Cache
  73. }
  74. type DataEnvelope struct {
  75. Code int `json:"code"`
  76. Status string `json:"status"`
  77. Data interface{} `json:"data"`
  78. Message string `json:"message,omitempty"`
  79. }
  80. // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
  81. type FilterFunc func(*CostData) (bool, string)
  82. // FilterCostData allows through only CostData that matches all the given filter functions
  83. func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
  84. result := make(map[string]*CostData)
  85. filteredEnvironments := make(map[string]int)
  86. filteredContainers := 0
  87. DataLoop:
  88. for key, datum := range data {
  89. for _, rf := range retains {
  90. if ok, _ := rf(datum); ok {
  91. result[key] = datum
  92. // if any retain function passes, the data is retained and move on
  93. continue DataLoop
  94. }
  95. }
  96. for _, ff := range filters {
  97. if ok, environment := ff(datum); !ok {
  98. if environment != "" {
  99. filteredEnvironments[environment]++
  100. }
  101. filteredContainers++
  102. // if any filter function check fails, move on to the next datum
  103. continue DataLoop
  104. }
  105. }
  106. result[key] = datum
  107. }
  108. return result, filteredContainers, filteredEnvironments
  109. }
  110. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  111. fs := strings.Split(fields, ",")
  112. fmap := make(map[string]bool)
  113. for _, f := range fs {
  114. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  115. klog.V(1).Infof("to delete: %s", fieldNameLower)
  116. fmap[fieldNameLower] = true
  117. }
  118. filteredData := make(map[string]CostData)
  119. for cname, costdata := range data {
  120. s := reflect.TypeOf(*costdata)
  121. val := reflect.ValueOf(*costdata)
  122. costdata2 := CostData{}
  123. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  124. n := s.NumField()
  125. for i := 0; i < n; i++ {
  126. field := s.Field(i)
  127. value := val.Field(i)
  128. value2 := cd2.Field(i)
  129. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  130. value2.Set(reflect.Value(value))
  131. }
  132. }
  133. filteredData[cname] = cd2.Interface().(CostData)
  134. }
  135. return filteredData
  136. }
  137. func normalizeTimeParam(param string) (string, error) {
  138. // convert days to hours
  139. if param[len(param)-1:] == "d" {
  140. count := param[:len(param)-1]
  141. val, err := strconv.ParseInt(count, 10, 64)
  142. if err != nil {
  143. return "", err
  144. }
  145. val = val * 24
  146. param = fmt.Sprintf("%dh", val)
  147. }
  148. return param, nil
  149. }
  150. // Parses the max query concurrency environment variable
  151. func maxQueryConcurrency() int {
  152. v := os.Getenv(maxQueryConcurrencyEnvVar)
  153. if v == "" {
  154. return 5
  155. }
  156. result, err := strconv.Atoi(v)
  157. if err != nil {
  158. log.Warningf("Failed to parse MAX_QUERY_CONCURRENCY. Defaulting to 5 - %s", err)
  159. return 5
  160. }
  161. return result
  162. }
  163. // writeReportingFlags writes the reporting flags to the cluster info map
  164. func writeReportingFlags(clusterInfo map[string]string) {
  165. clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
  166. clusterInfo["productAnalytics"] = fmt.Sprintf("%t", productAnalyticsEnabled)
  167. clusterInfo["errorReporting"] = fmt.Sprintf("%t", errorReportingEnabled)
  168. }
  169. // parsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
  170. // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
  171. // return 0.0.
  172. func ParsePercentString(percentStr string) (float64, error) {
  173. if len(percentStr) == 0 {
  174. return 0.0, nil
  175. }
  176. if percentStr[len(percentStr)-1:] == "%" {
  177. percentStr = percentStr[:len(percentStr)-1]
  178. }
  179. discount, err := strconv.ParseFloat(percentStr, 64)
  180. if err != nil {
  181. return 0.0, err
  182. }
  183. discount *= 0.01
  184. return discount, nil
  185. }
  186. // parseDuration converts a Prometheus-style duration string into a Duration
  187. func ParseDuration(duration string) (*time.Duration, error) {
  188. unitStr := duration[len(duration)-1:]
  189. var unit time.Duration
  190. switch unitStr {
  191. case "s":
  192. unit = time.Second
  193. case "m":
  194. unit = time.Minute
  195. case "h":
  196. unit = time.Hour
  197. case "d":
  198. unit = 24.0 * time.Hour
  199. default:
  200. return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  201. }
  202. amountStr := duration[:len(duration)-1]
  203. amount, err := strconv.ParseInt(amountStr, 10, 64)
  204. if err != nil {
  205. return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
  206. }
  207. dur := time.Duration(amount) * unit
  208. return &dur, nil
  209. }
  210. // ParseTimeRange returns a start and end time, respectively, which are converted from
  211. // a duration and offset, defined as strings with Prometheus-style syntax.
  212. func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
  213. // endTime defaults to the current time, unless an offset is explicity declared,
  214. // in which case it shifts endTime back by given duration
  215. endTime := time.Now()
  216. if offset != "" {
  217. o, err := ParseDuration(offset)
  218. if err != nil {
  219. return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
  220. }
  221. endTime = endTime.Add(-1 * *o)
  222. }
  223. // if duration is defined in terms of days, convert to hours
  224. // e.g. convert "2d" to "48h"
  225. durationNorm, err := normalizeTimeParam(duration)
  226. if err != nil {
  227. return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
  228. }
  229. // convert time duration into start and end times, formatted
  230. // as ISO datetime strings
  231. dur, err := time.ParseDuration(durationNorm)
  232. if err != nil {
  233. return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
  234. }
  235. startTime := endTime.Add(-1 * dur)
  236. return &startTime, &endTime, nil
  237. }
  238. func WrapDataWithMessage(data interface{}, err error, message string) []byte {
  239. var resp []byte
  240. if err != nil {
  241. klog.V(1).Infof("Error returned to client: %s", err.Error())
  242. resp, _ = json.Marshal(&DataEnvelope{
  243. Code: http.StatusInternalServerError,
  244. Status: "error",
  245. Message: err.Error(),
  246. Data: data,
  247. })
  248. } else {
  249. resp, _ = json.Marshal(&DataEnvelope{
  250. Code: http.StatusOK,
  251. Status: "success",
  252. Data: data,
  253. Message: message,
  254. })
  255. }
  256. return resp
  257. }
  258. func WrapData(data interface{}, err error) []byte {
  259. var resp []byte
  260. if err != nil {
  261. klog.V(1).Infof("Error returned to client: %s", err.Error())
  262. resp, _ = json.Marshal(&DataEnvelope{
  263. Code: http.StatusInternalServerError,
  264. Status: "error",
  265. Message: err.Error(),
  266. Data: data,
  267. })
  268. } else {
  269. resp, _ = json.Marshal(&DataEnvelope{
  270. Code: http.StatusOK,
  271. Status: "success",
  272. Data: data,
  273. })
  274. }
  275. return resp
  276. }
  277. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  278. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  279. w.Header().Set("Content-Type", "application/json")
  280. w.Header().Set("Access-Control-Allow-Origin", "*")
  281. err := a.Cloud.DownloadPricingData()
  282. w.Write(WrapData(nil, err))
  283. }
  284. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  285. w.Header().Set("Content-Type", "application/json")
  286. w.Header().Set("Access-Control-Allow-Origin", "*")
  287. window := r.URL.Query().Get("timeWindow")
  288. offset := r.URL.Query().Get("offset")
  289. fields := r.URL.Query().Get("filterFields")
  290. namespace := r.URL.Query().Get("namespace")
  291. if offset != "" {
  292. offset = "offset " + offset
  293. }
  294. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
  295. if fields != "" {
  296. filteredData := filterFields(fields, data)
  297. w.Write(WrapData(filteredData, err))
  298. } else {
  299. w.Write(WrapData(data, err))
  300. }
  301. }
  302. func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  303. w.Header().Set("Content-Type", "application/json")
  304. w.Header().Set("Access-Control-Allow-Origin", "*")
  305. window := r.URL.Query().Get("window")
  306. offset := r.URL.Query().Get("offset")
  307. data, err := ComputeClusterCosts(a.PrometheusClient, a.Cloud, window, offset, true)
  308. w.Write(WrapData(data, err))
  309. }
  310. func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  311. w.Header().Set("Content-Type", "application/json")
  312. w.Header().Set("Access-Control-Allow-Origin", "*")
  313. start := r.URL.Query().Get("start")
  314. end := r.URL.Query().Get("end")
  315. window := r.URL.Query().Get("window")
  316. offset := r.URL.Query().Get("offset")
  317. data, err := ClusterCostsOverTime(a.PrometheusClient, a.Cloud, start, end, window, offset)
  318. w.Write(WrapData(data, err))
  319. }
  320. func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  321. w.Header().Set("Content-Type", "application/json")
  322. w.Header().Set("Access-Control-Allow-Origin", "*")
  323. start := r.URL.Query().Get("start")
  324. end := r.URL.Query().Get("end")
  325. window := r.URL.Query().Get("window")
  326. fields := r.URL.Query().Get("filterFields")
  327. namespace := r.URL.Query().Get("namespace")
  328. cluster := r.URL.Query().Get("cluster")
  329. remote := r.URL.Query().Get("remote")
  330. remoteAvailable := os.Getenv(remoteEnabled)
  331. remoteEnabled := false
  332. if remoteAvailable == "true" && remote != "false" {
  333. remoteEnabled = true
  334. }
  335. // Use Thanos Client if it exists (enabled) and remote flag set
  336. var pClient prometheusClient.Client
  337. if remote != "false" && a.ThanosClient != nil {
  338. pClient = a.ThanosClient
  339. } else {
  340. pClient = a.PrometheusClient
  341. }
  342. resolutionHours := 1.0
  343. data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, resolutionHours, namespace, cluster, remoteEnabled)
  344. if err != nil {
  345. w.Write(WrapData(nil, err))
  346. }
  347. if fields != "" {
  348. filteredData := filterFields(fields, data)
  349. w.Write(WrapData(filteredData, err))
  350. } else {
  351. w.Write(WrapData(data, err))
  352. }
  353. }
  354. // CostDataModelRangeLarge is experimental multi-cluster and long-term data storage in SQL support.
  355. func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  356. w.Header().Set("Content-Type", "application/json")
  357. w.Header().Set("Access-Control-Allow-Origin", "*")
  358. startString := r.URL.Query().Get("start")
  359. endString := r.URL.Query().Get("end")
  360. windowString := r.URL.Query().Get("window")
  361. var start time.Time
  362. var end time.Time
  363. var err error
  364. if windowString == "" {
  365. windowString = "1h"
  366. }
  367. if startString != "" {
  368. start, err = time.Parse(RFC3339Milli, startString)
  369. if err != nil {
  370. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  371. w.Write(WrapData(nil, err))
  372. }
  373. } else {
  374. window, err := time.ParseDuration(windowString)
  375. if err != nil {
  376. w.Write(WrapData(nil, fmt.Errorf("Invalid duration '%s'", windowString)))
  377. }
  378. start = time.Now().Add(-2 * window)
  379. }
  380. if endString != "" {
  381. end, err = time.Parse(RFC3339Milli, endString)
  382. if err != nil {
  383. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  384. w.Write(WrapData(nil, err))
  385. }
  386. } else {
  387. end = time.Now()
  388. }
  389. remoteLayout := "2006-01-02T15:04:05Z"
  390. remoteStartStr := start.Format(remoteLayout)
  391. remoteEndStr := end.Format(remoteLayout)
  392. klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
  393. data, err := CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
  394. w.Write(WrapData(data, err))
  395. }
  396. func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
  397. var key string
  398. var filter string
  399. var val []string
  400. if customAggregation != "" {
  401. key = customAggregation
  402. filter = filterType
  403. val = strings.Split(customAggregation, ",")
  404. } else {
  405. aggregations := strings.Split(aggregator, ",")
  406. for i, agg := range aggregations {
  407. aggregations[i] = "kubernetes_" + agg
  408. }
  409. key = strings.Join(aggregations, ",")
  410. filter = "kubernetes_" + filterType
  411. val = aggregations
  412. }
  413. return key, val, filter
  414. }
  415. func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  416. w.Header().Set("Content-Type", "application/json")
  417. w.Header().Set("Access-Control-Allow-Origin", "*")
  418. start := r.URL.Query().Get("start")
  419. end := r.URL.Query().Get("end")
  420. aggregator := r.URL.Query().Get("aggregator")
  421. customAggregation := r.URL.Query().Get("customAggregation")
  422. filterType := r.URL.Query().Get("filterType")
  423. filterValue := r.URL.Query().Get("filterValue")
  424. var data []*costAnalyzerCloud.OutOfClusterAllocation
  425. var err error
  426. _, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
  427. data, err = a.Cloud.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
  428. w.Write(WrapData(data, err))
  429. }
  430. func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  431. w.Header().Set("Content-Type", "application/json")
  432. w.Header().Set("Access-Control-Allow-Origin", "*")
  433. // start date for which to query costs, inclusive; format YYYY-MM-DD
  434. start := r.URL.Query().Get("start")
  435. // end date for which to query costs, inclusive; format YYYY-MM-DD
  436. end := r.URL.Query().Get("end")
  437. // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
  438. kubernetesAggregation := r.URL.Query().Get("aggregator")
  439. // customAggregation allows full customization of aggregator w/o prepending
  440. customAggregation := r.URL.Query().Get("customAggregation")
  441. // disableCache, if set to "true", tells this function to recompute and
  442. // cache the requested data
  443. disableCache := r.URL.Query().Get("disableCache") == "true"
  444. // clearCache, if set to "true", tells this function to flush the cache,
  445. // then recompute and cache the requested data
  446. clearCache := r.URL.Query().Get("clearCache") == "true"
  447. filterType := r.URL.Query().Get("filterType")
  448. filterValue := r.URL.Query().Get("filterValue")
  449. aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
  450. // clear cache prior to checking the cache so that a clearCache=true
  451. // request always returns a freshly computed value
  452. if clearCache {
  453. a.OutOfClusterCache.Flush()
  454. }
  455. // attempt to retrieve cost data from cache
  456. key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
  457. if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
  458. if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
  459. w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
  460. return
  461. }
  462. klog.Errorf("caching error: failed to type cast data: %s", key)
  463. }
  464. data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
  465. if err == nil {
  466. a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
  467. }
  468. w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
  469. }
  470. func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  471. w.Header().Set("Content-Type", "application/json")
  472. w.Header().Set("Access-Control-Allow-Origin", "*")
  473. data, err := p.Cloud.AllNodePricing()
  474. w.Write(WrapData(data, err))
  475. }
  476. func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  477. w.Header().Set("Content-Type", "application/json")
  478. w.Header().Set("Access-Control-Allow-Origin", "*")
  479. data, err := p.Cloud.GetConfig()
  480. w.Write(WrapData(data, err))
  481. }
  482. func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  483. w.Header().Set("Content-Type", "application/json")
  484. w.Header().Set("Access-Control-Allow-Origin", "*")
  485. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
  486. if err != nil {
  487. w.Write(WrapData(data, err))
  488. return
  489. }
  490. w.Write(WrapData(data, err))
  491. err = p.Cloud.DownloadPricingData()
  492. if err != nil {
  493. klog.V(1).Infof("Error redownloading data on config update: %s", err.Error())
  494. }
  495. return
  496. }
  497. func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  498. w.Header().Set("Content-Type", "application/json")
  499. w.Header().Set("Access-Control-Allow-Origin", "*")
  500. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
  501. if err != nil {
  502. w.Write(WrapData(data, err))
  503. return
  504. }
  505. w.Write(WrapData(data, err))
  506. return
  507. }
  508. func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  509. w.Header().Set("Content-Type", "application/json")
  510. w.Header().Set("Access-Control-Allow-Origin", "*")
  511. data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
  512. if err != nil {
  513. w.Write(WrapData(data, err))
  514. return
  515. }
  516. w.Write(WrapData(data, err))
  517. return
  518. }
  519. func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  520. w.Header().Set("Content-Type", "application/json")
  521. w.Header().Set("Access-Control-Allow-Origin", "*")
  522. data, err := p.Cloud.UpdateConfig(r.Body, "")
  523. if err != nil {
  524. w.Write(WrapData(data, err))
  525. return
  526. }
  527. w.Write(WrapData(data, err))
  528. return
  529. }
  530. func (p *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  531. w.Header().Set("Content-Type", "application/json")
  532. w.Header().Set("Access-Control-Allow-Origin", "*")
  533. data, err := p.Cloud.GetManagementPlatform()
  534. if err != nil {
  535. w.Write(WrapData(data, err))
  536. return
  537. }
  538. w.Write(WrapData(data, err))
  539. return
  540. }
  541. func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  542. w.Header().Set("Content-Type", "application/json")
  543. w.Header().Set("Access-Control-Allow-Origin", "*")
  544. data, err := p.Cloud.ClusterInfo()
  545. kc, ok := p.KubeClientSet.(*kubernetes.Clientset)
  546. if ok && data != nil {
  547. v, err := kc.ServerVersion()
  548. if err != nil {
  549. klog.Infof("Could not get k8s version info: %s", err.Error())
  550. } else if v != nil {
  551. data["version"] = v.Major + "." + v.Minor
  552. }
  553. } else {
  554. klog.Infof("Could not get k8s version info: %s", err.Error())
  555. }
  556. // Include Product Reporting Flags with Cluster Info
  557. writeReportingFlags(data)
  558. w.Write(WrapData(data, err))
  559. }
  560. func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  561. w.Header().Set("Content-Type", "application/json")
  562. w.Header().Set("Access-Control-Allow-Origin", "*")
  563. w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
  564. }
  565. func (a *Accesses) recordPrices() {
  566. go func() {
  567. defer errors.HandlePanic()
  568. containerSeen := make(map[string]bool)
  569. nodeSeen := make(map[string]bool)
  570. pvSeen := make(map[string]bool)
  571. pvcSeen := make(map[string]bool)
  572. getKeyFromLabelStrings := func(labels ...string) string {
  573. return strings.Join(labels, ",")
  574. }
  575. getLabelStringsFromKey := func(key string) []string {
  576. return strings.Split(key, ",")
  577. }
  578. var defaultRegion string = ""
  579. nodeList := a.Model.Cache.GetAllNodes()
  580. if len(nodeList) > 0 {
  581. defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
  582. }
  583. for {
  584. klog.V(4).Info("Recording prices...")
  585. podlist := a.Model.Cache.GetAllPods()
  586. podStatus := make(map[string]v1.PodPhase)
  587. for _, pod := range podlist {
  588. podStatus[pod.Name] = pod.Status.Phase
  589. }
  590. cfg, _ := a.Cloud.GetConfig()
  591. // Record network pricing at global scope
  592. networkCosts, err := a.Cloud.NetworkPricing()
  593. if err != nil {
  594. klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  595. } else {
  596. a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  597. a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  598. a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  599. }
  600. data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, "2m", "", "")
  601. if err != nil {
  602. klog.V(1).Info("Error in price recording: " + err.Error())
  603. // zero the for loop so the time.Sleep will still work
  604. data = map[string]*CostData{}
  605. }
  606. nodes, err := a.Model.GetNodeCost(a.Cloud)
  607. for nodeName, node := range nodes {
  608. // Emit costs, guarding against NaN inputs for custom pricing.
  609. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  610. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  611. cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
  612. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  613. cpuCost = 0
  614. }
  615. }
  616. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  617. if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
  618. cpu = 1 // Assume 1 CPU
  619. }
  620. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  621. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  622. ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
  623. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  624. ramCost = 0
  625. }
  626. }
  627. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  628. if math.IsNaN(ram) || math.IsInf(ram, 0) {
  629. ram = 0
  630. }
  631. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  632. if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
  633. gpu = 0
  634. }
  635. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  636. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  637. gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
  638. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  639. gpuCost = 0
  640. }
  641. }
  642. nodeType := node.InstanceType
  643. nodeRegion := node.Region
  644. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  645. a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(cpuCost)
  646. a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(ramCost)
  647. a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(gpuCost)
  648. a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion).Set(totalCost)
  649. labelKey := getKeyFromLabelStrings(nodeName, nodeName)
  650. nodeSeen[labelKey] = true
  651. }
  652. for _, costs := range data {
  653. nodeName := costs.NodeName
  654. namespace := costs.Namespace
  655. podName := costs.PodName
  656. containerName := costs.Name
  657. if costs.PVCData != nil {
  658. for _, pvc := range costs.PVCData {
  659. if pvc.Volume != nil {
  660. a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value)
  661. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  662. pvcSeen[labelKey] = true
  663. }
  664. }
  665. }
  666. if len(costs.RAMAllocation) > 0 {
  667. a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  668. }
  669. if len(costs.CPUAllocation) > 0 {
  670. a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  671. }
  672. if len(costs.GPUReq) > 0 {
  673. // allocation here is set to the request because shared GPU usage not yet supported.
  674. a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  675. }
  676. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  677. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  678. containerSeen[labelKey] = true
  679. } else {
  680. containerSeen[labelKey] = false
  681. }
  682. storageClasses := a.Model.Cache.GetAllStorageClasses()
  683. storageClassMap := make(map[string]map[string]string)
  684. for _, storageClass := range storageClasses {
  685. params := storageClass.Parameters
  686. storageClassMap[storageClass.ObjectMeta.Name] = params
  687. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  688. storageClassMap["default"] = params
  689. storageClassMap[""] = params
  690. }
  691. }
  692. pvs := a.Model.Cache.GetAllPersistentVolumes()
  693. for _, pv := range pvs {
  694. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  695. if !ok {
  696. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  697. }
  698. var region string
  699. if r, ok := pv.Labels[v1.LabelZoneRegion]; ok {
  700. region = r
  701. } else {
  702. region = defaultRegion
  703. }
  704. cacPv := &costAnalyzerCloud.PV{
  705. Class: pv.Spec.StorageClassName,
  706. Region: region,
  707. Parameters: parameters,
  708. }
  709. GetPVCost(cacPv, pv, a.Cloud, region)
  710. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  711. a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
  712. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  713. pvSeen[labelKey] = true
  714. }
  715. }
  716. for labelString, seen := range nodeSeen {
  717. if !seen {
  718. labels := getLabelStringsFromKey(labelString)
  719. a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  720. a.CPUPriceRecorder.DeleteLabelValues(labels...)
  721. a.GPUPriceRecorder.DeleteLabelValues(labels...)
  722. a.RAMPriceRecorder.DeleteLabelValues(labels...)
  723. delete(nodeSeen, labelString)
  724. }
  725. nodeSeen[labelString] = false
  726. }
  727. for labelString, seen := range containerSeen {
  728. if !seen {
  729. labels := getLabelStringsFromKey(labelString)
  730. a.RAMAllocationRecorder.DeleteLabelValues(labels...)
  731. a.CPUAllocationRecorder.DeleteLabelValues(labels...)
  732. a.GPUAllocationRecorder.DeleteLabelValues(labels...)
  733. delete(containerSeen, labelString)
  734. }
  735. containerSeen[labelString] = false
  736. }
  737. for labelString, seen := range pvSeen {
  738. if !seen {
  739. labels := getLabelStringsFromKey(labelString)
  740. a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  741. delete(pvSeen, labelString)
  742. }
  743. pvSeen[labelString] = false
  744. }
  745. for labelString, seen := range pvcSeen {
  746. if !seen {
  747. labels := getLabelStringsFromKey(labelString)
  748. a.PVAllocationRecorder.DeleteLabelValues(labels...)
  749. delete(pvcSeen, labelString)
  750. }
  751. pvcSeen[labelString] = false
  752. }
  753. time.Sleep(time.Minute)
  754. }
  755. }()
  756. }
  757. // Creates a new ClusterManager instance using a boltdb storage. If that fails,
  758. // then we fall back to a memory-only storage.
  759. func newClusterManager() *cm.ClusterManager {
  760. clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
  761. // Return a memory-backed cluster manager populated by configmap
  762. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  763. // NOTE: The following should be used with a persistent disk store. Since the
  764. // NOTE: configmap approach is currently the "persistent" source (entries are read-only
  765. // NOTE: on the backend), we don't currently need to store on disk.
  766. /*
  767. path := os.Getenv("CONFIG_PATH")
  768. db, err := bolt.Open(path+"costmodel.db", 0600, nil)
  769. if err != nil {
  770. klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
  771. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  772. }
  773. store, err := cm.NewBoltDBClusterStorage("clusters", db)
  774. if err != nil {
  775. klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
  776. return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
  777. }
  778. return cm.NewConfiguredClusterManager(store, clustersConfigFile)
  779. */
  780. }
  781. type ConfigWatchers struct {
  782. ConfigmapName string
  783. WatchFunc func(string, map[string]string) error
  784. }
  785. // captures the panic event in sentry
  786. func capturePanicEvent(err string, stack string) {
  787. msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
  788. sentry.CurrentHub().CaptureEvent(&sentry.Event{
  789. Level: sentry.LevelError,
  790. Message: msg,
  791. })
  792. sentry.Flush(5 * time.Second)
  793. }
  794. // handle any panics reported by the errors package
  795. func handlePanic(p errors.Panic) bool {
  796. err := p.Error
  797. if err != nil {
  798. if err, ok := err.(error); ok {
  799. capturePanicEvent(err.Error(), p.Stack)
  800. }
  801. if err, ok := err.(string); ok {
  802. capturePanicEvent(err, p.Stack)
  803. }
  804. }
  805. // Return true to recover iff the type is http, otherwise allow kubernetes
  806. // to recover.
  807. return p.Type == errors.PanicTypeHTTP
  808. }
  809. func Initialize(additionalConfigWatchers ...ConfigWatchers) {
  810. klog.InitFlags(nil)
  811. flag.Set("v", "3")
  812. flag.Parse()
  813. klog.V(1).Infof("Starting cost-model (git commit \"%s\")", gitCommit)
  814. var err error
  815. if errorReportingEnabled {
  816. err = sentry.Init(sentry.ClientOptions{Release: gitCommit})
  817. if err != nil {
  818. klog.Infof("Failed to initialize sentry for error reporting")
  819. } else {
  820. err = errors.SetPanicHandler(handlePanic)
  821. if err != nil {
  822. klog.Infof("Failed to set panic handler: %s", err)
  823. }
  824. }
  825. }
  826. address := os.Getenv(prometheusServerEndpointEnvVar)
  827. if address == "" {
  828. klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
  829. }
  830. queryConcurrency := maxQueryConcurrency()
  831. var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
  832. Proxy: http.ProxyFromEnvironment,
  833. DialContext: (&net.Dialer{
  834. Timeout: 120 * time.Second,
  835. KeepAlive: 120 * time.Second,
  836. }).DialContext,
  837. TLSHandshakeTimeout: 10 * time.Second,
  838. }
  839. pc := prometheusClient.Config{
  840. Address: address,
  841. RoundTripper: LongTimeoutRoundTripper,
  842. }
  843. promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency)
  844. m, err := ValidatePrometheus(promCli, false)
  845. if err != nil || m.Running == false {
  846. if err != nil {
  847. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
  848. } else if m.Running == false {
  849. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
  850. }
  851. api := prometheusAPI.NewAPI(promCli)
  852. _, err = api.Config(context.Background())
  853. if err != nil {
  854. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
  855. } else {
  856. klog.V(1).Info("Retrieved a prometheus config file from: " + address)
  857. }
  858. } else {
  859. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  860. }
  861. // Kubernetes API setup
  862. kc, err := rest.InClusterConfig()
  863. if err != nil {
  864. panic(err.Error())
  865. }
  866. kubeClientset, err := kubernetes.NewForConfig(kc)
  867. if err != nil {
  868. panic(err.Error())
  869. }
  870. // Create Kubernetes Cluster Cache + Watchers
  871. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  872. k8sCache.Run()
  873. cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
  874. cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
  875. if err != nil {
  876. panic(err.Error())
  877. }
  878. watchConfigFunc := func(c interface{}) {
  879. conf := c.(*v1.ConfigMap)
  880. if conf.GetName() == "pricing-configs" {
  881. _, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
  882. if err != nil {
  883. klog.Infof("ERROR UPDATING %s CONFIG: %s", "pricing-configs", err.Error())
  884. }
  885. }
  886. for _, cw := range additionalConfigWatchers {
  887. if conf.GetName() == cw.ConfigmapName {
  888. err := cw.WatchFunc(conf.GetName(), conf.Data)
  889. if err != nil {
  890. klog.Infof("ERROR UPDATING %s CONFIG: %s", cw.ConfigmapName, err.Error())
  891. }
  892. }
  893. }
  894. }
  895. kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
  896. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  897. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
  898. if err != nil {
  899. klog.Infof("No %s configmap found at installtime, using existing configs: %s", "pricing-configs", err.Error())
  900. } else {
  901. watchConfigFunc(configs)
  902. }
  903. for _, cw := range additionalConfigWatchers {
  904. configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(cw.ConfigmapName, metav1.GetOptions{})
  905. if err != nil {
  906. klog.Infof("No %s configmap found at installtime, using existing configs: %s", cw.ConfigmapName, err.Error())
  907. } else {
  908. watchConfigFunc(configs)
  909. }
  910. }
  911. k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
  912. // TODO: General Architecture Note: Several passes have been made to modularize a lot of
  913. // TODO: our code, but the router still continues to be the obvious entry point for new \
  914. // TODO: features. We should look to spliting out the actual "router" functionality and
  915. // TODO: implement a builder -> controller for stitching new features and other dependencies.
  916. clusterManager := newClusterManager()
  917. cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  918. Name: "node_cpu_hourly_cost",
  919. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  920. }, []string{"instance", "node", "instance_type", "region"})
  921. ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  922. Name: "node_ram_hourly_cost",
  923. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  924. }, []string{"instance", "node", "instance_type", "region"})
  925. gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  926. Name: "node_gpu_hourly_cost",
  927. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  928. }, []string{"instance", "node", "instance_type", "region"})
  929. totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  930. Name: "node_total_hourly_cost",
  931. Help: "node_total_hourly_cost Total node cost per hour",
  932. }, []string{"instance", "node", "instance_type", "region"})
  933. pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  934. Name: "pv_hourly_cost",
  935. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  936. }, []string{"volumename", "persistentvolume"})
  937. RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  938. Name: "container_memory_allocation_bytes",
  939. Help: "container_memory_allocation_bytes Bytes of RAM used",
  940. }, []string{"namespace", "pod", "container", "instance", "node"})
  941. CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  942. Name: "container_cpu_allocation",
  943. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  944. }, []string{"namespace", "pod", "container", "instance", "node"})
  945. GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  946. Name: "container_gpu_allocation",
  947. Help: "container_gpu_allocation GPU used",
  948. }, []string{"namespace", "pod", "container", "instance", "node"})
  949. PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
  950. Name: "pod_pvc_allocation",
  951. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  952. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  953. NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  954. Name: "kubecost_network_zone_egress_cost",
  955. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  956. })
  957. NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  958. Name: "kubecost_network_region_egress_cost",
  959. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  960. })
  961. NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
  962. Name: "kubecost_network_internet_egress_cost",
  963. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  964. })
  965. prometheus.MustRegister(cpuGv)
  966. prometheus.MustRegister(ramGv)
  967. prometheus.MustRegister(gpuGv)
  968. prometheus.MustRegister(totalGv)
  969. prometheus.MustRegister(pvGv)
  970. prometheus.MustRegister(RAMAllocation)
  971. prometheus.MustRegister(CPUAllocation)
  972. prometheus.MustRegister(PVAllocation)
  973. prometheus.MustRegister(GPUAllocation)
  974. prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
  975. prometheus.MustRegister(ServiceCollector{
  976. KubeClientSet: kubeClientset,
  977. })
  978. prometheus.MustRegister(DeploymentCollector{
  979. KubeClientSet: kubeClientset,
  980. })
  981. prometheus.MustRegister(StatefulsetCollector{
  982. KubeClientSet: kubeClientset,
  983. })
  984. // cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
  985. outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
  986. A = Accesses{
  987. PrometheusClient: promCli,
  988. KubeClientSet: kubeClientset,
  989. ClusterManager: clusterManager,
  990. Cloud: cloudProvider,
  991. CPUPriceRecorder: cpuGv,
  992. RAMPriceRecorder: ramGv,
  993. GPUPriceRecorder: gpuGv,
  994. NodeTotalPriceRecorder: totalGv,
  995. RAMAllocationRecorder: RAMAllocation,
  996. CPUAllocationRecorder: CPUAllocation,
  997. GPUAllocationRecorder: GPUAllocation,
  998. PVAllocationRecorder: PVAllocation,
  999. NetworkZoneEgressRecorder: NetworkZoneEgressRecorder,
  1000. NetworkRegionEgressRecorder: NetworkRegionEgressRecorder,
  1001. NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
  1002. PersistentVolumePriceRecorder: pvGv,
  1003. Model: NewCostModel(k8sCache),
  1004. OutOfClusterCache: outOfClusterCache,
  1005. }
  1006. remoteEnabled := os.Getenv(remoteEnabled)
  1007. if remoteEnabled == "true" {
  1008. info, err := cloudProvider.ClusterInfo()
  1009. klog.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
  1010. if err != nil {
  1011. klog.Infof("Error saving cluster id %s", err.Error())
  1012. }
  1013. _, _, err = costAnalyzerCloud.GetOrCreateClusterMeta(info["id"], info["name"])
  1014. if err != nil {
  1015. klog.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
  1016. }
  1017. }
  1018. // Thanos Client
  1019. if os.Getenv(thanosEnabled) == "true" {
  1020. thanosUrl := os.Getenv(thanosQueryUrl)
  1021. if thanosUrl != "" {
  1022. var thanosRT http.RoundTripper = &http.Transport{
  1023. Proxy: http.ProxyFromEnvironment,
  1024. DialContext: (&net.Dialer{
  1025. Timeout: 120 * time.Second,
  1026. KeepAlive: 120 * time.Second,
  1027. }).DialContext,
  1028. TLSHandshakeTimeout: 10 * time.Second,
  1029. }
  1030. thanosConfig := prometheusClient.Config{
  1031. Address: thanosUrl,
  1032. RoundTripper: thanosRT,
  1033. }
  1034. thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency)
  1035. _, err = ValidatePrometheus(thanosCli, true)
  1036. if err != nil {
  1037. klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
  1038. A.ThanosClient = thanosCli
  1039. } else {
  1040. klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
  1041. A.ThanosClient = thanosCli
  1042. }
  1043. } else {
  1044. klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
  1045. }
  1046. }
  1047. err = A.Cloud.DownloadPricingData()
  1048. if err != nil {
  1049. klog.V(1).Info("Failed to download pricing data: " + err.Error())
  1050. }
  1051. A.recordPrices()
  1052. managerEndpoints := cm.NewClusterManagerEndpoints(A.ClusterManager)
  1053. Router.GET("/costDataModel", A.CostDataModel)
  1054. Router.GET("/costDataModelRange", A.CostDataModelRange)
  1055. Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
  1056. Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
  1057. Router.GET("/allNodePricing", A.GetAllNodePricing)
  1058. Router.POST("/refreshPricing", A.RefreshPricingData)
  1059. Router.GET("/clusterCostsOverTime", A.ClusterCostsOverTime)
  1060. Router.GET("/clusterCosts", A.ClusterCosts)
  1061. Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
  1062. Router.GET("/managementPlatform", A.ManagementPlatform)
  1063. Router.GET("/clusterInfo", A.ClusterInfo)
  1064. Router.GET("/clusters", managerEndpoints.GetAllClusters)
  1065. Router.PUT("/clusters", managerEndpoints.PutCluster)
  1066. Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
  1067. }