container.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package kubemodel
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  6. "github.com/opencost/opencost/core/pkg/source"
  7. )
  8. // resourceUnitValue converts prometheus resource/unit strings from ResourceResult
  9. // into kubemodel types, applying any necessary unit conversions.
  10. func resourceUnitValue(resource, unit string, value float64) (kubemodel.Resource, kubemodel.Unit, float64) {
  11. switch resource {
  12. case "cpu":
  13. return kubemodel.ResourceCPU, kubemodel.UnitCore, value
  14. case "memory":
  15. return kubemodel.ResourceMemory, kubemodel.UnitByte, value
  16. default:
  17. return kubemodel.Resource(resource), kubemodel.Unit(unit), value
  18. }
  19. }
  20. func (km *KubeModel) computeContainers(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  21. grp := source.NewQueryGroup()
  22. metrics := km.ds.Metrics()
  23. containerUptimeFuture := source.WithGroup(grp, metrics.QueryContainerUptime(start, end))
  24. containerResourceRequestsFuture := source.WithGroup(grp, metrics.QueryContainerResourceRequests(start, end))
  25. containerResourceLimitsFuture := source.WithGroup(grp, metrics.QueryContainerResourceLimits(start, end))
  26. cpuCoresAllocatedFuture := source.WithGroup(grp, metrics.QueryCPUCoresAllocated(start, end))
  27. cpuUsageAvgFuture := source.WithGroup(grp, metrics.QueryCPUUsageAvg(start, end))
  28. cpuUsageMaxFuture := source.WithGroup(grp, metrics.QueryCPUUsageMax(start, end))
  29. ramBytesAllocatedFuture := source.WithGroup(grp, metrics.QueryRAMBytesAllocated(start, end))
  30. ramUsageAvgFuture := source.WithGroup(grp, metrics.QueryRAMUsageAvg(start, end))
  31. ramUsageMaxFuture := source.WithGroup(grp, metrics.QueryRAMUsageMax(start, end))
  32. type containerKey struct {
  33. podUID string
  34. name string
  35. }
  36. containerMap := make(map[containerKey]*kubemodel.Container)
  37. containerUptimeResult, _ := containerUptimeFuture.Await()
  38. for _, res := range containerUptimeResult {
  39. key := containerKey{podUID: res.UID, name: res.Container}
  40. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  41. containerMap[key] = &kubemodel.Container{
  42. PodUID: res.UID,
  43. Name: res.Container,
  44. ResourceRequests: make(kubemodel.ResourceQuantities),
  45. ResourceLimits: make(kubemodel.ResourceQuantities),
  46. Start: s,
  47. End: e,
  48. }
  49. }
  50. containerResourceRequestsResult, _ := containerResourceRequestsFuture.Await()
  51. for _, res := range containerResourceRequestsResult {
  52. key := containerKey{podUID: res.UID, name: res.Container}
  53. container, ok := containerMap[key]
  54. if !ok {
  55. log.Warnf("container %s/%s has not been initialized to add resource requests", res.UID, res.Container)
  56. continue
  57. }
  58. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  59. container.ResourceRequests.Set(resource, unit, kubemodel.StatAvg, value)
  60. }
  61. containerResourceLimitsResult, _ := containerResourceLimitsFuture.Await()
  62. for _, res := range containerResourceLimitsResult {
  63. key := containerKey{podUID: res.UID, name: res.Container}
  64. container, ok := containerMap[key]
  65. if !ok {
  66. log.Warnf("container %s/%s has not been initialized to add resource limits", res.UID, res.Container)
  67. continue
  68. }
  69. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  70. container.ResourceLimits.Set(resource, unit, kubemodel.StatAvg, value)
  71. }
  72. cpuCoresAllocatedResult, _ := cpuCoresAllocatedFuture.Await()
  73. for _, res := range cpuCoresAllocatedResult {
  74. key := containerKey{podUID: res.UID, name: res.Container}
  75. container, ok := containerMap[key]
  76. if !ok {
  77. log.Warnf("container %s/%s has not been initialized to add CPU cores allocated", res.UID, res.Container)
  78. continue
  79. }
  80. if len(res.Data) > 0 {
  81. container.CPUCoresAllocated = res.Data[0].Value
  82. }
  83. }
  84. ramBytesAllocatedResult, _ := ramBytesAllocatedFuture.Await()
  85. for _, res := range ramBytesAllocatedResult {
  86. key := containerKey{podUID: res.UID, name: res.Container}
  87. container, ok := containerMap[key]
  88. if !ok {
  89. log.Warnf("container %s/%s has not been initialized to add RAM bytes allocated", res.UID, res.Container)
  90. continue
  91. }
  92. if len(res.Data) > 0 {
  93. container.RAMBytesAllocated = res.Data[0].Value
  94. }
  95. }
  96. cpuUsageAvgResult, _ := cpuUsageAvgFuture.Await()
  97. for _, res := range cpuUsageAvgResult {
  98. key := containerKey{podUID: res.UID, name: res.Container}
  99. container, ok := containerMap[key]
  100. if !ok {
  101. log.Warnf("container %s/%s has not been initialized to add CPU usage avg", res.UID, res.Container)
  102. continue
  103. }
  104. if len(res.Data) > 0 {
  105. container.CPUCoreUsageAvg = res.Data[0].Value
  106. }
  107. }
  108. cpuUsageMaxResult, _ := cpuUsageMaxFuture.Await()
  109. for _, res := range cpuUsageMaxResult {
  110. key := containerKey{podUID: res.UID, name: res.Container}
  111. container, ok := containerMap[key]
  112. if !ok {
  113. log.Warnf("container %s/%s has not been initialized to add CPU usage max", res.UID, res.Container)
  114. continue
  115. }
  116. if len(res.Data) > 0 {
  117. container.CPUCoreUsageMax = res.Data[0].Value
  118. }
  119. }
  120. ramUsageAvgResult, _ := ramUsageAvgFuture.Await()
  121. for _, res := range ramUsageAvgResult {
  122. key := containerKey{podUID: res.UID, name: res.Container}
  123. container, ok := containerMap[key]
  124. if !ok {
  125. log.Warnf("container %s/%s has not been initialized to add RAM usage avg", res.UID, res.Container)
  126. continue
  127. }
  128. if len(res.Data) > 0 {
  129. container.RAMBytesUsageAvg = res.Data[0].Value
  130. }
  131. }
  132. ramUsageMaxResult, _ := ramUsageMaxFuture.Await()
  133. for _, res := range ramUsageMaxResult {
  134. key := containerKey{podUID: res.UID, name: res.Container}
  135. container, ok := containerMap[key]
  136. if !ok {
  137. log.Warnf("container %s/%s has not been initialized to add RAM usage max", res.UID, res.Container)
  138. continue
  139. }
  140. if len(res.Data) > 0 {
  141. container.RAMBytesUsageMax = res.Data[0].Value
  142. }
  143. }
  144. for _, container := range containerMap {
  145. err := kms.RegisterContainer(container)
  146. if err != nil {
  147. log.Warnf("Failed to register container: %s", err.Error())
  148. }
  149. }
  150. return nil
  151. }