Browse Source

Finish implementation for storage exporter

Matt Bolt 1 năm trước cách đây
mục cha
commit
a206fbaab1

+ 17 - 22
core/pkg/exporter/exporter.go

@@ -16,13 +16,15 @@ type Exporter[T any] interface {
 	Export(window opencost.Window, data *T) error
 }
 
+// StorageExporter[T] is an implementation of Exporter[T] that writes data to a storage backend using
+// `github.com/opencost/opencost/core/pkg/storage`, a pathing strategy, and an encoder.
 type StorageExporter[T any] struct {
 	pipeline   string
 	resolution time.Duration
 	paths      pathing.StoragePathFormatter
 	encoder    Encoder[T]
 	storage    storage.Storage
-	validator  validator.StoreValidator[T]
+	validator  validator.ExportValidator[T]
 }
 
 func NewStorageExporter[T any](
@@ -31,7 +33,7 @@ func NewStorageExporter[T any](
 	paths pathing.StoragePathFormatter,
 	encoder Encoder[T],
 	storage storage.Storage,
-	validator validator.StoreValidator[T],
+	validator validator.ExportValidator[T],
 ) *StorageExporter[T] {
 	return &StorageExporter[T]{
 		pipeline:   pipeline,
@@ -44,32 +46,25 @@ func NewStorageExporter[T any](
 }
 
 func (se *StorageExporter[T]) Export(window opencost.Window, data *T) error {
-	// TODO: Move basic data validation and window validation to StoreValidator[T]
-	if data == nil {
-		return fmt.Errorf("invalid data: nil")
-	}
-
-	if window.IsOpen() {
-		return fmt.Errorf("invalid window: open")
+	if se.validator != nil {
+		err := se.validator.Validate(window, data)
+		if err != nil {
+			return fmt.Errorf("failed to validate data: %w", err)
+		}
 	}
 
 	s, e := *window.Start(), *window.End()
 	path := se.paths.ToFullPath("", s, e)
 
-	// FIXME: Validator should handle general logic for what to do provided specific circumstances
-	// FIXME: like an empty set AND a file already exists. Having a solid set of defaults here with
-	// FIXME: a customizable abstraction seems reasonable to start.
-	/*
-		currentExists, err := se.storage.Exists(path)
-		if err != nil {
-			return fmt.Errorf("unable to check for existing data from storage path: %w", err)
-		}
+	currentExists, err := se.storage.Exists(path)
+	if err != nil {
+		return fmt.Errorf("unable to check for existing data from storage path: %w", err)
+	}
 
-		if isSetEmpty && currentSetExists {
-			log.Debugf("retaining existing data in storage at path: %s", path)
-			return nil
-		}
-	*/
+	if currentExists && se.validator != nil && !se.validator.IsOverwrite(data) {
+		log.Debugf("retaining existing data in storage at path: %s", path)
+		return nil
+	}
 
 	bin, err := se.encoder.Encode(data)
 	if err != nil {

+ 51 - 132
core/pkg/exporter/validator/validator.go

@@ -1,7 +1,6 @@
 package validator
 
 import (
-	"encoding"
 	"errors"
 	"fmt"
 	"time"
@@ -26,24 +25,22 @@ var (
 	ErrEmptySet error = errors.New("invalid set: empty")
 )
 
-// SetConstraint is a helper constraint for StorageStrategy
+// SetConstraint is a helper constraint for an Exporter[T] implementation
 type SetConstraint[T any] interface {
-	encoding.BinaryMarshaler
-	encoding.BinaryUnmarshaler
-
-	Clone() *T
-	GetWindow() opencost.Window
 	IsEmpty() bool
-
 	*T
 }
 
 // Validator is an implementation of an object capable of validating a T instance prior to
 // insertion into a store.
-type StoreValidator[T any] interface {
-	// IsValid determines whether or not the given data can be legally
+type ExportValidator[T any] interface {
+	// Validate determines whether or not the given data can be legally
 	// added to the store.
-	IsValid(*T) (bool, error)
+	Validate(window opencost.Window, data *T) error
+
+	// IsOverwrite determines whether or not the provided data can be used
+	// to overwrite existing data in the storage.
+	IsOverwrite(data *T) bool
 }
 
 // validation of a window, which is a common pattern in the validator implementations
@@ -65,163 +62,85 @@ func validateWindow(window opencost.Window) (start, end time.Time, err error) {
 }
 
 //--------------------------------------------------------------------------
-//  Window Validator
+//  Chain Validator
 //--------------------------------------------------------------------------
 
-// windowValidator is a StoreValidator implementation which ensures that all
-// set windows are closed.
-type windowValidator[T any, U SetConstraint[T]] struct{}
-
-// NewWindowValidator creates a new window validator that ensures all
-// set windows are closed.
-func NewWindowValidator[T any, U SetConstraint[T]]() StoreValidator[T] {
-	return &windowValidator[T, U]{}
+// chainValidator is used to chain multiple validators together.
+type chainValidator[T any] struct {
+	validators []ExportValidator[T]
 }
 
-// IsValid determines whether or not the given data can be legally
-// added to the store.
-func (wv *windowValidator[T, U]) IsValid(t *T) (bool, error) {
-	if t == nil {
-		return false, ErrNilSet
-	}
-
-	var set U = t
-	_, _, err := validateWindow(set.GetWindow())
-	if err != nil {
-		return false, err
-	}
-
-	return true, nil
+// NewChainValidator creates a single validator instance which chains together multiple validators.
+func NewChainValidator[T any](validators ...ExportValidator[T]) ExportValidator[T] {
+	return &chainValidator[T]{validators: validators}
 }
 
-//--------------------------------------------------------------------------
-//  Resolution Validator
-//--------------------------------------------------------------------------
-
-// resolution validator is used to validate against window and the window resolution
-type resolutionValidator[T any, U SetConstraint[T]] struct {
-	resolution time.Duration
-}
-
-// NewResolutionValidator creates a new validator for storage sets that validate both the window
-// and whether the resolution matches the window.
-func NewResolutionValidator[T any, U SetConstraint[T]](resolution time.Duration) StoreValidator[T] {
-	return &resolutionValidator[T, U]{
-		resolution: resolution,
+func (cv *chainValidator[T]) Validate(window opencost.Window, data *T) error {
+	for _, validator := range cv.validators {
+		err := validator.Validate(window, data)
+		if err != nil {
+			return err
+		}
 	}
+	return nil
 }
 
-// IsValid determines whether or not the given data can be legally
-// added to the store.
-func (rv *resolutionValidator[T, U]) IsValid(t *T) (bool, error) {
-	if t == nil {
-		return false, ErrNilSet
-	}
-
-	var set U = t
-	start, end, err := validateWindow(set.GetWindow())
-	if err != nil {
-		return false, err
-	}
-
-	resolution := end.Sub(start)
-	if resolution != rv.resolution {
-		return false, fmt.Errorf("invalid set: resolution of %ds != %ds", uint64(resolution.Seconds()), uint64(rv.resolution.Seconds()))
+func (cv *chainValidator[T]) IsOverwrite(data *T) bool {
+	for _, validator := range cv.validators {
+		if !validator.IsOverwrite(data) {
+			return false
+		}
 	}
-
-	return true, nil
+	return true
 }
 
 //--------------------------------------------------------------------------
-//  UTC Resolution Validator
+//  Set Validator
 //--------------------------------------------------------------------------
 
-// utc resolution validator is used to validate against window and the window resolution, and checks that the window
-// start and end are on the UTC multiple for that resolution
-type utcResolutionValidator[T any, U SetConstraint[T]] struct {
+// setValidator is used for a potentially "empty" set of data that should avoid
+// overwriting existing data in the store, and applies a window and resolution validation.
+type setValidator[T any, U SetConstraint[T]] struct {
 	resolution time.Duration
 }
 
-// NewUTCResolutionValidator creates a new validator for storage sets that validate both the window,
-// whether the resolution matches the window and that the window is a UTC multiple of the resolution.
-func NewUTCResolutionValidator[T any, U SetConstraint[T]](resolution time.Duration) StoreValidator[T] {
-	return &utcResolutionValidator[T, U]{
+// NewSetValidator creates a validator for a potentially "empty" set of data that should avoid
+// overwriting existing data in the store, and applies a window and resolution validation.
+func NewSetValidator[T any, U SetConstraint[T]](resolution time.Duration) ExportValidator[T] {
+	return &setValidator[T, U]{
 		resolution: resolution,
 	}
 }
 
-// IsValid determines whether or not the given data can be legally
-// added to the store.
-func (urv *utcResolutionValidator[T, U]) IsValid(t *T) (bool, error) {
-	if t == nil {
-		return false, ErrNilSet
+// Validate determines whether the provided start and end times are valid for the data provided.
+func (sv *setValidator[T, U]) Validate(window opencost.Window, data *T) error {
+	if data == nil {
+		return ErrNilSet
 	}
 
-	// Check Valid Window
-	var set U = t
-	start, end, err := validateWindow(set.GetWindow())
+	start, end, err := validateWindow(window)
 	if err != nil {
-		return false, err
+		return err
 	}
 
 	// Check Resolution
 	resolution := end.Sub(start)
-	if resolution != urv.resolution {
-		return false, fmt.Errorf("invalid set: resolution of %ds != %ds", uint64(resolution.Seconds()), uint64(urv.resolution.Seconds()))
+	if resolution != sv.resolution {
+		return fmt.Errorf("invalid set: resolution of %ds != %ds", uint64(resolution.Seconds()), uint64(sv.resolution.Seconds()))
 	}
 
 	// Check UTC Multiple
-	nearestUTCMultiple := opencost.RoundBack(start.UTC(), urv.resolution)
+	nearestUTCMultiple := opencost.RoundBack(start.UTC(), sv.resolution)
 	if !start.Equal(nearestUTCMultiple) {
-		return false, fmt.Errorf("invalid set: start %s is not a UTC multiple of resolution %ds, the nearest valid start is %s", start.String(), uint64(urv.resolution.Seconds()), nearestUTCMultiple.String())
+		return fmt.Errorf("invalid set: start %s is not a UTC multiple of resolution %ds, the nearest valid start is %s", start.String(), uint64(sv.resolution.Seconds()), nearestUTCMultiple.String())
 	}
 
-	return true, nil
+	return nil
 }
 
-//--------------------------------------------------------------------------
-//  Empty Set Validator
-//--------------------------------------------------------------------------
-
-// emptySetValidator validates that a set is non empty, has a valid window,
-// and
-type emptySetValidator[T any, U SetConstraint[T]] struct {
-	resolution time.Duration
-}
-
-// NewEmptySetValidator creates a validator that checks for non-empty sets,
-// a valid window, and a valid resolution
-func NewEmptySetValidator[T any, U SetConstraint[T]](resolution time.Duration) StoreValidator[T] {
-	return &emptySetValidator[T, U]{
-		resolution: resolution,
-	}
-}
-
-// IsValid determines whether or not the given data can be legally
-// added to the store.
-func (esv *emptySetValidator[T, U]) IsValid(t *T) (bool, error) {
-	// non-nil validation
-	if t == nil {
-		return false, ErrNilSet
-	}
-
-	var set U = t
-	// non-empty validation
-	if set.IsEmpty() {
-		return false, ErrEmptySet
-	}
-
-	// window validation
-	start, end, err := validateWindow(set.GetWindow())
-	if err != nil {
-		return false, err
-	}
-
-	// resolution validation
-	resolution := end.Sub(start)
-	if resolution != esv.resolution {
-		return false, fmt.Errorf("invalid set: resolution of %ds != %ds", uint64(resolution.Seconds()), uint64(esv.resolution.Seconds()))
-	}
+// IsOverwrite should return true if the data is not nil and the set is not empty
+func (sv *setValidator[T, U]) IsOverwrite(data *T) bool {
+	var set U = data
 
-	return true, nil
+	return set != nil && !set.IsEmpty()
 }

+ 126 - 93
core/pkg/exporter/validator/validator_test.go

@@ -1,6 +1,8 @@
 package validator
 
 import (
+	"fmt"
+	"slices"
 	"testing"
 	"time"
 
@@ -9,7 +11,7 @@ import (
 )
 
 func TestWindowValidator(t *testing.T) {
-	v := NewWindowValidator[opencost.AllocationSet]()
+	v := NewSetValidator[opencost.AllocationSet](time.Hour)
 
 	end := time.Now().UTC()
 	start := end.Add(-time.Hour)
@@ -18,74 +20,34 @@ func TestWindowValidator(t *testing.T) {
 
 	invalidEnd := opencost.NewWindow(&start, nil)
 	invalidStart := opencost.NewWindow(nil, &end)
-	valid := opencost.NewWindow(&start, &end)
+
+	s := start.Truncate(time.Hour)
+	e := end.Truncate(time.Hour)
+	valid := opencost.NewWindow(&s, &e)
 
 	// Invalid End
 	set.Window = invalidEnd
-	isValid, err := v.IsValid(set)
-	if isValid || err == nil {
+	err := v.Validate(set.Window, set)
+	if err == nil {
 		t.Errorf("Validator returned valid flag for invalid window in set")
 	}
 
 	// InValid Start
 	set.Window = invalidStart
-	isValid, err = v.IsValid(set)
-	if isValid || err == nil {
+	err = v.Validate(set.Window, set)
+	if err == nil {
 		t.Errorf("Validator returned valid flag for invalid window in set")
 	}
 
 	// Valid
 	set.Window = valid
-	isValid, err = v.IsValid(set)
-	if !isValid || err != nil {
-		t.Errorf("Validator returned an invalid flag or error for a valid window")
+	err = v.Validate(set.Window, set)
+	if err != nil {
+		t.Errorf("Validator returned an error for a valid window: %v", err)
 	}
 
 }
 
-func TestResolutionValidator(t *testing.T) {
-	v := NewResolutionValidator[opencost.AllocationSet](time.Hour)
-
-	end := time.Now().UTC()
-	start := end.Add(-time.Hour)
-	start2h := start.Add(-time.Hour)
-
-	set := opencost.NewAllocationSet(start, end)
-
-	invalidEnd := opencost.NewWindow(&start, nil)
-	invalidStart := opencost.NewWindow(nil, &end)
-	invalidResolution := opencost.NewWindow(&start2h, &end)
-	valid := opencost.NewWindow(&start, &end)
-
-	// Invalid End
-	set.Window = invalidEnd
-	isValid, err := v.IsValid(set)
-	if isValid || err == nil {
-		t.Errorf("Validator returned valid flag for invalid window in set")
-	}
-
-	// Invalid Start
-	set.Window = invalidStart
-	isValid, err = v.IsValid(set)
-	if isValid || err == nil {
-		t.Errorf("Validator returned valid flag for invalid window in set")
-	}
-
-	// Invalid Resolution
-	set.Window = invalidResolution
-	isValid, err = v.IsValid(set)
-	if isValid || err == nil {
-		t.Errorf("Validator returned valid flag for invalid resolution in set")
-	}
-
-	// Valid
-	set.Window = valid
-	isValid, err = v.IsValid(set)
-	if !isValid || err != nil {
-		t.Errorf("Validator returned an invalid flag or error for a valid window")
-	}
-}
-
 func TestUTCResolutionValidator(t *testing.T) {
 	start := opencost.RoundBack(time.Now().UTC(), timeutil.Week)
 
@@ -145,9 +107,10 @@ func TestUTCResolutionValidator(t *testing.T) {
 
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
-			v := NewUTCResolutionValidator[opencost.AllocationSet](tc.resolution)
+			v := NewSetValidator[opencost.AllocationSet](tc.resolution)
 			set.Window = tc.window
-			isValid, err := v.IsValid(set)
+			err := v.Validate(tc.window, set)
+			isValid := err == nil
 			if tc.expected != isValid {
 				t.Errorf("Validator returned incorrect flag")
 			}
@@ -162,59 +125,129 @@ func TestUTCResolutionValidator(t *testing.T) {
 	}
 }
 
-func TestEmptySetValidator(t *testing.T) {
-	v := NewEmptySetValidator[opencost.AllocationSet](time.Hour)
+func TestEmptyAndNil(t *testing.T) {
+	v := NewSetValidator[opencost.AllocationSet](time.Hour)
 
-	end := time.Now().UTC()
+	end := time.Now().UTC().Truncate(time.Hour)
 	start := end.Add(-time.Hour)
-	start2h := start.Add(-time.Hour)
 
-	set := opencost.NewAllocationSet(start, end, opencost.NewMockUnitAllocation("", start, time.Hour, nil))
+	window := opencost.NewClosedWindow(start, end)
+	emptySet := opencost.NewAllocationSet(start, end)
+	nilSet := (*opencost.AllocationSet)(nil)
 
-	invalidEnd := opencost.NewWindow(&start, nil)
-	invalidStart := opencost.NewWindow(nil, &end)
-	invalidResolution := opencost.NewWindow(&start2h, &end)
-	valid := opencost.NewWindow(&start, &end)
+	err := v.Validate(window, nilSet)
+	if err == nil {
+		t.Errorf("Validator returned valid flag for nil data")
+	}
 
-	//
-	// Non-Empty Tests
-	//
+	isEmpty := !v.IsOverwrite(emptySet)
+	if !isEmpty {
+		t.Errorf("Validator returned overwrite flag for empty data")
+	}
+}
 
-	// Invalid End
-	set.Window = invalidEnd
-	isValid, err := v.IsValid(set)
-	if isValid || err == nil {
-		t.Errorf("Validator returned valid flag for invalid window in set")
+type collection struct {
+	vs []string
+}
+
+func (c *collection) add(v string) {
+	c.vs = append(c.vs, v)
+}
+
+func (c *collection) clear() {
+	c.vs = []string{}
+}
+
+type appendingValidator struct {
+	tag  string
+	tags *collection
+	fail bool
+}
+
+func newAppendingValidator(tag string, tags *collection) *appendingValidator {
+	return &appendingValidator{
+		tag:  tag,
+		tags: tags,
 	}
+}
 
-	// Invalid Start
-	set.Window = invalidStart
-	isValid, err = v.IsValid(set)
-	if isValid || err == nil {
-		t.Errorf("Validator returned valid flag for invalid window in set")
+func newFailingValidator(tag string, tags *collection) *appendingValidator {
+	return &appendingValidator{
+		tag:  tag,
+		tags: tags,
+		fail: true,
 	}
+}
 
-	// Invalid Resolution
-	set.Window = invalidResolution
-	isValid, err = v.IsValid(set)
-	if isValid || err == nil {
-		t.Errorf("Validator returned valid flag for invalid resolution in set")
+func (av *appendingValidator) Validate(window opencost.Window, data *opencost.AllocationSet) error {
+	if av.fail {
+		return fmt.Errorf("failed validator: %s", av.tag)
 	}
+	av.tags.add("Validate: " + av.tag)
+	return nil
+}
 
-	// Valid
-	set.Window = valid
-	isValid, err = v.IsValid(set)
-	if !isValid || err != nil {
-		t.Errorf("Validator returned an invalid flag or error for a valid window")
+func (av *appendingValidator) IsOverwrite(data *opencost.AllocationSet) bool {
+	av.tags.add("IsOverwrite: " + av.tag)
+	return true
+}
+
+func TestChainValidation(t *testing.T) {
+	tags := new(collection)
+
+	validators := []ExportValidator[opencost.AllocationSet]{
+		newAppendingValidator("a", tags),
+		newAppendingValidator("b", tags),
+		newAppendingValidator("c", tags),
+		newAppendingValidator("d", tags),
 	}
 
-	//
-	// Empty Test
-	//
+	v := NewChainValidator(validators...)
 
-	set = opencost.NewAllocationSet(start, end)
-	isValid, err = v.IsValid(set)
-	if isValid || err == nil {
-		t.Errorf("Validator returned valid flag for empty set")
+	end := time.Now().UTC().Truncate(time.Hour)
+	start := end.Add(-time.Hour)
+
+	window := opencost.NewClosedWindow(start, end)
+	set := opencost.NewAllocationSet(start, end)
+
+	err := v.Validate(window, set)
+	if err != nil {
+		t.Errorf("Validator returned unexpected error: %v", err)
+	}
+
+	if !slices.Contains(tags.vs, "Validate: a") {
+		t.Errorf("Validator did not call validate on first validator")
+	}
+	if !slices.Contains(tags.vs, "Validate: b") {
+		t.Errorf("Validator did not call validate on second validator")
+	}
+	if !slices.Contains(tags.vs, "Validate: c") {
+		t.Errorf("Validator did not call validate on third validator")
+	}
+	if !slices.Contains(tags.vs, "Validate: d") {
+		t.Errorf("Validator did not call validate on fourth validator")
+	}
+
+	tags.clear()
+
+	// Test failing validator
+	validators = []ExportValidator[opencost.AllocationSet]{
+		newAppendingValidator("a", tags),
+		newAppendingValidator("b", tags),
+		newFailingValidator("c", tags),
+		newAppendingValidator("d", tags),
+	}
+
+	v = NewChainValidator(validators...)
+	err = v.Validate(window, set)
+	if err == nil {
+		t.Errorf("Validator did not return expected error")
+	}
+
+	if !slices.Contains(tags.vs, "Validate: a") {
+		t.Errorf("Validator did not call validate on first validator")
+	}
+	if !slices.Contains(tags.vs, "Validate: b") {
+		t.Errorf("Validator did not call validate on second validator")
 	}
 }