Explorar o código

Audit and Generic store types

Sean Holcomb %!s(int64=4) %!d(string=hai) anos
pai
achega
0ec3192033

+ 1 - 1
pkg/kubecost/allocation.go

@@ -914,7 +914,7 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 	shouldAggregate := aggregateBy != nil
 	shouldAggregate := aggregateBy != nil
 	shouldFilter := len(options.FilterFuncs) > 0
 	shouldFilter := len(options.FilterFuncs) > 0
 	shouldShare := len(options.SharedHourlyCosts) > 0 || len(options.ShareFuncs) > 0
 	shouldShare := len(options.SharedHourlyCosts) > 0 || len(options.ShareFuncs) > 0
-	if !shouldAggregate && !shouldFilter && !shouldShare {
+	if !shouldAggregate && !shouldFilter && !shouldShare && options.ShareIdle == ShareNone {
 		// There is nothing for AggregateBy to do, so simply return nil
 		// There is nothing for AggregateBy to do, so simply return nil
 		return nil
 		return nil
 	}
 	}

+ 11 - 4
pkg/kubecost/asset.go

@@ -856,6 +856,7 @@ type ClusterManagement struct {
 	labels     AssetLabels
 	labels     AssetLabels
 	properties *AssetProperties
 	properties *AssetProperties
 	window     Window
 	window     Window
+	adjustment float64
 	Cost       float64
 	Cost       float64
 }
 }
 
 
@@ -902,17 +903,17 @@ func (cm *ClusterManagement) SetLabels(props AssetLabels) {
 
 
 // Adjustment does not apply to ClusterManagement
 // Adjustment does not apply to ClusterManagement
 func (cm *ClusterManagement) Adjustment() float64 {
 func (cm *ClusterManagement) Adjustment() float64 {
-	return 0.0
+	return cm.adjustment
 }
 }
 
 
 // SetAdjustment does not apply to ClusterManagement
 // SetAdjustment does not apply to ClusterManagement
-func (cm *ClusterManagement) SetAdjustment(float64) {
-	return
+func (cm *ClusterManagement) SetAdjustment(adj float64) {
+	cm.adjustment = adj
 }
 }
 
 
 // TotalCost returns the Asset's total cost
 // TotalCost returns the Asset's total cost
 func (cm *ClusterManagement) TotalCost() float64 {
 func (cm *ClusterManagement) TotalCost() float64 {
-	return cm.Cost
+	return cm.Cost + cm.adjustment
 }
 }
 
 
 // Start returns the Asset's precise start time within the window
 // Start returns the Asset's precise start time within the window
@@ -991,6 +992,7 @@ func (cm *ClusterManagement) add(that *ClusterManagement) {
 	cm.window = window
 	cm.window = window
 	cm.SetProperties(props)
 	cm.SetProperties(props)
 	cm.SetLabels(labels)
 	cm.SetLabels(labels)
+	cm.adjustment += that.adjustment
 	cm.Cost += that.Cost
 	cm.Cost += that.Cost
 }
 }
 
 
@@ -1000,6 +1002,7 @@ func (cm *ClusterManagement) Clone() Asset {
 		labels:     cm.labels.Clone(),
 		labels:     cm.labels.Clone(),
 		properties: cm.properties.Clone(),
 		properties: cm.properties.Clone(),
 		window:     cm.window.Clone(),
 		window:     cm.window.Clone(),
+		adjustment: cm.adjustment,
 		Cost:       cm.Cost,
 		Cost:       cm.Cost,
 	}
 	}
 }
 }
@@ -1022,6 +1025,10 @@ func (cm *ClusterManagement) Equal(a Asset) bool {
 		return false
 		return false
 	}
 	}
 
 
+	if cm.adjustment != that.adjustment {
+		return false
+	}
+
 	if cm.Cost != that.Cost {
 	if cm.Cost != that.Cost {
 		return false
 		return false
 	}
 	}

+ 361 - 0
pkg/kubecost/audit.go

@@ -0,0 +1,361 @@
+package kubecost
+
+import (
+	"golang.org/x/exp/slices"
+	"sync"
+	"time"
+)
+
+// AuditType the types of Audits, each of which should be contained in an AuditSet
+type AuditType string
+
+const (
+	AuditAllocationReconciliation AuditType = "AuditAllocationReconciliation"
+	AuditAllocationTotalStore     AuditType = "AuditAllocationTotalStore"
+	AuditAllocationAggStore       AuditType = "AuditAllocationAggStore"
+	AuditAssetReconciliation      AuditType = "AuditAssetReconciliation"
+	AuditAssetTotalStore          AuditType = "AuditAssetTotalStore"
+	AuditAssetAggStore            AuditType = "AuditAssetAggStore"
+	AuditClusterEquality          AuditType = "AuditClusterEquality"
+
+	AuditAll         AuditType = ""
+	AuditInvalidType AuditType = "InvalidType"
+)
+
+// ToAuditType converts a string to an Audit type
+func ToAuditType(check string) AuditType {
+	switch check {
+	case string(AuditAllocationReconciliation):
+		return AuditAllocationReconciliation
+	case string(AuditAllocationTotalStore):
+		return AuditAllocationTotalStore
+	case string(AuditAllocationAggStore):
+		return AuditAllocationAggStore
+	case string(AuditAssetReconciliation):
+		return AuditAssetReconciliation
+	case string(AuditAssetTotalStore):
+		return AuditAssetTotalStore
+	case string(AuditAssetAggStore):
+		return AuditAssetAggStore
+	case string(AuditClusterEquality):
+		return AuditClusterEquality
+	case string(AuditAll):
+		return AuditAll
+	default:
+		return AuditInvalidType
+	}
+}
+
+// AuditStatus are possible outcomes of an audit
+type AuditStatus string
+
+const (
+	FailedStatus  AuditStatus = "Failed"
+	WarningStatus             = "Warning"
+	PassedStatus              = "Passed"
+)
+
+// AuditMissingValue records when a value that should be present in a store or in the audit generated results are missing
+type AuditMissingValue struct {
+	Description string
+	Key         string
+}
+
+// AuditFloatResult structure for holding the results of a failed audit on a float value, Expected should be the Audit generated value
+// while actual is what is contained in the relevant store
+type AuditFloatResult struct {
+	Expected float64
+	Actual   float64
+}
+
+func (afr *AuditFloatResult) Clone() *AuditFloatResult {
+	return &AuditFloatResult{
+		Expected: afr.Expected,
+		Actual:   afr.Actual,
+	}
+}
+
+// AllocationReconciliationAudit records the differences of between compute resource costs between allocations by nodes
+// and node assets keyed on node name and resource
+type AllocationReconciliationAudit struct {
+	Status        AuditStatus
+	Description   string
+	LastRun       time.Time
+	Resources     map[string]map[string]*AuditFloatResult
+	MissingValues []*AuditMissingValue
+}
+
+func (ara *AllocationReconciliationAudit) Clone() *AllocationReconciliationAudit {
+	if ara == nil {
+		return nil
+	}
+
+	resources := make(map[string]map[string]*AuditFloatResult, len(ara.Resources))
+	for node, resourceMap := range ara.Resources {
+		copyResourceMap := make(map[string]*AuditFloatResult, len(resourceMap))
+		for resourceName, val := range resourceMap {
+			copyResourceMap[resourceName] = val.Clone()
+		}
+		resources[node] = copyResourceMap
+	}
+	return &AllocationReconciliationAudit{
+		Status:        ara.Status,
+		Description:   ara.Description,
+		LastRun:       ara.LastRun,
+		Resources:     resources,
+		MissingValues: slices.Clone(ara.MissingValues),
+	}
+}
+
+// TotalAudit records the differences between a total store and the totaled results of the store that it is based on
+// keyed by cluster and node names
+type TotalAudit struct {
+	Status         AuditStatus
+	Description    string
+	LastRun        time.Time
+	TotalByNode    map[string]*AuditFloatResult
+	TotalByCluster map[string]*AuditFloatResult
+	MissingValues  []*AuditMissingValue
+}
+
+// Clone returns a deep copy of the caller
+func (ta *TotalAudit) Clone() *TotalAudit {
+	if ta == nil {
+		return nil
+	}
+
+	tbn := make(map[string]*AuditFloatResult, len(ta.TotalByNode))
+	for k, v := range ta.TotalByNode {
+		tbn[k] = v
+	}
+	tbc := make(map[string]*AuditFloatResult, len(ta.TotalByNode))
+	for k, v := range ta.TotalByCluster {
+		tbc[k] = v
+	}
+
+	return &TotalAudit{
+		Status:         ta.Status,
+		Description:    ta.Description,
+		LastRun:        ta.LastRun,
+		TotalByNode:    tbn,
+		TotalByCluster: tbc,
+		MissingValues:  slices.Clone(ta.MissingValues),
+	}
+}
+
+// AggAudit contains the results of an Audit on an AggStore keyed on aggregation prop and Allocation key
+type AggAudit struct {
+	Status        AuditStatus
+	Description   string
+	LastRun       time.Time
+	Results       map[string]map[string]*AuditFloatResult
+	MissingValues []*AuditMissingValue
+}
+
+// Clone returns a deep copy of the caller
+func (aa *AggAudit) Clone() *AggAudit {
+	if aa == nil {
+		return nil
+	}
+	res := make(map[string]map[string]*AuditFloatResult, len(aa.Results))
+	for aggType, aggResults := range aa.Results {
+		copyAggResult := make(map[string]*AuditFloatResult, len(aggResults))
+		for aggName, auditFloatResult := range aggResults {
+			copyAggResult[aggName] = auditFloatResult
+		}
+		res[aggType] = copyAggResult
+	}
+
+	return &AggAudit{
+		Status:        aa.Status,
+		Description:   aa.Description,
+		LastRun:       aa.LastRun,
+		Results:       res,
+		MissingValues: slices.Clone(aa.MissingValues),
+	}
+}
+
+// AssetReconciliationAudit records differences in assets and the Cloud
+type AssetReconciliationAudit struct {
+	Status        AuditStatus
+	Description   string
+	LastRun       time.Time
+	Results       map[string]map[string]*AuditFloatResult
+	MissingValues []*AuditMissingValue
+}
+
+// Clone returns a deep copy of the caller
+func (ara *AssetReconciliationAudit) Clone() *AssetReconciliationAudit {
+	res := make(map[string]map[string]*AuditFloatResult, len(ara.Results))
+	for aggType, aggResults := range ara.Results {
+		copyAggResult := make(map[string]*AuditFloatResult, len(aggResults))
+		for aggName, auditFloatResult := range aggResults {
+			copyAggResult[aggName] = auditFloatResult
+		}
+		res[aggType] = copyAggResult
+	}
+
+	return &AssetReconciliationAudit{
+		Status:        ara.Status,
+		Description:   ara.Description,
+		LastRun:       ara.LastRun,
+		Results:       res,
+		MissingValues: slices.Clone(ara.MissingValues),
+	}
+}
+
+// EqualityAudit records the difference in cost between Allocations and Assets aggregated by cluster and keyed on cluster
+type EqualityAudit struct {
+	Status        AuditStatus
+	Description   string
+	LastRun       time.Time
+	Clusters      map[string]*AuditFloatResult
+	MissingValues []*AuditMissingValue
+}
+
+// Clone returns a deep copy of the caller
+func (ea *EqualityAudit) Clone() *EqualityAudit {
+	if ea == nil {
+		return nil
+	}
+	clusters := make(map[string]*AuditFloatResult, len(ea.Clusters))
+	for k, v := range ea.Clusters {
+		clusters[k] = v
+	}
+	return &EqualityAudit{
+		Status:        ea.Status,
+		Description:   ea.Description,
+		LastRun:       ea.LastRun,
+		Clusters:      clusters,
+		MissingValues: slices.Clone(ea.MissingValues),
+	}
+}
+
+// AuditCoverage tracks coverage of each audit type
+type AuditCoverage struct {
+	sync.RWMutex
+	AllocationReconciliation Window `json:"allocationReconciliation"`
+	AllocationAgg            Window `json:"allocationAgg"`
+	AllocationTotal          Window `json:"allocationTotal"`
+	AssetTotal               Window `json:"assetTotal"`
+	AssetReconciliation      Window `json:"assetReconciliation"`
+	ClusterEquality          Window `json:"clusterEquality"`
+}
+
+// NewAuditCoverage create default AuditCoverage
+func NewAuditCoverage() *AuditCoverage {
+	return &AuditCoverage{}
+}
+
+// Update expands the coverage of each Window in the coverage that the given AuditSet's Window if the corresponding Audit is not nil
+// Note: This means of determining coverage can lead to holes in the given window
+func (ac *AuditCoverage) Update(as *AuditSet) {
+	if as != nil && as.AllocationReconciliation != nil {
+		ac.AllocationReconciliation.Expand(as.Window)
+		ac.AllocationAgg.Expand(as.Window)
+		ac.AllocationTotal.Expand(as.Window)
+		ac.AssetTotal.Expand(as.Window)
+		ac.AssetReconciliation.Expand(as.Window)
+		ac.ClusterEquality.Expand(as.Window)
+	}
+
+}
+
+// AuditSet is a ETLSet which contains all kind of Audits for a given Window
+type AuditSet struct {
+	sync.RWMutex
+	AllocationReconciliation *AllocationReconciliationAudit `json:"allocationReconciliation"`
+	AllocationAgg            *AggAudit                      `json:"allocationAgg"`
+	AllocationTotal          *TotalAudit                    `json:"allocationTotal"`
+	AssetTotal               *TotalAudit                    `json:"assetTotal"`
+	AssetReconciliation      *AssetReconciliationAudit      `json:"assetReconciliation"`
+	ClusterEquality          *EqualityAudit                 `json:"clusterEquality"`
+	Window                   Window                         `json:"window"`
+}
+
+// NewAuditSet creates an empty AuditSet with the given window
+func NewAuditSet(start, end time.Time) *AuditSet {
+	return &AuditSet{
+		Window: NewWindow(&start, &end),
+	}
+}
+
+// UpdateAuditSet overwrites any audit fields in the caller with those in the given AuditSet which are not nil
+func (as *AuditSet) UpdateAuditSet(that *AuditSet) *AuditSet {
+	if as == nil {
+		return that
+	}
+
+	if that.AllocationReconciliation != nil {
+		as.AllocationReconciliation = that.AllocationReconciliation
+	}
+	if that.AllocationAgg != nil {
+		as.AllocationAgg = that.AllocationAgg
+	}
+	if that.AllocationTotal != nil {
+		as.AllocationTotal = that.AllocationTotal
+	}
+	if that.AssetTotal != nil {
+		as.AssetTotal = that.AssetTotal
+	}
+	if that.AssetReconciliation != nil {
+		as.AssetReconciliation = that.AssetReconciliation
+	}
+
+	if that.ClusterEquality != nil {
+		as.ClusterEquality = that.ClusterEquality
+	}
+
+	return as
+}
+
+// ConstructSet fulfills the ETLSet interface to provide an empty version of itself so that it can be initialized in its
+// generic form.
+func (as *AuditSet) ConstructSet() ETLSet {
+	return &AuditSet{}
+}
+
+// IsEmpty returns true if any of the audits are non-nil
+func (as *AuditSet) IsEmpty() bool {
+	return as == nil || (as.AllocationReconciliation == nil &&
+		as.AllocationAgg == nil &&
+		as.AllocationTotal == nil &&
+		as.AssetTotal == nil &&
+		as.AssetReconciliation == nil &&
+		as.ClusterEquality == nil)
+}
+
+// GetWindow returns AuditSet Window
+func (as *AuditSet) GetWindow() Window {
+	return as.Window
+}
+
+// Clone returns a deep copy of the caller
+func (as *AuditSet) Clone() *AuditSet {
+	if as == nil {
+		return nil
+	}
+
+	as.RLock()
+	defer as.RUnlock()
+
+	return &AuditSet{
+		AllocationReconciliation: as.AllocationReconciliation.Clone(),
+		AllocationAgg:            as.AllocationAgg.Clone(),
+		AllocationTotal:          as.AllocationTotal.Clone(),
+		AssetTotal:               as.AssetTotal.Clone(),
+		AssetReconciliation:      as.AssetReconciliation.Clone(),
+		ClusterEquality:          as.ClusterEquality.Clone(),
+		Window:                   as.Window.Clone(),
+	}
+}
+
+// CloneSet returns a deep copy of the caller and returns set
+func (as *AuditSet) CloneSet() ETLSet {
+	return as.Clone()
+}
+
+// AuditSetRange SetRange of AuditSets
+type AuditSetRange struct {
+	SetRange[*AuditSet]
+}

+ 14 - 0
pkg/kubecost/bingen.go

@@ -57,4 +57,18 @@ package kubecost
 // @bingen:generate:PVAllocation
 // @bingen:generate:PVAllocation
 // @bingen:end
 // @bingen:end
 
 
+// @bingen:set[name=Audit,version=1]
+// @bingen:generate:AllocationReconciliationAudit
+// @bingen:generate:TotalAudit
+// @bingen:generate:AggAudit
+// @bingen:generate:AuditFloatResult
+// @bingen:generate:AuditMissingValue
+// @bingen:generate:AssetReconciliationAudit
+// @bingen:generate:EqualityAudit
+// @bingen:generate:AuditType
+// @bingen:generate:AuditStatus
+// @bingen:generate[stringtable]:AuditSet
+// @bingen:generate:AuditSetRange
+// @bingen:end
+
 //go:generate bingen -package=kubecost -version=15 -buffer=github.com/kubecost/opencost/pkg/util
 //go:generate bingen -package=kubecost -version=15 -buffer=github.com/kubecost/opencost/pkg/util

+ 79 - 0
pkg/kubecost/etlrange.go

@@ -0,0 +1,79 @@
+package kubecost
+
+import (
+	"fmt"
+	"github.com/kubecost/opencost/pkg/util/json"
+	"sync"
+)
+
+// SetRange is a generic implementation of the SetRanges that act as containers. It covers the basic functionality that
+// is shared by the basic types but is meant to be extended by each implementation.
+type SetRange[T ETLSet] struct {
+	lock sync.RWMutex
+	sets []T
+}
+
+// Append attaches the given ETLSet to the end of the sets slice.
+// currently does not check that the window is correct.
+func (r *SetRange[T]) Append(that T) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.sets = append(r.sets, that)
+}
+
+// Each invokes the given function for each ETLSet in the SetRange
+func (r *SetRange[T]) Each(f func(int, T)) {
+	if r == nil {
+		return
+	}
+
+	for i, set := range r.sets {
+		f(i, set)
+	}
+}
+
+// Get retrieves the given index from the sets slice
+func (r *SetRange[T]) Get(i int) (T, error) {
+	if i < 0 || i >= len(r.sets) {
+		var set T
+		return set, fmt.Errorf("range: index out of range: %d", i)
+	}
+
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+	return r.sets[i], nil
+}
+
+// Length returns the length of the sets slice
+func (r *SetRange[T]) Length() int {
+	if r == nil || r.sets == nil {
+		return 0
+	}
+
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+	return len(r.sets)
+}
+
+// IsEmpty returns false if SetRange contains a single ETLSet that is not empty
+func (r *SetRange[T]) IsEmpty() bool {
+	if r == nil || r.Length() == 0 {
+		return true
+	}
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	for _, set := range r.sets {
+		if !set.IsEmpty() {
+			return false
+		}
+	}
+	return true
+}
+
+// MarshalJSON converts SetRange to JSON
+func (r *SetRange[T]) MarshalJSON() ([]byte, error) {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+	return json.Marshal(r.sets)
+}

+ 15 - 0
pkg/kubecost/etlset.go

@@ -0,0 +1,15 @@
+package kubecost
+
+import "encoding"
+
+// ETLSet is an interface which represents the basic data block of an ETL. It is keyed by its Window
+type ETLSet interface {
+	ConstructSet() ETLSet
+	CloneSet() ETLSet
+	IsEmpty() bool
+	GetWindow() Window
+
+	// Representations
+	encoding.BinaryMarshaler
+	encoding.BinaryUnmarshaler
+}

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 1144 - 74
pkg/kubecost/kubecost_codecs.go


+ 3 - 2
pkg/kubecost/totals.go

@@ -411,7 +411,7 @@ func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotal
 
 
 			arts[key].Count++
 			arts[key].Count++
 			arts[key].LoadBalancerCost += lb.Cost
 			arts[key].LoadBalancerCost += lb.Cost
-			arts[key].LoadBalancerCost += lb.adjustment
+			arts[key].LoadBalancerCostAdjustment += lb.adjustment
 		} else if cm, ok := asset.(*ClusterManagement); ok && prop == AssetClusterProp {
 		} else if cm, ok := asset.(*ClusterManagement); ok && prop == AssetClusterProp {
 			// Only record cluster management when prop is Cluster because we
 			// Only record cluster management when prop is Cluster because we
 			// can't break down ClusterManagement by node.
 			// can't break down ClusterManagement by node.
@@ -426,7 +426,8 @@ func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotal
 			}
 			}
 
 
 			arts[key].Count++
 			arts[key].Count++
-			arts[key].ClusterManagementCost += cm.TotalCost()
+			arts[key].ClusterManagementCost += cm.Cost
+			arts[key].ClusterManagementCostAdjustment += cm.adjustment
 		} else if disk, ok := asset.(*Disk); ok {
 		} else if disk, ok := asset.(*Disk); ok {
 			// Record disks in an intermediate structure, which will be
 			// Record disks in an intermediate structure, which will be
 			// processed after all assets have been seen.
 			// processed after all assets have been seen.

+ 1 - 1
pkg/kubecost/window.go

@@ -456,7 +456,7 @@ func (w Window) Hours() float64 {
 }
 }
 
 
 func (w Window) IsEmpty() bool {
 func (w Window) IsEmpty() bool {
-	return !w.IsOpen() && w.end.Equal(*w.Start())
+	return w.start == nil && w.end == nil
 }
 }
 
 
 func (w Window) IsNegative() bool {
 func (w Window) IsNegative() bool {

Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio