Browse Source

Merge pull request #1418 from opencost/bolt/go1.19-upgrade

Go 1.19 Upgrade - Atomics Update
Matt Bolt 3 years ago
parent
commit
357ee6afce
5 changed files with 13 additions and 95 deletions
  1. 1 1
      go.mod
  2. 5 7
      pkg/prom/prom.go
  3. 0 42
      pkg/util/atomic/atomicbool.go
  4. 0 37
      pkg/util/atomic/atomicint.go
  5. 7 8
      pkg/util/worker/worker.go

+ 1 - 1
go.mod

@@ -140,4 +140,4 @@ require (
 	sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
 )
 
-go 1.18
+go 1.19

+ 5 - 7
pkg/prom/prom.go

@@ -9,11 +9,11 @@ import (
 	"net/url"
 	"os"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/opencost/opencost/pkg/collections"
 	"github.com/opencost/opencost/pkg/log"
-	"github.com/opencost/opencost/pkg/util/atomic"
 	"github.com/opencost/opencost/pkg/util/fileutil"
 	"github.com/opencost/opencost/pkg/util/httputil"
 
@@ -117,7 +117,7 @@ type RateLimitedPrometheusClient struct {
 	queue          collections.BlockingQueue[*workRequest]
 	decorator      QueryParamsDecorator
 	rateLimitRetry *RateLimitRetryOpts
-	outbound       *atomic.AtomicInt32
+	outbound       atomic.Int32
 	fileLogger     *golog.Logger
 }
 
@@ -140,7 +140,6 @@ func NewRateLimitedClient(
 	queryLogFile string) (prometheus.Client, error) {
 
 	queue := collections.NewBlockingQueue[*workRequest]()
-	outbound := atomic.NewAtomicInt32(0)
 
 	var logger *golog.Logger
 	if queryLogFile != "" {
@@ -172,7 +171,6 @@ func NewRateLimitedClient(
 		queue:          queue,
 		decorator:      decorator,
 		rateLimitRetry: rateLimitRetryOpts,
-		outbound:       outbound,
 		auth:           auth,
 		fileLogger:     logger,
 	}
@@ -199,7 +197,7 @@ func (rlpc *RateLimitedPrometheusClient) TotalQueuedRequests() int {
 // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
 // sent to the server and are awaiting response.
 func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
-	return int(rlpc.outbound.Get())
+	return int(rlpc.outbound.Load())
 }
 
 // Passthrough to the prometheus client API
@@ -254,7 +252,7 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 		timeInQueue := time.Since(we.start)
 
 		// Increment outbound counter
-		rlpc.outbound.Increment()
+		rlpc.outbound.Add(1)
 
 		// Execute Request
 		roundTripStart := time.Now()
@@ -298,7 +296,7 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 		}
 
 		// Decrement outbound counter
-		rlpc.outbound.Decrement()
+		rlpc.outbound.Add(-1)
 		LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
 
 		// Pass back response data over channel to caller

+ 0 - 42
pkg/util/atomic/atomicbool.go

@@ -1,42 +0,0 @@
-package atomic
-
-import (
-	"sync/atomic"
-)
-
-// AtomicBool alias leverages a 32-bit integer CAS
-type AtomicBool int32
-
-// NewAtomicBool creates an AtomicBool with given default value
-func NewAtomicBool(value bool) *AtomicBool {
-	ab := new(AtomicBool)
-	ab.Set(value)
-	return ab
-}
-
-// Loads the bool value atomically
-func (ab *AtomicBool) Get() bool {
-	return atomic.LoadInt32((*int32)(ab)) != 0
-}
-
-// Sets the bool value atomically
-func (ab *AtomicBool) Set(value bool) {
-	if value {
-		atomic.StoreInt32((*int32)(ab), 1)
-	} else {
-		atomic.StoreInt32((*int32)(ab), 0)
-	}
-}
-
-// CompareAndSet sets value to new if current is equal to the current value. If the new value is
-// set, this function returns true.
-func (ab *AtomicBool) CompareAndSet(current, new bool) bool {
-	var o, n int32
-	if current {
-		o = 1
-	}
-	if new {
-		n = 1
-	}
-	return atomic.CompareAndSwapInt32((*int32)(ab), o, n)
-}

+ 0 - 37
pkg/util/atomic/atomicint.go

@@ -1,37 +0,0 @@
-package atomic
-
-import "sync/atomic"
-
-type AtomicInt32 int32
-
-// NewAtomicInt32 creates a new atomic int32 instance.
-func NewAtomicInt32(value int32) *AtomicInt32 {
-	ai := new(AtomicInt32)
-	ai.Set(value)
-	return ai
-}
-
-// Loads the int32 value atomically
-func (ai *AtomicInt32) Get() int32 {
-	return atomic.LoadInt32((*int32)(ai))
-}
-
-// Sets the int32 value atomically
-func (ai *AtomicInt32) Set(value int32) {
-	atomic.StoreInt32((*int32)(ai), value)
-}
-
-// Increments the atomic int and returns the new value
-func (ai *AtomicInt32) Increment() int32 {
-	return atomic.AddInt32((*int32)(ai), 1)
-}
-
-// Decrements the atomint int and returns the new value
-func (ai *AtomicInt32) Decrement() int32 {
-	return atomic.AddInt32((*int32)(ai), -1)
-}
-
-// CompareAndSet sets value to new if current is equal to the current value
-func (ai *AtomicInt32) CompareAndSet(current, new int32) bool {
-	return atomic.CompareAndSwapInt32((*int32)(ai), current, new)
-}

+ 7 - 8
pkg/util/worker/worker.go

@@ -4,9 +4,9 @@ import (
 	"fmt"
 	"runtime"
 	"sync"
+	"sync/atomic"
 
 	"github.com/opencost/opencost/pkg/collections"
-	"github.com/opencost/opencost/pkg/util/atomic"
 )
 
 // Worker is a transformation function from input type T to output type U.
@@ -46,7 +46,7 @@ type queuedWorkerPool[T any, U any] struct {
 	queue      collections.BlockingQueue[entry[T, U]]
 	work       Worker[T, U]
 	workers    int
-	isShutdown *atomic.AtomicBool
+	isShutdown atomic.Bool
 }
 
 // ordered is a WorkGroup implementation which enforces ordering based on when
@@ -62,10 +62,9 @@ type ordered[T any, U any] struct {
 // func used to transform inputs to outputs.
 func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U] {
 	owq := &queuedWorkerPool[T, U]{
-		workers:    workers,
-		work:       work,
-		queue:      collections.NewBlockingQueue[entry[T, U]](),
-		isShutdown: atomic.NewAtomicBool(false),
+		workers: workers,
+		work:    work,
+		queue:   collections.NewBlockingQueue[entry[T, U]](),
 	}
 
 	// startup the designated workers
@@ -80,7 +79,7 @@ func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U
 // to get the results. An error is returned if the pool is shutdown, or is in the process
 // of shutting down.
 func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
-	if wq.isShutdown.Get() {
+	if wq.isShutdown.Load() {
 		return fmt.Errorf("WorkerPoolShutdown")
 	}
 
@@ -95,7 +94,7 @@ func (wq *queuedWorkerPool[T, U]) Run(input T, onComplete chan<- U) error {
 
 // Shutdown stops all of the workers (if running).
 func (wq *queuedWorkerPool[T, U]) Shutdown() {
-	if !wq.isShutdown.CompareAndSet(false, true) {
+	if !wq.isShutdown.CompareAndSwap(false, true) {
 		return
 	}