costmodel.go 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "net/http"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  14. prometheusClient "github.com/prometheus/client_golang/api"
  15. v1 "k8s.io/api/core/v1"
  16. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  17. "k8s.io/apimachinery/pkg/labels"
  18. "k8s.io/client-go/kubernetes"
  19. "k8s.io/client-go/tools/cache"
  20. "k8s.io/client-go/util/workqueue"
  21. "k8s.io/klog"
  22. )
  23. const (
  24. statusAPIError = 422
  25. apiPrefix = "/api/v1"
  26. epAlertManagers = apiPrefix + "/alertmanagers"
  27. epQuery = apiPrefix + "/query"
  28. epQueryRange = apiPrefix + "/query_range"
  29. epLabelValues = apiPrefix + "/label/:name/values"
  30. epSeries = apiPrefix + "/series"
  31. epTargets = apiPrefix + "/targets"
  32. epSnapshot = apiPrefix + "/admin/tsdb/snapshot"
  33. epDeleteSeries = apiPrefix + "/admin/tsdb/delete_series"
  34. epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"
  35. epConfig = apiPrefix + "/status/config"
  36. epFlags = apiPrefix + "/status/flags"
  37. )
// CostModel computes per-container cost data.
//
// Controller is the pod controller started by ContainerWatcher;
// ComputeCostData reads the current pod list from it via GetAll.
type CostModel struct {
	Controller *Controller
}
  41. func NewCostModel(podListWatcher cache.ListerWatcher) *CostModel {
  42. cm := &CostModel{}
  43. cm.ContainerWatcher(podListWatcher)
  44. return cm
  45. }
  46. func (cm *CostModel) ContainerWatcher(podListWatcher cache.ListerWatcher) {
  47. // create the workqueue
  48. queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
  49. // Bind the workqueue to a cache with the help of an informer. This way we make sure that
  50. // whenever the cache is updated, the pod key is added to the workqueue.
  51. // Note that when we finally process the item from the workqueue, we might see a newer version
  52. // of the Pod than the version which was responsible for triggering the update.
  53. indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
  54. AddFunc: func(obj interface{}) {
  55. key, err := cache.MetaNamespaceKeyFunc(obj)
  56. if err == nil {
  57. queue.Add(key)
  58. }
  59. },
  60. UpdateFunc: func(old interface{}, new interface{}) {
  61. key, err := cache.MetaNamespaceKeyFunc(new)
  62. if err == nil {
  63. queue.Add(key)
  64. }
  65. },
  66. DeleteFunc: func(obj interface{}) {
  67. // IndexerInformer uses a delta queue, therefore for deletes we have to use this
  68. // key function.
  69. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  70. if err == nil {
  71. queue.Add(key)
  72. }
  73. },
  74. }, cache.Indexers{})
  75. cm.Controller = NewController(queue, indexer, informer)
  76. // Now let's start the controller
  77. stop := make(chan struct{})
  78. //defer close(stop)
  79. go cm.Controller.Run(1, stop)
  80. }
// CostData is the per-container record built by ComputeCostData and
// serialized to JSON. encoding/json emits fields in declaration order, so
// reordering fields changes the output.
//
// Containers whose pod is no longer present are reconstructed from their
// metric key only. For those, the controller/service/label fields and
// PVCData are left unset.
type CostData struct {
	// Name is the container name.
	Name string `json:"name,omitempty"`
	// PodName is the name of the pod the container belongs to.
	PodName string `json:"podName,omitempty"`
	// NodeName is the node the pod was scheduled on.
	NodeName string `json:"nodeName,omitempty"`
	// NodeData is the node's cost information. It may be nil for a live pod
	// whose node is not in the node cost model. For a deleted container on a
	// node unknown to Kubernetes, it is a placeholder filled from historical
	// prices.
	NodeData *costAnalyzerCloud.Node `json:"node,omitempty"`
	// Namespace is the pod's namespace.
	Namespace string `json:"namespace,omitempty"`
	// Deployments, Services, Daemonsets, Statefulsets and Jobs list the
	// workloads and services associated with the pod.
	Deployments  []string `json:"deployments,omitempty"`
	Services     []string `json:"services,omitempty"`
	Daemonsets   []string `json:"daemonsets,omitempty"`
	Statefulsets []string `json:"statefulsets,omitempty"`
	Jobs         []string `json:"jobs,omitempty"`
	// RAMReq, RAMUsed, CPUReq, CPUUsed and GPUReq are the Prometheus series
	// for this container. When a series is missing, each holds a single
	// zero-valued Vector placeholder.
	RAMReq  []*Vector `json:"ramreq,omitempty"`
	RAMUsed []*Vector `json:"ramused,omitempty"`
	CPUReq  []*Vector `json:"cpureq,omitempty"`
	CPUUsed []*Vector `json:"cpuused,omitempty"`
	// RAMAllocation and CPUAllocation are getContainerAllocation(req, used):
	// per timestamp, the larger of request and usage.
	RAMAllocation []*Vector `json:"ramallocated,omitempty"`
	CPUAllocation []*Vector `json:"cpuallocated,omitempty"`
	GPUReq        []*Vector `json:"gpureq,omitempty"`
	// PVCData holds the pod's persistent volume claims. It is attached only
	// to the pod's first container so claims are not counted twice.
	PVCData []*PersistentVolumeClaimData `json:"pvcData,omitempty"`
	// Labels are the pod's labels.
	Labels map[string]string `json:"labels,omitempty"`
	// NamespaceLabels are the labels of the pod's namespace.
	NamespaceLabels map[string]string `json:"namespaceLabels,omitempty"`
}
// Vector is a single sample: a Value observed at a Timestamp.
//
// A zero Timestamp marks a placeholder sample (ComputeCostData uses
// &Vector{} when a series is missing), and getContainerAllocation skips it.
// NOTE(review): Timestamp is whatever Prometheus reported (presumably Unix
// seconds) — confirm.
type Vector struct {
	Timestamp float64 `json:"timestamp"`
	Value     float64 `json:"value"`
}
// PromQL templates used by ComputeCostData, filled in with fmt.Sprintf.
// Each range selector "[%s]" receives the query window. The "%s" after it
// receives the offset string verbatim (presumably an "offset <duration>"
// modifier or empty — confirm with callers). The number of verbs differs per
// template; see each constant.
const (
	// queryRAMRequestsStr: per-container memory requests, computed as
	// count_over_time * avg_over_time of
	// kube_pod_container_resource_requests_memory_bytes over the window.
	// Labels are relabelled from container/pod to container_name/pod_name,
	// the label names the usage queries use.
	// Verbs: window, offset, window, offset.
	// NOTE(review): the count factor is presumably divided out by the
	// normalization value in getContainerMetricVector — confirm.
	queryRAMRequestsStr = `avg(
label_replace(
label_replace(
avg(
count_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD", node!=""}[%s] %s)
*
avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD", node!=""}[%s] %s)
) by (namespace,container,pod,node) , "container_name","$1","container","(.+)"
), "pod_name","$1","pod","(.+)"
)
) by (namespace,container_name,pod_name,node)`
	// queryRAMUsageStr: per-container working-set memory, computed as
	// count_over_time * avg_over_time of container_memory_working_set_bytes.
	// The instance label is copied to node, and results are sorted
	// descending. Verbs: window, offset, window, offset.
	queryRAMUsageStr = `sort_desc(
avg(
label_replace(count_over_time(container_memory_working_set_bytes{container_name!="",container_name!="POD", instance!=""}[%s] %s), "node", "$1", "instance","(.+)")
*
label_replace(avg_over_time(container_memory_working_set_bytes{container_name!="",container_name!="POD", instance!=""}[%s] %s), "node", "$1", "instance","(.+)")
) by (namespace,container_name,pod_name,node)
)`
	// queryCPURequestsStr: same shape as queryRAMRequestsStr, for
	// kube_pod_container_resource_requests_cpu_cores.
	// Verbs: window, offset, window, offset.
	queryCPURequestsStr = `avg(
label_replace(
label_replace(
avg(
count_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD", node!=""}[%s] %s)
*
avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD", node!=""}[%s] %s)
) by (namespace,container,pod,node) , "container_name","$1","container","(.+)"
), "pod_name","$1","pod","(.+)"
)
) by (namespace,container_name,pod_name,node)`
	// queryCPUUsageStr: per-container rate() of
	// container_cpu_usage_seconds_total over the window, with instance
	// copied to node. It has only two verbs: window, offset. ComputeCostData
	// does not normalize this result.
	queryCPUUsageStr = `avg(
label_replace(
rate(
container_cpu_usage_seconds_total{container_name!="",container_name!="POD",instance!=""}[%s] %s
) , "node", "$1", "instance", "(.+)"
)
) by (namespace,container_name,pod_name,node)`
	// queryGPURequestsStr: same shape as queryRAMRequestsStr, for
	// kube_pod_container_resource_requests{resource="nvidia_com_gpu"}.
	// Verbs: window, offset, window, offset.
	queryGPURequestsStr = `avg(
label_replace(
label_replace(
avg(
count_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s] %s)
*
avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s] %s)
) by (namespace,container,pod,node) , "container_name","$1","container","(.+)"
), "pod_name","$1","pod","(.+)"
)
) by (namespace,container_name,pod_name,node)`
	// queryPVRequestsStr: requested storage bytes per PVC, joined on
	// (persistentvolumeclaim, namespace) with the claim's storageclass and
	// volumename from kube_persistentvolumeclaim_info.
	// It contains no format verbs.
	queryPVRequestsStr = `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace, volumename)
*
on (persistentvolumeclaim, namespace) group_right(storageclass, volumename)
sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace)`
	// normalizationStr: the largest per-series sample count of the memory
	// request metric over the window; parsed by getNormalization.
	// Verbs: window, offset.
	normalizationStr = `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[%s] %s))`
)
// PrometheusMetadata summarizes what ValidatePrometheus found on a Prometheus
// server.
type PrometheusMetadata struct {
	// Running is true when the "up" query returned at least one series.
	Running bool `json:"running"`
	// KubecostDataExists is true when one of those series has job="kubecost".
	KubecostDataExists bool `json:"kubecostDataExists"`
}
  165. // ValidatePrometheus tells the model what data prometheus has on it.
  166. func ValidatePrometheus(cli prometheusClient.Client) (*PrometheusMetadata, error) {
  167. data, err := query(cli, "up")
  168. if err != nil {
  169. return &PrometheusMetadata{
  170. Running: false,
  171. KubecostDataExists: false,
  172. }, err
  173. }
  174. v, kcmetrics, err := getUptimeData(data)
  175. if err != nil {
  176. return &PrometheusMetadata{
  177. Running: false,
  178. KubecostDataExists: false,
  179. }, err
  180. }
  181. if len(v) > 0 {
  182. return &PrometheusMetadata{
  183. Running: true,
  184. KubecostDataExists: kcmetrics,
  185. }, nil
  186. } else {
  187. return &PrometheusMetadata{
  188. Running: false,
  189. KubecostDataExists: false,
  190. }, fmt.Errorf("No running jobs found on Prometheus at %s", cli.URL(epQuery, nil).Path)
  191. }
  192. }
  193. func getUptimeData(qr interface{}) ([]*Vector, bool, error) {
  194. data, ok := qr.(map[string]interface{})["data"]
  195. if !ok {
  196. e, err := wrapPrometheusError(qr)
  197. if err != nil {
  198. return nil, false, err
  199. }
  200. return nil, false, fmt.Errorf(e)
  201. }
  202. r, ok := data.(map[string]interface{})["result"]
  203. if !ok {
  204. return nil, false, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  205. }
  206. results, ok := r.([]interface{})
  207. if !ok {
  208. return nil, false, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  209. }
  210. jobData := []*Vector{}
  211. kubecostMetrics := false
  212. for _, val := range results {
  213. // For now, just do this for validation. TODO: This can be parsed to figure out the exact running jobs.
  214. metrics, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  215. if !ok {
  216. return nil, false, fmt.Errorf("Prometheus vector does not have metric labels")
  217. }
  218. jobname, ok := metrics["job"]
  219. if !ok {
  220. return nil, false, fmt.Errorf("up query does not have job names")
  221. }
  222. if jobname == "kubecost" {
  223. kubecostMetrics = true
  224. }
  225. value, ok := val.(map[string]interface{})["value"]
  226. if !ok {
  227. return nil, false, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  228. }
  229. dataPoint, ok := value.([]interface{})
  230. if !ok || len(dataPoint) != 2 {
  231. return nil, false, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  232. }
  233. strVal := dataPoint[1].(string)
  234. v, _ := strconv.ParseFloat(strVal, 64)
  235. toReturn := &Vector{
  236. Timestamp: dataPoint[0].(float64),
  237. Value: v,
  238. }
  239. jobData = append(jobData, toReturn)
  240. }
  241. return jobData, kubecostMetrics, nil
  242. }
  243. func ComputeUptimes(cli prometheusClient.Client) (map[string]float64, error) {
  244. res, err := query(cli, `container_start_time_seconds{container_name != "POD",container_name != ""}`)
  245. if err != nil {
  246. return nil, err
  247. }
  248. vectors, err := getContainerMetricVector(res, false, 0)
  249. if err != nil {
  250. return nil, err
  251. }
  252. results := make(map[string]float64)
  253. for key, vector := range vectors {
  254. if err != nil {
  255. return nil, err
  256. }
  257. val := vector[0].Value
  258. uptime := time.Now().Sub(time.Unix(int64(val), 0)).Seconds()
  259. results[key] = uptime
  260. }
  261. return results, nil
  262. }
  263. func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
  264. queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, window, offset, window, offset)
  265. queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
  266. queryCPURequests := fmt.Sprintf(queryCPURequestsStr, window, offset, window, offset)
  267. queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, window, offset)
  268. queryGPURequests := fmt.Sprintf(queryGPURequestsStr, window, offset, window, offset)
  269. queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
  270. normalization := fmt.Sprintf(normalizationStr, window, offset)
  271. var wg sync.WaitGroup
  272. wg.Add(8)
  273. var promErr error
  274. var resultRAMRequests interface{}
  275. go func() {
  276. resultRAMRequests, promErr = query(cli, queryRAMRequests)
  277. defer wg.Done()
  278. }()
  279. var resultRAMUsage interface{}
  280. go func() {
  281. resultRAMUsage, promErr = query(cli, queryRAMUsage)
  282. defer wg.Done()
  283. }()
  284. var resultCPURequests interface{}
  285. go func() {
  286. resultCPURequests, promErr = query(cli, queryCPURequests)
  287. defer wg.Done()
  288. }()
  289. var resultCPUUsage interface{}
  290. go func() {
  291. resultCPUUsage, promErr = query(cli, queryCPUUsage)
  292. defer wg.Done()
  293. }()
  294. var resultGPURequests interface{}
  295. go func() {
  296. resultGPURequests, promErr = query(cli, queryGPURequests)
  297. defer wg.Done()
  298. }()
  299. var resultPVRequests interface{}
  300. go func() {
  301. resultPVRequests, promErr = query(cli, queryPVRequests)
  302. defer wg.Done()
  303. }()
  304. var normalizationResult interface{}
  305. go func() {
  306. normalizationResult, promErr = query(cli, normalization)
  307. defer wg.Done()
  308. }()
  309. podDeploymentsMapping := make(map[string]map[string][]string)
  310. podServicesMapping := make(map[string]map[string][]string)
  311. namespaceLabelsMapping := make(map[string]map[string]string)
  312. podlist := cm.Controller.GetAll()
  313. var k8sErr error
  314. go func() {
  315. defer wg.Done()
  316. podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
  317. if k8sErr != nil {
  318. return
  319. }
  320. podServicesMapping, k8sErr = getPodServices(clientset, podlist)
  321. if k8sErr != nil {
  322. return
  323. }
  324. namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
  325. if k8sErr != nil {
  326. return
  327. }
  328. }()
  329. wg.Wait()
  330. if promErr != nil {
  331. return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
  332. }
  333. if k8sErr != nil {
  334. return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
  335. }
  336. normalizationValue, err := getNormalization(normalizationResult)
  337. if err != nil {
  338. return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
  339. }
  340. nodes, err := getNodeCost(clientset, cloud)
  341. if err != nil {
  342. klog.V(1).Infof("Warning, no Node cost model available: " + err.Error())
  343. return nil, err
  344. }
  345. pvClaimMapping, err := getPVInfoVector(resultPVRequests)
  346. if err != nil {
  347. klog.Infof("Unable to get PV Data: %s", err.Error())
  348. }
  349. if pvClaimMapping != nil {
  350. err = addPVData(clientset, pvClaimMapping, cloud)
  351. if err != nil {
  352. return nil, err
  353. }
  354. }
  355. containerNameCost := make(map[string]*CostData)
  356. containers := make(map[string]bool)
  357. RAMReqMap, err := getContainerMetricVector(resultRAMRequests, true, normalizationValue)
  358. if err != nil {
  359. return nil, err
  360. }
  361. for key := range RAMReqMap {
  362. containers[key] = true
  363. }
  364. RAMUsedMap, err := getContainerMetricVector(resultRAMUsage, true, normalizationValue)
  365. if err != nil {
  366. return nil, err
  367. }
  368. for key := range RAMUsedMap {
  369. containers[key] = true
  370. }
  371. CPUReqMap, err := getContainerMetricVector(resultCPURequests, true, normalizationValue)
  372. if err != nil {
  373. return nil, err
  374. }
  375. for key := range CPUReqMap {
  376. containers[key] = true
  377. }
  378. GPUReqMap, err := getContainerMetricVector(resultGPURequests, true, normalizationValue)
  379. if err != nil {
  380. return nil, err
  381. }
  382. for key := range GPUReqMap {
  383. containers[key] = true
  384. }
  385. CPUUsedMap, err := getContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
  386. if err != nil {
  387. return nil, err
  388. }
  389. for key := range CPUUsedMap {
  390. containers[key] = true
  391. }
  392. currentContainers := make(map[string]v1.Pod)
  393. for _, pod := range podlist {
  394. if pod.Status.Phase != "Running" {
  395. continue
  396. }
  397. cs, err := newContainerMetricsFromPod(*pod)
  398. if err != nil {
  399. return nil, err
  400. }
  401. for _, c := range cs {
  402. containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
  403. currentContainers[c.Key()] = *pod
  404. }
  405. }
  406. missingNodes := make(map[string]*costAnalyzerCloud.Node)
  407. for key := range containers {
  408. if _, ok := containerNameCost[key]; ok {
  409. continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
  410. }
  411. if pod, ok := currentContainers[key]; ok {
  412. podName := pod.GetObjectMeta().GetName()
  413. ns := pod.GetObjectMeta().GetNamespace()
  414. nsLabels := namespaceLabelsMapping[ns]
  415. podLabels := pod.GetObjectMeta().GetLabels()
  416. nodeName := pod.Spec.NodeName
  417. var nodeData *costAnalyzerCloud.Node
  418. if _, ok := nodes[nodeName]; ok {
  419. nodeData = nodes[nodeName]
  420. }
  421. var podDeployments []string
  422. if _, ok := podDeploymentsMapping[ns]; ok {
  423. if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  424. podDeployments = ds
  425. } else {
  426. podDeployments = []string{}
  427. }
  428. }
  429. var podPVs []*PersistentVolumeClaimData
  430. podClaims := pod.Spec.Volumes
  431. for _, vol := range podClaims {
  432. if vol.PersistentVolumeClaim != nil {
  433. name := vol.PersistentVolumeClaim.ClaimName
  434. if pvClaim, ok := pvClaimMapping[ns+","+name]; ok {
  435. podPVs = append(podPVs, pvClaim)
  436. }
  437. }
  438. }
  439. var podServices []string
  440. if _, ok := podServicesMapping[ns]; ok {
  441. if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  442. podServices = svcs
  443. } else {
  444. podServices = []string{}
  445. }
  446. }
  447. for i, container := range pod.Spec.Containers {
  448. containerName := container.Name
  449. // recreate the key and look up data for this container
  450. newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName).Key()
  451. RAMReqV, ok := RAMReqMap[newKey]
  452. if !ok {
  453. klog.V(4).Info("no RAM requests for " + newKey)
  454. RAMReqV = []*Vector{&Vector{}}
  455. }
  456. RAMUsedV, ok := RAMUsedMap[newKey]
  457. if !ok {
  458. klog.V(4).Info("no RAM usage for " + newKey)
  459. RAMUsedV = []*Vector{&Vector{}}
  460. }
  461. CPUReqV, ok := CPUReqMap[newKey]
  462. if !ok {
  463. klog.V(4).Info("no CPU requests for " + newKey)
  464. CPUReqV = []*Vector{&Vector{}}
  465. }
  466. GPUReqV, ok := GPUReqMap[newKey]
  467. if !ok {
  468. klog.V(4).Info("no GPU requests for " + newKey)
  469. GPUReqV = []*Vector{&Vector{}}
  470. }
  471. CPUUsedV, ok := CPUUsedMap[newKey]
  472. if !ok {
  473. klog.V(4).Info("no CPU usage for " + newKey)
  474. CPUUsedV = []*Vector{&Vector{}}
  475. }
  476. var pvReq []*PersistentVolumeClaimData
  477. if i == 0 { // avoid duplicating by just assigning all claims to the first container.
  478. pvReq = podPVs
  479. }
  480. costs := &CostData{
  481. Name: containerName,
  482. PodName: podName,
  483. NodeName: nodeName,
  484. Namespace: ns,
  485. Deployments: podDeployments,
  486. Services: podServices,
  487. Daemonsets: getDaemonsetsOfPod(pod),
  488. Jobs: getJobsOfPod(pod),
  489. Statefulsets: getStatefulSetsOfPod(pod),
  490. NodeData: nodeData,
  491. RAMReq: RAMReqV,
  492. RAMUsed: RAMUsedV,
  493. CPUReq: CPUReqV,
  494. CPUUsed: CPUUsedV,
  495. GPUReq: GPUReqV,
  496. PVCData: pvReq,
  497. Labels: podLabels,
  498. NamespaceLabels: nsLabels,
  499. }
  500. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  501. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  502. if filterNamespace == "" {
  503. containerNameCost[newKey] = costs
  504. } else if costs.Namespace == filterNamespace {
  505. containerNameCost[newKey] = costs
  506. }
  507. }
  508. } else {
  509. // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
  510. klog.V(4).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
  511. c, err := NewContainerMetricFromKey(key)
  512. if err != nil {
  513. return nil, err
  514. }
  515. RAMReqV, ok := RAMReqMap[key]
  516. if !ok {
  517. klog.V(4).Info("no RAM requests for " + key)
  518. RAMReqV = []*Vector{&Vector{}}
  519. }
  520. RAMUsedV, ok := RAMUsedMap[key]
  521. if !ok {
  522. klog.V(4).Info("no RAM usage for " + key)
  523. RAMUsedV = []*Vector{&Vector{}}
  524. }
  525. CPUReqV, ok := CPUReqMap[key]
  526. if !ok {
  527. klog.V(4).Info("no CPU requests for " + key)
  528. CPUReqV = []*Vector{&Vector{}}
  529. }
  530. GPUReqV, ok := GPUReqMap[key]
  531. if !ok {
  532. klog.V(4).Info("no GPU requests for " + key)
  533. GPUReqV = []*Vector{&Vector{}}
  534. }
  535. CPUUsedV, ok := CPUUsedMap[key]
  536. if !ok {
  537. klog.V(4).Info("no CPU usage for " + key)
  538. CPUUsedV = []*Vector{&Vector{}}
  539. }
  540. node, ok := nodes[c.NodeName]
  541. if !ok {
  542. klog.V(2).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
  543. if n, ok := missingNodes[c.NodeName]; ok {
  544. node = n
  545. } else {
  546. node = &costAnalyzerCloud.Node{}
  547. missingNodes[c.NodeName] = node
  548. }
  549. }
  550. costs := &CostData{
  551. Name: c.ContainerName,
  552. PodName: c.PodName,
  553. NodeName: c.NodeName,
  554. NodeData: node,
  555. Namespace: c.Namespace,
  556. RAMReq: RAMReqV,
  557. RAMUsed: RAMUsedV,
  558. CPUReq: CPUReqV,
  559. CPUUsed: CPUUsedV,
  560. GPUReq: GPUReqV,
  561. }
  562. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  563. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  564. if filterNamespace == "" {
  565. containerNameCost[key] = costs
  566. } else if costs.Namespace == filterNamespace {
  567. containerNameCost[key] = costs
  568. }
  569. }
  570. }
  571. err = findDeletedNodeInfo(cli, missingNodes, window)
  572. if err != nil {
  573. return nil, err
  574. }
  575. return containerNameCost, err
  576. }
  577. func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*costAnalyzerCloud.Node, window string) error {
  578. if len(missingNodes) > 0 {
  579. q := make([]string, 0, len(missingNodes))
  580. for nodename := range missingNodes {
  581. klog.V(3).Infof("Finding data for deleted node %v", nodename)
  582. q = append(q, nodename)
  583. }
  584. l := strings.Join(q, "|")
  585. queryHistoricalCPUCost := fmt.Sprintf(`avg_over_time(node_cpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
  586. queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost{instance=~"%s"}[%s])`, l, window)
  587. queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
  588. cpuCostResult, err := query(cli, queryHistoricalCPUCost)
  589. if err != nil {
  590. return fmt.Errorf("Error fetching cpu cost data: " + err.Error())
  591. }
  592. ramCostResult, err := query(cli, queryHistoricalRAMCost)
  593. if err != nil {
  594. return fmt.Errorf("Error fetching ram cost data: " + err.Error())
  595. }
  596. gpuCostResult, err := query(cli, queryHistoricalGPUCost)
  597. if err != nil {
  598. return fmt.Errorf("Error fetching gpu cost data: " + err.Error())
  599. }
  600. cpuCosts, err := getCost(cpuCostResult)
  601. if err != nil {
  602. return err
  603. }
  604. ramCosts, err := getCost(ramCostResult)
  605. if err != nil {
  606. return err
  607. }
  608. gpuCosts, err := getCost(gpuCostResult)
  609. if err != nil {
  610. return err
  611. }
  612. if len(cpuCosts) == 0 {
  613. klog.V(1).Infof("Historical data for node prices not available. Ingest this server's /metrics endpoint to get that data.")
  614. }
  615. for node, costv := range cpuCosts {
  616. if _, ok := missingNodes[node]; ok {
  617. missingNodes[node].VCPUCost = fmt.Sprintf("%f", costv[0].Value)
  618. }
  619. }
  620. for node, costv := range ramCosts {
  621. if _, ok := missingNodes[node]; ok {
  622. missingNodes[node].RAMCost = fmt.Sprintf("%f", costv[0].Value)
  623. }
  624. }
  625. for node, costv := range gpuCosts {
  626. if _, ok := missingNodes[node]; ok {
  627. missingNodes[node].GPUCost = fmt.Sprintf("%f", costv[0].Value)
  628. }
  629. }
  630. }
  631. return nil
  632. }
  633. func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
  634. if req == nil || len(req) == 0 {
  635. for _, usedV := range used {
  636. if usedV.Timestamp == 0 {
  637. continue
  638. }
  639. usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
  640. }
  641. return used
  642. }
  643. if used == nil || len(used) == 0 {
  644. for _, reqV := range req {
  645. if reqV.Timestamp == 0 {
  646. continue
  647. }
  648. reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
  649. }
  650. return req
  651. }
  652. var allocation []*Vector
  653. var timestamps []float64
  654. reqMap := make(map[float64]float64)
  655. for _, reqV := range req {
  656. if reqV.Timestamp == 0 {
  657. continue
  658. }
  659. reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
  660. reqMap[reqV.Timestamp] = reqV.Value
  661. timestamps = append(timestamps, reqV.Timestamp)
  662. }
  663. usedMap := make(map[float64]float64)
  664. for _, usedV := range used {
  665. if usedV.Timestamp == 0 {
  666. continue
  667. }
  668. usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
  669. usedMap[usedV.Timestamp] = usedV.Value
  670. if _, ok := reqMap[usedV.Timestamp]; !ok { // no need to double add, since we'll range over sorted timestamps and check.
  671. timestamps = append(timestamps, usedV.Timestamp)
  672. }
  673. }
  674. sort.Float64s(timestamps)
  675. for _, t := range timestamps {
  676. rv, okR := reqMap[t]
  677. uv, okU := usedMap[t]
  678. allocationVector := &Vector{
  679. Timestamp: t,
  680. }
  681. if okR && okU {
  682. allocationVector.Value = math.Max(rv, uv)
  683. } else if okR {
  684. allocationVector.Value = rv
  685. } else if okU {
  686. allocationVector.Value = uv
  687. }
  688. allocation = append(allocation, allocationVector)
  689. }
  690. return allocation
  691. }
  692. func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
  693. storageClasses, err := clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
  694. if err != nil {
  695. return err
  696. }
  697. storageClassMap := make(map[string]map[string]string)
  698. for _, storageClass := range storageClasses.Items {
  699. params := storageClass.Parameters
  700. storageClassMap[storageClass.ObjectMeta.Name] = params
  701. }
  702. pvs, err := clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
  703. if err != nil {
  704. return err
  705. }
  706. pvMap := make(map[string]*costAnalyzerCloud.PV)
  707. for _, pv := range pvs.Items {
  708. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  709. if !ok {
  710. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  711. }
  712. cacPv := &costAnalyzerCloud.PV{
  713. Class: pv.Spec.StorageClassName,
  714. Region: pv.Labels[v1.LabelZoneRegion],
  715. Parameters: parameters,
  716. }
  717. err := GetPVCost(cacPv, &pv, cloud)
  718. if err != nil {
  719. return err
  720. }
  721. pvMap[pv.Name] = cacPv
  722. }
  723. for _, pvc := range pvClaimMapping {
  724. pvc.Volume = pvMap[pvc.VolumeName]
  725. }
  726. return nil
  727. }
  728. func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAnalyzerCloud.Provider) error {
  729. cfg, err := cloud.GetConfig()
  730. key := cloud.GetPVKey(kpv, pv.Parameters)
  731. pvWithCost, err := cloud.PVPricing(key)
  732. if err != nil {
  733. return err
  734. }
  735. if pvWithCost == nil || pvWithCost.Cost == "" {
  736. pv.Cost = cfg.Storage
  737. return nil // set default cost
  738. }
  739. pv.Cost = pvWithCost.Cost
  740. return nil
  741. }
  742. func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
  743. cfg, err := cloud.GetConfig()
  744. if err != nil {
  745. return nil, err
  746. }
  747. nodeList, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  748. if err != nil {
  749. return nil, err
  750. }
  751. nodes := make(map[string]*costAnalyzerCloud.Node)
  752. for _, n := range nodeList.Items {
  753. name := n.GetObjectMeta().GetName()
  754. nodeLabels := n.GetObjectMeta().GetLabels()
  755. nodeLabels["providerID"] = n.Spec.ProviderID
  756. cnode, err := cloud.NodePricing(cloud.GetKey(nodeLabels))
  757. if err != nil {
  758. klog.V(1).Infof("Error getting node. Error: " + err.Error())
  759. nodes[name] = cnode
  760. continue
  761. }
  762. newCnode := *cnode
  763. var cpu float64
  764. if newCnode.VCPU == "" {
  765. cpu = float64(n.Status.Capacity.Cpu().Value())
  766. newCnode.VCPU = n.Status.Capacity.Cpu().String()
  767. } else {
  768. cpu, _ = strconv.ParseFloat(newCnode.VCPU, 64)
  769. }
  770. var ram float64
  771. if newCnode.RAM == "" {
  772. newCnode.RAM = n.Status.Capacity.Memory().String()
  773. }
  774. ram = float64(n.Status.Capacity.Memory().Value())
  775. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  776. if newCnode.GPU != "" && newCnode.GPUCost == "" { // We couldn't find a gpu cost, so fix cpu and ram, then accordingly
  777. klog.V(4).Infof("GPU without cost found for %s, calculating...", cloud.GetKey(nodeLabels).Features())
  778. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  779. if err != nil {
  780. klog.V(3).Infof("Could not parse default cpu price")
  781. return nil, err
  782. }
  783. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  784. if err != nil {
  785. klog.V(3).Infof("Could not parse default ram price")
  786. return nil, err
  787. }
  788. defaultGPU, err := strconv.ParseFloat(cfg.RAM, 64)
  789. if err != nil {
  790. klog.V(3).Infof("Could not parse default gpu price")
  791. return nil, err
  792. }
  793. cpuToRAMRatio := defaultCPU / defaultRAM
  794. gpuToRAMRatio := defaultGPU / defaultRAM
  795. ramGB := ram / 1024 / 1024 / 1024
  796. ramMultiple := gpuToRAMRatio + cpu*cpuToRAMRatio + ramGB
  797. var nodePrice float64
  798. if newCnode.Cost != "" {
  799. nodePrice, err = strconv.ParseFloat(newCnode.Cost, 64)
  800. if err != nil {
  801. klog.V(3).Infof("Could not parse total node price")
  802. return nil, err
  803. }
  804. } else {
  805. nodePrice, err = strconv.ParseFloat(newCnode.VCPUCost, 64) // all the price was allocated the the CPU
  806. if err != nil {
  807. klog.V(3).Infof("Could not parse node vcpu price")
  808. return nil, err
  809. }
  810. }
  811. ramPrice := (nodePrice / ramMultiple)
  812. cpuPrice := ramPrice * cpuToRAMRatio
  813. gpuPrice := ramPrice * gpuToRAMRatio
  814. newCnode.VCPUCost = fmt.Sprintf("%f", cpuPrice)
  815. newCnode.RAMCost = fmt.Sprintf("%f", ramPrice)
  816. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  817. newCnode.GPUCost = fmt.Sprintf("%f", gpuPrice)
  818. } else {
  819. if newCnode.RAMCost == "" { // We couldn't find a ramcost, so fix cpu and allocate ram accordingly
  820. klog.V(4).Infof("No RAM cost found for %s, calculating...", cloud.GetKey(nodeLabels).Features())
  821. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  822. if err != nil {
  823. klog.V(3).Infof("Could not parse default cpu price")
  824. return nil, err
  825. }
  826. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  827. if err != nil {
  828. klog.V(3).Infof("Could not parse default ram price")
  829. return nil, err
  830. }
  831. cpuToRAMRatio := defaultCPU / defaultRAM
  832. ramGB := ram / 1024 / 1024 / 1024
  833. ramMultiple := cpu*cpuToRAMRatio + ramGB
  834. var nodePrice float64
  835. if newCnode.Cost != "" {
  836. nodePrice, err = strconv.ParseFloat(newCnode.Cost, 64)
  837. if err != nil {
  838. klog.V(3).Infof("Could not parse total node price")
  839. return nil, err
  840. }
  841. } else {
  842. nodePrice, err = strconv.ParseFloat(newCnode.VCPUCost, 64) // all the price was allocated the the CPU
  843. if err != nil {
  844. klog.V(3).Infof("Could not parse node vcpu price")
  845. return nil, err
  846. }
  847. }
  848. ramPrice := (nodePrice / ramMultiple)
  849. cpuPrice := ramPrice * cpuToRAMRatio
  850. newCnode.VCPUCost = fmt.Sprintf("%f", cpuPrice)
  851. newCnode.RAMCost = fmt.Sprintf("%f", ramPrice)
  852. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  853. klog.V(4).Infof("Computed \"%s\" RAM Cost := %v", name, newCnode.RAMCost)
  854. }
  855. }
  856. nodes[name] = &newCnode
  857. }
  858. return nodes, nil
  859. }
  860. func getPodServices(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
  861. servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
  862. if err != nil {
  863. return nil, err
  864. }
  865. podServicesMapping := make(map[string]map[string][]string)
  866. for _, service := range servicesList.Items {
  867. namespace := service.GetObjectMeta().GetNamespace()
  868. name := service.GetObjectMeta().GetName()
  869. if _, ok := podServicesMapping[namespace]; !ok {
  870. podServicesMapping[namespace] = make(map[string][]string)
  871. }
  872. s := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
  873. for _, pod := range podList {
  874. labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
  875. if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
  876. services, ok := podServicesMapping[namespace][pod.GetObjectMeta().GetName()]
  877. if ok {
  878. podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = append(services, name)
  879. } else {
  880. podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
  881. }
  882. }
  883. }
  884. }
  885. return podServicesMapping, nil
  886. }
  887. func getPodDeployments(clientset kubernetes.Interface, podList []*v1.Pod) (map[string]map[string][]string, error) {
  888. deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
  889. if err != nil {
  890. return nil, err
  891. }
  892. podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
  893. for _, deployment := range deploymentsList.Items {
  894. namespace := deployment.GetObjectMeta().GetNamespace()
  895. name := deployment.GetObjectMeta().GetName()
  896. if _, ok := podDeploymentsMapping[namespace]; !ok {
  897. podDeploymentsMapping[namespace] = make(map[string][]string)
  898. }
  899. s, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
  900. if err != nil {
  901. klog.V(2).Infof("Error doing deployment label conversion: " + err.Error())
  902. }
  903. for _, pod := range podList {
  904. labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
  905. if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
  906. deployments, ok := podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()]
  907. if ok {
  908. podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = append(deployments, name)
  909. } else {
  910. podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
  911. }
  912. }
  913. }
  914. }
  915. return podDeploymentsMapping, nil
  916. }
  917. func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider,
  918. startString, endString, windowString string, filterNamespace string) (map[string]*CostData, error) {
  919. queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
  920. queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
  921. queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
  922. queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
  923. queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "")
  924. queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
  925. normalization := fmt.Sprintf(normalizationStr, windowString, "")
  926. layout := "2006-01-02T15:04:05.000Z"
  927. start, err := time.Parse(layout, startString)
  928. if err != nil {
  929. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  930. return nil, err
  931. }
  932. end, err := time.Parse(layout, endString)
  933. if err != nil {
  934. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  935. return nil, err
  936. }
  937. window, err := time.ParseDuration(windowString)
  938. if err != nil {
  939. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  940. return nil, err
  941. }
  942. var wg sync.WaitGroup
  943. wg.Add(8)
  944. var promErr error
  945. var resultRAMRequests interface{}
  946. go func() {
  947. resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
  948. defer wg.Done()
  949. }()
  950. var resultRAMUsage interface{}
  951. go func() {
  952. resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
  953. defer wg.Done()
  954. }()
  955. var resultCPURequests interface{}
  956. go func() {
  957. resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
  958. defer wg.Done()
  959. }()
  960. var resultCPUUsage interface{}
  961. go func() {
  962. resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
  963. defer wg.Done()
  964. }()
  965. var resultGPURequests interface{}
  966. go func() {
  967. resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
  968. defer wg.Done()
  969. }()
  970. var resultPVRequests interface{}
  971. go func() {
  972. resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
  973. defer wg.Done()
  974. }()
  975. var normalizationResult interface{}
  976. go func() {
  977. normalizationResult, promErr = query(cli, normalization)
  978. defer wg.Done()
  979. }()
  980. podDeploymentsMapping := make(map[string]map[string][]string)
  981. podServicesMapping := make(map[string]map[string][]string)
  982. namespaceLabelsMapping := make(map[string]map[string]string)
  983. podlist := cm.Controller.GetAll()
  984. var k8sErr error
  985. go func() {
  986. podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
  987. if k8sErr != nil {
  988. return
  989. }
  990. podServicesMapping, k8sErr = getPodServices(clientset, podlist)
  991. if k8sErr != nil {
  992. return
  993. }
  994. namespaceLabelsMapping, k8sErr = getNamespaceLabels(clientset)
  995. if k8sErr != nil {
  996. return
  997. }
  998. wg.Done()
  999. }()
  1000. wg.Wait()
  1001. if promErr != nil {
  1002. return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
  1003. }
  1004. if k8sErr != nil {
  1005. return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
  1006. }
  1007. normalizationValue, err := getNormalization(normalizationResult)
  1008. if err != nil {
  1009. return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
  1010. }
  1011. nodes, err := getNodeCost(clientset, cloud)
  1012. if err != nil {
  1013. klog.V(1).Infof("Warning, no cost model available: " + err.Error())
  1014. return nil, err
  1015. }
  1016. pvClaimMapping, err := getPVInfoVectors(resultPVRequests)
  1017. if err != nil {
  1018. // Just log for compatibility with KSM less than 1.6
  1019. klog.Infof("Unable to get PV Data: %s", err.Error())
  1020. }
  1021. if pvClaimMapping != nil {
  1022. err = addPVData(clientset, pvClaimMapping, cloud)
  1023. if err != nil {
  1024. return nil, err
  1025. }
  1026. }
  1027. containerNameCost := make(map[string]*CostData)
  1028. containers := make(map[string]bool)
  1029. RAMReqMap, err := GetContainerMetricVectors(resultRAMRequests, true, normalizationValue)
  1030. if err != nil {
  1031. return nil, err
  1032. }
  1033. for key := range RAMReqMap {
  1034. containers[key] = true
  1035. }
  1036. RAMUsedMap, err := GetContainerMetricVectors(resultRAMUsage, true, normalizationValue)
  1037. if err != nil {
  1038. return nil, err
  1039. }
  1040. for key := range RAMUsedMap {
  1041. containers[key] = true
  1042. }
  1043. CPUReqMap, err := GetContainerMetricVectors(resultCPURequests, true, normalizationValue)
  1044. if err != nil {
  1045. return nil, err
  1046. }
  1047. for key := range CPUReqMap {
  1048. containers[key] = true
  1049. }
  1050. GPUReqMap, err := GetContainerMetricVectors(resultGPURequests, true, normalizationValue)
  1051. if err != nil {
  1052. return nil, err
  1053. }
  1054. for key := range GPUReqMap {
  1055. containers[key] = true
  1056. }
  1057. CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
  1058. if err != nil {
  1059. return nil, err
  1060. }
  1061. for key := range CPUUsedMap {
  1062. containers[key] = true
  1063. }
  1064. currentContainers := make(map[string]v1.Pod)
  1065. for _, pod := range podlist {
  1066. if pod.Status.Phase != "Running" {
  1067. continue
  1068. }
  1069. cs, err := newContainerMetricsFromPod(*pod)
  1070. if err != nil {
  1071. return nil, err
  1072. }
  1073. for _, c := range cs {
  1074. containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
  1075. currentContainers[c.Key()] = *pod
  1076. }
  1077. }
  1078. missingNodes := make(map[string]*costAnalyzerCloud.Node)
  1079. for key := range containers {
  1080. if _, ok := containerNameCost[key]; ok {
  1081. continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
  1082. }
  1083. if pod, ok := currentContainers[key]; ok {
  1084. podName := pod.GetObjectMeta().GetName()
  1085. ns := pod.GetObjectMeta().GetNamespace()
  1086. podLabels := pod.GetObjectMeta().GetLabels()
  1087. nodeName := pod.Spec.NodeName
  1088. var nodeData *costAnalyzerCloud.Node
  1089. if _, ok := nodes[nodeName]; ok {
  1090. nodeData = nodes[nodeName]
  1091. }
  1092. var podDeployments []string
  1093. if _, ok := podDeploymentsMapping[ns]; ok {
  1094. if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  1095. podDeployments = ds
  1096. } else {
  1097. podDeployments = []string{}
  1098. }
  1099. }
  1100. var podPVs []*PersistentVolumeClaimData
  1101. podClaims := pod.Spec.Volumes
  1102. for _, vol := range podClaims {
  1103. if vol.PersistentVolumeClaim != nil {
  1104. name := vol.PersistentVolumeClaim.ClaimName
  1105. if pvClaim, ok := pvClaimMapping[ns+","+name]; ok {
  1106. podPVs = append(podPVs, pvClaim)
  1107. }
  1108. }
  1109. }
  1110. var podServices []string
  1111. if _, ok := podServicesMapping[ns]; ok {
  1112. if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  1113. podServices = svcs
  1114. } else {
  1115. podServices = []string{}
  1116. }
  1117. }
  1118. nsLabels := namespaceLabelsMapping[ns]
  1119. for i, container := range pod.Spec.Containers {
  1120. containerName := container.Name
  1121. newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName).Key()
  1122. RAMReqV, ok := RAMReqMap[newKey]
  1123. if !ok {
  1124. klog.V(4).Info("no RAM requests for " + newKey)
  1125. RAMReqV = []*Vector{}
  1126. }
  1127. RAMUsedV, ok := RAMUsedMap[newKey]
  1128. if !ok {
  1129. klog.V(4).Info("no RAM usage for " + newKey)
  1130. RAMUsedV = []*Vector{}
  1131. }
  1132. CPUReqV, ok := CPUReqMap[newKey]
  1133. if !ok {
  1134. klog.V(4).Info("no CPU requests for " + newKey)
  1135. CPUReqV = []*Vector{}
  1136. }
  1137. GPUReqV, ok := GPUReqMap[newKey]
  1138. if !ok {
  1139. klog.V(4).Info("no GPU requests for " + newKey)
  1140. GPUReqV = []*Vector{}
  1141. }
  1142. CPUUsedV, ok := CPUUsedMap[newKey]
  1143. if !ok {
  1144. klog.V(4).Info("no CPU usage for " + newKey)
  1145. CPUUsedV = []*Vector{}
  1146. }
  1147. var pvReq []*PersistentVolumeClaimData
  1148. if i == 0 { // avoid duplicating by just assigning all claims to the first container.
  1149. pvReq = podPVs
  1150. }
  1151. costs := &CostData{
  1152. Name: containerName,
  1153. PodName: podName,
  1154. NodeName: nodeName,
  1155. Namespace: ns,
  1156. Deployments: podDeployments,
  1157. Services: podServices,
  1158. Daemonsets: getDaemonsetsOfPod(pod),
  1159. Jobs: getJobsOfPod(pod),
  1160. Statefulsets: getStatefulSetsOfPod(pod),
  1161. NodeData: nodeData,
  1162. RAMReq: RAMReqV,
  1163. RAMUsed: RAMUsedV,
  1164. CPUReq: CPUReqV,
  1165. CPUUsed: CPUUsedV,
  1166. GPUReq: GPUReqV,
  1167. PVCData: pvReq,
  1168. Labels: podLabels,
  1169. NamespaceLabels: nsLabels,
  1170. }
  1171. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  1172. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  1173. if filterNamespace == "" {
  1174. containerNameCost[newKey] = costs
  1175. } else if costs.Namespace == filterNamespace {
  1176. containerNameCost[newKey] = costs
  1177. }
  1178. }
  1179. } else {
  1180. // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
  1181. klog.V(4).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
  1182. c, _ := NewContainerMetricFromKey(key)
  1183. RAMReqV, ok := RAMReqMap[key]
  1184. if !ok {
  1185. klog.V(4).Info("no RAM requests for " + key)
  1186. RAMReqV = []*Vector{}
  1187. }
  1188. RAMUsedV, ok := RAMUsedMap[key]
  1189. if !ok {
  1190. klog.V(4).Info("no RAM usage for " + key)
  1191. RAMUsedV = []*Vector{}
  1192. }
  1193. CPUReqV, ok := CPUReqMap[key]
  1194. if !ok {
  1195. klog.V(4).Info("no CPU requests for " + key)
  1196. CPUReqV = []*Vector{}
  1197. }
  1198. GPUReqV, ok := GPUReqMap[key]
  1199. if !ok {
  1200. klog.V(4).Info("no GPU requests for " + key)
  1201. GPUReqV = []*Vector{}
  1202. }
  1203. CPUUsedV, ok := CPUUsedMap[key]
  1204. if !ok {
  1205. klog.V(4).Info("no CPU usage for " + key)
  1206. CPUUsedV = []*Vector{}
  1207. }
  1208. node, ok := nodes[c.NodeName]
  1209. if !ok {
  1210. klog.V(2).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
  1211. if n, ok := missingNodes[c.NodeName]; ok {
  1212. node = n
  1213. } else {
  1214. node = &costAnalyzerCloud.Node{}
  1215. missingNodes[c.NodeName] = node
  1216. }
  1217. }
  1218. costs := &CostData{
  1219. Name: c.ContainerName,
  1220. PodName: c.PodName,
  1221. NodeName: c.NodeName,
  1222. NodeData: node,
  1223. Namespace: c.Namespace,
  1224. RAMReq: RAMReqV,
  1225. RAMUsed: RAMUsedV,
  1226. CPUReq: CPUReqV,
  1227. CPUUsed: CPUUsedV,
  1228. GPUReq: GPUReqV,
  1229. }
  1230. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  1231. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  1232. if filterNamespace == "" {
  1233. containerNameCost[key] = costs
  1234. } else if costs.Namespace == filterNamespace {
  1235. containerNameCost[key] = costs
  1236. }
  1237. }
  1238. }
  1239. w := end.Sub(start)
  1240. if w.Minutes() > 0 {
  1241. wStr := fmt.Sprintf("%dm", int(w.Minutes()))
  1242. err = findDeletedNodeInfo(cli, missingNodes, wStr)
  1243. if err != nil {
  1244. return nil, err
  1245. }
  1246. }
  1247. return containerNameCost, err
  1248. }
  1249. func getNamespaceLabels(clientset kubernetes.Interface) (map[string]map[string]string, error) {
  1250. nsToLabels := make(map[string]map[string]string)
  1251. nss, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
  1252. if err != nil {
  1253. return nil, err
  1254. }
  1255. for _, ns := range nss.Items {
  1256. nsToLabels[ns.Name] = ns.Labels
  1257. }
  1258. return nsToLabels, nil
  1259. }
  1260. func getDaemonsetsOfPod(pod v1.Pod) []string {
  1261. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  1262. if ownerReference.Kind == "DaemonSet" {
  1263. return []string{ownerReference.Name}
  1264. }
  1265. }
  1266. return []string{}
  1267. }
  1268. func getJobsOfPod(pod v1.Pod) []string {
  1269. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  1270. if ownerReference.Kind == "Job" {
  1271. return []string{ownerReference.Name}
  1272. }
  1273. }
  1274. return []string{}
  1275. }
  1276. func getStatefulSetsOfPod(pod v1.Pod) []string {
  1277. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  1278. if ownerReference.Kind == "StatefulSet" {
  1279. return []string{ownerReference.Name}
  1280. }
  1281. }
  1282. return []string{}
  1283. }
  1284. type PersistentVolumeClaimData struct {
  1285. Class string `json:"class"`
  1286. Claim string `json:"claim"`
  1287. Namespace string `json:"namespace"`
  1288. VolumeName string `json:"volumeName"`
  1289. Volume *costAnalyzerCloud.PV `json:"persistentVolume"`
  1290. Values []*Vector `json:"values"`
  1291. }
  1292. func getCost(qr interface{}) (map[string][]*Vector, error) {
  1293. toReturn := make(map[string][]*Vector)
  1294. for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
  1295. metricInterface, ok := val.(map[string]interface{})["metric"]
  1296. if !ok {
  1297. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  1298. }
  1299. metricMap, ok := metricInterface.(map[string]interface{})
  1300. if !ok {
  1301. return nil, fmt.Errorf("Metric field is improperly formatted")
  1302. }
  1303. instance, ok := metricMap["instance"]
  1304. if !ok {
  1305. return nil, fmt.Errorf("Instance field does not exist in data result vector")
  1306. }
  1307. instanceStr, ok := instance.(string)
  1308. if !ok {
  1309. return nil, fmt.Errorf("Instance is improperly formatted")
  1310. }
  1311. dataPoint, ok := val.(map[string]interface{})["value"]
  1312. if !ok {
  1313. return nil, fmt.Errorf("Value field does not exist in data result vector")
  1314. }
  1315. value, ok := dataPoint.([]interface{})
  1316. if !ok || len(value) != 2 {
  1317. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1318. }
  1319. var vectors []*Vector
  1320. strVal := value[1].(string)
  1321. v, _ := strconv.ParseFloat(strVal, 64)
  1322. vectors = append(vectors, &Vector{
  1323. Timestamp: value[0].(float64),
  1324. Value: v,
  1325. })
  1326. toReturn[instanceStr] = vectors
  1327. }
  1328. return toReturn, nil
  1329. }
  1330. func getPVInfoVectors(qr interface{}) (map[string]*PersistentVolumeClaimData, error) {
  1331. pvmap := make(map[string]*PersistentVolumeClaimData)
  1332. data, ok := qr.(map[string]interface{})["data"]
  1333. if !ok {
  1334. e, err := wrapPrometheusError(qr)
  1335. if err != nil {
  1336. return nil, err
  1337. }
  1338. return nil, fmt.Errorf(e)
  1339. }
  1340. d, ok := data.(map[string]interface{})
  1341. if !ok {
  1342. return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
  1343. }
  1344. result, ok := d["result"]
  1345. if !ok {
  1346. return nil, fmt.Errorf("Result field not present in prometheus response")
  1347. }
  1348. results, ok := result.([]interface{})
  1349. if !ok {
  1350. return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
  1351. }
  1352. for _, val := range results {
  1353. metricInterface, ok := val.(map[string]interface{})["metric"]
  1354. if !ok {
  1355. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  1356. }
  1357. metricMap, ok := metricInterface.(map[string]interface{})
  1358. if !ok {
  1359. return nil, fmt.Errorf("Metric field is improperly formatted")
  1360. }
  1361. pvclaim, ok := metricMap["persistentvolumeclaim"]
  1362. if !ok {
  1363. return nil, fmt.Errorf("Claim field does not exist in data result vector")
  1364. }
  1365. pvclaimStr, ok := pvclaim.(string)
  1366. if !ok {
  1367. return nil, fmt.Errorf("Claim field improperly formatted")
  1368. }
  1369. pvnamespace, ok := metricMap["namespace"]
  1370. if !ok {
  1371. return nil, fmt.Errorf("Namespace field does not exist in data result vector")
  1372. }
  1373. pvnamespaceStr, ok := pvnamespace.(string)
  1374. if !ok {
  1375. return nil, fmt.Errorf("Namespace field improperly formatted")
  1376. }
  1377. pv, ok := metricMap["volumename"]
  1378. if !ok {
  1379. klog.V(3).Infof("Warning: Unfulfilled claim %s: volumename field does not exist in data result vector", pvclaimStr)
  1380. pv = ""
  1381. }
  1382. pvStr, ok := pv.(string)
  1383. if !ok {
  1384. return nil, fmt.Errorf("Volumename field improperly formatted")
  1385. }
  1386. pvclass, ok := metricMap["storageclass"]
  1387. if !ok { // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
  1388. klog.V(2).Infof("Storage Class not found for claim \"%s/%s\".", pvnamespaceStr, pvclaimStr)
  1389. pvclass = ""
  1390. }
  1391. pvclassStr, ok := pvclass.(string)
  1392. if !ok {
  1393. return nil, fmt.Errorf("StorageClass field improperly formatted")
  1394. }
  1395. values, ok := val.(map[string]interface{})["values"].([]interface{})
  1396. if !ok {
  1397. return nil, fmt.Errorf("Values field is improperly formatted")
  1398. }
  1399. var vectors []*Vector
  1400. for _, value := range values {
  1401. dataPoint, ok := value.([]interface{})
  1402. if !ok || len(dataPoint) != 2 {
  1403. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1404. }
  1405. strVal := dataPoint[1].(string)
  1406. v, _ := strconv.ParseFloat(strVal, 64)
  1407. vectors = append(vectors, &Vector{
  1408. Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
  1409. Value: v,
  1410. })
  1411. }
  1412. key := pvnamespaceStr + "," + pvclaimStr
  1413. pvmap[key] = &PersistentVolumeClaimData{
  1414. Class: pvclassStr,
  1415. Claim: pvclaimStr,
  1416. Namespace: pvnamespaceStr,
  1417. VolumeName: pvStr,
  1418. Values: vectors,
  1419. }
  1420. }
  1421. return pvmap, nil
  1422. }
  1423. func getPVInfoVector(qr interface{}) (map[string]*PersistentVolumeClaimData, error) {
  1424. pvmap := make(map[string]*PersistentVolumeClaimData)
  1425. data, ok := qr.(map[string]interface{})["data"]
  1426. if !ok {
  1427. e, err := wrapPrometheusError(qr)
  1428. if err != nil {
  1429. return nil, err
  1430. }
  1431. return nil, fmt.Errorf(e)
  1432. }
  1433. d, ok := data.(map[string]interface{})
  1434. if !ok {
  1435. return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
  1436. }
  1437. result, ok := d["result"]
  1438. if !ok {
  1439. return nil, fmt.Errorf("Result field not present in prometheus response")
  1440. }
  1441. results, ok := result.([]interface{})
  1442. if !ok {
  1443. return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
  1444. }
  1445. for _, val := range results {
  1446. metricInterface, ok := val.(map[string]interface{})["metric"]
  1447. if !ok {
  1448. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  1449. }
  1450. metricMap, ok := metricInterface.(map[string]interface{})
  1451. if !ok {
  1452. return nil, fmt.Errorf("Metric field is improperly formatted")
  1453. }
  1454. pvclaim, ok := metricMap["persistentvolumeclaim"]
  1455. if !ok {
  1456. return nil, fmt.Errorf("Claim field does not exist in data result vector")
  1457. }
  1458. pvclaimStr, ok := pvclaim.(string)
  1459. if !ok {
  1460. return nil, fmt.Errorf("Claim field improperly formatted")
  1461. }
  1462. pvnamespace, ok := metricMap["namespace"]
  1463. if !ok {
  1464. return nil, fmt.Errorf("Namespace field does not exist in data result vector")
  1465. }
  1466. pvnamespaceStr, ok := pvnamespace.(string)
  1467. if !ok {
  1468. return nil, fmt.Errorf("Namespace field improperly formatted")
  1469. }
  1470. pv, ok := metricMap["volumename"]
  1471. if !ok {
  1472. klog.V(3).Infof("Warning: Unfulfilled claim %s: volumename field does not exist in data result vector", pvclaimStr)
  1473. pv = ""
  1474. }
  1475. pvStr, ok := pv.(string)
  1476. if !ok {
  1477. return nil, fmt.Errorf("Volumename field improperly formatted")
  1478. }
  1479. pvclass, ok := metricMap["storageclass"]
  1480. if !ok { // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
  1481. klog.V(2).Infof("Storage Class not found for claim \"%s/%s\".", pvnamespaceStr, pvclaimStr)
  1482. pvclass = ""
  1483. }
  1484. pvclassStr, ok := pvclass.(string)
  1485. if !ok {
  1486. return nil, fmt.Errorf("StorageClass field improperly formatted")
  1487. }
  1488. dataPoint, ok := val.(map[string]interface{})["value"]
  1489. if !ok {
  1490. return nil, fmt.Errorf("Value field does not exist in data result vector")
  1491. }
  1492. value, ok := dataPoint.([]interface{})
  1493. if !ok || len(value) != 2 {
  1494. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1495. }
  1496. var vectors []*Vector
  1497. strVal := value[1].(string)
  1498. v, _ := strconv.ParseFloat(strVal, 64)
  1499. vectors = append(vectors, &Vector{
  1500. Timestamp: value[0].(float64),
  1501. Value: v,
  1502. })
  1503. key := pvnamespaceStr + "," + pvclaimStr
  1504. pvmap[key] = &PersistentVolumeClaimData{
  1505. Class: pvclassStr,
  1506. Claim: pvclaimStr,
  1507. Namespace: pvnamespaceStr,
  1508. VolumeName: pvStr,
  1509. Values: vectors,
  1510. }
  1511. }
  1512. return pvmap, nil
  1513. }
  1514. func QueryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
  1515. u := cli.URL(epQueryRange, nil)
  1516. q := u.Query()
  1517. q.Set("query", query)
  1518. q.Set("start", start.Format(time.RFC3339Nano))
  1519. q.Set("end", end.Format(time.RFC3339Nano))
  1520. q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
  1521. u.RawQuery = q.Encode()
  1522. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  1523. if err != nil {
  1524. return nil, err
  1525. }
  1526. _, body, err := cli.Do(context.Background(), req)
  1527. if err != nil {
  1528. klog.V(1).Infof("ERROR" + err.Error())
  1529. }
  1530. if err != nil {
  1531. return nil, err
  1532. }
  1533. var toReturn interface{}
  1534. err = json.Unmarshal(body, &toReturn)
  1535. if err != nil {
  1536. klog.V(1).Infof("ERROR" + err.Error())
  1537. }
  1538. return toReturn, err
  1539. }
  1540. func query(cli prometheusClient.Client, query string) (interface{}, error) {
  1541. u := cli.URL(epQuery, nil)
  1542. q := u.Query()
  1543. q.Set("query", query)
  1544. u.RawQuery = q.Encode()
  1545. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  1546. if err != nil {
  1547. return nil, err
  1548. }
  1549. _, body, err := cli.Do(context.Background(), req)
  1550. if err != nil {
  1551. return nil, err
  1552. }
  1553. var toReturn interface{}
  1554. err = json.Unmarshal(body, &toReturn)
  1555. if err != nil {
  1556. klog.V(1).Infof("ERROR" + err.Error())
  1557. }
  1558. return toReturn, err
  1559. }
  1560. //todo: don't cast, implement unmarshaler interface
  1561. func getNormalization(qr interface{}) (float64, error) {
  1562. data, ok := qr.(map[string]interface{})["data"]
  1563. if !ok {
  1564. e, err := wrapPrometheusError(qr)
  1565. if err != nil {
  1566. return 0, err
  1567. }
  1568. return 0, fmt.Errorf(e)
  1569. }
  1570. results, ok := data.(map[string]interface{})["result"].([]interface{})
  1571. if !ok {
  1572. return 0, fmt.Errorf("Result field not found in normalization response, aborting")
  1573. }
  1574. if len(results) > 0 {
  1575. dataPoint := results[0].(map[string]interface{})["value"].([]interface{})
  1576. if len(dataPoint) == 2 {
  1577. strNorm := dataPoint[1].(string)
  1578. val, _ := strconv.ParseFloat(strNorm, 64)
  1579. return val, nil
  1580. }
  1581. return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1582. }
  1583. return 0, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
  1584. }
  1585. type ContainerMetric struct {
  1586. Namespace string
  1587. PodName string
  1588. ContainerName string
  1589. NodeName string
  1590. }
  1591. func (c *ContainerMetric) Key() string {
  1592. return c.Namespace + "," + c.PodName + "," + c.ContainerName + "," + c.NodeName
  1593. }
  1594. func NewContainerMetricFromKey(key string) (*ContainerMetric, error) {
  1595. s := strings.Split(key, ",")
  1596. if len(s) == 4 {
  1597. return &ContainerMetric{
  1598. Namespace: s[0],
  1599. PodName: s[1],
  1600. ContainerName: s[2],
  1601. NodeName: s[3],
  1602. }, nil
  1603. }
  1604. return nil, fmt.Errorf("Not a valid key")
  1605. }
  1606. func newContainerMetricFromValues(ns string, podName string, containerName string, nodeName string) *ContainerMetric {
  1607. return &ContainerMetric{
  1608. Namespace: ns,
  1609. PodName: podName,
  1610. ContainerName: containerName,
  1611. NodeName: nodeName,
  1612. }
  1613. }
  1614. func newContainerMetricsFromPod(pod v1.Pod) ([]*ContainerMetric, error) {
  1615. podName := pod.GetObjectMeta().GetName()
  1616. ns := pod.GetObjectMeta().GetNamespace()
  1617. node := pod.Spec.NodeName
  1618. var cs []*ContainerMetric
  1619. for _, container := range pod.Spec.Containers {
  1620. containerName := container.Name
  1621. cs = append(cs, &ContainerMetric{
  1622. Namespace: ns,
  1623. PodName: podName,
  1624. ContainerName: containerName,
  1625. NodeName: node,
  1626. })
  1627. }
  1628. return cs, nil
  1629. }
  1630. func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*ContainerMetric, error) {
  1631. cName, ok := metrics["container_name"]
  1632. if !ok {
  1633. return nil, fmt.Errorf("Prometheus vector does not have container name")
  1634. }
  1635. containerName, ok := cName.(string)
  1636. if !ok {
  1637. return nil, fmt.Errorf("Prometheus vector does not have string container name")
  1638. }
  1639. pName, ok := metrics["pod_name"]
  1640. if !ok {
  1641. return nil, fmt.Errorf("Prometheus vector does not have pod name")
  1642. }
  1643. podName, ok := pName.(string)
  1644. if !ok {
  1645. return nil, fmt.Errorf("Prometheus vector does not have string pod name")
  1646. }
  1647. ns, ok := metrics["namespace"]
  1648. if !ok {
  1649. return nil, fmt.Errorf("Prometheus vector does not have namespace")
  1650. }
  1651. namespace, ok := ns.(string)
  1652. if !ok {
  1653. return nil, fmt.Errorf("Prometheus vector does not have string namespace")
  1654. }
  1655. node, ok := metrics["node"]
  1656. if !ok {
  1657. klog.V(4).Info("Prometheus vector does not have node name")
  1658. node = ""
  1659. }
  1660. nodeName, ok := node.(string)
  1661. if !ok {
  1662. return nil, fmt.Errorf("Prometheus vector does not have string node")
  1663. }
  1664. return &ContainerMetric{
  1665. ContainerName: containerName,
  1666. PodName: podName,
  1667. Namespace: namespace,
  1668. NodeName: nodeName,
  1669. }, nil
  1670. }
  1671. func getContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
  1672. data, ok := qr.(map[string]interface{})["data"]
  1673. if !ok {
  1674. e, err := wrapPrometheusError(qr)
  1675. if err != nil {
  1676. return nil, err
  1677. }
  1678. return nil, fmt.Errorf(e)
  1679. }
  1680. r, ok := data.(map[string]interface{})["result"]
  1681. if !ok {
  1682. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  1683. }
  1684. results, ok := r.([]interface{})
  1685. if !ok {
  1686. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  1687. }
  1688. containerData := make(map[string][]*Vector)
  1689. for _, val := range results {
  1690. metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  1691. if !ok {
  1692. return nil, fmt.Errorf("Prometheus vector does not have metric labels")
  1693. }
  1694. containerMetric, err := newContainerMetricFromPrometheus(metric)
  1695. if err != nil {
  1696. return nil, err
  1697. }
  1698. value, ok := val.(map[string]interface{})["value"]
  1699. if !ok {
  1700. return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  1701. }
  1702. dataPoint, ok := value.([]interface{})
  1703. if !ok || len(dataPoint) != 2 {
  1704. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1705. }
  1706. strVal := dataPoint[1].(string)
  1707. v, _ := strconv.ParseFloat(strVal, 64)
  1708. if normalize && normalizationValue != 0 {
  1709. v = v / normalizationValue
  1710. }
  1711. toReturn := &Vector{
  1712. Timestamp: dataPoint[0].(float64),
  1713. Value: v,
  1714. }
  1715. klog.V(4).Info("key: " + containerMetric.Key())
  1716. containerData[containerMetric.Key()] = []*Vector{toReturn}
  1717. }
  1718. return containerData, nil
  1719. }
  1720. func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
  1721. data, ok := qr.(map[string]interface{})["data"]
  1722. if !ok {
  1723. e, err := wrapPrometheusError(qr)
  1724. if err != nil {
  1725. return nil, err
  1726. }
  1727. return nil, fmt.Errorf(e)
  1728. }
  1729. r, ok := data.(map[string]interface{})["result"]
  1730. if !ok {
  1731. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  1732. }
  1733. results, ok := r.([]interface{})
  1734. if !ok {
  1735. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  1736. }
  1737. containerData := make(map[string][]*Vector)
  1738. for _, val := range results {
  1739. metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  1740. if !ok {
  1741. return nil, fmt.Errorf("Prometheus vector does not have metric labels")
  1742. }
  1743. containerMetric, err := newContainerMetricFromPrometheus(metric)
  1744. if err != nil {
  1745. return nil, err
  1746. }
  1747. vs, ok := val.(map[string]interface{})["values"]
  1748. if !ok {
  1749. return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a field in the vector")
  1750. }
  1751. values, ok := vs.([]interface{})
  1752. if !ok {
  1753. return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a slice")
  1754. }
  1755. var vectors []*Vector
  1756. for _, value := range values {
  1757. dataPoint, ok := value.([]interface{})
  1758. if !ok || len(dataPoint) != 2 {
  1759. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1760. }
  1761. strVal := dataPoint[1].(string)
  1762. v, _ := strconv.ParseFloat(strVal, 64)
  1763. if normalize && normalizationValue != 0 {
  1764. v = v / normalizationValue
  1765. }
  1766. vectors = append(vectors, &Vector{
  1767. Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
  1768. Value: v,
  1769. })
  1770. }
  1771. containerData[containerMetric.Key()] = vectors
  1772. }
  1773. return containerData, nil
  1774. }
  1775. func wrapPrometheusError(qr interface{}) (string, error) {
  1776. e, ok := qr.(map[string]interface{})["error"]
  1777. if !ok {
  1778. return "", fmt.Errorf("Unexpected response from Prometheus")
  1779. }
  1780. eStr, ok := e.(string)
  1781. return eStr, nil
  1782. }