pod.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package kubemodel
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  6. "github.com/opencost/opencost/core/pkg/source"
  7. )
  8. func (km *KubeModel) computePods(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  9. grp := source.NewQueryGroup()
  10. metrics := km.ds.Metrics()
  11. podInfoResultFuture := source.WithGroup(grp, metrics.QueryPodInfo(start, end))
  12. podUptimeResultFuture := source.WithGroup(grp, metrics.QueryPodUptime(start, end))
  13. podOwnerResultFuture := source.WithGroup(grp, metrics.QueryPodOwners(start, end))
  14. podPVCVolumesResultFuture := source.WithGroup(grp, metrics.QueryPodPVCVolumes(start, end))
  15. podLabelsResultFuture := source.WithGroup(grp, metrics.QueryPodLabels(start, end))
  16. podAnnosResultFuture := source.WithGroup(grp, metrics.QueryPodAnnotations(start, end))
  17. podNetworkEgressBytesResultFuture := source.WithGroup(grp, metrics.QueryPodNetworkEgressBytes(start, end))
  18. podNetworkIngressBytesResultFuture := source.WithGroup(grp, metrics.QueryPodNetworkIngressBytes(start, end))
  19. podMap := make(map[string]*kubemodel.Pod)
  20. podInfoResult, _ := podInfoResultFuture.Await()
  21. for _, res := range podInfoResult {
  22. podMap[res.UID] = &kubemodel.Pod{
  23. UID: res.UID,
  24. Name: res.Pod,
  25. NamespaceUID: res.NamespaceUID,
  26. NodeUID: res.NodeUID,
  27. }
  28. }
  29. podUptimeResult, _ := podUptimeResultFuture.Await()
  30. for _, res := range podUptimeResult {
  31. pod, ok := podMap[res.UID]
  32. if !ok {
  33. log.Warnf("pod with UID '%s' has not been initialized to add uptime", res.UID)
  34. continue
  35. }
  36. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  37. pod.Start = s
  38. pod.End = e
  39. }
  40. podOwnersResult, _ := podOwnerResultFuture.Await()
  41. for _, res := range podOwnersResult {
  42. pod, ok := podMap[res.UID]
  43. if !ok {
  44. log.Warnf("pod with UID '%s' has not been initialized to add labels", res.UID)
  45. continue
  46. }
  47. pod.Owners = append(pod.Owners, kubemodel.Owner{
  48. UID: res.OwnerUID,
  49. Kind: kubemodel.ParseOwnerKind(res.OwnerKind),
  50. Controller: res.Controller,
  51. })
  52. }
  53. podPVCVolumesResult, _ := podPVCVolumesResultFuture.Await()
  54. for _, res := range podPVCVolumesResult {
  55. pod, ok := podMap[res.UID]
  56. if !ok {
  57. log.Warnf("pod with UID '%s' has not been initialized to add PVC volumes", res.UID)
  58. continue
  59. }
  60. pod.PVCVolumes = append(pod.PVCVolumes, kubemodel.PodPVCVolume{
  61. Name: res.PodVolumeName,
  62. PersistentVolumeClaimUID: res.PVCUID,
  63. })
  64. }
  65. podLabelsResult, _ := podLabelsResultFuture.Await()
  66. for _, res := range podLabelsResult {
  67. pod, ok := podMap[res.UID]
  68. if !ok {
  69. log.Warnf("pod with UID '%s' has not been initialized to add labels", res.UID)
  70. continue
  71. }
  72. pod.Labels = res.Labels
  73. }
  74. podAnnosResult, _ := podAnnosResultFuture.Await()
  75. for _, res := range podAnnosResult {
  76. pod, ok := podMap[res.UID]
  77. if !ok {
  78. log.Warnf("pod with UID '%s' has not been initialized to add annotations", res.UID)
  79. continue
  80. }
  81. pod.Annotations = res.Annotations
  82. }
  83. appendDetail := func(uid string, dir kubemodel.TrafficDirection, tt kubemodel.TrafficType, isNatGateway bool, endpoint string, bytes float64) {
  84. pod, ok := podMap[uid]
  85. if !ok || bytes <= 0 {
  86. return
  87. }
  88. pod.NetworkTrafficDetails = append(pod.NetworkTrafficDetails, kubemodel.NetworkTrafficDetail{
  89. PodUID: uid,
  90. TrafficDirection: dir,
  91. TrafficType: tt,
  92. IsNatGateway: isNatGateway,
  93. Endpoint: endpoint,
  94. Bytes: bytes,
  95. })
  96. }
  97. networkTrafficType := func(res *source.PodNetworkBytesResult) (kubemodel.TrafficType, bool) {
  98. if res.Internet {
  99. return kubemodel.TrafficTypeInternet, true
  100. }
  101. if !res.SameRegion {
  102. return kubemodel.TrafficTypeCrossRegion, true
  103. }
  104. if !res.SameZone {
  105. return kubemodel.TrafficTypeCrossZone, true
  106. }
  107. return "", false
  108. }
  109. podNetworkEgressResult, _ := podNetworkEgressBytesResultFuture.Await()
  110. for _, res := range podNetworkEgressResult {
  111. tt, ok := networkTrafficType(res)
  112. if !ok {
  113. continue
  114. }
  115. appendDetail(res.UID, kubemodel.TrafficDirectionEgress, tt, res.NatGateway, res.Service, res.Value)
  116. }
  117. podNetworkIngressResult, _ := podNetworkIngressBytesResultFuture.Await()
  118. for _, res := range podNetworkIngressResult {
  119. tt, ok := networkTrafficType(res)
  120. if !ok {
  121. continue
  122. }
  123. appendDetail(res.UID, kubemodel.TrafficDirectionIngress, tt, res.NatGateway, res.Service, res.Value)
  124. }
  125. for _, pod := range podMap {
  126. err := kms.RegisterPod(pod)
  127. if err != nil {
  128. log.Warnf("Failed to register pod: %s", err.Error())
  129. }
  130. }
  131. return nil
  132. }