// statsummary.go (6.4 KB)

  1. package scrape
  2. import (
  3. "github.com/kubecost/events"
  4. "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/nodestats"
  7. "github.com/opencost/opencost/core/pkg/source"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/event"
  9. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  10. stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
  11. )
  12. type StatSummaryScraper struct {
  13. client nodestats.StatSummaryClient
  14. clusterCache clustercache.ClusterCache
  15. }
  16. func newStatSummaryScraper(client nodestats.StatSummaryClient, clusterCache clustercache.ClusterCache) Scraper {
  17. return &StatSummaryScraper{
  18. client: client,
  19. clusterCache: clusterCache,
  20. }
  21. }
  22. func (s *StatSummaryScraper) Scrape() []metric.Update {
  23. nodeNameToUID := buildNodeIndex(s.clusterCache.GetAllNodes())
  24. pvcNameToUID := buildPVCIndex(s.clusterCache.GetAllPersistentVolumeClaims())
  25. var scrapeResults []metric.Update
  26. nodeStats, err := s.client.GetNodeData()
  27. // record errors but process successfully retrieved nodes
  28. errs := make([]error, 0)
  29. if err != nil {
  30. if multiErr, ok := err.(interface{ Unwrap() []error }); ok {
  31. errs = multiErr.Unwrap()
  32. } else {
  33. errs = []error{err}
  34. }
  35. log.Errorf("error retrieving node stat data: %s", err.Error())
  36. }
  37. // track if a pvc has already been seen when updating KubeletVolumeStatsUsedBytes
  38. seenPVC := map[stats.PVCReference]struct{}{}
  39. for _, stat := range nodeStats {
  40. nodeName := stat.Node.NodeName
  41. nodeUID := string(nodeNameToUID[nodeName])
  42. if stat.Node.CPU != nil && stat.Node.CPU.UsageCoreNanoSeconds != nil {
  43. scrapeResults = append(scrapeResults, metric.Update{
  44. Name: metric.NodeCPUSecondsTotal,
  45. Labels: map[string]string{
  46. source.KubernetesNodeLabel: nodeName,
  47. source.UIDLabel: nodeUID,
  48. source.ModeLabel: "", // TODO
  49. },
  50. Value: float64(*stat.Node.CPU.UsageCoreNanoSeconds) * 1e-9,
  51. })
  52. }
  53. if stat.Node.Fs != nil && stat.Node.Fs.CapacityBytes != nil {
  54. scrapeResults = append(scrapeResults, metric.Update{
  55. Name: metric.NodeFSCapacityBytes,
  56. Labels: map[string]string{
  57. source.InstanceLabel: nodeName,
  58. source.UIDLabel: nodeUID,
  59. source.DeviceLabel: "local", // This value has to be populated but isn't important here
  60. },
  61. Value: float64(*stat.Node.Fs.CapacityBytes),
  62. })
  63. }
  64. for _, pod := range stat.Pods {
  65. podName := pod.PodRef.Name
  66. namespace := pod.PodRef.Namespace
  67. podUID := pod.PodRef.UID
  68. if pod.Network != nil {
  69. networkLabels := map[string]string{
  70. source.UIDLabel: podUID,
  71. source.NodeUIDLabel: nodeUID,
  72. source.PodLabel: podName,
  73. source.NamespaceLabel: namespace,
  74. }
  75. // The network may contain a list of stats or itself be a single stat, if the list is not present
  76. // scrape the object itself
  77. if pod.Network.Interfaces != nil {
  78. for _, networkStat := range pod.Network.Interfaces {
  79. scrapeNetworkStats(&scrapeResults, networkLabels, networkStat)
  80. }
  81. } else {
  82. scrapeNetworkStats(&scrapeResults, networkLabels, pod.Network.InterfaceStats)
  83. }
  84. }
  85. for _, volumeStats := range pod.VolumeStats {
  86. if volumeStats.PVCRef == nil || volumeStats.UsedBytes == nil {
  87. continue
  88. }
  89. if _, ok := seenPVC[*volumeStats.PVCRef]; ok {
  90. continue
  91. }
  92. pvcUID := string(pvcNameToUID[pvcKey{name: volumeStats.PVCRef.Name, namespace: volumeStats.PVCRef.Namespace}])
  93. scrapeResults = append(scrapeResults, metric.Update{
  94. Name: metric.KubeletVolumeStatsUsedBytes,
  95. Labels: map[string]string{
  96. source.PVCLabel: volumeStats.PVCRef.Name,
  97. source.NamespaceLabel: volumeStats.PVCRef.Namespace,
  98. source.UIDLabel: podUID,
  99. source.NodeUIDLabel: nodeUID,
  100. source.PVCUIDLabel: pvcUID,
  101. },
  102. Value: float64(*volumeStats.UsedBytes),
  103. })
  104. seenPVC[*volumeStats.PVCRef] = struct{}{}
  105. }
  106. for _, container := range pod.Containers {
  107. if container.CPU != nil && container.CPU.UsageCoreNanoSeconds != nil {
  108. scrapeResults = append(scrapeResults, metric.Update{
  109. Name: metric.ContainerCPUUsageSecondsTotal,
  110. Labels: map[string]string{
  111. source.ContainerLabel: container.Name,
  112. source.PodLabel: podName,
  113. source.NamespaceLabel: namespace,
  114. source.NodeLabel: nodeName,
  115. source.InstanceLabel: nodeName,
  116. source.UIDLabel: podUID,
  117. source.NodeUIDLabel: nodeUID,
  118. },
  119. Value: float64(*container.CPU.UsageCoreNanoSeconds) * 1e-9,
  120. })
  121. }
  122. if container.Memory != nil && container.Memory.WorkingSetBytes != nil {
  123. scrapeResults = append(scrapeResults, metric.Update{
  124. Name: metric.ContainerMemoryWorkingSetBytes,
  125. Labels: map[string]string{
  126. source.ContainerLabel: container.Name,
  127. source.PodLabel: podName,
  128. source.NamespaceLabel: namespace,
  129. source.NodeLabel: nodeName,
  130. source.InstanceLabel: nodeName,
  131. source.UIDLabel: podUID,
  132. source.NodeUIDLabel: nodeUID,
  133. },
  134. Value: float64(*container.Memory.WorkingSetBytes),
  135. })
  136. }
  137. if container.Rootfs != nil && container.Rootfs.UsedBytes != nil {
  138. scrapeResults = append(scrapeResults, metric.Update{
  139. Name: metric.ContainerFSUsageBytes,
  140. Labels: map[string]string{
  141. source.InstanceLabel: nodeName,
  142. source.DeviceLabel: "local",
  143. source.UIDLabel: podUID,
  144. source.NodeUIDLabel: nodeUID,
  145. source.ContainerLabel: container.Name,
  146. },
  147. Value: float64(*container.Rootfs.UsedBytes),
  148. })
  149. }
  150. }
  151. }
  152. }
  153. events.Dispatch(event.ScrapeEvent{
  154. ScraperName: event.NodeStatsScraperName,
  155. Targets: len(nodeStats) + len(errs),
  156. Errors: errs,
  157. })
  158. return scrapeResults
  159. }
  160. func scrapeNetworkStats(scrapeResults *[]metric.Update, labels map[string]string, networkStats stats.InterfaceStats) {
  161. // Skip stats for cni0 which tracks internal cluster traffic
  162. if networkStats.Name == "cni0" {
  163. return
  164. }
  165. if networkStats.RxBytes != nil {
  166. *scrapeResults = append(*scrapeResults, metric.Update{
  167. Name: metric.ContainerNetworkReceiveBytesTotal,
  168. Labels: labels,
  169. Value: float64(*networkStats.RxBytes),
  170. })
  171. }
  172. if networkStats.TxBytes != nil {
  173. *scrapeResults = append(*scrapeResults, metric.Update{
  174. Name: metric.ContainerNetworkTransmitBytesTotal,
  175. Labels: labels,
  176. Value: float64(*networkStats.TxBytes),
  177. })
  178. }
  179. }