controller.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package exporter
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/opencost"
  9. "github.com/opencost/opencost/core/pkg/source"
  10. "github.com/opencost/opencost/core/pkg/util/atomic"
  11. "github.com/opencost/opencost/core/pkg/util/timeutil"
  12. )
  13. // ExportController is a controller interface that is responsible for exporting data on a specific interval.
  14. type ExportController interface {
  15. // Name returns the name of the controller
  16. Name() string
  17. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  18. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  19. // already running.
  20. Start(interval time.Duration) bool
  21. // Stops the compute processing loop
  22. Stop()
  23. }
  24. // EventExportController[T] is used to export timestamped events of type T on a specific interval.
  25. type EventExportController[T any] struct {
  26. runState atomic.AtomicRunState
  27. source ExportSource[T]
  28. exporter Exporter[T]
  29. typeName string
  30. }
  31. // NewEventExportController creates a new `EventExportController[T]` instance which is used to export timestamped events of type T
  32. // on a specific interval.
  33. func NewEventExportController[T any](source ExportSource[T], exporter Exporter[T]) *EventExportController[T] {
  34. return &EventExportController[T]{
  35. source: source,
  36. exporter: exporter,
  37. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  38. }
  39. }
  40. // Name returns the name of the controller, which is the name of the T-type
  41. func (cd *EventExportController[T]) Name() string {
  42. return cd.typeName
  43. }
  44. // Start starts a background export loop, which will create a new event instance for the current minute-truncated time
  45. // and export it on the provided interval. This function will return `true` if the loop was started successfully, and
  46. // `false` if it was already running.
  47. func (cd *EventExportController[T]) Start(interval time.Duration) bool {
  48. cd.runState.WaitForReset()
  49. if !cd.runState.Start() {
  50. return false
  51. }
  52. go func() {
  53. for {
  54. select {
  55. case <-cd.runState.OnStop():
  56. cd.runState.Reset()
  57. return // exit go routine
  58. case <-time.After(interval):
  59. }
  60. // truncate the time to the minute to ensure broad enough coverage for event exports
  61. t := time.Now().UTC().Truncate(time.Second)
  62. err := cd.exporter.Export(cd.source.Make(t))
  63. if err != nil {
  64. log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
  65. }
  66. }
  67. }()
  68. return true
  69. }
  70. // Stops the export loop
  71. func (cd *EventExportController[T]) Stop() {
  72. cd.runState.Stop()
  73. }
  74. // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
  75. // to regularly compute the data for the current resolution and export it on a specific interval.
  76. type ComputeExportController[T any] struct {
  77. runState atomic.AtomicRunState
  78. source ComputeSource[T]
  79. exporter ComputeExporter[T]
  80. resolution time.Duration
  81. sourceResolution time.Duration
  82. lastExport time.Time
  83. typeName string
  84. }
  85. // NewComputeExportController creates a new `ComputeExportController[T]` instance.
  86. func NewComputeExportController[T any](
  87. source ComputeSource[T],
  88. exporter ComputeExporter[T],
  89. sourceResolution time.Duration,
  90. ) *ComputeExportController[T] {
  91. return &ComputeExportController[T]{
  92. source: source,
  93. resolution: exporter.Resolution(),
  94. sourceResolution: sourceResolution,
  95. exporter: exporter,
  96. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  97. }
  98. }
  99. // Name returns the name of the controller, which is a combination of the type name and the resolution
  100. func (cd *ComputeExportController[T]) Name() string {
  101. return cd.typeName + "-" + timeutil.FormatStoreResolution(cd.resolution)
  102. }
  103. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  104. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  105. // already running.
  106. func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
  107. // Before we attempt to start, we must ensure we are not in a stopping state
  108. cd.runState.WaitForReset()
  109. // This will atomically check the current state to ensure we can run, then advances the state.
  110. // If the state is already started, it will return false.
  111. if !cd.runState.Start() {
  112. return false
  113. }
  114. // our run state is advanced, let's execute our action on the interval
  115. // spawn a new goroutine which will loop and wait the interval each iteration
  116. go func() {
  117. for {
  118. // use a select statement to receive whichever channel receives data first
  119. select {
  120. // if our stop channel receives data, it means we have explicitly called
  121. // Stop(), and must reset our AtomicRunState to it's initial idle state
  122. case <-cd.runState.OnStop():
  123. cd.runState.Reset()
  124. return // exit go routine
  125. // After our interval elapses, fall through
  126. case <-time.After(interval):
  127. }
  128. now := time.Now().UTC()
  129. windows := cd.exportWindowsFor(now)
  130. for _, window := range windows {
  131. err := cd.export(window)
  132. if err != nil {
  133. // Check ErrorCollection to set Warnings and Errors
  134. if source.IsErrorCollection(err) {
  135. c := err.(source.QueryErrorCollection)
  136. errors, warnings := c.ToErrorAndWarningStrings()
  137. cd.logErrors(window, warnings, errors)
  138. continue
  139. }
  140. log.Errorf("[%s] %s", cd.typeName, err)
  141. } else {
  142. cd.lastExport = now
  143. }
  144. }
  145. }
  146. }()
  147. return true
  148. }
  149. // exportWindows uses the last export time to determine the current time windows to
  150. // export. This will, at most, return 2 windows: the previous resolution window and
  151. // the current resolution window.
  152. func (cd *ComputeExportController[T]) exportWindowsFor(now time.Time) []opencost.Window {
  153. start := now.Truncate(cd.resolution)
  154. end := start.Add(cd.resolution)
  155. if cd.lastExport.IsZero() {
  156. return []opencost.Window{
  157. opencost.NewClosedWindow(start, end),
  158. }
  159. }
  160. lastStart := cd.lastExport.Truncate(cd.resolution)
  161. if lastStart.Equal(start) {
  162. return []opencost.Window{
  163. opencost.NewClosedWindow(start, end),
  164. }
  165. }
  166. lastEnd := lastStart.Add(cd.resolution)
  167. // we've identified that the last export window is not the same as the current,
  168. // so we should export the previous resolution window as well as the current one
  169. return []opencost.Window{
  170. opencost.NewClosedWindow(lastStart, lastEnd),
  171. opencost.NewClosedWindow(start, end),
  172. }
  173. }
  174. // export computes and exports the data for a given time window
  175. func (cd *ComputeExportController[T]) export(window opencost.Window) error {
  176. if window.IsOpen() {
  177. return fmt.Errorf("window is open: %s", window.String())
  178. }
  179. start, end := *window.Start(), *window.End()
  180. log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  181. if !cd.source.CanCompute(start, end) {
  182. return fmt.Errorf("cannot compute window: [Start: %s, End: %s]", start, end)
  183. }
  184. set, err := cd.source.Compute(start, end, cd.sourceResolution)
  185. // all errors but NoDataError are considered a halt to the export
  186. if err != nil && !source.IsNoDataError(err) {
  187. return err
  188. }
  189. log.Debugf("[%s] Exporting data for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  190. err = cd.exporter.Export(window, set)
  191. if err != nil {
  192. return fmt.Errorf("write error: %w", err)
  193. }
  194. return nil
  195. }
  196. // Stops the compute processing loop
  197. func (cd *ComputeExportController[T]) Stop() {
  198. cd.runState.Stop()
  199. }
  200. // temporary
  201. func (cd *ComputeExportController[T]) logErrors(window opencost.Window, warnings []string, errors []string) {
  202. start, end := window.Start(), window.End()
  203. for _, w := range warnings {
  204. log.Warnf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), w)
  205. }
  206. for _, e := range errors {
  207. log.Errorf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), e)
  208. }
  209. }
  210. type ComputeExportControllerGroup[T any] struct {
  211. controllers []*ComputeExportController[T]
  212. }
  213. func NewComputeExportControllerGroup[T any](controllers ...*ComputeExportController[T]) *ComputeExportControllerGroup[T] {
  214. return &ComputeExportControllerGroup[T]{controllers: controllers}
  215. }
  216. func (g *ComputeExportControllerGroup[T]) Name() string {
  217. var sb strings.Builder
  218. sb.WriteRune('[')
  219. for i, c := range g.controllers {
  220. if i > 0 {
  221. sb.WriteRune('/')
  222. }
  223. sb.WriteString(c.Name())
  224. }
  225. sb.WriteRune(']')
  226. return sb.String()
  227. }
  228. func (g *ComputeExportControllerGroup[T]) Start(interval time.Duration) bool {
  229. for _, c := range g.controllers {
  230. if !c.Start(interval) {
  231. return false
  232. }
  233. }
  234. return true
  235. }
  236. func (g *ComputeExportControllerGroup[T]) Stop() {
  237. for _, c := range g.controllers {
  238. c.Stop()
  239. }
  240. }