kubemodel.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589
  1. package kubemodel
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  8. "github.com/opencost/opencost/core/pkg/model/shared"
  9. "github.com/opencost/opencost/core/pkg/source"
  10. )
  11. const logTimeFmt string = "2006-01-02T15:04:05"
  12. type KubeModel struct {
  13. ds source.OpenCostDataSource
  14. clusterUID string
  15. }
  16. func NewKubeModel(clusterUID string, dataSource source.OpenCostDataSource) (*KubeModel, error) {
  17. if dataSource == nil {
  18. return nil, errors.New("OpenCostDataSource cannot be nil")
  19. }
  20. km := &KubeModel{
  21. ds: dataSource,
  22. clusterUID: clusterUID,
  23. }
  24. km.clusterUID = clusterUID
  25. log.Debugf("NewKubeModel(%s)", km.clusterUID)
  26. return km, nil
  27. }
  28. // ComputeKubeModel uses the CostModel instance to compute an KubeModelSet
  29. // for the window defined by the given start and end times. The KubeModels
  30. // returned are unaggregated (i.e. down to the container level).
  31. func (km *KubeModel) ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeModelSet, error) {
  32. // 1. Initialize new KubeModelSet for requested Window
  33. kms := kubemodel.NewKubeModelSet(start, end)
  34. // 2. Query CostModel for each set of objects
  35. var err error
  36. // 2.1 Compute Cluster
  37. err = km.computeCluster(kms, start, end)
  38. if err != nil {
  39. kms.Error(err)
  40. return kms, fmt.Errorf("error computing kubemodel.Cluster for (%s, %s): %w", start.Format(logTimeFmt), end.Format(logTimeFmt), err)
  41. }
  42. // 2.2 Compute Nodes
  43. err = km.computeNodes(kms, start, end)
  44. if err != nil {
  45. kms.Error(err)
  46. }
  47. // 2.3 Compute Namespaces
  48. err = km.computeNamespaces(kms, start, end)
  49. if err != nil {
  50. kms.Error(err)
  51. }
  52. // 2.5 Compute Pods
  53. err = km.computePods(kms, start, end)
  54. if err != nil {
  55. kms.Error(err)
  56. }
  57. // 2.6 Compute Deployments
  58. err = km.computeDeployments(kms, start, end)
  59. if err != nil {
  60. kms.Error(err)
  61. }
  62. // 2.7 Compute StatefulSets
  63. err = km.computeStatefulSets(kms, start, end)
  64. if err != nil {
  65. kms.Error(err)
  66. }
  67. // 2.8 Compute DaemonSets
  68. err = km.computeDaemonSets(kms, start, end)
  69. if err != nil {
  70. kms.Error(err)
  71. }
  72. // 2.9 Compute Jobs
  73. err = km.computeJobs(kms, start, end)
  74. if err != nil {
  75. kms.Error(err)
  76. }
  77. // 2.10 Compute CronJobs
  78. err = km.computeCronJobs(kms, start, end)
  79. if err != nil {
  80. kms.Error(err)
  81. }
  82. // 2.11 Compute ReplicaSets
  83. err = km.computeReplicaSets(kms, start, end)
  84. if err != nil {
  85. kms.Error(err)
  86. }
  87. // 2.12 Compute Containers
  88. err = km.computeContainers(kms, start, end)
  89. if err != nil {
  90. kms.Error(err)
  91. }
  92. // 2.13 Compute ResourceQuotas
  93. err = km.computeResourceQuotas(kms, start, end)
  94. if err != nil {
  95. kms.Error(err)
  96. }
  97. // 2.14 Compute Services
  98. err = km.computeServices(kms, start, end)
  99. if err != nil {
  100. kms.Error(err)
  101. }
  102. // 2.15 Compute PersistentVolumes
  103. err = km.computePersistentVolumes(kms, start, end)
  104. if err != nil {
  105. kms.Error(err)
  106. }
  107. // 2.16 Compute PersistentVolumeClaims
  108. err = km.computePersistentVolumeClaims(kms, start, end)
  109. if err != nil {
  110. kms.Error(err)
  111. }
  112. // 2.17 Compute DCGM Devices
  113. err = km.computeDCGMDevices(kms, start, end)
  114. if err != nil {
  115. kms.Error(err)
  116. }
  117. // 3. Mark KubeModelSet as completed
  118. kms.Metadata.CompletedAt = time.Now().UTC()
  119. return kms, nil
  120. }
  121. func (km *KubeModel) computeCluster(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  122. grp := source.NewQueryGroup()
  123. metrics := km.ds.Metrics()
  124. clusterInfoResultFuture := source.WithGroup(grp, metrics.QueryClusterInfo(start, end))
  125. clusterUptimeResultFuture := source.WithGroup(grp, metrics.QueryClusterUptime(start, end))
  126. clusterMap := make(map[string]*kubemodel.Cluster)
  127. clusterInfoResult, _ := clusterInfoResultFuture.Await()
  128. for _, res := range clusterInfoResult {
  129. clusterMap[res.UID] = &kubemodel.Cluster{
  130. UID: res.UID,
  131. Provider: shared.ParseProvider(res.Provider),
  132. Account: res.AccountID,
  133. Name: res.Cluster,
  134. Region: res.Region,
  135. }
  136. }
  137. clusterUptimeResult, _ := clusterUptimeResultFuture.Await()
  138. for _, res := range clusterUptimeResult {
  139. cluster, ok := clusterMap[res.UID]
  140. if !ok {
  141. log.Warnf("cluster with UID '%s' has not been initialized to add uptime", res.UID)
  142. continue
  143. }
  144. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  145. cluster.Start = s
  146. cluster.End = e
  147. }
  148. cluster, ok := clusterMap[km.clusterUID]
  149. if !ok {
  150. return fmt.Errorf("failed to compute cluster with UID '%s'", km.clusterUID)
  151. }
  152. kms.RegisterCluster(cluster)
  153. return nil
  154. }
  155. func (km *KubeModel) computeNodes(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  156. grp := source.NewQueryGroup()
  157. metrics := km.ds.Metrics()
  158. nodeInfoResultFuture := source.WithGroup(grp, metrics.QueryNodeInfo(start, end))
  159. nodeUptimeResultFuture := source.WithGroup(grp, metrics.QueryNodeUptime(start, end))
  160. nodeLabelsResultFuture := source.WithGroup(grp, metrics.QueryNodeLabels(start, end))
  161. nodeResourceCapacitiesFuture := source.WithGroup(grp, metrics.QueryNodeResourceCapacities(start, end))
  162. nodeResourcesAllocatableFuture := source.WithGroup(grp, metrics.QueryNodeResourcesAllocatable(start, end))
  163. localStorageBytesFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageBytes(start, end))
  164. localStorageUsedAvgFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageUsedAvg(start, end))
  165. localStorageUsedMaxFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageUsedMax(start, end))
  166. nodeMap := make(map[string]*kubemodel.Node)
  167. nodeInfoResult, _ := nodeInfoResultFuture.Await()
  168. for _, res := range nodeInfoResult {
  169. nodeMap[res.UID] = &kubemodel.Node{
  170. UID: res.UID,
  171. ProviderID: res.ProviderID,
  172. Name: res.Node,
  173. ResourceCapacities: make(kubemodel.ResourceQuantities),
  174. ResourcesAllocatable: make(kubemodel.ResourceQuantities),
  175. }
  176. }
  177. nodeUptimeResult, _ := nodeUptimeResultFuture.Await()
  178. for _, res := range nodeUptimeResult {
  179. node, ok := nodeMap[res.UID]
  180. if !ok {
  181. log.Warnf("node with UID '%s' has not been initialized to add uptime", res.UID)
  182. continue
  183. }
  184. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  185. node.Start = s
  186. node.End = e
  187. }
  188. nodeResourceCapacitiesResult, _ := nodeResourceCapacitiesFuture.Await()
  189. for _, res := range nodeResourceCapacitiesResult {
  190. node, ok := nodeMap[res.UID]
  191. if !ok {
  192. log.Warnf("node with UID '%s' has not been initialized to add resource capacities", res.UID)
  193. continue
  194. }
  195. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  196. node.ResourceCapacities.Set(resource, unit, kubemodel.StatAvg, value)
  197. }
  198. nodeResourcesAllocatableResult, _ := nodeResourcesAllocatableFuture.Await()
  199. for _, res := range nodeResourcesAllocatableResult {
  200. node, ok := nodeMap[res.UID]
  201. if !ok {
  202. log.Warnf("node with UID '%s' has not been initialized to add resources allocatable", res.UID)
  203. continue
  204. }
  205. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  206. node.ResourcesAllocatable.Set(resource, unit, kubemodel.StatAvg, value)
  207. }
  208. nodeLabelsResult, _ := nodeLabelsResultFuture.Await()
  209. for _, res := range nodeLabelsResult {
  210. node, ok := nodeMap[res.UID]
  211. if !ok {
  212. log.Warnf("node with UID '%s' has not been initialized to add labels", res.UID)
  213. continue
  214. }
  215. node.Labels = res.Labels
  216. }
  217. localStorageBytesResult, _ := localStorageBytesFuture.Await()
  218. for _, res := range localStorageBytesResult {
  219. node, ok := nodeMap[res.UID]
  220. if ok {
  221. node.FileSystem.CapacityBytes = res.Value
  222. }
  223. }
  224. localStorageUsedAvgResult, _ := localStorageUsedAvgFuture.Await()
  225. for _, res := range localStorageUsedAvgResult {
  226. node, ok := nodeMap[res.UID]
  227. if ok {
  228. node.FileSystem.UsageByteAvg = res.Value
  229. }
  230. }
  231. localStorageUsedMaxResult, _ := localStorageUsedMaxFuture.Await()
  232. for _, res := range localStorageUsedMaxResult {
  233. node, ok := nodeMap[res.UID]
  234. if ok {
  235. node.FileSystem.UsageByteMax = res.Value
  236. }
  237. }
  238. for _, node := range nodeMap {
  239. err := kms.RegisterNode(node)
  240. if err != nil {
  241. log.Warnf("Failed to register node: %s", err.Error())
  242. }
  243. }
  244. return nil
  245. }
  246. // resourceUnitValue converts prometheus resource/unit strings from ResourceResult
  247. // into kubemodel types, applying any necessary unit conversions.
  248. func resourceUnitValue(resource, unit string, value float64) (kubemodel.Resource, kubemodel.Unit, float64) {
  249. switch resource {
  250. case "cpu":
  251. return kubemodel.ResourceCPU, kubemodel.UnitCore, value
  252. case "memory":
  253. return kubemodel.ResourceMemory, kubemodel.UnitByte, value
  254. default:
  255. return kubemodel.Resource(resource), kubemodel.Unit(unit), value
  256. }
  257. }
  258. func (km *KubeModel) computeNamespaces(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  259. grp := source.NewQueryGroup()
  260. metrics := km.ds.Metrics()
  261. nsInfoResultFuture := source.WithGroup(grp, metrics.QueryNamespaceInfo(start, end))
  262. nsUptimeResultFuture := source.WithGroup(grp, metrics.QueryNamespaceUptime(start, end))
  263. nsLabelsResultFuture := source.WithGroup(grp, metrics.QueryNamespaceLabels(start, end))
  264. nsAnnosResultFuture := source.WithGroup(grp, metrics.QueryNamespaceAnnotations(start, end))
  265. nsMap := make(map[string]*kubemodel.Namespace)
  266. // Initialize namespaces from info
  267. nsInfoResult, _ := nsInfoResultFuture.Await()
  268. for _, res := range nsInfoResult {
  269. nsMap[res.UID] = &kubemodel.Namespace{
  270. UID: res.UID,
  271. Name: res.Namespace,
  272. }
  273. }
  274. nsUptimeResult, _ := nsUptimeResultFuture.Await()
  275. for _, res := range nsUptimeResult {
  276. ns, ok := nsMap[res.UID]
  277. if !ok {
  278. log.Warnf("namespace with UID '%s' has not been initialized to add uptime", res.UID)
  279. continue
  280. }
  281. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  282. ns.Start = s
  283. ns.End = e
  284. }
  285. nsLabelsResult, _ := nsLabelsResultFuture.Await()
  286. for _, res := range nsLabelsResult {
  287. ns, ok := nsMap[res.UID]
  288. if !ok {
  289. log.Warnf("namespace with UID '%s' has not been initialized to add labels", res.UID)
  290. continue
  291. }
  292. ns.Labels = res.Labels
  293. }
  294. nsAnnosResult, _ := nsAnnosResultFuture.Await()
  295. for _, res := range nsAnnosResult {
  296. ns, ok := nsMap[res.UID]
  297. if !ok {
  298. log.Warnf("namespace with UID '%s' has not been initialized to add annotations", res.UID)
  299. continue
  300. }
  301. ns.Annotations = res.Annotations
  302. }
  303. for _, namespace := range nsMap {
  304. err := kms.RegisterNamespace(namespace)
  305. if err != nil {
  306. log.Warnf("Failed to register namespace: %s", err.Error())
  307. }
  308. }
  309. return nil
  310. }
  311. func (km *KubeModel) computePods(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  312. grp := source.NewQueryGroup()
  313. metrics := km.ds.Metrics()
  314. podInfoResultFuture := source.WithGroup(grp, metrics.QueryPodInfo(start, end))
  315. podUptimeResultFuture := source.WithGroup(grp, metrics.QueryPodUptime(start, end))
  316. podOwnerResultFuture := source.WithGroup(grp, metrics.QueryPodOwners(start, end))
  317. podPVCVolumesResultFuture := source.WithGroup(grp, metrics.QueryPodPVCVolumes(start, end))
  318. podLabelsResultFuture := source.WithGroup(grp, metrics.QueryPodLabels(start, end))
  319. podAnnosResultFuture := source.WithGroup(grp, metrics.QueryPodAnnotations(start, end))
  320. podNetworkEgressBytesResultFuture := source.WithGroup(grp, metrics.QueryPodNetworkEgressBytes(start, end))
  321. podNetworkIngressBytesResultFuture := source.WithGroup(grp, metrics.QueryPodNetworkIngressBytes(start, end))
  322. podMap := make(map[string]*kubemodel.Pod)
  323. podInfoResult, _ := podInfoResultFuture.Await()
  324. for _, res := range podInfoResult {
  325. podMap[res.UID] = &kubemodel.Pod{
  326. UID: res.UID,
  327. Name: res.Pod,
  328. NamespaceUID: res.NamespaceUID,
  329. NodeUID: res.NodeUID,
  330. }
  331. }
  332. podUptimeResult, _ := podUptimeResultFuture.Await()
  333. for _, res := range podUptimeResult {
  334. pod, ok := podMap[res.UID]
  335. if !ok {
  336. log.Warnf("pod with UID '%s' has not been initialized to add uptime", res.UID)
  337. continue
  338. }
  339. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  340. pod.Start = s
  341. pod.End = e
  342. }
  343. podOwnersResult, _ := podOwnerResultFuture.Await()
  344. for _, res := range podOwnersResult {
  345. pod, ok := podMap[res.UID]
  346. if !ok {
  347. log.Warnf("pod with UID '%s' has not been initialized to add labels", res.UID)
  348. continue
  349. }
  350. pod.Owners = append(pod.Owners, kubemodel.Owner{
  351. UID: res.OwnerUID,
  352. Kind: kubemodel.ParseOwnerKind(res.OwnerKind),
  353. Controller: res.Controller,
  354. })
  355. }
  356. podPVCVolumesResult, _ := podPVCVolumesResultFuture.Await()
  357. for _, res := range podPVCVolumesResult {
  358. pod, ok := podMap[res.UID]
  359. if !ok {
  360. log.Warnf("pod with UID '%s' has not been initialized to add PVC volumes", res.UID)
  361. continue
  362. }
  363. pod.PVCVolumes = append(pod.PVCVolumes, kubemodel.PodPVCVolume{
  364. Name: res.PodVolumeName,
  365. PersistentVolumeClaimUID: res.PVCUID,
  366. })
  367. }
  368. podLabelsResult, _ := podLabelsResultFuture.Await()
  369. for _, res := range podLabelsResult {
  370. pod, ok := podMap[res.UID]
  371. if !ok {
  372. log.Warnf("pod with UID '%s' has not been initialized to add labels", res.UID)
  373. continue
  374. }
  375. pod.Labels = res.Labels
  376. }
  377. podAnnosResult, _ := podAnnosResultFuture.Await()
  378. for _, res := range podAnnosResult {
  379. pod, ok := podMap[res.UID]
  380. if !ok {
  381. log.Warnf("pod with UID '%s' has not been initialized to add annotations", res.UID)
  382. continue
  383. }
  384. pod.Annotations = res.Annotations
  385. }
  386. appendDetail := func(uid string, dir kubemodel.TrafficDirection, tt kubemodel.TrafficType, isNatGateway bool, endpoint string, bytes float64) {
  387. pod, ok := podMap[uid]
  388. if !ok || bytes <= 0 {
  389. return
  390. }
  391. pod.NetworkTrafficDetails = append(pod.NetworkTrafficDetails, kubemodel.NetworkTrafficDetail{
  392. PodUID: uid,
  393. TrafficDirection: dir,
  394. TrafficType: tt,
  395. IsNatGateway: isNatGateway,
  396. Endpoint: endpoint,
  397. Bytes: bytes,
  398. })
  399. }
  400. networkTrafficType := func(res *source.PodNetworkBytesResult) (kubemodel.TrafficType, bool) {
  401. if res.Internet {
  402. return kubemodel.TrafficTypeInternet, true
  403. }
  404. if !res.SameRegion {
  405. return kubemodel.TrafficTypeCrossRegion, true
  406. }
  407. if !res.SameZone {
  408. return kubemodel.TrafficTypeCrossZone, true
  409. }
  410. return "", false
  411. }
  412. podNetworkEgressResult, _ := podNetworkEgressBytesResultFuture.Await()
  413. for _, res := range podNetworkEgressResult {
  414. tt, ok := networkTrafficType(res)
  415. if !ok {
  416. continue
  417. }
  418. appendDetail(res.UID, kubemodel.TrafficDirectionEgress, tt, res.NatGateway, res.Service, res.Value)
  419. }
  420. podNetworkIngressResult, _ := podNetworkIngressBytesResultFuture.Await()
  421. for _, res := range podNetworkIngressResult {
  422. tt, ok := networkTrafficType(res)
  423. if !ok {
  424. continue
  425. }
  426. appendDetail(res.UID, kubemodel.TrafficDirectionIngress, tt, res.NatGateway, res.Service, res.Value)
  427. }
  428. for _, pod := range podMap {
  429. err := kms.RegisterPod(pod)
  430. if err != nil {
  431. log.Warnf("Failed to register pod: %s", err.Error())
  432. }
  433. }
  434. return nil
  435. }
  436. func (km *KubeModel) computeDeployments(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  437. grp := source.NewQueryGroup()
  438. metrics := km.ds.Metrics()
  439. deploymentInfoResultFuture := source.WithGroup(grp, metrics.QueryDeploymentInfo(start, end))
  440. deploymentUptimeResultFuture := source.WithGroup(grp, metrics.QueryDeploymentUptime(start, end))
  441. deploymentLabelsResultFuture := source.WithGroup(grp, metrics.QueryDeploymentLabels(start, end))
  442. deploymentAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryDeploymentAnnotations(start, end))
  443. deploymentMatchLabelsResultFuture := source.WithGroup(grp, metrics.QueryDeploymentMatchLabels(start, end))
  444. deploymentMap := make(map[string]*kubemodel.Deployment)
  445. deploymentInfoResult, _ := deploymentInfoResultFuture.Await()
  446. for _, res := range deploymentInfoResult {
  447. deploymentMap[res.UID] = &kubemodel.Deployment{
  448. UID: res.UID,
  449. Name: res.Deployment,
  450. NamespaceUID: res.NamespaceUID,
  451. }
  452. }
  453. deploymentUptimeResult, _ := deploymentUptimeResultFuture.Await()
  454. for _, res := range deploymentUptimeResult {
  455. deployment, ok := deploymentMap[res.UID]
  456. if !ok {
  457. log.Warnf("deployment with UID '%s' has not been initialized to add uptime", res.UID)
  458. continue
  459. }
  460. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  461. deployment.Start = s
  462. deployment.End = e
  463. }
  464. deploymentLabelsResult, _ := deploymentLabelsResultFuture.Await()
  465. for _, res := range deploymentLabelsResult {
  466. deployment, ok := deploymentMap[res.UID]
  467. if !ok {
  468. log.Warnf("deployment with UID '%s' has not been initialized to add labels", res.UID)
  469. continue
  470. }
  471. deployment.Labels = res.Labels
  472. }
  473. deploymentAnnotationsResult, _ := deploymentAnnotationsResultFuture.Await()
  474. for _, res := range deploymentAnnotationsResult {
  475. deployment, ok := deploymentMap[res.UID]
  476. if !ok {
  477. log.Warnf("deployment with UID '%s' has not been initialized to add annotations", res.UID)
  478. continue
  479. }
  480. deployment.Annotations = res.Annotations
  481. }
  482. deploymentMatchLabelsResult, _ := deploymentMatchLabelsResultFuture.Await()
  483. for _, res := range deploymentMatchLabelsResult {
  484. deployment, ok := deploymentMap[res.UID]
  485. if !ok {
  486. log.Warnf("deployment with UID '%s' has not been initialized to add match labels", res.UID)
  487. continue
  488. }
  489. deployment.MatchLabels = res.Labels
  490. }
  491. for _, deployment := range deploymentMap {
  492. err := kms.RegisterDeployment(deployment)
  493. if err != nil {
  494. log.Warnf("Failed to register deployment: %s", err.Error())
  495. }
  496. }
  497. return nil
  498. }
  499. func (km *KubeModel) computeStatefulSets(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  500. grp := source.NewQueryGroup()
  501. metrics := km.ds.Metrics()
  502. statefulSetInfoResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetInfo(start, end))
  503. statefulSetUptimeResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetUptime(start, end))
  504. statefulSetLabelsResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetLabels(start, end))
  505. statefulSetAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetAnnotations(start, end))
  506. statefulSetMatchLabelsResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetMatchLabels(start, end))
  507. statefulSetMap := make(map[string]*kubemodel.StatefulSet)
  508. statefulSetInfoResult, _ := statefulSetInfoResultFuture.Await()
  509. for _, res := range statefulSetInfoResult {
  510. statefulSetMap[res.UID] = &kubemodel.StatefulSet{
  511. UID: res.UID,
  512. Name: res.StatefulSet,
  513. NamespaceUID: res.NamespaceUID,
  514. }
  515. }
  516. statefulSetUptimeResult, _ := statefulSetUptimeResultFuture.Await()
  517. for _, res := range statefulSetUptimeResult {
  518. statefulSet, ok := statefulSetMap[res.UID]
  519. if !ok {
  520. log.Warnf("statefulset with UID '%s' has not been initialized to add uptime", res.UID)
  521. continue
  522. }
  523. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  524. statefulSet.Start = s
  525. statefulSet.End = e
  526. }
  527. statefulSetLabelsResult, _ := statefulSetLabelsResultFuture.Await()
  528. for _, res := range statefulSetLabelsResult {
  529. statefulSet, ok := statefulSetMap[res.UID]
  530. if !ok {
  531. log.Warnf("statefulset with UID '%s' has not been initialized to add labels", res.UID)
  532. continue
  533. }
  534. statefulSet.Labels = res.Labels
  535. }
  536. statefulSetAnnotationsResult, _ := statefulSetAnnotationsResultFuture.Await()
  537. for _, res := range statefulSetAnnotationsResult {
  538. statefulSet, ok := statefulSetMap[res.UID]
  539. if !ok {
  540. log.Warnf("statefulset with UID '%s' has not been initialized to add annotations", res.UID)
  541. continue
  542. }
  543. statefulSet.Annotations = res.Annotations
  544. }
  545. statefulSetMatchLabelsResult, _ := statefulSetMatchLabelsResultFuture.Await()
  546. for _, res := range statefulSetMatchLabelsResult {
  547. statefulSet, ok := statefulSetMap[res.UID]
  548. if !ok {
  549. log.Warnf("statefulset with UID '%s' has not been initialized to add match labels", res.UID)
  550. continue
  551. }
  552. statefulSet.MatchLabels = res.Labels
  553. }
  554. for _, statefulSet := range statefulSetMap {
  555. err := kms.RegisterStatefulSet(statefulSet)
  556. if err != nil {
  557. log.Warnf("Failed to register statefulset: %s", err.Error())
  558. }
  559. }
  560. return nil
  561. }
  562. func (km *KubeModel) computeDaemonSets(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  563. grp := source.NewQueryGroup()
  564. metrics := km.ds.Metrics()
  565. daemonSetInfoResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetInfo(start, end))
  566. daemonSetUptimeResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetUptime(start, end))
  567. daemonSetLabelsResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetLabels(start, end))
  568. daemonSetAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetAnnotations(start, end))
  569. daemonSetMap := make(map[string]*kubemodel.DaemonSet)
  570. daemonSetInfoResult, _ := daemonSetInfoResultFuture.Await()
  571. for _, res := range daemonSetInfoResult {
  572. daemonSetMap[res.UID] = &kubemodel.DaemonSet{
  573. UID: res.UID,
  574. Name: res.DaemonSet,
  575. NamespaceUID: res.NamespaceUID,
  576. }
  577. }
  578. daemonSetUptimeResult, _ := daemonSetUptimeResultFuture.Await()
  579. for _, res := range daemonSetUptimeResult {
  580. daemonSet, ok := daemonSetMap[res.UID]
  581. if !ok {
  582. log.Warnf("daemonset with UID '%s' has not been initialized to add uptime", res.UID)
  583. continue
  584. }
  585. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  586. daemonSet.Start = s
  587. daemonSet.End = e
  588. }
  589. daemonSetLabelsResult, _ := daemonSetLabelsResultFuture.Await()
  590. for _, res := range daemonSetLabelsResult {
  591. daemonSet, ok := daemonSetMap[res.UID]
  592. if !ok {
  593. log.Warnf("daemonset with UID '%s' has not been initialized to add labels", res.UID)
  594. continue
  595. }
  596. daemonSet.Labels = res.Labels
  597. }
  598. daemonSetAnnotationsResult, _ := daemonSetAnnotationsResultFuture.Await()
  599. for _, res := range daemonSetAnnotationsResult {
  600. daemonSet, ok := daemonSetMap[res.UID]
  601. if !ok {
  602. log.Warnf("daemonset with UID '%s' has not been initialized to add annotations", res.UID)
  603. continue
  604. }
  605. daemonSet.Annotations = res.Annotations
  606. }
  607. for _, daemonSet := range daemonSetMap {
  608. err := kms.RegisterDaemonSet(daemonSet)
  609. if err != nil {
  610. log.Warnf("Failed to register daemonset: %s", err.Error())
  611. }
  612. }
  613. return nil
  614. }
  615. func (km *KubeModel) computeJobs(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  616. grp := source.NewQueryGroup()
  617. metrics := km.ds.Metrics()
  618. jobInfoResultFuture := source.WithGroup(grp, metrics.QueryJobInfo(start, end))
  619. jobUptimeResultFuture := source.WithGroup(grp, metrics.QueryJobUptime(start, end))
  620. jobLabelsResultFuture := source.WithGroup(grp, metrics.QueryJobLabels(start, end))
  621. jobAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryJobAnnotations(start, end))
  622. jobMap := make(map[string]*kubemodel.Job)
  623. jobInfoResult, _ := jobInfoResultFuture.Await()
  624. for _, res := range jobInfoResult {
  625. jobMap[res.UID] = &kubemodel.Job{
  626. UID: res.UID,
  627. Name: res.Job,
  628. NamespaceUID: res.NamespaceUID,
  629. }
  630. }
  631. jobUptimeResult, _ := jobUptimeResultFuture.Await()
  632. for _, res := range jobUptimeResult {
  633. job, ok := jobMap[res.UID]
  634. if !ok {
  635. log.Warnf("job with UID '%s' has not been initialized to add uptime", res.UID)
  636. continue
  637. }
  638. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  639. job.Start = s
  640. job.End = e
  641. }
  642. jobLabelsResult, _ := jobLabelsResultFuture.Await()
  643. for _, res := range jobLabelsResult {
  644. job, ok := jobMap[res.UID]
  645. if !ok {
  646. log.Warnf("job with UID '%s' has not been initialized to add labels", res.UID)
  647. continue
  648. }
  649. job.Labels = res.Labels
  650. }
  651. jobAnnotationsResult, _ := jobAnnotationsResultFuture.Await()
  652. for _, res := range jobAnnotationsResult {
  653. job, ok := jobMap[res.UID]
  654. if !ok {
  655. log.Warnf("job with UID '%s' has not been initialized to add annotations", res.UID)
  656. continue
  657. }
  658. job.Annotations = res.Annotations
  659. }
  660. for _, job := range jobMap {
  661. err := kms.RegisterJob(job)
  662. if err != nil {
  663. log.Warnf("Failed to register job: %s", err.Error())
  664. }
  665. }
  666. return nil
  667. }
  668. func (km *KubeModel) computeCronJobs(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  669. grp := source.NewQueryGroup()
  670. metrics := km.ds.Metrics()
  671. cronJobInfoResultFuture := source.WithGroup(grp, metrics.QueryCronJobInfo(start, end))
  672. cronJobUptimeResultFuture := source.WithGroup(grp, metrics.QueryCronJobUptime(start, end))
  673. cronJobLabelsResultFuture := source.WithGroup(grp, metrics.QueryCronJobLabels(start, end))
  674. cronJobAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryCronJobAnnotations(start, end))
  675. cronJobMap := make(map[string]*kubemodel.CronJob)
  676. cronJobInfoResult, _ := cronJobInfoResultFuture.Await()
  677. for _, res := range cronJobInfoResult {
  678. cronJobMap[res.UID] = &kubemodel.CronJob{
  679. UID: res.UID,
  680. Name: res.CronJob,
  681. NamespaceUID: res.NamespaceUID,
  682. }
  683. }
  684. cronJobUptimeResult, _ := cronJobUptimeResultFuture.Await()
  685. for _, res := range cronJobUptimeResult {
  686. cronJob, ok := cronJobMap[res.UID]
  687. if !ok {
  688. log.Warnf("cronjob with UID '%s' has not been initialized to add uptime", res.UID)
  689. continue
  690. }
  691. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  692. cronJob.Start = s
  693. cronJob.End = e
  694. }
  695. cronJobLabelsResult, _ := cronJobLabelsResultFuture.Await()
  696. for _, res := range cronJobLabelsResult {
  697. cronJob, ok := cronJobMap[res.UID]
  698. if !ok {
  699. log.Warnf("cronjob with UID '%s' has not been initialized to add labels", res.UID)
  700. continue
  701. }
  702. cronJob.Labels = res.Labels
  703. }
  704. cronJobAnnotationsResult, _ := cronJobAnnotationsResultFuture.Await()
  705. for _, res := range cronJobAnnotationsResult {
  706. cronJob, ok := cronJobMap[res.UID]
  707. if !ok {
  708. log.Warnf("cronjob with UID '%s' has not been initialized to add annotations", res.UID)
  709. continue
  710. }
  711. cronJob.Annotations = res.Annotations
  712. }
  713. for _, cronJob := range cronJobMap {
  714. err := kms.RegisterCronJob(cronJob)
  715. if err != nil {
  716. log.Warnf("Failed to register cronjob: %s", err.Error())
  717. }
  718. }
  719. return nil
  720. }
  721. func (km *KubeModel) computeReplicaSets(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  722. grp := source.NewQueryGroup()
  723. metrics := km.ds.Metrics()
  724. replicaSetInfoResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetInfo(start, end))
  725. replicaSetUptimeResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetUptime(start, end))
  726. replicaSetOwnerResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetOwners(start, end))
  727. replicaSetLabelsResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetLabels(start, end))
  728. replicaSetAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetAnnotations(start, end))
  729. replicaSetMap := make(map[string]*kubemodel.ReplicaSet)
  730. replicaSetInfoResult, _ := replicaSetInfoResultFuture.Await()
  731. for _, res := range replicaSetInfoResult {
  732. replicaSetMap[res.UID] = &kubemodel.ReplicaSet{
  733. UID: res.UID,
  734. Name: res.ReplicaSet,
  735. NamespaceUID: res.NamespaceUID,
  736. }
  737. }
  738. replicaSetUptimeResult, _ := replicaSetUptimeResultFuture.Await()
  739. for _, res := range replicaSetUptimeResult {
  740. replicaSet, ok := replicaSetMap[res.UID]
  741. if !ok {
  742. log.Warnf("replicaset with UID '%s' has not been initialized to add uptime", res.UID)
  743. continue
  744. }
  745. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  746. replicaSet.Start = s
  747. replicaSet.End = e
  748. }
  749. replicaSetOwnersResult, _ := replicaSetOwnerResultFuture.Await()
  750. for _, res := range replicaSetOwnersResult {
  751. replicaSet, ok := replicaSetMap[res.UID]
  752. if !ok {
  753. log.Warnf("replicaset with UID '%s' has not been initialized to add owner", res.UID)
  754. continue
  755. }
  756. replicaSet.Owners = append(replicaSet.Owners, kubemodel.Owner{
  757. UID: res.OwnerUID,
  758. Kind: kubemodel.ParseOwnerKind(res.OwnerKind),
  759. Controller: res.Controller,
  760. })
  761. }
  762. replicaSetLabelsResult, _ := replicaSetLabelsResultFuture.Await()
  763. for _, res := range replicaSetLabelsResult {
  764. replicaSet, ok := replicaSetMap[res.UID]
  765. if !ok {
  766. log.Warnf("replicaset with UID '%s' has not been initialized to add labels", res.UID)
  767. continue
  768. }
  769. replicaSet.Labels = res.Labels
  770. }
  771. replicaSetAnnotationsResult, _ := replicaSetAnnotationsResultFuture.Await()
  772. for _, res := range replicaSetAnnotationsResult {
  773. replicaSet, ok := replicaSetMap[res.UID]
  774. if !ok {
  775. log.Warnf("replicaset with UID '%s' has not been initialized to add annotations", res.UID)
  776. continue
  777. }
  778. replicaSet.Annotations = res.Annotations
  779. }
  780. for _, replicaSet := range replicaSetMap {
  781. err := kms.RegisterReplicaSet(replicaSet)
  782. if err != nil {
  783. log.Warnf("Failed to register replicaset: %s", err.Error())
  784. }
  785. }
  786. return nil
  787. }
  788. func (km *KubeModel) computeContainers(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  789. grp := source.NewQueryGroup()
  790. metrics := km.ds.Metrics()
  791. containerUptimeFuture := source.WithGroup(grp, metrics.QueryContainerUptime(start, end))
  792. containerResourceRequestsFuture := source.WithGroup(grp, metrics.QueryContainerResourceRequests(start, end))
  793. containerResourceLimitsFuture := source.WithGroup(grp, metrics.QueryContainerResourceLimits(start, end))
  794. cpuCoresAllocatedFuture := source.WithGroup(grp, metrics.QueryCPUCoresAllocated(start, end))
  795. cpuUsageAvgFuture := source.WithGroup(grp, metrics.QueryCPUUsageAvg(start, end))
  796. cpuUsageMaxFuture := source.WithGroup(grp, metrics.QueryCPUUsageMax(start, end))
  797. ramBytesAllocatedFuture := source.WithGroup(grp, metrics.QueryRAMBytesAllocated(start, end))
  798. ramUsageAvgFuture := source.WithGroup(grp, metrics.QueryRAMUsageAvg(start, end))
  799. ramUsageMaxFuture := source.WithGroup(grp, metrics.QueryRAMUsageMax(start, end))
  800. type containerKey struct {
  801. podUID string
  802. name string
  803. }
  804. containerMap := make(map[containerKey]*kubemodel.Container)
  805. containerUptimeResult, _ := containerUptimeFuture.Await()
  806. for _, res := range containerUptimeResult {
  807. key := containerKey{podUID: res.UID, name: res.Container}
  808. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  809. containerMap[key] = &kubemodel.Container{
  810. PodUID: res.UID,
  811. Name: res.Container,
  812. ResourceRequests: make(kubemodel.ResourceQuantities),
  813. ResourceLimits: make(kubemodel.ResourceQuantities),
  814. Start: s,
  815. End: e,
  816. }
  817. }
  818. containerResourceRequestsResult, _ := containerResourceRequestsFuture.Await()
  819. for _, res := range containerResourceRequestsResult {
  820. key := containerKey{podUID: res.UID, name: res.Container}
  821. container, ok := containerMap[key]
  822. if !ok {
  823. log.Warnf("container %s/%s has not been initialized to add resource requests", res.UID, res.Container)
  824. continue
  825. }
  826. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  827. container.ResourceRequests.Set(resource, unit, kubemodel.StatAvg, value)
  828. }
  829. containerResourceLimitsResult, _ := containerResourceLimitsFuture.Await()
  830. for _, res := range containerResourceLimitsResult {
  831. key := containerKey{podUID: res.UID, name: res.Container}
  832. container, ok := containerMap[key]
  833. if !ok {
  834. log.Warnf("container %s/%s has not been initialized to add resource limits", res.UID, res.Container)
  835. continue
  836. }
  837. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  838. container.ResourceLimits.Set(resource, unit, kubemodel.StatAvg, value)
  839. }
  840. cpuCoresAllocatedResult, _ := cpuCoresAllocatedFuture.Await()
  841. for _, res := range cpuCoresAllocatedResult {
  842. key := containerKey{podUID: res.UID, name: res.Container}
  843. container, ok := containerMap[key]
  844. if !ok {
  845. log.Warnf("container %s/%s has not been initialized to add CPU cores allocated", res.UID, res.Container)
  846. continue
  847. }
  848. if len(res.Data) > 0 {
  849. container.CPUCoresAllocated = res.Data[0].Value
  850. }
  851. }
  852. ramBytesAllocatedResult, _ := ramBytesAllocatedFuture.Await()
  853. for _, res := range ramBytesAllocatedResult {
  854. key := containerKey{podUID: res.UID, name: res.Container}
  855. container, ok := containerMap[key]
  856. if !ok {
  857. log.Warnf("container %s/%s has not been initialized to add RAM bytes allocated", res.UID, res.Container)
  858. continue
  859. }
  860. if len(res.Data) > 0 {
  861. container.RAMBytesAllocated = res.Data[0].Value
  862. }
  863. }
  864. cpuUsageAvgResult, _ := cpuUsageAvgFuture.Await()
  865. for _, res := range cpuUsageAvgResult {
  866. key := containerKey{podUID: res.UID, name: res.Container}
  867. container, ok := containerMap[key]
  868. if !ok {
  869. log.Warnf("container %s/%s has not been initialized to add CPU usage avg", res.UID, res.Container)
  870. continue
  871. }
  872. if len(res.Data) > 0 {
  873. container.CPUCoreUsageAvg = res.Data[0].Value
  874. }
  875. }
  876. cpuUsageMaxResult, _ := cpuUsageMaxFuture.Await()
  877. for _, res := range cpuUsageMaxResult {
  878. key := containerKey{podUID: res.UID, name: res.Container}
  879. container, ok := containerMap[key]
  880. if !ok {
  881. log.Warnf("container %s/%s has not been initialized to add CPU usage max", res.UID, res.Container)
  882. continue
  883. }
  884. if len(res.Data) > 0 {
  885. container.CPUCoreUsageMax = res.Data[0].Value
  886. }
  887. }
  888. ramUsageAvgResult, _ := ramUsageAvgFuture.Await()
  889. for _, res := range ramUsageAvgResult {
  890. key := containerKey{podUID: res.UID, name: res.Container}
  891. container, ok := containerMap[key]
  892. if !ok {
  893. log.Warnf("container %s/%s has not been initialized to add RAM usage avg", res.UID, res.Container)
  894. continue
  895. }
  896. if len(res.Data) > 0 {
  897. container.RAMBytesUsageAvg = res.Data[0].Value
  898. }
  899. }
  900. ramUsageMaxResult, _ := ramUsageMaxFuture.Await()
  901. for _, res := range ramUsageMaxResult {
  902. key := containerKey{podUID: res.UID, name: res.Container}
  903. container, ok := containerMap[key]
  904. if !ok {
  905. log.Warnf("container %s/%s has not been initialized to add RAM usage max", res.UID, res.Container)
  906. continue
  907. }
  908. if len(res.Data) > 0 {
  909. container.RAMBytesUsageMax = res.Data[0].Value
  910. }
  911. }
  912. for _, container := range containerMap {
  913. err := kms.RegisterContainer(container)
  914. if err != nil {
  915. log.Warnf("Failed to register container: %s", err.Error())
  916. }
  917. }
  918. return nil
  919. }
  920. func (km *KubeModel) computeResourceQuotas(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  921. grp := source.NewQueryGroup()
  922. metrics := km.ds.Metrics()
  923. rqInfoResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaInfo(start, end))
  924. rqUptimeResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaUptime(start, end))
  925. // spec.hard.requests
  926. rqSpecCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestAverage(start, end))
  927. rqSpecCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestMax(start, end))
  928. rqSpecRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestAverage(start, end))
  929. rqSpecRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestMax(start, end))
  930. // spec.hard.limits
  931. rqSpecCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitAverage(start, end))
  932. rqSpecCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitMax(start, end))
  933. rqSpecRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitAverage(start, end))
  934. rqSpecRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitMax(start, end))
  935. // status.used.requests
  936. rqStatusUsedCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestAverage(start, end))
  937. rqStatusUsedCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestMax(start, end))
  938. rqStatusUsedRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestAverage(start, end))
  939. rqStatusUsedRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestMax(start, end))
  940. // status.used.limits
  941. rqStatusUsedCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitAverage(start, end))
  942. rqStatusUsedCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitMax(start, end))
  943. rqStatusUsedRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
  944. rqStatusUsedRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))
  945. rqMap := make(map[string]*kubemodel.ResourceQuota)
  946. // Initialize resource quotas from info
  947. rqInfoResult, _ := rqInfoResultFuture.Await()
  948. for _, res := range rqInfoResult {
  949. rqMap[res.UID] = &kubemodel.ResourceQuota{
  950. UID: res.UID,
  951. Name: res.ResourceQuota,
  952. NamespaceUID: res.NamespaceUID,
  953. Spec: &kubemodel.ResourceQuotaSpec{Hard: &kubemodel.ResourceQuotaSpecHard{}},
  954. Status: &kubemodel.ResourceQuotaStatus{Used: &kubemodel.ResourceQuotaStatusUsed{}},
  955. }
  956. }
  957. rqUptimeResult, _ := rqUptimeResultFuture.Await()
  958. for _, res := range rqUptimeResult {
  959. rq, ok := rqMap[res.UID]
  960. if !ok {
  961. log.Warnf("resource quota with UID '%s' has not been initialized to add uptime", res.UID)
  962. continue
  963. }
  964. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  965. rq.Start = s
  966. rq.End = e
  967. }
  968. rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
  969. for _, res := range rqSpecCPURequestAverageResult {
  970. rq, ok := rqMap[res.UID]
  971. if !ok {
  972. log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU request average", res.UID)
  973. continue
  974. }
  975. mcpu := res.Value * 1000
  976. rq.Spec.Hard.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  977. }
  978. rqSpecCPURequestMaxResult, _ := rqSpecCPURequestMaxResultFuture.Await()
  979. for _, res := range rqSpecCPURequestMaxResult {
  980. rq, ok := rqMap[res.UID]
  981. if !ok {
  982. log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU request max", res.UID)
  983. continue
  984. }
  985. mcpu := res.Value * 1000
  986. rq.Spec.Hard.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  987. }
  988. rqSpecRAMRequestAverageResult, _ := rqSpecRAMRequestAverageResultFuture.Await()
  989. for _, res := range rqSpecRAMRequestAverageResult {
  990. rq, ok := rqMap[res.UID]
  991. if !ok {
  992. log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM request average", res.UID)
  993. continue
  994. }
  995. rq.Spec.Hard.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
  996. }
  997. rqSpecRAMRequestMaxResult, _ := rqSpecRAMRequestMaxResultFuture.Await()
  998. for _, res := range rqSpecRAMRequestMaxResult {
  999. rq, ok := rqMap[res.UID]
  1000. if !ok {
  1001. log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM request max", res.UID)
  1002. continue
  1003. }
  1004. rq.Spec.Hard.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
  1005. }
  1006. rqSpecCPULimitAverageResult, _ := rqSpecCPULimitAverageResultFuture.Await()
  1007. for _, res := range rqSpecCPULimitAverageResult {
  1008. rq, ok := rqMap[res.UID]
  1009. if !ok {
  1010. log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU limit average", res.UID)
  1011. continue
  1012. }
  1013. mcpu := res.Value * 1000
  1014. rq.Spec.Hard.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  1015. }
  1016. rqSpecCPULimitMaxResult, _ := rqSpecCPULimitMaxResultFuture.Await()
  1017. for _, res := range rqSpecCPULimitMaxResult {
  1018. rq, ok := rqMap[res.UID]
  1019. if !ok {
  1020. log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU limit max", res.UID)
  1021. continue
  1022. }
  1023. mcpu := res.Value * 1000
  1024. rq.Spec.Hard.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  1025. }
  1026. rqSpecRAMLimitAverageResult, _ := rqSpecRAMLimitAverageResultFuture.Await()
  1027. for _, res := range rqSpecRAMLimitAverageResult {
  1028. rq, ok := rqMap[res.UID]
  1029. if !ok {
  1030. log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM limit average", res.UID)
  1031. continue
  1032. }
  1033. rq.Spec.Hard.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
  1034. }
  1035. rqSpecRAMLimitMaxResult, _ := rqSpecRAMLimitMaxResultFuture.Await()
  1036. for _, res := range rqSpecRAMLimitMaxResult {
  1037. rq, ok := rqMap[res.UID]
  1038. if !ok {
  1039. log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM limit max", res.UID)
  1040. continue
  1041. }
  1042. rq.Spec.Hard.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
  1043. }
  1044. rqStatusUsedCPURequestAverageResult, _ := rqStatusUsedCPURequestAverageResultFuture.Await()
  1045. for _, res := range rqStatusUsedCPURequestAverageResult {
  1046. rq, ok := rqMap[res.UID]
  1047. if !ok {
  1048. log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU request average", res.UID)
  1049. continue
  1050. }
  1051. mcpu := res.Value * 1000
  1052. rq.Status.Used.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  1053. }
  1054. rqStatusUsedCPURequestMaxResult, _ := rqStatusUsedCPURequestMaxResultFuture.Await()
  1055. for _, res := range rqStatusUsedCPURequestMaxResult {
  1056. rq, ok := rqMap[res.UID]
  1057. if !ok {
  1058. log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU request max", res.UID)
  1059. continue
  1060. }
  1061. mcpu := res.Value * 1000
  1062. rq.Status.Used.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  1063. }
  1064. rqStatusUsedRAMRequestAverageResult, _ := rqStatusUsedRAMRequestAverageResultFuture.Await()
  1065. for _, res := range rqStatusUsedRAMRequestAverageResult {
  1066. rq, ok := rqMap[res.UID]
  1067. if !ok {
  1068. log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM request average", res.UID)
  1069. continue
  1070. }
  1071. rq.Status.Used.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
  1072. }
  1073. rqStatusUsedRAMRequestMaxResult, _ := rqStatusUsedRAMRequestMaxResultFuture.Await()
  1074. for _, res := range rqStatusUsedRAMRequestMaxResult {
  1075. rq, ok := rqMap[res.UID]
  1076. if !ok {
  1077. log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM request max", res.UID)
  1078. continue
  1079. }
  1080. rq.Status.Used.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
  1081. }
  1082. rqStatusUsedCPULimitAverageResult, _ := rqStatusUsedCPULimitAverageResultFuture.Await()
  1083. for _, res := range rqStatusUsedCPULimitAverageResult {
  1084. rq, ok := rqMap[res.UID]
  1085. if !ok {
  1086. log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU limit average", res.UID)
  1087. continue
  1088. }
  1089. mcpu := res.Value * 1000
  1090. rq.Status.Used.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  1091. }
  1092. rqStatusUsedCPULimitMaxResult, _ := rqStatusUsedCPULimitMaxResultFuture.Await()
  1093. for _, res := range rqStatusUsedCPULimitMaxResult {
  1094. rq, ok := rqMap[res.UID]
  1095. if !ok {
  1096. log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU limit max", res.UID)
  1097. continue
  1098. }
  1099. mcpu := res.Value * 1000
  1100. rq.Status.Used.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  1101. }
  1102. rqStatusUsedRAMLimitAverageResult, _ := rqStatusUsedRAMLimitAverageResultFuture.Await()
  1103. for _, res := range rqStatusUsedRAMLimitAverageResult {
  1104. rq, ok := rqMap[res.UID]
  1105. if !ok {
  1106. log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM limit average", res.UID)
  1107. continue
  1108. }
  1109. rq.Status.Used.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
  1110. }
  1111. rqStatusUsedRAMLimitMaxResult, _ := rqStatusUsedRAMLimitMaxResultFuture.Await()
  1112. for _, res := range rqStatusUsedRAMLimitMaxResult {
  1113. rq, ok := rqMap[res.UID]
  1114. if !ok {
  1115. log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM limit max", res.UID)
  1116. continue
  1117. }
  1118. rq.Status.Used.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
  1119. }
  1120. for _, resourceQuota := range rqMap {
  1121. err := kms.RegisterResourceQuota(resourceQuota)
  1122. if err != nil {
  1123. log.Warnf("Failed to register resource quota: %s", err.Error())
  1124. }
  1125. }
  1126. return nil
  1127. }
  1128. func (km *KubeModel) computeServices(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  1129. grp := source.NewQueryGroup()
  1130. metrics := km.ds.Metrics()
  1131. serviceInfoResultFuture := source.WithGroup(grp, metrics.QueryServiceInfo(start, end))
  1132. serviceUptimeResultFuture := source.WithGroup(grp, metrics.QueryServiceUptime(start, end))
  1133. serviceSelectorLabelsResultFuture := source.WithGroup(grp, metrics.QueryServiceSelectorLabels(start, end))
  1134. serviceMap := make(map[string]*kubemodel.Service)
  1135. // Initialize services from info
  1136. serviceInfoResult, _ := serviceInfoResultFuture.Await()
  1137. for _, res := range serviceInfoResult {
  1138. serviceMap[res.UID] = &kubemodel.Service{
  1139. UID: res.UID,
  1140. NamespaceUID: res.NamespaceUID,
  1141. Name: res.Service,
  1142. Type: kubemodel.ParseServiceType(res.ServiceType),
  1143. LBIngressAddress: res.LBIngressAddress,
  1144. }
  1145. }
  1146. serviceUptimeResult, _ := serviceUptimeResultFuture.Await()
  1147. for _, res := range serviceUptimeResult {
  1148. service, ok := serviceMap[res.UID]
  1149. if !ok {
  1150. log.Warnf("service with UID '%s' has not been initialized to add uptime", res.UID)
  1151. continue
  1152. }
  1153. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  1154. service.Start = s
  1155. service.End = e
  1156. }
  1157. serviceSelectorLabelsResult, _ := serviceSelectorLabelsResultFuture.Await()
  1158. for _, res := range serviceSelectorLabelsResult {
  1159. service, ok := serviceMap[res.UID]
  1160. if !ok {
  1161. log.Warnf("service with UID '%s' has not been initialized to add selector labels", res.UID)
  1162. continue
  1163. }
  1164. service.Selector = res.Labels
  1165. }
  1166. for _, service := range serviceMap {
  1167. err := kms.RegisterService(service)
  1168. if err != nil {
  1169. log.Warnf("Failed to register service: %s", err.Error())
  1170. }
  1171. }
  1172. return nil
  1173. }
  1174. func (km *KubeModel) computePersistentVolumes(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  1175. grp := source.NewQueryGroup()
  1176. metrics := km.ds.Metrics()
  1177. pvInfoResultFuture := source.WithGroup(grp, metrics.QueryKMPVInfo(start, end))
  1178. pvUptimeResultFuture := source.WithGroup(grp, metrics.QueryPVUptime(start, end))
  1179. pvBytesResultFuture := source.WithGroup(grp, metrics.QueryPVBytes(start, end))
  1180. pvMap := make(map[string]*kubemodel.PersistentVolume)
  1181. pvInfoResult, _ := pvInfoResultFuture.Await()
  1182. for _, res := range pvInfoResult {
  1183. pvMap[res.UID] = &kubemodel.PersistentVolume{
  1184. UID: res.UID,
  1185. Name: res.PersistentVolume,
  1186. StorageClass: res.StorageClass,
  1187. CSIVolumeHandle: res.CSIVolumeHandle,
  1188. }
  1189. }
  1190. pvUptimeResult, _ := pvUptimeResultFuture.Await()
  1191. for _, res := range pvUptimeResult {
  1192. pv, ok := pvMap[res.UID]
  1193. if !ok {
  1194. log.Warnf("persistent volume with UID '%s' has not been initialized to add uptime", res.UID)
  1195. continue
  1196. }
  1197. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  1198. pv.Start = s
  1199. pv.End = e
  1200. }
  1201. pvBytesResult, _ := pvBytesResultFuture.Await()
  1202. for _, res := range pvBytesResult {
  1203. pv, ok := pvMap[res.UID]
  1204. if !ok {
  1205. log.Warnf("persistent volume with UID '%s' has not been initialized to add bytes", res.UID)
  1206. continue
  1207. }
  1208. pv.SizeBytes = res.Value
  1209. }
  1210. for _, pv := range pvMap {
  1211. err := kms.RegisterPersistentVolume(pv)
  1212. if err != nil {
  1213. log.Warnf("Failed to register persistent volume: %s", err.Error())
  1214. }
  1215. }
  1216. return nil
  1217. }
  1218. func (km *KubeModel) computePersistentVolumeClaims(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  1219. grp := source.NewQueryGroup()
  1220. metrics := km.ds.Metrics()
  1221. pvcInfoResultFuture := source.WithGroup(grp, metrics.QueryKMPVCInfo(start, end))
  1222. pvcUptimeResultFuture := source.WithGroup(grp, metrics.QueryPVCUptime(start, end))
  1223. pvcBytesRequestedResultFuture := source.WithGroup(grp, metrics.QueryPVCBytesRequested(start, end))
  1224. pvcBytesUsedAvgResultFuture := source.WithGroup(grp, metrics.QueryPVCBytesUsedAverage(start, end))
  1225. pvcBytesUsedMaxResultFuture := source.WithGroup(grp, metrics.QueryPVCBytesUsedMax(start, end))
  1226. pvcMap := make(map[string]*kubemodel.PersistentVolumeClaim)
  1227. pvcInfoResult, _ := pvcInfoResultFuture.Await()
  1228. for _, res := range pvcInfoResult {
  1229. pvcMap[res.UID] = &kubemodel.PersistentVolumeClaim{
  1230. UID: res.UID,
  1231. Name: res.PersistentVolumeClaim,
  1232. NamespaceUID: res.NamespaceUID,
  1233. PersistentVolumeUID: res.PVUID,
  1234. StorageClass: res.StorageClass,
  1235. }
  1236. }
  1237. pvcUptimeResult, _ := pvcUptimeResultFuture.Await()
  1238. for _, res := range pvcUptimeResult {
  1239. pvc, ok := pvcMap[res.UID]
  1240. if !ok {
  1241. log.Warnf("persistent volume claim with UID '%s' has not been initialized to add uptime", res.UID)
  1242. continue
  1243. }
  1244. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  1245. pvc.Start = s
  1246. pvc.End = e
  1247. }
  1248. pvcBytesRequestedResult, _ := pvcBytesRequestedResultFuture.Await()
  1249. for _, res := range pvcBytesRequestedResult {
  1250. pvc, ok := pvcMap[res.UID]
  1251. if !ok {
  1252. log.Warnf("persistent volume claim with UID '%s' has not been initialized to add requested bytes", res.UID)
  1253. continue
  1254. }
  1255. if len(res.Data) > 0 {
  1256. pvc.RequestedBytes = res.Data[0].Value
  1257. }
  1258. }
  1259. pvcBytesUsedAvgResult, _ := pvcBytesUsedAvgResultFuture.Await()
  1260. for _, res := range pvcBytesUsedAvgResult {
  1261. pvc, ok := pvcMap[res.UID]
  1262. if !ok {
  1263. log.Warnf("persistent volume claim with UID '%s' has not been initialized to add bytes used average", res.UID)
  1264. continue
  1265. }
  1266. pvc.UsageBytesAvg = res.Value
  1267. }
  1268. pvcBytesUsedMaxResult, _ := pvcBytesUsedMaxResultFuture.Await()
  1269. for _, res := range pvcBytesUsedMaxResult {
  1270. pvc, ok := pvcMap[res.UID]
  1271. if !ok {
  1272. log.Warnf("persistent volume claim with UID '%s' has not been initialized to add bytes used max", res.UID)
  1273. continue
  1274. }
  1275. pvc.UsageBytesMax = res.Value
  1276. }
  1277. for _, pvc := range pvcMap {
  1278. err := kms.RegisterPVC(pvc)
  1279. if err != nil {
  1280. log.Warnf("Failed to register persistent volume claim: %s", err.Error())
  1281. }
  1282. }
  1283. return nil
  1284. }
  1285. func (km *KubeModel) computeDCGMDevices(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  1286. grp := source.NewQueryGroup()
  1287. metrics := km.ds.Metrics()
  1288. dcgmInfoFuture := source.WithGroup(grp, metrics.QueryDCGMDeviceInfo(start, end))
  1289. dcgmUptimeFuture := source.WithGroup(grp, metrics.QueryDCGMDeviceUptime(start, end))
  1290. dcgmUsageAvgFuture := source.WithGroup(grp, metrics.QueryDCGMContainerUsageAvg(start, end))
  1291. dcgmUsageMaxFuture := source.WithGroup(grp, metrics.QueryDCGMContainerUsageMax(start, end))
  1292. deviceMap := make(map[string]*kubemodel.DCGMDevice)
  1293. dcgmInfoResult, _ := dcgmInfoFuture.Await()
  1294. for _, res := range dcgmInfoResult {
  1295. if res.UUID == "" {
  1296. continue
  1297. }
  1298. if _, ok := deviceMap[res.UUID]; ok {
  1299. continue
  1300. }
  1301. deviceMap[res.UUID] = &kubemodel.DCGMDevice{
  1302. UUID: res.UUID,
  1303. Device: res.Device,
  1304. ModelName: res.ModelName,
  1305. PodUsages: make(map[string]kubemodel.DCGMPod),
  1306. }
  1307. }
  1308. dcgmUptimeResult, _ := dcgmUptimeFuture.Await()
  1309. for _, res := range dcgmUptimeResult {
  1310. d, ok := deviceMap[res.UUID]
  1311. if !ok {
  1312. log.Warnf("DCGM uptime result for unknown device UUID '%s'", res.UUID)
  1313. continue
  1314. }
  1315. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  1316. d.Start = s
  1317. d.End = e
  1318. }
  1319. dcgmUsageAvgResult, _ := dcgmUsageAvgFuture.Await()
  1320. for _, res := range dcgmUsageAvgResult {
  1321. device, ok := deviceMap[res.UUID]
  1322. if !ok || res.PodUID == "" || res.Container == "" {
  1323. continue
  1324. }
  1325. pod, ok := device.PodUsages[res.PodUID]
  1326. if !ok {
  1327. pod = kubemodel.DCGMPod{ContainerUsages: make(map[string]kubemodel.DCGMContainer)}
  1328. }
  1329. c := pod.ContainerUsages[res.Container]
  1330. c.UsageAvg = res.Value
  1331. pod.ContainerUsages[res.Container] = c
  1332. device.PodUsages[res.PodUID] = pod
  1333. }
  1334. dcgmUsageMaxResult, _ := dcgmUsageMaxFuture.Await()
  1335. for _, res := range dcgmUsageMaxResult {
  1336. device, ok := deviceMap[res.UUID]
  1337. if !ok || res.PodUID == "" || res.Container == "" {
  1338. continue
  1339. }
  1340. pod, ok := device.PodUsages[res.PodUID]
  1341. if !ok {
  1342. pod = kubemodel.DCGMPod{ContainerUsages: make(map[string]kubemodel.DCGMContainer)}
  1343. }
  1344. c := pod.ContainerUsages[res.Container]
  1345. c.UsageMax = res.Value
  1346. pod.ContainerUsages[res.Container] = c
  1347. device.PodUsages[res.PodUID] = pod
  1348. }
  1349. for _, device := range deviceMap {
  1350. if err := kms.RegisterDCGMDevice(device); err != nil {
  1351. log.Warnf("Failed to register DCGM device: %s", err.Error())
  1352. }
  1353. }
  1354. return nil
  1355. }