Преглед изворни кода

KubeModel: ResourceQuotas compute; Stats package; indexing for Namespaces

Niko Kovacevic пре 6 месеци
родитељ
комит
ef8452b16c

+ 17 - 7
core/pkg/model/kubemodel/kubemodel.go

@@ -2,6 +2,7 @@ package kubemodel
 
 import (
 	"errors"
+	"fmt"
 	"time"
 )
 
@@ -11,10 +12,14 @@ type KubeModelSet struct {
 	Cluster        *Cluster
 	Namespaces     map[string]*Namespace
 	ResourceQuotas map[string]*ResourceQuota
-	idx            *kubeModelSetIndexes
+	indexes        *kubeModelSetIndexes
 }
 
 func NewKubeModelSet(start, end time.Time) *KubeModelSet {
+	indexes := &kubeModelSetIndexes{
+		namespaceToNamespaceUID: map[string]string{},
+	}
+
 	return &KubeModelSet{
 		Metadata: &KubeModelSetMetadata{
 			CreatedAt: time.Now().UTC(),
@@ -25,10 +30,11 @@ func NewKubeModelSet(start, end time.Time) *KubeModelSet {
 		},
 		Namespaces:     map[string]*Namespace{},
 		ResourceQuotas: map[string]*ResourceQuota{},
+		indexes:        indexes,
 	}
 }
 
-func (kms *KubeModelSet) RegisterNamespace(uid string) error {
+func (kms *KubeModelSet) RegisterNamespace(uid, name string) error {
 	if _, ok := kms.Namespaces[uid]; !ok {
 		if kms.Cluster == nil {
 			return errors.New("KubeModelSet missing Cluster")
@@ -37,23 +43,27 @@ func (kms *KubeModelSet) RegisterNamespace(uid string) error {
 		kms.Namespaces[uid] = &Namespace{
 			UID:        uid,
 			ClusterUID: kms.Cluster.UID,
+			Name:       name,
 		}
 
-		// TODO: index namespace name-to-UID
+		kms.indexes.namespaceToNamespaceUID[name] = uid
 	}
 
 	return nil
 }
 
-func (kms *KubeModelSet) RegisterResourceQuota(uid string) error {
+func (kms *KubeModelSet) RegisterResourceQuota(uid, name, namespace string) error {
 	if _, ok := kms.ResourceQuotas[uid]; !ok {
-		if kms.Cluster != nil {
-			return errors.New("KubeModelSet missing Cluster")
+		if _, ok := kms.indexes.namespaceToNamespaceUID[namespace]; !ok {
+			return fmt.Errorf("KubeModelSet missing NamespaceUID for namespace=%s", namespace)
 		}
 
 		kms.ResourceQuotas[uid] = &ResourceQuota{
 			UID:          uid,
-			NamespaceUID: kms.Cluster.UID,
+			Name:         name,
+			NamespaceUID: kms.indexes.namespaceToNamespaceUID[namespace],
+			Spec:         &ResourceQuotaSpec{Hard: &ResourceQuotaSpecHard{}},
+			Status:       &ResourceQuotaStatus{Used: &ResourceQuotaStatusUsed{}},
 		}
 	}
 

+ 33 - 14
core/pkg/model/kubemodel/resource.go

@@ -1,30 +1,49 @@
 package kubemodel
 
+import "github.com/opencost/opencost/core/pkg/stats"
+
 type Resource string
 
 const (
-	ResourceCPU    = "cpu"
-	ResourceMemory = "memory"
-	ResourceGPU    = "gpu"
+	ResourceCPU    Resource = "cpu"
+	ResourceMemory Resource = "memory"
+	ResourceGPU    Resource = "gpu"
 )
 
 type Unit string
 
 const (
-	UnitCPUm       = "m"
-	UnitMemoryMi   = "Mi"
-	UnitGPU        = "GPU"
-	UnitByte       = "B"
-	UnitGB         = "GB"
-	UnitTimeHr     = "hr"
-	UnitCPUmHr     = "m-hr"
-	UnitMemoryMiHr = "Mi-hr"
-	UnitGPUHr      = "GPU-hr"
-	UnitGBHr       = "GB-hr"
+	UnitCPUCore   = "vCPU"
+	UnitCPUm      = "m"
+	UnitByte      = "B"
+	UnitMi        = "Mi"
+	UnitGB        = "GB"
+	UnitGPU       = "GPU"
+	UnitTimeHr    = "hr"
+	UnitCPUCoreHr = "vCPU-hr"
+	UnitCPUmHr    = "m-hr"
+	UnitBHr       = "B-hr"
+	UnitMiHr      = "Mi-hr"
+	UnitGBHr      = "GB-hr"
+	UnitGPUHr     = "GPU-hr"
 )
 
 type ResourceQuantity struct {
 	Resource Resource
 	Unit     Unit
-	Quantity float64
+	Values   stats.Stats
+}
+
+type ResourceQuantities map[Resource]ResourceQuantity
+
+func (rqs ResourceQuantities) Set(resource Resource, unit Unit, statType stats.StatType, value float64) {
+	if _, ok := rqs[resource]; !ok {
+		rqs[resource] = ResourceQuantity{
+			Resource: resource,
+			Unit:     unit,
+			Values:   stats.NewStats(),
+		}
+	}
+
+	rqs[resource].Values[statType] = value
 }

+ 6 - 6
core/pkg/model/kubemodel/resourcequota.go

@@ -11,19 +11,19 @@ type ResourceQuota struct {
 }
 
 type ResourceQuotaSpec struct {
-	Hard ResourceQuotaSpecHard
+	Hard *ResourceQuotaSpecHard
 }
 
 type ResourceQuotaSpecHard struct {
-	Requests []ResourceQuantity
-	Limits   []ResourceQuantity
+	Requests ResourceQuantities
+	Limits   ResourceQuantities
 }
 
 type ResourceQuotaStatus struct {
-	Used ResourceQuotaStatusUsed
+	Used *ResourceQuotaStatusUsed
 }
 
 type ResourceQuotaStatusUsed struct {
-	Requests []ResourceQuantity
-	Limits   []ResourceQuantity
+	Requests ResourceQuantities
+	Limits   ResourceQuantities
 }

+ 118 - 0
core/pkg/stats/stats.go

@@ -0,0 +1,118 @@
+package stats
+
+import (
+	"errors"
+	"fmt"
+	"math"
+)
+
+type StatType string
+
+const (
+	Val StatType = ""
+	Avg StatType = "avg"
+	Max StatType = "max"
+	Min StatType = "min"
+	P95 StatType = "p95"
+	P85 StatType = "p85"
+)
+
+type Stats map[StatType]float64
+
+func NewStats(capacity ...int) Stats {
+	if len(capacity) == 1 {
+		s := make(map[StatType]float64, capacity[0])
+		return s
+	}
+
+	return map[StatType]float64{}
+}
+
+func (s Stats) Value() (float64, bool) {
+	if s == nil {
+		return 0.0, false
+	}
+
+	val, ok := s[Val]
+
+	return val, ok
+}
+
+func (s Stats) Avg() (float64, bool) {
+	if s == nil {
+		return 0.0, false
+	}
+
+	val, ok := s[Avg]
+
+	return val, ok
+}
+
+func (s Stats) Max() (float64, bool) {
+	if s == nil {
+		return 0.0, false
+	}
+
+	val, ok := s[Max]
+
+	return val, ok
+}
+
+func (s Stats) Min() (float64, bool) {
+	if s == nil {
+		return 0.0, false
+	}
+
+	val, ok := s[Min]
+
+	return val, ok
+}
+
+func (s Stats) P95() (float64, bool) {
+	if s == nil {
+		return 0.0, false
+	}
+
+	val, ok := s[P95]
+
+	return val, ok
+}
+
+func (s Stats) P85() (float64, bool) {
+	if s == nil {
+		return 0.0, false
+	}
+
+	val, ok := s[P85]
+
+	return val, ok
+}
+
+func (s Stats) Sanitize() error {
+	if s == nil {
+		return nil
+	}
+
+	var errs []error
+
+	for t := range s {
+		if math.IsNaN(s[t]) {
+			delete(s, t)
+			errs = append(errs, fmt.Errorf("%v is NaN", t))
+		}
+		if math.IsInf(s[t], 0) {
+			delete(s, t)
+			errs = append(errs, fmt.Errorf("%v is Inf", t))
+		}
+	}
+
+	if len(errs) > 0 {
+		errStr := fmt.Sprintf("%d errors:", len(errs))
+		for _, e := range errs {
+			errStr += fmt.Sprintf(" [%s]", e)
+		}
+		return errors.New(errStr)
+	}
+
+	return nil
+}

+ 65 - 0
core/pkg/stats/stats_test.go

@@ -0,0 +1,65 @@
+package stats
+
+import (
+	"errors"
+	"math"
+	"testing"
+)
+
+func TestStats_Sanitize(t *testing.T) {
+	type testCase struct {
+		stats Stats
+		exp   error
+	}
+
+	testCases := []testCase{
+		{
+			nil,
+			nil,
+		},
+		{
+			Stats{},
+			nil,
+		},
+		{
+			Stats{
+				Val: 1.0,
+			},
+			nil,
+		},
+		{
+			Stats{
+				Avg: 0.1,
+				Max: 1.0,
+			},
+			nil,
+		},
+		{
+			Stats{
+				Avg: math.Inf(0),
+				Max: 1.0,
+			},
+			errors.New("1 errors: [avg is Inf]"),
+		},
+		{
+			Stats{
+				Avg: math.Inf(0),
+				Max: math.NaN(),
+			},
+			errors.New("2 errors: [avg is Inf] [max is NaN]"),
+		},
+	}
+
+	for _, tc := range testCases {
+		err := tc.stats.Sanitize()
+		if err != nil && tc.exp == nil {
+			t.Errorf("unexpected error: %s", err)
+		}
+		if err == nil && tc.exp != nil {
+			t.Errorf("expected error: %s", tc.exp)
+		}
+		if err != nil && tc.exp != nil && err.Error()[0] != tc.exp.Error()[0] {
+			t.Errorf("expected error: %s; received error: %s", tc.exp, err)
+		}
+	}
+}

+ 117 - 9
pkg/costmodel/kubemodel.go

@@ -7,6 +7,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/env"
 	"github.com/opencost/opencost/core/pkg/model/kubemodel"
 	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/core/pkg/stats"
 )
 
 const logTimeFmt string = "2006-01-02T15:04:05"
@@ -69,12 +70,12 @@ func (cm *CostModel) kmComputeNamespaces(kms *kubemodel.KubeModelSet, start, end
 	nsAnnosResult, _ := nsAnnosResultFuture.Await()
 
 	for _, res := range nsLabelsResult {
-		kms.RegisterNamespace(res.UID)
+		kms.RegisterNamespace(res.UID, res.Namespace)
 		kms.Namespaces[res.UID].Labels = res.Labels
 	}
 
 	for _, res := range nsAnnosResult {
-		kms.RegisterNamespace(res.UID)
+		kms.RegisterNamespace(res.UID, res.Namespace)
 		kms.Namespaces[res.UID].Annotations = res.Annotations
 	}
 
@@ -85,17 +86,124 @@ func (cm *CostModel) kmComputeResourceQuotas(kms *kubemodel.KubeModelSet, start,
 	grp := source.NewQueryGroup()
 	ds := cm.DataSource.Metrics()
 
+	// spec.hard.requests
 	rqSpecCPURequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPURequestAverage(start, end))
+	rqSpecCPURequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPURequestMax(start, end))
+	rqSpecRAMRequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMRequestAverage(start, end))
+	rqSpecRAMRequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMRequestMax(start, end))
+
+	// spec.hard.limits
+	rqSpecCPULimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPULimitAverage(start, end))
+	rqSpecCPULimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPULimitMax(start, end))
+	rqSpecRAMLimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMLimitAverage(start, end))
+	rqSpecRAMLimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMLimitMax(start, end))
+
+	// status.used.requests
+	rqStatusUsedCPURequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPURequestAverage(start, end))
+	rqStatusUsedCPURequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPURequestMax(start, end))
+	rqStatusUsedRAMRequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMRequestAverage(start, end))
+	rqStatusUsedRAMRequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMRequestMax(start, end))
+
+	// status.used.limits
+	rqStatusUsedCPULimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPULimitAverage(start, end))
+	rqStatusUsedCPULimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPULimitMax(start, end))
+	rqStatusUsedRAMLimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
+	rqStatusUsedRAMLimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))
 
 	rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
-
 	for _, res := range rqSpecCPURequestAverageResult {
-		kms.RegisterResourceQuota(res.UID)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Requests = append(kms.ResourceQuotas[res.UID].Spec.Hard.Requests, kubemodel.ResourceQuantity{
-			Resource: kubemodel.ResourceCPU,
-			Unit:     kubemodel.UnitCPUm,
-			Quantity: res.Data[0].Value * 1000.0,
-		})
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+	}
+
+	rqSpecCPURequestMaxResult, _ := rqSpecCPURequestMaxResultFuture.Await()
+	for _, res := range rqSpecCPURequestMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+	}
+
+	rqSpecRAMRequestAverageResult, _ := rqSpecRAMRequestAverageResultFuture.Await()
+	for _, res := range rqSpecRAMRequestAverageResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
+	}
+
+	rqSpecRAMRequestMaxResult, _ := rqSpecRAMRequestMaxResultFuture.Await()
+	for _, res := range rqSpecRAMRequestMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
+	}
+
+	rqSpecCPULimitAverageResult, _ := rqSpecCPULimitAverageResultFuture.Await()
+	for _, res := range rqSpecCPULimitAverageResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+	}
+
+	rqSpecCPULimitMaxResult, _ := rqSpecCPULimitMaxResultFuture.Await()
+	for _, res := range rqSpecCPULimitMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+	}
+
+	rqSpecRAMLimitAverageResult, _ := rqSpecRAMLimitAverageResultFuture.Await()
+	for _, res := range rqSpecRAMLimitAverageResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
+	}
+
+	rqSpecRAMLimitMaxResult, _ := rqSpecRAMLimitMaxResultFuture.Await()
+	for _, res := range rqSpecRAMLimitMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
+	}
+
+	rqStatusUsedCPURequestAverageResult, _ := rqStatusUsedCPURequestAverageResultFuture.Await()
+	for _, res := range rqStatusUsedCPURequestAverageResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+	}
+
+	rqStatusUsedCPURequestMaxResult, _ := rqStatusUsedCPURequestMaxResultFuture.Await()
+	for _, res := range rqStatusUsedCPURequestMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+	}
+
+	rqStatusUsedRAMRequestAverageResult, _ := rqStatusUsedRAMRequestAverageResultFuture.Await()
+	for _, res := range rqStatusUsedRAMRequestAverageResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
+	}
+
+	rqStatusUsedRAMRequestMaxResult, _ := rqStatusUsedRAMRequestMaxResultFuture.Await()
+	for _, res := range rqStatusUsedRAMRequestMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
+	}
+
+	rqStatusUsedCPULimitAverageResult, _ := rqStatusUsedCPULimitAverageResultFuture.Await()
+	for _, res := range rqStatusUsedCPULimitAverageResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+	}
+
+	rqStatusUsedCPULimitMaxResult, _ := rqStatusUsedCPULimitMaxResultFuture.Await()
+	for _, res := range rqStatusUsedCPULimitMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+	}
+
+	rqStatusUsedRAMLimitAverageResult, _ := rqStatusUsedRAMLimitAverageResultFuture.Await()
+	for _, res := range rqStatusUsedRAMLimitAverageResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Avg, res.Data[0].Value)
+	}
+
+	rqStatusUsedRAMLimitMaxResult, _ := rqStatusUsedRAMLimitMaxResultFuture.Await()
+	for _, res := range rqStatusUsedRAMLimitMaxResult {
+		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceMemory, kubemodel.UnitByte, stats.Max, res.Data[0].Value)
 	}
 
 	return nil