| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package metric
- import (
- "fmt"
- "sync"
- "time"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/modules/collector-source/pkg/util"
- )
- // MetricRepository is an MetricUpdater which applies calls to update to all resolutions being tracked. It holds the
- // MetricStore instances for each resolution.
- type MetricRepository struct {
- lock sync.Mutex
- resolutionStores map[string]*resolutionStores
- }
- func NewMetricRepository(
- resolutions []*util.Resolution,
- storeFactory MetricStoreFactory,
- ) *MetricRepository {
- resoluationCollectors := make(map[string]*resolutionStores)
- var limitResolution *util.Resolution
- for _, resolution := range resolutions {
- if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
- limitResolution = resolution
- }
- resCollector, err := newResolutionStores(resolution, storeFactory)
- if err != nil {
- log.Errorf("NewMetricRepository: failed to init resolution metric: %s", err.Error())
- continue
- }
- resoluationCollectors[resolution.Interval()] = resCollector
- }
- return &MetricRepository{
- resolutionStores: resoluationCollectors,
- }
- }
- func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricStore, error) {
- r.lock.Lock()
- defer r.lock.Unlock()
- resCollector, ok := r.resolutionStores[interval]
- if !ok {
- return nil, fmt.Errorf("failed to find resolution for key %s", interval)
- }
- return resCollector.getCollector(t)
- }
- // Update calls Update on the collectors for each resolution
- func (r *MetricRepository) Update(
- updateSet *UpdateSet,
- ) {
- r.lock.Lock()
- defer r.lock.Unlock()
- if updateSet == nil {
- return
- }
- // Call update on the collectors for each resolution
- for _, resCollector := range r.resolutionStores {
- resCollector.update(updateSet)
- }
- }
- func (r *MetricRepository) Coverage() map[string][]time.Time {
- r.lock.Lock()
- defer r.lock.Unlock()
- result := make(map[string][]time.Time)
- for resKey, resCollector := range r.resolutionStores {
- var windowStarts []time.Time
- for _, key := range resCollector.getKeys() {
- windowStarts = append(windowStarts, time.UnixMilli(key).UTC())
- }
- result[resKey] = windowStarts
- }
- return result
- }
- // resolutionStores is a grouping of a resolution and the instances of MetricStore that it is used to manage
- type resolutionStores struct {
- lock sync.Mutex
- resolution *util.Resolution
- collectors map[int64]MetricStore
- factory func() MetricStore
- }
- func newResolutionStores(resolution *util.Resolution, factory MetricStoreFactory) (*resolutionStores, error) {
- resCol := &resolutionStores{
- resolution: resolution,
- collectors: map[int64]MetricStore{},
- factory: factory,
- }
- // Start loop which will remove expired MetricStore
- go func() {
- for {
- time.Sleep(resCol.resolution.Next().Sub(time.Now().UTC()))
- resCol.clean()
- }
- }()
- return resCol, nil
- }
- func (r *resolutionStores) clean() {
- r.lock.Lock()
- defer r.lock.Unlock()
- limitKey := r.resolution.Limit().UnixMilli()
- for key := range r.collectors {
- if key < limitKey {
- delete(r.collectors, key)
- }
- }
- }
- func (r *resolutionStores) update(
- updateSet *UpdateSet,
- ) {
- r.lock.Lock()
- defer r.lock.Unlock()
- limit := r.resolution.Limit()
- if updateSet.Timestamp.Before(limit) {
- log.Debugf(
- "skipping update on resolution '%s' because Timestamp '%s' is before the limit '%s",
- r.resolution.Interval(),
- updateSet.Timestamp.Format(time.RFC3339),
- limit.Format(time.RFC3339),
- )
- return
- }
- resolutionStart := r.resolution.Get(updateSet.Timestamp)
- key := resolutionStart.UnixMilli()
- collector, ok := r.collectors[key]
- if !ok {
- collector = r.factory()
- r.collectors[key] = collector
- }
- for _, update := range updateSet.Updates {
- collector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
- }
- // check if update needs to be applied to previous collector, because some aggregators are inclusive
- if resolutionStart.Equal(updateSet.Timestamp) {
- prevKey := r.resolution.Get(updateSet.Timestamp.Add(-1)).UnixMilli()
- if prevCollector, ok := r.collectors[prevKey]; ok {
- for _, update := range updateSet.Updates {
- prevCollector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
- }
- }
- }
- }
- func (r *resolutionStores) getCollector(t time.Time) (MetricStore, error) {
- r.lock.Lock()
- defer r.lock.Unlock()
- if t.Before(r.resolution.Limit()) {
- return nil, fmt.Errorf(
- "request for metric at time '%s' for resolution '%s' is past limit of '%s'",
- t.Format(time.RFC3339),
- r.resolution.Interval(),
- r.resolution.Limit().Format(time.RFC3339),
- )
- }
- key := r.resolution.Get(t).UnixMilli()
- collector, ok := r.collectors[key]
- if !ok {
- return nil, fmt.Errorf("failed to find MetricCollector for interval '%s' for time '%s'", r.resolution.Interval(), t.Format(time.RFC3339))
- }
- return collector, nil
- }
- func (r *resolutionStores) getKeys() []int64 {
- r.lock.Lock()
- defer r.lock.Unlock()
- var keys []int64
- for key := range r.collectors {
- keys = append(keys, key)
- }
- return keys
- }
|