increase.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package aggregator
  2. import (
  3. "sync"
  4. "time"
  5. )
  // increaseAggregator accumulates how much a summed sample total has grown
  // from one timestamp to the next. Decreases between timestamps are ignored.
  type increaseAggregator struct {
  	// labelValues is set by Increase and handed back by LabelValues.
  	labelValues []string

  	// lock is held by Update and Value while they touch the fields below.
  	lock sync.Mutex

  	// previousTime and currentTime are the two most recent timestamps that
  	// moved the aggregator forward; both start out as the zero time.
  	previousTime time.Time
  	currentTime  time.Time

  	// previous is the total that was in current when currentTime last
  	// advanced; current is the running sum of values added since then.
  	previous float64
  	current  float64

  	// increase is the growth already folded in by earlier calls to Update.
  	increase float64
  }
  15. func Increase(labelValues []string) MetricAggregator {
  16. return &increaseAggregator{
  17. labelValues: labelValues,
  18. }
  19. }
  20. // getIncrease returns the current increase without updating the state
  21. func (a *increaseAggregator) getIncrease() float64 {
  22. increase := a.increase
  23. // ignore decreases and do not return increase if only one sample has been recorded
  24. if a.previous < a.current && !a.previousTime.IsZero() {
  25. increase += a.current - a.previous
  26. }
  27. return increase
  28. }
  29. func (a *increaseAggregator) AdditionInfo() map[string]string {
  30. return nil
  31. }
  32. func (a *increaseAggregator) LabelValues() []string {
  33. return a.labelValues
  34. }
  35. func (a *increaseAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
  36. a.lock.Lock()
  37. defer a.lock.Unlock()
  38. if timestamp.After(a.currentTime) {
  39. // update state and reset current
  40. a.increase = a.getIncrease()
  41. a.previousTime = a.currentTime
  42. a.currentTime = timestamp
  43. a.previous = a.current
  44. a.current = 0
  45. }
  46. a.current += value
  47. }
  48. func (a *increaseAggregator) Value() []MetricValue {
  49. a.lock.Lock()
  50. defer a.lock.Unlock()
  51. return []MetricValue{
  52. {Value: a.getIncrease()},
  53. }
  54. }