ingestor.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package customcost
  import (
  	"fmt"
  	"sort"
  	"strings"
  	"sync"
  	"sync/atomic"
  	"time"

  	"github.com/hashicorp/go-plugin"
  	"google.golang.org/protobuf/types/known/durationpb"
  	"google.golang.org/protobuf/types/known/timestamppb"

  	"github.com/opencost/opencost/core/pkg/log"
  	"github.com/opencost/opencost/core/pkg/model/pb"
  	"github.com/opencost/opencost/core/pkg/opencost"
  	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
  	"github.com/opencost/opencost/core/pkg/util/stringutil"
  	"github.com/opencost/opencost/core/pkg/util/timeutil"
  	"github.com/opencost/opencost/pkg/env"
  	"github.com/opencost/opencost/pkg/errors"
  )
  20. // IngestorStatus includes diagnostic values for a given Ingestor
  21. type IngestorStatus struct {
  22. Created time.Time
  23. LastRun time.Time
  24. NextRun time.Time
  25. Runs int
  26. Coverage map[string]opencost.Window
  27. RefreshRate time.Duration
  28. }
  // CustomCostIngestorConfig is the configuration struct for a
  // CustomCostIngestor.
  type CustomCostIngestorConfig struct {
  	// MonthToDateRunInterval is not read anywhere in this file.
  	// NOTE(review): confirm its meaning and unit against its consumers.
  	MonthToDateRunInterval int
  	// HourlyDuration is how far back from now an hourly-resolution ingestor
  	// loads and builds data.
  	HourlyDuration time.Duration
  	// DailyDuration is how far back from now a daily-resolution ingestor
  	// loads and builds data.
  	DailyDuration time.Duration
  	// DailyQueryWindow is the span re-queried before the last run, and the
  	// step size of each build, for a daily-resolution ingestor.
  	DailyQueryWindow time.Duration
  	// HourlyQueryWindow is the span re-queried before the last run, and the
  	// step size of each build, for an hourly-resolution ingestor.
  	HourlyQueryWindow time.Duration
  	// PluginConfigDir is populated from env.GetPluginConfigDir by
  	// DefaultIngestorConfiguration; presumably the plugin config directory.
  	PluginConfigDir string
  	// PluginExecutableDir is populated from env.GetPluginExecutableDir by
  	// DefaultIngestorConfiguration; presumably the plugin binary directory.
  	PluginExecutableDir string
  }
  36. // DefaultIngestorConfiguration retrieves an CustomCostIngestorConfig from env variables
  37. func DefaultIngestorConfiguration() CustomCostIngestorConfig {
  38. return CustomCostIngestorConfig{
  39. DailyDuration: timeutil.Day * time.Duration(env.GetDataRetentionDailyResolutionDays()),
  40. HourlyDuration: time.Hour * time.Duration(env.GetDataRetentionHourlyResolutionHours()),
  41. DailyQueryWindow: timeutil.Day * time.Duration(env.GetCustomCostQueryWindowDays()),
  42. HourlyQueryWindow: time.Hour * time.Duration(env.GetCustomCostQueryWindowHours()),
  43. PluginConfigDir: env.GetPluginConfigDir(),
  44. PluginExecutableDir: env.GetPluginExecutableDir(),
  45. }
  46. }
  47. type CustomCostIngestor struct {
  48. key string
  49. config *CustomCostIngestorConfig
  50. repo Repository
  51. runID string
  52. lastRun time.Time
  53. runs int
  54. creationTime time.Time
  55. coverage map[string]opencost.Window
  56. coverageLock sync.Mutex
  57. isRunning atomic.Bool
  58. isStopping atomic.Bool
  59. exitBuildCh chan string
  60. exitRunCh chan string
  61. plugins map[string]*plugin.Client
  62. resolution time.Duration
  63. refreshRate time.Duration
  64. }
  65. // NewIngestor is an initializer for ingestor
  66. func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
  67. if repo == nil {
  68. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
  69. }
  70. if ingestorConfig == nil {
  71. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: config connot be nil")
  72. }
  73. key := ""
  74. for name := range plugins {
  75. key += "," + name
  76. }
  77. key = strings.TrimPrefix(key, ",")
  78. now := time.Now().UTC()
  79. return &CustomCostIngestor{
  80. key: key,
  81. config: ingestorConfig,
  82. repo: repo,
  83. creationTime: now,
  84. lastRun: now,
  85. coverage: map[string]opencost.Window{},
  86. plugins: plugins,
  87. resolution: res,
  88. refreshRate: res,
  89. }, nil
  90. }
  91. func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
  92. var targets []opencost.Window
  93. if ing.resolution == timeutil.Day {
  94. oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration).Truncate(timeutil.Day)
  95. if !oldestDailyDate.After(start) {
  96. windows, err := opencost.GetWindows(start, end, timeutil.Day)
  97. if err != nil {
  98. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  99. return
  100. }
  101. targets = windows
  102. }
  103. } else {
  104. oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration).Truncate(time.Hour)
  105. if !oldestHourlyDate.After(start) {
  106. windows, err := opencost.GetWindows(start, end, time.Hour)
  107. if err != nil {
  108. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  109. return
  110. }
  111. targets = windows
  112. }
  113. }
  114. for _, window := range targets {
  115. allPluginsHave := true
  116. for domain := range ing.plugins {
  117. has, err2 := ing.repo.Has(*window.Start(), domain)
  118. if err2 != nil {
  119. log.Errorf("CustomCost[%s]: ingestor: error when loading window for plugin %s: %s", ing.key, domain, err2.Error())
  120. }
  121. if !has {
  122. allPluginsHave = false
  123. break
  124. }
  125. }
  126. if !allPluginsHave {
  127. ing.BuildWindow(*window.Start(), *window.End())
  128. } else {
  129. for domain := range ing.plugins {
  130. ing.expandCoverage(window, domain)
  131. }
  132. log.Debugf("CustomCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
  133. }
  134. }
  135. }
  136. func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
  137. for domain := range ing.plugins {
  138. ing.buildSingleDomain(start, end, domain)
  139. }
  140. }
  141. func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain string) {
  142. req := &pb.CustomCostRequest{
  143. Start: timestamppb.New(start),
  144. End: timestamppb.New(end),
  145. Resolution: durationpb.New(ing.resolution),
  146. }
  147. log.Infof("ingestor: building window %s for plugin %s", opencost.NewWindow(&start, &end), domain)
  148. // make RPC call via plugin
  149. pluginClient, found := ing.plugins[domain]
  150. if !found {
  151. log.Errorf("could not find plugin client for plugin %s. Did you initialize the plugin correctly?", domain)
  152. return
  153. }
  154. // connect the client
  155. rpcClient, err := pluginClient.Client()
  156. if err != nil {
  157. log.Errorf("error connecting client for plugin %s: %v", domain, err)
  158. return
  159. }
  160. // Request the plugin
  161. raw, err := rpcClient.Dispense("CustomCostSource")
  162. if err != nil {
  163. log.Errorf("error creating new plugin client for plugin %s: %v", domain, err)
  164. return
  165. }
  166. custCostSrc := raw.(ocplugin.CustomCostSource)
  167. custCostResps := custCostSrc.GetCustomCosts(req)
  168. // loop through each customCostResponse, adding to repo
  169. for _, ccr := range custCostResps {
  170. // check for errors in response
  171. if len(ccr.Errors) > 0 {
  172. for _, errResp := range ccr.Errors {
  173. log.Errorf("error in getting custom costs for plugin %s: %v", domain, errResp)
  174. }
  175. log.Errorf("not adding any costs for window %v-%v on plugin %s", req.Start, req.End, domain)
  176. continue
  177. }
  178. log.Debugf("BuildWindow[%s]: GetCustomCost: writing custom costs for window %v-%v: %d", domain, ccr.Start, ccr.End, len(ccr.Costs))
  179. err2 := ing.repo.Put(ccr)
  180. if err2 != nil {
  181. log.Errorf("CustomCost[%s]: ingestor: failed to save Custom Cost Set with window %v-%v: %s", domain, ccr.Start, ccr.End, err2.Error())
  182. }
  183. ing.expandCoverage(opencost.NewClosedWindow(ccr.Start.AsTime(), ccr.End.AsTime()), domain)
  184. }
  185. }
  186. func (ing *CustomCostIngestor) Start(rebuild bool) {
  187. // If already running, log that and return.
  188. if !ing.isRunning.CompareAndSwap(false, true) {
  189. log.Infof("CustomCost: ingestor: is already running")
  190. return
  191. }
  192. ing.runID = stringutil.RandSeq(5)
  193. ing.exitBuildCh = make(chan string)
  194. ing.exitRunCh = make(chan string)
  195. // Build the store once, advancing backward in time from the earliest
  196. // point of coverage.
  197. go ing.build(rebuild)
  198. go ing.run()
  199. }
  200. func (ing *CustomCostIngestor) Stop() {
  201. // If already stopping, log that and return.
  202. if !ing.isStopping.CompareAndSwap(false, true) {
  203. log.Infof("CustomCost: ingestor: is already stopping")
  204. return
  205. }
  206. msg := "Stopping"
  207. // If the processes are running (and thus there are channels available for
  208. // stopping them) then stop all sub-processes (i.e. build and run)
  209. var wg sync.WaitGroup
  210. if ing.exitBuildCh != nil {
  211. wg.Add(1)
  212. go func() {
  213. defer wg.Done()
  214. ing.exitBuildCh <- msg
  215. }()
  216. }
  217. if ing.exitRunCh != nil {
  218. wg.Add(1)
  219. go func() {
  220. defer wg.Done()
  221. ing.exitRunCh <- msg
  222. }()
  223. }
  224. wg.Wait()
  225. // Declare that the store is officially no longer running. This allows
  226. // Start to be called again, restarting the store from scratch.
  227. ing.isRunning.Store(false)
  228. ing.isStopping.Store(false)
  229. }
  230. // Status returns an IngestorStatus that describes the current state of the ingestor
  231. func (ing *CustomCostIngestor) Status() IngestorStatus {
  232. return IngestorStatus{
  233. Created: ing.creationTime,
  234. LastRun: ing.lastRun,
  235. NextRun: ing.lastRun.Add(ing.refreshRate).UTC(),
  236. Runs: ing.runs,
  237. Coverage: ing.coverage,
  238. RefreshRate: ing.refreshRate,
  239. }
  240. }
  241. func (ing *CustomCostIngestor) build(rebuild bool) {
  242. defer errors.HandlePanic()
  243. e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
  244. s := e.Add(-ing.config.DailyDuration)
  245. if ing.resolution == time.Hour {
  246. s = e.Add(-ing.config.HourlyDuration)
  247. }
  248. // Profile the full Duration of the build time
  249. buildStart := time.Now()
  250. log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, s, ing.resolution)
  251. // if rebuild is not specified then check for existing coverage on window
  252. if rebuild {
  253. ing.BuildWindow(s, e)
  254. } else {
  255. ing.LoadWindow(s, e)
  256. }
  257. log.Infof(fmt.Sprintf("CustomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
  258. // In order to be able to Stop, we have to wait on an exit message
  259. // here
  260. <-ing.exitBuildCh
  261. }
  262. func (ing *CustomCostIngestor) Rebuild(domain string) error {
  263. targetDur := ing.config.DailyDuration
  264. if ing.resolution == time.Hour {
  265. targetDur = ing.config.HourlyDuration
  266. }
  267. // Build as far back as the configures build Duration
  268. limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
  269. e := time.Now().UTC()
  270. ing.buildSingleDomain(limit, e, domain)
  271. return nil
  272. }
  273. func (ing *CustomCostIngestor) run() {
  274. defer errors.HandlePanic()
  275. ticker := timeutil.NewJobTicker()
  276. defer ticker.Close()
  277. ticker.TickIn(0)
  278. for {
  279. // If an exit instruction is received, break the run loop
  280. select {
  281. case <-ing.exitRunCh:
  282. log.Debugf("CustomCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
  283. return
  284. case <-ticker.Ch:
  285. // Wait for next tick
  286. }
  287. queryWin := ing.config.DailyQueryWindow
  288. if ing.resolution == time.Hour {
  289. queryWin = ing.config.HourlyQueryWindow
  290. }
  291. // Start from the last covered time, minus the query window
  292. // this allows re-querying of data as the plugin providers' data may stabilize over time
  293. start := ing.lastRun
  294. start = start.Add(-1 * queryWin)
  295. // Round start time back to the nearest Resolution point in the past from the
  296. // last update to the QueryWindow
  297. s := opencost.RoundBack(start.UTC(), ing.resolution)
  298. e := s.Add(queryWin)
  299. // Start with a window of the configured Duration and starting on the given
  300. // start time. Do the following, repeating until the window reaches the
  301. // current time:
  302. // 1. Instruct builder to build window
  303. // 2. Move window forward one Resolution
  304. for time.Now().After(s) {
  305. profStart := time.Now()
  306. ing.BuildWindow(s, e)
  307. log.Debugf("CustomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
  308. s = s.Add(queryWin)
  309. e = e.Add(queryWin)
  310. // prevent builds into the future
  311. if e.After(time.Now().UTC()) {
  312. e = opencost.RoundForward(time.Now().UTC(), ing.resolution)
  313. }
  314. }
  315. ing.lastRun = time.Now().UTC()
  316. ing.runs++
  317. ticker.TickIn(ing.refreshRate)
  318. }
  319. }
  320. func (ing *CustomCostIngestor) expandCoverage(window opencost.Window, plugin string) {
  321. if window.IsOpen() {
  322. return
  323. }
  324. ing.coverageLock.Lock()
  325. defer ing.coverageLock.Unlock()
  326. if _, hasCoverage := ing.coverage[plugin]; !hasCoverage {
  327. ing.coverage[plugin] = window.Clone()
  328. } else {
  329. // expand existing coverage
  330. ing.coverage[plugin] = ing.coverage[plugin].ExpandStart(*window.Start())
  331. ing.coverage[plugin] = ing.coverage[plugin].ExpandEnd(*window.End())
  332. }
  333. }