controller.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package exporter
  2. import (
  3. "reflect"
  4. "strings"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/core/pkg/util/atomic"
  10. "github.com/opencost/opencost/core/pkg/util/timeutil"
  11. )
  12. // ExportController is a controller interface that is responsible for exporting data on a specific interval.
  13. type ExportController interface {
  14. // Name returns the name of the controller
  15. Name() string
  16. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  17. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  18. // already running.
  19. Start(interval time.Duration) bool
  20. // Stops the compute processing loop
  21. Stop()
  22. }
  23. // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
  24. // to regularly compute the data for the current resolution and export it on a specific interval.
  25. type ComputeExportController[T any] struct {
  26. runState atomic.AtomicRunState
  27. source ComputeSource[T]
  28. exporter Exporter[T]
  29. resolution time.Duration
  30. sourceResolution time.Duration
  31. typeName string
  32. }
  33. // NewComputeExportController creates a new `ComputeExportController[T]` instance.
  34. func NewComputeExportController[T any](
  35. source ComputeSource[T],
  36. exporter Exporter[T],
  37. sourceResolution time.Duration,
  38. ) *ComputeExportController[T] {
  39. return &ComputeExportController[T]{
  40. source: source,
  41. resolution: exporter.Resolution(),
  42. sourceResolution: sourceResolution,
  43. exporter: exporter,
  44. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  45. }
  46. }
  47. // Name returns the name of the controller, which is a combination of the type name and the resolution
  48. func (cd *ComputeExportController[T]) Name() string {
  49. return cd.typeName + "-" + timeutil.FormatStoreResolution(cd.resolution)
  50. }
  51. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  52. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  53. // already running.
  54. func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
  55. // Before we attempt to start, we must ensure we are not in a stopping state
  56. cd.runState.WaitForReset()
  57. // This will atomically check the current state to ensure we can run, then advances the state.
  58. // If the state is already started, it will return false.
  59. if !cd.runState.Start() {
  60. return false
  61. }
  62. // our run state is advanced, let's execute our action on the interval
  63. // spawn a new goroutine which will loop and wait the interval each iteration
  64. go func() {
  65. for {
  66. // use a select statement to receive whichever channel receives data first
  67. select {
  68. // if our stop channel receives data, it means we have explicitly called
  69. // Stop(), and must reset our AtomicRunState to it's initial idle state
  70. case <-cd.runState.OnStop():
  71. cd.runState.Reset()
  72. return // exit go routine
  73. // After our interval elapses, fall through
  74. case <-time.After(interval):
  75. }
  76. start := time.Now().UTC().Truncate(cd.resolution)
  77. end := start.Add(cd.resolution)
  78. log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  79. if !cd.source.CanCompute(start, end) {
  80. log.Errorf("[%s] Cannot compute window: [Start: %s, End: %s]", cd.typeName, start, end)
  81. continue
  82. }
  83. set, err := cd.source.Compute(start, end, cd.sourceResolution)
  84. // If a NoDataError or ErrorCollection is returned, we expect that an empty set will
  85. // also be returned. Like an EOF error, this is an expected state
  86. // and indicates that we should still Insert and Save.
  87. if err != nil && !source.IsNoDataError(err) && !source.IsErrorCollection(err) {
  88. log.Errorf("[%s] Error during Compute: %s", cd.typeName, err)
  89. continue
  90. }
  91. // Check ErrorCollection to set Warnings and Errors
  92. if source.IsErrorCollection(err) {
  93. c := err.(source.QueryErrorCollection)
  94. errors, warnings := c.ToErrorAndWarningStrings()
  95. cd.logErrors(start, end, warnings, errors)
  96. continue
  97. }
  98. log.Debugf("[%s] Exporting data for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  99. err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
  100. if err != nil {
  101. log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
  102. }
  103. }
  104. }()
  105. return true
  106. }
  107. // Stops the compute processing loop
  108. func (cd *ComputeExportController[T]) Stop() {
  109. cd.runState.Stop()
  110. }
  111. // temporary
  112. func (cd *ComputeExportController[T]) logErrors(start, end time.Time, warnings []string, errors []string) {
  113. for _, w := range warnings {
  114. log.Warnf("[%s] %s", cd.typeName, w)
  115. }
  116. for _, e := range errors {
  117. log.Errorf("[%s] %s", cd.typeName, e)
  118. }
  119. }
  120. type ComputeExportControllerGroup[T any] struct {
  121. controllers []*ComputeExportController[T]
  122. }
  123. func NewComputeExportControllerGroup[T any](controllers ...*ComputeExportController[T]) *ComputeExportControllerGroup[T] {
  124. return &ComputeExportControllerGroup[T]{controllers: controllers}
  125. }
  126. func (g *ComputeExportControllerGroup[T]) Name() string {
  127. var sb strings.Builder
  128. sb.WriteRune('[')
  129. for i, c := range g.controllers {
  130. if i > 0 {
  131. sb.WriteRune('/')
  132. }
  133. sb.WriteString(c.Name())
  134. }
  135. sb.WriteRune(']')
  136. return sb.String()
  137. }
  138. func (g *ComputeExportControllerGroup[T]) Start(interval time.Duration) bool {
  139. for _, c := range g.controllers {
  140. if !c.Start(interval) {
  141. return false
  142. }
  143. }
  144. return true
  145. }
  146. func (g *ComputeExportControllerGroup[T]) Stop() {
  147. for _, c := range g.controllers {
  148. c.Stop()
  149. }
  150. }