| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- package cloudcost
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "github.com/opencost/opencost/core/pkg/errors"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/opencost"
- "github.com/opencost/opencost/core/pkg/util/stringutil"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- "github.com/opencost/opencost/pkg/cloud"
- "github.com/opencost/opencost/pkg/env"
- )
- // IngestorStatus includes diagnostic values for a given Ingestor
- type IngestorStatus struct {
- Created time.Time
- LastRun time.Time
- NextRun time.Time
- Runs int
- Coverage opencost.Window
- ConnectionStatus cloud.ConnectionStatus
- }
// IngestorConfig holds the tunable settings of an Ingestor.
type IngestorConfig struct {
	// MonthToDateRunInterval selects which runs are month-to-date runs: a run
	// whose run count is a multiple of this value starts from the first day
	// of the month.
	MonthToDateRunInterval int
	// RefreshRate is the delay between consecutive runs. Resolution is the
	// step that window boundaries are rounded to.
	RefreshRate, Resolution time.Duration
	// Duration is how far back the initial build reaches and how long data
	// is retained before being expired. QueryWindow is the size of each
	// window handed to the integration. RunWindow is how far before the last
	// run each periodic run starts.
	Duration, QueryWindow, RunWindow time.Duration
}
- // DefaultIngestorConfiguration retrieves an IngestorConfig from env variables
- func DefaultIngestorConfiguration() IngestorConfig {
- return IngestorConfig{
- Resolution: timeutil.Day,
- Duration: timeutil.Day * time.Duration(env.GetCloudCost1dRetention()),
- MonthToDateRunInterval: env.GetCloudCostMonthToDateInterval(),
- RefreshRate: time.Hour * time.Duration(env.GetCloudCostRefreshRateHours()),
- QueryWindow: timeutil.Day * time.Duration(env.GetCloudCostQueryWindowDays()),
- RunWindow: timeutil.Day * time.Duration(env.GetCloudCostRunWindowDays()),
- }
- }
- // ingestor runs the process for ingesting CloudCost from its CloudCostIntegration and store it in a Repository
- type ingestor struct {
- key string
- integration CloudCostIntegration
- config IngestorConfig
- repo Repository
- runID string
- lastRun time.Time
- runs int
- creationTime time.Time
- coverage opencost.Window
- coverageLock sync.Mutex
- isRunning atomic.Bool
- isStopping atomic.Bool
- exitBuildCh chan string
- exitRunCh chan string
- }
- // NewIngestor is an initializer for ingestor
- func NewIngestor(ingestorConfig IngestorConfig, repo Repository, config cloud.KeyedConfig) (*ingestor, error) {
- if repo == nil {
- return nil, fmt.Errorf("CloudCost: NewIngestor: repository connot be nil")
- }
- if config == nil {
- return nil, fmt.Errorf("CloudCost: NewIngestor: integration connot be nil")
- }
- cci := GetIntegrationFromConfig(config)
- if cci == nil {
- return nil, fmt.Errorf("CloudCost: NewIngestor: provider integration config was not a valid type: %T", config)
- }
- now := time.Now().UTC()
- midnight := opencost.RoundForward(now, timeutil.Day)
- return &ingestor{
- config: ingestorConfig,
- repo: repo,
- key: config.Key(),
- integration: cci,
- creationTime: now,
- lastRun: now,
- coverage: opencost.NewClosedWindow(midnight, midnight),
- }, nil
- }
- func (ing *ingestor) LoadWindow(start, end time.Time) {
- windows, err := opencost.GetWindows(start, end, timeutil.Day)
- if err != nil {
- log.Errorf("CloudCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
- return
- }
- for _, window := range windows {
- has, err2 := ing.repo.Has(*window.Start(), ing.key)
- if err2 != nil {
- log.Errorf("CloudCost[%s]: ingestor: error when loading window: %s", ing.key, err2.Error())
- }
- if !has {
- ing.BuildWindow(start, end)
- return
- }
- ing.expandCoverage(window)
- log.Debugf("CloudCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
- }
- }
- func (ing *ingestor) BuildWindow(start, end time.Time) {
- log.Infof("CloudCost[%s]: ingestor: building window %s", ing.key, opencost.NewWindow(&start, &end))
- ccsr, err := ing.integration.GetCloudCost(start, end)
- if err != nil {
- log.Errorf("CloudCost[%s]: ingestor: build failed for window %s: %s", ing.key, opencost.NewWindow(&start, &end), err.Error())
- return
- }
- for _, ccs := range ccsr.CloudCostSets {
- log.Debugf("BuildWindow[%s]: GetCloudCost: writing cloud costs for window %s: %d", ccs.Integration, ccs.Window, len(ccs.CloudCosts))
- err2 := ing.repo.Put(ccs)
- if err2 != nil {
- log.Errorf("CloudCost[%s]: ingestor: failed to save Cloud Cost Set with window %s: %s", ing.key, ccs.GetWindow().String(), err2.Error())
- }
- ing.expandCoverage(ccs.Window)
- }
- }
- func (ing *ingestor) Start(rebuild bool) {
- // If already running, log that and return.
- if !ing.isRunning.CompareAndSwap(false, true) {
- log.Infof("CloudCost: ingestor: is already running")
- return
- }
- ing.runID = stringutil.RandSeq(5)
- ing.exitBuildCh = make(chan string)
- ing.exitRunCh = make(chan string)
- // Build the store once, advancing backward in time from the earliest
- // point of coverage.
- go ing.build(rebuild)
- go ing.run()
- }
- func (ing *ingestor) Stop() {
- // If already stopping, log that and return.
- if !ing.isStopping.CompareAndSwap(false, true) {
- log.Infof("CloudCost: ingestor: is already stopping")
- return
- }
- msg := "Stopping"
- // If the processes are running (and thus there are channels available for
- // stopping them) then stop all sub-processes (i.e. build and run)
- var wg sync.WaitGroup
- if ing.exitBuildCh != nil {
- wg.Add(1)
- go func() {
- defer wg.Done()
- ing.exitBuildCh <- msg
- }()
- }
- if ing.exitRunCh != nil {
- wg.Add(1)
- go func() {
- defer wg.Done()
- ing.exitRunCh <- msg
- }()
- }
- wg.Wait()
- // Declare that the store is officially no longer running. This allows
- // Start to be called again, restarting the store from scratch.
- ing.isRunning.Store(false)
- ing.isStopping.Store(false)
- }
- // Status returns an IngestorStatus that describes the current state of the ingestor
- func (ing *ingestor) Status() IngestorStatus {
- return IngestorStatus{
- Created: ing.creationTime,
- LastRun: ing.lastRun,
- NextRun: ing.lastRun.Add(ing.config.RefreshRate).UTC(),
- Runs: ing.runs,
- Coverage: ing.coverage,
- ConnectionStatus: ing.integration.GetStatus(),
- }
- }
- func (ing *ingestor) build(rebuild bool) {
- defer errors.HandlePanic()
- // Profile the full Duration of the build time
- buildStart := time.Now()
- // Build as far back as the configures build Duration
- limit := opencost.RoundBack(time.Now().UTC().Add(-ing.config.Duration), ing.config.Resolution)
- queryWindowStr := timeutil.FormatStoreResolution(ing.config.QueryWindow)
- log.Infof("CloudCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, limit.String(), queryWindowStr)
- // Start with a window of the configured Duration and ending on the given
- // start time. Build windows repeating until the window reaches the
- // given limit time
- // Round end times back to nearest Resolution points in the past,
- // querying for exactly one interval
- e := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution)
- s := e.Add(-ing.config.QueryWindow)
- // Continue until limit is reached
- for limit.Before(e) {
- // If exit instruction is received, log and return
- select {
- case <-ing.exitBuildCh:
- log.Debugf("CloudCost[%s]: ingestor: build[%s]: exiting", ing.key, ing.runID)
- return
- default:
- }
- // Profile the current build step
- stepStart := time.Now()
- // if rebuild is not specified then check for existing coverage on window
- if rebuild {
- ing.BuildWindow(s, e)
- } else {
- ing.LoadWindow(s, e)
- }
- log.Infof("CloudCost[%s]: ingestor: build[%s]: %s in %v", ing.key, ing.runID, opencost.NewClosedWindow(s, e), time.Since(stepStart))
- // Shift to next QueryWindow
- s = s.Add(-ing.config.QueryWindow)
- if s.Before(limit) {
- s = limit
- }
- e = e.Add(-ing.config.QueryWindow)
- }
- log.Infof("CloudCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart))
- // In order to be able to Stop, we have to wait on an exit message
- // here
- <-ing.exitBuildCh
- }
- func (ing *ingestor) run() {
- defer errors.HandlePanic()
- ticker := timeutil.NewJobTicker()
- defer ticker.Close()
- ticker.TickIn(0)
- for {
- // If an exit instruction is received, break the run loop
- select {
- case <-ing.exitRunCh:
- log.Debugf("CloudCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
- return
- case <-ticker.Ch:
- // Wait for next tick
- }
- // Start from the last covered time, minus the RunWindow
- start := ing.lastRun
- start = start.Add(-ing.config.RunWindow)
- // Every Nth (determined by the MonthToDateRunInterval) run should be a month to date run. Where the start is
- // truncated to the beginning of its current month this can mean that early in a new month we will build all of
- // last month and the first few days of the current month.
- if ing.runs%ing.config.MonthToDateRunInterval == 0 {
- start = time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
- log.Infof("CloudCost[%s]: ingestor: Run[%s]: running month-to-date update starting at %s", ing.key, ing.runID, start.String())
- }
- // Round start time back to the nearest Resolution point in the past from the
- // last update to the QueryWindow
- s := opencost.RoundBack(start.UTC(), ing.config.Resolution)
- e := s.Add(ing.config.QueryWindow)
- // Start with a window of the configured Duration and starting on the given
- // start time. Do the following, repeating until the window reaches the
- // current time:
- // 1. Instruct builder to build window
- // 2. Move window forward one Resolution
- for time.Now().After(s) {
- profStart := time.Now()
- ing.BuildWindow(s, e)
- log.Debugf("CloudCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
- s = s.Add(ing.config.QueryWindow)
- e = e.Add(ing.config.QueryWindow)
- // prevent builds into the future
- if e.After(time.Now().UTC()) {
- e = opencost.RoundForward(time.Now().UTC(), ing.config.Resolution)
- }
- }
- ing.lastRun = time.Now().UTC()
- limit := opencost.RoundBack(time.Now().UTC(), ing.config.Resolution).Add(-ing.config.Duration)
- err := ing.repo.Expire(limit)
- if err != nil {
- log.Errorf("CloudCost: Ingestor: failed to expire Data: %s", err)
- }
- ing.coverageLock.Lock()
- ing.coverage = ing.coverage.ContractStart(limit)
- ing.coverageLock.Unlock()
- ing.runs++
- ticker.TickIn(ing.config.RefreshRate)
- }
- }
- func (ing *ingestor) expandCoverage(window opencost.Window) {
- if window.IsOpen() {
- return
- }
- ing.coverageLock.Lock()
- defer ing.coverageLock.Unlock()
- coverage := ing.coverage.ExpandStart(*window.Start())
- coverage = coverage.ExpandEnd(*window.End())
- ing.coverage = coverage
- }
- func (ing *ingestor) RefreshStatus() cloud.ConnectionStatus {
- return ing.integration.RefreshStatus()
- }
|