Matt Bolt před 5 roky
rodič
revize
e9ae5f6366

+ 8 - 23
pkg/costmodel/clusters/clustermap.go

@@ -1,8 +1,8 @@
 package clusters
 
 import (
+	"context"
 	"fmt"
-	"math/rand"
 	"strings"
 	"sync"
 	"time"
@@ -10,6 +10,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
+	"github.com/kubecost/cost-model/pkg/util/retry"
 
 	prometheus "github.com/prometheus/client_golang/api"
 )
@@ -120,33 +121,17 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 	}
 
 	// Execute Query
-	tryQuery := func() ([]*prom.QueryResult, prometheus.Warnings, error) {
+	tryQuery := func() (interface{}, error) {
 		ctx := prom.NewContext(pcm.client)
-		return ctx.QuerySync(clusterInfoQuery(offset))
+		r, _, e := ctx.QuerySync(clusterInfoQuery(offset))
+		return r, e
 	}
 
-	var qr []*prom.QueryResult
-	var err error
-
 	// Retry on failure
-	delay := LoadRetryDelay
-	for r := LoadRetries; r > 0; r-- {
-		qr, _, err = tryQuery()
-
-		// non-error breaks out of loop
-		if err == nil {
-			break
-		}
-
-		// wait the delay
-		time.Sleep(delay)
+	result, err := retry.Retry(context.Background(), tryQuery, uint(LoadRetries), LoadRetryDelay)
 
-		// add some random backoff
-		jitter := time.Duration(rand.Int63n(int64(delay)))
-		delay = delay + jitter/2
-	}
-
-	if err != nil {
+	qr, ok := result.([]*prom.QueryResult)
+	if !ok || err != nil {
 		return nil, err
 	}
 

+ 44 - 0
pkg/util/retry/retry.go

@@ -0,0 +1,44 @@
+package retry
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"time"
+)
+
+// RetryCancellationErr is the error type that's returned if the retry is cancelled
+var RetryCancellationErr error = fmt.Errorf("RetryCancellationErr")
+
+// IsRetryCancelledError returns true if the error was a cancellation
+func IsRetryCancelledError(err error) bool {
+	return err != nil && err.Error() == "RetryCancellationErr"
+}
+
+// Retry will run the f func until we receive a non error result up to the provided attempts or a cancellation.
+func Retry(ctx context.Context, f func() (interface{}, error), attempts uint, delay time.Duration) (interface{}, error) {
+	var result interface{}
+	var err error
+
+	d := delay
+	for r := attempts; r > 0; r-- {
+		select {
+		case <-ctx.Done():
+			return nil, RetryCancellationErr
+		default:
+		}
+
+		result, err = f()
+
+		if err == nil {
+			break
+		}
+
+		time.Sleep(d)
+
+		jitter := time.Duration(rand.Int63n(int64(d)))
+		d = d + jitter/2
+	}
+
+	return result, err
+}

+ 88 - 0
pkg/util/retry/retry_test.go

@@ -0,0 +1,88 @@
+package retry
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestSuccessRetry(t *testing.T) {
+	const Expected uint64 = 3
+
+	var count uint64 = 0
+
+	f := func() (interface{}, error) {
+		c := atomic.AddUint64(&count, 1)
+		fmt.Println("Try:", c)
+
+		if c == Expected {
+			return struct{}{}, nil
+		}
+
+		return nil, fmt.Errorf("Failed: %d", c)
+	}
+
+	_, err := Retry(context.Background(), f, 5, time.Second)
+	if err != nil {
+		t.Fatalf("Unexpected error: %s", err)
+	}
+}
+
+func TestFailRetry(t *testing.T) {
+	const Expected uint64 = 5
+
+	expectedError := fmt.Sprintf("Failed: %d", Expected)
+	var count uint64 = 0
+
+	f := func() (interface{}, error) {
+		c := atomic.AddUint64(&count, 1)
+		fmt.Println("Try:", c)
+		return nil, fmt.Errorf("Failed: %d", c)
+	}
+
+	_, err := Retry(context.Background(), f, 5, time.Second)
+	if count != 5 {
+		t.Fatalf("Expected Count: %d, Actual: %d", Expected, count)
+	}
+
+	if err.Error() != expectedError {
+		t.Fatalf("Expected error: %s, Actual error: %s", expectedError, err.Error())
+	}
+}
+
+func TestCancelRetry(t *testing.T) {
+	const Expected uint64 = 5
+
+	var count uint64 = 0
+
+	f := func() (interface{}, error) {
+		c := atomic.AddUint64(&count, 1)
+		fmt.Println("Try:", c)
+		return nil, fmt.Errorf("Failed: %d", c)
+	}
+
+	wait := make(chan error)
+	ctx, cancel := context.WithCancel(context.Background())
+
+	// execute retry in go routine
+	go func() {
+		_, err := Retry(ctx, f, 5, time.Second)
+
+		wait <- err
+	}()
+
+	// cancel after 2 seconds
+	go func() {
+		time.Sleep(time.Second * 2)
+		cancel()
+	}()
+
+	// wait for error result
+	e := <-wait
+
+	if !IsRetryCancelledError(e) {
+		t.Fatalf("Expected CancellationError, got: %s", e)
+	}
+}