ingestionmanager.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package cloudcost
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/pkg/cloud"
  9. "github.com/opencost/opencost/pkg/cloud/config"
  10. )
  11. // IngestionManager is a config.Observer which creates Ingestor instances based on the signals that it receives from the
  12. // config.Controller
  13. type IngestionManager struct {
  14. lock sync.Mutex
  15. ingestors map[string]*ingestor
  16. config IngestorConfig
  17. repo Repository
  18. }
  19. // NewIngestionManager creates a new IngestionManager and registers it with the provided integration controller
  20. func NewIngestionManager(controller *config.Controller, repo Repository, ingConf IngestorConfig) *IngestionManager {
  21. // return empty ingestion manager if store or integration controller are nil
  22. if controller == nil || repo == nil {
  23. return &IngestionManager{
  24. ingestors: map[string]*ingestor{},
  25. }
  26. }
  27. im := &IngestionManager{
  28. ingestors: map[string]*ingestor{},
  29. repo: repo,
  30. config: ingConf,
  31. }
  32. controller.RegisterObserver(im)
  33. return im
  34. }
  35. // PutConfig is an imperative function which puts an ingestor for the provided Integration
  36. func (im *IngestionManager) PutConfig(kc cloud.KeyedConfig) {
  37. im.lock.Lock()
  38. defer im.lock.Unlock()
  39. err := im.createIngestor(kc)
  40. if err != nil {
  41. log.Errorf("IngestionManager: PutConfig failed to create billing integration: %s", err.Error())
  42. }
  43. }
  44. // DeleteConfig is an imperative function which removes an ingestor with a matching key
  45. func (im *IngestionManager) DeleteConfig(key string) {
  46. im.lock.Lock()
  47. defer im.lock.Unlock()
  48. im.deleteIngestor(key)
  49. }
  50. // SetConfigs is a declarative function for setting which BillingIntegrations IngestionManager should have ingestors for
  51. func (im *IngestionManager) SetConfigs(configs map[string]cloud.KeyedConfig) {
  52. im.lock.Lock()
  53. defer im.lock.Unlock()
  54. // delete any exiting ingestors
  55. for key, _ := range im.ingestors {
  56. im.deleteIngestor(key)
  57. }
  58. // create ingestors for provided
  59. for _, conf := range configs {
  60. err := im.createIngestor(conf)
  61. if err != nil {
  62. log.Errorf("IngestionManager: error creating ingestor: %s", err.Error())
  63. }
  64. }
  65. }
  66. func (im *IngestionManager) StartAll() {
  67. im.lock.Lock()
  68. defer im.lock.Unlock()
  69. var wg sync.WaitGroup
  70. wg.Add(len(im.ingestors))
  71. for key := range im.ingestors {
  72. ing := im.ingestors[key]
  73. go func() {
  74. defer wg.Done()
  75. ing.Start(false)
  76. }()
  77. }
  78. wg.Wait()
  79. }
  80. func (im *IngestionManager) StopAll() {
  81. im.lock.Lock()
  82. defer im.lock.Unlock()
  83. var wg sync.WaitGroup
  84. wg.Add(len(im.ingestors))
  85. for key := range im.ingestors {
  86. ing := im.ingestors[key]
  87. go func() {
  88. defer wg.Done()
  89. ing.Stop()
  90. }()
  91. }
  92. wg.Wait()
  93. }
  94. func (im *IngestionManager) RebuildAll() {
  95. im.lock.Lock()
  96. defer im.lock.Unlock()
  97. var wg sync.WaitGroup
  98. wg.Add(len(im.ingestors))
  99. for key := range im.ingestors {
  100. go func(ing *ingestor) {
  101. defer wg.Done()
  102. ing.Stop()
  103. ing.Start(true)
  104. }(im.ingestors[key])
  105. }
  106. wg.Wait()
  107. }
  108. func (im *IngestionManager) Rebuild(integrationKey string) error {
  109. im.lock.Lock()
  110. defer im.lock.Unlock()
  111. ing, ok := im.ingestors[integrationKey]
  112. if !ok {
  113. return fmt.Errorf("CloudCost: IngestionManager: Rebuild: failed to rebuild, integration with key does not exist: %s", integrationKey)
  114. }
  115. ing.Stop()
  116. ing.Start(true)
  117. return nil
  118. }
  119. func (im *IngestionManager) RepairAll(start, end time.Time) error {
  120. im.lock.Lock()
  121. defer im.lock.Unlock()
  122. s := opencost.RoundForward(start, im.config.Resolution)
  123. e := opencost.RoundForward(end, im.config.Resolution)
  124. windows, err := opencost.GetWindowsForQueryWindow(s, e, im.config.QueryWindow)
  125. if err != nil {
  126. return fmt.Errorf("CloudCost: IngestionManager: Repair could not retrieve windows: %s", err.Error())
  127. }
  128. for key := range im.ingestors {
  129. go func(ing *ingestor) {
  130. for _, window := range windows {
  131. ing.BuildWindow(*window.Start(), *window.End())
  132. }
  133. }(im.ingestors[key])
  134. }
  135. return nil
  136. }
  137. func (im *IngestionManager) Repair(integrationKey string, start, end time.Time) error {
  138. im.lock.Lock()
  139. defer im.lock.Unlock()
  140. s := opencost.RoundForward(start, im.config.Resolution)
  141. e := opencost.RoundForward(end, im.config.Resolution)
  142. windows, err := opencost.GetWindowsForQueryWindow(s, e, im.config.QueryWindow)
  143. if err != nil {
  144. return fmt.Errorf("CloudCost: IngestionManager: Repair could not retrieve windows: %s", err.Error())
  145. }
  146. ing, ok := im.ingestors[integrationKey]
  147. if !ok {
  148. return fmt.Errorf("CloudCost: IngestionManager: Repair: failed to rebuild, integration with key does not exist: %s", integrationKey)
  149. }
  150. go func(ing *ingestor) {
  151. for _, window := range windows {
  152. ing.BuildWindow(*window.Start(), *window.End())
  153. }
  154. }(ing)
  155. return nil
  156. }
  157. // deleteIngestor stops then removes an ingestor from the map of ingestors
  158. func (im *IngestionManager) deleteIngestor(integrationKey string) {
  159. ing, ok := im.ingestors[integrationKey]
  160. if !ok {
  161. return
  162. }
  163. log.Infof("CloudCost: IngestionManager: deleting integration with key: %s", integrationKey)
  164. ing.Stop()
  165. delete(im.ingestors, integrationKey)
  166. }
  167. // createIngestor stops existing ingestor with matching key then creates and starts and new ingestor
  168. func (im *IngestionManager) createIngestor(config cloud.KeyedConfig) error {
  169. if config == nil {
  170. return fmt.Errorf("cannot create ingestor from nil integration")
  171. }
  172. // delete ingestor with matching key if it exists
  173. im.deleteIngestor(config.Key())
  174. log.Infof("CloudCost: IngestionManager: creating integration with key: %s", config.Key())
  175. ing, err := NewIngestor(im.config, im.repo, config)
  176. if err != nil {
  177. return fmt.Errorf("IngestionManager: createIngestor: %w", err)
  178. }
  179. ing.Start(false)
  180. im.ingestors[config.Key()] = ing
  181. return nil
  182. }