| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- package metric
- import (
- "fmt"
- "path"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/opencost/opencost/core/pkg/exporter"
- "github.com/opencost/opencost/core/pkg/exporter/pathing"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/storage"
- "github.com/opencost/opencost/core/pkg/util/json"
- "github.com/opencost/opencost/modules/collector-source/pkg/util"
- )
- const ControllerEventName = "controller"
- type RepositoryConfig struct {
- }
- // MetricRepository is an MetricUpdater which applies calls to update to all resolutions being tracked. It holds the
- // MetricStore instances for each resolution.
- type MetricRepository struct {
- lock sync.Mutex
- resolutionStores map[string]*resolutionStores
- exporter exporter.EventExporter[UpdateSet]
- }
- func NewMetricRepository(
- clusterID string,
- resolutions []util.ResolutionConfiguration,
- store storage.Storage,
- storeFactory MetricStoreFactory,
- ) *MetricRepository {
- resoluationCollectors := make(map[string]*resolutionStores)
- for _, resconf := range resolutions {
- resolution, err := util.NewResolution(resconf)
- if err != nil {
- log.Errorf("failed to create resolution %s", err.Error())
- }
- resCollector, err := newResolutionStores(resolution, storeFactory)
- if err != nil {
- log.Errorf("NewMetricRepository: failed to init resolution metric: %s", err.Error())
- continue
- }
- resoluationCollectors[resolution.Interval()] = resCollector
- }
- repo := &MetricRepository{
- resolutionStores: resoluationCollectors,
- }
- if store != nil {
- pathFormatter, err := pathing.NewEventStoragePathFormatter("", clusterID, ControllerEventName)
- if err != nil {
- log.Errorf("filed to create path formatter for scrape controller: %s", err.Error())
- return repo
- }
- encoder := exporter.NewJSONEncoder[UpdateSet]()
- repo.exporter = exporter.NewEventStorageExporter(
- pathFormatter,
- encoder,
- store,
- )
- // attempt to restore state from files
- // get path of saved files
- dirPath := path.Dir(pathFormatter.ToFullPath("", time.Time{}, ""))
- files, err := store.List(dirPath)
- if err != nil {
- log.Errorf("failed to list files in scrape controller: %s", err.Error())
- }
- // find oldest limit
- limit := time.Now().UTC()
- for _, resStore := range repo.resolutionStores {
- if limit.After(resStore.resolution.Limit()) {
- limit = resStore.resolution.Limit()
- }
- }
- // find files that are within limit
- var filesToRun []string
- for _, file := range files {
- fileName := path.Base(file.Name)
- timeString := strings.TrimSuffix(fileName, "."+encoder.FileExt())
- timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
- if err != nil {
- log.Errorf("failed to parse fileName %s: %s", fileName, err.Error())
- continue
- }
- if timestamp.After(limit) {
- filesToRun = append(filesToRun, pathFormatter.ToFullPath("", timestamp, encoder.FileExt()))
- }
- }
- // sort files
- sort.Strings(filesToRun)
- // open files and run updates
- for _, fileName := range filesToRun {
- b, err := store.Read(fileName)
- if err != nil {
- log.Errorf("failed to load file contents for '%s': %s", fileName, err.Error())
- continue
- }
- updateSet := UpdateSet{}
- err = json.Unmarshal(b, &updateSet)
- if err != nil {
- log.Errorf("failed to unmarshal file %s: %s", fileName, err.Error())
- continue
- }
- filePrefix := path.Base(fileName)
- timeString := strings.TrimSuffix(filePrefix, "."+encoder.FileExt())
- timestamp, err := time.Parse(pathing.EventStorageTimeFormat, timeString)
- repo.Update(updateSet.Updates, timestamp)
- }
- }
- return repo
- }
- func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricStore, error) {
- r.lock.Lock()
- defer r.lock.Unlock()
- resCollector, ok := r.resolutionStores[interval]
- if !ok {
- return nil, fmt.Errorf("failed to find resolution for key %s", interval)
- }
- return resCollector.getCollector(t)
- }
- // Update calls Update on the collectors for each resolution
- func (r *MetricRepository) Update(
- updates []Update,
- timestamp time.Time,
- ) {
- r.lock.Lock()
- defer r.lock.Unlock()
- for _, update := range updates {
- // Call update on the collectors for each resolution
- for _, resCollector := range r.resolutionStores {
- resCollector.update(update.Name, update.Labels, update.Value, timestamp, update.AdditionalInfo)
- }
- }
- if r.exporter != nil {
- err := r.exporter.Export(timestamp, &UpdateSet{
- Updates: updates,
- })
- if err != nil {
- log.Errorf("failed to export update results: %s", err.Error())
- }
- }
- }
- type UpdateSet struct {
- Updates []Update `json:"updates"`
- }
- type Update struct {
- Name string `json:"name"`
- Labels map[string]string `json:"labels"`
- Value float64 `json:"value"`
- AdditionalInfo map[string]string `json:"additionalInfo"`
- }
- func (r *MetricRepository) Coverage() map[string][]time.Time {
- r.lock.Lock()
- defer r.lock.Unlock()
- result := make(map[string][]time.Time)
- for resKey, resCollector := range r.resolutionStores {
- var windowStarts []time.Time
- for _, key := range resCollector.getKeys() {
- windowStarts = append(windowStarts, time.Unix(key, 0).UTC())
- }
- result[resKey] = windowStarts
- }
- return result
- }
- // resolutionStores is a grouping of a resolution and the instances of MetricStore that it is used to manage
- type resolutionStores struct {
- lock sync.Mutex
- resolution *util.Resolution
- collectors map[int64]MetricStore
- factory func() MetricStore
- }
- func newResolutionStores(resolution *util.Resolution, factory MetricStoreFactory) (*resolutionStores, error) {
- resCol := &resolutionStores{
- resolution: resolution,
- collectors: map[int64]MetricStore{},
- factory: factory,
- }
- // Start loop which will remove expired MetricStore
- go func() {
- for {
- time.Sleep(resCol.resolution.Next().Sub(time.Now().UTC()))
- resCol.clean()
- }
- }()
- return resCol, nil
- }
- func (r *resolutionStores) clean() {
- r.lock.Lock()
- defer r.lock.Unlock()
- limitKey := r.resolution.Limit().UnixMilli()
- for key := range r.collectors {
- if key < limitKey {
- delete(r.collectors, key)
- }
- }
- }
- func (r *resolutionStores) update(
- metricName string,
- labels map[string]string,
- value float64,
- timestamp time.Time,
- additionalInformation map[string]string,
- ) {
- r.lock.Lock()
- defer r.lock.Unlock()
- limit := r.resolution.Limit()
- if timestamp.Before(limit) {
- log.Debugf(
- "failed to call update on resolution '%s' because Timestamp '%s' is before the limit '%s",
- r.resolution.Interval(),
- timestamp.Format(time.RFC3339),
- limit.Format(time.RFC3339),
- )
- return
- }
- key := r.resolution.Get(timestamp).UnixMilli()
- collector, ok := r.collectors[key]
- if !ok {
- collector = r.factory()
- r.collectors[key] = collector
- }
- collector.Update(metricName, labels, value, timestamp, additionalInformation)
- }
- func (r *resolutionStores) getCollector(t time.Time) (MetricStore, error) {
- r.lock.Lock()
- defer r.lock.Unlock()
- if t.Before(r.resolution.Limit()) {
- return nil, fmt.Errorf(
- "request for metric at time '%s' for resolution '%s' is past limit of '%s'",
- t.Format(time.RFC3339),
- r.resolution.Interval(),
- r.resolution.Limit().Format(time.RFC3339),
- )
- }
- key := r.resolution.Get(t).UnixMilli()
- collector, ok := r.collectors[key]
- if !ok {
- return nil, fmt.Errorf("failed to find MetricCollector for interval '%s' for time '%s'", r.resolution.Interval(), t.Format(time.RFC3339))
- }
- return collector, nil
- }
- func (r *resolutionStores) getKeys() []int64 {
- r.lock.Lock()
- defer r.lock.Unlock()
- var keys []int64
- for key := range r.collectors {
- keys = append(keys, key)
- }
- return keys
- }
|