controller.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package exporter
  2. import (
  3. "reflect"
  4. "strings"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/core/pkg/util/atomic"
  10. "github.com/opencost/opencost/core/pkg/util/timeutil"
  11. )
  12. // ExportController is a controller interface that is responsible for exporting data on a specific interval.
  13. type ExportController interface {
  14. // Name returns the name of the controller
  15. Name() string
  16. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  17. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  18. // already running.
  19. Start(interval time.Duration) bool
  20. // Stops the compute processing loop
  21. Stop()
  22. }
  23. // EventExportController[T] is used to export timestamped events of type T on a specific interval.
  24. type EventExportController[T any] struct {
  25. runState atomic.AtomicRunState
  26. source ExportSource[T]
  27. exporter Exporter[T]
  28. typeName string
  29. }
  30. // NewEventExportController creates a new `EventExportController[T]` instance which is used to export timestamped events of type T
  31. // on a specific interval.
  32. func NewEventExportController[T any](source ExportSource[T], exporter Exporter[T]) *EventExportController[T] {
  33. return &EventExportController[T]{
  34. source: source,
  35. exporter: exporter,
  36. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  37. }
  38. }
  39. // Name returns the name of the controller, which is the name of the T-type
  40. func (cd *EventExportController[T]) Name() string {
  41. return cd.typeName
  42. }
  43. // Start starts a background export loop, which will create a new event instance for the current minute-truncated time
  44. // and export it on the provided interval. This function will return `true` if the loop was started successfully, and
  45. // `false` if it was already running.
  46. func (cd *EventExportController[T]) Start(interval time.Duration) bool {
  47. cd.runState.WaitForReset()
  48. if !cd.runState.Start() {
  49. return false
  50. }
  51. go func() {
  52. for {
  53. select {
  54. case <-cd.runState.OnStop():
  55. cd.runState.Reset()
  56. return // exit go routine
  57. case <-time.After(interval):
  58. }
  59. // truncate the time to the minute to ensure broad enough coverage for event exports
  60. t := time.Now().UTC().Truncate(time.Second)
  61. err := cd.exporter.Export(cd.source.Make(t))
  62. if err != nil {
  63. log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
  64. }
  65. }
  66. }()
  67. return true
  68. }
  69. // Stops the export loop
  70. func (cd *EventExportController[T]) Stop() {
  71. cd.runState.Stop()
  72. }
  73. // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
  74. // to regularly compute the data for the current resolution and export it on a specific interval.
  75. type ComputeExportController[T any] struct {
  76. runState atomic.AtomicRunState
  77. source ComputeSource[T]
  78. exporter ComputeExporter[T]
  79. resolution time.Duration
  80. sourceResolution time.Duration
  81. typeName string
  82. }
  83. // NewComputeExportController creates a new `ComputeExportController[T]` instance.
  84. func NewComputeExportController[T any](
  85. source ComputeSource[T],
  86. exporter ComputeExporter[T],
  87. sourceResolution time.Duration,
  88. ) *ComputeExportController[T] {
  89. return &ComputeExportController[T]{
  90. source: source,
  91. resolution: exporter.Resolution(),
  92. sourceResolution: sourceResolution,
  93. exporter: exporter,
  94. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  95. }
  96. }
  97. // Name returns the name of the controller, which is a combination of the type name and the resolution
  98. func (cd *ComputeExportController[T]) Name() string {
  99. return cd.typeName + "-" + timeutil.FormatStoreResolution(cd.resolution)
  100. }
  101. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  102. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  103. // already running.
  104. func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
  105. // Before we attempt to start, we must ensure we are not in a stopping state
  106. cd.runState.WaitForReset()
  107. // This will atomically check the current state to ensure we can run, then advances the state.
  108. // If the state is already started, it will return false.
  109. if !cd.runState.Start() {
  110. return false
  111. }
  112. // our run state is advanced, let's execute our action on the interval
  113. // spawn a new goroutine which will loop and wait the interval each iteration
  114. go func() {
  115. for {
  116. // use a select statement to receive whichever channel receives data first
  117. select {
  118. // if our stop channel receives data, it means we have explicitly called
  119. // Stop(), and must reset our AtomicRunState to it's initial idle state
  120. case <-cd.runState.OnStop():
  121. cd.runState.Reset()
  122. return // exit go routine
  123. // After our interval elapses, fall through
  124. case <-time.After(interval):
  125. }
  126. start := time.Now().UTC().Truncate(cd.resolution)
  127. end := start.Add(cd.resolution)
  128. log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  129. if !cd.source.CanCompute(start, end) {
  130. log.Errorf("[%s] Cannot compute window: [Start: %s, End: %s]", cd.typeName, start, end)
  131. continue
  132. }
  133. set, err := cd.source.Compute(start, end, cd.sourceResolution)
  134. // If a NoDataError or ErrorCollection is returned, we expect that an empty set will
  135. // also be returned. Like an EOF error, this is an expected state
  136. // and indicates that we should still Insert and Save.
  137. if err != nil && !source.IsNoDataError(err) && !source.IsErrorCollection(err) {
  138. log.Errorf("[%s] Error during Compute: %s", cd.typeName, err)
  139. continue
  140. }
  141. // Check ErrorCollection to set Warnings and Errors
  142. if source.IsErrorCollection(err) {
  143. c := err.(source.QueryErrorCollection)
  144. errors, warnings := c.ToErrorAndWarningStrings()
  145. cd.logErrors(start, end, warnings, errors)
  146. continue
  147. }
  148. log.Debugf("[%s] Exporting data for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  149. err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
  150. if err != nil {
  151. log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
  152. }
  153. }
  154. }()
  155. return true
  156. }
  157. // Stops the compute processing loop
  158. func (cd *ComputeExportController[T]) Stop() {
  159. cd.runState.Stop()
  160. }
  161. // temporary
  162. func (cd *ComputeExportController[T]) logErrors(start, end time.Time, warnings []string, errors []string) {
  163. for _, w := range warnings {
  164. log.Warnf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), w)
  165. }
  166. for _, e := range errors {
  167. log.Errorf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), e)
  168. }
  169. }
  170. type ComputeExportControllerGroup[T any] struct {
  171. controllers []*ComputeExportController[T]
  172. }
  173. func NewComputeExportControllerGroup[T any](controllers ...*ComputeExportController[T]) *ComputeExportControllerGroup[T] {
  174. return &ComputeExportControllerGroup[T]{controllers: controllers}
  175. }
  176. func (g *ComputeExportControllerGroup[T]) Name() string {
  177. var sb strings.Builder
  178. sb.WriteRune('[')
  179. for i, c := range g.controllers {
  180. if i > 0 {
  181. sb.WriteRune('/')
  182. }
  183. sb.WriteString(c.Name())
  184. }
  185. sb.WriteRune(']')
  186. return sb.String()
  187. }
  188. func (g *ComputeExportControllerGroup[T]) Start(interval time.Duration) bool {
  189. for _, c := range g.controllers {
  190. if !c.Start(interval) {
  191. return false
  192. }
  193. }
  194. return true
  195. }
  196. func (g *ComputeExportControllerGroup[T]) Stop() {
  197. for _, c := range g.controllers {
  198. c.Stop()
  199. }
  200. }