2
0

runner.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package pricingmodel
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/errors"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/model/pricingmodel"
  9. "github.com/opencost/opencost/core/pkg/util/timeutil"
  10. )
  11. type runnerConfig struct {
  12. interval time.Duration
  13. lastRun *time.Time
  14. }
  15. // runner periodically fetches pricing from a PricingSource and writes it to storage.
  16. // The storage path is derived from PricingModelSet.Source set by the PricingSource implementation.
  17. type runner struct {
  18. source pricingmodel.PricingSource
  19. store *storageWriter
  20. config runnerConfig
  21. isRunning atomic.Bool
  22. isStopping atomic.Bool
  23. exitCh chan struct{}
  24. statusLock sync.RWMutex
  25. status Status
  26. }
  27. func newRunner(source pricingmodel.PricingSource, store *storageWriter, config runnerConfig) *runner {
  28. status := Status{
  29. SourceKey: source.PricingSourceKey(),
  30. CreatedAt: time.Now().UTC(),
  31. RefreshRate: config.interval.String(),
  32. }
  33. return &runner{
  34. source: source,
  35. store: store,
  36. config: config,
  37. status: status,
  38. }
  39. }
  40. // initialDelay computes how long to wait before the first tick.
  41. // If lastRun is set and lastRun+interval is still in the future, wait until then.
  42. // Otherwise run immediately.
  43. func (r *runner) initialDelay() time.Duration {
  44. if r.config.lastRun == nil {
  45. return 0
  46. }
  47. r.status.LastRun = *r.config.lastRun
  48. next := r.config.lastRun.Add(r.config.interval)
  49. delay := time.Until(next)
  50. if delay <= 0 {
  51. r.status.NextRun = time.Now()
  52. return 0
  53. }
  54. r.status.NextRun = next
  55. log.Infof("PricingModel[%s]: runner: previous run at '%s' next run '%s'",
  56. r.source.PricingSourceKey(),
  57. r.status.LastRun.Format(time.RFC3339),
  58. r.status.NextRun.Format(time.RFC3339))
  59. return delay
  60. }
  61. func (r *runner) Start() {
  62. if !r.isRunning.CompareAndSwap(false, true) {
  63. return
  64. }
  65. r.exitCh = make(chan struct{})
  66. go r.run()
  67. }
  68. func (r *runner) Stop() {
  69. if !r.isStopping.CompareAndSwap(false, true) {
  70. return
  71. }
  72. close(r.exitCh)
  73. r.isRunning.Store(false)
  74. r.isStopping.Store(false)
  75. }
  76. func (r *runner) Status() Status {
  77. r.statusLock.RLock()
  78. defer r.statusLock.RUnlock()
  79. return r.status
  80. }
  81. func (r *runner) run() {
  82. defer errors.HandlePanic()
  83. ticker := timeutil.NewJobTicker()
  84. defer ticker.Close()
  85. ticker.TickIn(r.initialDelay())
  86. for {
  87. select {
  88. case <-r.exitCh:
  89. return
  90. case <-ticker.Ch:
  91. }
  92. r.export()
  93. r.statusLock.Lock()
  94. r.status.NextRun = time.Now().UTC().Add(r.config.interval)
  95. r.statusLock.Unlock()
  96. ticker.TickIn(r.config.interval)
  97. }
  98. }
  99. func (r *runner) export() {
  100. pms, err := r.source.GetPricing()
  101. if err != nil {
  102. log.Errorf("PricingModel: runner: failed to get pricing: %v", err)
  103. r.statusLock.Lock()
  104. r.status.LastError = err.Error()
  105. r.statusLock.Unlock()
  106. return
  107. }
  108. err = r.store.Write(pms)
  109. if err != nil {
  110. log.Errorf("PricingModel[%s]: runner: failed to write pricing model set to storage: %v", r.source.PricingSourceKey(), err)
  111. r.statusLock.Lock()
  112. r.status.LastError = err.Error()
  113. r.statusLock.Unlock()
  114. return
  115. }
  116. r.statusLock.Lock()
  117. r.status.LastRun = time.Now().UTC()
  118. r.status.Runs++
  119. r.status.LastError = ""
  120. r.statusLock.Unlock()
  121. }