Răsfoiți Sursa

ETL Data Structures: NewAccumulation() (#1404)

* add AccumulateImmutable to set ranges for etl

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt 3 ani în urmă
părinte
comite
f3a3973098

+ 32 - 1
pkg/kubecost/allocation.go

@@ -93,7 +93,8 @@ type Allocation struct {
 // A2 Using 2 CPU      ----      -----      ----
 // A3 Using 1 CPU         ---       --
 // _______________________________________________
-//                   Time ---->
+//
+//	Time ---->
 //
 // The logical maximum CPU usage is 5, but this cannot be calculated iteratively,
 // which is how we calculate aggregations and accumulations of Allocations currently.
@@ -1988,6 +1989,36 @@ func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
 	return allocSet, nil
 }
 
+// NewAccumulation clones the first available AllocationSet to use as the data structure to
+// accumulate the remaining data. This leaves the original AllocationSetRange intact.
+func (asr *AllocationSetRange) NewAccumulation() (*AllocationSet, error) {
+	// NOTE: Adding this API for consistency across SummaryAllocation and Assets, but this
+	// NOTE: implementation is almost identical to regular Accumulate(). The accumulate() method
+	// NOTE: for Allocation returns Clone() of the input, which is required for AccumulateBy
+	// NOTE: support (unit tests are great for verifying this information).
+	var allocSet *AllocationSet
+	var err error
+
+	for _, as := range asr.Allocations {
+		if allocSet == nil {
+			allocSet = as.Clone()
+			continue
+		}
+
+		var allocSetCopy *AllocationSet = nil
+		if as != nil {
+			allocSetCopy = as.Clone()
+		}
+
+		allocSet, err = allocSet.accumulate(allocSetCopy)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return allocSet, nil
+}
+
 // AccumulateBy sums AllocationSets based on the resolution given. The resolution given is subject to the scale used for the AllocationSets.
 // Resolutions not evenly divisible by the AllocationSetRange window durations accumulate sets until a sum greater than or equal to the resolution is met,
 // at which point AccumulateBy will start summing from 0 until the requested resolution is met again.

+ 42 - 0
pkg/kubecost/allocation_test.go

@@ -1614,6 +1614,48 @@ func TestAllocationSet_insertMatchingWindow(t *testing.T) {
 // TODO niko/etl
 //func TestNewAllocationSetRange(t *testing.T) {}
 
+func TestAllocationSetRange_AccumulateRepeat(t *testing.T) {
+	ago2d := time.Now().UTC().Truncate(day).Add(-2 * day)
+	yesterday := time.Now().UTC().Truncate(day).Add(-day)
+	today := time.Now().UTC().Truncate(day)
+	tomorrow := time.Now().UTC().Truncate(day).Add(day)
+
+	a := GenerateMockAllocationSet(ago2d)
+	b := GenerateMockAllocationSet(yesterday)
+	c := GenerateMockAllocationSet(today)
+	d := GenerateMockAllocationSet(tomorrow)
+
+	asr := NewAllocationSetRange(a, b, c, d)
+
+	// Take Total Cost
+	totalCost := asr.TotalCost()
+
+	// NewAccumulation does not mutate
+	result, err := asr.NewAccumulation()
+	if err != nil {
+		t.Fatal(err)
+	}
+	asr2 := NewAllocationSetRange(result)
+
+	// Ensure Costs Match
+	if totalCost != asr2.TotalCost() {
+		t.Fatalf("Accumulated Total Cost does not match original Total Cost")
+	}
+
+	// Next NewAccumulation() call should prove that there is no mutation of inner data
+	result, err = asr.NewAccumulation()
+	if err != nil {
+		t.Fatal(err)
+	}
+	asr3 := NewAllocationSetRange(result)
+
+	// Costs should be correct, as multiple calls to NewAccumulation() should not alter
+	// the internals of the AllocationSetRange
+	if totalCost != asr3.TotalCost() {
+		t.Fatalf("Accumulated Total Cost does not match original Total Cost. %f != %f", totalCost, asr3.TotalCost())
+	}
+}
+
 func TestAllocationSetRange_Accumulate(t *testing.T) {
 	ago2d := time.Now().UTC().Truncate(day).Add(-2 * day)
 	yesterday := time.Now().UTC().Truncate(day).Add(-day)

+ 93 - 51
pkg/kubecost/asset.go

@@ -68,63 +68,65 @@ type Asset interface {
 // to Asset label. For example, consider this asset:
 //
 // CURRENT: Asset ETL stores its data ALREADY MAPPED from label to k8s concept. This isn't ideal-- see the TOOD.
-//   Cloud {
-// 	   TotalCost: 10.00,
-// 	   Labels{
-//       "kubernetes_namespace":"monitoring",
-// 	     "env":"prod"
-// 	   }
-//   }
+//
+//	  Cloud {
+//		   TotalCost: 10.00,
+//		   Labels{
+//	      "kubernetes_namespace":"monitoring",
+//		     "env":"prod"
+//		   }
+//	  }
 //
 // Given the following parameters, we expect to return:
 //
-//   1) single-prop full match
-//   aggregateBy = ["namespace"]
-//   => Allocation{Name: "monitoring", ExternalCost: 10.00, TotalCost: 10.00}, nil
+//  1. single-prop full match
+//     aggregateBy = ["namespace"]
+//     => Allocation{Name: "monitoring", ExternalCost: 10.00, TotalCost: 10.00}, nil
 //
-//   2) multi-prop full match
-//   aggregateBy = ["namespace", "label:env"]
-//   allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
-//   => Allocation{Name: "monitoring/env=prod", ExternalCost: 10.00, TotalCost: 10.00}, nil
+//  2. multi-prop full match
+//     aggregateBy = ["namespace", "label:env"]
+//     allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
+//     => Allocation{Name: "monitoring/env=prod", ExternalCost: 10.00, TotalCost: 10.00}, nil
 //
-//   3) multi-prop partial match
-//   aggregateBy = ["namespace", "label:foo"]
-//   => Allocation{Name: "monitoring/__unallocated__", ExternalCost: 10.00, TotalCost: 10.00}, nil
+//  3. multi-prop partial match
+//     aggregateBy = ["namespace", "label:foo"]
+//     => Allocation{Name: "monitoring/__unallocated__", ExternalCost: 10.00, TotalCost: 10.00}, nil
 //
-//   4) no match
-//   aggregateBy = ["cluster"]
-//   => nil, err
+//  4. no match
+//     aggregateBy = ["cluster"]
+//     => nil, err
 //
 // TODO:
-//   Cloud {
-// 	   TotalCost: 10.00,
-// 	   Labels{
-//       "kubernetes_namespace":"monitoring",
-// 	     "env":"prod"
-// 	   }
-//   }
+//
+//	  Cloud {
+//		   TotalCost: 10.00,
+//		   Labels{
+//	      "kubernetes_namespace":"monitoring",
+//		     "env":"prod"
+//		   }
+//	  }
 //
 // Given the following parameters, we expect to return:
 //
-//   1) single-prop full match
-//   aggregateBy = ["namespace"]
-//   allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
-//   => Allocation{Name: "monitoring", ExternalCost: 10.00, TotalCost: 10.00}, nil
+//  1. single-prop full match
+//     aggregateBy = ["namespace"]
+//     allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
+//     => Allocation{Name: "monitoring", ExternalCost: 10.00, TotalCost: 10.00}, nil
 //
-//   2) multi-prop full match
-//   aggregateBy = ["namespace", "label:env"]
-//   allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
-//   => Allocation{Name: "monitoring/env=prod", ExternalCost: 10.00, TotalCost: 10.00}, nil
+//  2. multi-prop full match
+//     aggregateBy = ["namespace", "label:env"]
+//     allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
+//     => Allocation{Name: "monitoring/env=prod", ExternalCost: 10.00, TotalCost: 10.00}, nil
 //
-//   3) multi-prop partial match
-//   aggregateBy = ["namespace", "label:foo"]
-//   allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
-//   => Allocation{Name: "monitoring/__unallocated__", ExternalCost: 10.00, TotalCost: 10.00}, nil
+//  3. multi-prop partial match
+//     aggregateBy = ["namespace", "label:foo"]
+//     allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
+//     => Allocation{Name: "monitoring/__unallocated__", ExternalCost: 10.00, TotalCost: 10.00}, nil
 //
-//   4) no match
-//   aggregateBy = ["cluster"]
-//   allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
-//   => nil, err
+//  4. no match
+//     aggregateBy = ["cluster"]
+//     allocationPropertyLabels = {"namespace":"kubernetes_namespace"}
+//     => nil, err
 //
 // (See asset_test.go for assertions of these examples and more.)
 func AssetToExternalAllocation(asset Asset, aggregateBy []string, labelConfig *LabelConfig) (*Allocation, error) {
@@ -1339,11 +1341,14 @@ func (d *Disk) String() string {
 // hours running; e.g. the sum of a 100GiB disk running for the first 10 hours
 // and a 30GiB disk running for the last 20 hours of the same 24-hour window
 // would produce:
-//   (100*10 + 30*20) / 24 = 66.667GiB
+//
+//	(100*10 + 30*20) / 24 = 66.667GiB
+//
 // However, any number of disks running for the full span of a window will
 // report the actual number of bytes of the static disk; e.g. the above
 // scenario for one entire 24-hour window:
-//   (100*24 + 30*24) / 24 = (100 + 30) = 130GiB
+//
+//	(100*24 + 30*24) / 24 = (100 + 30) = 130GiB
 func (d *Disk) Bytes() float64 {
 	// [b*hr]*([min/hr]*[1/min]) = [b*hr]/[hr] = b
 	return d.ByteHours * (60.0 / d.Minutes())
@@ -1993,11 +1998,14 @@ func (n *Node) IsPreemptible() bool {
 // hours running; e.g. the sum of a 4-core node running for the first 10 hours
 // and a 3-core node running for the last 20 hours of the same 24-hour window
 // would produce:
-//   (4*10 + 3*20) / 24 = 4.167 cores
+//
+//	(4*10 + 3*20) / 24 = 4.167 cores
+//
 // However, any number of cores running for the full span of a window will
 // report the actual number of cores of the static node; e.g. the above
 // scenario for one entire 24-hour window:
-//   (4*24 + 3*24) / 24 = (4 + 3) = 7 cores
+//
+//	(4*24 + 3*24) / 24 = (4 + 3) = 7 cores
 func (n *Node) CPUCores() float64 {
 	// [core*hr]*([min/hr]*[1/min]) = [core*hr]/[hr] = core
 	return n.CPUCoreHours * (60.0 / n.Minutes())
@@ -2008,11 +2016,14 @@ func (n *Node) CPUCores() float64 {
 // hours running; e.g. the sum of a 12GiB-RAM node running for the first 10 hours
 // and a 16GiB-RAM node running for the last 20 hours of the same 24-hour window
 // would produce:
-//   (12*10 + 16*20) / 24 = 18.333GiB RAM
+//
+//	(12*10 + 16*20) / 24 = 18.333GiB RAM
+//
 // However, any number of bytes running for the full span of a window will
 // report the actual number of bytes of the static node; e.g. the above
 // scenario for one entire 24-hour window:
-//   (12*24 + 16*24) / 24 = (12 + 16) = 28GiB RAM
+//
+//	(12*24 + 16*24) / 24 = (12 + 16) = 28GiB RAM
 func (n *Node) RAMBytes() float64 {
 	// [b*hr]*([min/hr]*[1/min]) = [b*hr]/[hr] = b
 	return n.RAMByteHours * (60.0 / n.Minutes())
@@ -2023,11 +2034,14 @@ func (n *Node) RAMBytes() float64 {
 // hours running; e.g. the sum of a 2 gpu node running for the first 10 hours
 // and a 1 gpu node running for the last 20 hours of the same 24-hour window
 // would produce:
-//   (2*10 + 1*20) / 24 = 1.667 GPUs
+//
+//	(2*10 + 1*20) / 24 = 1.667 GPUs
+//
 // However, any number of GPUs running for the full span of a window will
 // report the actual number of GPUs of the static node; e.g. the above
 // scenario for one entire 24-hour window:
-//   (2*24 + 1*24) / 24 = (2 + 1) = 3 GPUs
+//
+//	(2*24 + 1*24) / 24 = (2 + 1) = 3 GPUs
 func (n *Node) GPUs() float64 {
 	// [b*hr]*([min/hr]*[1/min]) = [b*hr]/[hr] = b
 	return n.GPUHours * (60.0 / n.Minutes())
@@ -3113,6 +3127,34 @@ func (asr *AssetSetRange) Accumulate() (*AssetSet, error) {
 	return assetSet, nil
 }
 
+// NewAccumulation clones the first available AssetSet to use as the data structure to
+// accumulate the remaining data. This leaves the original AssetSetRange intact.
+func (asr *AssetSetRange) NewAccumulation() (*AssetSet, error) {
+	var assetSet *AssetSet
+	var err error
+
+	for _, as := range asr.Assets {
+		if assetSet == nil {
+			assetSet = as.Clone()
+			continue
+		}
+
+		// copy if non-nil
+		var assetSetCopy *AssetSet = nil
+		if as != nil {
+			assetSetCopy = as.Clone()
+		}
+
+		// nil is acceptable to pass to accumulate
+		assetSet, err = assetSet.accumulate(assetSetCopy)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return assetSet, nil
+}
+
 type AssetAggregationOptions struct {
 	SharedHourlyCosts map[string]float64
 	FilterFuncs       []AssetMatchFunc

+ 33 - 0
pkg/kubecost/summaryallocation.go

@@ -1315,6 +1315,39 @@ func (sasr *SummaryAllocationSetRange) Accumulate() (*SummaryAllocationSet, erro
 	return result, nil
 }
 
+// NewAccumulation clones the first available SummaryAllocationSet to use as the data structure to
+// accumulate the remaining data. This leaves the original SummaryAllocationSetRange intact.
+func (sasr *SummaryAllocationSetRange) NewAccumulation() (*SummaryAllocationSet, error) {
+	var result *SummaryAllocationSet
+	var err error
+
+	sasr.RLock()
+	defer sasr.RUnlock()
+
+	for _, sas := range sasr.SummaryAllocationSets {
+		// we want to clone the first summary allocation set, then just Add the others
+		// to the clone. We will eventually use the clone to create the set range.
+		if result == nil {
+			result = sas.Clone()
+			continue
+		}
+
+		// Copy if sas is non-nil
+		var sasCopy *SummaryAllocationSet = nil
+		if sas != nil {
+			sasCopy = sas.Clone()
+		}
+
+		// nil is ok to pass into Add
+		result, err = result.Add(sasCopy)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return result, nil
+}
+
 // AggregateBy aggregates each AllocationSet in the range by the given
 // properties and options.
 func (sasr *SummaryAllocationSetRange) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {