Explorar o código

Update local storage asset creation (#2837)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb hai 1 ano
pai
achega
7b41474aba
Modificáronse 1 ficheiros con 105 adicións e 68 borrados
  1. 105 68
      pkg/costmodel/cluster.go

+ 105 - 68
pkg/costmodel/cluster.go

@@ -42,7 +42,7 @@ const (
 	queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
 )
 
-const maxLocalDiskSize = 200 // AWS limits root disks to 100 Gi, and occasional metric errors in filesystem size should not contribute to large costs.
+const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024
 
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
 // are broken down by cores, memory, and storage.
@@ -209,11 +209,15 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 		hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
 		costPerGBHr := 0.04 / 730.0
 
-		queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
-		queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
-		queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-		queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-		queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
+		// container_fs metrics contains metrics for disks that are not local storage of the node. While not perfect to
+		// attempt to identify the correct device which is being used as local storage we first filter for devices mounted
+		// at paths `/dev/nvme.*` or `/dev/sda.*`. There still may be multiple devices mounted at paths matching the regex
+		// so later on we will select the device with the highest `container_fs_limit_bytes` per instance to create a local disk asset
+		queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*"}, id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+		queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*"}, id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+		queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*"}, id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+		queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*"}, id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+		queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*"}, id="/", %s}) by (instance, device, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
 		queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
 
 		resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
@@ -275,7 +279,16 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 
 	pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, provider, opencost.NewClosedWindow(start, end))
 
-	for _, result := range resLocalStorageCost {
+	type localStorage struct {
+		device string
+		disk   *Disk
+	}
+
+	localStorageDisks := map[DiskIdentifier]localStorage{}
+
+	// Start with local storage bytes so that the device with the largest size which has passed the
+	// query filters can be determined
+	for _, result := range resLocalStorageBytes {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -287,23 +300,37 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 			continue
 		}
 
-		cost := result.Values[0].Value
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
+		}
+
+		bytes := result.Values[0].Value
+		// Ignore disks that are larger than the max size
+		if bytes > MAX_LOCAL_STORAGE_SIZE {
+			continue
+		}
+
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
+
+		// only keep the device with the most bytes per instance
+		if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
+			localStorageDisks[key] = localStorage{
+				device: device,
+				disk: &Disk{
+					Cluster:      cluster,
+					Name:         name,
+					Breakdown:    &ClusterCostsBreakdown{},
+					Local:        true,
+					StorageClass: opencost.LocalStorageClass,
+					Bytes:        bytes,
+				},
 			}
 		}
-		diskMap[key].Cost += cost
-
-		//Assigning explicitly the storage class of local storage to local
-		diskMap[key].StorageClass = opencost.LocalStorageClass
 	}
 
-	for _, result := range resLocalStorageUsedCost {
+	for _, result := range resLocalStorageCost {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -311,24 +338,27 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 
 		name, err := result.GetString("instance")
 		if err != nil {
-			log.Warnf("ClusterDisks: local storage usage data missing instance")
+			log.Warnf("ClusterDisks: local storage data missing instance")
+			continue
+		}
+
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
 			continue
 		}
 
 		cost := result.Values[0].Value
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
-		diskMap[key].Breakdown.System = cost / diskMap[key].Cost
+		ls.disk.Cost = cost
+
 	}
 
-	for _, result := range resLocalStorageUsedAvg {
+	for _, result := range resLocalStorageUsedCost {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -336,24 +366,26 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 
 		name, err := result.GetString("instance")
 		if err != nil {
-			log.Warnf("ClusterDisks: local storage data missing instance")
+			log.Warnf("ClusterDisks: local storage usage data missing instance")
 			continue
 		}
 
-		bytesAvg := result.Values[0].Value
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
+		}
+
+		cost := result.Values[0].Value
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
-		diskMap[key].BytesUsedAvgPtr = &bytesAvg
+		ls.disk.Breakdown.System = cost / ls.disk.Cost
 	}
 
-	for _, result := range resLocalStorageUsedMax {
+	for _, result := range resLocalStorageUsedAvg {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -365,20 +397,22 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 			continue
 		}
 
-		bytesMax := result.Values[0].Value
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
+		}
+
+		bytesAvg := result.Values[0].Value
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
-		diskMap[key].BytesUsedMaxPtr = &bytesMax
+		ls.disk.BytesUsedAvgPtr = &bytesAvg
 	}
 
-	for _, result := range resLocalStorageBytes {
+	for _, result := range resLocalStorageUsedMax {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -390,21 +424,19 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 			continue
 		}
 
-		bytes := result.Values[0].Value
-		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			diskMap[key] = &Disk{
-				Cluster:   cluster,
-				Name:      name,
-				Breakdown: &ClusterCostsBreakdown{},
-				Local:     true,
-			}
+		device, err := result.GetString("device")
+		if err != nil {
+			log.Warnf("ClusterDisks: local storage data missing device")
+			continue
 		}
-		diskMap[key].Bytes = bytes
-		if bytes/1024/1024/1024 > maxLocalDiskSize {
-			log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
-			delete(diskMap, key)
+
+		bytesMax := result.Values[0].Value
+		key := DiskIdentifier{cluster, name}
+		ls, ok := localStorageDisks[key]
+		if !ok || ls.device != device {
+			continue
 		}
+		ls.disk.BytesUsedMaxPtr = &bytesMax
 	}
 
 	for _, result := range resLocalActiveMins {
@@ -420,8 +452,8 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 		}
 
 		key := DiskIdentifier{cluster, name}
-		if _, ok := diskMap[key]; !ok {
-			log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
+		ls, ok := localStorageDisks[key]
+		if !ok {
 			continue
 		}
 
@@ -435,9 +467,14 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 
-		diskMap[key].End = e
-		diskMap[key].Start = s
-		diskMap[key].Minutes = mins
+		ls.disk.End = e
+		ls.disk.Start = s
+		ls.disk.Minutes = mins
+	}
+
+	// move local storage disks to main disk map
+	for key, ls := range localStorageDisks {
+		diskMap[key] = ls.disk
 	}
 
 	var unTracedDiskLogData []DiskIdentifier