Browse Source

Merge pull request #547 from kubecost/develop

Merge develop for 1.65.1
Ajay Tripathy 5 năm trước cách đây
mục cha
commit
47c66038fe
5 tập tin đã thay đổi với 77 bổ sung28 xóa
  1. 2 2
      kubernetes/deployment.yaml
  2. 28 11
      pkg/costmodel/cluster.go
  3. 6 2
      pkg/costmodel/router.go
  4. 22 0
      pkg/env/costmodelenv.go
  5. 19 13
      pkg/prom/prom.go

+ 2 - 2
kubernetes/deployment.yaml

@@ -22,7 +22,7 @@ spec:
       restartPolicy: Always
       serviceAccountName: cost-model
       containers:
-        - image: ajaytripathy/kubecost-cost-model:latest
+        - image: quay.io/kubecost1/kubecost-cost-model:latest
           name: cost-model
           resources:
             requests:
@@ -32,5 +32,5 @@ spec:
             - name: PROMETHEUS_SERVER_ENDPOINT
               value: "{{prometheusEndpoint}}"  #The endpoint should have the form http://<service-name>.<namespace-name>.svc
             - name: CLOUD_PROVIDER_API_KEY
-              value: "AIzaSyD29bGxmHAVEOBYtgd8sYM2gM2ekfxQX4U" # The GCP Pricing API requires a key.
+              value: "AIzaSyD29bGxmHAVEOBYtgd8sYM2gM2ekfxQX4U" # The GCP Pricing API requires a key. This is supplied just for evaluation.
           imagePullPolicy: Always

+ 28 - 11
pkg/costmodel/cluster.go

@@ -112,6 +112,7 @@ type Disk struct {
 	Bytes      float64
 	Local      bool
 	Start      time.Time
+	End        time.Time
 	Minutes    float64
 	Breakdown  *ClusterCostsBreakdown
 }
@@ -126,7 +127,8 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
 	// but more expensive queries, and vice-a-versa.
-	minsPerResolution := 5
+	minsPerResolution := 1
+	resolution := time.Duration(minsPerResolution) * time.Minute
 
 	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
 	// value, converts it to a cumulative value; i.e.
@@ -316,11 +318,12 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		}
 
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 
+		diskMap[key].End = e
 		diskMap[key].Start = s
 		diskMap[key].Minutes = mins
 	}
@@ -348,11 +351,12 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		}
 
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 
+		diskMap[key].End = e
 		diskMap[key].Start = s
 		diskMap[key].Minutes = mins
 	}
@@ -380,9 +384,16 @@ type Node struct {
 	CPUBreakdown *ClusterCostsBreakdown
 	RAMBreakdown *ClusterCostsBreakdown
 	Start        time.Time
+	End          time.Time
 	Minutes      float64
 }
 
+var partialCPUMap = map[string]float64{
+	"e2-micro":  0.25,
+	"e2-small":  0.5,
+	"e2-medium": 1.0,
+}
+
 func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
 	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
 	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
@@ -393,7 +404,8 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
 	// but more expensive queries, and vice-a-versa.
-	minsPerResolution := 5
+	minsPerResolution := 1
+	resolution := time.Duration(minsPerResolution) * time.Minute
 
 	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
 	// value, converts it to a cumulative value; i.e.
@@ -410,7 +422,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
-	queryActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`node_total_hourly_cost[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 	resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
 	resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
@@ -469,11 +481,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		}
 		nodeMap[key].CPUCost += cpuCost
 		nodeMap[key].NodeType = nodeType
+		if nodeMap[key].ProviderID == "" {
+			nodeMap[key].ProviderID = cp.ParseID(providerID)
+		}
 	}
-	partialCPUMap := make(map[string]float64)
-	partialCPUMap["e2-micro"] = 0.25
-	partialCPUMap["e2-small"] = 0.5
-	partialCPUMap["e2-medium"] = 1.0
 
 	for _, result := range resNodeCPUCores {
 		cluster, err := result.GetString("cluster_id")
@@ -508,7 +519,6 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		} else {
 			nodeMap[key].CPUCores = cpuCores
 		}
-
 	}
 
 	for _, result := range resNodeRAMCost {
@@ -541,6 +551,9 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		}
 		nodeMap[key].RAMCost += ramCost
 		nodeMap[key].NodeType = nodeType
+		if nodeMap[key].ProviderID == "" {
+			nodeMap[key].ProviderID = cp.ParseID(providerID)
+		}
 	}
 
 	for _, result := range resNodeRAMBytes {
@@ -598,6 +611,9 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 			}
 		}
 		nodeMap[key].GPUCost += gpuCost
+		if nodeMap[key].ProviderID == "" {
+			nodeMap[key].ProviderID = cp.ParseID(providerID)
+		}
 	}
 
 	for _, result := range resNodeCPUModePct {
@@ -707,11 +723,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		}
 
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 
+		nodeMap[key].End = e
 		nodeMap[key].Start = s
 		nodeMap[key].Minutes = mins
 	}

+ 6 - 2
pkg/costmodel/router.go

@@ -46,8 +46,12 @@ const (
 var (
 	// gitCommit is set by the build system
 	gitCommit                       string
+	dbBasicAuthUsername             string = env.GetDBBasicAuthUsername()
+	dbBasicAuthPW                   string = env.GetDBBasicAuthUserPassword()
+	dbBearerToken                   string = env.GetDBBearerToken()
 	multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
 	multiclusterDBBasicAuthPW       string = env.GetMultiClusterBasicAuthPassword()
+	multiClusterBearerToken         string = env.GetMultiClusterBearerToken()
 )
 
 var Router = httprouter.New()
@@ -739,7 +743,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Address:      address,
 		RoundTripper: LongTimeoutRoundTripper,
 	}
-	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, "", "")
+	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, dbBasicAuthUsername, dbBasicAuthPW, dbBearerToken)
 
 	m, err := ValidatePrometheus(promCli, false)
 	if err != nil || m.Running == false {
@@ -973,7 +977,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 				RoundTripper: thanosRT,
 			}
 
-			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW)
+			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW, multiClusterBearerToken)
 
 			_, err = ValidatePrometheus(thanosCli, true)
 			if err != nil {

+ 22 - 0
pkg/env/costmodelenv.go

@@ -28,8 +28,13 @@ const (
 	ErrorReportingEnabledEnvVar   = "ERROR_REPORTING_ENABLED"
 	ValuesReportingEnabledEnvVar  = "VALUES_REPORTING_ENABLED"
 
+	DBBasicAuthUsername = "DB_BASIC_AUTH_USERNAME"
+	DBBasicAuthPassword = "DB_BASIC_AUTH_PW"
+	DBBearerToken       = "DB_BEARER_TOKEN"
+
 	MultiClusterBasicAuthUsername = "MC_BASIC_AUTH_USERNAME"
 	MultiClusterBasicAuthPassword = "MC_BASIC_AUTH_PW"
+	MultiClusterBearerToken       = "MC_BEARER_TOKEN"
 
 	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
 )
@@ -178,6 +183,19 @@ func GetMaxQueryConcurrency() int {
 	return GetInt(MaxQueryConcurrencyEnvVar, 5)
 }
 
+func GetDBBasicAuthUsername() string {
+	return Get(DBBasicAuthUsername, "")
+}
+
+func GetDBBasicAuthUserPassword() string {
+	return Get(DBBasicAuthPassword, "")
+
+}
+
+func GetDBBearerToken() string {
+	return Get(DBBearerToken, "")
+}
+
 // GetMultiClusterBasicAuthUsername returns the environemnt variable value for MultiClusterBasicAuthUsername
 func GetMultiClusterBasicAuthUsername() string {
 	return Get(MultiClusterBasicAuthUsername, "")
@@ -187,3 +205,7 @@ func GetMultiClusterBasicAuthUsername() string {
 func GetMultiClusterBasicAuthPassword() string {
 	return Get(MultiClusterBasicAuthPassword, "")
 }
+
+func GetMultiClusterBearerToken() string {
+	return Get(MultiClusterBearerToken, "")
+}

+ 19 - 13
pkg/prom/prom.go

@@ -13,12 +13,13 @@ import (
 // Creates a new prometheus client which limits the total number of concurrent outbound requests
 // allowed at a given moment.
 type RateLimitedPrometheusClient struct {
-	client   prometheus.Client
-	limiter  *util.Semaphore
-	requests *util.AtomicInt32
-	outbound *util.AtomicInt32
-	username string
-	password string
+	client      prometheus.Client
+	limiter     *util.Semaphore
+	requests    *util.AtomicInt32
+	outbound    *util.AtomicInt32
+	username    string
+	password    string
+	bearerToken string
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -30,7 +31,7 @@ type requestCounter interface {
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
-func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password string) (prometheus.Client, error) {
+func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password, bearerToken string) (prometheus.Client, error) {
 	c, err := prometheus.NewClient(config)
 	if err != nil {
 		return nil, err
@@ -41,12 +42,13 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	outbound := util.NewAtomicInt32(0)
 
 	return &RateLimitedPrometheusClient{
-		client:   c,
-		limiter:  limiter,
-		requests: requests,
-		outbound: outbound,
-		username: username,
-		password: password,
+		client:      c,
+		limiter:     limiter,
+		requests:    requests,
+		outbound:    outbound,
+		username:    username,
+		password:    password,
+		bearerToken: bearerToken,
 	}, nil
 }
 
@@ -84,6 +86,10 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 	if rlpc.username != "" {
 		req.SetBasicAuth(rlpc.username, rlpc.password)
 	}
+	if rlpc.bearerToken != "" {
+		token := "Bearer " + rlpc.bearerToken
+		req.Header.Add("Authorization", token)
+	}
 	// Increment the total request counter first
 	rlpc.requests.Increment()
 	defer rlpc.requests.Decrement()