ingestor.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package customcost
  import (
  	"fmt"
  	"sort"
  	"strings"
  	"sync"
  	"sync/atomic"
  	"time"

  	"github.com/hashicorp/go-plugin"
  	"google.golang.org/protobuf/types/known/durationpb"
  	"google.golang.org/protobuf/types/known/timestamppb"

  	"github.com/opencost/opencost/core/pkg/errors"
  	"github.com/opencost/opencost/core/pkg/log"
  	"github.com/opencost/opencost/core/pkg/model/pb"
  	"github.com/opencost/opencost/core/pkg/opencost"
  	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
  	"github.com/opencost/opencost/core/pkg/util/stringutil"
  	"github.com/opencost/opencost/core/pkg/util/timeutil"
  	"github.com/opencost/opencost/pkg/env"
  )
  20. // pluginConnector abstracts *plugin.Client so buildSingleDomain can be tested with mocks.
  21. type pluginConnector interface {
  22. Client() (plugin.ClientProtocol, error)
  23. Kill()
  24. }
  25. // IngestorStatus includes diagnostic values for a given Ingestor
  26. type IngestorStatus struct {
  27. Created time.Time
  28. LastRun time.Time
  29. NextRun time.Time
  30. Runs int
  31. Coverage map[string]opencost.Window
  32. RefreshRate time.Duration
  33. }
  // CustomCostIngestorConfig holds the tunable settings of a CustomCostIngestor.
  type CustomCostIngestorConfig struct {
  	// MonthToDateRunInterval is not read by the ingestor code in this file.
  	MonthToDateRunInterval int
  	// HourlyDuration is how far back an hourly-resolution ingestor builds.
  	HourlyDuration time.Duration
  	// DailyDuration is how far back a daily-resolution ingestor builds.
  	DailyDuration time.Duration
  	// DailyQueryWindow is the span re-queried on each run cycle at daily resolution.
  	DailyQueryWindow time.Duration
  	// HourlyQueryWindow is the span re-queried on each run cycle at hourly resolution.
  	HourlyQueryWindow time.Duration
  	// PluginConfigDir is populated from env.GetPluginConfigDir by the default configuration.
  	PluginConfigDir string
  	// PluginExecutableDir is populated from env.GetPluginExecutableDir by the default configuration.
  	PluginExecutableDir string
  }
  41. // DefaultIngestorConfiguration retrieves an CustomCostIngestorConfig from env variables
  42. func DefaultIngestorConfiguration() CustomCostIngestorConfig {
  43. return CustomCostIngestorConfig{
  44. DailyDuration: timeutil.Day * time.Duration(env.GetCustomCost1dRetention()),
  45. HourlyDuration: time.Hour * time.Duration(env.GetCustomCost1hRetention()),
  46. DailyQueryWindow: timeutil.Day * time.Duration(env.GetCustomCostQueryWindowDays()),
  47. HourlyQueryWindow: time.Hour * time.Duration(env.GetCustomCostQueryWindowHours()),
  48. PluginConfigDir: env.GetPluginConfigDir(),
  49. PluginExecutableDir: env.GetPluginExecutableDir(),
  50. }
  51. }
  52. type CustomCostIngestor struct {
  53. key string
  54. config *CustomCostIngestorConfig
  55. repo Repository
  56. runID string
  57. lastRun time.Time
  58. runs int
  59. creationTime time.Time
  60. coverage map[string]opencost.Window
  61. coverageLock sync.Mutex
  62. isRunning atomic.Bool
  63. isStopping atomic.Bool
  64. exitBuildCh chan string
  65. exitRunCh chan string
  66. plugins map[string]pluginConnector
  67. pluginsLock sync.RWMutex
  68. resolution time.Duration
  69. refreshRate time.Duration
  70. }
  71. // NewCustomCostIngestor is an initializer for ingestor
  72. func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
  73. connectors := make(map[string]pluginConnector, len(plugins))
  74. for k, v := range plugins {
  75. connectors[k] = v
  76. }
  77. return newCustomCostIngestor(ingestorConfig, repo, connectors, res)
  78. }
  79. func newCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]pluginConnector, res time.Duration) (*CustomCostIngestor, error) {
  80. if repo == nil {
  81. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
  82. }
  83. if ingestorConfig == nil {
  84. return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: config connot be nil")
  85. }
  86. key := ""
  87. for name := range plugins {
  88. key += "," + name
  89. }
  90. key = strings.TrimPrefix(key, ",")
  91. now := time.Now().UTC()
  92. return &CustomCostIngestor{
  93. key: key,
  94. config: ingestorConfig,
  95. repo: repo,
  96. creationTime: now,
  97. lastRun: now,
  98. coverage: map[string]opencost.Window{},
  99. plugins: plugins,
  100. resolution: res,
  101. refreshRate: res,
  102. }, nil
  103. }
  104. func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
  105. var targets []opencost.Window
  106. if ing.resolution == timeutil.Day {
  107. oldestDailyDate := time.Now().UTC().Add(-1 * ing.config.DailyDuration).Truncate(timeutil.Day)
  108. if !oldestDailyDate.After(start) {
  109. windows, err := opencost.GetWindows(start, end, timeutil.Day)
  110. if err != nil {
  111. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  112. return
  113. }
  114. targets = windows
  115. }
  116. } else {
  117. oldestHourlyDate := time.Now().UTC().Add(-1 * ing.config.HourlyDuration).Truncate(time.Hour)
  118. if !oldestHourlyDate.After(start) {
  119. windows, err := opencost.GetWindows(start, end, time.Hour)
  120. if err != nil {
  121. log.Errorf("CustomCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
  122. return
  123. }
  124. targets = windows
  125. }
  126. }
  127. for _, window := range targets {
  128. allPluginsHave := true
  129. ing.pluginsLock.RLock()
  130. for domain := range ing.plugins {
  131. has, err2 := ing.repo.Has(*window.Start(), domain)
  132. if err2 != nil {
  133. log.Errorf("CustomCost[%s]: ingestor: error when loading window for plugin %s: %s", ing.key, domain, err2.Error())
  134. }
  135. if !has {
  136. allPluginsHave = false
  137. break
  138. }
  139. }
  140. ing.pluginsLock.RUnlock()
  141. if !allPluginsHave {
  142. ing.BuildWindow(*window.Start(), *window.End())
  143. } else {
  144. ing.pluginsLock.RLock()
  145. for domain := range ing.plugins {
  146. ing.expandCoverage(window, domain)
  147. }
  148. ing.pluginsLock.RUnlock()
  149. log.Debugf("CustomCost[%s]: ingestor: skipping build for window %s, coverage already exists", ing.key, window.String())
  150. }
  151. }
  152. }
  153. func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
  154. ing.pluginsLock.RLock()
  155. for domain := range ing.plugins {
  156. ing.buildSingleDomain(start, end, domain)
  157. }
  158. ing.pluginsLock.RUnlock()
  159. }
  160. func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain string) {
  161. req := &pb.CustomCostRequest{
  162. Start: timestamppb.New(start),
  163. End: timestamppb.New(end),
  164. Resolution: durationpb.New(ing.resolution),
  165. }
  166. log.Infof("ingestor: building window %s for plugin %s", opencost.NewWindow(&start, &end), domain)
  167. // make RPC call via plugin
  168. ing.pluginsLock.RLock()
  169. pluginClient, found := ing.plugins[domain]
  170. ing.pluginsLock.RUnlock()
  171. if !found {
  172. log.Errorf("could not find plugin client for plugin %s. Did you initialize the plugin correctly?", domain)
  173. return
  174. }
  175. // connect the client
  176. rpcClient, err := pluginClient.Client()
  177. if err != nil {
  178. log.Errorf("error connecting client for plugin %s: %v", domain, err)
  179. return
  180. }
  181. // Request the plugin
  182. raw, err := rpcClient.Dispense("CustomCostSource")
  183. if err != nil {
  184. log.Errorf("error creating new plugin client for plugin %s: %v", domain, err)
  185. return
  186. }
  187. custCostSrc, ok := raw.(ocplugin.CustomCostSource)
  188. if !ok {
  189. log.Errorf("plugin %s returned invalid type: expected CustomCostSource, got %T", domain, raw)
  190. return
  191. }
  192. custCostResps := custCostSrc.GetCustomCosts(req)
  193. // loop through each customCostResponse, adding to repo
  194. for _, ccr := range custCostResps {
  195. // check for errors in response
  196. if len(ccr.Errors) > 0 {
  197. for _, errResp := range ccr.Errors {
  198. log.Errorf("error in getting custom costs for plugin %s: %v", domain, errResp)
  199. }
  200. log.Errorf("not adding any costs for window %v-%v on plugin %s", req.Start, req.End, domain)
  201. continue
  202. }
  203. log.Debugf("BuildWindow[%s]: GetCustomCost: writing custom costs for window %v-%v: %d", domain, ccr.Start, ccr.End, len(ccr.Costs))
  204. err2 := ing.repo.Put(ccr)
  205. if err2 != nil {
  206. log.Errorf("CustomCost[%s]: ingestor: failed to save Custom Cost Set with window %v-%v: %s", domain, ccr.Start, ccr.End, err2.Error())
  207. }
  208. ing.expandCoverage(opencost.NewClosedWindow(ccr.Start.AsTime(), ccr.End.AsTime()), domain)
  209. }
  210. }
  211. func (ing *CustomCostIngestor) Start(rebuild bool) {
  212. // If already running, log that and return.
  213. if !ing.isRunning.CompareAndSwap(false, true) {
  214. log.Infof("CustomCost: ingestor: is already running")
  215. return
  216. }
  217. ing.runID = stringutil.RandSeq(5)
  218. ing.exitBuildCh = make(chan string)
  219. ing.exitRunCh = make(chan string)
  220. // Build the store once, advancing backward in time from the earliest
  221. // point of coverage.
  222. go ing.build(rebuild)
  223. go ing.run()
  224. }
  225. func (ing *CustomCostIngestor) Stop() {
  226. // If already stopping, log that and return.
  227. if !ing.isStopping.CompareAndSwap(false, true) {
  228. log.Infof("CustomCost: ingestor: is already stopping")
  229. return
  230. }
  231. msg := "Stopping"
  232. // If the processes are running (and thus there are channels available for
  233. // stopping them) then stop all sub-processes (i.e. build and run)
  234. var wg sync.WaitGroup
  235. if ing.exitBuildCh != nil {
  236. wg.Add(1)
  237. go func() {
  238. defer wg.Done()
  239. ing.exitBuildCh <- msg
  240. }()
  241. }
  242. if ing.exitRunCh != nil {
  243. wg.Add(1)
  244. go func() {
  245. defer wg.Done()
  246. ing.exitRunCh <- msg
  247. }()
  248. }
  249. wg.Wait()
  250. // Kill all plugin client processes before returning
  251. ing.pluginsLock.Lock()
  252. for name, client := range ing.plugins {
  253. if client != nil {
  254. log.Debugf("CustomCost[%s]: ingestor: killing plugin process: %s", ing.key, name)
  255. client.Kill()
  256. }
  257. }
  258. ing.pluginsLock.Unlock()
  259. // Mark as no longer running so Start() can be called again if needed
  260. ing.isRunning.Store(false)
  261. ing.isStopping.Store(false)
  262. }
  263. // Status returns an IngestorStatus that describes the current state of the ingestor
  264. func (ing *CustomCostIngestor) Status() IngestorStatus {
  265. return IngestorStatus{
  266. Created: ing.creationTime,
  267. LastRun: ing.lastRun,
  268. NextRun: ing.lastRun.Add(ing.refreshRate).UTC(),
  269. Runs: ing.runs,
  270. Coverage: ing.coverage,
  271. RefreshRate: ing.refreshRate,
  272. }
  273. }
  274. func (ing *CustomCostIngestor) build(rebuild bool) {
  275. defer errors.HandlePanic()
  276. e := opencost.RoundBack(time.Now().UTC(), ing.resolution)
  277. s := e.Add(-ing.config.DailyDuration)
  278. if ing.resolution == time.Hour {
  279. s = e.Add(-ing.config.HourlyDuration)
  280. }
  281. // Profile the full Duration of the build time
  282. buildStart := time.Now()
  283. log.Infof("CustomCost[%s]: ingestor: build[%s]: Starting build back to %s in blocks of %s", ing.key, ing.runID, s, ing.resolution)
  284. // if rebuild is not specified then check for existing coverage on window
  285. if rebuild {
  286. ing.BuildWindow(s, e)
  287. } else {
  288. ing.LoadWindow(s, e)
  289. }
  290. log.Infof("CustomCost[%s]: ingestor: build[%s]: completed in %v", ing.key, ing.runID, time.Since(buildStart))
  291. // In order to be able to Stop, we have to wait on an exit message
  292. // here
  293. <-ing.exitBuildCh
  294. }
  295. func (ing *CustomCostIngestor) Rebuild(domain string) error {
  296. targetDur := ing.config.DailyDuration
  297. if ing.resolution == time.Hour {
  298. targetDur = ing.config.HourlyDuration
  299. }
  300. // Build as far back as the configures build Duration
  301. limit := opencost.RoundBack(time.Now().UTC().Add(-1*targetDur), ing.resolution)
  302. e := time.Now().UTC()
  303. ing.buildSingleDomain(limit, e, domain)
  304. return nil
  305. }
  306. func (ing *CustomCostIngestor) run() {
  307. defer errors.HandlePanic()
  308. ticker := timeutil.NewJobTicker()
  309. defer ticker.Close()
  310. ticker.TickIn(0)
  311. for {
  312. // If an exit instruction is received, break the run loop
  313. select {
  314. case <-ing.exitRunCh:
  315. log.Debugf("CustomCost[%s]: ingestor: Run[%s] exiting", ing.key, ing.runID)
  316. return
  317. case <-ticker.Ch:
  318. // Wait for next tick
  319. }
  320. queryWin := ing.config.DailyQueryWindow
  321. if ing.resolution == time.Hour {
  322. queryWin = ing.config.HourlyQueryWindow
  323. }
  324. // Start from the last covered time, minus the query window
  325. // this allows re-querying of data as the plugin providers' data may stabilize over time
  326. start := ing.lastRun
  327. start = start.Add(-1 * queryWin)
  328. // Round start time back to the nearest Resolution point in the past from the
  329. // last update to the QueryWindow
  330. s := opencost.RoundBack(start.UTC(), ing.resolution)
  331. e := s.Add(queryWin)
  332. // Start with a window of the configured Duration and starting on the given
  333. // start time. Do the following, repeating until the window reaches the
  334. // current time:
  335. // 1. Instruct builder to build window
  336. // 2. Move window forward one Resolution
  337. for time.Now().After(s) {
  338. profStart := time.Now()
  339. ing.BuildWindow(s, e)
  340. log.Debugf("CustomCost[%s]: ingestor: Run[%s]: completed %s in %v", ing.key, ing.runID, opencost.NewWindow(&s, &e), time.Since(profStart))
  341. s = s.Add(queryWin)
  342. e = e.Add(queryWin)
  343. // prevent builds into the future
  344. if e.After(time.Now().UTC()) {
  345. e = opencost.RoundForward(time.Now().UTC(), ing.resolution)
  346. }
  347. }
  348. ing.lastRun = time.Now().UTC()
  349. ing.runs++
  350. ticker.TickIn(ing.refreshRate)
  351. }
  352. }
  353. func (ing *CustomCostIngestor) expandCoverage(window opencost.Window, plugin string) {
  354. if window.IsOpen() {
  355. return
  356. }
  357. ing.coverageLock.Lock()
  358. defer ing.coverageLock.Unlock()
  359. if _, hasCoverage := ing.coverage[plugin]; !hasCoverage {
  360. ing.coverage[plugin] = window.Clone()
  361. } else {
  362. // expand existing coverage
  363. ing.coverage[plugin] = ing.coverage[plugin].ExpandStart(*window.Start())
  364. ing.coverage[plugin] = ing.coverage[plugin].ExpandEnd(*window.End())
  365. }
  366. }