| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- package cloudcost
- import (
- "fmt"
- "sync"
- "time"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/opencost"
- "github.com/opencost/opencost/pkg/cloud"
- "github.com/opencost/opencost/pkg/cloud/config"
- )
- // IngestionManager is a config.Observer which creates Ingestor instances based on the signals that it receives from the
- // config.Controller
- type IngestionManager struct {
- lock sync.Mutex
- ingestors map[string]*ingestor
- config IngestorConfig
- repo Repository
- }
- // NewIngestionManager creates a new IngestionManager and registers it with the provided integration controller
- func NewIngestionManager(controller *config.Controller, repo Repository, ingConf IngestorConfig) *IngestionManager {
- // return empty ingestion manager if store or integration controller are nil
- if controller == nil || repo == nil {
- return &IngestionManager{
- ingestors: map[string]*ingestor{},
- }
- }
- im := &IngestionManager{
- ingestors: map[string]*ingestor{},
- repo: repo,
- config: ingConf,
- }
- controller.RegisterObserver(im)
- return im
- }
- // PutConfig is an imperative function which puts an ingestor for the provided Integration
- func (im *IngestionManager) PutConfig(kc cloud.KeyedConfig) {
- im.lock.Lock()
- defer im.lock.Unlock()
- err := im.createIngestor(kc)
- if err != nil {
- log.Errorf("IngestionManager: PutConfig failed to create billing integration: %s", err.Error())
- }
- }
- // DeleteConfig is an imperative function which removes an ingestor with a matching key
- func (im *IngestionManager) DeleteConfig(key string) {
- im.lock.Lock()
- defer im.lock.Unlock()
- im.deleteIngestor(key)
- }
- // SetConfigs is a declarative function for setting which BillingIntegrations IngestionManager should have ingestors for
- func (im *IngestionManager) SetConfigs(configs map[string]cloud.KeyedConfig) {
- im.lock.Lock()
- defer im.lock.Unlock()
- // delete any exiting ingestors
- for key := range im.ingestors {
- im.deleteIngestor(key)
- }
- // create ingestors for provided
- for _, conf := range configs {
- err := im.createIngestor(conf)
- if err != nil {
- log.Errorf("IngestionManager: error creating ingestor: %s", err.Error())
- }
- }
- }
- func (im *IngestionManager) StartAll() {
- im.lock.Lock()
- defer im.lock.Unlock()
- var wg sync.WaitGroup
- wg.Add(len(im.ingestors))
- for key := range im.ingestors {
- ing := im.ingestors[key]
- go func() {
- defer wg.Done()
- ing.Start(false)
- }()
- }
- wg.Wait()
- }
- func (im *IngestionManager) StopAll() {
- im.lock.Lock()
- defer im.lock.Unlock()
- var wg sync.WaitGroup
- wg.Add(len(im.ingestors))
- for key := range im.ingestors {
- ing := im.ingestors[key]
- go func() {
- defer wg.Done()
- ing.Stop()
- }()
- }
- wg.Wait()
- }
- func (im *IngestionManager) RebuildAll() {
- im.lock.Lock()
- defer im.lock.Unlock()
- var wg sync.WaitGroup
- wg.Add(len(im.ingestors))
- for key := range im.ingestors {
- go func(ing *ingestor) {
- defer wg.Done()
- ing.Stop()
- ing.Start(true)
- }(im.ingestors[key])
- }
- wg.Wait()
- }
- // GetIngestors returns a copy of the ingestors map
- func (im *IngestionManager) GetIngestors() map[string]*ingestor {
- im.lock.Lock()
- defer im.lock.Unlock()
- // Return a copy to avoid race conditions
- copy := make(map[string]*ingestor)
- for k, v := range im.ingestors {
- copy[k] = v
- }
- return copy
- }
- func (im *IngestionManager) Rebuild(integrationKey string) error {
- im.lock.Lock()
- defer im.lock.Unlock()
- ing, ok := im.ingestors[integrationKey]
- if !ok {
- return fmt.Errorf("CloudCost: IngestionManager: Rebuild: failed to rebuild, integration with key does not exist: %s", integrationKey)
- }
- ing.Stop()
- ing.Start(true)
- return nil
- }
- func (im *IngestionManager) RepairAll(start, end time.Time) error {
- im.lock.Lock()
- defer im.lock.Unlock()
- s := opencost.RoundForward(start, im.config.Resolution)
- e := opencost.RoundForward(end, im.config.Resolution)
- windows, err := opencost.GetWindowsForQueryWindow(s, e, im.config.QueryWindow)
- if err != nil {
- return fmt.Errorf("CloudCost: IngestionManager: Repair could not retrieve windows: %s", err.Error())
- }
- for key := range im.ingestors {
- go func(ing *ingestor) {
- for _, window := range windows {
- ing.BuildWindow(*window.Start(), *window.End())
- }
- }(im.ingestors[key])
- }
- return nil
- }
- func (im *IngestionManager) Repair(integrationKey string, start, end time.Time) error {
- im.lock.Lock()
- defer im.lock.Unlock()
- s := opencost.RoundForward(start, im.config.Resolution)
- e := opencost.RoundForward(end, im.config.Resolution)
- windows, err := opencost.GetWindowsForQueryWindow(s, e, im.config.QueryWindow)
- if err != nil {
- return fmt.Errorf("CloudCost: IngestionManager: Repair could not retrieve windows: %s", err.Error())
- }
- ing, ok := im.ingestors[integrationKey]
- if !ok {
- return fmt.Errorf("CloudCost: IngestionManager: Repair: failed to rebuild, integration with key does not exist: %s", integrationKey)
- }
- go func(ing *ingestor) {
- for _, window := range windows {
- ing.BuildWindow(*window.Start(), *window.End())
- }
- }(ing)
- return nil
- }
- // deleteIngestor stops then removes an ingestor from the map of ingestors
- func (im *IngestionManager) deleteIngestor(integrationKey string) {
- ing, ok := im.ingestors[integrationKey]
- if !ok {
- return
- }
- log.Infof("CloudCost: IngestionManager: deleting integration with key: %s", integrationKey)
- ing.Stop()
- delete(im.ingestors, integrationKey)
- }
- // createIngestor stops existing ingestor with matching key then creates and starts and new ingestor
- func (im *IngestionManager) createIngestor(config cloud.KeyedConfig) error {
- if config == nil {
- return fmt.Errorf("cannot create ingestor from nil integration")
- }
- // delete ingestor with matching key if it exists
- im.deleteIngestor(config.Key())
- log.Infof("CloudCost: IngestionManager: creating integration with key: %s", config.Key())
- ing, err := NewIngestor(im.config, im.repo, config)
- if err != nil {
- return fmt.Errorf("IngestionManager: createIngestor: %w", err)
- }
- ing.RefreshStatus()
- ing.Start(false)
- im.ingestors[config.Key()] = ing
- return nil
- }
|