cpuallocation.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package synthetic
  2. import (
  3. "maps"
  4. "math"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/source"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  9. )
  10. // CpuUsageMetric contains the last two samples of a CPU instant metric.
  11. type CpuUsageMetric struct {
  12. current *InstantMetric
  13. prev *InstantMetric
  14. }
  15. // NewCpuUsageMetric creates a new cpu usage metric initialized to the provided instant metric
  16. // data.
  17. func NewCpuUsageMetric(t time.Time, m *metric.Update) *CpuUsageMetric {
  18. return new(CpuUsageMetric).Push(t, m)
  19. }
  20. // Push accepts new instant metric data, advances any current data to previous, and sets the new
  21. // current to the provided metric.
  22. func (usage *CpuUsageMetric) Push(t time.Time, m *metric.Update) *CpuUsageMetric {
  23. if usage.current == nil {
  24. usage.current = &InstantMetric{t, m}
  25. return usage
  26. }
  27. usage.prev = usage.current
  28. usage.current = &InstantMetric{t, m}
  29. return usage
  30. }
  31. // Labels returns the labels for any current if it exists first, then looks to any previous data next.
  32. func (usage *CpuUsageMetric) Labels() map[string]string {
  33. if usage.current != nil {
  34. return usage.current.update.Labels
  35. }
  36. if usage.prev != nil {
  37. return usage.prev.update.Labels
  38. }
  39. return map[string]string{}
  40. }
  41. // IsValid returns true when usage is non-nil, the current instant metric is non-nil, and the previous
  42. // instant metric is non-nil
  43. func (usage *CpuUsageMetric) IsValid() bool {
  44. return usage != nil && usage.current != nil && usage.prev != nil
  45. }
  46. // IsEmpty returns true when there are no valid samples
  47. func (usage *CpuUsageMetric) IsEmpty() bool {
  48. return usage == nil || (usage.current == nil && usage.prev == nil)
  49. }
  50. // Value returns the irate of the two metric samples if they exist, and 0 if they don't.
  51. func (usage *CpuUsageMetric) Value() float64 {
  52. if usage.current == nil || usage.prev == nil {
  53. return 0.0
  54. }
  55. v1, t1 := usage.current.update.Value, usage.current.timestamp
  56. v2, t2 := usage.prev.update.Value, usage.prev.timestamp
  57. seconds := t1.Sub(t2).Seconds()
  58. if seconds <= 0.0 {
  59. return 0.0
  60. }
  61. irate := (v1 - v2) / seconds
  62. return irate
  63. }
  64. // Shift will set the previous to the current metric, and set the current metric to nil.
  65. func (usage *CpuUsageMetric) Shift() {
  66. if usage == nil {
  67. return
  68. }
  69. usage.prev = usage.current
  70. usage.current = nil
  71. }
  72. // ContainerCpuAllocationMetric is the grouping unit for cpu usage and cpu request metrics.
  73. type ContainerCpuAllocationMetric struct {
  74. requestMetric *metric.Update
  75. usageMetric *CpuUsageMetric
  76. }
  77. // IsValid returns true if we can synthesize an update from the samples available
  78. func (cmam *ContainerCpuAllocationMetric) IsValid() bool {
  79. return cmam.requestMetric != nil || cmam.usageMetric.IsValid()
  80. }
  81. // Synthesize returns a new CpuAllocation metric update with the max(request, usage)
  82. func (cmam *ContainerCpuAllocationMetric) Synthesize() metric.Update {
  83. if cmam.requestMetric != nil && cmam.usageMetric.IsValid() {
  84. req := cmam.requestMetric.Value
  85. if math.IsNaN(req) {
  86. log.Debugf("NaN value found during cpu allocation synthesis for requests.")
  87. req = 0.0
  88. }
  89. used := cmam.usageMetric.Value()
  90. if math.IsNaN(used) {
  91. log.Debugf("NaN value found during cpu allocation synthesis for used.")
  92. used = 0.0
  93. }
  94. // TODO: validate and merge labels if they both have keys?
  95. labels := maps.Clone(cmam.usageMetric.Labels())
  96. return metric.Update{
  97. Name: metric.ContainerCPUAllocation,
  98. Labels: labels,
  99. Value: max(req, used),
  100. }
  101. } else if cmam.requestMetric != nil {
  102. req := cmam.requestMetric.Value
  103. if math.IsNaN(req) {
  104. log.Debugf("NaN value found during cpu allocation synthesis for requests.")
  105. req = 0.0
  106. }
  107. // drop the "extra" labels
  108. labels := maps.Clone(cmam.requestMetric.Labels)
  109. delete(labels, source.ResourceLabel)
  110. delete(labels, source.UnitLabel)
  111. return metric.Update{
  112. Name: metric.ContainerCPUAllocation,
  113. Labels: labels,
  114. Value: req,
  115. }
  116. }
  117. // not possible for both request and usage to be nil, so we can assume only used is
  118. // valid here
  119. used := cmam.usageMetric.Value()
  120. if math.IsNaN(used) {
  121. log.Debugf("NaN value found during cpu allocation synthesis for used.")
  122. used = 0.0
  123. }
  124. labels := maps.Clone(cmam.usageMetric.Labels())
  125. return metric.Update{
  126. Name: metric.ContainerCPUAllocation,
  127. Labels: labels,
  128. Value: used,
  129. }
  130. }
  131. // IsEmpty returns true if there are no valid samples to extract from
  132. func (cmam *ContainerCpuAllocationMetric) IsEmpty() bool {
  133. return cmam.requestMetric == nil && cmam.usageMetric.IsEmpty()
  134. }
  135. // Cycle will advance the usage sample buffer and clear the request sample.
  136. func (cmam *ContainerCpuAllocationMetric) Cycle() {
  137. cmam.requestMetric = nil
  138. cmam.usageMetric.Shift()
  139. }
  140. // ContainerCpuAllocationSynthesizer is a MetricSynthesizer that leverages pod uid and container name grouping
  141. // to match relevant request and usage metrics to build the cpu allocation data.
  142. type ContainerCpuAllocationSynthesizer struct {
  143. byPod map[string]map[string]*ContainerCpuAllocationMetric
  144. }
  145. // NewContainerCpuAllocationSynthesizer creates a new ContainerCpuAllocationSynthesizer which synthesizes
  146. // metric updates for ContainerCPUAllocation from cpu requests and cpu usage metrics.
  147. func NewContainerCpuAllocationSynthesizer() *ContainerCpuAllocationSynthesizer {
  148. return &ContainerCpuAllocationSynthesizer{
  149. byPod: make(map[string]map[string]*ContainerCpuAllocationMetric),
  150. }
  151. }
  152. // Process only processes cpu requests and cpu usage metrics
  153. func (cmas *ContainerCpuAllocationSynthesizer) Process(t time.Time, update *metric.Update) {
  154. switch update.Name {
  155. case metric.KubePodContainerResourceRequests:
  156. cmas.addRequestsMetric(update)
  157. case metric.ContainerCPUUsageSecondsTotal:
  158. cmas.addUsageMetric(t, update)
  159. }
  160. }
  161. // Synthesize will synthesize all valid synthesizers within the pod/container mapping.
  162. func (cmas *ContainerCpuAllocationSynthesizer) Synthesize() []metric.Update {
  163. var updates []metric.Update
  164. for _, pod := range cmas.byPod {
  165. for _, synthesizer := range pod {
  166. isValid := synthesizer.IsValid()
  167. if isValid {
  168. updates = append(updates, synthesizer.Synthesize())
  169. }
  170. }
  171. }
  172. return updates
  173. }
  174. // Clear for the CpuAllocationSynthesis must cycle the samples, and only remove them if there is no
  175. // more valid sample data remaining.
  176. func (cmas *ContainerCpuAllocationSynthesizer) Clear() {
  177. for podKey, pod := range cmas.byPod {
  178. for synthKey, synthesizer := range pod {
  179. synthesizer.Cycle()
  180. if synthesizer.IsEmpty() {
  181. delete(pod, synthKey)
  182. }
  183. }
  184. if len(pod) == 0 {
  185. delete(cmas.byPod, podKey)
  186. }
  187. }
  188. }
  189. func (cmas *ContainerCpuAllocationSynthesizer) addRequestsMetric(update *metric.Update) {
  190. if !cmas.isValidRequests(update.Labels) {
  191. return
  192. }
  193. podUID := update.Labels[source.UIDLabel]
  194. container := update.Labels[source.ContainerLabel]
  195. if _, ok := cmas.byPod[podUID]; !ok {
  196. cmas.byPod[podUID] = make(map[string]*ContainerCpuAllocationMetric)
  197. }
  198. if _, ok := cmas.byPod[podUID][container]; !ok {
  199. cmas.byPod[podUID][container] = &ContainerCpuAllocationMetric{
  200. requestMetric: update,
  201. }
  202. } else {
  203. cmas.byPod[podUID][container].requestMetric = update
  204. }
  205. }
  206. func (cmas *ContainerCpuAllocationSynthesizer) addUsageMetric(t time.Time, update *metric.Update) {
  207. if !cmas.isValidUsage(update.Labels) {
  208. return
  209. }
  210. podUID := update.Labels[source.UIDLabel]
  211. container := update.Labels[source.ContainerLabel]
  212. if _, ok := cmas.byPod[podUID]; !ok {
  213. cmas.byPod[podUID] = make(map[string]*ContainerCpuAllocationMetric)
  214. }
  215. if _, ok := cmas.byPod[podUID][container]; !ok {
  216. cmas.byPod[podUID][container] = &ContainerCpuAllocationMetric{
  217. usageMetric: NewCpuUsageMetric(t, update),
  218. }
  219. } else {
  220. cpuAllocMetric := cmas.byPod[podUID][container]
  221. if cpuAllocMetric.usageMetric == nil {
  222. cpuAllocMetric.usageMetric = NewCpuUsageMetric(t, update)
  223. } else {
  224. cpuAllocMetric.usageMetric.Push(t, update)
  225. }
  226. }
  227. }
  228. func (cmas *ContainerCpuAllocationSynthesizer) isValidRequests(labels map[string]string) bool {
  229. return labels[source.ResourceLabel] == "cpu" &&
  230. labels[source.UnitLabel] == "core" &&
  231. labels[source.ContainerLabel] != "POD" &&
  232. labels[source.ContainerLabel] != "" &&
  233. labels[source.NodeLabel] != "" &&
  234. labels[source.UIDLabel] != ""
  235. }
  236. func (cmas *ContainerCpuAllocationSynthesizer) isValidUsage(labels map[string]string) bool {
  237. return labels[source.ContainerLabel] != "POD" &&
  238. labels[source.ContainerLabel] != "" &&
  239. labels[source.UIDLabel] != ""
  240. }