pipeline.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package pricingmodel
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/model/pricingmodel"
  8. corestorage "github.com/opencost/opencost/core/pkg/storage"
  9. "github.com/opencost/opencost/pkg/cloud/aws"
  10. "github.com/opencost/opencost/pkg/cloud/azure"
  11. "github.com/opencost/opencost/pkg/cloud/gcp"
  12. )
  13. // Pipeline manages a set of runners, one per PricingSource, exporting pricing
  14. // model snapshots to bucket storage on a configured interval.
  15. //
  16. // Initially constructed with a fixed set of always-on sources. Additional
  17. // sources can be registered dynamically via AddSource to support
  18. // config-driven sources in the future (similar to the CloudCost ingestion
  19. // manager's observer pattern).
  20. type Pipeline struct {
  21. lock sync.Mutex
  22. runners map[string]*runner
  23. store *storageWriter
  24. config PipelineConfig
  25. }
  26. // NewPipeline creates a Pipeline for the given sources and storage backend.
  27. // If cfg is nil, DefaultPipelineConfig is used.
  28. // The storage should be initialized by the caller via storage.InitializeStorage
  29. // or storage.GetDefaultStorage, matching how CloudCost storage is wired up.
  30. func NewPipeline(store corestorage.Storage, cfg PipelineConfig) (*Pipeline, error) {
  31. ps, err := newStorageWriter(store, cfg.AppName)
  32. if err != nil {
  33. return nil, fmt.Errorf("NewPipeline: %w", err)
  34. }
  35. p := &Pipeline{
  36. runners: make(map[string]*runner),
  37. store: ps,
  38. config: cfg,
  39. }
  40. lastUpdates, err := ps.LastUpdates()
  41. if err != nil {
  42. log.Warnf("NewPipeline: failed to load last update times, runners will start immediately: %s", err.Error())
  43. lastUpdates = map[string]time.Time{}
  44. }
  45. if cfg.AWSRunnerConfig.Enabled {
  46. src := aws.NewPricingListPricingSource(aws.PricingListPricingSourceConfig{
  47. CurrencyCode: cfg.CurrencyCode,
  48. })
  49. rc := runnerConfig{
  50. interval: cfg.AWSRunnerConfig.RefreshInterval,
  51. }
  52. if t, ok := lastUpdates[src.PricingSourceKey()]; ok {
  53. rc.lastRun = &t
  54. }
  55. p.addSource(src, rc)
  56. }
  57. if cfg.AzureRunnerConfig.Enabled {
  58. src := azure.NewAzureRetailPricingSource(azure.AzureRetailPricingSourceConfig{
  59. CurrencyCode: cfg.CurrencyCode,
  60. })
  61. rc := runnerConfig{
  62. interval: cfg.AzureRunnerConfig.RefreshInterval,
  63. }
  64. if t, ok := lastUpdates[src.PricingSourceKey()]; ok {
  65. rc.lastRun = &t
  66. }
  67. p.addSource(src, rc)
  68. }
  69. if cfg.GCPRunnerConfig.Enabled {
  70. src, err := gcp.NewGCPBillingPricingSource(gcp.GCPBillingPricingSourceConfig{
  71. APIKey: cfg.GCPRunnerConfig.APIKey,
  72. CurrencyCode: cfg.CurrencyCode,
  73. })
  74. if err != nil {
  75. log.Error(err.Error())
  76. } else {
  77. rc := runnerConfig{
  78. interval: cfg.GCPRunnerConfig.RefreshInterval,
  79. }
  80. if t, ok := lastUpdates[src.PricingSourceKey()]; ok {
  81. rc.lastRun = &t
  82. }
  83. p.addSource(src, rc)
  84. }
  85. }
  86. return p, nil
  87. }
  88. // StartAll starts all registered runners.
  89. func (p *Pipeline) StartAll() {
  90. p.lock.Lock()
  91. defer p.lock.Unlock()
  92. for _, r := range p.runners {
  93. r.Start()
  94. }
  95. }
  96. // StopAll stops all registered runners.
  97. func (p *Pipeline) StopAll() {
  98. p.lock.Lock()
  99. defer p.lock.Unlock()
  100. var wg sync.WaitGroup
  101. wg.Add(len(p.runners))
  102. for _, r := range p.runners {
  103. go func(r *runner) {
  104. defer wg.Done()
  105. r.Stop()
  106. }(r)
  107. }
  108. wg.Wait()
  109. }
  110. // AddSource registers a new PricingSource and starts its runner. If a source
  111. // with the same key already exists it is stopped and replaced.
  112. func (p *Pipeline) AddSource(src pricingmodel.PricingSource, cfg runnerConfig) {
  113. p.lock.Lock()
  114. defer p.lock.Unlock()
  115. p.addSource(src, cfg)
  116. }
  117. // RemoveSource stops and removes the runner for the given source key.
  118. func (p *Pipeline) RemoveSource(key string) {
  119. p.lock.Lock()
  120. defer p.lock.Unlock()
  121. p.removeSource(key)
  122. }
  123. func (p *Pipeline) addSource(src pricingmodel.PricingSource, cfg runnerConfig) {
  124. key := src.PricingSourceKey()
  125. p.removeSource(key)
  126. log.Infof("PricingModel: pipeline: adding source %s", key)
  127. r := newRunner(src, p.store, cfg)
  128. r.Start()
  129. p.runners[key] = r
  130. }
  131. // Status returns the current status of all runners.
  132. func (p *Pipeline) Status() []Status {
  133. p.lock.Lock()
  134. defer p.lock.Unlock()
  135. statuses := make([]Status, 0, len(p.runners))
  136. for _, r := range p.runners {
  137. statuses = append(statuses, r.Status())
  138. }
  139. return statuses
  140. }
  141. // Rebuild triggers an immediate export on all runners outside the scheduled tick.
  142. func (p *Pipeline) Rebuild() {
  143. p.lock.Lock()
  144. runners := make([]*runner, 0, len(p.runners))
  145. for _, r := range p.runners {
  146. runners = append(runners, r)
  147. }
  148. p.lock.Unlock()
  149. for _, r := range runners {
  150. go r.export()
  151. }
  152. }
  153. // RebuildSource triggers an immediate export for the runner with the given source key.
  154. func (p *Pipeline) RebuildSource(sourceKey string) error {
  155. p.lock.Lock()
  156. r, ok := p.runners[sourceKey]
  157. p.lock.Unlock()
  158. if !ok {
  159. return fmt.Errorf("PricingModel: no runner found for source key %q", sourceKey)
  160. }
  161. go r.export()
  162. return nil
  163. }
  164. func (p *Pipeline) removeSource(key string) {
  165. r, ok := p.runners[key]
  166. if !ok {
  167. return
  168. }
  169. log.Infof("PricingModel: pipeline: removing source %s", key)
  170. r.Stop()
  171. delete(p.runners, key)
  172. }