| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- package pricingmodel
- import (
- "fmt"
- "sync"
- "time"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/model/pricingmodel"
- corestorage "github.com/opencost/opencost/core/pkg/storage"
- "github.com/opencost/opencost/pkg/cloud/aws"
- "github.com/opencost/opencost/pkg/cloud/azure"
- "github.com/opencost/opencost/pkg/cloud/gcp"
- )
- // Pipeline manages a set of runners, one per PricingSource, exporting pricing
- // model snapshots to bucket storage on a configured interval.
- //
- // Initially constructed with a fixed set of always-on sources. Additional
- // sources can be registered dynamically via AddSource to support
- // config-driven sources in the future (similar to the CloudCost ingestion
- // manager's observer pattern).
- type Pipeline struct {
- lock sync.Mutex
- runners map[string]*runner
- store *storageWriter
- config PipelineConfig
- }
- // NewPipeline creates a Pipeline for the given sources and storage backend.
- // If cfg is nil, DefaultPipelineConfig is used.
- // The storage should be initialized by the caller via storage.InitializeStorage
- // or storage.GetDefaultStorage, matching how CloudCost storage is wired up.
- func NewPipeline(store corestorage.Storage, cfg PipelineConfig) (*Pipeline, error) {
- ps, err := newStorageWriter(store, cfg.AppName)
- if err != nil {
- return nil, fmt.Errorf("NewPipeline: %w", err)
- }
- p := &Pipeline{
- runners: make(map[string]*runner),
- store: ps,
- config: cfg,
- }
- lastUpdates, err := ps.LastUpdates()
- if err != nil {
- log.Warnf("NewPipeline: failed to load last update times, runners will start immediately: %s", err.Error())
- lastUpdates = map[string]time.Time{}
- }
- if cfg.AWSRunnerConfig.Enabled {
- src := aws.NewPricingListPricingSource(aws.PricingListPricingSourceConfig{
- CurrencyCode: cfg.CurrencyCode,
- })
- rc := runnerConfig{
- interval: cfg.AWSRunnerConfig.RefreshInterval,
- }
- if t, ok := lastUpdates[src.PricingSourceKey()]; ok {
- rc.lastRun = &t
- }
- p.addSource(src, rc)
- }
- if cfg.AzureRunnerConfig.Enabled {
- src := azure.NewAzureRetailPricingSource(azure.AzureRetailPricingSourceConfig{
- CurrencyCode: cfg.CurrencyCode,
- })
- rc := runnerConfig{
- interval: cfg.AzureRunnerConfig.RefreshInterval,
- }
- if t, ok := lastUpdates[src.PricingSourceKey()]; ok {
- rc.lastRun = &t
- }
- p.addSource(src, rc)
- }
- if cfg.GCPRunnerConfig.Enabled {
- src, err := gcp.NewGCPBillingPricingSource(gcp.GCPBillingPricingSourceConfig{
- APIKey: cfg.GCPRunnerConfig.APIKey,
- CurrencyCode: cfg.CurrencyCode,
- })
- if err != nil {
- log.Error(err.Error())
- } else {
- rc := runnerConfig{
- interval: cfg.GCPRunnerConfig.RefreshInterval,
- }
- if t, ok := lastUpdates[src.PricingSourceKey()]; ok {
- rc.lastRun = &t
- }
- p.addSource(src, rc)
- }
- }
- return p, nil
- }
- // StartAll starts all registered runners.
- func (p *Pipeline) StartAll() {
- p.lock.Lock()
- defer p.lock.Unlock()
- for _, r := range p.runners {
- r.Start()
- }
- }
- // StopAll stops all registered runners.
- func (p *Pipeline) StopAll() {
- p.lock.Lock()
- defer p.lock.Unlock()
- var wg sync.WaitGroup
- wg.Add(len(p.runners))
- for _, r := range p.runners {
- go func(r *runner) {
- defer wg.Done()
- r.Stop()
- }(r)
- }
- wg.Wait()
- }
- // AddSource registers a new PricingSource and starts its runner. If a source
- // with the same key already exists it is stopped and replaced.
- func (p *Pipeline) AddSource(src pricingmodel.PricingSource, cfg runnerConfig) {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.addSource(src, cfg)
- }
- // RemoveSource stops and removes the runner for the given source key.
- func (p *Pipeline) RemoveSource(key string) {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.removeSource(key)
- }
- func (p *Pipeline) addSource(src pricingmodel.PricingSource, cfg runnerConfig) {
- key := src.PricingSourceKey()
- p.removeSource(key)
- log.Infof("PricingModel: pipeline: adding source %s", key)
- r := newRunner(src, p.store, cfg)
- r.Start()
- p.runners[key] = r
- }
- // Status returns the current status of all runners.
- func (p *Pipeline) Status() []Status {
- p.lock.Lock()
- defer p.lock.Unlock()
- statuses := make([]Status, 0, len(p.runners))
- for _, r := range p.runners {
- statuses = append(statuses, r.Status())
- }
- return statuses
- }
- // Rebuild triggers an immediate export on all runners outside the scheduled tick.
- func (p *Pipeline) Rebuild() {
- p.lock.Lock()
- runners := make([]*runner, 0, len(p.runners))
- for _, r := range p.runners {
- runners = append(runners, r)
- }
- p.lock.Unlock()
- for _, r := range runners {
- go r.export()
- }
- }
- // RebuildSource triggers an immediate export for the runner with the given source key.
- func (p *Pipeline) RebuildSource(sourceKey string) error {
- p.lock.Lock()
- r, ok := p.runners[sourceKey]
- p.lock.Unlock()
- if !ok {
- return fmt.Errorf("PricingModel: no runner found for source key %q", sourceKey)
- }
- go r.export()
- return nil
- }
- func (p *Pipeline) removeSource(key string) {
- r, ok := p.runners[key]
- if !ok {
- return
- }
- log.Infof("PricingModel: pipeline: removing source %s", key)
- r.Stop()
- delete(p.runners, key)
- }
|