controller.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package exporter
  2. import (
  3. "reflect"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/opencost"
  7. "github.com/opencost/opencost/core/pkg/source"
  8. "github.com/opencost/opencost/core/pkg/util/atomic"
  9. )
  10. type ComputeExportController[T any] struct {
  11. runState atomic.AtomicRunState
  12. source ComputeSource[T]
  13. exporter Exporter[T]
  14. resolution time.Duration
  15. tName string
  16. }
  17. func NewComputeExportController[T any](source ComputeSource[T], exporter Exporter[T], resolution time.Duration) *ComputeExportController[T] {
  18. return &ComputeExportController[T]{
  19. source: source,
  20. resolution: resolution,
  21. exporter: exporter,
  22. tName: reflect.TypeOf((*T)(nil)).Elem().String(),
  23. }
  24. }
  25. func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
  26. // Before we attempt to start, we must ensure we are not in a stopping state
  27. cd.runState.WaitForReset()
  28. // This will atomically check the current state to ensure we can run, then advances the state.
  29. // If the state is already started, it will return false.
  30. if !cd.runState.Start() {
  31. return false
  32. }
  33. // our run state is advanced, let's execute our action on the interval
  34. // spawn a new goroutine which will loop and wait the interval each iteration
  35. go func() {
  36. for {
  37. // use a select statement to receive whichever channel receives data first
  38. select {
  39. // if our stop channel receives data, it means we have explicitly called
  40. // Stop(), and must reset our AtomicRunState to it's initial idle state
  41. case <-cd.runState.OnStop():
  42. cd.runState.Reset()
  43. return // exit go routine
  44. // After our interval elapses, fall through
  45. case <-time.After(interval):
  46. }
  47. start := time.Now().UTC().Truncate(cd.resolution)
  48. end := start.Add(cd.resolution)
  49. log.Debugf("[%s] Reporting for window: %s - %s", cd.tName, start.UTC(), end.UTC())
  50. if !cd.source.CanCompute(start, end) {
  51. log.Errorf("[%s] Cannot compute window: [Start: %s, End: %s]", cd.tName, start, end)
  52. continue
  53. }
  54. set, err := cd.source.Compute(start, end, cd.resolution)
  55. // If a NoDataError or ErrorCollection is returned, we expect that an empty set will
  56. // also be returned. Like an EOF error, this is an expected state
  57. // and indicates that we should still Insert and Save.
  58. if err != nil && !source.IsNoDataError(err) && !source.IsErrorCollection(err) {
  59. continue
  60. }
  61. // Check ErrorCollection to set Warnings and Errors
  62. if source.IsErrorCollection(err) {
  63. c := err.(source.QueryErrorCollection)
  64. errors, warnings := c.ToErrorAndWarningStrings()
  65. logErrors(start, end, warnings, errors)
  66. continue
  67. }
  68. err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
  69. if err != nil {
  70. log.Warnf("[%s] Error during Write: %s", cd.tName, err)
  71. }
  72. }
  73. }()
  74. return true
  75. }
  76. // Stops the compute processing loop
  77. func (cd *ComputeExportController[T]) Stop() {
  78. cd.runState.Stop()
  79. }
  80. // temporary
  81. func logErrors(start, end time.Time, warnings []string, errors []string) {
  82. }