controller.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package exporter
  2. import (
  3. "reflect"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/opencost"
  7. "github.com/opencost/opencost/core/pkg/source"
  8. "github.com/opencost/opencost/core/pkg/util/atomic"
  9. )
  10. // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
  11. // to regularly compute the data for the current resolution and export it on a specific interval.
  12. type ComputeExportController[T any] struct {
  13. runState atomic.AtomicRunState
  14. source ComputeSource[T]
  15. exporter Exporter[T]
  16. resolution time.Duration
  17. typeName string
  18. }
  19. // NewComputeExportController creates a new `ComputeExportController[T]` instance.
  20. func NewComputeExportController[T any](source ComputeSource[T], exporter Exporter[T], resolution time.Duration) *ComputeExportController[T] {
  21. return &ComputeExportController[T]{
  22. source: source,
  23. resolution: resolution,
  24. exporter: exporter,
  25. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  26. }
  27. }
  28. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  29. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  30. // already running.
  31. func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
  32. // Before we attempt to start, we must ensure we are not in a stopping state
  33. cd.runState.WaitForReset()
  34. // This will atomically check the current state to ensure we can run, then advances the state.
  35. // If the state is already started, it will return false.
  36. if !cd.runState.Start() {
  37. return false
  38. }
  39. // our run state is advanced, let's execute our action on the interval
  40. // spawn a new goroutine which will loop and wait the interval each iteration
  41. go func() {
  42. for {
  43. // use a select statement to receive whichever channel receives data first
  44. select {
  45. // if our stop channel receives data, it means we have explicitly called
  46. // Stop(), and must reset our AtomicRunState to it's initial idle state
  47. case <-cd.runState.OnStop():
  48. cd.runState.Reset()
  49. return // exit go routine
  50. // After our interval elapses, fall through
  51. case <-time.After(interval):
  52. }
  53. start := time.Now().UTC().Truncate(cd.resolution)
  54. end := start.Add(cd.resolution)
  55. log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  56. if !cd.source.CanCompute(start, end) {
  57. log.Errorf("[%s] Cannot compute window: [Start: %s, End: %s]", cd.typeName, start, end)
  58. continue
  59. }
  60. set, err := cd.source.Compute(start, end, cd.resolution)
  61. // If a NoDataError or ErrorCollection is returned, we expect that an empty set will
  62. // also be returned. Like an EOF error, this is an expected state
  63. // and indicates that we should still Insert and Save.
  64. if err != nil && !source.IsNoDataError(err) && !source.IsErrorCollection(err) {
  65. continue
  66. }
  67. // Check ErrorCollection to set Warnings and Errors
  68. if source.IsErrorCollection(err) {
  69. c := err.(source.QueryErrorCollection)
  70. errors, warnings := c.ToErrorAndWarningStrings()
  71. logErrors(start, end, warnings, errors)
  72. continue
  73. }
  74. err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
  75. if err != nil {
  76. log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
  77. }
  78. }
  79. }()
  80. return true
  81. }
  82. // Stops the compute processing loop
  83. func (cd *ComputeExportController[T]) Stop() {
  84. cd.runState.Stop()
  85. }
  86. // temporary
  87. func logErrors(start, end time.Time, warnings []string, errors []string) {
  88. }