فهرست منبع

Add Allocation Metrics to KubeModel containers (#3785)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 5 روز پیش
والد
کامیت
951429b741
4فایلهای تغییر یافته به همراه82 افزوده شده و 32 حذف شده
  1. 12 10
      core/pkg/model/kubemodel/container.go
  2. 36 18
      core/pkg/model/kubemodel/kubemodel_codecs.go
  3. 6 4
      core/pkg/model/kubemodel/mock.go
  4. 28 0
      pkg/kubemodel/kubemodel.go

+ 12 - 10
core/pkg/model/kubemodel/container.go

@@ -7,16 +7,18 @@ import (
 
 // @bingen:generate:Container
 type Container struct {
-	PodUID           string             `json:"podUid"`
-	Name             string             `json:"name"`
-	ResourceRequests ResourceQuantities `json:"resourceRequests"`
-	ResourceLimits   ResourceQuantities `json:"resourceLimits"`
-	CPUCoreUsageAvg  float64            `json:"cpuCoreUsageAvg"`
-	CPUCoreUsageMax  float64            `json:"cpuCoreUsageMax"`
-	RAMBytesUsageAvg float64            `json:"ramBytesUsageAvg"`
-	RAMBytesUsageMax float64            `json:"ramBytesUsageMax"`
-	Start            time.Time          `json:"start"`
-	End              time.Time          `json:"end"`
+	PodUID            string             `json:"podUid"`
+	Name              string             `json:"name"`
+	ResourceRequests  ResourceQuantities `json:"resourceRequests"`
+	ResourceLimits    ResourceQuantities `json:"resourceLimits"`
+	CPUCoresAllocated float64            `json:"cpuCoresAllocated"`
+	CPUCoreUsageAvg   float64            `json:"cpuCoreUsageAvg"`
+	CPUCoreUsageMax   float64            `json:"cpuCoreUsageMax"`
+	RAMBytesAllocated float64            `json:"ramBytesAllocated"`
+	RAMBytesUsageAvg  float64            `json:"ramBytesUsageAvg"`
+	RAMBytesUsageMax  float64            `json:"ramBytesUsageMax"`
+	Start             time.Time          `json:"start"`
+	End               time.Time          `json:"end"`
 }
 
 func (c *Container) GetKey() string {

+ 36 - 18
core/pkg/model/kubemodel/kubemodel_codecs.go

@@ -1037,10 +1037,14 @@ func (target *Container) MarshalBinaryWithContext(ctx *EncodingContext) (err err
 	}
 	// --- [end][write][alias](ResourceQuantities) ---
 
+	buff.WriteFloat64(target.CPUCoresAllocated) // write float64
+
 	buff.WriteFloat64(target.CPUCoreUsageAvg) // write float64
 
 	buff.WriteFloat64(target.CPUCoreUsageMax) // write float64
 
+	buff.WriteFloat64(target.RAMBytesAllocated) // write float64
+
 	buff.WriteFloat64(target.RAMBytesUsageAvg) // write float64
 
 	buff.WriteFloat64(target.RAMBytesUsageMax) // write float64
@@ -1226,37 +1230,43 @@ func (target *Container) UnmarshalBinaryWithContext(ctx *DecodingContext) (err e
 	// --- [end][read][alias](ResourceQuantities) ---
 
 	bb := buff.ReadFloat64() // read float64
-	target.CPUCoreUsageAvg = bb
+	target.CPUCoresAllocated = bb
 
 	cc := buff.ReadFloat64() // read float64
-	target.CPUCoreUsageMax = cc
+	target.CPUCoreUsageAvg = cc
 
 	dd := buff.ReadFloat64() // read float64
-	target.RAMBytesUsageAvg = dd
+	target.CPUCoreUsageMax = dd
 
 	ee := buff.ReadFloat64() // read float64
-	target.RAMBytesUsageMax = ee
+	target.RAMBytesAllocated = ee
+
+	ff := buff.ReadFloat64() // read float64
+	target.RAMBytesUsageAvg = ff
+
+	gg := buff.ReadFloat64() // read float64
+	target.RAMBytesUsageMax = gg
 
 	// --- [begin][read][reference](time.Time) ---
-	ff := new(time.Time)
-	gg := buff.ReadInt() // byte array length
-	hh := buff.ReadBytes(gg)
-	errC := ff.UnmarshalBinary(hh)
+	hh := new(time.Time)
+	ll := buff.ReadInt() // byte array length
+	mm := buff.ReadBytes(ll)
+	errC := hh.UnmarshalBinary(mm)
 	if errC != nil {
 		return errC
 	}
-	target.Start = *ff
+	target.Start = *hh
 	// --- [end][read][reference](time.Time) ---
 
 	// --- [begin][read][reference](time.Time) ---
-	ll := new(time.Time)
-	mm := buff.ReadInt() // byte array length
-	nn := buff.ReadBytes(mm)
-	errD := ll.UnmarshalBinary(nn)
+	nn := new(time.Time)
+	oo := buff.ReadInt() // byte array length
+	pp := buff.ReadBytes(oo)
+	errD := nn.UnmarshalBinary(pp)
 	if errD != nil {
 		return errD
 	}
-	target.End = *ll
+	target.End = *nn
 	// --- [end][read][reference](time.Time) ---
 
 	return nil
@@ -4225,6 +4235,7 @@ func (target *KubeModelSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (er
 				if buff.ReadUInt8() == uint8(0) {
 					zzz = nil
 				} else {
+
 					// --- [begin][read][struct](Container) ---
 					y := new(Container)
 					buff.ReadInt() // [compatibility, unused]
@@ -4362,6 +4373,7 @@ func (target *KubeModelSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (er
 				if buff.ReadUInt8() == uint8(0) {
 					zzzzzz = nil
 				} else {
+
 					// --- [begin][read][struct](DaemonSet) ---
 					uu := new(DaemonSet)
 					buff.ReadInt() // [compatibility, unused]
@@ -4683,6 +4695,7 @@ func (target *KubeModelSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (er
 				if buff.ReadUInt8() == uint8(0) {
 					zzzzzzzzzzzzz = nil
 				} else {
+
 					// --- [begin][read][struct](Service) ---
 					uuuu := new(Service)
 					buff.ReadInt() // [compatibility, unused]
@@ -5102,6 +5115,7 @@ func (stream *KubeModelSetStream) Stream() iter.Seq2[BingenFieldInfo, *BingenVal
 					if buff.ReadUInt8() == uint8(0) {
 						zzz = nil
 					} else {
+
 						// --- [begin][read][struct](Container) ---
 						y := new(Container)
 						buff.ReadInt() // [compatibility, unused]
@@ -5281,6 +5295,7 @@ func (stream *KubeModelSetStream) Stream() iter.Seq2[BingenFieldInfo, *BingenVal
 					if buff.ReadUInt8() == uint8(0) {
 						zzzzzz = nil
 					} else {
+
 						// --- [begin][read][struct](DaemonSet) ---
 						rr := new(DaemonSet)
 						buff.ReadInt() // [compatibility, unused]
@@ -5700,6 +5715,7 @@ func (stream *KubeModelSetStream) Stream() iter.Seq2[BingenFieldInfo, *BingenVal
 					if buff.ReadUInt8() == uint8(0) {
 						zzzzzzzzzzzzz = nil
 					} else {
+
 						// --- [begin][read][struct](Service) ---
 						hhhh := new(Service)
 						buff.ReadInt() // [compatibility, unused]
@@ -5919,7 +5935,6 @@ func (target *Metadata) MarshalBinaryWithContext(ctx *EncodingContext) (err erro
 		// --- [begin][write][slice]([]Diagnostic) ---
 		buff.WriteInt(len(target.Diagnostics)) // slice length
 		for i := range target.Diagnostics {
-
 			// --- [begin][write][struct](Diagnostic) ---
 			buff.WriteInt(0) // [compatibility, unused]
 			errC := target.Diagnostics[i].MarshalBinaryWithContext(ctx)
@@ -6043,7 +6058,6 @@ func (target *Metadata) UnmarshalBinaryWithContext(ctx *DecodingContext) (err er
 			l := buff.ReadInt() // slice len
 			h := make([]Diagnostic, l)
 			for i := range l {
-
 				// --- [begin][read][struct](Diagnostic) ---
 				n := new(Diagnostic)
 				buff.ReadInt() // [compatibility, unused]
@@ -6066,7 +6080,6 @@ func (target *Metadata) UnmarshalBinaryWithContext(ctx *DecodingContext) (err er
 	}
 	// field version check
 	if uint8(1) <= version {
-
 		// --- [begin][read][alias](DiagnosticLevel) ---
 		var o int
 		p := buff.ReadInt() // read int
@@ -7832,6 +7845,7 @@ func (target *Pod) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 		// --- [begin][write][slice]([]NetworkTrafficDetail) ---
 		buff.WriteInt(len(target.NetworkTrafficDetails)) // slice length
 		for ii := range target.NetworkTrafficDetails {
+
 			// --- [begin][write][struct](NetworkTrafficDetail) ---
 			buff.WriteInt(0) // [compatibility, unused]
 			errC := target.NetworkTrafficDetails[ii].MarshalBinaryWithContext(ctx)
@@ -8085,6 +8099,7 @@ func (target *Pod) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 		tt := buff.ReadInt() // slice len
 		ss := make([]NetworkTrafficDetail, tt)
 		for ii := range tt {
+
 			// --- [begin][read][struct](NetworkTrafficDetail) ---
 			ww := new(NetworkTrafficDetail)
 			buff.ReadInt() // [compatibility, unused]
@@ -8799,7 +8814,6 @@ func (target *ResourceQuantity) UnmarshalBinaryWithContext(ctx *DecodingContext)
 	}
 	// field version check
 	if uint8(1) <= version {
-
 		// --- [begin][read][alias](Stats) ---
 		var l map[StatType]float64
 		if buff.ReadUInt8() == uint8(0) {
@@ -9428,6 +9442,7 @@ func (target *ResourceQuotaSpecHard) UnmarshalBinaryWithContext(ctx *DecodingCon
 
 	// field version check
 	if uint8(1) <= version {
+
 		// --- [begin][read][alias](ResourceQuantities) ---
 		var a map[Resource]ResourceQuantity
 		if buff.ReadUInt8() == uint8(0) {
@@ -9476,6 +9491,7 @@ func (target *ResourceQuotaSpecHard) UnmarshalBinaryWithContext(ctx *DecodingCon
 	}
 	// field version check
 	if uint8(1) <= version {
+
 		// --- [begin][read][alias](ResourceQuantities) ---
 		var l map[Resource]ResourceQuantity
 		if buff.ReadUInt8() == uint8(0) {
@@ -9822,6 +9838,7 @@ func (target *ResourceQuotaStatusUsed) UnmarshalBinaryWithContext(ctx *DecodingC
 
 	// field version check
 	if uint8(1) <= version {
+
 		// --- [begin][read][alias](ResourceQuantities) ---
 		var a map[Resource]ResourceQuantity
 		if buff.ReadUInt8() == uint8(0) {
@@ -9870,6 +9887,7 @@ func (target *ResourceQuotaStatusUsed) UnmarshalBinaryWithContext(ctx *DecodingC
 	}
 	// field version check
 	if uint8(1) <= version {
+
 		// --- [begin][read][alias](ResourceQuantities) ---
 		var l map[Resource]ResourceQuantity
 		if buff.ReadUInt8() == uint8(0) {

+ 6 - 4
core/pkg/model/kubemodel/mock.go

@@ -123,10 +123,12 @@ func NewMockKubeModelSet(start, end time.Time) *KubeModelSet {
 			ResourceCPU:    {Resource: ResourceCPU, Unit: UnitMillicore, Values: Stats{StatAvg: 500, StatMax: 500}},
 			ResourceMemory: {Resource: ResourceMemory, Unit: UnitByte, Values: Stats{StatAvg: 1e9, StatMax: 1e9}},
 		},
-		CPUCoreUsageAvg:  0.18,
-		CPUCoreUsageMax:  0.42,
-		RAMBytesUsageAvg: 300e6,
-		RAMBytesUsageMax: 480e6,
+		CPUCoresAllocated: 0.25,
+		CPUCoreUsageAvg:   0.18,
+		CPUCoreUsageMax:   0.42,
+		RAMBytesAllocated: 512e6,
+		RAMBytesUsageAvg:  300e6,
+		RAMBytesUsageMax:  480e6,
 		Start:            start,
 		End:              end,
 	})

+ 28 - 0
pkg/kubemodel/kubemodel.go

@@ -957,9 +957,11 @@ func (km *KubeModel) computeContainers(kms *kubemodel.KubeModelSet, start, end t
 	containerResourceRequestsFuture := source.WithGroup(grp, metrics.QueryContainerResourceRequests(start, end))
 	containerResourceLimitsFuture := source.WithGroup(grp, metrics.QueryContainerResourceLimits(start, end))
 
+	cpuCoresAllocatedFuture := source.WithGroup(grp, metrics.QueryCPUCoresAllocated(start, end))
 	cpuUsageAvgFuture := source.WithGroup(grp, metrics.QueryCPUUsageAvg(start, end))
 	cpuUsageMaxFuture := source.WithGroup(grp, metrics.QueryCPUUsageMax(start, end))
 
+	ramBytesAllocatedFuture := source.WithGroup(grp, metrics.QueryRAMBytesAllocated(start, end))
 	ramUsageAvgFuture := source.WithGroup(grp, metrics.QueryRAMUsageAvg(start, end))
 	ramUsageMaxFuture := source.WithGroup(grp, metrics.QueryRAMUsageMax(start, end))
 
@@ -1008,6 +1010,32 @@ func (km *KubeModel) computeContainers(kms *kubemodel.KubeModelSet, start, end t
 		container.ResourceLimits.Set(resource, unit, kubemodel.StatAvg, value)
 	}
 
+	cpuCoresAllocatedResult, _ := cpuCoresAllocatedFuture.Await()
+	for _, res := range cpuCoresAllocatedResult {
+		key := containerKey{podUID: res.UID, name: res.Container}
+		container, ok := containerMap[key]
+		if !ok {
+			log.Warnf("container %s/%s has not been initialized to add CPU cores allocated", res.UID, res.Container)
+			continue
+		}
+		if len(res.Data) > 0 {
+			container.CPUCoresAllocated = res.Data[0].Value
+		}
+	}
+
+	ramBytesAllocatedResult, _ := ramBytesAllocatedFuture.Await()
+	for _, res := range ramBytesAllocatedResult {
+		key := containerKey{podUID: res.UID, name: res.Container}
+		container, ok := containerMap[key]
+		if !ok {
+			log.Warnf("container %s/%s has not been initialized to add RAM bytes allocated", res.UID, res.Container)
+			continue
+		}
+		if len(res.Data) > 0 {
+			container.RAMBytesAllocated = res.Data[0].Value
+		}
+	}
+
 	cpuUsageAvgResult, _ := cpuUsageAvgFuture.Await()
 	for _, res := range cpuUsageAvgResult {
 		key := containerKey{podUID: res.UID, name: res.Container}