Explorar o código

Rewrite ClusterLoadBalancers

Niko Kovacevic %!s(int64=4) %!d(string=hai) anos
pai
achega
ea4705e8d9
Modificáronse 2 ficheiros con 59 adicións e 37 borrados
  1. 59 35
      pkg/costmodel/cluster.go
  2. 0 2
      pkg/prom/query.go

+ 59 - 35
pkg/costmodel/cluster.go

@@ -505,39 +505,45 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.T
 	return nodeMap, nil
 }
 
+type LoadBalancerIdentifier struct {
+	Cluster   string
+	Namespace string
+	Name      string
+}
+
 type LoadBalancer struct {
 	Cluster    string
+	Namespace  string
 	Name       string
 	ProviderID string
 	Cost       float64
 	Start      time.Time
+	End        time.Time
 	Minutes    float64
 }
 
-// TODO rewrite this more-or-less altogether
-func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
-	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
-	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
-	if offset < time.Minute {
-		offsetStr = ""
-	}
+func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
+	// Query for the duration between start and end
+	durStr := timeutil.DurationString(end.Sub(start))
+
+	// Start from the time "end", querying backwards
+	t := end
 
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
 	// but more expensive queries, and vice-a-versa.
-	minsPerResolution := 5
-
-	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
-	// value, converts it to a cumulative value; i.e.
-	// [$/hr] * [min/res]*[hr/min] = [$/res]
-	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
+	minsPerResolution := 1
 
 	ctx := prom.NewNamedContext(client, prom.ClusterContextName)
-	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip))[%s:%dm]%s) * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
-	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
 
-	resChLBCost := ctx.Query(queryLBCost)
-	resChActiveMins := ctx.Query(queryActiveMins)
+	queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
+	queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+
+	log.Infof("[Prom] t=%d q=%s", t.Unix(), queryLBCost)
+	log.Infof("[Prom] t=%d q=%s", t.Unix(), queryActiveMins)
+
+	resChLBCost := ctx.QueryAtTime(queryLBCost, t)
+	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
 
 	resLBCost, _ := resChLBCost.Await()
 	resActiveMins, _ := resChActiveMins.Await()
@@ -546,9 +552,9 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 		return nil, ctx.ErrorCollection()
 	}
 
-	loadBalancerMap := map[string]*LoadBalancer{}
+	loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
 
-	for _, result := range resLBCost {
+	for _, result := range resActiveMins {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -558,7 +564,7 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 			log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
 			continue
 		}
-		serviceName, err := result.GetString("service_name")
+		name, err := result.GetString("service_name")
 		if err != nil {
 			log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
 			continue
@@ -568,25 +574,43 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 			log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
 			providerID = ""
 		}
-		lbCost := result.Values[0].Value
 
-		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
+		key := LoadBalancerIdentifier{
+			Cluster:   cluster,
+			Namespace: namespace,
+			Name:      name,
+		}
+
+		// Skip if there are no data
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		// Add load balancer to the set of load balancers
 		if _, ok := loadBalancerMap[key]; !ok {
 			loadBalancerMap[key] = &LoadBalancer{
 				Cluster:    cluster,
-				Name:       namespace + "/" + serviceName,
+				Namespace:  namespace,
+				Name:       fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
 				ProviderID: cloud.ParseLBID(providerID),
 			}
 		}
+
+		// Append start, end, and minutes. This should come before all other data.
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		loadBalancerMap[key].Start = s
+		loadBalancerMap[key].End = e
+		loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
+
 		// Fill in Provider ID if it is available and missing in the loadBalancerMap
 		// Prevents there from being a duplicate LoadBalancers on the same day
 		if providerID != "" && loadBalancerMap[key].ProviderID == "" {
 			loadBalancerMap[key].ProviderID = providerID
 		}
-		loadBalancerMap[key].Cost += lbCost
 	}
 
-	for _, result := range resActiveMins {
+	for _, result := range resLBCost {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -596,28 +620,28 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 			log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
 			continue
 		}
-		serviceName, err := result.GetString("service_name")
+		name, err := result.GetString("service_name")
 		if err != nil {
 			log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
 			continue
 		}
-		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
 
-		if len(result.Values) == 0 {
-			continue
+		key := LoadBalancerIdentifier{
+			Cluster:   cluster,
+			Namespace: namespace,
+			Name:      name,
 		}
 
+		// Apply cost as price-per-hour * hours
 		if lb, ok := loadBalancerMap[key]; ok {
-			s := time.Unix(int64(result.Values[0].Timestamp), 0)
-			e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
-			mins := e.Sub(s).Minutes()
-
-			lb.Start = s
-			lb.Minutes = mins
+			lbPricePerHr := result.Values[0].Value
+			hrs := lb.Minutes / 60.0
+			lb.Cost += lbPricePerHr * hrs
 		} else {
 			log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
 		}
 	}
+
 	return loadBalancerMap, nil
 }
 

+ 0 - 2
pkg/prom/query.go

@@ -190,8 +190,6 @@ func (ctx *Context) RawQuery(query string, t time.Time) ([]byte, error) {
 	q.Set("query", query)
 
 	if !t.IsZero() {
-		// TODO remove log
-		log.Infof("[Prom] time=%s query=%s", strconv.FormatInt(t.Unix(), 10), query)
 		q.Set("time", strconv.FormatInt(t.Unix(), 10))
 	} else {
 		// for non-range queries, we set the timestamp for the query to time-offset