ingestor.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package customcost
import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/go-plugin"
	"google.golang.org/protobuf/types/known/durationpb"
	"google.golang.org/protobuf/types/known/timestamppb"

	"github.com/opencost/opencost/core/pkg/errors"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/model/pb"
	"github.com/opencost/opencost/core/pkg/opencost"
	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
	"github.com/opencost/opencost/core/pkg/util/stringutil"
	"github.com/opencost/opencost/core/pkg/util/timeutil"
	"github.com/opencost/opencost/pkg/env"
)
  20. // IngestorStatus includes diagnostic values for a given Ingestor
  21. type IngestorStatus struct {
  22. Created time.Time
  23. LastRun time.Time
  24. NextRun time.Time
  25. Runs int
  26. Coverage map[string]opencost.Window
  27. RefreshRate time.Duration
  28. }
  29. // CustomCost IngestorConfig is a configuration struct for an Ingestor
  30. type CustomCostIngestorConfig struct {
  31. MonthToDateRunInterval int
  32. HourlyDuration, DailyDuration time.Duration
  33. DailyQueryWindow, HourlyQueryWindow time.Duration
  34. PluginConfigDir, PluginExecutableDir string
  35. }
  36. // DefaultIngestorConfiguration retrieves an CustomCostIngestorConfig from env variables
  37. func DefaultIngestorConfiguration() CustomCostIngestorConfig {
  38. return CustomCostIngestorConfig{
  39. DailyDuration: timeutil.Day * time.Duration(env.GetCustomCost1dRetention()),
  40. HourlyDuration: time.Hour * time.Duration(env.GetCustomCost1hRetention()),
  41. DailyQueryWindow: timeutil.Day * time.Duration(env.GetCustomCostQueryWindowDays()),
  42. HourlyQueryWindow: time.Hour * time.Duration(env.GetCustomCostQueryWindowHours()),
  43. PluginConfigDir: env.GetPluginConfigDir(),
  44. PluginExecutableDir: env.GetPluginExecutableDir(),
  45. }
  46. }
  47. type CustomCostIngestor struct {
  48. key string
  49. config *CustomCostIngestorConfig
  50. repo Repository
  51. runID string
  52. lastRun time.Time
  53. runs int
  54. creationTime time.Time
  55. coverage map[string]opencost.Window
  56. coverageLock sync.Mutex
  57. isRunning atomic.Bool
  58. isStopping atomic.Bool
  59. exitBuildCh chan string
  60. exitRunCh chan string
  61. plugins map[string]*plugin.Client
  62. pluginsLock sync.RWMutex
  63. resolution time.Duration
  64. refreshRate time.Duration
  65. }
  66. // NewIngestor is an initializer for ingestor
  67. func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
  68. if repo == nil {
  69. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
  70. }
  71. if ingestorConfig == nil {
  72. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: config connot be nil")
  73. }
  74. key := ""
  75. for name := range plugins {
  76. key += "," + name
  77. }
  78. key = strings.TrimPrefix(key, ",")
  79. now := time.Now().UTC()
  80. return &CustomCostIngestor{
  81. key: key,
  82. config: ingestorConfig,
  83. repo: repo,
  84. creationTime: now,
  85. lastRun: now,
  86. coverage: map[string]opencost.Window{},
  87. plugins: plugins,
  88. resolution: res,
  89. refreshRate: res,
  90. }, nil
  91. }
  92. func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
  93. var targets []opencost.Window
  94. if ing.resolution == timeutil.Day {
  95. oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration).Truncate(timeutil.Day)
  96. if !oldestDailyDate.After(start) {
  97. windows, err := opencost.GetWindows(start, end, timeutil.Day)
  98. if err != nil {
  99. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  100. return
  101. }
  102. targets = windows
  103. }
  104. } else {
  105. oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration).Truncate(time.Hour)
  106. if !oldestHourlyDate.After(start) {
  107. windows, err := opencost.GetWindows(start, end, time.Hour)
  108. if err != nil {
  109. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  110. return
  111. }
  112. targets = windows
  113. }
  114. }
  115. for _, window := range targets {
  116. allPluginsHave := true
  117. ing.pluginsLock.RLock()
  118. for domain := range ing.plugins {
  119. has, err2 := ing.repo.Has(*window.Start(), domain)
  120. if err2 != nil {
  121. log.Errorf("CustomCost[%s]: ingestor: error when loading window for plugin %s: %s", ing.key, domain, err2.Error())
  122. }
  123. if !has {
  124. allPluginsHave = false
  125. break
  126. }
  127. }
  128. ing.pluginsLock.RUnlock()
  129. if !allPluginsHave {
  130. ing.BuildWindow(*window.Start(), *window.End())
  131. } else {
  132. ing.pluginsLock.RLock()
  133. for domain := range ing.plugins {
  134. ing.expandCoverage(window, domain)
  135. }
  136. ing.pluginsLock.RUnlock()
  137. log.Debugf("CustomCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
  138. }
  139. }
  140. }
  141. func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
  142. ing.pluginsLock.RLock()
  143. for domain := range ing.plugins {
  144. ing.buildSingleDomain(start, end, domain)
  145. }
  146. ing.pluginsLock.RUnlock()
  147. }
  148. func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain string) {
  149. req := &pb.CustomCostRequest{
  150. Start: timestamppb.New(start),
  151. End: timestamppb.New(end),
  152. Resolution: durationpb.New(ing.resolution),
  153. }
  154. log.Infof("ingestor: building window %s for plugin %s", opencost.NewWindow(&start, &end), domain)
  155. // make RPC call via plugin
  156. ing.pluginsLock.RLock()
  157. pluginClient, found := ing.plugins[domain]
  158. ing.pluginsLock.RUnlock()
  159. if !found {
  160. log.Errorf("could not find plugin client for plugin %s. Did you initialize the plugin correctly?", domain)
  161. return
  162. }
  163. // connect the client
  164. rpcClient, err := pluginClient.Client()
  165. if err != nil {
  166. log.Errorf("error connecting client for plugin %s: %v", domain, err)
  167. return
  168. }
  169. // Request the plugin
  170. raw, err := rpcClient.Dispense("CustomCostSource")
  171. if err != nil {
  172. log.Errorf("error creating new plugin client for plugin %s: %v", domain, err)
  173. return
  174. }
  175. custCostSrc := raw.(ocplugin.CustomCostSource)
  176. custCostResps := custCostSrc.GetCustomCosts(req)
  177. // loop through each customCostResponse, adding to repo
  178. for _, ccr := range custCostResps {
  179. // check for errors in response
  180. if len(ccr.Errors) > 0 {
  181. for _, errResp := range ccr.Errors {
  182. log.Errorf("error in getting custom costs for plugin %s: %v", domain, errResp)
  183. }
  184. log.Errorf("not adding any costs for window %v-%v on plugin %s", req.Start, req.End, domain)
  185. continue
  186. }
  187. log.Debugf("BuildWindow[%s]: GetCustomCost: writing custom costs for window %v-%v: %d", domain, ccr.Start, ccr.End, len(ccr.Costs))
  188. err2 := ing.repo.Put(ccr)
  189. if err2 != nil {
  190. log.Errorf("CustomCost[%s]: ingestor: failed to save Custom Cost Set with window %v-%v: %s", domain, ccr.Start, ccr.End, err2.Error())
  191. }
  192. ing.expandCoverage(opencost.NewClosedWindow(ccr.Start.AsTime(), ccr.End.AsTime()), domain)
  193. }
  194. }
  195. func (ing *CustomCostIngestor) Start(rebuild bool) {
  196. // If already running, log that and return.
  197. if !ing.isRunning.CompareAndSwap(false, true) {
  198. log.Infof("CustomCost: ingestor: is already running")
  199. return
  200. }
  201. ing.runID = stringutil.RandSeq(5)
  202. ing.exitBuildCh = make(chan string)
  203. ing.exitRunCh = make(chan string)
  204. // Build the store once, advancing backward in time from the earliest
  205. // point of coverage.
  206. go ing.build(rebuild)
  207. go ing.run()
  208. }
  209. func (ing *CustomCostIngestor) Stop() {
  210. // If already stopping, log that and return.
  211. if !ing.isStopping.CompareAndSwap(false, true) {
  212. log.Infof("CustomCost: ingestor: is already stopping")
  213. return
  214. }
  215. msg := "Stopping"
  216. // If the processes are running (and thus there are channels available for
  217. // stopping them) then stop all sub-processes (i.e. build and run)
  218. var wg sync.WaitGroup
  219. if ing.exitBuildCh != nil {
  220. wg.Add(1)
  221. go func() {
  222. defer wg.Done()
  223. ing.exitBuildCh <- msg
  224. }()
  225. }
  226. if ing.exitRunCh != nil {
  227. wg.Add(1)
  228. go func() {
  229. defer wg.Done()
  230. ing.exitRunCh <- msg
  231. }()
  232. }
  233. wg.Wait()
  234. // Kill all plugin client processes before returning
  235. ing.pluginsLock.Lock()
  236. for name, client := range ing.plugins {
  237. if client != nil {
  238. log.Debugf("CustomCost[%s]: ingestor: killing plugin process: %s", ing.key, name)
  239. client.Kill()
  240. }
  241. }
  242. ing.pluginsLock.Unlock()
  243. // Mark as no longer running so Start() can be called again if needed
  244. ing.isRunning.Store(false)
  245. ing.isStopping.Store(false)
  246. }
  247. // Status returns an IngestorStatus that describes the current state of the ingestor
  248. func (ing *CustomCostIngestor) Status() IngestorStatus {
  249. return IngestorStatus{
  250. Created: ing.creationTime,
  251. LastRun: ing.lastRun,
  252. NextRun: ing.lastRun.Add(ing.refreshRate).UTC(),
  253. Runs: ing.runs,
  254. Coverage: ing.coverage,
  255. RefreshRate: ing.refreshRate,
  256. }
  257. }
  258. func (ing *CustomCostIngestor) build(rebuild bool) {
  259. defer errors.HandlePanic()
  260. e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
  261. s := e.Add(-ing.config.DailyDuration)
  262. if ing.resolution == time.Hour {
  263. s = e.Add(-ing.config.HourlyDuration)
  264. }
  265. // Profile the full Duration of the build time
  266. buildStart := time.Now()
  267. log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, s, ing.resolution)
  268. // if rebuild is not specified then check for existing coverage on window
  269. if rebuild {
  270. ing.BuildWindow(s, e)
  271. } else {
  272. ing.LoadWindow(s, e)
  273. }
  274. log.Infof("CustomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart))
  275. // In order to be able to Stop, we have to wait on an exit message
  276. // here
  277. <-ing.exitBuildCh
  278. }
  279. func (ing *CustomCostIngestor) Rebuild(domain string) error {
  280. targetDur := ing.config.DailyDuration
  281. if ing.resolution == time.Hour {
  282. targetDur = ing.config.HourlyDuration
  283. }
  284. // Build as far back as the configures build Duration
  285. limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
  286. e := time.Now().UTC()
  287. ing.buildSingleDomain(limit, e, domain)
  288. return nil
  289. }
  290. func (ing *CustomCostIngestor) run() {
  291. defer errors.HandlePanic()
  292. ticker := timeutil.NewJobTicker()
  293. defer ticker.Close()
  294. ticker.TickIn(0)
  295. for {
  296. // If an exit instruction is received, break the run loop
  297. select {
  298. case <-ing.exitRunCh:
  299. log.Debugf("CustomCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
  300. return
  301. case <-ticker.Ch:
  302. // Wait for next tick
  303. }
  304. queryWin := ing.config.DailyQueryWindow
  305. if ing.resolution == time.Hour {
  306. queryWin = ing.config.HourlyQueryWindow
  307. }
  308. // Start from the last covered time, minus the query window
  309. // this allows re-querying of data as the plugin providers' data may stabilize over time
  310. start := ing.lastRun
  311. start = start.Add(-1 * queryWin)
  312. // Round start time back to the nearest Resolution point in the past from the
  313. // last update to the QueryWindow
  314. s := opencost.RoundBack(start.UTC(), ing.resolution)
  315. e := s.Add(queryWin)
  316. // Start with a window of the configured Duration and starting on the given
  317. // start time. Do the following, repeating until the window reaches the
  318. // current time:
  319. // 1. Instruct builder to build window
  320. // 2. Move window forward one Resolution
  321. for time.Now().After(s) {
  322. profStart := time.Now()
  323. ing.BuildWindow(s, e)
  324. log.Debugf("CustomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
  325. s = s.Add(queryWin)
  326. e = e.Add(queryWin)
  327. // prevent builds into the future
  328. if e.After(time.Now().UTC()) {
  329. e = opencost.RoundForward(time.Now().UTC(), ing.resolution)
  330. }
  331. }
  332. ing.lastRun = time.Now().UTC()
  333. ing.runs++
  334. ticker.TickIn(ing.refreshRate)
  335. }
  336. }
  337. func (ing *CustomCostIngestor) expandCoverage(window opencost.Window, plugin string) {
  338. if window.IsOpen() {
  339. return
  340. }
  341. ing.coverageLock.Lock()
  342. defer ing.coverageLock.Unlock()
  343. if _, hasCoverage := ing.coverage[plugin]; !hasCoverage {
  344. ing.coverage[plugin] = window.Clone()
  345. } else {
  346. // expand existing coverage
  347. ing.coverage[plugin] = ing.coverage[plugin].ExpandStart(*window.Start())
  348. ing.coverage[plugin] = ing.coverage[plugin].ExpandEnd(*window.End())
  349. }
  350. }