Преглед изворни кода

Making storage class available for persistent volume by pulling information from Prometheus metrics and assigning local storage storageClass as Local

Signed-off-by: Alan Rodrigues <alanr5691@yahoo.com>
Alan Rodrigues пре 3 година
родитељ
комит
e19ee71494
5 измењених фајлова са 133 додато и 58 уклоњено
  1. 51 10
      pkg/costmodel/cluster.go
  2. 29 21
      pkg/kubecost/asset.go
  3. 7 2
      pkg/kubecost/asset_json.go
  4. 1 1
      pkg/kubecost/bingen.go
  5. 45 24
      pkg/kubecost/kubecost_codecs.go

+ 51 - 10
pkg/costmodel/cluster.go

@@ -106,16 +106,17 @@ func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offse
 }
 
 type Disk struct {
-	Cluster    string
-	Name       string
-	ProviderID string
-	Cost       float64
-	Bytes      float64
-	Local      bool
-	Start      time.Time
-	End        time.Time
-	Minutes    float64
-	Breakdown  *ClusterCostsBreakdown
+	Cluster      string
+	Name         string
+	ProviderID   string
+	StorageClass string
+	Cost         float64
+	Bytes        float64
+	Local        bool
+	Start        time.Time
+	End          time.Time
+	Minutes      float64
+	Breakdown    *ClusterCostsBreakdown
 }
 
 type DiskIdentifier struct {
@@ -156,6 +157,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
 	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
 	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+	queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info[%s])) by (%s, volumename, storageclass)`, durStr, env.GetPromClusterLabel())
 
 	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
@@ -165,6 +167,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 	resChPVCost := ctx.QueryAtTime(queryPVCost, t)
 	resChPVSize := ctx.QueryAtTime(queryPVSize, t)
 	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
+	resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
 	resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
 	resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
 	resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
@@ -173,10 +176,12 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 	resPVCost, _ := resChPVCost.Await()
 	resPVSize, _ := resChPVSize.Await()
 	resActiveMins, _ := resChActiveMins.Await()
+	resPVStorageClass, _ := resChPVStorageClass.Await()
 	resLocalStorageCost, _ := resChLocalStorageCost.Await()
 	resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
 	resLocalActiveMins, _ := resChLocalActiveMins.Await()
+
 	if ctx.HasErrors() {
 		return nil, ctx.ErrorCollection()
 	}
@@ -208,6 +213,9 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 			}
 		}
 		diskMap[key].Cost += cost
+
+		//Assigning explicitly the storage class of local storage to local
+		diskMap[key].StorageClass = "Local"
 	}
 
 	for _, result := range resLocalStorageUsedCost {
@@ -297,6 +305,34 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 		diskMap[key].Minutes = mins
 	}
 
+	//Iterating through Persistent Volume given by kube_persistentvolumeclaim_info and assign the storage class if known and unknown if not known.
+	for _, result := range resPVStorageClass {
+		cluster, err := result.GetString(env.GetPromClusterLabel())
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, _ := result.GetString("volumename")
+
+		key := DiskIdentifier{cluster, name}
+		if _, ok := diskMap[key]; !ok {
+			log.DedupedWarningf(5, "ClusterDisks: Storage class information for unidentified disk or disk deleted from analysis")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		storageClass, err := result.GetString("storageclass")
+
+		if err != nil {
+			diskMap[key].StorageClass = "Unknown"
+		} else {
+			diskMap[key].StorageClass = storageClass
+		}
+	}
+
 	for _, disk := range diskMap {
 		// Apply all remaining RAM to Idle
 		disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
@@ -305,6 +341,11 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 		if disk.ProviderID == "" {
 			disk.ProviderID = disk.Name
 		}
+
+		// Explicitly specify unknown storage class for disk whose information is unavailable in prometheus metrics of kube_persistentvolumeclaim_info
+		if disk.StorageClass == "" {
+			disk.StorageClass = "Unknown"
+		}
 	}
 
 	return diskMap, nil

+ 29 - 21
pkg/kubecost/asset.go

@@ -1053,16 +1053,17 @@ func (cm *ClusterManagement) String() string {
 
 // Disk represents an in-cluster disk Asset
 type Disk struct {
-	Labels     AssetLabels
-	Properties *AssetProperties
-	Start      time.Time
-	End        time.Time
-	Window     Window
-	Adjustment float64
-	Cost       float64
-	ByteHours  float64
-	Local      float64
-	Breakdown  *Breakdown
+	labels       AssetLabels
+	properties   *AssetProperties
+	start        time.Time
+	end          time.Time
+	window       Window
+	adjustment   float64
+	Cost         float64
+	ByteHours    float64
+	Local        float64
+	Breakdown    *Breakdown
+	StorageClass string // @bingen:field[version=17]
 }
 
 // NewDisk creates and returns a new Disk Asset
@@ -1254,21 +1255,26 @@ func (d *Disk) add(that *Disk) {
 	d.Cost += that.Cost
 
 	d.ByteHours += that.ByteHours
+
+	if that.StorageClass != "" && that.StorageClass != "Unknown" {
+		d.StorageClass = that.StorageClass
+	}
 }
 
 // Clone returns a cloned instance of the Asset
 func (d *Disk) Clone() Asset {
 	return &Disk{
-		Properties: d.Properties.Clone(),
-		Labels:     d.Labels.Clone(),
-		Start:      d.Start,
-		End:        d.End,
-		Window:     d.Window.Clone(),
-		Adjustment: d.Adjustment,
-		Cost:       d.Cost,
-		ByteHours:  d.ByteHours,
-		Local:      d.Local,
-		Breakdown:  d.Breakdown.Clone(),
+		properties:   d.properties.Clone(),
+		labels:       d.labels.Clone(),
+		start:        d.start,
+		end:          d.end,
+		window:       d.window.Clone(),
+		adjustment:   d.adjustment,
+		Cost:         d.Cost,
+		ByteHours:    d.ByteHours,
+		Local:        d.Local,
+		Breakdown:    d.Breakdown.Clone(),
+		StorageClass: d.StorageClass,
 	}
 }
 
@@ -1309,6 +1315,9 @@ func (d *Disk) Equal(a Asset) bool {
 	if !d.Breakdown.Equal(that.Breakdown) {
 		return false
 	}
+	if d.StorageClass != that.StorageClass {
+		return false
+	}
 
 	return true
 }
@@ -2900,7 +2909,6 @@ func (as *AssetSet) Insert(asset Asset) error {
 		as.Assets[k] = newAsset
 		addToConcreteMap(as, k, newAsset)
 	}
-
 	// Expand the window, just to be safe. It's possible that the asset will
 	// be set into the map without expanding it to the AssetSet's window.
 	as.Assets[k].ExpandWindow(as.Window)

+ 7 - 2
pkg/kubecost/asset_json.go

@@ -260,8 +260,9 @@ func (d *Disk) MarshalJSON() ([]byte, error) {
 	jsonEncodeFloat64(buffer, "byteHours", d.ByteHours, ",")
 	jsonEncodeFloat64(buffer, "bytes", d.Bytes(), ",")
 	jsonEncode(buffer, "breakdown", d.Breakdown, ",")
-	jsonEncodeFloat64(buffer, "adjustment", d.Adjustment, ",")
-	jsonEncodeFloat64(buffer, "totalCost", d.TotalCost(), "")
+	jsonEncodeFloat64(buffer, "adjustment", d.Adjustment(), ",")
+	jsonEncodeFloat64(buffer, "totalCost", d.TotalCost(), ",")
+	jsonEncodeString(buffer, "storageClass", d.StorageClass, "")
 	buffer.WriteString("}")
 	return buffer.Bytes(), nil
 }
@@ -332,6 +333,10 @@ func (d *Disk) InterfaceToDisk(itf interface{}) error {
 		d.ByteHours = ByteHours.(float64)
 	}
 
+	if StorageClass, err := getTypedVal(fmap["storageClass"]); err == nil {
+		d.StorageClass = StorageClass.(string)
+	}
+
 	// d.Local is not marhsaled, and cannot be calculated from marshaled values.
 	// Currently, it is just ignored and not set in the resulting unmarshal to Disk
 	//  be aware that this means a resulting Disk from an unmarshal is therefore NOT

+ 1 - 1
pkg/kubecost/bingen.go

@@ -24,7 +24,7 @@ package kubecost
 // @bingen:generate:Window
 
 // Asset Version Set: Includes Asset pipeline specific resources
-// @bingen:set[name=Assets,version=16]
+// @bingen:set[name=Assets,version=17]
 // @bingen:generate:Any
 // @bingen:generate:Asset
 // @bingen:generate:AssetLabels

+ 45 - 24
pkg/kubecost/kubecost_codecs.go

@@ -13,12 +13,11 @@ package kubecost
 
 import (
 	"fmt"
+	util "github.com/opencost/opencost/pkg/util"
 	"reflect"
 	"strings"
 	"sync"
 	"time"
-
-	util "github.com/opencost/opencost/pkg/util"
 )
 
 const (
@@ -34,17 +33,17 @@ const (
 )
 
 const (
-	// DefaultCodecVersion is used for any resources listed in the Default version set
-	DefaultCodecVersion uint8 = 15
-
-	// AssetsCodecVersion is used for any resources listed in the Assets version set
-	AssetsCodecVersion uint8 = 16
-
 	// AllocationCodecVersion is used for any resources listed in the Allocation version set
 	AllocationCodecVersion uint8 = 15
 
 	// AuditCodecVersion is used for any resources listed in the Audit version set
 	AuditCodecVersion uint8 = 1
+
+	// DefaultCodecVersion is used for any resources listed in the Default version set
+	DefaultCodecVersion uint8 = 15
+
+	// AssetsCodecVersion is used for any resources listed in the Assets version set
+	AssetsCodecVersion uint8 = 17
 )
 
 //--------------------------------------------------------------------------
@@ -4888,14 +4887,14 @@ func (target *Disk) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 	buff.WriteUInt8(AssetsCodecVersion) // version
 
 	// --- [begin][write][alias](AssetLabels) ---
-	if map[string]string(target.Labels) == nil {
+	if map[string]string(target.labels) == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
 	} else {
 		buff.WriteUInt8(uint8(1)) // write non-nil byte
 
 		// --- [begin][write][map](map[string]string) ---
-		buff.WriteInt(len(map[string]string(target.Labels))) // map length
-		for v, z := range map[string]string(target.Labels) {
+		buff.WriteInt(len(map[string]string(target.labels))) // map length
+		for v, z := range map[string]string(target.labels) {
 			if ctx.IsStringTable() {
 				a := ctx.Table.AddOrGet(v)
 				buff.WriteInt(a) // write table index
@@ -4914,14 +4913,14 @@ func (target *Disk) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 	}
 	// --- [end][write][alias](AssetLabels) ---
 
-	if target.Properties == nil {
+	if target.properties == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
 	} else {
 		buff.WriteUInt8(uint8(1)) // write non-nil byte
 
 		// --- [begin][write][struct](AssetProperties) ---
 		buff.WriteInt(0) // [compatibility, unused]
-		errA := target.Properties.MarshalBinaryWithContext(ctx)
+		errA := target.properties.MarshalBinaryWithContext(ctx)
 		if errA != nil {
 			return errA
 		}
@@ -4929,7 +4928,7 @@ func (target *Disk) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 
 	}
 	// --- [begin][write][reference](time.Time) ---
-	c, errB := target.Start.MarshalBinary()
+	c, errB := target.start.MarshalBinary()
 	if errB != nil {
 		return errB
 	}
@@ -4938,7 +4937,7 @@ func (target *Disk) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 	// --- [end][write][reference](time.Time) ---
 
 	// --- [begin][write][reference](time.Time) ---
-	d, errC := target.End.MarshalBinary()
+	d, errC := target.end.MarshalBinary()
 	if errC != nil {
 		return errC
 	}
@@ -4948,13 +4947,13 @@ func (target *Disk) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 
 	// --- [begin][write][struct](Window) ---
 	buff.WriteInt(0) // [compatibility, unused]
-	errD := target.Window.MarshalBinaryWithContext(ctx)
+	errD := target.window.MarshalBinaryWithContext(ctx)
 	if errD != nil {
 		return errD
 	}
 	// --- [end][write][struct](Window) ---
 
-	buff.WriteFloat64(target.Adjustment) // write float64
+	buff.WriteFloat64(target.adjustment) // write float64
 	buff.WriteFloat64(target.Cost)       // write float64
 	buff.WriteFloat64(target.ByteHours)  // write float64
 	buff.WriteFloat64(target.Local)      // write float64
@@ -4972,6 +4971,12 @@ func (target *Disk) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 		// --- [end][write][struct](Breakdown) ---
 
 	}
+	if ctx.IsStringTable() {
+		e := ctx.Table.AddOrGet(target.StorageClass)
+		buff.WriteInt(e) // write table index
+	} else {
+		buff.WriteString(target.StorageClass) // write string
+	}
 	return nil
 }
 
@@ -5066,11 +5071,11 @@ func (target *Disk) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 		// --- [end][read][map](map[string]string) ---
 
 	}
-	target.Labels = AssetLabels(a)
+	target.labels = AssetLabels(a)
 	// --- [end][read][alias](AssetLabels) ---
 
 	if buff.ReadUInt8() == uint8(0) {
-		target.Properties = nil
+		target.properties = nil
 	} else {
 		// --- [begin][read][struct](AssetProperties) ---
 		l := &AssetProperties{}
@@ -5079,7 +5084,7 @@ func (target *Disk) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 		if errA != nil {
 			return errA
 		}
-		target.Properties = l
+		target.properties = l
 		// --- [end][read][struct](AssetProperties) ---
 
 	}
@@ -5091,7 +5096,7 @@ func (target *Disk) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 	if errB != nil {
 		return errB
 	}
-	target.Start = *m
+	target.start = *m
 	// --- [end][read][reference](time.Time) ---
 
 	// --- [begin][read][reference](time.Time) ---
@@ -5102,7 +5107,7 @@ func (target *Disk) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 	if errC != nil {
 		return errC
 	}
-	target.End = *p
+	target.end = *p
 	// --- [end][read][reference](time.Time) ---
 
 	// --- [begin][read][struct](Window) ---
@@ -5112,11 +5117,11 @@ func (target *Disk) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 	if errD != nil {
 		return errD
 	}
-	target.Window = *s
+	target.window = *s
 	// --- [end][read][struct](Window) ---
 
 	t := buff.ReadFloat64() // read float64
-	target.Adjustment = t
+	target.adjustment = t
 
 	u := buff.ReadFloat64() // read float64
 	target.Cost = u
@@ -5141,6 +5146,22 @@ func (target *Disk) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 		// --- [end][read][struct](Breakdown) ---
 
 	}
+	// field version check
+	if uint8(17) <= version {
+		var bb string
+		if ctx.IsStringTable() {
+			cc := buff.ReadInt() // read string index
+			bb = ctx.Table[cc]
+		} else {
+			bb = buff.ReadString() // read string
+		}
+		aa := bb
+		target.StorageClass = aa
+
+	} else {
+		target.StorageClass = "" // default
+	}
+
 	return nil
 }