Przeglądaj źródła

Merge pull request #1352 from avrodrigues5/avr/fix_storage_class

Making storage class available for persistent volume by pulling infor…
Michael Dresser 3 lat temu
rodzic
commit
428167d73f

+ 56 - 10
pkg/costmodel/cluster.go

@@ -7,6 +7,7 @@ import (
 
 
 	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/util/timeutil"
+	"golang.org/x/exp/slices"
 
 
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/env"
@@ -106,16 +107,17 @@ func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offse
 }
 }
 
 
 type Disk struct {
 type Disk struct {
-	Cluster    string
-	Name       string
-	ProviderID string
-	Cost       float64
-	Bytes      float64
-	Local      bool
-	Start      time.Time
-	End        time.Time
-	Minutes    float64
-	Breakdown  *ClusterCostsBreakdown
+	Cluster      string
+	Name         string
+	ProviderID   string
+	StorageClass string
+	Cost         float64
+	Bytes        float64
+	Local        bool
+	Start        time.Time
+	End          time.Time
+	Minutes      float64
+	Breakdown    *ClusterCostsBreakdown
 }
 }
 
 
 type DiskIdentifier struct {
 type DiskIdentifier struct {
@@ -156,6 +158,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
 	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
 	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
 	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
 	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
 	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+	queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info[%s])) by (%s, persistentvolume, storageclass)`, durStr, env.GetPromClusterLabel())
 
 
 	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
@@ -165,6 +168,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 	resChPVCost := ctx.QueryAtTime(queryPVCost, t)
 	resChPVCost := ctx.QueryAtTime(queryPVCost, t)
 	resChPVSize := ctx.QueryAtTime(queryPVSize, t)
 	resChPVSize := ctx.QueryAtTime(queryPVSize, t)
 	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
 	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
+	resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
 	resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
 	resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
 	resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
 	resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
 	resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
 	resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
@@ -173,10 +177,12 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 	resPVCost, _ := resChPVCost.Await()
 	resPVCost, _ := resChPVCost.Await()
 	resPVSize, _ := resChPVSize.Await()
 	resPVSize, _ := resChPVSize.Await()
 	resActiveMins, _ := resChActiveMins.Await()
 	resActiveMins, _ := resChActiveMins.Await()
+	resPVStorageClass, _ := resChPVStorageClass.Await()
 	resLocalStorageCost, _ := resChLocalStorageCost.Await()
 	resLocalStorageCost, _ := resChLocalStorageCost.Await()
 	resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
 	resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
 	resLocalActiveMins, _ := resChLocalActiveMins.Await()
 	resLocalActiveMins, _ := resChLocalActiveMins.Await()
+
 	if ctx.HasErrors() {
 	if ctx.HasErrors() {
 		return nil, ctx.ErrorCollection()
 		return nil, ctx.ErrorCollection()
 	}
 	}
@@ -208,6 +214,9 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 			}
 			}
 		}
 		}
 		diskMap[key].Cost += cost
 		diskMap[key].Cost += cost
+
+		//Assigning explicitly the storage class of local storage to local
+		diskMap[key].StorageClass = kubecost.LocalStorageClass
 	}
 	}
 
 
 	for _, result := range resLocalStorageUsedCost {
 	for _, result := range resLocalStorageUsedCost {
@@ -297,6 +306,43 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end
 		diskMap[key].Minutes = mins
 		diskMap[key].Minutes = mins
 	}
 	}
 
 
+	var unTracedDiskLogData []DiskIdentifier
+	//Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
+	for _, result := range resPVStorageClass {
+		cluster, err := result.GetString(env.GetPromClusterLabel())
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, _ := result.GetString("persistentvolume")
+
+		key := DiskIdentifier{cluster, name}
+		if _, ok := diskMap[key]; !ok {
+			if !slices.Contains(unTracedDiskLogData, key) {
+				unTracedDiskLogData = append(unTracedDiskLogData, key)
+			}
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		storageClass, err := result.GetString("storageclass")
+
+		if err != nil {
+			diskMap[key].StorageClass = kubecost.UnknownStorageClass
+		} else {
+			diskMap[key].StorageClass = storageClass
+		}
+	}
+
+	// Logging the unidentified disk information outside the loop
+
+	for _, unIdentifiedDisk := range unTracedDiskLogData {
+		log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
+	}
+
 	for _, disk := range diskMap {
 	for _, disk := range diskMap {
 		// Apply all remaining RAM to Idle
 		// Apply all remaining RAM to Idle
 		disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
 		disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)

+ 36 - 21
pkg/kubecost/asset.go

@@ -15,6 +15,12 @@ import (
 // E.g. if aggregating on Cluster, Assets in the AssetSet where Asset has no cluster will be grouped under key "__undefined__"
 // E.g. if aggregating on Cluster, Assets in the AssetSet where Asset has no cluster will be grouped under key "__undefined__"
 const UndefinedKey = "__undefined__"
 const UndefinedKey = "__undefined__"
 
 
+// LocalStorageClass is used to assign storage class of local disks.
+const LocalStorageClass = "__local__"
+
+// UnknownStorageClass is used to assign storage class of persistent volume whose information is unable to be traced.
+const UnknownStorageClass = "__unknown__"
+
 // Asset defines an entity within a cluster that has a defined cost over a
 // Asset defines an entity within a cluster that has a defined cost over a
 // given period of time.
 // given period of time.
 type Asset interface {
 type Asset interface {
@@ -1053,16 +1059,17 @@ func (cm *ClusterManagement) String() string {
 
 
 // Disk represents an in-cluster disk Asset
 // Disk represents an in-cluster disk Asset
 type Disk struct {
 type Disk struct {
-	Labels     AssetLabels
-	Properties *AssetProperties
-	Start      time.Time
-	End        time.Time
-	Window     Window
-	Adjustment float64
-	Cost       float64
-	ByteHours  float64
-	Local      float64
-	Breakdown  *Breakdown
+	Labels       AssetLabels
+	Properties   *AssetProperties
+	Start        time.Time
+	End          time.Time
+	Window       Window
+	Adjustment   float64
+	Cost         float64
+	ByteHours    float64
+	Local        float64
+	Breakdown    *Breakdown
+	StorageClass string // @bingen:field[version=17]
 }
 }
 
 
 // NewDisk creates and returns a new Disk Asset
 // NewDisk creates and returns a new Disk Asset
@@ -1254,21 +1261,27 @@ func (d *Disk) add(that *Disk) {
 	d.Cost += that.Cost
 	d.Cost += that.Cost
 
 
 	d.ByteHours += that.ByteHours
 	d.ByteHours += that.ByteHours
+
+	// If storage class don't match default it to empty storage class
+	if d.StorageClass != that.StorageClass {
+		d.StorageClass = ""
+	}
 }
 }
 
 
 // Clone returns a cloned instance of the Asset
 // Clone returns a cloned instance of the Asset
 func (d *Disk) Clone() Asset {
 func (d *Disk) Clone() Asset {
 	return &Disk{
 	return &Disk{
-		Properties: d.Properties.Clone(),
-		Labels:     d.Labels.Clone(),
-		Start:      d.Start,
-		End:        d.End,
-		Window:     d.Window.Clone(),
-		Adjustment: d.Adjustment,
-		Cost:       d.Cost,
-		ByteHours:  d.ByteHours,
-		Local:      d.Local,
-		Breakdown:  d.Breakdown.Clone(),
+		Properties:   d.Properties.Clone(),
+		Labels:       d.Labels.Clone(),
+		Start:        d.Start,
+		End:          d.End,
+		Window:       d.Window.Clone(),
+		Adjustment:   d.Adjustment,
+		Cost:         d.Cost,
+		ByteHours:    d.ByteHours,
+		Local:        d.Local,
+		Breakdown:    d.Breakdown.Clone(),
+		StorageClass: d.StorageClass,
 	}
 	}
 }
 }
 
 
@@ -1309,6 +1322,9 @@ func (d *Disk) Equal(a Asset) bool {
 	if !d.Breakdown.Equal(that.Breakdown) {
 	if !d.Breakdown.Equal(that.Breakdown) {
 		return false
 		return false
 	}
 	}
+	if d.StorageClass != that.StorageClass {
+		return false
+	}
 
 
 	return true
 	return true
 }
 }
@@ -2900,7 +2916,6 @@ func (as *AssetSet) Insert(asset Asset) error {
 		as.Assets[k] = newAsset
 		as.Assets[k] = newAsset
 		addToConcreteMap(as, k, newAsset)
 		addToConcreteMap(as, k, newAsset)
 	}
 	}
-
 	// Expand the window, just to be safe. It's possible that the asset will
 	// Expand the window, just to be safe. It's possible that the asset will
 	// be set into the map without expanding it to the AssetSet's window.
 	// be set into the map without expanding it to the AssetSet's window.
 	as.Assets[k].ExpandWindow(as.Window)
 	as.Assets[k].ExpandWindow(as.Window)

+ 6 - 1
pkg/kubecost/asset_json.go

@@ -261,7 +261,8 @@ func (d *Disk) MarshalJSON() ([]byte, error) {
 	jsonEncodeFloat64(buffer, "bytes", d.Bytes(), ",")
 	jsonEncodeFloat64(buffer, "bytes", d.Bytes(), ",")
 	jsonEncode(buffer, "breakdown", d.Breakdown, ",")
 	jsonEncode(buffer, "breakdown", d.Breakdown, ",")
 	jsonEncodeFloat64(buffer, "adjustment", d.Adjustment, ",")
 	jsonEncodeFloat64(buffer, "adjustment", d.Adjustment, ",")
-	jsonEncodeFloat64(buffer, "totalCost", d.TotalCost(), "")
+	jsonEncodeFloat64(buffer, "totalCost", d.TotalCost(), ",")
+	jsonEncodeString(buffer, "storageClass", d.StorageClass, "")
 	buffer.WriteString("}")
 	buffer.WriteString("}")
 	return buffer.Bytes(), nil
 	return buffer.Bytes(), nil
 }
 }
@@ -332,6 +333,10 @@ func (d *Disk) InterfaceToDisk(itf interface{}) error {
 		d.ByteHours = ByteHours.(float64)
 		d.ByteHours = ByteHours.(float64)
 	}
 	}
 
 
+	if StorageClass, err := getTypedVal(fmap["storageClass"]); err == nil {
+		d.StorageClass = StorageClass.(string)
+	}
+
 	// d.Local is not marhsaled, and cannot be calculated from marshaled values.
 	// d.Local is not marhsaled, and cannot be calculated from marshaled values.
 	// Currently, it is just ignored and not set in the resulting unmarshal to Disk
 	// Currently, it is just ignored and not set in the resulting unmarshal to Disk
 	//  be aware that this means a resulting Disk from an unmarshal is therefore NOT
 	//  be aware that this means a resulting Disk from an unmarshal is therefore NOT

+ 1 - 1
pkg/kubecost/bingen.go

@@ -24,7 +24,7 @@ package kubecost
 // @bingen:generate:Window
 // @bingen:generate:Window
 
 
 // Asset Version Set: Includes Asset pipeline specific resources
 // Asset Version Set: Includes Asset pipeline specific resources
-// @bingen:set[name=Assets,version=16]
+// @bingen:set[name=Assets,version=17]
 // @bingen:generate:Any
 // @bingen:generate:Any
 // @bingen:generate:Asset
 // @bingen:generate:Asset
 // @bingen:generate:AssetLabels
 // @bingen:generate:AssetLabels

+ 24 - 3
pkg/kubecost/kubecost_codecs.go

@@ -13,12 +13,11 @@ package kubecost
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	util "github.com/opencost/opencost/pkg/util"
 	"reflect"
 	"reflect"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
-
-	util "github.com/opencost/opencost/pkg/util"
 )
 )
 
 
 const (
 const (
@@ -38,7 +37,7 @@ const (
 	DefaultCodecVersion uint8 = 15
 	DefaultCodecVersion uint8 = 15
 
 
 	// AssetsCodecVersion is used for any resources listed in the Assets version set
 	// AssetsCodecVersion is used for any resources listed in the Assets version set
-	AssetsCodecVersion uint8 = 16
+	AssetsCodecVersion uint8 = 17
 
 
 	// AllocationCodecVersion is used for any resources listed in the Allocation version set
 	// AllocationCodecVersion is used for any resources listed in the Allocation version set
 	AllocationCodecVersion uint8 = 15
 	AllocationCodecVersion uint8 = 15
@@ -4972,6 +4971,12 @@ func (target *Disk) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 		// --- [end][write][struct](Breakdown) ---
 		// --- [end][write][struct](Breakdown) ---
 
 
 	}
 	}
+	if ctx.IsStringTable() {
+		e := ctx.Table.AddOrGet(target.StorageClass)
+		buff.WriteInt(e) // write table index
+	} else {
+		buff.WriteString(target.StorageClass) // write string
+	}
 	return nil
 	return nil
 }
 }
 
 
@@ -5141,6 +5146,22 @@ func (target *Disk) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 		// --- [end][read][struct](Breakdown) ---
 		// --- [end][read][struct](Breakdown) ---
 
 
 	}
 	}
+	// field version check
+	if uint8(17) <= version {
+		var bb string
+		if ctx.IsStringTable() {
+			cc := buff.ReadInt() // read string index
+			bb = ctx.Table[cc]
+		} else {
+			bb = buff.ReadString() // read string
+		}
+		aa := bb
+		target.StorageClass = aa
+
+	} else {
+		target.StorageClass = "" // default
+	}
+
 	return nil
 	return nil
 }
 }
 
 

+ 61 - 0
pkg/metrics/pvmetrics.go

@@ -28,6 +28,9 @@ func (kpvcb KubePVCollector) Describe(ch chan<- *prometheus.Desc) {
 	if _, disabled := disabledMetrics["kube_persistentvolume_status_phase"]; !disabled {
 	if _, disabled := disabledMetrics["kube_persistentvolume_status_phase"]; !disabled {
 		ch <- prometheus.NewDesc("kube_persistentvolume_status_phase", "The phase indicates if a volume is available, bound to a claim, or released by a claim.", []string{}, nil)
 		ch <- prometheus.NewDesc("kube_persistentvolume_status_phase", "The phase indicates if a volume is available, bound to a claim, or released by a claim.", []string{}, nil)
 	}
 	}
+	if _, disabled := disabledMetrics["kubecost_pv_info"]; !disabled {
+		ch <- prometheus.NewDesc("kubecost_pv_info", "The pv information", []string{}, nil)
+	}
 }
 }
 
 
 // Collect is called by the Prometheus registry when collecting metrics.
 // Collect is called by the Prometheus registry when collecting metrics.
@@ -59,7 +62,12 @@ func (kpvcb KubePVCollector) Collect(ch chan<- prometheus.Metric) {
 		if _, disabled := disabledMetrics["kube_persistentvolume_capacity_bytes"]; !disabled {
 		if _, disabled := disabledMetrics["kube_persistentvolume_capacity_bytes"]; !disabled {
 			storage := pv.Spec.Capacity[v1.ResourceStorage]
 			storage := pv.Spec.Capacity[v1.ResourceStorage]
 			m := newKubePVCapacityBytesMetric("kube_persistentvolume_capacity_bytes", pv.Name, float64(storage.Value()))
 			m := newKubePVCapacityBytesMetric("kube_persistentvolume_capacity_bytes", pv.Name, float64(storage.Value()))
+			ch <- m
+		}
 
 
+		if _, disabled := disabledMetrics["kubecost_pv_info"]; !disabled {
+			storageClass := pv.Spec.StorageClassName
+			m := newKubecostPVInfoMetric("kubecost_pv_info", pv.Name, storageClass, float64(1))
 			ch <- m
 			ch <- m
 		}
 		}
 	}
 	}
@@ -165,3 +173,56 @@ func (kpcrr KubePVStatusPhaseMetric) Write(m *dto.Metric) error {
 	}
 	}
 	return nil
 	return nil
 }
 }
+
+//--------------------------------------------------------------------------
+//  KubecostPVInfoMetric
+//--------------------------------------------------------------------------
+// KubecostPVInfoMetric is a prometheus.Metric
+type KubecostPVInfoMetric struct {
+	fqName       string
+	help         string
+	pv           string
+	storageClass string
+	value        float64
+}
+
+// Creates a new newKubecostPVInfoMetric, implementation of prometheus.Metric
+func newKubecostPVInfoMetric(fqname, pv, storageClass string, value float64) KubecostPVInfoMetric {
+	return KubecostPVInfoMetric{
+		fqName:       fqname,
+		help:         "kubecost_pv_info pv info",
+		pv:           pv,
+		storageClass: storageClass,
+		value:        value,
+	}
+}
+
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
+func (kpvim KubecostPVInfoMetric) Desc() *prometheus.Desc {
+	l := prometheus.Labels{
+		"persistentvolume": kpvim.pv,
+		"storageclass":     kpvim.storageClass,
+	}
+	return prometheus.NewDesc(kpvim.fqName, kpvim.help, []string{}, l)
+}
+
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
+func (kpvim KubecostPVInfoMetric) Write(m *dto.Metric) error {
+	m.Gauge = &dto.Gauge{
+		Value: &kpvim.value,
+	}
+
+	m.Label = []*dto.LabelPair{
+		{
+			Name:  toStringPtr("persistentvolume"),
+			Value: &kpvim.pv,
+		},
+		{
+			Name:  toStringPtr("storageclass"),
+			Value: &kpvim.storageClass,
+		},
+	}
+	return nil
+}