kubemodel.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/env"
  6. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  7. "github.com/opencost/opencost/core/pkg/source"
  8. "github.com/opencost/opencost/core/pkg/stats"
  9. )
// logTimeFmt is the time layout used when formatting window start and end
// times into error messages (second precision, no time zone component).
const logTimeFmt string = "2006-01-02T15:04:05"
  11. // ComputeKubeModel uses the CostModel instance to compute an KubeModelSet
  12. // for the window defined by the given start and end times. The KubeModels
  13. // returned are unaggregated (i.e. down to the container level).
  14. func (cm *CostModel) ComputeKubeModel(start, end time.Time) (*kubemodel.KubeModelSet, error) {
  15. // 1. Initialize new KubeModelSet for requested Window
  16. kms := kubemodel.NewKubeModelSet(start, end)
  17. // 2. Query CostModel for each set of objects
  18. var err error
  19. // 2.1 Compute Cluster
  20. err = cm.kmComputeCluster(kms, start, end)
  21. if err != nil {
  22. kms.Metadata.Errors = append(kms.Metadata.Errors, err)
  23. return kms, fmt.Errorf("error computing kubemodel.Cluster for (%s, %s): %w", start.Format(logTimeFmt), end.Format(logTimeFmt), err)
  24. }
  25. // 2.2 Compute Namespaces
  26. err = cm.kmComputeNamespaces(kms, start, end)
  27. if err != nil {
  28. kms.Metadata.Errors = append(kms.Metadata.Errors, err)
  29. }
  30. kms.Metadata.ObjectCount += len(kms.Namespaces)
  31. // 2.3 Compute ResourceQuotas
  32. err = cm.kmComputeResourceQuotas(kms, start, end)
  33. if err != nil {
  34. kms.Metadata.Errors = append(kms.Metadata.Errors, err)
  35. }
  36. kms.Metadata.ObjectCount += len(kms.ResourceQuotas)
  37. // 3. Mark KubeModelSet as completed
  38. kms.Metadata.CompletedAt = time.Now().UTC()
  39. return kms, nil
  40. }
  41. func (cm *CostModel) kmComputeCluster(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  42. // TODO: determine where Cluster data comes from
  43. // - Should it come from direct queries?
  44. // - Or should it come from pre-processed data from other objects?
  45. kms.Cluster = &kubemodel.Cluster{
  46. UID: env.GetClusterID(), // TODO: should we instead grab these from Metrics()?
  47. Name: env.GetClusterID(), // TODO: do we still want to use this env var for Name?
  48. }
  49. kms.Metadata.ObjectCount += 1
  50. return nil
  51. }
  52. func (cm *CostModel) kmComputeNamespaces(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  53. grp := source.NewQueryGroup()
  54. ds := cm.DataSource.Metrics()
  55. nsLabelsResultFuture := source.WithGroup(grp, ds.QueryNamespaceLabels(start, end))
  56. nsAnnosResultFuture := source.WithGroup(grp, ds.QueryNamespaceAnnotations(start, end))
  57. nsLabelsResult, _ := nsLabelsResultFuture.Await()
  58. nsAnnosResult, _ := nsAnnosResultFuture.Await()
  59. for _, res := range nsLabelsResult {
  60. kms.RegisterNamespace(res.UID, res.Namespace)
  61. kms.Namespaces[res.UID].Labels = res.Labels
  62. }
  63. for _, res := range nsAnnosResult {
  64. kms.RegisterNamespace(res.UID, res.Namespace)
  65. kms.Namespaces[res.UID].Annotations = res.Annotations
  66. }
  67. return nil
  68. }
  69. func (cm *CostModel) kmComputeResourceQuotas(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  70. grp := source.NewQueryGroup()
  71. ds := cm.DataSource.Metrics()
  72. // spec.hard.requests
  73. rqSpecCPURequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPURequestAverage(start, end))
  74. rqSpecCPURequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPURequestMax(start, end))
  75. rqSpecRAMRequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMRequestAverage(start, end))
  76. rqSpecRAMRequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMRequestMax(start, end))
  77. // spec.hard.limits
  78. rqSpecCPULimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPULimitAverage(start, end))
  79. rqSpecCPULimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPULimitMax(start, end))
  80. rqSpecRAMLimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMLimitAverage(start, end))
  81. rqSpecRAMLimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMLimitMax(start, end))
  82. // status.used.requests
  83. rqStatusUsedCPURequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPURequestAverage(start, end))
  84. rqStatusUsedCPURequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPURequestMax(start, end))
  85. rqStatusUsedRAMRequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMRequestAverage(start, end))
  86. rqStatusUsedRAMRequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMRequestMax(start, end))
  87. // status.used.limits
  88. rqStatusUsedCPULimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPULimitAverage(start, end))
  89. rqStatusUsedCPULimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPULimitMax(start, end))
  90. rqStatusUsedRAMLimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
  91. rqStatusUsedRAMLimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))
  92. rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
  93. for _, res := range rqSpecCPURequestAverageResult {
  94. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  95. mcpu := res.Data[0].Value * 1000
  96. kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
  97. }
  98. rqSpecCPURequestMaxResult, _ := rqSpecCPURequestMaxResultFuture.Await()
  99. for _, res := range rqSpecCPURequestMaxResult {
  100. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  101. mcpu := res.Data[0].Value * 1000
  102. kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
  103. }
  104. rqSpecRAMRequestAverageResult, _ := rqSpecRAMRequestAverageResultFuture.Await()
  105. for _, res := range rqSpecRAMRequestAverageResult {
  106. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  107. kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
  108. }
  109. rqSpecRAMRequestMaxResult, _ := rqSpecRAMRequestMaxResultFuture.Await()
  110. for _, res := range rqSpecRAMRequestMaxResult {
  111. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  112. kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
  113. }
  114. rqSpecCPULimitAverageResult, _ := rqSpecCPULimitAverageResultFuture.Await()
  115. for _, res := range rqSpecCPULimitAverageResult {
  116. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  117. mcpu := res.Data[0].Value * 1000
  118. kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
  119. }
  120. rqSpecCPULimitMaxResult, _ := rqSpecCPULimitMaxResultFuture.Await()
  121. for _, res := range rqSpecCPULimitMaxResult {
  122. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  123. mcpu := res.Data[0].Value * 1000
  124. kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
  125. }
  126. rqSpecRAMLimitAverageResult, _ := rqSpecRAMLimitAverageResultFuture.Await()
  127. for _, res := range rqSpecRAMLimitAverageResult {
  128. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  129. kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
  130. }
  131. rqSpecRAMLimitMaxResult, _ := rqSpecRAMLimitMaxResultFuture.Await()
  132. for _, res := range rqSpecRAMLimitMaxResult {
  133. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  134. kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
  135. }
  136. rqStatusUsedCPURequestAverageResult, _ := rqStatusUsedCPURequestAverageResultFuture.Await()
  137. for _, res := range rqStatusUsedCPURequestAverageResult {
  138. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  139. mcpu := res.Data[0].Value * 1000
  140. kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
  141. }
  142. rqStatusUsedCPURequestMaxResult, _ := rqStatusUsedCPURequestMaxResultFuture.Await()
  143. for _, res := range rqStatusUsedCPURequestMaxResult {
  144. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  145. mcpu := res.Data[0].Value * 1000
  146. kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
  147. }
  148. rqStatusUsedRAMRequestAverageResult, _ := rqStatusUsedRAMRequestAverageResultFuture.Await()
  149. for _, res := range rqStatusUsedRAMRequestAverageResult {
  150. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  151. kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
  152. }
  153. rqStatusUsedRAMRequestMaxResult, _ := rqStatusUsedRAMRequestMaxResultFuture.Await()
  154. for _, res := range rqStatusUsedRAMRequestMaxResult {
  155. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  156. kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
  157. }
  158. rqStatusUsedCPULimitAverageResult, _ := rqStatusUsedCPULimitAverageResultFuture.Await()
  159. for _, res := range rqStatusUsedCPULimitAverageResult {
  160. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  161. mcpu := res.Data[0].Value * 1000
  162. kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
  163. }
  164. rqStatusUsedCPULimitMaxResult, _ := rqStatusUsedCPULimitMaxResultFuture.Await()
  165. for _, res := range rqStatusUsedCPULimitMaxResult {
  166. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  167. mcpu := res.Data[0].Value * 1000
  168. kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
  169. }
  170. rqStatusUsedRAMLimitAverageResult, _ := rqStatusUsedRAMLimitAverageResultFuture.Await()
  171. for _, res := range rqStatusUsedRAMLimitAverageResult {
  172. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  173. kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
  174. }
  175. rqStatusUsedRAMLimitMaxResult, _ := rqStatusUsedRAMLimitMaxResultFuture.Await()
  176. for _, res := range rqStatusUsedRAMLimitMaxResult {
  177. kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  178. kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
  179. }
  180. return nil
  181. }