node.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package kubemodel
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  6. "github.com/opencost/opencost/core/pkg/source"
  7. )
  8. func (km *KubeModel) computeNodes(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  9. grp := source.NewQueryGroup()
  10. metrics := km.ds.Metrics()
  11. nodeInfoResultFuture := source.WithGroup(grp, metrics.QueryNodeInfo(start, end))
  12. nodeUptimeResultFuture := source.WithGroup(grp, metrics.QueryNodeUptime(start, end))
  13. nodeLabelsResultFuture := source.WithGroup(grp, metrics.QueryNodeLabels(start, end))
  14. nodeResourceCapacitiesFuture := source.WithGroup(grp, metrics.QueryNodeResourceCapacities(start, end))
  15. nodeResourcesAllocatableFuture := source.WithGroup(grp, metrics.QueryNodeResourcesAllocatable(start, end))
  16. localStorageBytesFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageBytes(start, end))
  17. localStorageUsedAvgFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageUsedAvg(start, end))
  18. localStorageUsedMaxFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageUsedMax(start, end))
  19. nodeMap := make(map[string]*kubemodel.Node)
  20. nodeInfoResult, _ := nodeInfoResultFuture.Await()
  21. for _, res := range nodeInfoResult {
  22. nodeMap[res.UID] = &kubemodel.Node{
  23. UID: res.UID,
  24. ProviderID: res.ProviderID,
  25. Name: res.Node,
  26. ResourceCapacities: make(kubemodel.ResourceQuantities),
  27. ResourcesAllocatable: make(kubemodel.ResourceQuantities),
  28. }
  29. }
  30. nodeUptimeResult, _ := nodeUptimeResultFuture.Await()
  31. for _, res := range nodeUptimeResult {
  32. node, ok := nodeMap[res.UID]
  33. if !ok {
  34. log.Warnf("node with UID '%s' has not been initialized to add uptime", res.UID)
  35. continue
  36. }
  37. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  38. node.Start = s
  39. node.End = e
  40. }
  41. nodeResourceCapacitiesResult, _ := nodeResourceCapacitiesFuture.Await()
  42. for _, res := range nodeResourceCapacitiesResult {
  43. node, ok := nodeMap[res.UID]
  44. if !ok {
  45. log.Warnf("node with UID '%s' has not been initialized to add resource capacities", res.UID)
  46. continue
  47. }
  48. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  49. node.ResourceCapacities.Set(resource, unit, kubemodel.StatAvg, value)
  50. }
  51. nodeResourcesAllocatableResult, _ := nodeResourcesAllocatableFuture.Await()
  52. for _, res := range nodeResourcesAllocatableResult {
  53. node, ok := nodeMap[res.UID]
  54. if !ok {
  55. log.Warnf("node with UID '%s' has not been initialized to add resources allocatable", res.UID)
  56. continue
  57. }
  58. resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
  59. node.ResourcesAllocatable.Set(resource, unit, kubemodel.StatAvg, value)
  60. }
  61. nodeLabelsResult, _ := nodeLabelsResultFuture.Await()
  62. for _, res := range nodeLabelsResult {
  63. node, ok := nodeMap[res.UID]
  64. if !ok {
  65. log.Warnf("node with UID '%s' has not been initialized to add labels", res.UID)
  66. continue
  67. }
  68. node.Labels = res.Labels
  69. }
  70. localStorageBytesResult, _ := localStorageBytesFuture.Await()
  71. for _, res := range localStorageBytesResult {
  72. node, ok := nodeMap[res.UID]
  73. if ok {
  74. node.FileSystem.CapacityBytes = res.Value
  75. }
  76. }
  77. localStorageUsedAvgResult, _ := localStorageUsedAvgFuture.Await()
  78. for _, res := range localStorageUsedAvgResult {
  79. node, ok := nodeMap[res.UID]
  80. if ok {
  81. node.FileSystem.UsageByteAvg = res.Value
  82. }
  83. }
  84. localStorageUsedMaxResult, _ := localStorageUsedMaxFuture.Await()
  85. for _, res := range localStorageUsedMaxResult {
  86. node, ok := nodeMap[res.UID]
  87. if ok {
  88. node.FileSystem.UsageByteMax = res.Value
  89. }
  90. }
  91. for _, node := range nodeMap {
  92. err := kms.RegisterNode(node)
  93. if err != nil {
  94. log.Warnf("Failed to register node: %s", err.Error())
  95. }
  96. }
  97. return nil
  98. }