costmodel.go 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "net/http"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "time"
  12. costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
  13. prometheusClient "github.com/prometheus/client_golang/api"
  14. v1 "k8s.io/api/core/v1"
  15. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  16. "k8s.io/apimachinery/pkg/labels"
  17. "k8s.io/client-go/kubernetes"
  18. "k8s.io/klog"
  19. )
  // statusAPIError is HTTP status 422 (Unprocessable Entity).
  const statusAPIError = 422

  // Prometheus HTTP API paths. Every endpoint is built on top of apiPrefix so
  // the API version only has to change in one place.
  const (
  	apiPrefix = "/api/v1"

  	// Query endpoints.
  	epQuery       = apiPrefix + "/query"
  	epQueryRange  = apiPrefix + "/query_range"
  	epLabelValues = apiPrefix + "/label/:name/values"
  	epSeries      = apiPrefix + "/series"

  	// Discovery endpoints.
  	epAlertManagers = apiPrefix + "/alertmanagers"
  	epTargets       = apiPrefix + "/targets"

  	// TSDB admin endpoints.
  	epSnapshot        = apiPrefix + "/admin/tsdb/snapshot"
  	epDeleteSeries    = apiPrefix + "/admin/tsdb/delete_series"
  	epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"

  	// Status endpoints.
  	epConfig = apiPrefix + "/status/config"
  	epFlags  = apiPrefix + "/status/flags"
  )
  // CostData is the per-container record built by ComputeCostData. It holds
  // identifying metadata for a container plus the resource series used to cost it.
  // For containers that no longer exist in the cluster, only the fields that can
  // be recovered from Prometheus are set: name, pod, node, node data, namespace
  // and the resource series.
  type CostData struct {
  	// Name is the container name.
  	Name string `json:"name,omitempty"`
  	PodName string `json:"podName,omitempty"`
  	NodeName string `json:"nodeName,omitempty"`
  	// NodeData is the pricing information for the node. For a running pod it is
  	// whatever getNodeCost returned for the node name, possibly nil. For a deleted
  	// container on a deleted node it is a placeholder filled in by
  	// findDeletedNodeInfo.
  	NodeData *costAnalyzerCloud.Node `json:"node,omitempty"`
  	Namespace string `json:"namespace,omitempty"`
  	// Owning controllers and services of the pod. These are only populated for
  	// pods that still exist in the cluster.
  	Deployments []string `json:"deployments,omitempty"`
  	Services []string `json:"services,omitempty"`
  	Daemonsets []string `json:"daemonsets,omitempty"`
  	Statefulsets []string `json:"statefulsets,omitempty"`
  	Jobs []string `json:"jobs,omitempty"`
  	// Request/usage series from the Prometheus queries. When no series was found
  	// for the container, ComputeCostData stores a single zero-valued Vector.
  	RAMReq []*Vector `json:"ramreq,omitempty"`
  	RAMUsed []*Vector `json:"ramused,omitempty"`
  	CPUReq []*Vector `json:"cpureq,omitempty"`
  	CPUUsed []*Vector `json:"cpuused,omitempty"`
  	// Allocation series: getContainerAllocation(request, usage), i.e. the larger
  	// of request and usage at each timestamp.
  	RAMAllocation []*Vector `json:"ramallocated,omitempty"`
  	CPUAllocation []*Vector `json:"cpuallocated,omitempty"`
  	GPUReq []*Vector `json:"gpureq,omitempty"`
  	// PVCData lists the pod's persistent volume claims. ComputeCostData attaches
  	// them to the pod's first container only, so they are not counted twice.
  	PVCData []*PersistentVolumeClaimData `json:"pvcData,omitempty"`
  	// Labels are the pod's labels; NamespaceLabels are those of its namespace.
  	Labels map[string]string `json:"labels,omitempty"`
  	NamespaceLabels map[string]string `json:"namespaceLabels,omitempty"`
  }
  // Vector is one sample of a Prometheus series. It holds the sample timestamp
  // as reported by Prometheus (Unix seconds per the Prometheus HTTP API) and the
  // sample value parsed to float64.
  type Vector struct {
  	Timestamp float64 `json:"timestamp"`
  	Value float64 `json:"value"`
  }
  // PromQL templates used by ComputeCostData.
  //
  // Each "[%s] %s" pair is filled with (window, offset): window becomes the range
  // selector duration, and offset is inserted verbatim after it. Presumably the
  // offset is an `offset <dur>` modifier or empty; confirm with callers.
  const (
  	// queryRAMRequestsStr: per-container memory requests
  	// (kube_pod_container_resource_requests_memory_bytes). It multiplies the
  	// sample count by the average over the window. ComputeCostData then passes
  	// the result to getContainerMetricVector with normalization enabled. The
  	// container/pod labels are copied into container_name/pod_name so the series
  	// can be keyed like the cAdvisor usage queries below.
  	queryRAMRequestsStr = `avg(
label_replace(
label_replace(
avg(
count_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD", node!=""}[%s] %s)
*
avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD", node!=""}[%s] %s)
) by (namespace,container,pod,node) , "container_name","$1","container","(.+)"
), "pod_name","$1","pod","(.+)"
)
) by (namespace,container_name,pod_name,node)`
  	// queryRAMUsageStr: per-container working-set memory
  	// (container_memory_working_set_bytes). The same count*avg weighting is used,
  	// and the instance label is copied into node.
  	queryRAMUsageStr = `sort_desc(
avg(
label_replace(count_over_time(container_memory_working_set_bytes{container_name!="",container_name!="POD", instance!=""}[%s] %s), "node", "$1", "instance","(.+)")
*
label_replace(avg_over_time(container_memory_working_set_bytes{container_name!="",container_name!="POD", instance!=""}[%s] %s), "node", "$1", "instance","(.+)")
) by (namespace,container_name,pod_name,node)
)`
  	// queryCPURequestsStr: per-container CPU requests
  	// (kube_pod_container_resource_requests_cpu_cores). It is shaped exactly like
  	// queryRAMRequestsStr.
  	queryCPURequestsStr = `avg(
label_replace(
label_replace(
avg(
count_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD", node!=""}[%s] %s)
*
avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD", node!=""}[%s] %s)
) by (namespace,container,pod,node) , "container_name","$1","container","(.+)"
), "pod_name","$1","pod","(.+)"
)
) by (namespace,container_name,pod_name,node)`
  	// queryCPUUsageStr: per-container CPU usage rate over the window. It takes a
  	// single (window, offset) pair, and ComputeCostData does not normalize it.
  	queryCPUUsageStr = `avg(
label_replace(
rate(
container_cpu_usage_seconds_total{container_name!="",container_name!="POD",instance!=""}[%s] %s
) , "node", "$1", "instance", "(.+)"
)
) by (namespace,container_name,pod_name,node)`
  	// queryGPURequestsStr: per-container nvidia_com_gpu requests. It is shaped
  	// exactly like queryRAMRequestsStr.
  	queryGPURequestsStr = `avg(
label_replace(
label_replace(
avg(
count_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s] %s)
*
avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s] %s)
) by (namespace,container,pod,node) , "container_name","$1","container","(.+)"
), "pod_name","$1","pod","(.+)"
)
) by (namespace,container_name,pod_name,node)`
  	// queryPVRequestsStr: PVC info (storage class, volume name) joined with each
  	// claim's requested storage bytes. It has no placeholders.
  	queryPVRequestsStr = `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace, volumename)
*
on (persistentvolumeclaim, namespace) group_right(storageclass, volumename)
sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace)`
  	// normalizationStr: the maximum number of memory-request samples any series
  	// has in the window. ComputeCostData parses it with getNormalization and
  	// passes it to getContainerMetricVector for the count-weighted queries.
  	normalizationStr = `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[%s] %s))`
  )
  // PrometheusMetadata summarizes what ValidatePrometheus found on the
  // Prometheus server.
  type PrometheusMetadata struct {
  	// Running is true when the "up" query returned at least one series.
  	Running bool `json:"running"`
  	// KubecostDataExists is true when one of those series has job="kubecost".
  	KubecostDataExists bool `json:"kubecostDataExists"`
  }
  119. // ValidatePrometheus tells the model what data prometheus has on it.
  120. func ValidatePrometheus(cli prometheusClient.Client) (*PrometheusMetadata, error) {
  121. data, err := query(cli, "up")
  122. if err != nil {
  123. return &PrometheusMetadata{
  124. Running: false,
  125. KubecostDataExists: false,
  126. }, err
  127. }
  128. v, kcmetrics, err := getUptimeData(data)
  129. if err != nil {
  130. return &PrometheusMetadata{
  131. Running: false,
  132. KubecostDataExists: false,
  133. }, err
  134. }
  135. if len(v) > 0 {
  136. return &PrometheusMetadata{
  137. Running: true,
  138. KubecostDataExists: kcmetrics,
  139. }, nil
  140. } else {
  141. return &PrometheusMetadata{
  142. Running: false,
  143. KubecostDataExists: false,
  144. }, fmt.Errorf("No running jobs found on Prometheus at %s", cli.URL(epQuery, nil).Path)
  145. }
  146. }
  147. func getUptimeData(qr interface{}) ([]*Vector, bool, error) {
  148. data, ok := qr.(map[string]interface{})["data"]
  149. if !ok {
  150. e, err := wrapPrometheusError(qr)
  151. if err != nil {
  152. return nil, false, err
  153. }
  154. return nil, false, fmt.Errorf(e)
  155. }
  156. r, ok := data.(map[string]interface{})["result"]
  157. if !ok {
  158. return nil, false, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  159. }
  160. results, ok := r.([]interface{})
  161. if !ok {
  162. return nil, false, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  163. }
  164. jobData := []*Vector{}
  165. kubecostMetrics := false
  166. for _, val := range results {
  167. // For now, just do this for validation. TODO: This can be parsed to figure out the exact running jobs.
  168. metrics, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  169. if !ok {
  170. return nil, false, fmt.Errorf("Prometheus vector does not have metric labels")
  171. }
  172. jobname, ok := metrics["job"]
  173. if !ok {
  174. return nil, false, fmt.Errorf("up query does not have job names")
  175. }
  176. if jobname == "kubecost" {
  177. kubecostMetrics = true
  178. }
  179. value, ok := val.(map[string]interface{})["value"]
  180. if !ok {
  181. return nil, false, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  182. }
  183. dataPoint, ok := value.([]interface{})
  184. if !ok || len(dataPoint) != 2 {
  185. return nil, false, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  186. }
  187. strVal := dataPoint[1].(string)
  188. v, _ := strconv.ParseFloat(strVal, 64)
  189. toReturn := &Vector{
  190. Timestamp: dataPoint[0].(float64),
  191. Value: v,
  192. }
  193. jobData = append(jobData, toReturn)
  194. }
  195. return jobData, kubecostMetrics, nil
  196. }
  197. func ComputeUptimes(cli prometheusClient.Client) (map[string]float64, error) {
  198. res, err := query(cli, `container_start_time_seconds{container_name != "POD",container_name != ""}`)
  199. if err != nil {
  200. return nil, err
  201. }
  202. vectors, err := getContainerMetricVector(res, false, 0)
  203. if err != nil {
  204. return nil, err
  205. }
  206. results := make(map[string]float64)
  207. for key, vector := range vectors {
  208. if err != nil {
  209. return nil, err
  210. }
  211. val := vector[0].Value
  212. uptime := time.Now().Sub(time.Unix(int64(val), 0)).Seconds()
  213. results[key] = uptime
  214. }
  215. return results, nil
  216. }
  217. func ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
  218. queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, window, offset, window, offset)
  219. queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
  220. queryCPURequests := fmt.Sprintf(queryCPURequestsStr, window, offset, window, offset)
  221. queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, window, offset)
  222. queryGPURequests := fmt.Sprintf(queryGPURequestsStr, window, offset, window, offset)
  223. queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
  224. normalization := fmt.Sprintf(normalizationStr, window, offset)
  225. resultRAMRequests, err := query(cli, queryRAMRequests)
  226. if err != nil {
  227. return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
  228. }
  229. resultRAMUsage, err := query(cli, queryRAMUsage)
  230. if err != nil {
  231. return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
  232. }
  233. resultCPURequests, err := query(cli, queryCPURequests)
  234. if err != nil {
  235. return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
  236. }
  237. resultCPUUsage, err := query(cli, queryCPUUsage)
  238. if err != nil {
  239. return nil, fmt.Errorf("Error fetching CPUUsage requests: " + err.Error())
  240. }
  241. resultGPURequests, err := query(cli, queryGPURequests)
  242. if err != nil {
  243. return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
  244. }
  245. resultPVRequests, err := query(cli, queryPVRequests)
  246. if err != nil {
  247. return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
  248. }
  249. normalizationResult, err := query(cli, normalization)
  250. if err != nil {
  251. return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
  252. }
  253. normalizationValue, err := getNormalization(normalizationResult)
  254. if err != nil {
  255. return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
  256. }
  257. nodes, err := getNodeCost(clientset, cloud)
  258. if err != nil {
  259. klog.V(1).Infof("Warning, no Node cost model available: " + err.Error())
  260. return nil, err
  261. }
  262. podlist, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
  263. if err != nil {
  264. return nil, err
  265. }
  266. podDeploymentsMapping, err := getPodDeployments(clientset, podlist)
  267. if err != nil {
  268. return nil, err
  269. }
  270. podServicesMapping, err := getPodServices(clientset, podlist)
  271. if err != nil {
  272. return nil, err
  273. }
  274. namespaceLabelsMapping, err := getNamespaceLabels(clientset)
  275. if err != nil {
  276. return nil, err
  277. }
  278. pvClaimMapping, err := getPVInfoVector(resultPVRequests)
  279. if err != nil {
  280. klog.Infof("Unable to get PV Data: %s", err.Error())
  281. }
  282. if pvClaimMapping != nil {
  283. err = addPVData(clientset, pvClaimMapping, cloud)
  284. if err != nil {
  285. return nil, err
  286. }
  287. }
  288. err = addPVData(clientset, pvClaimMapping, cloud)
  289. if err != nil {
  290. return nil, err
  291. }
  292. containerNameCost := make(map[string]*CostData)
  293. containers := make(map[string]bool)
  294. RAMReqMap, err := getContainerMetricVector(resultRAMRequests, true, normalizationValue)
  295. if err != nil {
  296. return nil, err
  297. }
  298. for key := range RAMReqMap {
  299. containers[key] = true
  300. }
  301. RAMUsedMap, err := getContainerMetricVector(resultRAMUsage, true, normalizationValue)
  302. if err != nil {
  303. return nil, err
  304. }
  305. for key := range RAMUsedMap {
  306. containers[key] = true
  307. }
  308. CPUReqMap, err := getContainerMetricVector(resultCPURequests, true, normalizationValue)
  309. if err != nil {
  310. return nil, err
  311. }
  312. for key := range CPUReqMap {
  313. containers[key] = true
  314. }
  315. GPUReqMap, err := getContainerMetricVector(resultGPURequests, true, normalizationValue)
  316. if err != nil {
  317. return nil, err
  318. }
  319. for key := range GPUReqMap {
  320. containers[key] = true
  321. }
  322. CPUUsedMap, err := getContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
  323. if err != nil {
  324. return nil, err
  325. }
  326. for key := range CPUUsedMap {
  327. containers[key] = true
  328. }
  329. currentContainers := make(map[string]v1.Pod)
  330. for _, pod := range podlist.Items {
  331. if pod.Status.Phase != "Running" {
  332. continue
  333. }
  334. cs, err := newContainerMetricsFromPod(pod)
  335. if err != nil {
  336. return nil, err
  337. }
  338. for _, c := range cs {
  339. containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
  340. currentContainers[c.Key()] = pod
  341. }
  342. }
  343. missingNodes := make(map[string]*costAnalyzerCloud.Node)
  344. for key := range containers {
  345. if _, ok := containerNameCost[key]; ok {
  346. continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
  347. }
  348. if pod, ok := currentContainers[key]; ok {
  349. podName := pod.GetObjectMeta().GetName()
  350. ns := pod.GetObjectMeta().GetNamespace()
  351. nsLabels := namespaceLabelsMapping[ns]
  352. podLabels := pod.GetObjectMeta().GetLabels()
  353. nodeName := pod.Spec.NodeName
  354. var nodeData *costAnalyzerCloud.Node
  355. if _, ok := nodes[nodeName]; ok {
  356. nodeData = nodes[nodeName]
  357. }
  358. var podDeployments []string
  359. if _, ok := podDeploymentsMapping[ns]; ok {
  360. if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  361. podDeployments = ds
  362. } else {
  363. podDeployments = []string{}
  364. }
  365. }
  366. var podPVs []*PersistentVolumeClaimData
  367. podClaims := pod.Spec.Volumes
  368. for _, vol := range podClaims {
  369. if vol.PersistentVolumeClaim != nil {
  370. name := vol.PersistentVolumeClaim.ClaimName
  371. if pvClaim, ok := pvClaimMapping[ns+","+name]; ok {
  372. podPVs = append(podPVs, pvClaim)
  373. }
  374. }
  375. }
  376. var podServices []string
  377. if _, ok := podServicesMapping[ns]; ok {
  378. if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  379. podServices = svcs
  380. } else {
  381. podServices = []string{}
  382. }
  383. }
  384. for i, container := range pod.Spec.Containers {
  385. containerName := container.Name
  386. // recreate the key and look up data for this container
  387. newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName).Key()
  388. RAMReqV, ok := RAMReqMap[newKey]
  389. if !ok {
  390. klog.V(4).Info("no RAM requests for " + newKey)
  391. RAMReqV = []*Vector{&Vector{}}
  392. }
  393. RAMUsedV, ok := RAMUsedMap[newKey]
  394. if !ok {
  395. klog.V(4).Info("no RAM usage for " + newKey)
  396. RAMUsedV = []*Vector{&Vector{}}
  397. }
  398. CPUReqV, ok := CPUReqMap[newKey]
  399. if !ok {
  400. klog.V(4).Info("no CPU requests for " + newKey)
  401. CPUReqV = []*Vector{&Vector{}}
  402. }
  403. GPUReqV, ok := GPUReqMap[newKey]
  404. if !ok {
  405. klog.V(4).Info("no GPU requests for " + newKey)
  406. GPUReqV = []*Vector{&Vector{}}
  407. }
  408. CPUUsedV, ok := CPUUsedMap[newKey]
  409. if !ok {
  410. klog.V(4).Info("no CPU usage for " + newKey)
  411. CPUUsedV = []*Vector{&Vector{}}
  412. }
  413. var pvReq []*PersistentVolumeClaimData
  414. if i == 0 { // avoid duplicating by just assigning all claims to the first container.
  415. pvReq = podPVs
  416. }
  417. costs := &CostData{
  418. Name: containerName,
  419. PodName: podName,
  420. NodeName: nodeName,
  421. Namespace: ns,
  422. Deployments: podDeployments,
  423. Services: podServices,
  424. Daemonsets: getDaemonsetsOfPod(pod),
  425. Jobs: getJobsOfPod(pod),
  426. Statefulsets: getStatefulSetsOfPod(pod),
  427. NodeData: nodeData,
  428. RAMReq: RAMReqV,
  429. RAMUsed: RAMUsedV,
  430. CPUReq: CPUReqV,
  431. CPUUsed: CPUUsedV,
  432. GPUReq: GPUReqV,
  433. PVCData: pvReq,
  434. Labels: podLabels,
  435. NamespaceLabels: nsLabels,
  436. }
  437. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  438. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  439. if filterNamespace == "" {
  440. containerNameCost[newKey] = costs
  441. } else if costs.Namespace == filterNamespace {
  442. containerNameCost[newKey] = costs
  443. }
  444. }
  445. } else {
  446. // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
  447. klog.V(4).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
  448. c, err := NewContainerMetricFromKey(key)
  449. if err != nil {
  450. return nil, err
  451. }
  452. RAMReqV, ok := RAMReqMap[key]
  453. if !ok {
  454. klog.V(4).Info("no RAM requests for " + key)
  455. RAMReqV = []*Vector{&Vector{}}
  456. }
  457. RAMUsedV, ok := RAMUsedMap[key]
  458. if !ok {
  459. klog.V(4).Info("no RAM usage for " + key)
  460. RAMUsedV = []*Vector{&Vector{}}
  461. }
  462. CPUReqV, ok := CPUReqMap[key]
  463. if !ok {
  464. klog.V(4).Info("no CPU requests for " + key)
  465. CPUReqV = []*Vector{&Vector{}}
  466. }
  467. GPUReqV, ok := GPUReqMap[key]
  468. if !ok {
  469. klog.V(4).Info("no GPU requests for " + key)
  470. GPUReqV = []*Vector{&Vector{}}
  471. }
  472. CPUUsedV, ok := CPUUsedMap[key]
  473. if !ok {
  474. klog.V(4).Info("no CPU usage for " + key)
  475. CPUUsedV = []*Vector{&Vector{}}
  476. }
  477. node, ok := nodes[c.NodeName]
  478. if !ok {
  479. klog.V(2).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
  480. if n, ok := missingNodes[c.NodeName]; ok {
  481. node = n
  482. } else {
  483. node = &costAnalyzerCloud.Node{}
  484. missingNodes[c.NodeName] = node
  485. }
  486. }
  487. costs := &CostData{
  488. Name: c.ContainerName,
  489. PodName: c.PodName,
  490. NodeName: c.NodeName,
  491. NodeData: node,
  492. Namespace: c.Namespace,
  493. RAMReq: RAMReqV,
  494. RAMUsed: RAMUsedV,
  495. CPUReq: CPUReqV,
  496. CPUUsed: CPUUsedV,
  497. GPUReq: GPUReqV,
  498. }
  499. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  500. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  501. if filterNamespace == "" {
  502. containerNameCost[key] = costs
  503. } else if costs.Namespace == filterNamespace {
  504. containerNameCost[key] = costs
  505. }
  506. }
  507. }
  508. err = findDeletedNodeInfo(cli, missingNodes, window)
  509. if err != nil {
  510. return nil, err
  511. }
  512. return containerNameCost, err
  513. }
  514. func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*costAnalyzerCloud.Node, window string) error {
  515. if len(missingNodes) > 0 {
  516. q := make([]string, 0, len(missingNodes))
  517. for nodename := range missingNodes {
  518. klog.V(3).Infof("Finding data for deleted node %v", nodename)
  519. q = append(q, nodename)
  520. }
  521. l := strings.Join(q, "|")
  522. queryHistoricalCPUCost := fmt.Sprintf(`avg_over_time(node_cpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
  523. queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost{instance=~"%s"}[%s])`, l, window)
  524. queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
  525. cpuCostResult, err := query(cli, queryHistoricalCPUCost)
  526. if err != nil {
  527. return fmt.Errorf("Error fetching cpu cost data: " + err.Error())
  528. }
  529. ramCostResult, err := query(cli, queryHistoricalRAMCost)
  530. if err != nil {
  531. return fmt.Errorf("Error fetching ram cost data: " + err.Error())
  532. }
  533. gpuCostResult, err := query(cli, queryHistoricalGPUCost)
  534. if err != nil {
  535. return fmt.Errorf("Error fetching gpu cost data: " + err.Error())
  536. }
  537. cpuCosts, err := getCost(cpuCostResult)
  538. if err != nil {
  539. return err
  540. }
  541. ramCosts, err := getCost(ramCostResult)
  542. if err != nil {
  543. return err
  544. }
  545. gpuCosts, err := getCost(gpuCostResult)
  546. if err != nil {
  547. return err
  548. }
  549. if len(cpuCosts) == 0 {
  550. klog.V(1).Infof("Historical data for node prices not available. Ingest this server's /metrics endpoint to get that data.")
  551. }
  552. for node, costv := range cpuCosts {
  553. if _, ok := missingNodes[node]; ok {
  554. missingNodes[node].VCPUCost = fmt.Sprintf("%f", costv[0].Value)
  555. }
  556. }
  557. for node, costv := range ramCosts {
  558. if _, ok := missingNodes[node]; ok {
  559. missingNodes[node].RAMCost = fmt.Sprintf("%f", costv[0].Value)
  560. }
  561. }
  562. for node, costv := range gpuCosts {
  563. if _, ok := missingNodes[node]; ok {
  564. missingNodes[node].GPUCost = fmt.Sprintf("%f", costv[0].Value)
  565. }
  566. }
  567. }
  568. return nil
  569. }
  570. func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
  571. if req == nil || len(req) == 0 {
  572. for _, usedV := range used {
  573. if usedV.Timestamp == 0 {
  574. continue
  575. }
  576. usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
  577. }
  578. return used
  579. }
  580. if used == nil || len(used) == 0 {
  581. for _, reqV := range req {
  582. if reqV.Timestamp == 0 {
  583. continue
  584. }
  585. reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
  586. }
  587. return req
  588. }
  589. var allocation []*Vector
  590. var timestamps []float64
  591. reqMap := make(map[float64]float64)
  592. for _, reqV := range req {
  593. if reqV.Timestamp == 0 {
  594. continue
  595. }
  596. reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
  597. reqMap[reqV.Timestamp] = reqV.Value
  598. timestamps = append(timestamps, reqV.Timestamp)
  599. }
  600. usedMap := make(map[float64]float64)
  601. for _, usedV := range used {
  602. if usedV.Timestamp == 0 {
  603. continue
  604. }
  605. usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
  606. usedMap[usedV.Timestamp] = usedV.Value
  607. if _, ok := reqMap[usedV.Timestamp]; !ok { // no need to double add, since we'll range over sorted timestamps and check.
  608. timestamps = append(timestamps, usedV.Timestamp)
  609. }
  610. }
  611. sort.Float64s(timestamps)
  612. for _, t := range timestamps {
  613. rv, okR := reqMap[t]
  614. uv, okU := usedMap[t]
  615. allocationVector := &Vector{
  616. Timestamp: t,
  617. }
  618. if okR && okU {
  619. allocationVector.Value = math.Max(rv, uv)
  620. } else if okR {
  621. allocationVector.Value = rv
  622. } else if okU {
  623. allocationVector.Value = uv
  624. }
  625. allocation = append(allocation, allocationVector)
  626. }
  627. return allocation
  628. }
  629. func addPVData(clientset kubernetes.Interface, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
  630. storageClasses, err := clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
  631. if err != nil {
  632. return err
  633. }
  634. storageClassMap := make(map[string]map[string]string)
  635. for _, storageClass := range storageClasses.Items {
  636. params := storageClass.Parameters
  637. storageClassMap[storageClass.ObjectMeta.Name] = params
  638. }
  639. pvs, err := clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
  640. if err != nil {
  641. return err
  642. }
  643. pvMap := make(map[string]*costAnalyzerCloud.PV)
  644. for _, pv := range pvs.Items {
  645. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  646. if !ok {
  647. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  648. }
  649. cacPv := &costAnalyzerCloud.PV{
  650. Class: pv.Spec.StorageClassName,
  651. Region: pv.Labels[v1.LabelZoneRegion],
  652. Parameters: parameters,
  653. }
  654. err := GetPVCost(cacPv, &pv, cloud)
  655. if err != nil {
  656. return err
  657. }
  658. pvMap[pv.Name] = cacPv
  659. }
  660. for _, pvc := range pvClaimMapping {
  661. pvc.Volume = pvMap[pvc.VolumeName]
  662. }
  663. return nil
  664. }
  665. func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAnalyzerCloud.Provider) error {
  666. cfg, err := cloud.GetConfig()
  667. key := cloud.GetPVKey(kpv, pv.Parameters)
  668. pvWithCost, err := cloud.PVPricing(key)
  669. if err != nil {
  670. return err
  671. }
  672. if pvWithCost == nil || pvWithCost.Cost == "" {
  673. pv.Cost = cfg.Storage
  674. return nil // set default cost
  675. }
  676. pv.Cost = pvWithCost.Cost
  677. return nil
  678. }
  679. func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
  680. cfg, err := cloud.GetConfig()
  681. if err != nil {
  682. return nil, err
  683. }
  684. nodeList, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  685. if err != nil {
  686. return nil, err
  687. }
  688. nodes := make(map[string]*costAnalyzerCloud.Node)
  689. for _, n := range nodeList.Items {
  690. name := n.GetObjectMeta().GetName()
  691. nodeLabels := n.GetObjectMeta().GetLabels()
  692. nodeLabels["providerID"] = n.Spec.ProviderID
  693. cnode, err := cloud.NodePricing(cloud.GetKey(nodeLabels))
  694. if err != nil {
  695. klog.V(1).Infof("Error getting node. Error: " + err.Error())
  696. nodes[name] = cnode
  697. continue
  698. }
  699. newCnode := *cnode
  700. var cpu float64
  701. if newCnode.VCPU == "" {
  702. cpu = float64(n.Status.Capacity.Cpu().Value())
  703. newCnode.VCPU = n.Status.Capacity.Cpu().String()
  704. } else {
  705. cpu, _ = strconv.ParseFloat(newCnode.VCPU, 64)
  706. }
  707. var ram float64
  708. if newCnode.RAM == "" {
  709. newCnode.RAM = n.Status.Capacity.Memory().String()
  710. }
  711. ram = float64(n.Status.Capacity.Memory().Value())
  712. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  713. if newCnode.GPU != "" && newCnode.GPUCost == "" { // We couldn't find a gpu cost, so fix cpu and ram, then accordingly
  714. klog.V(4).Infof("GPU without cost found for %s, calculating...", cloud.GetKey(nodeLabels).Features())
  715. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  716. if err != nil {
  717. klog.V(3).Infof("Could not parse default cpu price")
  718. return nil, err
  719. }
  720. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  721. if err != nil {
  722. klog.V(3).Infof("Could not parse default ram price")
  723. return nil, err
  724. }
  725. defaultGPU, err := strconv.ParseFloat(cfg.RAM, 64)
  726. if err != nil {
  727. klog.V(3).Infof("Could not parse default gpu price")
  728. return nil, err
  729. }
  730. cpuToRAMRatio := defaultCPU / defaultRAM
  731. gpuToRAMRatio := defaultGPU / defaultRAM
  732. ramGB := ram / 1024 / 1024 / 1024
  733. ramMultiple := gpuToRAMRatio + cpu*cpuToRAMRatio + ramGB
  734. var nodePrice float64
  735. if newCnode.Cost != "" {
  736. nodePrice, err = strconv.ParseFloat(newCnode.Cost, 64)
  737. if err != nil {
  738. klog.V(3).Infof("Could not parse total node price")
  739. return nil, err
  740. }
  741. } else {
  742. nodePrice, err = strconv.ParseFloat(newCnode.VCPUCost, 64) // all the price was allocated the the CPU
  743. if err != nil {
  744. klog.V(3).Infof("Could not parse node vcpu price")
  745. return nil, err
  746. }
  747. }
  748. ramPrice := (nodePrice / ramMultiple)
  749. cpuPrice := ramPrice * cpuToRAMRatio
  750. gpuPrice := ramPrice * gpuToRAMRatio
  751. newCnode.VCPUCost = fmt.Sprintf("%f", cpuPrice)
  752. newCnode.RAMCost = fmt.Sprintf("%f", ramPrice)
  753. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  754. newCnode.GPUCost = fmt.Sprintf("%f", gpuPrice)
  755. } else {
  756. if newCnode.RAMCost == "" { // We couldn't find a ramcost, so fix cpu and allocate ram accordingly
  757. klog.V(4).Infof("No RAM cost found for %s, calculating...", cloud.GetKey(nodeLabels).Features())
  758. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  759. if err != nil {
  760. klog.V(3).Infof("Could not parse default cpu price")
  761. return nil, err
  762. }
  763. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  764. if err != nil {
  765. klog.V(3).Infof("Could not parse default ram price")
  766. return nil, err
  767. }
  768. cpuToRAMRatio := defaultCPU / defaultRAM
  769. ramGB := ram / 1024 / 1024 / 1024
  770. ramMultiple := cpu*cpuToRAMRatio + ramGB
  771. var nodePrice float64
  772. if newCnode.Cost != "" {
  773. nodePrice, err = strconv.ParseFloat(newCnode.Cost, 64)
  774. if err != nil {
  775. klog.V(3).Infof("Could not parse total node price")
  776. return nil, err
  777. }
  778. } else {
  779. nodePrice, err = strconv.ParseFloat(newCnode.VCPUCost, 64) // all the price was allocated the the CPU
  780. if err != nil {
  781. klog.V(3).Infof("Could not parse node vcpu price")
  782. return nil, err
  783. }
  784. }
  785. ramPrice := (nodePrice / ramMultiple)
  786. cpuPrice := ramPrice * cpuToRAMRatio
  787. newCnode.VCPUCost = fmt.Sprintf("%f", cpuPrice)
  788. newCnode.RAMCost = fmt.Sprintf("%f", ramPrice)
  789. newCnode.RAMBytes = fmt.Sprintf("%f", ram)
  790. klog.V(4).Infof("Computed \"%s\" RAM Cost := %v", name, newCnode.RAMCost)
  791. }
  792. }
  793. nodes[name] = &newCnode
  794. }
  795. return nodes, nil
  796. }
  797. func getPodServices(clientset kubernetes.Interface, podList *v1.PodList) (map[string]map[string][]string, error) {
  798. servicesList, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
  799. if err != nil {
  800. return nil, err
  801. }
  802. podServicesMapping := make(map[string]map[string][]string)
  803. for _, service := range servicesList.Items {
  804. namespace := service.GetObjectMeta().GetNamespace()
  805. name := service.GetObjectMeta().GetName()
  806. if _, ok := podServicesMapping[namespace]; !ok {
  807. podServicesMapping[namespace] = make(map[string][]string)
  808. }
  809. s := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
  810. for _, pod := range podList.Items {
  811. labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
  812. if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
  813. services, ok := podServicesMapping[namespace][pod.GetObjectMeta().GetName()]
  814. if ok {
  815. podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = append(services, name)
  816. } else {
  817. podServicesMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
  818. }
  819. }
  820. }
  821. }
  822. return podServicesMapping, nil
  823. }
  824. func getPodDeployments(clientset kubernetes.Interface, podList *v1.PodList) (map[string]map[string][]string, error) {
  825. deploymentsList, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
  826. if err != nil {
  827. return nil, err
  828. }
  829. podDeploymentsMapping := make(map[string]map[string][]string) // namespace: podName: [deploymentNames]
  830. for _, deployment := range deploymentsList.Items {
  831. namespace := deployment.GetObjectMeta().GetNamespace()
  832. name := deployment.GetObjectMeta().GetName()
  833. if _, ok := podDeploymentsMapping[namespace]; !ok {
  834. podDeploymentsMapping[namespace] = make(map[string][]string)
  835. }
  836. s, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
  837. if err != nil {
  838. klog.V(2).Infof("Error doing deployment label conversion: " + err.Error())
  839. }
  840. for _, pod := range podList.Items {
  841. labelSet := labels.Set(pod.GetObjectMeta().GetLabels())
  842. if s.Matches(labelSet) && pod.GetObjectMeta().GetNamespace() == namespace {
  843. deployments, ok := podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()]
  844. if ok {
  845. podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = append(deployments, name)
  846. } else {
  847. podDeploymentsMapping[namespace][pod.GetObjectMeta().GetName()] = []string{name}
  848. }
  849. }
  850. }
  851. }
  852. return podDeploymentsMapping, nil
  853. }
  854. func ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider,
  855. startString, endString, windowString string, filterNamespace string) (map[string]*CostData, error) {
  856. queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
  857. queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
  858. queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
  859. queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
  860. queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "")
  861. queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
  862. normalization := fmt.Sprintf(normalizationStr, windowString, "")
  863. layout := "2006-01-02T15:04:05.000Z"
  864. start, err := time.Parse(layout, startString)
  865. if err != nil {
  866. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  867. return nil, err
  868. }
  869. end, err := time.Parse(layout, endString)
  870. if err != nil {
  871. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  872. return nil, err
  873. }
  874. window, err := time.ParseDuration(windowString)
  875. if err != nil {
  876. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  877. return nil, err
  878. }
  879. resultRAMRequests, err := queryRange(cli, queryRAMRequests, start, end, window)
  880. if err != nil {
  881. return nil, fmt.Errorf("Error fetching RAM requests: " + err.Error())
  882. }
  883. resultRAMUsage, err := queryRange(cli, queryRAMUsage, start, end, window)
  884. if err != nil {
  885. return nil, fmt.Errorf("Error fetching RAM usage: " + err.Error())
  886. }
  887. resultCPURequests, err := queryRange(cli, queryCPURequests, start, end, window)
  888. if err != nil {
  889. return nil, fmt.Errorf("Error fetching CPU requests: " + err.Error())
  890. }
  891. resultCPUUsage, err := queryRange(cli, queryCPUUsage, start, end, window)
  892. if err != nil {
  893. return nil, fmt.Errorf("Error fetching CPU usage: " + err.Error())
  894. }
  895. resultGPURequests, err := queryRange(cli, queryGPURequests, start, end, window)
  896. if err != nil {
  897. return nil, fmt.Errorf("Error fetching GPU requests: " + err.Error())
  898. }
  899. resultPVRequests, err := queryRange(cli, queryPVRequests, start, end, window)
  900. if err != nil {
  901. return nil, fmt.Errorf("Error fetching PV requests: " + err.Error())
  902. }
  903. normalizationResult, err := query(cli, normalization)
  904. if err != nil {
  905. return nil, fmt.Errorf("Error fetching normalization data: " + err.Error())
  906. }
  907. normalizationValue, err := getNormalization(normalizationResult)
  908. if err != nil {
  909. return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
  910. }
  911. nodes, err := getNodeCost(clientset, cloud)
  912. if err != nil {
  913. klog.V(1).Infof("Warning, no cost model available: " + err.Error())
  914. return nil, err
  915. }
  916. podlist, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
  917. if err != nil {
  918. return nil, err
  919. }
  920. podDeploymentsMapping, err := getPodDeployments(clientset, podlist)
  921. if err != nil {
  922. return nil, err
  923. }
  924. podServicesMapping, err := getPodServices(clientset, podlist)
  925. if err != nil {
  926. return nil, err
  927. }
  928. namespaceLabelsMapping, err := getNamespaceLabels(clientset)
  929. if err != nil {
  930. return nil, err
  931. }
  932. pvClaimMapping, err := getPVInfoVectors(resultPVRequests)
  933. if err != nil {
  934. // Just log for compatibility with KSM less than 1.6
  935. klog.Infof("Unable to get PV Data: %s", err.Error())
  936. }
  937. if pvClaimMapping != nil {
  938. err = addPVData(clientset, pvClaimMapping, cloud)
  939. if err != nil {
  940. return nil, err
  941. }
  942. } else {
  943. klog.Infof("WHY IS THIS NIL??")
  944. }
  945. containerNameCost := make(map[string]*CostData)
  946. containers := make(map[string]bool)
  947. RAMReqMap, err := getContainerMetricVectors(resultRAMRequests, true, normalizationValue)
  948. if err != nil {
  949. return nil, err
  950. }
  951. for key := range RAMReqMap {
  952. containers[key] = true
  953. }
  954. RAMUsedMap, err := getContainerMetricVectors(resultRAMUsage, true, normalizationValue)
  955. if err != nil {
  956. return nil, err
  957. }
  958. for key := range RAMUsedMap {
  959. containers[key] = true
  960. }
  961. CPUReqMap, err := getContainerMetricVectors(resultCPURequests, true, normalizationValue)
  962. if err != nil {
  963. return nil, err
  964. }
  965. for key := range CPUReqMap {
  966. containers[key] = true
  967. }
  968. GPUReqMap, err := getContainerMetricVectors(resultGPURequests, true, normalizationValue)
  969. if err != nil {
  970. return nil, err
  971. }
  972. for key := range GPUReqMap {
  973. containers[key] = true
  974. }
  975. CPUUsedMap, err := getContainerMetricVectors(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
  976. if err != nil {
  977. return nil, err
  978. }
  979. for key := range CPUUsedMap {
  980. containers[key] = true
  981. }
  982. currentContainers := make(map[string]v1.Pod)
  983. for _, pod := range podlist.Items {
  984. if pod.Status.Phase != "Running" {
  985. continue
  986. }
  987. cs, err := newContainerMetricsFromPod(pod)
  988. if err != nil {
  989. return nil, err
  990. }
  991. for _, c := range cs {
  992. containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
  993. currentContainers[c.Key()] = pod
  994. }
  995. }
  996. missingNodes := make(map[string]*costAnalyzerCloud.Node)
  997. for key := range containers {
  998. if _, ok := containerNameCost[key]; ok {
  999. continue // because ordering is important for the allocation model (all PV's applied to the first), just dedupe if it's already been added.
  1000. }
  1001. if pod, ok := currentContainers[key]; ok {
  1002. podName := pod.GetObjectMeta().GetName()
  1003. ns := pod.GetObjectMeta().GetNamespace()
  1004. podLabels := pod.GetObjectMeta().GetLabels()
  1005. nodeName := pod.Spec.NodeName
  1006. var nodeData *costAnalyzerCloud.Node
  1007. if _, ok := nodes[nodeName]; ok {
  1008. nodeData = nodes[nodeName]
  1009. }
  1010. var podDeployments []string
  1011. if _, ok := podDeploymentsMapping[ns]; ok {
  1012. if ds, ok := podDeploymentsMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  1013. podDeployments = ds
  1014. } else {
  1015. podDeployments = []string{}
  1016. }
  1017. }
  1018. var podPVs []*PersistentVolumeClaimData
  1019. podClaims := pod.Spec.Volumes
  1020. for _, vol := range podClaims {
  1021. if vol.PersistentVolumeClaim != nil {
  1022. name := vol.PersistentVolumeClaim.ClaimName
  1023. if pvClaim, ok := pvClaimMapping[ns+","+name]; ok {
  1024. podPVs = append(podPVs, pvClaim)
  1025. }
  1026. }
  1027. }
  1028. var podServices []string
  1029. if _, ok := podServicesMapping[ns]; ok {
  1030. if svcs, ok := podServicesMapping[ns][pod.GetObjectMeta().GetName()]; ok {
  1031. podServices = svcs
  1032. } else {
  1033. podServices = []string{}
  1034. }
  1035. }
  1036. nsLabels := namespaceLabelsMapping[ns]
  1037. for i, container := range pod.Spec.Containers {
  1038. containerName := container.Name
  1039. newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName).Key()
  1040. RAMReqV, ok := RAMReqMap[newKey]
  1041. if !ok {
  1042. klog.V(4).Info("no RAM requests for " + newKey)
  1043. RAMReqV = []*Vector{}
  1044. }
  1045. RAMUsedV, ok := RAMUsedMap[newKey]
  1046. if !ok {
  1047. klog.V(4).Info("no RAM usage for " + newKey)
  1048. RAMUsedV = []*Vector{}
  1049. }
  1050. CPUReqV, ok := CPUReqMap[newKey]
  1051. if !ok {
  1052. klog.V(4).Info("no CPU requests for " + newKey)
  1053. CPUReqV = []*Vector{}
  1054. }
  1055. GPUReqV, ok := GPUReqMap[newKey]
  1056. if !ok {
  1057. klog.V(4).Info("no GPU requests for " + newKey)
  1058. GPUReqV = []*Vector{}
  1059. }
  1060. CPUUsedV, ok := CPUUsedMap[newKey]
  1061. if !ok {
  1062. klog.V(4).Info("no CPU usage for " + newKey)
  1063. CPUUsedV = []*Vector{}
  1064. }
  1065. var pvReq []*PersistentVolumeClaimData
  1066. if i == 0 { // avoid duplicating by just assigning all claims to the first container.
  1067. pvReq = podPVs
  1068. }
  1069. costs := &CostData{
  1070. Name: containerName,
  1071. PodName: podName,
  1072. NodeName: nodeName,
  1073. Namespace: ns,
  1074. Deployments: podDeployments,
  1075. Services: podServices,
  1076. Daemonsets: getDaemonsetsOfPod(pod),
  1077. Jobs: getJobsOfPod(pod),
  1078. Statefulsets: getStatefulSetsOfPod(pod),
  1079. NodeData: nodeData,
  1080. RAMReq: RAMReqV,
  1081. RAMUsed: RAMUsedV,
  1082. CPUReq: CPUReqV,
  1083. CPUUsed: CPUUsedV,
  1084. GPUReq: GPUReqV,
  1085. PVCData: pvReq,
  1086. Labels: podLabels,
  1087. NamespaceLabels: nsLabels,
  1088. }
  1089. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  1090. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  1091. if filterNamespace == "" {
  1092. containerNameCost[newKey] = costs
  1093. } else if costs.Namespace == filterNamespace {
  1094. containerNameCost[newKey] = costs
  1095. }
  1096. }
  1097. } else {
  1098. // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
  1099. klog.V(4).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
  1100. c, _ := NewContainerMetricFromKey(key)
  1101. RAMReqV, ok := RAMReqMap[key]
  1102. if !ok {
  1103. klog.V(4).Info("no RAM requests for " + key)
  1104. RAMReqV = []*Vector{}
  1105. }
  1106. RAMUsedV, ok := RAMUsedMap[key]
  1107. if !ok {
  1108. klog.V(4).Info("no RAM usage for " + key)
  1109. RAMUsedV = []*Vector{}
  1110. }
  1111. CPUReqV, ok := CPUReqMap[key]
  1112. if !ok {
  1113. klog.V(4).Info("no CPU requests for " + key)
  1114. CPUReqV = []*Vector{}
  1115. }
  1116. GPUReqV, ok := GPUReqMap[key]
  1117. if !ok {
  1118. klog.V(4).Info("no GPU requests for " + key)
  1119. GPUReqV = []*Vector{}
  1120. }
  1121. CPUUsedV, ok := CPUUsedMap[key]
  1122. if !ok {
  1123. klog.V(4).Info("no CPU usage for " + key)
  1124. CPUUsedV = []*Vector{}
  1125. }
  1126. node, ok := nodes[c.NodeName]
  1127. if !ok {
  1128. klog.V(2).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
  1129. if n, ok := missingNodes[c.NodeName]; ok {
  1130. node = n
  1131. } else {
  1132. node = &costAnalyzerCloud.Node{}
  1133. missingNodes[c.NodeName] = node
  1134. }
  1135. }
  1136. costs := &CostData{
  1137. Name: c.ContainerName,
  1138. PodName: c.PodName,
  1139. NodeName: c.NodeName,
  1140. NodeData: node,
  1141. Namespace: c.Namespace,
  1142. RAMReq: RAMReqV,
  1143. RAMUsed: RAMUsedV,
  1144. CPUReq: CPUReqV,
  1145. CPUUsed: CPUUsedV,
  1146. GPUReq: GPUReqV,
  1147. }
  1148. costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
  1149. costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
  1150. if filterNamespace == "" {
  1151. containerNameCost[key] = costs
  1152. } else if costs.Namespace == filterNamespace {
  1153. containerNameCost[key] = costs
  1154. }
  1155. }
  1156. }
  1157. w := end.Sub(start)
  1158. if w.Minutes() > 0 {
  1159. wStr := fmt.Sprintf("%dm", int(w.Minutes()))
  1160. err = findDeletedNodeInfo(cli, missingNodes, wStr)
  1161. if err != nil {
  1162. return nil, err
  1163. }
  1164. }
  1165. return containerNameCost, err
  1166. }
  1167. func getNamespaceLabels(clientset kubernetes.Interface) (map[string]map[string]string, error) {
  1168. nsToLabels := make(map[string]map[string]string)
  1169. nss, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
  1170. if err != nil {
  1171. return nil, err
  1172. }
  1173. for _, ns := range nss.Items {
  1174. nsToLabels[ns.Name] = ns.Labels
  1175. }
  1176. return nsToLabels, nil
  1177. }
  1178. func getDaemonsetsOfPod(pod v1.Pod) []string {
  1179. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  1180. if ownerReference.Kind == "DaemonSet" {
  1181. return []string{ownerReference.Name}
  1182. }
  1183. }
  1184. return []string{}
  1185. }
  1186. func getJobsOfPod(pod v1.Pod) []string {
  1187. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  1188. if ownerReference.Kind == "Job" {
  1189. return []string{ownerReference.Name}
  1190. }
  1191. }
  1192. return []string{}
  1193. }
  1194. func getStatefulSetsOfPod(pod v1.Pod) []string {
  1195. for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
  1196. if ownerReference.Kind == "StatefulSet" {
  1197. return []string{ownerReference.Name}
  1198. }
  1199. }
  1200. return []string{}
  1201. }
  1202. type PersistentVolumeClaimData struct {
  1203. Class string `json:"class"`
  1204. Claim string `json:"claim"`
  1205. Namespace string `json:"namespace"`
  1206. VolumeName string `json:"volumeName"`
  1207. Volume *costAnalyzerCloud.PV `json:"persistentVolume"`
  1208. Values []*Vector `json:"values"`
  1209. }
  1210. func getCost(qr interface{}) (map[string][]*Vector, error) {
  1211. toReturn := make(map[string][]*Vector)
  1212. for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
  1213. metricInterface, ok := val.(map[string]interface{})["metric"]
  1214. if !ok {
  1215. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  1216. }
  1217. metricMap, ok := metricInterface.(map[string]interface{})
  1218. if !ok {
  1219. return nil, fmt.Errorf("Metric field is improperly formatted")
  1220. }
  1221. instance, ok := metricMap["instance"]
  1222. if !ok {
  1223. return nil, fmt.Errorf("Instance field does not exist in data result vector")
  1224. }
  1225. instanceStr, ok := instance.(string)
  1226. if !ok {
  1227. return nil, fmt.Errorf("Instance is improperly formatted")
  1228. }
  1229. dataPoint, ok := val.(map[string]interface{})["value"]
  1230. if !ok {
  1231. return nil, fmt.Errorf("Value field does not exist in data result vector")
  1232. }
  1233. value, ok := dataPoint.([]interface{})
  1234. if !ok || len(value) != 2 {
  1235. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1236. }
  1237. var vectors []*Vector
  1238. strVal := value[1].(string)
  1239. v, _ := strconv.ParseFloat(strVal, 64)
  1240. vectors = append(vectors, &Vector{
  1241. Timestamp: value[0].(float64),
  1242. Value: v,
  1243. })
  1244. toReturn[instanceStr] = vectors
  1245. }
  1246. return toReturn, nil
  1247. }
  1248. func getPVInfoVectors(qr interface{}) (map[string]*PersistentVolumeClaimData, error) {
  1249. pvmap := make(map[string]*PersistentVolumeClaimData)
  1250. data, ok := qr.(map[string]interface{})["data"]
  1251. if !ok {
  1252. e, err := wrapPrometheusError(qr)
  1253. if err != nil {
  1254. return nil, err
  1255. }
  1256. return nil, fmt.Errorf(e)
  1257. }
  1258. d, ok := data.(map[string]interface{})
  1259. if !ok {
  1260. return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
  1261. }
  1262. result, ok := d["result"]
  1263. if !ok {
  1264. return nil, fmt.Errorf("Result field not present in prometheus response")
  1265. }
  1266. results, ok := result.([]interface{})
  1267. if !ok {
  1268. return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
  1269. }
  1270. for _, val := range results {
  1271. metricInterface, ok := val.(map[string]interface{})["metric"]
  1272. if !ok {
  1273. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  1274. }
  1275. metricMap, ok := metricInterface.(map[string]interface{})
  1276. if !ok {
  1277. return nil, fmt.Errorf("Metric field is improperly formatted")
  1278. }
  1279. pvclaim, ok := metricMap["persistentvolumeclaim"]
  1280. if !ok {
  1281. return nil, fmt.Errorf("Claim field does not exist in data result vector")
  1282. }
  1283. pvclaimStr, ok := pvclaim.(string)
  1284. if !ok {
  1285. return nil, fmt.Errorf("Claim field improperly formatted")
  1286. }
  1287. pvnamespace, ok := metricMap["namespace"]
  1288. if !ok {
  1289. return nil, fmt.Errorf("Namespace field does not exist in data result vector")
  1290. }
  1291. pvnamespaceStr, ok := pvnamespace.(string)
  1292. if !ok {
  1293. return nil, fmt.Errorf("Namespace field improperly formatted")
  1294. }
  1295. pv, ok := metricMap["volumename"]
  1296. if !ok {
  1297. klog.V(3).Infof("Warning: Unfulfilled claim %s: volumename field does not exist in data result vector", pvclaimStr)
  1298. pv = ""
  1299. }
  1300. pvStr, ok := pv.(string)
  1301. if !ok {
  1302. return nil, fmt.Errorf("Volumename field improperly formatted")
  1303. }
  1304. pvclass, ok := metricMap["storageclass"]
  1305. if !ok { // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
  1306. klog.V(2).Infof("Storage Class not found for claim \"%s/%s\".", pvnamespaceStr, pvclaimStr)
  1307. pvclass = ""
  1308. }
  1309. pvclassStr, ok := pvclass.(string)
  1310. if !ok {
  1311. return nil, fmt.Errorf("StorageClass field improperly formatted")
  1312. }
  1313. values, ok := val.(map[string]interface{})["values"].([]interface{})
  1314. if !ok {
  1315. return nil, fmt.Errorf("Values field is improperly formatted")
  1316. }
  1317. var vectors []*Vector
  1318. for _, value := range values {
  1319. dataPoint, ok := value.([]interface{})
  1320. if !ok || len(dataPoint) != 2 {
  1321. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1322. }
  1323. strVal := dataPoint[1].(string)
  1324. v, _ := strconv.ParseFloat(strVal, 64)
  1325. vectors = append(vectors, &Vector{
  1326. Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
  1327. Value: v,
  1328. })
  1329. }
  1330. key := pvnamespaceStr + "," + pvclaimStr
  1331. pvmap[key] = &PersistentVolumeClaimData{
  1332. Class: pvclassStr,
  1333. Claim: pvclaimStr,
  1334. Namespace: pvnamespaceStr,
  1335. VolumeName: pvStr,
  1336. Values: vectors,
  1337. }
  1338. }
  1339. return pvmap, nil
  1340. }
  1341. func getPVInfoVector(qr interface{}) (map[string]*PersistentVolumeClaimData, error) {
  1342. pvmap := make(map[string]*PersistentVolumeClaimData)
  1343. data, ok := qr.(map[string]interface{})["data"]
  1344. if !ok {
  1345. e, err := wrapPrometheusError(qr)
  1346. if err != nil {
  1347. return nil, err
  1348. }
  1349. return nil, fmt.Errorf(e)
  1350. }
  1351. d, ok := data.(map[string]interface{})
  1352. if !ok {
  1353. return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
  1354. }
  1355. result, ok := d["result"]
  1356. if !ok {
  1357. return nil, fmt.Errorf("Result field not present in prometheus response")
  1358. }
  1359. results, ok := result.([]interface{})
  1360. if !ok {
  1361. return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
  1362. }
  1363. for _, val := range results {
  1364. metricInterface, ok := val.(map[string]interface{})["metric"]
  1365. if !ok {
  1366. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  1367. }
  1368. metricMap, ok := metricInterface.(map[string]interface{})
  1369. if !ok {
  1370. return nil, fmt.Errorf("Metric field is improperly formatted")
  1371. }
  1372. pvclaim, ok := metricMap["persistentvolumeclaim"]
  1373. if !ok {
  1374. return nil, fmt.Errorf("Claim field does not exist in data result vector")
  1375. }
  1376. pvclaimStr, ok := pvclaim.(string)
  1377. if !ok {
  1378. return nil, fmt.Errorf("Claim field improperly formatted")
  1379. }
  1380. pvnamespace, ok := metricMap["namespace"]
  1381. if !ok {
  1382. return nil, fmt.Errorf("Namespace field does not exist in data result vector")
  1383. }
  1384. pvnamespaceStr, ok := pvnamespace.(string)
  1385. if !ok {
  1386. return nil, fmt.Errorf("Namespace field improperly formatted")
  1387. }
  1388. pv, ok := metricMap["volumename"]
  1389. if !ok {
  1390. klog.V(3).Infof("Warning: Unfulfilled claim %s: volumename field does not exist in data result vector", pvclaimStr)
  1391. pv = ""
  1392. }
  1393. pvStr, ok := pv.(string)
  1394. if !ok {
  1395. return nil, fmt.Errorf("Volumename field improperly formatted")
  1396. }
  1397. pvclass, ok := metricMap["storageclass"]
  1398. if !ok { // TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
  1399. klog.V(2).Infof("Storage Class not found for claim \"%s/%s\".", pvnamespaceStr, pvclaimStr)
  1400. pvclass = ""
  1401. }
  1402. pvclassStr, ok := pvclass.(string)
  1403. if !ok {
  1404. return nil, fmt.Errorf("StorageClass field improperly formatted")
  1405. }
  1406. dataPoint, ok := val.(map[string]interface{})["value"]
  1407. if !ok {
  1408. return nil, fmt.Errorf("Value field does not exist in data result vector")
  1409. }
  1410. value, ok := dataPoint.([]interface{})
  1411. if !ok || len(value) != 2 {
  1412. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1413. }
  1414. var vectors []*Vector
  1415. strVal := value[1].(string)
  1416. v, _ := strconv.ParseFloat(strVal, 64)
  1417. vectors = append(vectors, &Vector{
  1418. Timestamp: value[0].(float64),
  1419. Value: v,
  1420. })
  1421. key := pvnamespaceStr + "," + pvclaimStr
  1422. pvmap[key] = &PersistentVolumeClaimData{
  1423. Class: pvclassStr,
  1424. Claim: pvclaimStr,
  1425. Namespace: pvnamespaceStr,
  1426. VolumeName: pvStr,
  1427. Values: vectors,
  1428. }
  1429. }
  1430. return pvmap, nil
  1431. }
  1432. func queryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
  1433. u := cli.URL(epQueryRange, nil)
  1434. q := u.Query()
  1435. q.Set("query", query)
  1436. q.Set("start", start.Format(time.RFC3339Nano))
  1437. q.Set("end", end.Format(time.RFC3339Nano))
  1438. q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
  1439. u.RawQuery = q.Encode()
  1440. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  1441. if err != nil {
  1442. return nil, err
  1443. }
  1444. _, body, err := cli.Do(context.Background(), req)
  1445. if err != nil {
  1446. klog.V(1).Infof("ERROR" + err.Error())
  1447. }
  1448. if err != nil {
  1449. return nil, err
  1450. }
  1451. var toReturn interface{}
  1452. err = json.Unmarshal(body, &toReturn)
  1453. if err != nil {
  1454. klog.V(1).Infof("ERROR" + err.Error())
  1455. }
  1456. return toReturn, err
  1457. }
  1458. func query(cli prometheusClient.Client, query string) (interface{}, error) {
  1459. u := cli.URL(epQuery, nil)
  1460. q := u.Query()
  1461. q.Set("query", query)
  1462. u.RawQuery = q.Encode()
  1463. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  1464. if err != nil {
  1465. return nil, err
  1466. }
  1467. _, body, err := cli.Do(context.Background(), req)
  1468. if err != nil {
  1469. return nil, err
  1470. }
  1471. var toReturn interface{}
  1472. err = json.Unmarshal(body, &toReturn)
  1473. if err != nil {
  1474. klog.V(1).Infof("ERROR" + err.Error())
  1475. }
  1476. return toReturn, err
  1477. }
  1478. //todo: don't cast, implement unmarshaler interface
  1479. func getNormalization(qr interface{}) (float64, error) {
  1480. data, ok := qr.(map[string]interface{})["data"]
  1481. if !ok {
  1482. e, err := wrapPrometheusError(qr)
  1483. if err != nil {
  1484. return 0, err
  1485. }
  1486. return 0, fmt.Errorf(e)
  1487. }
  1488. results, ok := data.(map[string]interface{})["result"].([]interface{})
  1489. if !ok {
  1490. return 0, fmt.Errorf("Result field not found in normalization response, aborting")
  1491. }
  1492. if len(results) > 0 {
  1493. dataPoint := results[0].(map[string]interface{})["value"].([]interface{})
  1494. if len(dataPoint) == 2 {
  1495. strNorm := dataPoint[1].(string)
  1496. val, _ := strconv.ParseFloat(strNorm, 64)
  1497. return val, nil
  1498. }
  1499. return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1500. }
  1501. return 0, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
  1502. }
  // ContainerMetric identifies one container instance by namespace, pod,
// container and node name. In this file it is used, via Key, as the map key
// for per-container Prometheus data.
type ContainerMetric struct {
	Namespace     string
	PodName       string
	ContainerName string
	NodeName      string
}

// Key encodes c as "namespace,pod,container,node". NewContainerMetricFromKey
// inverts it; the round trip only holds when no field contains a comma.
func (c *ContainerMetric) Key() string {
	return strings.Join([]string{c.Namespace, c.PodName, c.ContainerName, c.NodeName}, ",")
}

// NewContainerMetricFromKey parses a key produced by (*ContainerMetric).Key.
// It fails unless key splits on "," into exactly four fields; empty fields
// are accepted.
func NewContainerMetricFromKey(key string) (*ContainerMetric, error) {
	fields := strings.Split(key, ",")
	if len(fields) != 4 {
		return nil, fmt.Errorf("Not a valid key")
	}
	return newContainerMetricFromValues(fields[0], fields[1], fields[2], fields[3]), nil
}

// newContainerMetricFromValues returns a freshly allocated ContainerMetric
// holding the given namespace, pod, container and node names.
func newContainerMetricFromValues(ns, podName, containerName, nodeName string) *ContainerMetric {
	m := new(ContainerMetric)
	m.Namespace = ns
	m.PodName = podName
	m.ContainerName = containerName
	m.NodeName = nodeName
	return m
}
  1532. func newContainerMetricsFromPod(pod v1.Pod) ([]*ContainerMetric, error) {
  1533. podName := pod.GetObjectMeta().GetName()
  1534. ns := pod.GetObjectMeta().GetNamespace()
  1535. node := pod.Spec.NodeName
  1536. var cs []*ContainerMetric
  1537. for _, container := range pod.Spec.Containers {
  1538. containerName := container.Name
  1539. cs = append(cs, &ContainerMetric{
  1540. Namespace: ns,
  1541. PodName: podName,
  1542. ContainerName: containerName,
  1543. NodeName: node,
  1544. })
  1545. }
  1546. return cs, nil
  1547. }
  1548. func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*ContainerMetric, error) {
  1549. cName, ok := metrics["container_name"]
  1550. if !ok {
  1551. return nil, fmt.Errorf("Prometheus vector does not have container name")
  1552. }
  1553. containerName, ok := cName.(string)
  1554. if !ok {
  1555. return nil, fmt.Errorf("Prometheus vector does not have string container name")
  1556. }
  1557. pName, ok := metrics["pod_name"]
  1558. if !ok {
  1559. return nil, fmt.Errorf("Prometheus vector does not have pod name")
  1560. }
  1561. podName, ok := pName.(string)
  1562. if !ok {
  1563. return nil, fmt.Errorf("Prometheus vector does not have string pod name")
  1564. }
  1565. ns, ok := metrics["namespace"]
  1566. if !ok {
  1567. return nil, fmt.Errorf("Prometheus vector does not have namespace")
  1568. }
  1569. namespace, ok := ns.(string)
  1570. if !ok {
  1571. return nil, fmt.Errorf("Prometheus vector does not have string namespace")
  1572. }
  1573. node, ok := metrics["node"]
  1574. if !ok {
  1575. klog.V(4).Info("Prometheus vector does not have node name")
  1576. node = ""
  1577. }
  1578. nodeName, ok := node.(string)
  1579. if !ok {
  1580. return nil, fmt.Errorf("Prometheus vector does not have string node")
  1581. }
  1582. return &ContainerMetric{
  1583. ContainerName: containerName,
  1584. PodName: podName,
  1585. Namespace: namespace,
  1586. NodeName: nodeName,
  1587. }, nil
  1588. }
  1589. func getContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
  1590. data, ok := qr.(map[string]interface{})["data"]
  1591. if !ok {
  1592. e, err := wrapPrometheusError(qr)
  1593. if err != nil {
  1594. return nil, err
  1595. }
  1596. return nil, fmt.Errorf(e)
  1597. }
  1598. r, ok := data.(map[string]interface{})["result"]
  1599. if !ok {
  1600. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  1601. }
  1602. results, ok := r.([]interface{})
  1603. if !ok {
  1604. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  1605. }
  1606. containerData := make(map[string][]*Vector)
  1607. for _, val := range results {
  1608. metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  1609. if !ok {
  1610. return nil, fmt.Errorf("Prometheus vector does not have metric labels")
  1611. }
  1612. containerMetric, err := newContainerMetricFromPrometheus(metric)
  1613. if err != nil {
  1614. return nil, err
  1615. }
  1616. value, ok := val.(map[string]interface{})["value"]
  1617. if !ok {
  1618. return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
  1619. }
  1620. dataPoint, ok := value.([]interface{})
  1621. if !ok || len(dataPoint) != 2 {
  1622. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1623. }
  1624. strVal := dataPoint[1].(string)
  1625. v, _ := strconv.ParseFloat(strVal, 64)
  1626. if normalize && normalizationValue != 0 {
  1627. v = v / normalizationValue
  1628. }
  1629. toReturn := &Vector{
  1630. Timestamp: dataPoint[0].(float64),
  1631. Value: v,
  1632. }
  1633. klog.V(4).Info("key: " + containerMetric.Key())
  1634. containerData[containerMetric.Key()] = []*Vector{toReturn}
  1635. }
  1636. return containerData, nil
  1637. }
  1638. func getContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
  1639. data, ok := qr.(map[string]interface{})["data"]
  1640. if !ok {
  1641. e, err := wrapPrometheusError(qr)
  1642. if err != nil {
  1643. return nil, err
  1644. }
  1645. return nil, fmt.Errorf(e)
  1646. }
  1647. r, ok := data.(map[string]interface{})["result"]
  1648. if !ok {
  1649. return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
  1650. }
  1651. results, ok := r.([]interface{})
  1652. if !ok {
  1653. return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
  1654. }
  1655. containerData := make(map[string][]*Vector)
  1656. for _, val := range results {
  1657. metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
  1658. if !ok {
  1659. return nil, fmt.Errorf("Prometheus vector does not have metric labels")
  1660. }
  1661. containerMetric, err := newContainerMetricFromPrometheus(metric)
  1662. if err != nil {
  1663. return nil, err
  1664. }
  1665. vs, ok := val.(map[string]interface{})["values"]
  1666. if !ok {
  1667. return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a field in the vector")
  1668. }
  1669. values, ok := vs.([]interface{})
  1670. if !ok {
  1671. return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a slice")
  1672. }
  1673. var vectors []*Vector
  1674. for _, value := range values {
  1675. dataPoint, ok := value.([]interface{})
  1676. if !ok || len(dataPoint) != 2 {
  1677. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  1678. }
  1679. strVal := dataPoint[1].(string)
  1680. v, _ := strconv.ParseFloat(strVal, 64)
  1681. if normalize && normalizationValue != 0 {
  1682. v = v / normalizationValue
  1683. }
  1684. vectors = append(vectors, &Vector{
  1685. Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
  1686. Value: v,
  1687. })
  1688. }
  1689. containerData[containerMetric.Key()] = vectors
  1690. }
  1691. return containerData, nil
  1692. }
  // wrapPrometheusError extracts the string "error" field from a decoded
// Prometheus API response, for use when the response carries no "data".
//
// It returns the message and a nil error on success. If qr is not a JSON
// object, has no "error" field, or that field is not a string, it returns ""
// and an "Unexpected response from Prometheus" error. The original panicked
// on a non-object qr and, for a non-string field, returned ("", nil), which
// callers turned into an error with an empty message.
func wrapPrometheusError(qr interface{}) (string, error) {
	unexpected := fmt.Errorf("Unexpected response from Prometheus")

	resp, ok := qr.(map[string]interface{})
	if !ok {
		return "", unexpected
	}
	e, ok := resp["error"]
	if !ok {
		return "", unexpected
	}
	eStr, ok := e.(string)
	if !ok {
		return "", unexpected
	}
	return eStr, nil
}