kubemodel.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package kubemodel
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/env"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  9. "github.com/opencost/opencost/core/pkg/source"
  10. )
  11. const logTimeFmt string = "2006-01-02T15:04:05"
  12. type KubeModel struct {
  13. ds source.OpenCostDataSource
  14. clusterUID string
  15. }
  16. func NewKubeModel(clusterUID string, dataSource source.OpenCostDataSource) (*KubeModel, error) {
  17. if dataSource == nil {
  18. return nil, errors.New("OpenCostDataSource cannot be nil")
  19. }
  20. km := &KubeModel{
  21. ds: dataSource,
  22. clusterUID: clusterUID,
  23. }
  24. km.clusterUID = clusterUID
  25. log.Debugf("NewKubeModel(%s)", km.clusterUID)
  26. return km, nil
  27. }
  28. // ComputeKubeModel uses the CostModel instance to compute an KubeModelSet
  29. // for the window defined by the given start and end times. The KubeModels
  30. // returned are unaggregated (i.e. down to the container level).
  31. func (km *KubeModel) ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeModelSet, error) {
  32. // 1. Initialize new KubeModelSet for requested Window
  33. kms := kubemodel.NewKubeModelSet(start, end)
  34. // 2. Query CostModel for each set of objects
  35. var err error
  36. // 2.1 Compute Cluster
  37. err = km.computeCluster(kms, start, end)
  38. if err != nil {
  39. kms.Error(err)
  40. return kms, fmt.Errorf("error computing kubemodel.Cluster for (%s, %s): %w", start.Format(logTimeFmt), end.Format(logTimeFmt), err)
  41. }
  42. // 2.2 Compute Namespaces
  43. err = km.computeNamespaces(kms, start, end)
  44. if err != nil {
  45. kms.Error(err)
  46. }
  47. // 2.3 Compute ResourceQuotas
  48. err = km.computeResourceQuotas(kms, start, end)
  49. if err != nil {
  50. kms.Error(err)
  51. }
  52. // 3. Mark KubeModelSet as completed
  53. kms.Metadata.CompletedAt = time.Now().UTC()
  54. return kms, nil
  55. }
  56. func (km *KubeModel) computeCluster(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  57. kms.Cluster = &kubemodel.Cluster{
  58. UID: km.clusterUID,
  59. Name: env.GetClusterID(),
  60. }
  61. grp := source.NewQueryGroup()
  62. metrics := km.ds.Metrics()
  63. clusterUptimeResultFuture := source.WithGroup(grp, metrics.QueryClusterUptime(start, end))
  64. clusterUptimeResult, _ := clusterUptimeResultFuture.Await()
  65. if len(clusterUptimeResult) != 1 {
  66. kms.Errorf("%d clusters returning from cluster uptime query", len(clusterUptimeResult))
  67. }
  68. for _, res := range clusterUptimeResult {
  69. if res.UID == km.clusterUID {
  70. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  71. kms.Cluster.Start = s
  72. kms.Cluster.End = e
  73. }
  74. }
  75. return nil
  76. }
  77. func (km *KubeModel) computeNamespaces(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  78. grp := source.NewQueryGroup()
  79. metrics := km.ds.Metrics()
  80. nsUptimeResultFuture := source.WithGroup(grp, metrics.QueryNamespaceUptime(start, end))
  81. nsLabelsResultFuture := source.WithGroup(grp, metrics.QueryNamespaceLabels(start, end))
  82. nsAnnosResultFuture := source.WithGroup(grp, metrics.QueryNamespaceAnnotations(start, end))
  83. nsUptimeResult, _ := nsUptimeResultFuture.Await()
  84. nsLabelsResult, _ := nsLabelsResultFuture.Await()
  85. nsAnnosResult, _ := nsAnnosResultFuture.Await()
  86. for _, res := range nsLabelsResult {
  87. err := kms.RegisterNamespace(res.UID, res.Namespace)
  88. if err != nil {
  89. log.Warnf("error registering namespace (%s, %s): %s", res.UID, res.Namespace, err)
  90. continue
  91. }
  92. kms.Namespaces[res.UID].Labels = res.Labels
  93. }
  94. for _, res := range nsAnnosResult {
  95. err := kms.RegisterNamespace(res.UID, res.Namespace)
  96. if err != nil {
  97. log.Warnf("error registering namespace (%s, %s): %s", res.UID, res.Namespace, err)
  98. continue
  99. }
  100. kms.Namespaces[res.UID].Annotations = res.Annotations
  101. }
  102. for _, res := range nsUptimeResult {
  103. if _, ok := kms.Namespaces[res.UID]; !ok {
  104. log.Warnf("could not find ns with uid '%s'", res.UID)
  105. continue
  106. }
  107. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  108. kms.Namespaces[res.UID].Start = s
  109. kms.Namespaces[res.UID].End = e
  110. }
  111. return nil
  112. }
  113. func (km *KubeModel) computeResourceQuotas(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  114. grp := source.NewQueryGroup()
  115. metrics := km.ds.Metrics()
  116. rqUptimeResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaUptime(start, end))
  117. // spec.hard.requests
  118. rqSpecCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestAverage(start, end))
  119. rqSpecCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestMax(start, end))
  120. rqSpecRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestAverage(start, end))
  121. rqSpecRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestMax(start, end))
  122. // spec.hard.limits
  123. rqSpecCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitAverage(start, end))
  124. rqSpecCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitMax(start, end))
  125. rqSpecRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitAverage(start, end))
  126. rqSpecRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitMax(start, end))
  127. // status.used.requests
  128. rqStatusUsedCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestAverage(start, end))
  129. rqStatusUsedCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestMax(start, end))
  130. rqStatusUsedRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestAverage(start, end))
  131. rqStatusUsedRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestMax(start, end))
  132. // status.used.limits
  133. rqStatusUsedCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitAverage(start, end))
  134. rqStatusUsedCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitMax(start, end))
  135. rqStatusUsedRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
  136. rqStatusUsedRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))
  137. rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
  138. for _, res := range rqSpecCPURequestAverageResult {
  139. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  140. if err != nil {
  141. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  142. continue
  143. }
  144. mcpu := res.Data[0].Value * 1000
  145. kms.ResourceQuotas[res.UID].Spec.Hard.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  146. }
  147. rqSpecCPURequestMaxResult, _ := rqSpecCPURequestMaxResultFuture.Await()
  148. for _, res := range rqSpecCPURequestMaxResult {
  149. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  150. if err != nil {
  151. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  152. continue
  153. }
  154. mcpu := res.Data[0].Value * 1000
  155. kms.ResourceQuotas[res.UID].Spec.Hard.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  156. }
  157. rqSpecRAMRequestAverageResult, _ := rqSpecRAMRequestAverageResultFuture.Await()
  158. for _, res := range rqSpecRAMRequestAverageResult {
  159. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  160. if err != nil {
  161. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  162. continue
  163. }
  164. kms.ResourceQuotas[res.UID].Spec.Hard.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
  165. }
  166. rqSpecRAMRequestMaxResult, _ := rqSpecRAMRequestMaxResultFuture.Await()
  167. for _, res := range rqSpecRAMRequestMaxResult {
  168. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  169. if err != nil {
  170. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  171. continue
  172. }
  173. kms.ResourceQuotas[res.UID].Spec.Hard.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
  174. }
  175. rqSpecCPULimitAverageResult, _ := rqSpecCPULimitAverageResultFuture.Await()
  176. for _, res := range rqSpecCPULimitAverageResult {
  177. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  178. if err != nil {
  179. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  180. continue
  181. }
  182. mcpu := res.Data[0].Value * 1000
  183. kms.ResourceQuotas[res.UID].Spec.Hard.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  184. }
  185. rqSpecCPULimitMaxResult, _ := rqSpecCPULimitMaxResultFuture.Await()
  186. for _, res := range rqSpecCPULimitMaxResult {
  187. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  188. if err != nil {
  189. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  190. continue
  191. }
  192. mcpu := res.Data[0].Value * 1000
  193. kms.ResourceQuotas[res.UID].Spec.Hard.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  194. }
  195. rqSpecRAMLimitAverageResult, _ := rqSpecRAMLimitAverageResultFuture.Await()
  196. for _, res := range rqSpecRAMLimitAverageResult {
  197. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  198. if err != nil {
  199. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  200. continue
  201. }
  202. kms.ResourceQuotas[res.UID].Spec.Hard.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
  203. }
  204. rqSpecRAMLimitMaxResult, _ := rqSpecRAMLimitMaxResultFuture.Await()
  205. for _, res := range rqSpecRAMLimitMaxResult {
  206. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  207. if err != nil {
  208. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  209. continue
  210. }
  211. kms.ResourceQuotas[res.UID].Spec.Hard.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
  212. }
  213. rqStatusUsedCPURequestAverageResult, _ := rqStatusUsedCPURequestAverageResultFuture.Await()
  214. for _, res := range rqStatusUsedCPURequestAverageResult {
  215. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  216. if err != nil {
  217. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  218. continue
  219. }
  220. mcpu := res.Data[0].Value * 1000
  221. kms.ResourceQuotas[res.UID].Status.Used.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  222. }
  223. rqStatusUsedCPURequestMaxResult, _ := rqStatusUsedCPURequestMaxResultFuture.Await()
  224. for _, res := range rqStatusUsedCPURequestMaxResult {
  225. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  226. if err != nil {
  227. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  228. continue
  229. }
  230. mcpu := res.Data[0].Value * 1000
  231. kms.ResourceQuotas[res.UID].Status.Used.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  232. }
  233. rqStatusUsedRAMRequestAverageResult, _ := rqStatusUsedRAMRequestAverageResultFuture.Await()
  234. for _, res := range rqStatusUsedRAMRequestAverageResult {
  235. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  236. if err != nil {
  237. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  238. continue
  239. }
  240. kms.ResourceQuotas[res.UID].Status.Used.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
  241. }
  242. rqStatusUsedRAMRequestMaxResult, _ := rqStatusUsedRAMRequestMaxResultFuture.Await()
  243. for _, res := range rqStatusUsedRAMRequestMaxResult {
  244. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  245. if err != nil {
  246. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  247. continue
  248. }
  249. kms.ResourceQuotas[res.UID].Status.Used.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
  250. }
  251. rqStatusUsedCPULimitAverageResult, _ := rqStatusUsedCPULimitAverageResultFuture.Await()
  252. for _, res := range rqStatusUsedCPULimitAverageResult {
  253. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  254. if err != nil {
  255. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  256. continue
  257. }
  258. mcpu := res.Data[0].Value * 1000
  259. kms.ResourceQuotas[res.UID].Status.Used.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
  260. }
  261. rqStatusUsedCPULimitMaxResult, _ := rqStatusUsedCPULimitMaxResultFuture.Await()
  262. for _, res := range rqStatusUsedCPULimitMaxResult {
  263. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  264. if err != nil {
  265. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  266. continue
  267. }
  268. mcpu := res.Data[0].Value * 1000
  269. kms.ResourceQuotas[res.UID].Status.Used.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
  270. }
  271. rqStatusUsedRAMLimitAverageResult, _ := rqStatusUsedRAMLimitAverageResultFuture.Await()
  272. for _, res := range rqStatusUsedRAMLimitAverageResult {
  273. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  274. if err != nil {
  275. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  276. continue
  277. }
  278. kms.ResourceQuotas[res.UID].Status.Used.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
  279. }
  280. rqStatusUsedRAMLimitMaxResult, _ := rqStatusUsedRAMLimitMaxResultFuture.Await()
  281. for _, res := range rqStatusUsedRAMLimitMaxResult {
  282. err := kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
  283. if err != nil {
  284. log.Warnf("error registering resource quota (%s, %s, %s): %s", res.UID, res.ResourceQuota, res.Namespace, err)
  285. continue
  286. }
  287. kms.ResourceQuotas[res.UID].Status.Used.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
  288. }
  289. rqUptimeResult, _ := rqUptimeResultFuture.Await()
  290. for _, res := range rqUptimeResult {
  291. if _, ok := kms.ResourceQuotas[res.UID]; !ok {
  292. log.Warnf("could not find rq with uid '%s'", res.UID)
  293. continue
  294. }
  295. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  296. kms.ResourceQuotas[res.UID].Start = s
  297. kms.ResourceQuotas[res.UID].End = e
  298. }
  299. return nil
  300. }