costmodel.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "net/http"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "time"
  12. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  13. prometheusClient "github.com/prometheus/client_golang/api"
  14. v1 "k8s.io/api/core/v1"
  15. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  16. "k8s.io/apimachinery/pkg/labels"
  17. "k8s.io/client-go/kubernetes"
  18. "k8s.io/klog"
  19. )
  20. const (
  21. statusAPIError = 422
  22. apiPrefix = "/api/v1"
  23. epAlertManagers = apiPrefix + "/alertmanagers"
  24. epQuery = apiPrefix + "/query"
  25. epQueryRange = apiPrefix + "/query_range"
  26. epLabelValues = apiPrefix + "/label/:name/values"
  27. epSeries = apiPrefix + "/series"
  28. epTargets = apiPrefix + "/targets"
  29. epSnapshot = apiPrefix + "/admin/tsdb/snapshot"
  30. epDeleteSeries = apiPrefix + "/admin/tsdb/delete_series"
  31. epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"
  32. epConfig = apiPrefix + "/status/config"
  33. epFlags = apiPrefix + "/status/flags"
  34. )
  35. type CostData struct {
  36. Name string `json:"name"`
  37. PodName string `json:"podName"`
  38. NodeName string `json:"nodeName"`
  39. NodeData *costAnalyzerCloud.Node `json:"node"`
  40. Namespace string `json:"namespace"`
  41. Deployments []string `json:"deployments"`
  42. Services []string `json:"services"`
  43. Daemonsets []string `json:"daemonsets"`
  44. Statefulsets []string `json:"statefulsets"`
  45. Jobs []string `json:"jobs"`
  46. RAMReq []*Vector `json:"ramreq"`
  47. RAMUsed []*Vector `json:"ramused"`
  48. CPUReq []*Vector `json:"cpureq"`
  49. CPUUsed []*Vector `json:"cpuused"`
  50. RAMAllocation []*Vector `json:"ramallocated"`
  51. CPUAllocation []*Vector `json:"cpuallocated"`
  52. GPUReq []*Vector `json:"gpureq"`
  53. PVData []*PersistentVolumeData `json:"pvData"`
  54. Labels map[string]string `json:"labels"`
  55. NamespaceLabels map[string]string `json:"namespaceLabels"`
  56. }
  57. type Vector struct {
  58. Timestamp float64 `json:"timestamp"`
  59. Value float64 `json:"value"`
  60. }
  61. func ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider, window string) (map[string]*CostData, error) {
  62. queryRAMRequests := `avg(label_replace(label_replace(avg((count_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD"}[` + window + `]) * avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD"}[` + window + `]))) by (namespace,container,pod) , "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)") ) by (namespace,container_name, pod_name)`
  63. queryRAMUsage := `sort_desc(avg(count_over_time(container_memory_usage_bytes{container_name!="",container_name!="POD"}[` + window + `]) * avg_over_time(container_memory_usage_bytes{container_name!="",container_name!="POD"}[` + window + `])) by (namespace,container_name,pod_name,instance))`
  64. queryCPURequests := `avg(label_replace(label_replace(avg((count_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD"}[` + window + `]) * avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD"}[` + window + `]))) by (namespace,container,pod) , "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)") ) by (namespace,container_name, pod_name)`
  65. queryCPUUsage := `avg(rate(container_cpu_usage_seconds_total{container_name!="",container_name!="POD"}[` + window + `])) by (namespace,container_name,pod_name,instance)`
  66. queryGPURequests := `avg(label_replace(label_replace(avg((count_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD"}[` + window + `]) * avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD"}[` + window + `]))) by (namespace,container,pod) , "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)") ) by (namespace,container_name, pod_name)`
  67. queryPVRequests := `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace)
  68. *
  69. on (persistentvolumeclaim, namespace) group_right(storageclass)
  70. sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace)`
  71. normalization := `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[` + window + `]))`
  72. resultRAMRequests, err := query(cli, queryRAMRequests)
  73. if err != nil {
  74. return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
  75. }
  76. resultRAMUsage, err := query(cli, queryRAMUsage)
  77. if err != nil {
  78. return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
  79. }
  80. resultCPURequests, err := query(cli, queryCPURequests)
  81. if err != nil {
  82. return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
  83. }
  84. resultCPUUsage, err := query(cli, queryCPUUsage)
  85. if err != nil {
  86. return nil, fmt.Errorf("Error fetching CPUUsage requests: " + err.Error())
  87. }
  88. resultGPURequests, err := query(cli, queryGPURequests)
  89. if err != nil {
  90. return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
  91. }
  92. resultPVRequests, err := query(cli, queryPVRequests)
  93. if err != nil {
  94. return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
  95. }
  96. normalizationResult, err := query(cli, normalization)
  97. if err != nil {
  98. return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
  99. }
  100. normalizationValue, err := getNormalization(normalizationResult)
  101. if err != nil {
  102. return nil, err
  103. }
  104. nodes, err := getNodeCost(clientset, cloud)
  105. if err != nil {
  106. klog.V(1).Infof("Warning, no Node cost model available: " + err.Error())
  107. return nil, err
  108. }
  109. podlist, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
  110. if err != nil {
  111. return nil, err
  112. }
  113. podDeploymentsMapping, err := getPodDeployments(clientset, podlist)
  114. if err != nil {
  115. return nil, err
  116. }
  117. podServicesMapping, err := getPodServices(clientset, podlist)
  118. if err != nil {
  119. return nil, err
  120. }
  121. namespaceLabelsMapping, err := getNamespaceLabels(clientset)
  122. if err != nil {
  123. return nil, err
  124. }
  125. pvClaimMapping, err := getPVInfoVector(resultPVRequests)
  126. if err != nil {
  127. return nil, err
  128. }
  129. containerNameCost := make(map[string]*CostData)
  130. containers := make(map[string]bool)
  131. RAMReqMap, err := getContainerMetricVector(resultRAMRequests, true, normalizationValue)
  132. if err != nil {
  133. return nil, err
  134. }
  135. for key := range RAMReqMap {
  136. containers[key] = true
  137. }
  138. RAMUsedMap, err := getContainerMetricVector(resultRAMUsage, true, normalizationValue)
  139. if err != nil {
  140. return nil, err
  141. }
  142. for key := range RAMUsedMap {
  143. containers[key] = true
  144. }
  145. CPUReqMap, err := getContainerMetricVector(resultCPURequests, true, normalizationValue)
  146. if err != nil {
  147. return nil, err
  148. }
  149. for key := range CPUReqMap {
  150. containers[key] = true
  151. }
  152. GPUReqMap, err := getContainerMetricVector(resultGPURequests, true, normalizationValue)
  153. if err != nil {
  154. return nil, err
  155. }
  156. for key := range GPUReqMap {
  157. containers[key] = true
  158. }
  159. CPUUsedMap, err := getContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
  160. if err != nil {
  161. return nil, err
  162. }
  163. for key := range CPUUsedMap {
  164. containers[key] = true
  165. }
  166. currentContainers := make(map[string]v1.Pod)
  167. for _, pod := range podlist.Items {
  168. cs, err := newContainerMetricsFromPod(pod)
  169. if err != nil {
  170. return nil, err
  171. }
  172. for _, c := range cs {
  173. containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
  174. currentContainers[c.Key()] = pod
  175. }
  176. }
  177. for key := range containers {
  178. if _, ok := containerNameCost[key]; ok {
  179. continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
  180. }
  181. if pod, ok := currentContainers[key]; ok {
  182. podName := pod.GetObjectMeta().GetName()
  183. ns := pod.GetObjectMeta().GetNamespace()
  184. nsLabels := namespaceLabelsMapping[ns]
  185. podLabels := pod.GetObjectMeta().GetLabels()
  186. nodeName := pod.Spec.NodeName
  187. var nodeData *costAnalyzerCloud.Node
  188. if _, ok := nodes[nodeName]; ok {
  189. nodeData = nodes[nodeName]
  190. }
  191. var podDeployments []string
  192. if _, ok := podDeploymentsMapping[ns]; ok {
  193. if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  194. podDeployments = ds
  195. } else {
  196. podDeployments = []string{}
  197. }
  198. }
  199. var podPVs []*PersistentVolumeData
  200. podClaims := pod.Spec.Volumes
  201. for _, vol := range podClaims {
  202. if vol.PersistentVolumeClaim != nil {
  203. name := vol.PersistentVolumeClaim.ClaimName
  204. if pvClaim, ok := pvClaimMapping[ns+","+name]; ok {
  205. podPVs = append(podPVs, pvClaim)
  206. }
  207. }
  208. }
  209. var podServices []string
  210. if _, ok := podServicesMapping[ns]; ok {
  211. if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  212. podServices = svcs
  213. } else {
  214. podServices = []string{}
  215. }
  216. }
  217. for i, container := range pod.Spec.Containers {
  218. containerName := container.Name
  219. // recreate the key and look up data for this container
  220. newKey := ns + "," + podName + "," + containerName
  221. RAMReqV, ok := RAMReqMap[newKey]
  222. if !ok {
  223. klog.V(2).Info("no RAM requests for " + newKey)
  224. RAMReqV = []*Vector{&Vector{}}
  225. }
  226. RAMUsedV, ok := RAMUsedMap[newKey]
  227. if !ok {
  228. klog.V(2).Info("no RAM usage for " + newKey)
  229. RAMUsedV = []*Vector{&Vector{}}
  230. }
  231. CPUReqV, ok := CPUReqMap[newKey]
  232. if !ok {
  233. klog.V(2).Info("no CPU requests for " + newKey)
  234. CPUReqV = []*Vector{&Vector{}}
  235. }
  236. GPUReqV, ok := GPUReqMap[newKey]
  237. if !ok {
  238. klog.V(2).Info("no GPU requests for " + newKey)
  239. GPUReqV = []*Vector{&Vector{}}
  240. }
  241. CPUUsedV, ok := CPUUsedMap[newKey]
  242. if !ok {
  243. klog.V(2).Info("no CPU usage for " + newKey)
  244. CPUUsedV = []*Vector{&Vector{}}
  245. }
  246. var pvReq []*PersistentVolumeData
  247. if i == 0 { // avoid duplicating by just assigning all claims to the first container.
  248. pvReq = podPVs
  249. }
  250. costs := &CostData{
  251. Name: containerName,
  252. PodName: podName,
  253. NodeName: nodeName,
  254. Namespace: ns,
  255. Deployments: podDeployments,
  256. Services: podServices,
  257. Daemonsets: getDaemonsetsOfPod(pod),
  258. Jobs: getJobsOfPod(pod),
  259. Statefulsets: getStatefulSetsOfPod(pod),
  260. NodeData: nodeData,
  261. RAMReq: RAMReqV,
  262. RAMUsed: RAMUsedV,
  263. CPUReq: CPUReqV,
  264. CPUUsed: CPUUsedV,
  265. GPUReq: GPUReqV,
  266. PVData: pvReq,
  267. Labels: podLabels,
  268. NamespaceLabels: nsLabels,
  269. }
  270. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  271. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  272. containerNameCost[newKey] = costs
  273. }
  274. } else {
  275. // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
  276. // TODO: The nodename should be available from the prometheus query. Check if that node still exists and use that price
  277. klog.V(3).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
  278. c, _ := newContainerMetricFromKey(key)
  279. RAMReqV, ok := RAMReqMap[key]
  280. if !ok {
  281. klog.V(2).Info("no RAM requests for " + key)
  282. RAMReqV = []*Vector{&Vector{}}
  283. }
  284. RAMUsedV, ok := RAMUsedMap[key]
  285. if !ok {
  286. klog.V(2).Info("no RAM usage for " + key)
  287. RAMUsedV = []*Vector{&Vector{}}
  288. }
  289. CPUReqV, ok := CPUReqMap[key]
  290. if !ok {
  291. klog.V(2).Info("no CPU requests for " + key)
  292. CPUReqV = []*Vector{&Vector{}}
  293. }
  294. GPUReqV, ok := GPUReqMap[key]
  295. if !ok {
  296. klog.V(2).Info("no GPU requests for " + key)
  297. GPUReqV = []*Vector{&Vector{}}
  298. }
  299. CPUUsedV, ok := CPUUsedMap[key]
  300. if !ok {
  301. klog.V(2).Info("no CPU usage for " + key)
  302. CPUUsedV = []*Vector{&Vector{}}
  303. }
  304. costs := &CostData{ // TODO: Expand the prometheus query/use prometheus to query for more data here if it exists.
  305. Name: c.ContainerName,
  306. PodName: c.PodName,
  307. Namespace: c.Namespace,
  308. RAMReq: RAMReqV,
  309. RAMUsed: RAMUsedV,
  310. CPUReq: CPUReqV,
  311. CPUUsed: CPUUsedV,
  312. GPUReq: GPUReqV,
  313. }
  314. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  315. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  316. containerNameCost[key] = costs
  317. }
  318. }
  319. return containerNameCost, err
  320. }
  321. func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
  322. if req == nil || len(req) == 0 {
  323. return used
  324. }
  325. if used == nil || len(used) == 0 {
  326. return req
  327. }
  328. var allocation []*Vector
  329. var timestamps []float64
  330. reqMap := make(map[float64]float64)
  331. for _, reqV := range req {
  332. if reqV.Timestamp == 0 {
  333. continue
  334. }
  335. reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
  336. reqMap[reqV.Timestamp] = reqV.Value
  337. timestamps = append(timestamps, reqV.Timestamp)
  338. }
  339. usedMap := make(map[float64]float64)
  340. for _, usedV := range used {
  341. if usedV.Timestamp == 0 {
  342. continue
  343. }
  344. usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
  345. usedMap[usedV.Timestamp] = usedV.Value
  346. if _, ok := reqMap[usedV.Timestamp]; !ok { // no need to double add, since we'll range over sorted timestamps and check.
  347. timestamps = append(timestamps, usedV.Timestamp)
  348. }
  349. }
  350. sort.Float64s(timestamps)
  351. for _, t := range timestamps {
  352. rv, okR := reqMap[t]
  353. uv, okU := usedMap[t]
  354. allocationVector := &Vector{
  355. Timestamp: t,
  356. }
  357. if okR && okU {
  358. allocationVector.Value = math.Max(rv, uv)
  359. } else if okR {
  360. allocationVector.Value = rv
  361. } else if okU {
  362. allocationVector.Value = uv
  363. }
  364. allocation = append(allocation, allocationVector)
  365. }
  366. return allocation
  367. }
  368. func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
  369. nodeList, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  370. if err != nil {
  371. return nil, err
  372. }
  373. nodes := make(map[string]*costAnalyzerCloud.Node)
  374. for _, n := range nodeList.Items {
  375. name := n.GetObjectMeta().GetName()
  376. nodeLabels := n.GetObjectMeta().GetLabels()
  377. nodeLabels["providerID"] = n.Spec.ProviderID
  378. cnode, err := cloud.NodePricing(cloud.GetKey(nodeLabels))
  379. if err != nil {
  380. klog.V(1).Infof("Error getting node. Error: " + err.Error())
  381. continue
  382. }
  383. var cpu float64
  384. if cnode.VCPU == "" {
  385. cpu = float64(n.Status.Capacity.Cpu().Value())
  386. cnode.VCPU = n.Status.Capacity.Cpu().String()
  387. } else {
  388. cpu, _ = strconv.ParseFloat(cnode.VCPU, 64)
  389. }
  390. var ram float64
  391. if cnode.RAM == "" {
  392. cnode.RAM = n.Status.Capacity.Memory().String()
  393. }
  394. ram = float64(n.Status.Capacity.Memory().Value())
  395. if cnode.RAMCost == "" { // We couldn't find a ramcost, so fix cpu and allocate ram accordingly
  396. basePrice, _ := strconv.ParseFloat(cnode.BaseCPUPrice, 64)
  397. totalCPUPrice := basePrice * cpu
  398. var nodePrice float64
  399. if cnode.Cost != "" {
  400. klog.V(3).Infof("Use given nodeprice as whole node price")
  401. nodePrice, _ = strconv.ParseFloat(cnode.Cost, 64)
  402. } else {
  403. klog.V(3).Infof("Use cpuprice as whole node price")
  404. nodePrice, _ = strconv.ParseFloat(cnode.VCPUCost, 64) // all the price was allocated the the CPU
  405. }
  406. if totalCPUPrice >= nodePrice {
  407. totalCPUPrice = 0.9 * nodePrice // just allocate RAM costs to 10% of the node price here to avoid 0 or negative in the numerator
  408. }
  409. ramPrice := (nodePrice - totalCPUPrice) / (ram / 1024 / 1024 / 1024)
  410. cpuPrice := totalCPUPrice / cpu
  411. cnode.VCPUCost = fmt.Sprintf("%f", cpuPrice)
  412. cnode.RAMCost = fmt.Sprintf("%f", ramPrice)
  413. cnode.RAMBytes = fmt.Sprintf("%f", ram)
  414. klog.V(2).Infof("Node \"%s\" RAM Cost := %v", name, cnode.RAMCost)
  415. }
  416. nodes[name] = cnode
  417. }
  418. return nodes, nil
  419. }
  420. func getPodServices(clientset kubernetes.Interface, podList *v1.PodList) (map[string]map[string][]string, error) {
  421. servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
  422. if err != nil {
  423. return nil, err
  424. }
  425. podServicesMapping := make(map[string]map[string][]string)
  426. for _, service := range servicesList.Items {
  427. namespace := service.GetObjectMeta().GetNamespace()
  428. name := service.GetObjectMeta().GetName()
  429. if _, ok := podServicesMapping[namespace]; !ok {
  430. podServicesMapping[namespace] = make(map[string][]string)
  431. }
  432. s := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
  433. if err != nil {
  434. klog.V(2).Infof("Error doing service label conversion: " + err.Error())
  435. }
  436. for _, pod := range podList.Items {
  437. labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
  438. if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
  439. services, ok := podServicesMapping[namespace][pod.GetObjectMeta().GetName()]
  440. if ok {
  441. podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = append(services, name)
  442. } else {
  443. podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
  444. }
  445. }
  446. }
  447. }
  448. return podServicesMapping, nil
  449. }
  450. func getPodDeployments(clientset kubernetes.Interface, podList *v1.PodList) (map[string]map[string][]string, error) {
  451. deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
  452. if err != nil {
  453. return nil, err
  454. }
  455. podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
  456. for _, deployment := range deploymentsList.Items {
  457. namespace := deployment.GetObjectMeta().GetNamespace()
  458. name := deployment.GetObjectMeta().GetName()
  459. if _, ok := podDeploymentsMapping[namespace]; !ok {
  460. podDeploymentsMapping[namespace] = make(map[string][]string)
  461. }
  462. s, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
  463. if err != nil {
  464. klog.V(2).Infof("Error doing deployment label conversion: " + err.Error())
  465. }
  466. for _, pod := range podList.Items {
  467. labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
  468. if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
  469. deployments, ok := podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()]
  470. if ok {
  471. podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = append(deployments, name)
  472. } else {
  473. podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
  474. }
  475. }
  476. }
  477. }
  478. return podDeploymentsMapping, nil
  479. }
  480. func ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider,
  481. startString, endString, windowString string) (map[string]*CostData, error) {
  482. queryRAMRequests := `avg(label_replace(label_replace(avg((count_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD"}[` + windowString + `]) * avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD"}[` + windowString + `]))) by (namespace,container,pod) , "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)") ) by (namespace,container_name, pod_name)`
  483. queryRAMUsage := `sort_desc(avg(count_over_time(container_memory_usage_bytes{container_name!="",container_name!="POD"}[` + windowString + `]) * avg_over_time(container_memory_usage_bytes{container_name!="",container_name!="POD"}[` + windowString + `])) by (namespace,container_name,pod_name,instance))`
  484. queryCPURequests := `avg(label_replace(label_replace(avg((count_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD"}[` + windowString + `]) * avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD"}[` + windowString + `]))) by (namespace,container,pod) , "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)") ) by (namespace,container_name, pod_name)`
  485. queryCPUUsage := `avg(rate(container_cpu_usage_seconds_total{container_name!="",container_name!="POD"}[` + windowString + `])) by (namespace,container_name,pod_name,instance)`
  486. queryGPURequests := `avg(label_replace(label_replace(avg((count_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD"}[` + windowString + `]) * avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD"}[` + windowString + `]))) by (namespace,container,pod) , "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)") ) by (namespace,container_name, pod_name)`
  487. queryPVRequests := `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace)
  488. *
  489. on (persistentvolumeclaim, namespace) group_right(storageclass)
  490. sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace)`
  491. normalization := `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[` + windowString + `]))`
  492. layout := "2006-01-02T15:04:05.000Z"
  493. start, err := time.Parse(layout, startString)
  494. if err != nil {
  495. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  496. return nil, err
  497. }
  498. end, err := time.Parse(layout, endString)
  499. if err != nil {
  500. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  501. return nil, err
  502. }
  503. window, err := time.ParseDuration(windowString)
  504. if err != nil {
  505. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  506. return nil, err
  507. }
  508. resultRAMRequests, err := queryRange(cli, queryRAMRequests, start, end, window)
  509. if err != nil {
  510. return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
  511. }
  512. resultRAMUsage, err := queryRange(cli, queryRAMUsage, start, end, window)
  513. if err != nil {
  514. return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
  515. }
  516. resultCPURequests, err := queryRange(cli, queryCPURequests, start, end, window)
  517. if err != nil {
  518. return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
  519. }
  520. resultCPUUsage, err := queryRange(cli, queryCPUUsage, start, end, window)
  521. if err != nil {
  522. return nil, fmt.Errorf("Error fetching CPU usage: " + err.Error())
  523. }
  524. resultGPURequests, err := queryRange(cli, queryGPURequests, start, end, window)
  525. if err != nil {
  526. return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
  527. }
  528. resultPVRequests, err := queryRange(cli, queryPVRequests, start, end, window)
  529. if err != nil {
  530. return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
  531. }
  532. normalizationResult, err := query(cli, normalization)
  533. if err != nil {
  534. return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
  535. }
  536. normalizationValue, err := getNormalization(normalizationResult)
  537. if err != nil {
  538. return nil, err
  539. }
  540. nodes, err := getNodeCost(clientset, cloud)
  541. if err != nil {
  542. //return nil, err
  543. klog.V(1).Infof("Warning, no cost model available: " + err.Error())
  544. }
  545. podlist, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
  546. if err != nil {
  547. return nil, err
  548. }
  549. podDeploymentsMapping, err := getPodDeployments(clientset, podlist)
  550. if err != nil {
  551. return nil, err
  552. }
  553. podServicesMapping, err := getPodServices(clientset, podlist)
  554. if err != nil {
  555. return nil, err
  556. }
  557. namespaceLabelsMapping, err := getNamespaceLabels(clientset)
  558. if err != nil {
  559. return nil, err
  560. }
  561. pvClaimMapping, err := getPVInfoVectors(resultPVRequests)
  562. if err != nil {
  563. return nil, err
  564. }
  565. containerNameCost := make(map[string]*CostData)
  566. containers := make(map[string]bool)
  567. RAMReqMap, err := getContainerMetricVectors(resultRAMRequests, true, normalizationValue)
  568. if err != nil {
  569. return nil, err
  570. }
  571. for key := range RAMReqMap {
  572. containers[key] = true
  573. }
  574. RAMUsedMap, err := getContainerMetricVectors(resultRAMUsage, true, normalizationValue)
  575. if err != nil {
  576. return nil, err
  577. }
  578. for key := range RAMUsedMap {
  579. containers[key] = true
  580. }
  581. CPUReqMap, err := getContainerMetricVectors(resultCPURequests, true, normalizationValue)
  582. if err != nil {
  583. return nil, err
  584. }
  585. for key := range CPUReqMap {
  586. containers[key] = true
  587. }
  588. GPUReqMap, err := getContainerMetricVectors(resultGPURequests, true, normalizationValue)
  589. if err != nil {
  590. return nil, err
  591. }
  592. for key := range GPUReqMap {
  593. containers[key] = true
  594. }
  595. CPUUsedMap, err := getContainerMetricVectors(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
  596. if err != nil {
  597. return nil, err
  598. }
  599. for key := range CPUUsedMap {
  600. containers[key] = true
  601. }
  602. currentContainers := make(map[string]v1.Pod)
  603. for _, pod := range podlist.Items {
  604. cs, err := newContainerMetricsFromPod(pod)
  605. if err != nil {
  606. return nil, err
  607. }
  608. for _, c := range cs {
  609. containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
  610. currentContainers[c.Key()] = pod
  611. }
  612. }
  613. for key := range containers {
  614. if _, ok := containerNameCost[key]; ok {
  615. continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
  616. }
  617. if pod, ok := currentContainers[key]; ok {
  618. podName := pod.GetObjectMeta().GetName()
  619. ns := pod.GetObjectMeta().GetNamespace()
  620. podLabels := pod.GetObjectMeta().GetLabels()
  621. nodeName := pod.Spec.NodeName
  622. var nodeData *costAnalyzerCloud.Node
  623. if _, ok := nodes[nodeName]; ok {
  624. nodeData = nodes[nodeName]
  625. }
  626. var podDeployments []string
  627. if _, ok := podDeploymentsMapping[ns]; ok {
  628. if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  629. podDeployments = ds
  630. } else {
  631. podDeployments = []string{}
  632. }
  633. }
  634. var podPVs []*PersistentVolumeData
  635. podClaims := pod.Spec.Volumes
  636. for _, vol := range podClaims {
  637. if vol.PersistentVolumeClaim != nil {
  638. name := vol.PersistentVolumeClaim.ClaimName
  639. if pvClaim, ok := pvClaimMapping[ns+","+name]; ok {
  640. podPVs = append(podPVs, pvClaim)
  641. }
  642. }
  643. }
  644. var podServices []string
  645. if _, ok := podServicesMapping[ns]; ok {
  646. if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  647. podServices = svcs
  648. } else {
  649. podServices = []string{}
  650. }
  651. }
  652. nsLabels := namespaceLabelsMapping[ns]
  653. for i, container := range pod.Spec.Containers {
  654. containerName := container.Name
  655. newKey := ns + "," + podName + "," + containerName
  656. RAMReqV, ok := RAMReqMap[newKey]
  657. if !ok {
  658. klog.V(2).Info("no RAM requests for " + newKey)
  659. RAMReqV = []*Vector{}
  660. }
  661. RAMUsedV, ok := RAMUsedMap[newKey]
  662. if !ok {
  663. klog.V(2).Info("no RAM usage for " + newKey)
  664. RAMUsedV = []*Vector{}
  665. }
  666. CPUReqV, ok := CPUReqMap[newKey]
  667. if !ok {
  668. klog.V(2).Info("no CPU requests for " + newKey)
  669. CPUReqV = []*Vector{}
  670. }
  671. GPUReqV, ok := GPUReqMap[newKey]
  672. if !ok {
  673. klog.V(2).Info("no GPU requests for " + newKey)
  674. GPUReqV = []*Vector{}
  675. }
  676. CPUUsedV, ok := CPUUsedMap[newKey]
  677. if !ok {
  678. klog.V(2).Info("no CPU usage for " + newKey)
  679. CPUUsedV = []*Vector{}
  680. }
  681. var pvReq []*PersistentVolumeData
  682. if i == 0 { // avoid duplicating by just assigning all claims to the first container.
  683. pvReq = podPVs
  684. }
  685. costs := &CostData{
  686. Name: containerName,
  687. PodName: podName,
  688. NodeName: nodeName,
  689. Namespace: ns,
  690. Deployments: podDeployments,
  691. Services: podServices,
  692. Daemonsets: getDaemonsetsOfPod(pod),
  693. Jobs: getJobsOfPod(pod),
  694. Statefulsets: getStatefulSetsOfPod(pod),
  695. NodeData: nodeData,
  696. RAMReq: RAMReqV,
  697. RAMUsed: RAMUsedV,
  698. CPUReq: CPUReqV,
  699. CPUUsed: CPUUsedV,
  700. GPUReq: GPUReqV,
  701. PVData: pvReq,
  702. Labels: podLabels,
  703. NamespaceLabels: nsLabels,
  704. }
  705. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  706. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  707. containerNameCost[newKey] = costs
  708. }
  709. } else {
  710. // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
  711. // TODO: The nodename should be available from the prometheus query. Check if that node still exists and use that price
  712. klog.V(3).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
  713. c, _ := newContainerMetricFromKey(key)
  714. RAMReqV, ok := RAMReqMap[key]
  715. if !ok {
  716. klog.V(2).Info("no RAM requests for " + key)
  717. RAMReqV = []*Vector{}
  718. }
  719. RAMUsedV, ok := RAMUsedMap[key]
  720. if !ok {
  721. klog.V(2).Info("no RAM usage for " + key)
  722. RAMUsedV = []*Vector{}
  723. }
  724. CPUReqV, ok := CPUReqMap[key]
  725. if !ok {
  726. klog.V(2).Info("no CPU requests for " + key)
  727. CPUReqV = []*Vector{}
  728. }
  729. GPUReqV, ok := GPUReqMap[key]
  730. if !ok {
  731. klog.V(2).Info("no GPU requests for " + key)
  732. GPUReqV = []*Vector{}
  733. }
  734. CPUUsedV, ok := CPUUsedMap[key]
  735. if !ok {
  736. klog.V(2).Info("no CPU usage for " + key)
  737. CPUUsedV = []*Vector{}
  738. }
  739. costs := &CostData{ // TODO: Expand the prometheus query/use prometheus to query for more data here if it exists.
  740. Name: c.ContainerName,
  741. PodName: c.PodName,
  742. Namespace: c.Namespace,
  743. RAMReq: RAMReqV,
  744. RAMUsed: RAMUsedV,
  745. CPUReq: CPUReqV,
  746. CPUUsed: CPUUsedV,
  747. GPUReq: GPUReqV,
  748. }
  749. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  750. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  751. containerNameCost[key] = costs
  752. }
  753. }
  754. return containerNameCost, err
  755. }
  756. func getNamespaceLabels(clientset kubernetes.Interface) (map[string]map[string]string, error) {
  757. nsToLabels := make(map[string]map[string]string)
  758. nss, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
  759. if err != nil {
  760. return nil, err
  761. }
  762. for _, ns := range nss.Items {
  763. nsToLabels[ns.Name] = ns.Labels
  764. }
  765. return nsToLabels, nil
  766. }
  767. func getDaemonsetsOfPod(pod v1.Pod) []string {
  768. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  769. if ownerReference.Kind == "DaemonSet" {
  770. return []string{ownerReference.Name}
  771. }
  772. }
  773. return []string{}
  774. }
  775. func getJobsOfPod(pod v1.Pod) []string {
  776. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  777. if ownerReference.Kind == "Job" {
  778. return []string{ownerReference.Name}
  779. }
  780. }
  781. return []string{}
  782. }
  783. func getStatefulSetsOfPod(pod v1.Pod) []string {
  784. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  785. if ownerReference.Kind == "StatefulSet" {
  786. return []string{ownerReference.Name}
  787. }
  788. }
  789. return []string{}
  790. }
  791. type PersistentVolumeData struct {
  792. Class string `json:"class"`
  793. Claim string `json:"claim"`
  794. Namespace string `json:"namespace"`
  795. Values []*Vector `json:"values"`
  796. }
  797. func getPVInfoVectors(qr interface{}) (map[string]*PersistentVolumeData, error) {
  798. pvmap := make(map[string]*PersistentVolumeData)
  799. for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
  800. metricInterface, ok := val.(map[string]interface{})["metric"]
  801. if !ok {
  802. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  803. }
  804. metricMap, ok := metricInterface.(map[string]interface{})
  805. if !ok {
  806. return nil, fmt.Errorf("Metric field is improperly formatted")
  807. }
  808. pvclaim := metricMap["persistentvolumeclaim"]
  809. pvclass := metricMap["storageclass"]
  810. pvnamespace := metricMap["namespace"]
  811. values, ok := val.(map[string]interface{})["values"].([]interface{})
  812. if !ok {
  813. return nil, fmt.Errorf("Values field is improperly formatted")
  814. }
  815. var vectors []*Vector
  816. for _, value := range values {
  817. dataPoint, ok := value.([]interface{})
  818. if !ok || len(dataPoint) != 2 {
  819. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  820. }
  821. strVal := dataPoint[1].(string)
  822. v, _ := strconv.ParseFloat(strVal, 64)
  823. vectors = append(vectors, &Vector{
  824. Timestamp: dataPoint[0].(float64),
  825. Value: v,
  826. })
  827. }
  828. key := pvnamespace.(string) + "," + pvclaim.(string)
  829. pvmap[key] = &PersistentVolumeData{
  830. Class: pvclass.(string),
  831. Claim: pvclaim.(string),
  832. Namespace: pvnamespace.(string),
  833. Values: vectors,
  834. }
  835. }
  836. return pvmap, nil
  837. }
  838. func getPVInfoVector(qr interface{}) (map[string]*PersistentVolumeData, error) {
  839. pvmap := make(map[string]*PersistentVolumeData)
  840. for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
  841. metricInterface, ok := val.(map[string]interface{})["metric"]
  842. if !ok {
  843. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  844. }
  845. metricMap, ok := metricInterface.(map[string]interface{})
  846. if !ok {
  847. return nil, fmt.Errorf("Metric field is improperly formatted")
  848. }
  849. pvclaim := metricMap["persistentvolumeclaim"]
  850. pvclass := metricMap["storageclass"]
  851. pvnamespace := metricMap["namespace"]
  852. dataPoint, ok := val.(map[string]interface{})["value"]
  853. if !ok {
  854. return nil, fmt.Errorf("Value field does not exist in data result vector")
  855. }
  856. value, ok := dataPoint.([]interface{})
  857. if !ok || len(value) != 2 {
  858. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  859. }
  860. var vectors []*Vector
  861. strVal := value[1].(string)
  862. v, _ := strconv.ParseFloat(strVal, 64)
  863. vectors = append(vectors, &Vector{
  864. Timestamp: value[0].(float64),
  865. Value: v,
  866. })
  867. key := pvclaim.(string) + "," + pvnamespace.(string)
  868. pvmap[key] = &PersistentVolumeData{
  869. Class: pvclass.(string),
  870. Claim: pvclaim.(string),
  871. Namespace: pvnamespace.(string),
  872. Values: vectors,
  873. }
  874. }
  875. return pvmap, nil
  876. }
  877. func queryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
  878. u := cli.URL(epQueryRange, nil)
  879. q := u.Query()
  880. q.Set("query", query)
  881. q.Set("start", start.Format(time.RFC3339Nano))
  882. q.Set("end", end.Format(time.RFC3339Nano))
  883. q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
  884. u.RawQuery = q.Encode()
  885. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  886. if err != nil {
  887. return nil, err
  888. }
  889. _, body, err := cli.Do(context.Background(), req)
  890. if err != nil {
  891. klog.V(1).Infof("ERROR" + err.Error())
  892. }
  893. if err != nil {
  894. return nil, err
  895. }
  896. var toReturn interface{}
  897. err = json.Unmarshal(body, &toReturn)
  898. if err != nil {
  899. klog.V(1).Infof("ERROR" + err.Error())
  900. }
  901. return toReturn, err
  902. }
  903. func query(cli prometheusClient.Client, query string) (interface{}, error) {
  904. u := cli.URL(epQuery, nil)
  905. q := u.Query()
  906. q.Set("query", query)
  907. u.RawQuery = q.Encode()
  908. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  909. if err != nil {
  910. return nil, err
  911. }
  912. _, body, err := cli.Do(context.Background(), req)
  913. if err != nil {
  914. return nil, err
  915. }
  916. var toReturn interface{}
  917. err = json.Unmarshal(body, &toReturn)
  918. if err != nil {
  919. klog.V(1).Infof("ERROR" + err.Error())
  920. }
  921. return toReturn, err
  922. }
  // getNormalization extracts the normalization scalar from a Prometheus
// instant-query (vector) response. It returns the value of the first sample
// of the first result series.
//
// An error is returned, never a panic, when the response is not shaped as
// expected, when the result list is empty, or when the sample value cannot
// be parsed as a float.
//
// TODO: don't cast, implement unmarshaler interface.
func getNormalization(qr interface{}) (float64, error) {
	// A failed map assertion yields a nil map, and reading a nil map is safe,
	// so wrong shapes surface as the "not found" errors below.
	resp, _ := qr.(map[string]interface{})
	data, ok := resp["data"]
	if !ok {
		return 0, fmt.Errorf("Data field not found in normalization response, aborting")
	}
	dataMap, _ := data.(map[string]interface{})
	results, ok := dataMap["result"].([]interface{})
	if !ok {
		return 0, fmt.Errorf("Result field not found in normalization response, aborting")
	}
	if len(results) == 0 {
		return 0, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
	}
	first, _ := results[0].(map[string]interface{})
	dataPoint, ok := first["value"].([]interface{})
	if !ok || len(dataPoint) != 2 {
		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
	}
	strNorm, ok := dataPoint[1].(string)
	if !ok {
		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
	}
	val, err := strconv.ParseFloat(strNorm, 64)
	if err != nil {
		return 0, fmt.Errorf("Improperly formatted normalization value %q from Prometheus", strNorm)
	}
	return val, nil
}
  // ContainerMetric identifies a single container by namespace, pod name and
// container name. This file uses it to match Prometheus series against pods
// returned by the Kubernetes API.
type ContainerMetric struct {
	Namespace     string
	PodName       string
	ContainerName string
}

// Key renders the container identity as "namespace,pod,container", the form
// used as the map key for per-container data in this file.
func (c *ContainerMetric) Key() string {
	return strings.Join([]string{c.Namespace, c.PodName, c.ContainerName}, ",")
}

// newContainerMetricFromKey is the inverse of Key. It splits a
// "namespace,pod,container" string back into a ContainerMetric and rejects
// any key that does not split into exactly three comma-separated parts.
func newContainerMetricFromKey(key string) (*ContainerMetric, error) {
	parts := strings.Split(key, ",")
	if len(parts) != 3 {
		return nil, fmt.Errorf("Not a valid key")
	}
	return &ContainerMetric{Namespace: parts[0], PodName: parts[1], ContainerName: parts[2]}, nil
}
  963. func newContainerMetricsFromPod(pod v1.Pod) ([]*ContainerMetric, error) {
  964. podName := pod.GetObjectMeta().GetName()
  965. ns := pod.GetObjectMeta().GetNamespace()
  966. var cs []*ContainerMetric
  967. for _, container := range pod.Spec.Containers {
  968. containerName := container.Name
  969. cs = append(cs, &ContainerMetric{
  970. Namespace: ns,
  971. PodName: podName,
  972. ContainerName: containerName,
  973. })
  974. }
  975. return cs, nil
  976. }
  977. func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*ContainerMetric, error) {
  978. cName, ok := metrics["container_name"]
  979. if !ok {
  980. return nil, fmt.Errorf("Prometheus vector does not have container name")
  981. }
  982. containerName, ok := cName.(string)
  983. if !ok {
  984. return nil, fmt.Errorf("Prometheus vector does not have string container name")
  985. }
  986. pName, ok := metrics["pod_name"]
  987. if !ok {
  988. return nil, fmt.Errorf("Prometheus vector does not have pod name")
  989. }
  990. podName, ok := pName.(string)
  991. if !ok {
  992. return nil, fmt.Errorf("Prometheus vector does not have string pod name")
  993. }
  994. ns, ok := metrics["namespace"]
  995. if !ok {
  996. return nil, fmt.Errorf("Prometheus vector does not have namespace")
  997. }
  998. namespace, ok := ns.(string)
  999. if !ok {
  1000. return nil, fmt.Errorf("Prometheus vector does not have string namespace")
  1001. }
  1002. return &ContainerMetric{
  1003. ContainerName: containerName,
  1004. PodName: podName,
  1005. Namespace: namespace,
  1006. }, nil
  1007. }
  1008. func getContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
  1009. data, ok := qr.(map[string]interface{})["data"]
  1010. if !ok {
  1011. return nil, fmt.Errorf("Improperly formatted response from prometheus, response has no data field")
  1012. }
  1013. r, ok := data.(map[string]interface{})["result"]
  1014. if !ok {
  1015. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  1016. }
  1017. results, ok := r.([]interface{})
  1018. if !ok {
  1019. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  1020. }
  1021. containerData := make(map[string][]*Vector)
  1022. for _, val := range results {
  1023. metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  1024. if !ok {
  1025. return nil, fmt.Errorf("Prometheus vector does not have metric labels")
  1026. }
  1027. containerMetric, err := newContainerMetricFromPrometheus(metric)
  1028. if err != nil {
  1029. return nil, err
  1030. }
  1031. value, ok := val.(map[string]interface{})["value"]
  1032. if !ok {
  1033. return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  1034. }
  1035. dataPoint, ok := value.([]interface{})
  1036. if !ok || len(dataPoint) != 2 {
  1037. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1038. }
  1039. strVal := dataPoint[1].(string)
  1040. v, _ := strconv.ParseFloat(strVal, 64)
  1041. if normalize && normalizationValue != 0 {
  1042. v = v / normalizationValue
  1043. }
  1044. toReturn := &Vector{
  1045. Timestamp: dataPoint[0].(float64),
  1046. Value: v,
  1047. }
  1048. klog.V(2).Info("key: " + containerMetric.Key())
  1049. containerData[containerMetric.Key()] = []*Vector{toReturn}
  1050. }
  1051. return containerData, nil
  1052. }
  1053. func getContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
  1054. data, ok := qr.(map[string]interface{})["data"]
  1055. if !ok {
  1056. return nil, fmt.Errorf("Improperly formatted response from prometheus, response has no data field")
  1057. }
  1058. r, ok := data.(map[string]interface{})["result"]
  1059. if !ok {
  1060. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  1061. }
  1062. results, ok := r.([]interface{})
  1063. if !ok {
  1064. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  1065. }
  1066. containerData := make(map[string][]*Vector)
  1067. for _, val := range results {
  1068. metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  1069. if !ok {
  1070. return nil, fmt.Errorf("Prometheus vector does not have metric labels")
  1071. }
  1072. containerMetric, err := newContainerMetricFromPrometheus(metric)
  1073. if err != nil {
  1074. return nil, err
  1075. }
  1076. vs, ok := val.(map[string]interface{})["values"]
  1077. if !ok {
  1078. return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a field in the vector")
  1079. }
  1080. values, ok := vs.([]interface{})
  1081. if !ok {
  1082. return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a slice")
  1083. }
  1084. var vectors []*Vector
  1085. for _, value := range values {
  1086. dataPoint, ok := value.([]interface{})
  1087. if !ok || len(dataPoint) != 2 {
  1088. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1089. }
  1090. strVal := dataPoint[1].(string)
  1091. v, _ := strconv.ParseFloat(strVal, 64)
  1092. if normalize && normalizationValue != 0 {
  1093. v = v / normalizationValue
  1094. }
  1095. vectors = append(vectors, &Vector{
  1096. Timestamp: dataPoint[0].(float64),
  1097. Value: v,
  1098. })
  1099. }
  1100. containerData[containerMetric.Key()] = vectors
  1101. }
  1102. return containerData, nil
  1103. }