ingestor.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package customcost
  import (
  	"fmt"
  	"sort"
  	"strings"
  	"sync"
  	"sync/atomic"
  	"time"

  	"github.com/hashicorp/go-plugin"
  	"google.golang.org/protobuf/types/known/durationpb"
  	"google.golang.org/protobuf/types/known/timestamppb"

  	"github.com/opencost/opencost/core/pkg/log"
  	"github.com/opencost/opencost/core/pkg/model/pb"
  	"github.com/opencost/opencost/core/pkg/opencost"
  	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
  	"github.com/opencost/opencost/core/pkg/util/stringutil"
  	"github.com/opencost/opencost/core/pkg/util/timeutil"
  	"github.com/opencost/opencost/pkg/env"
  	"github.com/opencost/opencost/pkg/errors"
  )
  20. // IngestorStatus includes diagnostic values for a given Ingestor
  21. type IngestorStatus struct {
  22. Created time.Time
  23. LastRun time.Time
  24. NextRun time.Time
  25. Runs int
  26. Coverage map[string]opencost.Window
  27. RefreshRate time.Duration
  28. }
  // CustomCostIngestorConfig holds the tunable settings of a CustomCostIngestor.
  type CustomCostIngestorConfig struct {
  	// MonthToDateRunInterval is populated from the cloud-cost month-to-date
  	// interval setting; NOTE(review): unit not visible here, confirm with env.
  	MonthToDateRunInterval int
  	// HourlyDuration is how far back hourly-resolution data is built and kept.
  	HourlyDuration time.Duration
  	// DailyDuration is how far back daily-resolution data is built and kept.
  	DailyDuration time.Duration
  	// DailyQueryWindow is the span of one plugin request at daily resolution.
  	DailyQueryWindow time.Duration
  	// HourlyQueryWindow is the span of one plugin request at hourly resolution.
  	HourlyQueryWindow time.Duration
  	// PluginConfigDir is the configured plugin configuration directory.
  	PluginConfigDir string
  	// PluginExecutableDir is the configured plugin executable directory.
  	PluginExecutableDir string
  }
  36. // DefaultIngestorConfiguration retrieves an CustomCostIngestorConfig from env variables
  37. func DefaultIngestorConfiguration() CustomCostIngestorConfig {
  38. return CustomCostIngestorConfig{
  39. DailyDuration: timeutil.Day * time.Duration(env.GetDataRetentionDailyResolutionDays()),
  40. HourlyDuration: time.Hour * time.Duration(env.GetDataRetentionHourlyResolutionHours()),
  41. MonthToDateRunInterval: env.GetCloudCostMonthToDateInterval(),
  42. DailyQueryWindow: timeutil.Day * time.Duration(env.GetCustomCostQueryWindowDays()),
  43. HourlyQueryWindow: time.Hour * time.Duration(env.GetCustomCostQueryWindowHours()),
  44. PluginConfigDir: env.GetPluginConfigDir(),
  45. PluginExecutableDir: env.GetPluginExecutableDir(),
  46. }
  47. }
  48. type CustomCostIngestor struct {
  49. key string
  50. config *CustomCostIngestorConfig
  51. repo Repository
  52. runID string
  53. lastRun time.Time
  54. runs int
  55. creationTime time.Time
  56. coverage map[string]opencost.Window
  57. coverageLock sync.Mutex
  58. isRunning atomic.Bool
  59. isStopping atomic.Bool
  60. exitBuildCh chan string
  61. exitRunCh chan string
  62. plugins map[string]*plugin.Client
  63. resolution time.Duration
  64. refreshRate time.Duration
  65. }
  66. // NewIngestor is an initializer for ingestor
  67. func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
  68. if repo == nil {
  69. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
  70. }
  71. if ingestorConfig == nil {
  72. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: config connot be nil")
  73. }
  74. key := ""
  75. for name := range plugins {
  76. key += "," + name
  77. }
  78. key = strings.TrimPrefix(key, ",")
  79. now := time.Now().UTC()
  80. return &CustomCostIngestor{
  81. key: key,
  82. config: ingestorConfig,
  83. repo: repo,
  84. creationTime: now,
  85. lastRun: now,
  86. coverage: map[string]opencost.Window{},
  87. plugins: plugins,
  88. resolution: res,
  89. refreshRate: res,
  90. }, nil
  91. }
  92. func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
  93. var targets []opencost.Window
  94. if ing.resolution == timeutil.Day {
  95. oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration).Truncate(timeutil.Day)
  96. if !oldestDailyDate.After(start) {
  97. windows, err := opencost.GetWindows(start, end, timeutil.Day)
  98. if err != nil {
  99. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  100. return
  101. }
  102. targets = windows
  103. }
  104. } else {
  105. oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration).Truncate(time.Hour)
  106. if !oldestHourlyDate.After(start) {
  107. windows, err := opencost.GetWindows(start, end, time.Hour)
  108. if err != nil {
  109. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  110. return
  111. }
  112. targets = windows
  113. }
  114. }
  115. for _, window := range targets {
  116. allPluginsHave := true
  117. for domain := range ing.plugins {
  118. has, err2 := ing.repo.Has(*window.Start(), domain)
  119. if err2 != nil {
  120. log.Errorf("CustomCost[%s]: ingestor: error when loading window for plugin %s: %s", ing.key, domain, err2.Error())
  121. }
  122. if !has {
  123. allPluginsHave = false
  124. break
  125. }
  126. }
  127. if !allPluginsHave {
  128. ing.BuildWindow(*window.Start(), *window.End())
  129. } else {
  130. for domain := range ing.plugins {
  131. ing.expandCoverage(window, domain)
  132. }
  133. log.Debugf("CustomCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
  134. }
  135. }
  136. }
  137. func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
  138. for domain := range ing.plugins {
  139. ing.buildSingleDomain(start, end, domain)
  140. }
  141. }
  142. func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain string) {
  143. req := pb.CustomCostRequest{
  144. Start: timestamppb.New(start),
  145. End: timestamppb.New(end),
  146. Resolution: durationpb.New(ing.resolution),
  147. }
  148. log.Infof("ingestor: building window %s for plugin %s", opencost.NewWindow(&start, &end), domain)
  149. // make RPC call via plugin
  150. pluginClient, found := ing.plugins[domain]
  151. if !found {
  152. log.Errorf("could not find plugin client for plugin %s. Did you initialize the plugin correctly?", domain)
  153. return
  154. }
  155. // connect the client
  156. rpcClient, err := pluginClient.Client()
  157. if err != nil {
  158. log.Errorf("error connecting client for plugin %s: %v", domain, err)
  159. return
  160. }
  161. // TODO HAVE PLUG IN RETURN DATA AS PROTOBUF
  162. // Request the plugin
  163. raw, err := rpcClient.Dispense("CustomCostSource")
  164. if err != nil {
  165. log.Errorf("error creating new plugin client for plugin %s: %v", domain, err)
  166. return
  167. }
  168. custCostSrc := raw.(ocplugin.CustomCostSource)
  169. custCostResps := custCostSrc.GetCustomCosts(req)
  170. // loop through each customCostResponse, adding to repo
  171. for _, ccr := range custCostResps {
  172. // check for errors in response
  173. if len(ccr.Errors) > 0 {
  174. for _, errResp := range ccr.Errors {
  175. log.Errorf("error in getting custom costs for plugin %s: %v", domain, errResp)
  176. }
  177. log.Errorf("not adding any costs for window %v-%v on plugin %s", req.Start, req.End, domain)
  178. continue
  179. }
  180. log.Debugf("BuildWindow[%s]: GetCustomCost: writing custom costs for window %v-%v: %d", domain, ccr.Start, ccr.End, len(ccr.Costs))
  181. err2 := ing.repo.Put(&ccr)
  182. if err2 != nil {
  183. log.Errorf("CustomCost[%s]: ingestor: failed to save Custom Cost Set with window %v-%v: %s", domain, ccr.Start, ccr.End, err2.Error())
  184. }
  185. ing.expandCoverage(opencost.NewClosedWindow(ccr.Start.AsTime(), ccr.End.AsTime()), domain)
  186. }
  187. }
  188. func (ing *CustomCostIngestor) Start(rebuild bool) {
  189. // If already running, log that and return.
  190. if !ing.isRunning.CompareAndSwap(false, true) {
  191. log.Infof("CustomCost: ingestor: is already running")
  192. return
  193. }
  194. ing.runID = stringutil.RandSeq(5)
  195. ing.exitBuildCh = make(chan string)
  196. ing.exitRunCh = make(chan string)
  197. // Build the store once, advancing backward in time from the earliest
  198. // point of coverage.
  199. go ing.build(rebuild)
  200. go ing.run()
  201. }
  202. func (ing *CustomCostIngestor) Stop() {
  203. // If already stopping, log that and return.
  204. if !ing.isStopping.CompareAndSwap(false, true) {
  205. log.Infof("CloudCost: ingestor: is already stopping")
  206. return
  207. }
  208. msg := "Stopping"
  209. // If the processes are running (and thus there are channels available for
  210. // stopping them) then stop all sub-processes (i.e. build and run)
  211. var wg sync.WaitGroup
  212. if ing.exitBuildCh != nil {
  213. wg.Add(1)
  214. go func() {
  215. defer wg.Done()
  216. ing.exitBuildCh <- msg
  217. }()
  218. }
  219. if ing.exitRunCh != nil {
  220. wg.Add(1)
  221. go func() {
  222. defer wg.Done()
  223. ing.exitRunCh <- msg
  224. }()
  225. }
  226. wg.Wait()
  227. // Declare that the store is officially no longer running. This allows
  228. // Start to be called again, restarting the store from scratch.
  229. ing.isRunning.Store(false)
  230. ing.isStopping.Store(false)
  231. }
  232. // Status returns an IngestorStatus that describes the current state of the ingestor
  233. func (ing *CustomCostIngestor) Status() IngestorStatus {
  234. return IngestorStatus{
  235. Created: ing.creationTime,
  236. LastRun: ing.lastRun,
  237. NextRun: ing.lastRun.Add(ing.refreshRate).UTC(),
  238. Runs: ing.runs,
  239. Coverage: ing.coverage,
  240. RefreshRate: ing.refreshRate,
  241. }
  242. }
  243. func (ing *CustomCostIngestor) build(rebuild bool) {
  244. defer errors.HandlePanic()
  245. e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
  246. s := e.Add(-ing.config.DailyDuration)
  247. if ing.resolution == time.Hour {
  248. s = e.Add(-ing.config.HourlyDuration)
  249. }
  250. // Profile the full Duration of the build time
  251. buildStart := time.Now()
  252. log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, s, ing.resolution)
  253. // if rebuild is not specified then check for existing coverage on window
  254. if rebuild {
  255. ing.BuildWindow(s, e)
  256. } else {
  257. ing.LoadWindow(s, e)
  258. }
  259. log.Infof(fmt.Sprintf("CustomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart)))
  260. // In order to be able to Stop, we have to wait on an exit message
  261. // here
  262. <-ing.exitBuildCh
  263. }
  264. func (ing *CustomCostIngestor) Rebuild(domain string) error {
  265. targetDur := ing.config.DailyDuration
  266. if ing.resolution == time.Hour {
  267. targetDur = ing.config.HourlyDuration
  268. }
  269. // Build as far back as the configures build Duration
  270. limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
  271. e := time.Now().UTC()
  272. ing.buildSingleDomain(limit, e, domain)
  273. return nil
  274. }
  275. func (ing *CustomCostIngestor) run() {
  276. defer errors.HandlePanic()
  277. ticker := timeutil.NewJobTicker()
  278. defer ticker.Close()
  279. ticker.TickIn(0)
  280. for {
  281. // If an exit instruction is received, break the run loop
  282. select {
  283. case <-ing.exitRunCh:
  284. log.Debugf("CustomCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
  285. return
  286. case <-ticker.Ch:
  287. // Wait for next tick
  288. }
  289. // Start from the last covered time, minus the RunWindow
  290. start := ing.lastRun
  291. start = start.Add(-ing.resolution)
  292. queryWin := ing.config.DailyQueryWindow
  293. if ing.resolution == time.Hour {
  294. queryWin = ing.config.HourlyQueryWindow
  295. }
  296. // Round start time back to the nearest Resolution point in the past from the
  297. // last update to the QueryWindow
  298. s := opencost.RoundBack(start.UTC(), ing.resolution)
  299. e := s.Add(queryWin)
  300. // Start with a window of the configured Duration and starting on the given
  301. // start time. Do the following, repeating until the window reaches the
  302. // current time:
  303. // 1. Instruct builder to build window
  304. // 2. Move window forward one Resolution
  305. for time.Now().After(s) {
  306. profStart := time.Now()
  307. ing.BuildWindow(s, e)
  308. log.Debugf("CustomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
  309. s = s.Add(queryWin)
  310. e = e.Add(queryWin)
  311. // prevent builds into the future
  312. if e.After(time.Now().UTC()) {
  313. e = opencost.RoundForward(time.Now().UTC(), ing.resolution)
  314. }
  315. }
  316. ing.lastRun = time.Now().UTC()
  317. ing.runs++
  318. ticker.TickIn(ing.refreshRate)
  319. }
  320. }
  321. func (ing *CustomCostIngestor) expandCoverage(window opencost.Window, plugin string) {
  322. if window.IsOpen() {
  323. return
  324. }
  325. ing.coverageLock.Lock()
  326. defer ing.coverageLock.Unlock()
  327. if _, hasCoverage := ing.coverage[plugin]; !hasCoverage {
  328. ing.coverage[plugin] = window.Clone()
  329. } else {
  330. // expand existing coverage
  331. ing.coverage[plugin] = ing.coverage[plugin].ExpandStart(*window.Start())
  332. ing.coverage[plugin] = ing.coverage[plugin].ExpandEnd(*window.End())
  333. }
  334. }