2
0
Эх сурвалжийг харах

Check last export against new window, and ensure we export both when we cross into the next window.

Matt Bolt 1 жил өмнө
parent
commit
b7c844a3e2

+ 79 - 31
core/pkg/exporter/controller.go

@@ -1,6 +1,7 @@
 package exporter
 
 import (
+	"fmt"
 	"reflect"
 	"strings"
 	"time"
@@ -94,6 +95,7 @@ type ComputeExportController[T any] struct {
 	exporter         ComputeExporter[T]
 	resolution       time.Duration
 	sourceResolution time.Duration
+	lastExport       time.Time
 	typeName         string
 }
 
@@ -146,43 +148,88 @@ func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
 			case <-time.After(interval):
 			}
 
-			start := time.Now().UTC().Truncate(cd.resolution)
-			end := start.Add(cd.resolution)
-
-			log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
-			if !cd.source.CanCompute(start, end) {
-				log.Errorf("[%s] Cannot compute window: [Start: %s, End: %s]", cd.typeName, start, end)
-				continue
+			now := time.Now().UTC()
+			windows := cd.exportWindowsFor(now)
+
+			for _, window := range windows {
+				err := cd.export(window)
+				if err != nil {
+					// Check ErrorCollection to set Warnings and Errors
+					if source.IsErrorCollection(err) {
+						c := err.(source.QueryErrorCollection)
+						errors, warnings := c.ToErrorAndWarningStrings()
+
+						cd.logErrors(window, warnings, errors)
+						continue
+					}
+
+					log.Errorf("[%s] %s", cd.typeName, err)
+				} else {
+					cd.lastExport = now
+				}
 			}
+		}
+	}()
 
-			set, err := cd.source.Compute(start, end, cd.sourceResolution)
-
-			// If a NoDataError or ErrorCollection is returned, we expect that an empty set will
-			// also be returned. Like an EOF error, this is an expected state
-			// and indicates that we should still Insert and Save.
-			if err != nil && !source.IsNoDataError(err) && !source.IsErrorCollection(err) {
-				log.Errorf("[%s] Error during Compute: %s", cd.typeName, err)
-				continue
-			}
+	return true
+}
 
-			// Check ErrorCollection to set Warnings and Errors
-			if source.IsErrorCollection(err) {
-				c := err.(source.QueryErrorCollection)
-				errors, warnings := c.ToErrorAndWarningStrings()
+// exportWindows uses the last export time to determine the current time windows to
+// export. This will, at most, return 2 windows: the previous resolution window and
+// the current resolution window.
+func (cd *ComputeExportController[T]) exportWindowsFor(now time.Time) []opencost.Window {
+	start := now.Truncate(cd.resolution)
+	end := start.Add(cd.resolution)
 
-				cd.logErrors(start, end, warnings, errors)
-				continue
-			}
+	if cd.lastExport.IsZero() {
+		return []opencost.Window{
+			opencost.NewClosedWindow(start, end),
+		}
+	}
 
-			log.Debugf("[%s] Exporting data for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
-			err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
-			if err != nil {
-				log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
-			}
+	lastStart := cd.lastExport.Truncate(cd.resolution)
+	if lastStart.Equal(start) {
+		return []opencost.Window{
+			opencost.NewClosedWindow(start, end),
 		}
-	}()
+	}
+	lastEnd := lastStart.Add(cd.resolution)
 
-	return true
+	// we've identified that the last export window is not the same as the current,
+	// so we should export the previous resolution window as well as the current one
+	return []opencost.Window{
+		opencost.NewClosedWindow(lastStart, lastEnd),
+		opencost.NewClosedWindow(start, end),
+	}
+}
+
+// export computes and exports the data for a given time window
+func (cd *ComputeExportController[T]) export(window opencost.Window) error {
+	if window.IsOpen() {
+		return fmt.Errorf("window is open: %s", window.String())
+	}
+
+	start, end := *window.Start(), *window.End()
+
+	log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
+
+	if !cd.source.CanCompute(start, end) {
+		return fmt.Errorf("cannot compute window: [Start: %s, End: %s]", start, end)
+	}
+
+	set, err := cd.source.Compute(start, end, cd.sourceResolution)
+	// all errors but NoDataError are considered a halt to the export
+	if err != nil && !source.IsNoDataError(err) {
+		return err
+	}
+
+	log.Debugf("[%s] Exporting data for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
+	err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
+	if err != nil {
+		return fmt.Errorf("write error: %w", err)
+	}
+
+	return nil
 }
 
 // Stops the compute processing loop
@@ -191,7 +238,8 @@ func (cd *ComputeExportController[T]) Stop() {
 }
 
 // temporary
-func (cd *ComputeExportController[T]) logErrors(start, end time.Time, warnings []string, errors []string) {
+func (cd *ComputeExportController[T]) logErrors(window opencost.Window, warnings []string, errors []string) {
+	start, end := window.Start(), window.End()
 	for _, w := range warnings {
 		log.Warnf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), w)
 	}