| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package aggregator
- import (
- "sync"
- "time"
- )
- // rateAggregator is a MetricAggregator which returns the average rate per second change of the samples that it tracks.
- // to function properly calls to Update must have a timestamp greater than or equal to the last call to update.
- type rateAggregator struct {
- lock sync.Mutex
- labelValues []string
- previousTime time.Time
- previous float64
- currentTime time.Time
- current float64
- runningAvg float64
- seconds float64
- }
- func Rate(labelValues []string) MetricAggregator {
- return &rateAggregator{
- labelValues: labelValues,
- }
- }
- // getRunningAvgSeconds returns the running average without updating the state
- func (a *rateAggregator) getRunningAvgSeconds() (float64, float64) {
- runningAvg := a.runningAvg
- seconds := a.seconds
- // ignore decreases and base case where only one sample has been recorded
- if a.previous < a.current && !a.previousTime.IsZero() {
- currentSeconds := a.currentTime.Sub(a.previousTime).Seconds()
- // ratio used to add the rate since the last recorded timestamp into the running average
- weightingRatio := currentSeconds / (currentSeconds + seconds)
- currentRate := (a.current - a.previous) / currentSeconds
- runningAvg = (runningAvg * (1 - weightingRatio)) + (currentRate * weightingRatio)
- seconds += currentSeconds
- }
- return runningAvg, seconds
- }
- func (a *rateAggregator) AdditionInfo() map[string]string {
- return nil
- }
- func (a *rateAggregator) LabelValues() []string {
- return a.labelValues
- }
- func (a *rateAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
- a.lock.Lock()
- defer a.lock.Unlock()
- // If samples from a new timestamp finalize current values by moving them to previous
- if timestamp.After(a.currentTime) {
- // update state and reset current
- a.runningAvg, a.seconds = a.getRunningAvgSeconds()
- a.previous = a.current
- a.previousTime = a.currentTime
- a.currentTime = timestamp
- a.current = 0
- }
- a.current += value
- }
- func (a *rateAggregator) Value() []MetricValue {
- a.lock.Lock()
- defer a.lock.Unlock()
- average, seconds := a.getRunningAvgSeconds()
- if seconds == 0 {
- return []MetricValue{
- {Value: 0},
- }
- }
- return []MetricValue{
- {Value: average},
- }
- }
|