ingestor.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package cloudcost
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/errors"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "github.com/opencost/opencost/core/pkg/util/stringutil"
  11. "github.com/opencost/opencost/core/pkg/util/timeutil"
  12. "github.com/opencost/opencost/pkg/cloud"
  13. "github.com/opencost/opencost/pkg/env"
  14. )
  15. // IngestorStatus includes diagnostic values for a given Ingestor
  16. type IngestorStatus struct {
  17. Created time.Time
  18. LastRun time.Time
  19. NextRun time.Time
  20. Runs int
  21. Coverage opencost.Window
  22. ConnectionStatus cloud.ConnectionStatus
  23. }
  // IngestorConfig holds the tuning parameters that drive an ingestor's
// backfill (build) and periodic refresh (run) loops.
type IngestorConfig struct {
	// MonthToDateRunInterval makes every Nth run pass start at the first day
	// of the month rather than RunWindow before the previous pass.
	MonthToDateRunInterval int

	// RefreshRate is the delay between run passes.
	// Resolution is the granularity that window boundaries are rounded to.
	// Duration is how far back the backfill reaches; it is also the
	// retention horizon passed to Repository.Expire.
	// QueryWindow is the span requested from the integration per query.
	// RunWindow is how far before the previous run each run pass starts.
	RefreshRate, Resolution, Duration, QueryWindow, RunWindow time.Duration
}
  33. // DefaultIngestorConfiguration retrieves an IngestorConfig from env variables
  34. func DefaultIngestorConfiguration() IngestorConfig {
  35. return IngestorConfig{
  36. Resolution: timeutil.Day,
  37. Duration: timeutil.Day * time.Duration(env.GetCloudCost1dRetention()),
  38. MonthToDateRunInterval: env.GetCloudCostMonthToDateInterval(),
  39. RefreshRate: time.Hour * time.Duration(env.GetCloudCostRefreshRateHours()),
  40. QueryWindow: timeutil.Day * time.Duration(env.GetCloudCostQueryWindowDays()),
  41. RunWindow: timeutil.Day * time.Duration(env.GetCloudCostRunWindowDays()),
  42. }
  43. }
  44. // ingestor runs the process for ingesting CloudCost from its CloudCostIntegration and store it in a Repository
  45. type ingestor struct {
  46. key string
  47. integration CloudCostIntegration
  48. config IngestorConfig
  49. repo Repository
  50. runID string
  51. lastRun time.Time
  52. runs int
  53. creationTime time.Time
  54. coverage opencost.Window
  55. coverageLock sync.Mutex
  56. isRunning atomic.Bool
  57. isStopping atomic.Bool
  58. exitBuildCh chan string
  59. exitRunCh chan string
  60. }
  61. // NewIngestor is an initializer for ingestor
  62. func NewIngestor(ingestorConfig IngestorConfig, repo Repository, config cloud.KeyedConfig) (*ingestor, error) {
  63. if repo == nil {
  64. return nil, fmt.Errorf("CloudCost: NewIngestor: repository connot be nil")
  65. }
  66. if config == nil {
  67. return nil, fmt.Errorf("CloudCost: NewIngestor: integration connot be nil")
  68. }
  69. cci := GetIntegrationFromConfig(config)
  70. if cci == nil {
  71. return nil, fmt.Errorf("CloudCost: NewIngestor: provider integration config was not a valid type: %T", config)
  72. }
  73. now := time.Now().UTC()
  74. midnight := opencost.RoundForward(now, timeutil.Day)
  75. return &ingestor{
  76. config: ingestorConfig,
  77. repo: repo,
  78. key: config.Key(),
  79. integration: cci,
  80. creationTime: now,
  81. lastRun: now,
  82. coverage: opencost.NewClosedWindow(midnight, midnight),
  83. }, nil
  84. }
  85. func (ing *ingestor) LoadWindow(start, end time.Time) {
  86. windows, err := opencost.GetWindows(start, end, timeutil.Day)
  87. if err != nil {
  88. log.Errorf("CloudCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  89. return
  90. }
  91. for _, window := range windows {
  92. has, err2 := ing.repo.Has(*window.Start(), ing.key)
  93. if err2 != nil {
  94. log.Errorf("CloudCost[%s]: ingestor: error when loading window: %s", ing.key, err2.Error())
  95. }
  96. if !has {
  97. ing.BuildWindow(start, end)
  98. return
  99. }
  100. ing.expandCoverage(window)
  101. log.Debugf("CloudCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
  102. }
  103. }
  104. func (ing *ingestor) BuildWindow(start, end time.Time) {
  105. log.Infof("CloudCost[%s]: ingestor: building window %s", ing.key, opencost.NewWindow(&start, &end))
  106. ccsr, err := ing.integration.GetCloudCost(start, end)
  107. if err != nil {
  108. log.Errorf("CloudCost[%s]: ingestor: build failed for window %s: %s", ing.key, opencost.NewWindow(&start, &end), err.Error())
  109. return
  110. }
  111. for _, ccs := range ccsr.CloudCostSets {
  112. log.Debugf("BuildWindow[%s]: GetCloudCost: writing cloud costs for window %s: %d", ccs.Integration, ccs.Window, len(ccs.CloudCosts))
  113. err2 := ing.repo.Put(ccs)
  114. if err2 != nil {
  115. log.Errorf("CloudCost[%s]: ingestor: failed to save Cloud Cost Set with window %s: %s", ing.key, ccs.GetWindow().String(), err2.Error())
  116. }
  117. ing.expandCoverage(ccs.Window)
  118. }
  119. }
  120. func (ing *ingestor) Start(rebuild bool) {
  121. // If already running, log that and return.
  122. if !ing.isRunning.CompareAndSwap(false, true) {
  123. log.Infof("CloudCost: ingestor: is already running")
  124. return
  125. }
  126. ing.runID = stringutil.RandSeq(5)
  127. ing.exitBuildCh = make(chan string)
  128. ing.exitRunCh = make(chan string)
  129. // Build the store once, advancing backward in time from the earliest
  130. // point of coverage.
  131. go ing.build(rebuild)
  132. go ing.run()
  133. }
  134. func (ing *ingestor) Stop() {
  135. // If already stopping, log that and return.
  136. if !ing.isStopping.CompareAndSwap(false, true) {
  137. log.Infof("CloudCost: ingestor: is already stopping")
  138. return
  139. }
  140. msg := "Stopping"
  141. // If the processes are running (and thus there are channels available for
  142. // stopping them) then stop all sub-processes (i.e. build and run)
  143. var wg sync.WaitGroup
  144. if ing.exitBuildCh != nil {
  145. wg.Add(1)
  146. go func() {
  147. defer wg.Done()
  148. ing.exitBuildCh <- msg
  149. }()
  150. }
  151. if ing.exitRunCh != nil {
  152. wg.Add(1)
  153. go func() {
  154. defer wg.Done()
  155. ing.exitRunCh <- msg
  156. }()
  157. }
  158. wg.Wait()
  159. // Declare that the store is officially no longer running. This allows
  160. // Start to be called again, restarting the store from scratch.
  161. ing.isRunning.Store(false)
  162. ing.isStopping.Store(false)
  163. }
  164. // Status returns an IngestorStatus that describes the current state of the ingestor
  165. func (ing *ingestor) Status() IngestorStatus {
  166. return IngestorStatus{
  167. Created: ing.creationTime,
  168. LastRun: ing.lastRun,
  169. NextRun: ing.lastRun.Add(ing.config.RefreshRate).UTC(),
  170. Runs: ing.runs,
  171. Coverage: ing.coverage,
  172. ConnectionStatus: ing.integration.GetStatus(),
  173. }
  174. }
  175. func (ing *ingestor) build(rebuild bool) {
  176. defer errors.HandlePanic()
  177. // Profile the full Duration of the build time
  178. buildStart := time.Now()
  179. // Build as far back as the configures build Duration
  180. limit := opencost.RoundBack(time.Now().UTC().Add(-ing.config.Duration), ing.config.Resolution)
  181. queryWindowStr := timeutil.FormatStoreResolution(ing.config.QueryWindow)
  182. log.Infof("CloudCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, limit.String(), queryWindowStr)
  183. // Start with a window of the configured Duration and ending on the given
  184. // start time. Build windows repeating until the window reaches the
  185. // given limit time
  186. // Round end times back to nearest Resolution points in the past,
  187. // querying for exactly one interval
  188. e := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution)
  189. s := e.Add(-ing.config.QueryWindow)
  190. // Continue until limit is reached
  191. for limit.Before(e) {
  192. // If exit instruction is received, log and return
  193. select {
  194. case <-ing.exitBuildCh:
  195. log.Debugf("CloudCost[%s]: ingestor: build[%s]: exiting", ing.key, ing.runID)
  196. return
  197. default:
  198. }
  199. // Profile the current build step
  200. stepStart := time.Now()
  201. // if rebuild is not specified then check for existing coverage on window
  202. if rebuild {
  203. ing.BuildWindow(s, e)
  204. } else {
  205. ing.LoadWindow(s, e)
  206. }
  207. log.Infof("CloudCost[%s]: ingestor: build[%s]: %s in %v", ing.key, ing.runID, opencost.NewClosedWindow(s, e), time.Since(stepStart))
  208. // Shift to next QueryWindow
  209. s = s.Add(-ing.config.QueryWindow)
  210. if s.Before(limit) {
  211. s = limit
  212. }
  213. e = e.Add(-ing.config.QueryWindow)
  214. }
  215. log.Infof("CloudCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart))
  216. // In order to be able to Stop, we have to wait on an exit message
  217. // here
  218. <-ing.exitBuildCh
  219. }
  220. func (ing *ingestor) run() {
  221. defer errors.HandlePanic()
  222. ticker := timeutil.NewJobTicker()
  223. defer ticker.Close()
  224. ticker.TickIn(0)
  225. for {
  226. // If an exit instruction is received, break the run loop
  227. select {
  228. case <-ing.exitRunCh:
  229. log.Debugf("CloudCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
  230. return
  231. case <-ticker.Ch:
  232. // Wait for next tick
  233. }
  234. // Start from the last covered time, minus the RunWindow
  235. start := ing.lastRun
  236. start = start.Add(-ing.config.RunWindow)
  237. // Every Nth (determined by the MonthToDateRunInterval) run should be a month to date run. Where the start is
  238. // truncated to the beginning of its current month this can mean that early in a new month we will build all of
  239. // last month and the first few days of the current month.
  240. if ing.runs%ing.config.MonthToDateRunInterval == 0 {
  241. start = time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
  242. log.Infof("CloudCost[%s]: ingestor: Run[%s]: running month-to-date update starting at %s", ing.key, ing.runID, start.String())
  243. }
  244. // Round start time back to the nearest Resolution point in the past from the
  245. // last update to the QueryWindow
  246. s := opencost.RoundBack(start.UTC(), ing.config.Resolution)
  247. e := s.Add(ing.config.QueryWindow)
  248. // Start with a window of the configured Duration and starting on the given
  249. // start time. Do the following, repeating until the window reaches the
  250. // current time:
  251. // 1. Instruct builder to build window
  252. // 2. Move window forward one Resolution
  253. for time.Now().After(s) {
  254. profStart := time.Now()
  255. ing.BuildWindow(s, e)
  256. log.Debugf("CloudCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
  257. s = s.Add(ing.config.QueryWindow)
  258. e = e.Add(ing.config.QueryWindow)
  259. // prevent builds into the future
  260. if e.After(time.Now().UTC()) {
  261. e = opencost.RoundForward(time.Now().UTC(), ing.config.Resolution)
  262. }
  263. }
  264. ing.lastRun = time.Now().UTC()
  265. limit := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution).Add(-ing.config.Duration)
  266. err := ing.repo.Expire(limit)
  267. if err != nil {
  268. log.Errorf("CloudCost: Ingestor: failed to expire Data: %s", err)
  269. }
  270. ing.coverageLock.Lock()
  271. ing.coverage = ing.coverage.ContractStart(limit)
  272. ing.coverageLock.Unlock()
  273. ing.runs++
  274. ticker.TickIn(ing.config.RefreshRate)
  275. }
  276. }
  277. func (ing *ingestor) expandCoverage(window opencost.Window) {
  278. if window.IsOpen() {
  279. return
  280. }
  281. ing.coverageLock.Lock()
  282. defer ing.coverageLock.Unlock()
  283. coverage := ing.coverage.ExpandStart(*window.Start())
  284. coverage = coverage.ExpandEnd(*window.End())
  285. ing.coverage = coverage
  286. }
  287. func (ing *ingestor) RefreshStatus() cloud.ConnectionStatus {
  288. return ing.integration.RefreshStatus()
  289. }