rate.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package aggregator
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // rateAggregator is a MetricAggregator which returns the average rate per second change of the samples that it tracks.
  7. // to function properly calls to Update must have a timestamp greater than or equal to the last call to update.
  8. type rateAggregator struct {
  9. lock sync.Mutex
  10. labelValues []string
  11. initialized bool
  12. initialTime time.Time
  13. currentTime time.Time
  14. initial float64
  15. current float64
  16. }
  17. func Rate(labelValues []string) MetricAggregator {
  18. return &rateAggregator{
  19. labelValues: labelValues,
  20. }
  21. }
  22. func (a *rateAggregator) AdditionInfo() map[string]string {
  23. return nil
  24. }
  25. func (a *rateAggregator) LabelValues() []string {
  26. return a.labelValues
  27. }
  28. func (a *rateAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
  29. a.lock.Lock()
  30. defer a.lock.Unlock()
  31. if !a.initialized {
  32. a.initialTime = timestamp
  33. a.currentTime = timestamp
  34. a.initialized = true
  35. }
  36. if a.initialTime == timestamp {
  37. a.initial += value
  38. }
  39. if a.currentTime.Before(timestamp) {
  40. a.currentTime = timestamp
  41. a.current = 0
  42. }
  43. a.current += value
  44. }
  45. func (a *rateAggregator) Value() []MetricValue {
  46. a.lock.Lock()
  47. defer a.lock.Unlock()
  48. seconds := a.currentTime.Sub(a.initialTime).Seconds()
  49. if seconds == 0 {
  50. return []MetricValue{
  51. {Value: 0},
  52. }
  53. }
  54. increase := a.current - a.initial
  55. return []MetricValue{
  56. {Value: increase / seconds},
  57. }
  58. }