statsummary.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package scrape
  2. import (
  3. "github.com/kubecost/events"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/nodestats"
  6. "github.com/opencost/opencost/core/pkg/source"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/event"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  9. stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
  10. )
  11. type StatSummaryScraper struct {
  12. client nodestats.StatSummaryClient
  13. }
  14. func newStatSummaryScraper(client nodestats.StatSummaryClient) Scraper {
  15. return &StatSummaryScraper{
  16. client: client,
  17. }
  18. }
  19. func (s *StatSummaryScraper) Scrape() []metric.Update {
  20. var scrapeResults []metric.Update
  21. nodeStats, err := s.client.GetNodeData()
  22. // record errors but process successfully retrieved nodes
  23. errs := make([]error, 0)
  24. if err != nil {
  25. if multiErr, ok := err.(interface{ Unwrap() []error }); ok {
  26. errs = multiErr.Unwrap()
  27. } else {
  28. errs = []error{err}
  29. }
  30. log.Errorf("error retrieving node stat data: %s", err.Error())
  31. }
  32. // track if a pvc has already been seen when updating KubeletVolumeStatsUsedBytes
  33. seenPVC := map[stats.PVCReference]struct{}{}
  34. for _, stat := range nodeStats {
  35. nodeName := stat.Node.NodeName
  36. if stat.Node.CPU != nil && stat.Node.CPU.UsageCoreNanoSeconds != nil {
  37. scrapeResults = append(scrapeResults, metric.Update{
  38. Name: metric.NodeCPUSecondsTotal,
  39. Labels: map[string]string{
  40. source.KubernetesNodeLabel: nodeName,
  41. source.ModeLabel: "", // TODO
  42. },
  43. Value: float64(*stat.Node.CPU.UsageCoreNanoSeconds) * 1e-9,
  44. })
  45. }
  46. if stat.Node.Fs != nil && stat.Node.Fs.CapacityBytes != nil {
  47. scrapeResults = append(scrapeResults, metric.Update{
  48. Name: metric.NodeFSCapacityBytes,
  49. Labels: map[string]string{
  50. source.InstanceLabel: nodeName,
  51. source.DeviceLabel: "local", // This value has to be populated but isn't important here
  52. },
  53. Value: float64(*stat.Node.Fs.CapacityBytes),
  54. })
  55. }
  56. for _, pod := range stat.Pods {
  57. podName := pod.PodRef.Name
  58. namespace := pod.PodRef.Namespace
  59. podUID := pod.PodRef.UID
  60. if pod.Network != nil {
  61. networkLabels := map[string]string{
  62. source.UIDLabel: podUID,
  63. source.PodLabel: podName,
  64. source.NamespaceLabel: namespace,
  65. }
  66. // The network may contain a list of stats or itself be a single stat, if the list is not present
  67. // scrape the object itself
  68. if pod.Network.Interfaces != nil {
  69. for _, networkStat := range pod.Network.Interfaces {
  70. scrapeNetworkStats(&scrapeResults, networkLabels, networkStat)
  71. }
  72. } else {
  73. scrapeNetworkStats(&scrapeResults, networkLabels, pod.Network.InterfaceStats)
  74. }
  75. }
  76. for _, volumeStats := range pod.VolumeStats {
  77. if volumeStats.PVCRef == nil || volumeStats.UsedBytes == nil {
  78. continue
  79. }
  80. if _, ok := seenPVC[*volumeStats.PVCRef]; ok {
  81. continue
  82. }
  83. scrapeResults = append(scrapeResults, metric.Update{
  84. Name: metric.KubeletVolumeStatsUsedBytes,
  85. Labels: map[string]string{
  86. source.PVCLabel: volumeStats.PVCRef.Name,
  87. source.NamespaceLabel: volumeStats.PVCRef.Namespace,
  88. source.UIDLabel: podUID,
  89. },
  90. Value: float64(*volumeStats.UsedBytes),
  91. })
  92. seenPVC[*volumeStats.PVCRef] = struct{}{}
  93. }
  94. for _, container := range pod.Containers {
  95. if container.CPU != nil && container.CPU.UsageCoreNanoSeconds != nil {
  96. scrapeResults = append(scrapeResults, metric.Update{
  97. Name: metric.ContainerCPUUsageSecondsTotal,
  98. Labels: map[string]string{
  99. source.ContainerLabel: container.Name,
  100. source.PodLabel: podName,
  101. source.NamespaceLabel: namespace,
  102. source.NodeLabel: nodeName,
  103. source.InstanceLabel: nodeName,
  104. source.UIDLabel: podUID,
  105. },
  106. Value: float64(*container.CPU.UsageCoreNanoSeconds) * 1e-9,
  107. })
  108. }
  109. if container.Memory != nil && container.Memory.WorkingSetBytes != nil {
  110. scrapeResults = append(scrapeResults, metric.Update{
  111. Name: metric.ContainerMemoryWorkingSetBytes,
  112. Labels: map[string]string{
  113. source.ContainerLabel: container.Name,
  114. source.PodLabel: podName,
  115. source.NamespaceLabel: namespace,
  116. source.NodeLabel: nodeName,
  117. source.InstanceLabel: nodeName,
  118. source.UIDLabel: podUID,
  119. },
  120. Value: float64(*container.Memory.WorkingSetBytes),
  121. })
  122. }
  123. if container.Rootfs != nil && container.Rootfs.UsedBytes != nil {
  124. scrapeResults = append(scrapeResults, metric.Update{
  125. Name: metric.ContainerFSUsageBytes,
  126. Labels: map[string]string{
  127. source.InstanceLabel: nodeName,
  128. source.DeviceLabel: "local",
  129. source.UIDLabel: podUID,
  130. },
  131. Value: float64(*container.Rootfs.UsedBytes),
  132. })
  133. }
  134. }
  135. }
  136. }
  137. events.Dispatch(event.ScrapeEvent{
  138. ScraperName: event.NodeStatsScraperName,
  139. Targets: len(nodeStats) + len(errs),
  140. Errors: errs,
  141. })
  142. return scrapeResults
  143. }
  144. func scrapeNetworkStats(scrapeResults *[]metric.Update, labels map[string]string, networkStats stats.InterfaceStats) {
  145. // Skip stats for cni0 which tracks internal cluster traffic
  146. if networkStats.Name == "cni0" {
  147. return
  148. }
  149. if networkStats.RxBytes != nil {
  150. *scrapeResults = append(*scrapeResults, metric.Update{
  151. Name: metric.ContainerNetworkReceiveBytesTotal,
  152. Labels: labels,
  153. Value: float64(*networkStats.RxBytes),
  154. })
  155. }
  156. if networkStats.TxBytes != nil {
  157. *scrapeResults = append(*scrapeResults, metric.Update{
  158. Name: metric.ContainerNetworkTransmitBytesTotal,
  159. Labels: labels,
  160. Value: float64(*networkStats.TxBytes),
  161. })
  162. }
  163. }