Jelajahi Sumber

KubeModel: units and TODOs

Niko Kovacevic 6 bulan lalu
induk
komit
d25cf7811f

+ 0 - 2
core/pkg/exporter/pathing/kubemodelpath.go

@@ -24,8 +24,6 @@ type KubeModelStoragePathFormatter struct {
 	dir string
 }
 
-// TODO: we need to figure out how to get the proper clusterId here, if we're
-// going to use the kube-system namespace UID as the real cluster ID.
 func NewKubeModelStoragePathFormatter(rootDir, clusterId, resolution string) (StoragePathFormatter[opencost.Window], error) {
 	if clusterId == "" {
 		return nil, fmt.Errorf("cluster id cannot be empty")

+ 0 - 2
core/pkg/model/kubemodel/cluster.go

@@ -4,8 +4,6 @@ import (
 	"time"
 )
 
-// TODO: do we need (start, end) for Cluster?
-
 // @bingen:generate:Cluster
 type Cluster struct {
 	UID      string    `json:"uid"`      // @bingen:field[version=1]

+ 12 - 12
core/pkg/model/kubemodel/kubemodel_codecs.go

@@ -1493,14 +1493,14 @@ func (target *ResourceQuantity) MarshalBinaryWithContext(ctx *EncodingContext) (
 	// --- [end][write][alias](Unit) ---
 
 	// --- [begin][write][alias](Stats) ---
-	if map[StatType]float64(target.Values) == nil {
+	if map[StatType]uint64(target.Values) == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
 	} else {
 		buff.WriteUInt8(uint8(1)) // write non-nil byte
 
-		// --- [begin][write][map](map[StatType]float64) ---
-		buff.WriteInt(len(map[StatType]float64(target.Values))) // map length
-		for v, z := range map[StatType]float64(target.Values) {
+		// --- [begin][write][map](map[StatType]uint64) ---
+		buff.WriteInt(len(map[StatType]uint64(target.Values))) // map length
+		for v, z := range map[StatType]uint64(target.Values) {
 			// --- [begin][write][alias](StatType) ---
 			if ctx.IsStringTable() {
 				c := ctx.Table.AddOrGet(string(v))
@@ -1510,9 +1510,9 @@ func (target *ResourceQuantity) MarshalBinaryWithContext(ctx *EncodingContext) (
 			}
 			// --- [end][write][alias](StatType) ---
 
-			buff.WriteFloat64(z) // write float64
+			buff.WriteUInt64(z) // write uint64
 		}
-		// --- [end][write][map](map[StatType]float64) ---
+		// --- [end][write][map](map[StatType]uint64) ---
 
 	}
 	// --- [end][write][alias](Stats) ---
@@ -1617,13 +1617,13 @@ func (target *ResourceQuantity) UnmarshalBinaryWithContext(ctx *DecodingContext)
 	// field version check
 	if uint8(1) <= version {
 		// --- [begin][read][alias](Stats) ---
-		var k map[StatType]float64
+		var k map[StatType]uint64
 		if buff.ReadUInt8() == uint8(0) {
 			k = nil
 		} else {
-			// --- [begin][read][map](map[StatType]float64) ---
+			// --- [begin][read][map](map[StatType]uint64) ---
 			m := buff.ReadInt() // map len
-			l := make(map[StatType]float64, m)
+			l := make(map[StatType]uint64, m)
 			for i := 0; i < m; i++ {
 				// --- [begin][read][alias](StatType) ---
 				var n string
@@ -1640,14 +1640,14 @@ func (target *ResourceQuantity) UnmarshalBinaryWithContext(ctx *DecodingContext)
 				v := StatType(n)
 				// --- [end][read][alias](StatType) ---
 
-				var z float64
-				r := buff.ReadFloat64() // read float64
+				var z uint64
+				r := buff.ReadUInt64() // read uint64
 				z = r
 
 				l[v] = z
 			}
 			k = l
-			// --- [end][read][map](map[StatType]float64) ---
+			// --- [end][read][map](map[StatType]uint64) ---
 
 		}
 		target.Values = Stats(k)

+ 1 - 1
core/pkg/model/kubemodel/resource.go

@@ -20,7 +20,7 @@ type ResourceQuantity struct {
 // @bingen:generate:ResourceQuantities
 type ResourceQuantities map[Resource]ResourceQuantity
 
-func (rqs ResourceQuantities) Set(resource Resource, unit Unit, statType StatType, value float64) {
+func (rqs ResourceQuantities) Set(resource Resource, unit Unit, statType StatType, value uint64) {
 	if _, ok := rqs[resource]; !ok {
 		rqs[resource] = ResourceQuantity{
 			Resource: resource,

+ 2 - 0
core/pkg/model/kubemodel/resourcequota.go

@@ -2,6 +2,8 @@ package kubemodel
 
 import "fmt"
 
+// TODO: Do we need (Start, End) for these?
+
 // @bingen:generate:ResourceQuota
 type ResourceQuota struct {
 	UID          string               `json:"uid"`          // @bingen:field[version=1]

+ 13 - 48
core/pkg/model/kubemodel/stats.go

@@ -1,11 +1,5 @@
 package kubemodel
 
-import (
-	"errors"
-	"fmt"
-	"math"
-)
-
 // @bingen:generate:StatType
 type StatType string
 
@@ -18,20 +12,20 @@ const (
 )
 
 // @bingen:generate:Stats
-type Stats map[StatType]float64
+type Stats map[StatType]uint64
 
 func NewStats(capacity ...int) Stats {
 	if len(capacity) == 1 {
-		s := make(map[StatType]float64, capacity[0])
+		s := make(map[StatType]uint64, capacity[0])
 		return s
 	}
 
-	return map[StatType]float64{}
+	return map[StatType]uint64{}
 }
 
-func (s Stats) Avg() (float64, bool) {
+func (s Stats) Avg() (uint64, bool) {
 	if s == nil {
-		return 0.0, false
+		return 0, false
 	}
 
 	val, ok := s[StatAvg]
@@ -39,9 +33,9 @@ func (s Stats) Avg() (float64, bool) {
 	return val, ok
 }
 
-func (s Stats) Max() (float64, bool) {
+func (s Stats) Max() (uint64, bool) {
 	if s == nil {
-		return 0.0, false
+		return 0, false
 	}
 
 	val, ok := s[StatMax]
@@ -49,9 +43,9 @@ func (s Stats) Max() (float64, bool) {
 	return val, ok
 }
 
-func (s Stats) Min() (float64, bool) {
+func (s Stats) Min() (uint64, bool) {
 	if s == nil {
-		return 0.0, false
+		return 0, false
 	}
 
 	val, ok := s[StatMin]
@@ -59,9 +53,9 @@ func (s Stats) Min() (float64, bool) {
 	return val, ok
 }
 
-func (s Stats) P95() (float64, bool) {
+func (s Stats) P95() (uint64, bool) {
 	if s == nil {
-		return 0.0, false
+		return 0, false
 	}
 
 	val, ok := s[StatP95]
@@ -69,41 +63,12 @@ func (s Stats) P95() (float64, bool) {
 	return val, ok
 }
 
-func (s Stats) P85() (float64, bool) {
+func (s Stats) P85() (uint64, bool) {
 	if s == nil {
-		return 0.0, false
+		return 0, false
 	}
 
 	val, ok := s[StatP85]
 
 	return val, ok
 }
-
-func (s Stats) Sanitize() error {
-	if s == nil {
-		return nil
-	}
-
-	var errs []error
-
-	for t := range s {
-		if math.IsNaN(s[t]) {
-			delete(s, t)
-			errs = append(errs, fmt.Errorf("%v is NaN", t))
-		}
-		if math.IsInf(s[t], 0) {
-			delete(s, t)
-			errs = append(errs, fmt.Errorf("%v is Inf", t))
-		}
-	}
-
-	if len(errs) > 0 {
-		errStr := fmt.Sprintf("%d errors:", len(errs))
-		for _, e := range errs {
-			errStr += fmt.Sprintf(" [%s]", e)
-		}
-		return errors.New(errStr)
-	}
-
-	return nil
-}

+ 0 - 59
core/pkg/model/kubemodel/stats_test.go

@@ -1,59 +0,0 @@
-package kubemodel
-
-import (
-	"errors"
-	"math"
-	"testing"
-)
-
-func TestStats_Sanitize(t *testing.T) {
-	type testCase struct {
-		stats Stats
-		exp   error
-	}
-
-	testCases := []testCase{
-		{
-			nil,
-			nil,
-		},
-		{
-			Stats{},
-			nil,
-		},
-		{
-			Stats{
-				StatAvg: 0.1,
-				StatMax: 1.0,
-			},
-			nil,
-		},
-		{
-			Stats{
-				StatAvg: math.Inf(0),
-				StatMax: 1.0,
-			},
-			errors.New("1 errors: [avg is Inf]"),
-		},
-		{
-			Stats{
-				StatAvg: math.Inf(0),
-				StatMax: math.NaN(),
-			},
-			errors.New("2 errors: [avg is Inf] [max is NaN]"),
-		},
-	}
-
-	for _, tc := range testCases {
-		err := tc.stats.Sanitize()
-		if err != nil && tc.exp == nil {
-			t.Errorf("unexpected error: %s", err)
-		}
-		if err == nil && tc.exp != nil {
-			t.Errorf("expected error: %s", tc.exp)
-		}
-		if err != nil && tc.exp != nil && err.Error()[0] != tc.exp.Error()[0] {
-			t.Errorf("expected error: %s; received error: %s", tc.exp, err)
-		}
-	}
-}

+ 5 - 13
core/pkg/model/kubemodel/unit.go

@@ -4,17 +4,9 @@ package kubemodel
 type Unit string
 
 const (
-	UnitMillicore     = "mCPU"
-	UnitByte          = "B"
-	UnitMi            = "Mi"
-	UnitGB            = "GB"
-	UnitGPU           = "GPU"
-	UnitSecond        = "s"
-	UnitMinute        = "m"
-	UnitHour          = "h"
-	UnitMillicoreHour = "m-h"
-	UnitByteHour      = "B-h"
-	UnitMiHour        = "Mi-h"
-	UnitGBHour        = "GB-h"
-	UnitGPUHour       = "GPU-h"
+	UnitMillicore       = "m"
+	UnitByte            = "B"
+	UnitSecond          = "s"
+	UnitMillicoreSecond = "m-s"
+	UnitByteSecond      = "B-s"
 )

+ 2 - 3
pkg/costmodel/costmodel.go

@@ -63,11 +63,10 @@ func NewCostModel(
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 
-	// TODO: is this too contrived??
 	kubeModel, err := km.NewKubeModel(dataSource)
 	if err != nil {
-		// TODO: what to do here?
-		panic(err)
+		// KubeModel is required. Log a fatal error if we fail to init.
+		log.Fatalf("error initializing KubeModel: %s", err)
 	}
 
 	return &CostModel{

+ 27 - 40
pkg/kubemodel/kubemodel.go

@@ -6,7 +6,6 @@ import (
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/env"
-	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/model/kubemodel"
 	"github.com/opencost/opencost/core/pkg/source"
 )
@@ -25,9 +24,9 @@ func NewKubeModel(dataSource source.OpenCostDataSource) (*KubeModel, error) {
 
 	km := &KubeModel{ds: dataSource}
 
-	clusterUID, err := km.computeClusterUID(time.Now().UTC())
+	clusterUID, err := km.computeClusterUID()
 	if err != nil {
-		return nil, fmt.Errorf("error computing cluster UID: %w", err)
+		return nil, fmt.Errorf("error determining cluster UID: %w", err)
 	}
 
 	km.clusterUID = clusterUID
@@ -70,32 +69,20 @@ func (km *KubeModel) ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeM
 	return kms, nil
 }
 
-// TODO: come up with a better way to pull kube-system namespace UID from Metrics()?
-func (km *KubeModel) computeClusterUID(start time.Time) (string, error) {
-	// TODO: what (start, end) here? will this always work? or will it fail,
-	// e.g. right after a clean install?
-	start = start.Truncate(km.ds.Resolution())
-	end := start.Add(km.ds.Resolution())
-
-	nsLabelsResult, _ := km.ds.Metrics().QueryNamespaceLabels(start, end).Await()
-	for _, res := range nsLabelsResult {
-		if res.Namespace == "kube-system" {
-			log.Infof("KubeModel: detected cluster UID from kube-system: %s", res.UID)
-			return res.UID, nil
-		}
-	}
-
+func (km *KubeModel) computeClusterUID() (string, error) {
+	// TODO: Replace with kube-system namespace UID when we have a reliable way
+	// to query for it.
 	clusterUID := env.GetClusterID()
-	if clusterUID != "" {
-		log.Warnf("KubeModel: failed to infer cluster UID from kube-system: using env var: %s", clusterUID)
-		return clusterUID, nil
+	if clusterUID == "" {
+		return "", errors.New("failed to detect cluster UID")
 	}
 
-	return "", errors.New("failed to detect cluster UID")
+	return clusterUID, nil
 }
 
-// TODO: should we periodically check the ClusterUID?
-// TODO: where do we get the additional information? km.ds.ClusterInfo().GetClusterInfo() is a map[string]string...
+// TODO: How do we pull kube-system namespace UID for Cluster?
+// TODO: How do we populate (Start, End) for Cluster? A new cluster_info metric?
+// TODO: Where do we get the additional information? km.ds.ClusterInfo().GetClusterInfo()?
 func (km *KubeModel) computeCluster(kms *kubemodel.KubeModelSet) error {
 	kms.Cluster = &kubemodel.Cluster{
 		UID:  km.clusterUID,
@@ -159,105 +146,105 @@ func (km *KubeModel) computeResourceQuotas(kms *kubemodel.KubeModelSet, start, e
 	rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
 	for _, res := range rqSpecCPURequestAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
 	}
 
 	rqSpecCPURequestMaxResult, _ := rqSpecCPURequestMaxResultFuture.Await()
 	for _, res := range rqSpecCPURequestMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
 	}
 
 	rqSpecRAMRequestAverageResult, _ := rqSpecRAMRequestAverageResultFuture.Await()
 	for _, res := range rqSpecRAMRequestAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, uint64(res.Data[0].Value))
 	}
 
 	rqSpecRAMRequestMaxResult, _ := rqSpecRAMRequestMaxResultFuture.Await()
 	for _, res := range rqSpecRAMRequestMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, uint64(res.Data[0].Value))
 	}
 
 	rqSpecCPULimitAverageResult, _ := rqSpecCPULimitAverageResultFuture.Await()
 	for _, res := range rqSpecCPULimitAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
 	}
 
 	rqSpecCPULimitMaxResult, _ := rqSpecCPULimitMaxResultFuture.Await()
 	for _, res := range rqSpecCPULimitMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
 	}
 
 	rqSpecRAMLimitAverageResult, _ := rqSpecRAMLimitAverageResultFuture.Await()
 	for _, res := range rqSpecRAMLimitAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, uint64(res.Data[0].Value))
 	}
 
 	rqSpecRAMLimitMaxResult, _ := rqSpecRAMLimitMaxResultFuture.Await()
 	for _, res := range rqSpecRAMLimitMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, uint64(res.Data[0].Value))
 	}
 
 	rqStatusUsedCPURequestAverageResult, _ := rqStatusUsedCPURequestAverageResultFuture.Await()
 	for _, res := range rqStatusUsedCPURequestAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
 	}
 
 	rqStatusUsedCPURequestMaxResult, _ := rqStatusUsedCPURequestMaxResultFuture.Await()
 	for _, res := range rqStatusUsedCPURequestMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
 	}
 
 	rqStatusUsedRAMRequestAverageResult, _ := rqStatusUsedRAMRequestAverageResultFuture.Await()
 	for _, res := range rqStatusUsedRAMRequestAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, uint64(res.Data[0].Value))
 	}
 
 	rqStatusUsedRAMRequestMaxResult, _ := rqStatusUsedRAMRequestMaxResultFuture.Await()
 	for _, res := range rqStatusUsedRAMRequestMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, uint64(res.Data[0].Value))
 	}
 
 	rqStatusUsedCPULimitAverageResult, _ := rqStatusUsedCPULimitAverageResultFuture.Await()
 	for _, res := range rqStatusUsedCPULimitAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
 	}
 
 	rqStatusUsedCPULimitMaxResult, _ := rqStatusUsedCPULimitMaxResultFuture.Await()
 	for _, res := range rqStatusUsedCPULimitMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		mcpu := res.Data[0].Value * 1000
+		mcpu := uint64(res.Data[0].Value * 1000)
 		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
 	}
 
 	rqStatusUsedRAMLimitAverageResult, _ := rqStatusUsedRAMLimitAverageResultFuture.Await()
 	for _, res := range rqStatusUsedRAMLimitAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, uint64(res.Data[0].Value))
 	}
 
 	rqStatusUsedRAMLimitMaxResult, _ := rqStatusUsedRAMLimitMaxResultFuture.Await()
 	for _, res := range rqStatusUsedRAMLimitMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Data[0].Value)
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, uint64(res.Data[0].Value))
 	}
 
 	return nil