فهرست منبع

Merge branch 'develop' into develop

Ajay Tripathy 3 سال پیش
والد
کامیت
47472ca9fb
8فایلهای تغییر یافته به همراه99 افزوده شده و 103 حذف شده
  1. 4 1
      .gitignore
  2. 1 1
      go.mod
  3. 7 7
      pkg/kubecost/allocation.go
  4. 75 0
      pkg/kubecost/allocation_test.go
  5. 5 7
      pkg/prom/prom.go
  6. 0 42
      pkg/util/atomic/atomicbool.go
  7. 0 37
      pkg/util/atomic/atomicint.go
  8. 7 8
      pkg/util/worker/worker.go

+ 4 - 1
.gitignore

@@ -1,5 +1,8 @@
+# Jetbrains project files
 .idea
 .idea
+*.iml
+
 ui/.cache
 ui/.cache
 ui/dist
 ui/dist
 ui/node_modules/
 ui/node_modules/
-cmd/costmodel/costmodel
+cmd/costmodel/costmodel

+ 1 - 1
go.mod

@@ -140,4 +140,4 @@ require (
 	sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
 )
 )
 
 
-go 1.18
+go 1.19

+ 7 - 7
pkg/kubecost/allocation.go

@@ -1914,7 +1914,7 @@ func (as *AllocationSet) UTCOffset() time.Duration {
 	return time.Duration(zone) * time.Second
 	return time.Duration(zone) * time.Second
 }
 }
 
 
-func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error) {
+func (as *AllocationSet) Accumulate(that *AllocationSet) (*AllocationSet, error) {
 	if as.IsEmpty() {
 	if as.IsEmpty() {
 		return that.Clone(), nil
 		return that.Clone(), nil
 	}
 	}
@@ -1985,7 +1985,7 @@ func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
 	var err error
 	var err error
 
 
 	for _, as := range asr.Allocations {
 	for _, as := range asr.Allocations {
-		allocSet, err = allocSet.accumulate(as)
+		allocSet, err = allocSet.Accumulate(as)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
@@ -1995,10 +1995,10 @@ func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
 }
 }
 
 
 // NewAccumulation clones the first available AllocationSet to use as the data structure to
 // NewAccumulation clones the first available AllocationSet to use as the data structure to
-// accumulate the remaining data. This leaves the original AllocationSetRange intact.
+// Accumulate the remaining data. This leaves the original AllocationSetRange intact.
 func (asr *AllocationSetRange) NewAccumulation() (*AllocationSet, error) {
 func (asr *AllocationSetRange) NewAccumulation() (*AllocationSet, error) {
 	// NOTE: Adding this API for consistency across SummaryAllocation and Assets, but this
 	// NOTE: Adding this API for consistency across SummaryAllocation and Assets, but this
-	// NOTE: implementation is almost identical to regular Accumulate(). The accumulate() method
+	// NOTE: implementation is almost identical to regular Accumulate(). The Accumulate() method
 	// NOTE: for Allocation returns Clone() of the input, which is required for AccumulateBy
 	// NOTE: for Allocation returns Clone() of the input, which is required for AccumulateBy
 	// NOTE: support (unit tests are great for verifying this information).
 	// NOTE: support (unit tests are great for verifying this information).
 	var allocSet *AllocationSet
 	var allocSet *AllocationSet
@@ -2015,7 +2015,7 @@ func (asr *AllocationSetRange) NewAccumulation() (*AllocationSet, error) {
 			allocSetCopy = as.Clone()
 			allocSetCopy = as.Clone()
 		}
 		}
 
 
-		allocSet, err = allocSet.accumulate(allocSetCopy)
+		allocSet, err = allocSet.Accumulate(allocSetCopy)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
@@ -2025,7 +2025,7 @@ func (asr *AllocationSetRange) NewAccumulation() (*AllocationSet, error) {
 }
 }
 
 
 // AccumulateBy sums AllocationSets based on the resolution given. The resolution given is subject to the scale used for the AllocationSets.
 // AccumulateBy sums AllocationSets based on the resolution given. The resolution given is subject to the scale used for the AllocationSets.
-// Resolutions not evenly divisible by the AllocationSetRange window durations accumulate sets until a sum greater than or equal to the resolution is met,
+// Resolutions not evenly divisible by the AllocationSetRange window durations Accumulate sets until a sum greater than or equal to the resolution is met,
 // at which point AccumulateBy will start summing from 0 until the requested resolution is met again.
 // at which point AccumulateBy will start summing from 0 until the requested resolution is met again.
 // If the requested resolution is smaller than the window of an AllocationSet then the resolution will default to the duration of a set.
 // If the requested resolution is smaller than the window of an AllocationSet then the resolution will default to the duration of a set.
 // Resolutions larger than the duration of the entire AllocationSetRange will default to the duration of the range.
 // Resolutions larger than the duration of the entire AllocationSetRange will default to the duration of the range.
@@ -2035,7 +2035,7 @@ func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) (*Allocati
 	var err error
 	var err error
 
 
 	for i, as := range asr.Allocations {
 	for i, as := range asr.Allocations {
-		allocSet, err = allocSet.accumulate(as)
+		allocSet, err = allocSet.Accumulate(as)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}

+ 75 - 0
pkg/kubecost/allocation_test.go

@@ -2657,3 +2657,78 @@ func TestAllocationSetRange_Minutes(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+func TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate(t *testing.T) {
+
+	today := time.Now().Round(day)
+	start := today.AddDate(0, 0, -4)
+
+	var allocationSets []*AllocationSet
+
+	for i := 0; i < 4; i++ {
+		allocationSets = append(allocationSets, GenerateMockAllocationSet(start))
+		start = start.AddDate(0, 0, 1)
+	}
+
+	var originalAllocationSets []*AllocationSet
+
+	for _, as := range allocationSets {
+		originalAllocationSets = append(originalAllocationSets, as.Clone())
+	}
+
+	asr := NewAllocationSetRange()
+	for _, as := range allocationSets {
+		asr.Append(as.Clone())
+	}
+
+	expected, err := asr.Accumulate()
+	if err != nil {
+		t.Errorf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: AllocationSetRange.Accumulate() returned an error\n")
+	}
+
+	var got *AllocationSet
+
+	for i := 0; i < len(allocationSets); i++ {
+		got, err = got.Accumulate(allocationSets[i])
+		if err != nil {
+			t.Errorf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: got.Accumulate(allocationSets[%d]) returned an error\n", i)
+		}
+	}
+
+	// compare the got and expected Allocation sets, ensure that they match
+	if len(got.Allocations) != len(expected.Allocations) {
+		t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: length of got.Allocations does not match length of expected.Allocations\n")
+	}
+	for key, a := range got.Allocations {
+		if _, ok := expected.Allocations[key]; !ok {
+			t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: got.Allocations[%s] not found in expected.Allocations\n", key)
+		}
+
+		if !a.Equal(expected.Allocations[key]) {
+			t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: got.Allocations[%s] did not match expected.Allocations[%[1]s]", key)
+		}
+	}
+
+	if len(got.ExternalKeys) != len(expected.ExternalKeys) {
+		t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: length of got.ExternalKeys does not match length of expected.ExternalKeys\n")
+	}
+
+	if len(got.IdleKeys) != len(expected.IdleKeys) {
+		t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: length of got.IdleKeys does not match length of expected.IdleKeys\n")
+	}
+
+	if !got.Window.Start().UTC().Equal(expected.Window.Start().UTC()) {
+		t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: Window.start: got:%s, expected:%s\n", got.Window.Start(), expected.Window.Start())
+	}
+	if !got.Window.End().UTC().Equal(expected.Window.End().UTC()) {
+		t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: Window.end: got:%s, expected:%s\n", got.Window.End(), expected.Window.End())
+	}
+
+	for i := range allocationSets {
+		for key, allocation := range allocationSets[i].Allocations {
+			if !allocation.Equal(originalAllocationSets[i].Allocations[key]) {
+				t.Fatalf("TestAllocationSet_Accumulate_Equals_AllocationSetRange_Accumulate: allocationSet has been mutated in Accumulate; allocationSet: %d, allocation: %s\n", i, key)
+			}
+		}
+	}
+}

+ 5 - 7
pkg/prom/prom.go

@@ -9,11 +9,11 @@ import (
 	"net/url"
 	"net/url"
 	"os"
 	"os"
 	"strings"
 	"strings"
+	"sync/atomic"
 	"time"
 	"time"
 
 
 	"github.com/opencost/opencost/pkg/collections"
 	"github.com/opencost/opencost/pkg/collections"
 	"github.com/opencost/opencost/pkg/log"
 	"github.com/opencost/opencost/pkg/log"
-	"github.com/opencost/opencost/pkg/util/atomic"
 	"github.com/opencost/opencost/pkg/util/fileutil"
 	"github.com/opencost/opencost/pkg/util/fileutil"
 	"github.com/opencost/opencost/pkg/util/httputil"
 	"github.com/opencost/opencost/pkg/util/httputil"
 
 
@@ -117,7 +117,7 @@ type RateLimitedPrometheusClient struct {
 	queue          collections.BlockingQueue[*workRequest]
 	queue          collections.BlockingQueue[*workRequest]
 	decorator      QueryParamsDecorator
 	decorator      QueryParamsDecorator
 	rateLimitRetry *RateLimitRetryOpts
 	rateLimitRetry *RateLimitRetryOpts
-	outbound       *atomic.AtomicInt32
+	outbound       atomic.Int32
 	fileLogger     *golog.Logger
 	fileLogger     *golog.Logger
 }
 }
 
 
@@ -140,7 +140,6 @@ func NewRateLimitedClient(
 	queryLogFile string) (prometheus.Client, error) {
 	queryLogFile string) (prometheus.Client, error) {
 
 
 	queue := collections.NewBlockingQueue[*workRequest]()
 	queue := collections.NewBlockingQueue[*workRequest]()
-	outbound := atomic.NewAtomicInt32(0)
 
 
 	var logger *golog.Logger
 	var logger *golog.Logger
 	if queryLogFile != "" {
 	if queryLogFile != "" {
@@ -172,7 +171,6 @@ func NewRateLimitedClient(
 		queue:          queue,
 		queue:          queue,
 		decorator:      decorator,
 		decorator:      decorator,
 		rateLimitRetry: rateLimitRetryOpts,
 		rateLimitRetry: rateLimitRetryOpts,
-		outbound:       outbound,
 		auth:           auth,
 		auth:           auth,
 		fileLogger:     logger,
 		fileLogger:     logger,
 	}
 	}
@@ -199,7 +197,7 @@ func (rlpc *RateLimitedPrometheusClient) TotalQueuedRequests() int {
 // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
 // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
 // sent to the server and are awaiting response.
 // sent to the server and are awaiting response.
 func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
 func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
-	return int(rlpc.outbound.Get())
+	return int(rlpc.outbound.Load())
 }
 }
 
 
 // Passthrough to the prometheus client API
 // Passthrough to the prometheus client API
@@ -254,7 +252,7 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 		timeInQueue := time.Since(we.start)
 		timeInQueue := time.Since(we.start)
 
 
 		// Increment outbound counter
 		// Increment outbound counter
-		rlpc.outbound.Increment()
+		rlpc.outbound.Add(1)
 
 
 		// Execute Request
 		// Execute Request
 		roundTripStart := time.Now()
 		roundTripStart := time.Now()
@@ -298,7 +296,7 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 		}
 		}
 
 
 		// Decrement outbound counter
 		// Decrement outbound counter
-		rlpc.outbound.Decrement()
+		rlpc.outbound.Add(-1)
 		LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
 		LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
 
 
 		// Pass back response data over channel to caller
 		// Pass back response data over channel to caller

+ 0 - 42
pkg/util/atomic/atomicbool.go

@@ -1,42 +0,0 @@
-package atomic
-
-import (
-	"sync/atomic"
-)
-
-// AtomicBool alias leverages a 32-bit integer CAS
-type AtomicBool int32
-
-// NewAtomicBool creates an AtomicBool with given default value
-func NewAtomicBool(value bool) *AtomicBool {
-	ab := new(AtomicBool)
-	ab.Set(value)
-	return ab
-}
-
-// Loads the bool value atomically
-func (ab *AtomicBool) Get() bool {
-	return atomic.LoadInt32((*int32)(ab)) != 0
-}
-
-// Sets the bool value atomically
-func (ab *AtomicBool) Set(value bool) {
-	if value {
-		atomic.StoreInt32((*int32)(ab), 1)
-	} else {
-		atomic.StoreInt32((*int32)(ab), 0)
-	}
-}
-
-// CompareAndSet sets value to new if current is equal to the current value. If the new value is
-// set, this function returns true.
-func (ab *AtomicBool) CompareAndSet(current, new bool) bool {
-	var o, n int32
-	if current {
-		o = 1
-	}
-	if new {
-		n = 1
-	}
-	return atomic.CompareAndSwapInt32((*int32)(ab), o, n)
-}

+ 0 - 37
pkg/util/atomic/atomicint.go

@@ -1,37 +0,0 @@
-package atomic
-
-import "sync/atomic"
-
-type AtomicInt32 int32
-
-// NewAtomicInt32 creates a new atomic int32 instance.
-func NewAtomicInt32(value int32) *AtomicInt32 {
-	ai := new(AtomicInt32)
-	ai.Set(value)
-	return ai
-}
-
-// Loads the int32 value atomically
-func (ai *AtomicInt32) Get() int32 {
-	return atomic.LoadInt32((*int32)(ai))
-}
-
-// Sets the int32 value atomically
-func (ai *AtomicInt32) Set(value int32) {
-	atomic.StoreInt32((*int32)(ai), value)
-}
-
-// Increments the atomic int and returns the new value
-func (ai *AtomicInt32) Increment() int32 {
-	return atomic.AddInt32((*int32)(ai), 1)
-}
-
-// Decrements the atomint int and returns the new value
-func (ai *AtomicInt32) Decrement() int32 {
-	return atomic.AddInt32((*int32)(ai), -1)
-}
-
-// CompareAndSet sets value to new if current is equal to the current value
-func (ai *AtomicInt32) CompareAndSet(current, new int32) bool {
-	return atomic.CompareAndSwapInt32((*int32)(ai), current, new)
-}

+ 7 - 8
pkg/util/worker/worker.go

@@ -4,9 +4,9 @@ import (
 	"fmt"
 	"fmt"
 	"runtime"
 	"runtime"
 	"sync"
 	"sync"
+	"sync/atomic"
 
 
 	"github.com/opencost/opencost/pkg/collections"
 	"github.com/opencost/opencost/pkg/collections"
-	"github.com/opencost/opencost/pkg/util/atomic"
 )
 )
 
 
 // Worker is a transformation function from input type T to output type U.
 // Worker is a transformation function from input type T to output type U.
@@ -46,7 +46,7 @@ type queuedWorkerPool[T any, U any] struct {
 	queue      collections.BlockingQueue[entry[T, U]]
 	queue      collections.BlockingQueue[entry[T, U]]
 	work       Worker[T, U]
 	work       Worker[T, U]
 	workers    int
 	workers    int
-	isShutdown *atomic.AtomicBool
+	isShutdown atomic.Bool
 }
 }
 
 
 // ordered is a WorkGroup implementation which enforces ordering based on when
 // ordered is a WorkGroup implementation which enforces ordering based on when
@@ -62,10 +62,9 @@ type ordered[T any, U any] struct {
 // func used to transform inputs to outputs.
 // func used to transform inputs to outputs.
 func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
 func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
 	owq := &queuedWorkerPool[T, U]{
 	owq := &queuedWorkerPool[T, U]{
-		workers:    workers,
-		work:       work,
-		queue:      collections.NewBlockingQueue[entry[T, U]](),
-		isShutdown: atomic.NewAtomicBool(false),
+		workers: workers,
+		work:    work,
+		queue:   collections.NewBlockingQueue[entry[T, U]](),
 	}
 	}
 
 
 	// startup the designated workers
 	// startup the designated workers
@@ -80,7 +79,7 @@ func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U
 // to get the results. An error is returned if the pool is shutdown, or is in the process
 // to get the results. An error is returned if the pool is shutdown, or is in the process
 // of shutting down.
 // of shutting down.
 func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
 func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
-	if wq.isShutdown.Get() {
+	if wq.isShutdown.Load() {
 		return fmt.Errorf("WorkerPoolShutdown")
 		return fmt.Errorf("WorkerPoolShutdown")
 	}
 	}
 
 
@@ -95,7 +94,7 @@ func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
 
 
 // Shutdown stops all of the workers (if running).
 // Shutdown stops all of the workers (if running).
 func (wq *queuedWorkerPool[T, U]) Shutdown() {
 func (wq *queuedWorkerPool[T, U]) Shutdown() {
-	if !wq.isShutdown.CompareAndSet(false, true) {
+	if !wq.isShutdown.CompareAndSwap(false, true) {
 		return
 		return
 	}
 	}