rate.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package aggregator
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // rateAggregator is a MetricAggregator which returns the average rate per second change of the samples that it tracks.
  7. // to function properly calls to Update must have a timestamp greater than or equal to the last call to update.
  8. type rateAggregator struct {
  9. lock sync.Mutex
  10. labelValues []string
  11. previousTime time.Time
  12. previous float64
  13. currentTime time.Time
  14. current float64
  15. runningAvg float64
  16. seconds float64
  17. }
  18. func Rate(labelValues []string) MetricAggregator {
  19. return &rateAggregator{
  20. labelValues: labelValues,
  21. }
  22. }
  23. // getRunningAvgSeconds returns the running average without updating the state
  24. func (a *rateAggregator) getRunningAvgSeconds() (float64, float64) {
  25. runningAvg := a.runningAvg
  26. seconds := a.seconds
  27. // ignore decreases and base case where only one sample has been recorded
  28. if a.previous < a.current && !a.previousTime.IsZero() {
  29. currentSeconds := a.currentTime.Sub(a.previousTime).Seconds()
  30. // ratio used to add the rate since the last recorded timestamp into the running average
  31. weightingRatio := currentSeconds / (currentSeconds + seconds)
  32. currentRate := (a.current - a.previous) / currentSeconds
  33. runningAvg = (runningAvg * (1 - weightingRatio)) + (currentRate * weightingRatio)
  34. seconds += currentSeconds
  35. }
  36. return runningAvg, seconds
  37. }
  38. func (a *rateAggregator) AdditionInfo() map[string]string {
  39. return nil
  40. }
  41. func (a *rateAggregator) LabelValues() []string {
  42. return a.labelValues
  43. }
  44. func (a *rateAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
  45. a.lock.Lock()
  46. defer a.lock.Unlock()
  47. // If samples from a new timestamp finalize current values by moving them to previous
  48. if timestamp.After(a.currentTime) {
  49. // update state and reset current
  50. a.runningAvg, a.seconds = a.getRunningAvgSeconds()
  51. a.previous = a.current
  52. a.previousTime = a.currentTime
  53. a.currentTime = timestamp
  54. a.current = 0
  55. }
  56. a.current += value
  57. }
  58. func (a *rateAggregator) Value() []MetricValue {
  59. a.lock.Lock()
  60. defer a.lock.Unlock()
  61. average, seconds := a.getRunningAvgSeconds()
  62. if seconds == 0 {
  63. return []MetricValue{
  64. {Value: 0},
  65. }
  66. }
  67. return []MetricValue{
  68. {Value: average},
  69. }
  70. }