Ver Fonte

Merge pull request #470 from kubecost/develop

Merge develop into master
Ajay Tripathy há 5 anos atrás
pai
commit
2273fa368b
5 ficheiros alterados com 92 adições e 20 exclusões
  1. 9 0
      pkg/cloud/providerconfig.go
  2. 2 1
      pkg/costmodel/cluster.go
  3. 35 10
      pkg/costmodel/router.go
  4. 46 0
      pkg/prom/prom.go
  5. 0 9
      pkg/prom/query.go

+ 9 - 0
pkg/cloud/providerconfig.go

@@ -6,6 +6,7 @@ import (
 	"io/ioutil"
 	"os"
 	"reflect"
+	"strconv"
 	"strings"
 	"sync"
 
@@ -138,6 +139,14 @@ func (pc *ProviderConfig) UpdateFromMap(a map[string]string) (*CustomPricing, er
 		for k, v := range a {
 			// Just so we consistently supply / receive the same values, uppercase the first letter.
 			kUpper := strings.Title(k)
+			if kUpper == "CPU" || kUpper == "SpotCPU" || kUpper == "RAM" || kUpper == "SpotRAM" || kUpper == "GPU" || kUpper == "Storage" {
+				val, err := strconv.ParseFloat(v, 64)
+				if err != nil {
+					return fmt.Errorf("Unable to parse CPU from string to float: %s", err.Error())
+				}
+				v = fmt.Sprintf("%f", val/730)
+			}
+
 			err := SetCustomPricingField(c, kUpper, v)
 			if err != nil {
 				return err

+ 2 - 1
pkg/costmodel/cluster.go

@@ -81,6 +81,7 @@ type ClusterCosts struct {
 	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
 	TotalCumulative   float64                `json:"totalCumulativeCost"`
 	TotalMonthly      float64                `json:"totalMonthlyCost"`
+	DataMinutes       float64
 }
 
 // ClusterCostsBreakdown provides percentage-based breakdown of a resource by
@@ -396,7 +397,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
 			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
 		}
-
+		costs.DataMinutes = dataMins
 		costsByCluster[id] = costs
 	}
 

+ 35 - 10
pkg/costmodel/router.go

@@ -24,6 +24,8 @@ import (
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	cm "github.com/kubecost/cost-model/pkg/clustermanager"
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -40,6 +42,7 @@ const (
 	logCollectionEnvVar            = "LOG_COLLECTION_ENABLED"
 	productAnalyticsEnvVar         = "PRODUCT_ANALYTICS_ENABLED"
 	errorReportingEnvVar           = "ERROR_REPORTING_ENABLED"
+	maxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
 	prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 	prometheusTroubleshootingEp    = "http://docs.kubecost.com/custom-prom#troubleshoot"
 	RFC3339Milli                   = "2006-01-02T15:04:05.000Z"
@@ -163,6 +166,22 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
+// Parses the max query concurrency environment variable
+func maxQueryConcurrency() int {
+	v := os.Getenv(maxQueryConcurrencyEnvVar)
+	if v == "" {
+		return 5
+	}
+
+	result, err := strconv.Atoi(v)
+	if err != nil {
+		log.Warningf("Failed to parse MAX_QUERY_CONCURRENCY. Defaulting to 5 - %s", err)
+		return 5
+	}
+
+	return result
+}
+
 // writeReportingFlags writes the reporting flags to the cluster info map
 func writeReportingFlags(clusterInfo map[string]string) {
 	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
@@ -879,23 +898,27 @@ type ConfigWatchers struct {
 	WatchFunc     func(string, map[string]string) error
 }
 
+// captures the panic event in sentry
+func capturePanicEvent(err string, stack string) {
+	msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
+	sentry.CurrentHub().CaptureEvent(&sentry.Event{
+		Level:   sentry.LevelError,
+		Message: msg,
+	})
+	sentry.Flush(5 * time.Second)
+}
+
 // handle any panics reported by the errors package
 func handlePanic(p errors.Panic) bool {
 	err := p.Error
 
 	if err != nil {
 		if err, ok := err.(error); ok {
-			sentry.CurrentHub().CaptureException(err)
-			sentry.Flush(5 * time.Second)
+			capturePanicEvent(err.Error(), p.Stack)
 		}
 
 		if err, ok := err.(string); ok {
-			msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, p.Stack)
-			sentry.CurrentHub().CaptureEvent(&sentry.Event{
-				Level:   sentry.LevelError,
-				Message: msg,
-			})
-			sentry.Flush(5 * time.Second)
+			capturePanicEvent(err, p.Stack)
 		}
 	}
 
@@ -928,6 +951,8 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
 	}
 
+	queryConcurrency := maxQueryConcurrency()
+
 	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
 		Proxy: http.ProxyFromEnvironment,
 		DialContext: (&net.Dialer{
@@ -941,7 +966,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Address:      address,
 		RoundTripper: LongTimeoutRoundTripper,
 	}
-	promCli, _ := prometheusClient.NewClient(pc)
+	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency)
 
 	m, err := ValidatePrometheus(promCli, false)
 	if err != nil || m.Running == false {
@@ -1155,7 +1180,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 				Address:      thanosUrl,
 				RoundTripper: thanosRT,
 			}
-			thanosCli, _ := prometheusClient.NewClient(thanosConfig)
+			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency)
 
 			_, err = ValidatePrometheus(thanosCli, true)
 			if err != nil {

+ 46 - 0
pkg/prom/prom.go

@@ -0,0 +1,46 @@
+package prom
+
+import (
+	"context"
+	"net/http"
+	"net/url"
+
+	"github.com/kubecost/cost-model/pkg/util"
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+// NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
+// prometheus requests.
+func NewRateLimitedClient(config prometheus.Config, maxConcurrency int) (prometheus.Client, error) {
+	c, err := prometheus.NewClient(config)
+	if err != nil {
+		return nil, err
+	}
+
+	limiter := util.NewSemaphore(maxConcurrency)
+
+	return &RateLimitedPrometheusClient{
+		client:  c,
+		limiter: limiter,
+	}, nil
+}
+
+// Creates a new prometheus client which limits the total number of concurrent outbound requests
+// allowed at a given moment.
+type RateLimitedPrometheusClient struct {
+	client  prometheus.Client
+	limiter *util.Semaphore
+}
+
+// Passthrough to the prometheus client API
+func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string) *url.URL {
+	return rlpc.client.URL(ep, args)
+}
+
+// Rate limit and passthrough to prometheus client API
+func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
+	rlpc.limiter.Acquire()
+	defer rlpc.limiter.Return()
+
+	return rlpc.client.Do(ctx, req)
+}

+ 0 - 9
pkg/prom/query.go

@@ -7,7 +7,6 @@ import (
 	"net/http"
 
 	"github.com/kubecost/cost-model/pkg/errors"
-	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
@@ -22,20 +21,15 @@ const (
 type Context struct {
 	Client         prometheus.Client
 	ErrorCollector *errors.ErrorCollector
-	semaphore      *util.Semaphore
 }
 
 // NewContext creates a new Promethues querying context from the given client
 func NewContext(client prometheus.Client) *Context {
 	var ec errors.ErrorCollector
 
-	// By deafult, allow 20 concurrent queries, which is the Prometheus default
-	sem := util.NewSemaphore(20)
-
 	return &Context{
 		Client:         client,
 		ErrorCollector: &ec,
-		semaphore:      sem,
 	}
 }
 
@@ -82,9 +76,6 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 }
 
 func (ctx *Context) query(query string) (interface{}, error) {
-	ctx.semaphore.Acquire()
-	defer ctx.semaphore.Return()
-
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)