2
0

cpuallocation.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package synthetic
  2. import (
  3. "maps"
  4. "math"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/source"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  9. )
  10. // CpuUsageMetric contains the last two samples of a CPU instant metric.
  11. type CpuUsageMetric struct {
  12. current *InstantMetric
  13. prev *InstantMetric
  14. }
  15. // NewCpuUsageMetric creates a new cpu usage metric initialized to the provided instant metric
  16. // data.
  17. func NewCpuUsageMetric(t time.Time, m *metric.Update) *CpuUsageMetric {
  18. return new(CpuUsageMetric).Push(t, m)
  19. }
  20. // Push accepts new instant metric data, advances any current data to previous, and sets the new
  21. // current to the provided metric.
  22. func (usage *CpuUsageMetric) Push(t time.Time, m *metric.Update) *CpuUsageMetric {
  23. if usage.current == nil {
  24. usage.current = &InstantMetric{t, m}
  25. return usage
  26. }
  27. usage.prev = usage.current
  28. usage.current = &InstantMetric{t, m}
  29. return usage
  30. }
  31. // Labels returns the labels for any current if it exists first, then looks to any previous data next.
  32. func (usage *CpuUsageMetric) Labels() map[string]string {
  33. if usage.current != nil {
  34. return usage.current.update.Labels
  35. }
  36. if usage.prev != nil {
  37. return usage.prev.update.Labels
  38. }
  39. return map[string]string{}
  40. }
  41. // IsValid returns true when usage is non-nil, the current instant metric is non-nil, and the previous
  42. // instant metric is non-nil
  43. func (usage *CpuUsageMetric) IsValid() bool {
  44. return usage != nil && usage.current != nil && usage.prev != nil
  45. }
  46. // IsEmpty returns true when there are no valid samples
  47. func (usage *CpuUsageMetric) IsEmpty() bool {
  48. return usage == nil || (usage.current == nil && usage.prev == nil)
  49. }
  50. // Value returns the irate of the two metric samples if they exist, and 0 if they don't.
  51. func (usage *CpuUsageMetric) Value() float64 {
  52. if usage.current == nil || usage.prev == nil {
  53. return 0.0
  54. }
  55. curr, t1 := usage.current.update.Value, usage.current.timestamp
  56. prev, t2 := usage.prev.update.Value, usage.prev.timestamp
  57. // handle case where current value is less than the previous value, signalling
  58. // that the running total was reset, or overflowed.
  59. if curr < prev {
  60. return 0.0
  61. }
  62. seconds := t1.Sub(t2).Seconds()
  63. // ensure positive non-zero duration between samples
  64. if seconds <= 0.0 {
  65. return 0.0
  66. }
  67. irate := (curr - prev) / seconds
  68. return max(0.0, irate)
  69. }
  70. // Shift will set the previous to the current metric, and set the current metric to nil.
  71. func (usage *CpuUsageMetric) Shift() {
  72. if usage == nil {
  73. return
  74. }
  75. usage.prev = usage.current
  76. usage.current = nil
  77. }
  78. // ContainerCpuAllocationMetric is the grouping unit for cpu usage and cpu request metrics.
  79. type ContainerCpuAllocationMetric struct {
  80. requestMetric *metric.Update
  81. usageMetric *CpuUsageMetric
  82. }
  83. // IsValid returns true if we can synthesize an update from the samples available
  84. func (cmam *ContainerCpuAllocationMetric) IsValid() bool {
  85. return cmam.requestMetric != nil || cmam.usageMetric.IsValid()
  86. }
  87. // Synthesize returns a new CpuAllocation metric update with the max(request, usage)
  88. func (cmam *ContainerCpuAllocationMetric) Synthesize() metric.Update {
  89. if cmam.requestMetric != nil && cmam.usageMetric.IsValid() {
  90. req := cmam.requestMetric.Value
  91. if math.IsNaN(req) {
  92. log.Debugf("NaN value found during cpu allocation synthesis for requests.")
  93. req = 0.0
  94. }
  95. used := cmam.usageMetric.Value()
  96. if math.IsNaN(used) {
  97. log.Debugf("NaN value found during cpu allocation synthesis for used.")
  98. used = 0.0
  99. }
  100. // TODO: validate and merge labels if they both have keys?
  101. labels := maps.Clone(cmam.usageMetric.Labels())
  102. return metric.Update{
  103. Name: metric.ContainerCPUAllocation,
  104. Labels: labels,
  105. Value: max(req, used),
  106. }
  107. } else if cmam.requestMetric != nil {
  108. req := cmam.requestMetric.Value
  109. if math.IsNaN(req) {
  110. log.Debugf("NaN value found during cpu allocation synthesis for requests.")
  111. req = 0.0
  112. }
  113. // drop the "extra" labels
  114. labels := maps.Clone(cmam.requestMetric.Labels)
  115. delete(labels, source.ResourceLabel)
  116. delete(labels, source.UnitLabel)
  117. return metric.Update{
  118. Name: metric.ContainerCPUAllocation,
  119. Labels: labels,
  120. Value: req,
  121. }
  122. }
  123. // not possible for both request and usage to be nil, so we can assume only used is
  124. // valid here
  125. used := cmam.usageMetric.Value()
  126. if math.IsNaN(used) {
  127. log.Debugf("NaN value found during cpu allocation synthesis for used.")
  128. used = 0.0
  129. }
  130. labels := maps.Clone(cmam.usageMetric.Labels())
  131. return metric.Update{
  132. Name: metric.ContainerCPUAllocation,
  133. Labels: labels,
  134. Value: used,
  135. }
  136. }
  137. // IsEmpty returns true if there are no valid samples to extract from
  138. func (cmam *ContainerCpuAllocationMetric) IsEmpty() bool {
  139. return cmam.requestMetric == nil && cmam.usageMetric.IsEmpty()
  140. }
  141. // Cycle will advance the usage sample buffer and clear the request sample.
  142. func (cmam *ContainerCpuAllocationMetric) Cycle() {
  143. cmam.requestMetric = nil
  144. cmam.usageMetric.Shift()
  145. }
  146. // ContainerCpuAllocationSynthesizer is a MetricSynthesizer that leverages pod uid and container name grouping
  147. // to match relevant request and usage metrics to build the cpu allocation data.
  148. type ContainerCpuAllocationSynthesizer struct {
  149. byPod map[string]map[string]*ContainerCpuAllocationMetric
  150. }
  151. // NewContainerCpuAllocationSynthesizer creates a new ContainerCpuAllocationSynthesizer which synthesizes
  152. // metric updates for ContainerCPUAllocation from cpu requests and cpu usage metrics.
  153. func NewContainerCpuAllocationSynthesizer() *ContainerCpuAllocationSynthesizer {
  154. return &ContainerCpuAllocationSynthesizer{
  155. byPod: make(map[string]map[string]*ContainerCpuAllocationMetric),
  156. }
  157. }
  158. // Process only processes cpu requests and cpu usage metrics
  159. func (cmas *ContainerCpuAllocationSynthesizer) Process(t time.Time, update *metric.Update) {
  160. switch update.Name {
  161. case metric.KubePodContainerResourceRequests:
  162. cmas.addRequestsMetric(update)
  163. case metric.ContainerCPUUsageSecondsTotal:
  164. cmas.addUsageMetric(t, update)
  165. }
  166. }
  167. // Synthesize will synthesize all valid synthesizers within the pod/container mapping.
  168. func (cmas *ContainerCpuAllocationSynthesizer) Synthesize() []metric.Update {
  169. var updates []metric.Update
  170. for _, pod := range cmas.byPod {
  171. for _, synthesizer := range pod {
  172. isValid := synthesizer.IsValid()
  173. if isValid {
  174. updates = append(updates, synthesizer.Synthesize())
  175. }
  176. }
  177. }
  178. return updates
  179. }
  180. // Clear for the CpuAllocationSynthesis must cycle the samples, and only remove them if there is no
  181. // more valid sample data remaining.
  182. func (cmas *ContainerCpuAllocationSynthesizer) Clear() {
  183. for podKey, pod := range cmas.byPod {
  184. for synthKey, synthesizer := range pod {
  185. synthesizer.Cycle()
  186. if synthesizer.IsEmpty() {
  187. delete(pod, synthKey)
  188. }
  189. }
  190. if len(pod) == 0 {
  191. delete(cmas.byPod, podKey)
  192. }
  193. }
  194. }
  195. func (cmas *ContainerCpuAllocationSynthesizer) addRequestsMetric(update *metric.Update) {
  196. if !cmas.isValidRequests(update.Labels) {
  197. return
  198. }
  199. podUID := update.Labels[source.UIDLabel]
  200. container := update.Labels[source.ContainerLabel]
  201. if _, ok := cmas.byPod[podUID]; !ok {
  202. cmas.byPod[podUID] = make(map[string]*ContainerCpuAllocationMetric)
  203. }
  204. if _, ok := cmas.byPod[podUID][container]; !ok {
  205. cmas.byPod[podUID][container] = &ContainerCpuAllocationMetric{
  206. requestMetric: update,
  207. }
  208. } else {
  209. cmas.byPod[podUID][container].requestMetric = update
  210. }
  211. }
  212. func (cmas *ContainerCpuAllocationSynthesizer) addUsageMetric(t time.Time, update *metric.Update) {
  213. if !cmas.isValidUsage(update.Labels) {
  214. return
  215. }
  216. podUID := update.Labels[source.UIDLabel]
  217. container := update.Labels[source.ContainerLabel]
  218. if _, ok := cmas.byPod[podUID]; !ok {
  219. cmas.byPod[podUID] = make(map[string]*ContainerCpuAllocationMetric)
  220. }
  221. if _, ok := cmas.byPod[podUID][container]; !ok {
  222. cmas.byPod[podUID][container] = &ContainerCpuAllocationMetric{
  223. usageMetric: NewCpuUsageMetric(t, update),
  224. }
  225. } else {
  226. cpuAllocMetric := cmas.byPod[podUID][container]
  227. if cpuAllocMetric.usageMetric == nil {
  228. cpuAllocMetric.usageMetric = NewCpuUsageMetric(t, update)
  229. } else {
  230. cpuAllocMetric.usageMetric.Push(t, update)
  231. }
  232. }
  233. }
  234. func (cmas *ContainerCpuAllocationSynthesizer) isValidRequests(labels map[string]string) bool {
  235. return labels[source.ResourceLabel] == "cpu" &&
  236. labels[source.UnitLabel] == "core" &&
  237. labels[source.ContainerLabel] != "POD" &&
  238. labels[source.ContainerLabel] != "" &&
  239. labels[source.NodeLabel] != "" &&
  240. labels[source.UIDLabel] != ""
  241. }
  242. func (cmas *ContainerCpuAllocationSynthesizer) isValidUsage(labels map[string]string) bool {
  243. return labels[source.ContainerLabel] != "POD" &&
  244. labels[source.ContainerLabel] != "" &&
  245. labels[source.UIDLabel] != ""
  246. }