Ver Fonte

Provider csi vols ii (#2126)

* when possible, surface volume handle as provider id for PVs

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
(cherry picked from commit c2c5ecbaf9717f62ab761b4c2d08b27a8c71dcfa)

* add filtering

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
(cherry picked from commit 9c47e80987b3b2b59e1efa2a23ba8586d3b2e95d)

* re-write to pass through allocations, not assets

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
(cherry picked from commit a1440622c6fb4462c11369166ab21be0d24f6aad)

* revert cluster.go changes

Signed-off-by: Alex Meijer <ameijer@kubecost.com>

* re-add missing methods

Signed-off-by: Alex Meijer <ameijer@kubecost.com>

* re-add missing key method

Signed-off-by: Alex Meijer <ameijer@kubecost.com>

* fix test

Signed-off-by: Alex Meijer <ameijer@kubecost.com>

---------

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
Alex Meijer há 2 anos
pai
commit
ede901a0fc

+ 6 - 1
pkg/costmodel/allocation.go

@@ -34,6 +34,7 @@ const (
 	queryFmtPVActiveMins                = `count(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%s]`
 	queryFmtPVBytes                     = `avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (persistentvolume, %s)`
 	queryFmtPVCostPerGiBHour            = `avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (volumename, %s)`
+	queryFmtPVMeta                      = `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, provider_id)`
 	queryFmtNetZoneGiB                  = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 	queryFmtNetZoneCostPerGiB           = `avg(avg_over_time(kubecost_network_zone_egress_cost{%s}[%s])) by (%s)`
 	queryFmtNetRegionGiB                = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
@@ -454,6 +455,9 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
 	resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
 
+	queryPVMeta := fmt.Sprintf(queryFmtPVMeta, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+	resChPVMeta := ctx.QueryAtTime(queryPVMeta, end)
+
 	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
 	resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
 
@@ -545,6 +549,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	resPVActiveMins, _ := resChPVActiveMins.Await()
 	resPVBytes, _ := resChPVBytes.Await()
 	resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
+	resPVMeta, _ := resChPVMeta.Await()
 
 	resPVCInfo, _ := resChPVCInfo.Await()
 	resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
@@ -650,7 +655,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	// a PVC, we get time running there, so this is only inaccurate
 	// for short-lived, unmounted PVs.)
 	pvMap := map[pvKey]*pv{}
-	buildPVMap(resolution, pvMap, resPVCostPerGiBHour, resPVActiveMins, window)
+	buildPVMap(resolution, pvMap, resPVCostPerGiBHour, resPVActiveMins, resPVMeta, window)
 	applyPVBytes(pvMap, resPVBytes)
 
 	// Build out the map of all PVCs with time running, bytes requested,

+ 23 - 3
pkg/costmodel/allocation_helpers.go

@@ -1776,7 +1776,7 @@ func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*nodePricing, nodeKey no
 
 /* PV/PVC Helpers */
 
-func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHour, resPVActiveMins []*prom.QueryResult, window kubecost.Window) {
+func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHour, resPVActiveMins, resPVMeta []*prom.QueryResult, window kubecost.Window) {
 	for _, result := range resPVActiveMins {
 		key, err := resultPVKey(result, env.GetPromClusterLabel(), "persistentvolume")
 		if err != nil {
@@ -1813,6 +1813,25 @@ func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHo
 		pvMap[key].CostPerGiBHour = result.Values[0].Value
 
 	}
+
+	for _, result := range resPVMeta {
+		key, err := resultPVKey(result, env.GetPromClusterLabel(), "persistentvolume")
+		if err != nil {
+			log.Warnf("error getting key for PV: %v", err)
+			continue
+		}
+
+		// only add metadata for disks that exist in the other metrics
+		if _, ok := pvMap[key]; ok {
+			provId, err := result.GetString("provider_id")
+			if err != nil {
+				log.Warnf("error getting provider id for PV %v: %v", key, err)
+				continue
+			}
+			pvMap[key].ProviderID = provId
+		}
+
+	}
 }
 
 func applyPVBytes(pvMap map[pvKey]*pv, resPVBytes []*prom.QueryResult) {
@@ -2057,8 +2076,9 @@ func applyPVCsToPods(window kubecost.Window, podMap map[podKey]*pod, podPVCMap m
 				// would be equal to the values of the original pv
 				count := float64(len(pod.Allocations))
 				alloc.PVs[pvKey] = &kubecost.PVAllocation{
-					ByteHours: byteHours * coef / count,
-					Cost:      cost * coef / count,
+					ByteHours:  byteHours * coef / count,
+					Cost:       cost * coef / count,
+					ProviderID: pvc.Volume.ProviderID,
 				}
 			}
 		}

+ 1 - 1
pkg/costmodel/allocation_helpers_test.go

@@ -354,7 +354,7 @@ func TestBuildPVMap(t *testing.T) {
 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
 			pvMap := make(map[pvKey]*pv)
-			buildPVMap(testCase.resolution, pvMap, testCase.resultsPVCostPerGiBHour, testCase.resultsActiveMinutes, window)
+			buildPVMap(testCase.resolution, pvMap, testCase.resultsPVCostPerGiBHour, testCase.resultsActiveMinutes, []*prom.QueryResult{}, window)
 			if len(pvMap) != len(testCase.expected) {
 				t.Errorf("pv map does not have the expected length %d : %d", len(pvMap), len(testCase.expected))
 			}

+ 2 - 1
pkg/costmodel/allocation_types.go

@@ -132,6 +132,7 @@ type pv struct {
 	Cluster        string    `json:"cluster"`
 	Name           string    `json:"name"`
 	StorageClass   string    `json:"storageClass"`
+	ProviderID     string    `json:"providerID"`
 }
 
 func (p *pv) clone() *pv {
@@ -190,7 +191,7 @@ func (p *pv) String() string {
 	if p == nil {
 		return "<nil>"
 	}
-	return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", p.Cluster, p.Name, p.Bytes, p.CostPerGiBHour, p.StorageClass)
+	return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s, ProviderID: %s}", p.Cluster, p.Name, p.Bytes, p.CostPerGiBHour, p.StorageClass, p.ProviderID)
 }
 
 func (p *pv) minutes() float64 {

+ 30 - 0
pkg/costmodel/handlers.go

@@ -6,6 +6,9 @@ import (
 
 	"github.com/julienschmidt/httprouter"
 	"github.com/opencost/opencost/pkg/env"
+	assetfilter "github.com/opencost/opencost/pkg/filter21/asset"
+	"github.com/opencost/opencost/pkg/filter21/ast"
+	"github.com/opencost/opencost/pkg/filter21/matcher"
 	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/util/httputil"
 )
@@ -29,6 +32,33 @@ func (a *Accesses) ComputeAssetsHandler(w http.ResponseWriter, r *http.Request,
 		http.Error(w, fmt.Sprintf("Error computing asset set: %s", err), http.StatusInternalServerError)
 		return
 	}
+	filterString := qp.Get("filter", "")
+
+	var filter kubecost.AssetMatcher
+	if filterString == "" {
+		filter = &matcher.AllPass[kubecost.Asset]{}
+	} else {
+		parser := assetfilter.NewAssetFilterParser()
+		tree, errParse := parser.Parse(filterString)
+		if errParse != nil {
+			http.Error(w, fmt.Sprintf("err parsing filter '%s': %v", ast.ToPreOrderShortString(tree), errParse), http.StatusBadRequest)
+		}
+		compiler := kubecost.NewAssetMatchCompiler()
+		var err error
+		filter, err = compiler.Compile(tree)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("err compiling filter '%s': %v", ast.ToPreOrderShortString(tree), err), http.StatusBadRequest)
+		}
+	}
+	if filter == nil {
+		http.Error(w, fmt.Sprintf("unexpected nil filter"), http.StatusInternalServerError)
+	}
+
+	for key, asset := range assetSet.Assets {
+		if !filter.Matches(asset) {
+			delete(assetSet.Assets, key)
+		}
+	}
 
 	w.Write(WrapData(assetSet, nil))
 }

+ 12 - 4
pkg/kubecost/allocation.go

@@ -213,8 +213,9 @@ func (pv PVAllocations) Clone() PVAllocations {
 	clonePV := make(map[PVKey]*PVAllocation, len(pv))
 	for k, v := range pv {
 		clonePV[k] = &PVAllocation{
-			ByteHours: v.ByteHours,
-			Cost:      v.Cost,
+			ByteHours:  v.ByteHours,
+			Cost:       v.Cost,
+			ProviderID: v.ProviderID,
 		}
 	}
 	return clonePV
@@ -235,6 +236,11 @@ func (pv PVAllocations) Add(that PVAllocations) PVAllocations {
 			apvAlloc.Cost += thatPVAlloc.Cost
 			apvAlloc.ByteHours += thatPVAlloc.ByteHours
 			apv[pvKey] = apvAlloc
+			if apvAlloc.ProviderID == thatPVAlloc.ProviderID {
+				apv[pvKey].ProviderID = apvAlloc.ProviderID
+			} else {
+				apv[pvKey].ProviderID = ""
+			}
 		}
 	}
 	return apv
@@ -294,8 +300,9 @@ func (pvk *PVKey) FromString(key string) error {
 // PVAllocation contains the byte hour usage
 // and cost of an Allocation for a single PV
 type PVAllocation struct {
-	ByteHours float64 `json:"byteHours"`
-	Cost      float64 `json:"cost"`
+	ByteHours  float64 `json:"byteHours"`
+	Cost       float64 `json:"cost"`
+	ProviderID string  `json:"providerID"` // @bingen:field[version=20]
 }
 
 // Equal returns true if the two PVAllocation instances contain approximately the same
@@ -2289,6 +2296,7 @@ func deriveProportionalAssetResourceCosts(options *AllocationAggregationOptions,
 				Cluster:            name.Cluster,
 				Name:               name.Name,
 				Type:               "PV",
+				ProviderID:         pvAlloc.ProviderID,
 				PVProportionalCost: pvAlloc.Cost,
 			}, options.IdleByNode)
 		}

+ 1 - 1
pkg/kubecost/bingen.go

@@ -46,7 +46,7 @@ package kubecost
 // @bingen:end
 
 // Allocation Version Set: Includes Allocation pipeline specific resources
-// @bingen:set[name=Allocation,version=19]
+// @bingen:set[name=Allocation,version=20]
 // @bingen:generate:Allocation
 // @bingen:generate[stringtable]:AllocationSet
 // @bingen:generate:AllocationSetRange

+ 27 - 6
pkg/kubecost/kubecost_codecs.go

@@ -13,12 +13,11 @@ package kubecost
 
 import (
 	"fmt"
+	util "github.com/opencost/opencost/pkg/util"
 	"reflect"
 	"strings"
 	"sync"
 	"time"
-
-	util "github.com/opencost/opencost/pkg/util"
 )
 
 const (
@@ -34,17 +33,17 @@ const (
 )
 
 const (
+	// DefaultCodecVersion is used for any resources listed in the Default version set
+	DefaultCodecVersion uint8 = 17
+
 	// AssetsCodecVersion is used for any resources listed in the Assets version set
 	AssetsCodecVersion uint8 = 21
 
 	// AllocationCodecVersion is used for any resources listed in the Allocation version set
-	AllocationCodecVersion uint8 = 19
+	AllocationCodecVersion uint8 = 20
 
 	// CloudCostCodecVersion is used for any resources listed in the CloudCost version set
 	CloudCostCodecVersion uint8 = 2
-
-	// DefaultCodecVersion is used for any resources listed in the Default version set
-	DefaultCodecVersion uint8 = 17
 )
 
 //--------------------------------------------------------------------------
@@ -6633,6 +6632,12 @@ func (target *PVAllocation) MarshalBinaryWithContext(ctx *EncodingContext) (err
 
 	buff.WriteFloat64(target.ByteHours) // write float64
 	buff.WriteFloat64(target.Cost)      // write float64
+	if ctx.IsStringTable() {
+		a := ctx.Table.AddOrGet(target.ProviderID)
+		buff.WriteInt(a) // write table index
+	} else {
+		buff.WriteString(target.ProviderID) // write string
+	}
 	return nil
 }
 
@@ -6696,6 +6701,22 @@ func (target *PVAllocation) UnmarshalBinaryWithContext(ctx *DecodingContext) (er
 	b := buff.ReadFloat64() // read float64
 	target.Cost = b
 
+	// field version check
+	if uint8(20) <= version {
+		var d string
+		if ctx.IsStringTable() {
+			e := buff.ReadInt() // read string index
+			d = ctx.Table[e]
+		} else {
+			d = buff.ReadString() // read string
+		}
+		c := d
+		target.ProviderID = c
+
+	} else {
+		target.ProviderID = "" // default
+	}
+
 	return nil
 }
 

+ 14 - 2
pkg/metrics/pvmetrics.go

@@ -67,7 +67,12 @@ func (kpvcb KubePVCollector) Collect(ch chan<- prometheus.Metric) {
 
 		if _, disabled := disabledMetrics["kubecost_pv_info"]; !disabled {
 			storageClass := pv.Spec.StorageClassName
-			m := newKubecostPVInfoMetric("kubecost_pv_info", pv.Name, storageClass, float64(1))
+			providerID := pv.Name
+			// if a more accurate provider ID is available, use that
+			if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
+				providerID = pv.Spec.CSI.VolumeHandle
+			}
+			m := newKubecostPVInfoMetric("kubecost_pv_info", pv.Name, storageClass, providerID, float64(1))
 			ch <- m
 		}
 	}
@@ -186,16 +191,18 @@ type KubecostPVInfoMetric struct {
 	pv           string
 	storageClass string
 	value        float64
+	providerId   string
 }
 
 // Creates a new newKubecostPVInfoMetric, implementation of prometheus.Metric
-func newKubecostPVInfoMetric(fqname, pv, storageClass string, value float64) KubecostPVInfoMetric {
+func newKubecostPVInfoMetric(fqname, pv, storageClass, providerID string, value float64) KubecostPVInfoMetric {
 	return KubecostPVInfoMetric{
 		fqName:       fqname,
 		help:         "kubecost_pv_info pv info",
 		pv:           pv,
 		storageClass: storageClass,
 		value:        value,
+		providerId:   providerID,
 	}
 }
 
@@ -205,6 +212,7 @@ func (kpvim KubecostPVInfoMetric) Desc() *prometheus.Desc {
 	l := prometheus.Labels{
 		"persistentvolume": kpvim.pv,
 		"storageclass":     kpvim.storageClass,
+		"provider_id":      kpvim.providerId,
 	}
 	return prometheus.NewDesc(kpvim.fqName, kpvim.help, []string{}, l)
 }
@@ -225,6 +233,10 @@ func (kpvim KubecostPVInfoMetric) Write(m *dto.Metric) error {
 			Name:  toStringPtr("storageclass"),
 			Value: &kpvim.storageClass,
 		},
+		{
+			Name:  toStringPtr("provider_id"),
+			Value: &kpvim.providerId,
+		},
 	}
 	return nil
 }