controller.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package exporter
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/opencost"
  9. "github.com/opencost/opencost/core/pkg/source"
  10. "github.com/opencost/opencost/core/pkg/util/atomic"
  11. "github.com/opencost/opencost/core/pkg/util/timeutil"
  12. "github.com/opencost/opencost/core/pkg/util/typeutil"
  13. )
  14. // ExportController is a controller interface that is responsible for exporting data on a specific interval.
  15. type ExportController interface {
  16. // Name returns the name of the controller
  17. Name() string
  18. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  19. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  20. // already running.
  21. Start(interval time.Duration) bool
  22. // Stops the compute processing loop
  23. Stop()
  24. }
  25. // EventExportController[T] is used to export timestamped events of type T on a specific interval.
  26. type EventExportController[T any] struct {
  27. runState atomic.AtomicRunState
  28. source ExportSource[T]
  29. exporter EventExporter[T]
  30. typeName string
  31. }
  32. // NewEventExportController creates a new `EventExportController[T]` instance which is used to export timestamped events of type T
  33. // on a specific interval.
  34. func NewEventExportController[T any](source ExportSource[T], exporter EventExporter[T]) *EventExportController[T] {
  35. return &EventExportController[T]{
  36. source: source,
  37. exporter: exporter,
  38. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  39. }
  40. }
  41. // Name returns the name of the controller, which is the name of the T-type
  42. func (cd *EventExportController[T]) Name() string {
  43. return cd.typeName
  44. }
  45. // Start starts a background export loop, which will create a new event instance for the current minute-truncated time
  46. // and export it on the provided interval. This function will return `true` if the loop was started successfully, and
  47. // `false` if it was already running.
  48. func (cd *EventExportController[T]) Start(interval time.Duration) bool {
  49. cd.runState.WaitForReset()
  50. if !cd.runState.Start() {
  51. return false
  52. }
  53. go func() {
  54. for {
  55. select {
  56. case <-cd.runState.OnStop():
  57. cd.runState.Reset()
  58. return // exit go routine
  59. case <-time.After(interval):
  60. }
  61. // truncate the time to the second to ensure broad enough coverage for event exports
  62. t := time.Now().UTC().Truncate(time.Second)
  63. evt := cd.source.Make(t)
  64. if evt == nil {
  65. log.Debugf("[%s] No event data to export", cd.typeName)
  66. continue
  67. }
  68. err := cd.exporter.Export(t, evt)
  69. if err != nil {
  70. log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
  71. }
  72. }
  73. }()
  74. return true
  75. }
  76. // Stops the export loop
  77. func (cd *EventExportController[T]) Stop() {
  78. cd.runState.Stop()
  79. }
  80. // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
  81. // to regularly compute the data for the current resolution and export it on a specific interval.
  82. type ComputeExportController[T any] struct {
  83. runState atomic.AtomicRunState
  84. source ComputeSource[T]
  85. exporter ComputeExporter[T]
  86. resolution time.Duration
  87. lastExport time.Time
  88. typeName string
  89. }
  90. // NewComputeExportController creates a new `ComputeExportController[T]` instance.
  91. func NewComputeExportController[T any](
  92. source ComputeSource[T],
  93. exporter ComputeExporter[T],
  94. resolution time.Duration,
  95. ) *ComputeExportController[T] {
  96. return &ComputeExportController[T]{
  97. source: source,
  98. resolution: resolution,
  99. exporter: exporter,
  100. typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
  101. }
  102. }
  103. // Name returns the name of the controller, which is a combination of the type name and the resolution
  104. func (cd *ComputeExportController[T]) Name() string {
  105. return cd.typeName + "-" + timeutil.FormatStoreResolution(cd.resolution)
  106. }
  107. // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
  108. // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
  109. // already running.
  110. func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
  111. // TODO remove
  112. log.Infof("[KM] ComputeExportController.Start(%s)", interval)
  113. // Before we attempt to start, we must ensure we are not in a stopping state
  114. cd.runState.WaitForReset()
  115. // This will atomically check the current state to ensure we can run, then advances the state.
  116. // If the state is already started, it will return false.
  117. if !cd.runState.Start() {
  118. return false
  119. }
  120. // TODO remove
  121. log.Infof("[KM] ComputeExportController.Start(%s): running", interval)
  122. // our run state is advanced, let's execute our action on the interval
  123. // spawn a new goroutine which will loop and wait the interval each iteration
  124. go func() {
  125. for {
  126. // TODO remove
  127. log.Infof("[KM] ComputeExportController.Start(%s): looping", interval)
  128. // use a select statement to receive whichever channel receives data first
  129. select {
  130. // if our stop channel receives data, it means we have explicitly called
  131. // Stop(), and must reset our AtomicRunState to it's initial idle state
  132. case <-cd.runState.OnStop():
  133. cd.runState.Reset()
  134. return // exit go routine
  135. // After our interval elapses, fall through
  136. case <-time.After(interval):
  137. }
  138. now := time.Now().UTC()
  139. windows := cd.exportWindowsFor(now)
  140. // TODO remove
  141. log.Infof("[KM] ComputeExportController.Start(%s): %d windows", interval, len(windows))
  142. for _, win := range windows {
  143. log.Infof("[KM] ComputeExportController.Start(%s): %d windows: %s", interval, len(windows), win)
  144. }
  145. for _, window := range windows {
  146. err := cd.export(window)
  147. if err != nil {
  148. // Check ErrorCollection to set Warnings and Errors
  149. if source.IsErrorCollection(err) {
  150. c := err.(source.QueryErrorCollection)
  151. errors, warnings := c.ToErrorAndWarningStrings()
  152. cd.logErrors(window, warnings, errors)
  153. continue
  154. }
  155. log.Errorf("[%s] %s", cd.typeName, err)
  156. } else {
  157. cd.lastExport = now
  158. }
  159. }
  160. }
  161. }()
  162. return true
  163. }
  164. // exportWindows uses the last export time to determine the current time windows to
  165. // export. This will, at most, return 2 windows: the previous resolution window and
  166. // the current resolution window.
  167. func (cd *ComputeExportController[T]) exportWindowsFor(now time.Time) []opencost.Window {
  168. start := now.Truncate(cd.resolution)
  169. end := start.Add(cd.resolution)
  170. if cd.lastExport.IsZero() {
  171. return []opencost.Window{
  172. opencost.NewClosedWindow(start, end),
  173. }
  174. }
  175. lastStart := cd.lastExport.Truncate(cd.resolution)
  176. if lastStart.Equal(start) {
  177. return []opencost.Window{
  178. opencost.NewClosedWindow(start, end),
  179. }
  180. }
  181. lastEnd := lastStart.Add(cd.resolution)
  182. // we've identified that the last export window is not the same as the current,
  183. // so we should export the previous resolution window as well as the current one
  184. return []opencost.Window{
  185. opencost.NewClosedWindow(lastStart, lastEnd),
  186. opencost.NewClosedWindow(start, end),
  187. }
  188. }
  189. // export computes and exports the data for a given time window
  190. func (cd *ComputeExportController[T]) export(window opencost.Window) error {
  191. if window.IsOpen() {
  192. return fmt.Errorf("window is open: %s", window.String())
  193. }
  194. start, end := *window.Start(), *window.End()
  195. log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  196. if !cd.source.CanCompute(start, end) {
  197. return fmt.Errorf("cannot compute window: [Start: %s, End: %s]", start, end)
  198. }
  199. set, err := cd.source.Compute(start, end)
  200. // all errors but NoDataError are considered a halt to the export
  201. if err != nil && !source.IsNoDataError(err) {
  202. return err
  203. }
  204. log.Debugf("[%s] Exporting data for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
  205. err = cd.exporter.Export(window, set)
  206. if err != nil {
  207. return fmt.Errorf("write error: %w", err)
  208. }
  209. return nil
  210. }
  211. // Stops the compute processing loop
  212. func (cd *ComputeExportController[T]) Stop() {
  213. cd.runState.Stop()
  214. }
  215. // temporary
  216. func (cd *ComputeExportController[T]) logErrors(window opencost.Window, warnings []string, errors []string) {
  217. start, end := window.Start(), window.End()
  218. for _, w := range warnings {
  219. log.Warnf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), w)
  220. }
  221. for _, e := range errors {
  222. log.Errorf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), e)
  223. }
  224. }
  225. type ComputeExportControllerGroup[T any] struct {
  226. controllers []*ComputeExportController[T]
  227. }
  228. func NewComputeExportControllerGroup[T any](controllers ...*ComputeExportController[T]) *ComputeExportControllerGroup[T] {
  229. return &ComputeExportControllerGroup[T]{controllers: controllers}
  230. }
  231. func (g *ComputeExportControllerGroup[T]) Name() string {
  232. var sb strings.Builder
  233. sb.WriteRune('[')
  234. for i, c := range g.controllers {
  235. if i > 0 {
  236. sb.WriteRune('/')
  237. }
  238. sb.WriteString(c.Name())
  239. }
  240. sb.WriteRune(']')
  241. return sb.String()
  242. }
  243. func (g *ComputeExportControllerGroup[T]) Start(interval time.Duration) bool {
  244. if len(g.controllers) == 0 {
  245. log.Warnf("ComputeExportControllerGroup[%s] has no controllers to start", typeutil.TypeOf[T]())
  246. return false
  247. }
  248. for _, c := range g.controllers {
  249. if !c.Start(interval) {
  250. return false
  251. }
  252. }
  253. return true
  254. }
  255. func (g *ComputeExportControllerGroup[T]) Stop() {
  256. for _, c := range g.controllers {
  257. c.Stop()
  258. }
  259. }
  260. func (g *ComputeExportControllerGroup[T]) Resolutions() []time.Duration {
  261. resolutions := make([]time.Duration, 0, len(g.controllers))
  262. for _, c := range g.controllers {
  263. resolutions = append(resolutions, c.resolution)
  264. }
  265. return resolutions
  266. }