| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package pricingmodel
- import (
- "sync"
- "sync/atomic"
- "time"
- "github.com/opencost/opencost/core/pkg/errors"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/model/pricingmodel"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- )
- type runnerConfig struct {
- interval time.Duration
- lastRun *time.Time
- }
- // runner periodically fetches pricing from a PricingSource and writes it to storage.
- // The storage path is derived from PricingModelSet.Source set by the PricingSource implementation.
- type runner struct {
- source pricingmodel.PricingSource
- store *storageWriter
- config runnerConfig
- isRunning atomic.Bool
- isStopping atomic.Bool
- exitCh chan struct{}
- statusLock sync.RWMutex
- status Status
- }
- func newRunner(source pricingmodel.PricingSource, store *storageWriter, config runnerConfig) *runner {
- status := Status{
- SourceKey: source.PricingSourceKey(),
- CreatedAt: time.Now().UTC(),
- RefreshRate: config.interval.String(),
- }
- return &runner{
- source: source,
- store: store,
- config: config,
- status: status,
- }
- }
- // initialDelay computes how long to wait before the first tick.
- // If lastRun is set and lastRun+interval is still in the future, wait until then.
- // Otherwise run immediately.
- func (r *runner) initialDelay() time.Duration {
- if r.config.lastRun == nil {
- return 0
- }
- r.status.LastRun = *r.config.lastRun
- next := r.config.lastRun.Add(r.config.interval)
- delay := time.Until(next)
- if delay <= 0 {
- r.status.NextRun = time.Now()
- return 0
- }
- r.status.NextRun = next
- log.Infof("PricingModel[%s]: runner: previous run at '%s' next run '%s'",
- r.source.PricingSourceKey(),
- r.status.LastRun.Format(time.RFC3339),
- r.status.NextRun.Format(time.RFC3339))
- return delay
- }
- func (r *runner) Start() {
- if !r.isRunning.CompareAndSwap(false, true) {
- return
- }
- r.exitCh = make(chan struct{})
- go r.run()
- }
- func (r *runner) Stop() {
- if !r.isStopping.CompareAndSwap(false, true) {
- return
- }
- close(r.exitCh)
- r.isRunning.Store(false)
- r.isStopping.Store(false)
- }
- func (r *runner) Status() Status {
- r.statusLock.RLock()
- defer r.statusLock.RUnlock()
- return r.status
- }
- func (r *runner) run() {
- defer errors.HandlePanic()
- ticker := timeutil.NewJobTicker()
- defer ticker.Close()
- ticker.TickIn(r.initialDelay())
- for {
- select {
- case <-r.exitCh:
- return
- case <-ticker.Ch:
- }
- r.export()
- r.statusLock.Lock()
- r.status.NextRun = time.Now().UTC().Add(r.config.interval)
- r.statusLock.Unlock()
- ticker.TickIn(r.config.interval)
- }
- }
- func (r *runner) export() {
- pms, err := r.source.GetPricing()
- if err != nil {
- log.Errorf("PricingModel: runner: failed to get pricing: %v", err)
- r.statusLock.Lock()
- r.status.LastError = err.Error()
- r.statusLock.Unlock()
- return
- }
- err = r.store.Write(pms)
- if err != nil {
- log.Errorf("PricingModel[%s]: runner: failed to write pricing model set to storage: %v", r.source.PricingSourceKey(), err)
- r.statusLock.Lock()
- r.status.LastError = err.Error()
- r.statusLock.Unlock()
- return
- }
- r.statusLock.Lock()
- r.status.LastRun = time.Now().UTC()
- r.status.Runs++
- r.status.LastError = ""
- r.statusLock.Unlock()
- }
|