| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589 |
- package kubemodel
- import (
- "errors"
- "fmt"
- "time"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/model/kubemodel"
- "github.com/opencost/opencost/core/pkg/model/shared"
- "github.com/opencost/opencost/core/pkg/source"
- )
- const logTimeFmt string = "2006-01-02T15:04:05"
- type KubeModel struct {
- ds source.OpenCostDataSource
- clusterUID string
- }
- func NewKubeModel(clusterUID string, dataSource source.OpenCostDataSource) (*KubeModel, error) {
- if dataSource == nil {
- return nil, errors.New("OpenCostDataSource cannot be nil")
- }
- km := &KubeModel{
- ds: dataSource,
- clusterUID: clusterUID,
- }
- km.clusterUID = clusterUID
- log.Debugf("NewKubeModel(%s)", km.clusterUID)
- return km, nil
- }
- // ComputeKubeModel uses the CostModel instance to compute an KubeModelSet
- // for the window defined by the given start and end times. The KubeModels
- // returned are unaggregated (i.e. down to the container level).
- func (km *KubeModel) ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeModelSet, error) {
- // 1. Initialize new KubeModelSet for requested Window
- kms := kubemodel.NewKubeModelSet(start, end)
- // 2. Query CostModel for each set of objects
- var err error
- // 2.1 Compute Cluster
- err = km.computeCluster(kms, start, end)
- if err != nil {
- kms.Error(err)
- return kms, fmt.Errorf("error computing kubemodel.Cluster for (%s, %s): %w", start.Format(logTimeFmt), end.Format(logTimeFmt), err)
- }
- // 2.2 Compute Nodes
- err = km.computeNodes(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.3 Compute Namespaces
- err = km.computeNamespaces(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.5 Compute Pods
- err = km.computePods(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.6 Compute Deployments
- err = km.computeDeployments(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.7 Compute StatefulSets
- err = km.computeStatefulSets(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.8 Compute DaemonSets
- err = km.computeDaemonSets(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.9 Compute Jobs
- err = km.computeJobs(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.10 Compute CronJobs
- err = km.computeCronJobs(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.11 Compute ReplicaSets
- err = km.computeReplicaSets(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.12 Compute Containers
- err = km.computeContainers(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.13 Compute ResourceQuotas
- err = km.computeResourceQuotas(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.14 Compute Services
- err = km.computeServices(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.15 Compute PersistentVolumes
- err = km.computePersistentVolumes(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.16 Compute PersistentVolumeClaims
- err = km.computePersistentVolumeClaims(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 2.17 Compute DCGM Devices
- err = km.computeDCGMDevices(kms, start, end)
- if err != nil {
- kms.Error(err)
- }
- // 3. Mark KubeModelSet as completed
- kms.Metadata.CompletedAt = time.Now().UTC()
- return kms, nil
- }
- func (km *KubeModel) computeCluster(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- clusterInfoResultFuture := source.WithGroup(grp, metrics.QueryClusterInfo(start, end))
- clusterUptimeResultFuture := source.WithGroup(grp, metrics.QueryClusterUptime(start, end))
- clusterMap := make(map[string]*kubemodel.Cluster)
- clusterInfoResult, _ := clusterInfoResultFuture.Await()
- for _, res := range clusterInfoResult {
- clusterMap[res.UID] = &kubemodel.Cluster{
- UID: res.UID,
- Provider: shared.ParseProvider(res.Provider),
- Account: res.AccountID,
- Name: res.Cluster,
- Region: res.Region,
- }
- }
- clusterUptimeResult, _ := clusterUptimeResultFuture.Await()
- for _, res := range clusterUptimeResult {
- cluster, ok := clusterMap[res.UID]
- if !ok {
- log.Warnf("cluster with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- cluster.Start = s
- cluster.End = e
- }
- cluster, ok := clusterMap[km.clusterUID]
- if !ok {
- return fmt.Errorf("failed to compute cluster with UID '%s'", km.clusterUID)
- }
- kms.RegisterCluster(cluster)
- return nil
- }
- func (km *KubeModel) computeNodes(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- nodeInfoResultFuture := source.WithGroup(grp, metrics.QueryNodeInfo(start, end))
- nodeUptimeResultFuture := source.WithGroup(grp, metrics.QueryNodeUptime(start, end))
- nodeLabelsResultFuture := source.WithGroup(grp, metrics.QueryNodeLabels(start, end))
- nodeResourceCapacitiesFuture := source.WithGroup(grp, metrics.QueryNodeResourceCapacities(start, end))
- nodeResourcesAllocatableFuture := source.WithGroup(grp, metrics.QueryNodeResourcesAllocatable(start, end))
- localStorageBytesFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageBytes(start, end))
- localStorageUsedAvgFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageUsedAvg(start, end))
- localStorageUsedMaxFuture := source.WithGroup(grp, metrics.QueryKMLocalStorageUsedMax(start, end))
- nodeMap := make(map[string]*kubemodel.Node)
- nodeInfoResult, _ := nodeInfoResultFuture.Await()
- for _, res := range nodeInfoResult {
- nodeMap[res.UID] = &kubemodel.Node{
- UID: res.UID,
- ProviderID: res.ProviderID,
- Name: res.Node,
- ResourceCapacities: make(kubemodel.ResourceQuantities),
- ResourcesAllocatable: make(kubemodel.ResourceQuantities),
- }
- }
- nodeUptimeResult, _ := nodeUptimeResultFuture.Await()
- for _, res := range nodeUptimeResult {
- node, ok := nodeMap[res.UID]
- if !ok {
- log.Warnf("node with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- node.Start = s
- node.End = e
- }
- nodeResourceCapacitiesResult, _ := nodeResourceCapacitiesFuture.Await()
- for _, res := range nodeResourceCapacitiesResult {
- node, ok := nodeMap[res.UID]
- if !ok {
- log.Warnf("node with UID '%s' has not been initialized to add resource capacities", res.UID)
- continue
- }
- resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
- node.ResourceCapacities.Set(resource, unit, kubemodel.StatAvg, value)
- }
- nodeResourcesAllocatableResult, _ := nodeResourcesAllocatableFuture.Await()
- for _, res := range nodeResourcesAllocatableResult {
- node, ok := nodeMap[res.UID]
- if !ok {
- log.Warnf("node with UID '%s' has not been initialized to add resources allocatable", res.UID)
- continue
- }
- resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
- node.ResourcesAllocatable.Set(resource, unit, kubemodel.StatAvg, value)
- }
- nodeLabelsResult, _ := nodeLabelsResultFuture.Await()
- for _, res := range nodeLabelsResult {
- node, ok := nodeMap[res.UID]
- if !ok {
- log.Warnf("node with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- node.Labels = res.Labels
- }
- localStorageBytesResult, _ := localStorageBytesFuture.Await()
- for _, res := range localStorageBytesResult {
- node, ok := nodeMap[res.UID]
- if ok {
- node.FileSystem.CapacityBytes = res.Value
- }
- }
- localStorageUsedAvgResult, _ := localStorageUsedAvgFuture.Await()
- for _, res := range localStorageUsedAvgResult {
- node, ok := nodeMap[res.UID]
- if ok {
- node.FileSystem.UsageByteAvg = res.Value
- }
- }
- localStorageUsedMaxResult, _ := localStorageUsedMaxFuture.Await()
- for _, res := range localStorageUsedMaxResult {
- node, ok := nodeMap[res.UID]
- if ok {
- node.FileSystem.UsageByteMax = res.Value
- }
- }
- for _, node := range nodeMap {
- err := kms.RegisterNode(node)
- if err != nil {
- log.Warnf("Failed to register node: %s", err.Error())
- }
- }
- return nil
- }
- // resourceUnitValue converts prometheus resource/unit strings from ResourceResult
- // into kubemodel types, applying any necessary unit conversions.
- func resourceUnitValue(resource, unit string, value float64) (kubemodel.Resource, kubemodel.Unit, float64) {
- switch resource {
- case "cpu":
- return kubemodel.ResourceCPU, kubemodel.UnitCore, value
- case "memory":
- return kubemodel.ResourceMemory, kubemodel.UnitByte, value
- default:
- return kubemodel.Resource(resource), kubemodel.Unit(unit), value
- }
- }
- func (km *KubeModel) computeNamespaces(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- nsInfoResultFuture := source.WithGroup(grp, metrics.QueryNamespaceInfo(start, end))
- nsUptimeResultFuture := source.WithGroup(grp, metrics.QueryNamespaceUptime(start, end))
- nsLabelsResultFuture := source.WithGroup(grp, metrics.QueryNamespaceLabels(start, end))
- nsAnnosResultFuture := source.WithGroup(grp, metrics.QueryNamespaceAnnotations(start, end))
- nsMap := make(map[string]*kubemodel.Namespace)
- // Initialize namespaces from info
- nsInfoResult, _ := nsInfoResultFuture.Await()
- for _, res := range nsInfoResult {
- nsMap[res.UID] = &kubemodel.Namespace{
- UID: res.UID,
- Name: res.Namespace,
- }
- }
- nsUptimeResult, _ := nsUptimeResultFuture.Await()
- for _, res := range nsUptimeResult {
- ns, ok := nsMap[res.UID]
- if !ok {
- log.Warnf("namespace with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- ns.Start = s
- ns.End = e
- }
- nsLabelsResult, _ := nsLabelsResultFuture.Await()
- for _, res := range nsLabelsResult {
- ns, ok := nsMap[res.UID]
- if !ok {
- log.Warnf("namespace with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- ns.Labels = res.Labels
- }
- nsAnnosResult, _ := nsAnnosResultFuture.Await()
- for _, res := range nsAnnosResult {
- ns, ok := nsMap[res.UID]
- if !ok {
- log.Warnf("namespace with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- ns.Annotations = res.Annotations
- }
- for _, namespace := range nsMap {
- err := kms.RegisterNamespace(namespace)
- if err != nil {
- log.Warnf("Failed to register namespace: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computePods(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- podInfoResultFuture := source.WithGroup(grp, metrics.QueryPodInfo(start, end))
- podUptimeResultFuture := source.WithGroup(grp, metrics.QueryPodUptime(start, end))
- podOwnerResultFuture := source.WithGroup(grp, metrics.QueryPodOwners(start, end))
- podPVCVolumesResultFuture := source.WithGroup(grp, metrics.QueryPodPVCVolumes(start, end))
- podLabelsResultFuture := source.WithGroup(grp, metrics.QueryPodLabels(start, end))
- podAnnosResultFuture := source.WithGroup(grp, metrics.QueryPodAnnotations(start, end))
- podNetworkEgressBytesResultFuture := source.WithGroup(grp, metrics.QueryPodNetworkEgressBytes(start, end))
- podNetworkIngressBytesResultFuture := source.WithGroup(grp, metrics.QueryPodNetworkIngressBytes(start, end))
- podMap := make(map[string]*kubemodel.Pod)
- podInfoResult, _ := podInfoResultFuture.Await()
- for _, res := range podInfoResult {
- podMap[res.UID] = &kubemodel.Pod{
- UID: res.UID,
- Name: res.Pod,
- NamespaceUID: res.NamespaceUID,
- NodeUID: res.NodeUID,
- }
- }
- podUptimeResult, _ := podUptimeResultFuture.Await()
- for _, res := range podUptimeResult {
- pod, ok := podMap[res.UID]
- if !ok {
- log.Warnf("pod with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- pod.Start = s
- pod.End = e
- }
- podOwnersResult, _ := podOwnerResultFuture.Await()
- for _, res := range podOwnersResult {
- pod, ok := podMap[res.UID]
- if !ok {
- log.Warnf("pod with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- pod.Owners = append(pod.Owners, kubemodel.Owner{
- UID: res.OwnerUID,
- Kind: kubemodel.ParseOwnerKind(res.OwnerKind),
- Controller: res.Controller,
- })
- }
- podPVCVolumesResult, _ := podPVCVolumesResultFuture.Await()
- for _, res := range podPVCVolumesResult {
- pod, ok := podMap[res.UID]
- if !ok {
- log.Warnf("pod with UID '%s' has not been initialized to add PVC volumes", res.UID)
- continue
- }
- pod.PVCVolumes = append(pod.PVCVolumes, kubemodel.PodPVCVolume{
- Name: res.PodVolumeName,
- PersistentVolumeClaimUID: res.PVCUID,
- })
- }
- podLabelsResult, _ := podLabelsResultFuture.Await()
- for _, res := range podLabelsResult {
- pod, ok := podMap[res.UID]
- if !ok {
- log.Warnf("pod with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- pod.Labels = res.Labels
- }
- podAnnosResult, _ := podAnnosResultFuture.Await()
- for _, res := range podAnnosResult {
- pod, ok := podMap[res.UID]
- if !ok {
- log.Warnf("pod with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- pod.Annotations = res.Annotations
- }
- appendDetail := func(uid string, dir kubemodel.TrafficDirection, tt kubemodel.TrafficType, isNatGateway bool, endpoint string, bytes float64) {
- pod, ok := podMap[uid]
- if !ok || bytes <= 0 {
- return
- }
- pod.NetworkTrafficDetails = append(pod.NetworkTrafficDetails, kubemodel.NetworkTrafficDetail{
- PodUID: uid,
- TrafficDirection: dir,
- TrafficType: tt,
- IsNatGateway: isNatGateway,
- Endpoint: endpoint,
- Bytes: bytes,
- })
- }
- networkTrafficType := func(res *source.PodNetworkBytesResult) (kubemodel.TrafficType, bool) {
- if res.Internet {
- return kubemodel.TrafficTypeInternet, true
- }
- if !res.SameRegion {
- return kubemodel.TrafficTypeCrossRegion, true
- }
- if !res.SameZone {
- return kubemodel.TrafficTypeCrossZone, true
- }
- return "", false
- }
- podNetworkEgressResult, _ := podNetworkEgressBytesResultFuture.Await()
- for _, res := range podNetworkEgressResult {
- tt, ok := networkTrafficType(res)
- if !ok {
- continue
- }
- appendDetail(res.UID, kubemodel.TrafficDirectionEgress, tt, res.NatGateway, res.Service, res.Value)
- }
- podNetworkIngressResult, _ := podNetworkIngressBytesResultFuture.Await()
- for _, res := range podNetworkIngressResult {
- tt, ok := networkTrafficType(res)
- if !ok {
- continue
- }
- appendDetail(res.UID, kubemodel.TrafficDirectionIngress, tt, res.NatGateway, res.Service, res.Value)
- }
- for _, pod := range podMap {
- err := kms.RegisterPod(pod)
- if err != nil {
- log.Warnf("Failed to register pod: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeDeployments(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- deploymentInfoResultFuture := source.WithGroup(grp, metrics.QueryDeploymentInfo(start, end))
- deploymentUptimeResultFuture := source.WithGroup(grp, metrics.QueryDeploymentUptime(start, end))
- deploymentLabelsResultFuture := source.WithGroup(grp, metrics.QueryDeploymentLabels(start, end))
- deploymentAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryDeploymentAnnotations(start, end))
- deploymentMatchLabelsResultFuture := source.WithGroup(grp, metrics.QueryDeploymentMatchLabels(start, end))
- deploymentMap := make(map[string]*kubemodel.Deployment)
- deploymentInfoResult, _ := deploymentInfoResultFuture.Await()
- for _, res := range deploymentInfoResult {
- deploymentMap[res.UID] = &kubemodel.Deployment{
- UID: res.UID,
- Name: res.Deployment,
- NamespaceUID: res.NamespaceUID,
- }
- }
- deploymentUptimeResult, _ := deploymentUptimeResultFuture.Await()
- for _, res := range deploymentUptimeResult {
- deployment, ok := deploymentMap[res.UID]
- if !ok {
- log.Warnf("deployment with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- deployment.Start = s
- deployment.End = e
- }
- deploymentLabelsResult, _ := deploymentLabelsResultFuture.Await()
- for _, res := range deploymentLabelsResult {
- deployment, ok := deploymentMap[res.UID]
- if !ok {
- log.Warnf("deployment with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- deployment.Labels = res.Labels
- }
- deploymentAnnotationsResult, _ := deploymentAnnotationsResultFuture.Await()
- for _, res := range deploymentAnnotationsResult {
- deployment, ok := deploymentMap[res.UID]
- if !ok {
- log.Warnf("deployment with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- deployment.Annotations = res.Annotations
- }
- deploymentMatchLabelsResult, _ := deploymentMatchLabelsResultFuture.Await()
- for _, res := range deploymentMatchLabelsResult {
- deployment, ok := deploymentMap[res.UID]
- if !ok {
- log.Warnf("deployment with UID '%s' has not been initialized to add match labels", res.UID)
- continue
- }
- deployment.MatchLabels = res.Labels
- }
- for _, deployment := range deploymentMap {
- err := kms.RegisterDeployment(deployment)
- if err != nil {
- log.Warnf("Failed to register deployment: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeStatefulSets(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- statefulSetInfoResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetInfo(start, end))
- statefulSetUptimeResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetUptime(start, end))
- statefulSetLabelsResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetLabels(start, end))
- statefulSetAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetAnnotations(start, end))
- statefulSetMatchLabelsResultFuture := source.WithGroup(grp, metrics.QueryStatefulSetMatchLabels(start, end))
- statefulSetMap := make(map[string]*kubemodel.StatefulSet)
- statefulSetInfoResult, _ := statefulSetInfoResultFuture.Await()
- for _, res := range statefulSetInfoResult {
- statefulSetMap[res.UID] = &kubemodel.StatefulSet{
- UID: res.UID,
- Name: res.StatefulSet,
- NamespaceUID: res.NamespaceUID,
- }
- }
- statefulSetUptimeResult, _ := statefulSetUptimeResultFuture.Await()
- for _, res := range statefulSetUptimeResult {
- statefulSet, ok := statefulSetMap[res.UID]
- if !ok {
- log.Warnf("statefulset with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- statefulSet.Start = s
- statefulSet.End = e
- }
- statefulSetLabelsResult, _ := statefulSetLabelsResultFuture.Await()
- for _, res := range statefulSetLabelsResult {
- statefulSet, ok := statefulSetMap[res.UID]
- if !ok {
- log.Warnf("statefulset with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- statefulSet.Labels = res.Labels
- }
- statefulSetAnnotationsResult, _ := statefulSetAnnotationsResultFuture.Await()
- for _, res := range statefulSetAnnotationsResult {
- statefulSet, ok := statefulSetMap[res.UID]
- if !ok {
- log.Warnf("statefulset with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- statefulSet.Annotations = res.Annotations
- }
- statefulSetMatchLabelsResult, _ := statefulSetMatchLabelsResultFuture.Await()
- for _, res := range statefulSetMatchLabelsResult {
- statefulSet, ok := statefulSetMap[res.UID]
- if !ok {
- log.Warnf("statefulset with UID '%s' has not been initialized to add match labels", res.UID)
- continue
- }
- statefulSet.MatchLabels = res.Labels
- }
- for _, statefulSet := range statefulSetMap {
- err := kms.RegisterStatefulSet(statefulSet)
- if err != nil {
- log.Warnf("Failed to register statefulset: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeDaemonSets(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- daemonSetInfoResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetInfo(start, end))
- daemonSetUptimeResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetUptime(start, end))
- daemonSetLabelsResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetLabels(start, end))
- daemonSetAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryDaemonSetAnnotations(start, end))
- daemonSetMap := make(map[string]*kubemodel.DaemonSet)
- daemonSetInfoResult, _ := daemonSetInfoResultFuture.Await()
- for _, res := range daemonSetInfoResult {
- daemonSetMap[res.UID] = &kubemodel.DaemonSet{
- UID: res.UID,
- Name: res.DaemonSet,
- NamespaceUID: res.NamespaceUID,
- }
- }
- daemonSetUptimeResult, _ := daemonSetUptimeResultFuture.Await()
- for _, res := range daemonSetUptimeResult {
- daemonSet, ok := daemonSetMap[res.UID]
- if !ok {
- log.Warnf("daemonset with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- daemonSet.Start = s
- daemonSet.End = e
- }
- daemonSetLabelsResult, _ := daemonSetLabelsResultFuture.Await()
- for _, res := range daemonSetLabelsResult {
- daemonSet, ok := daemonSetMap[res.UID]
- if !ok {
- log.Warnf("daemonset with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- daemonSet.Labels = res.Labels
- }
- daemonSetAnnotationsResult, _ := daemonSetAnnotationsResultFuture.Await()
- for _, res := range daemonSetAnnotationsResult {
- daemonSet, ok := daemonSetMap[res.UID]
- if !ok {
- log.Warnf("daemonset with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- daemonSet.Annotations = res.Annotations
- }
- for _, daemonSet := range daemonSetMap {
- err := kms.RegisterDaemonSet(daemonSet)
- if err != nil {
- log.Warnf("Failed to register daemonset: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeJobs(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- jobInfoResultFuture := source.WithGroup(grp, metrics.QueryJobInfo(start, end))
- jobUptimeResultFuture := source.WithGroup(grp, metrics.QueryJobUptime(start, end))
- jobLabelsResultFuture := source.WithGroup(grp, metrics.QueryJobLabels(start, end))
- jobAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryJobAnnotations(start, end))
- jobMap := make(map[string]*kubemodel.Job)
- jobInfoResult, _ := jobInfoResultFuture.Await()
- for _, res := range jobInfoResult {
- jobMap[res.UID] = &kubemodel.Job{
- UID: res.UID,
- Name: res.Job,
- NamespaceUID: res.NamespaceUID,
- }
- }
- jobUptimeResult, _ := jobUptimeResultFuture.Await()
- for _, res := range jobUptimeResult {
- job, ok := jobMap[res.UID]
- if !ok {
- log.Warnf("job with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- job.Start = s
- job.End = e
- }
- jobLabelsResult, _ := jobLabelsResultFuture.Await()
- for _, res := range jobLabelsResult {
- job, ok := jobMap[res.UID]
- if !ok {
- log.Warnf("job with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- job.Labels = res.Labels
- }
- jobAnnotationsResult, _ := jobAnnotationsResultFuture.Await()
- for _, res := range jobAnnotationsResult {
- job, ok := jobMap[res.UID]
- if !ok {
- log.Warnf("job with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- job.Annotations = res.Annotations
- }
- for _, job := range jobMap {
- err := kms.RegisterJob(job)
- if err != nil {
- log.Warnf("Failed to register job: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeCronJobs(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- cronJobInfoResultFuture := source.WithGroup(grp, metrics.QueryCronJobInfo(start, end))
- cronJobUptimeResultFuture := source.WithGroup(grp, metrics.QueryCronJobUptime(start, end))
- cronJobLabelsResultFuture := source.WithGroup(grp, metrics.QueryCronJobLabels(start, end))
- cronJobAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryCronJobAnnotations(start, end))
- cronJobMap := make(map[string]*kubemodel.CronJob)
- cronJobInfoResult, _ := cronJobInfoResultFuture.Await()
- for _, res := range cronJobInfoResult {
- cronJobMap[res.UID] = &kubemodel.CronJob{
- UID: res.UID,
- Name: res.CronJob,
- NamespaceUID: res.NamespaceUID,
- }
- }
- cronJobUptimeResult, _ := cronJobUptimeResultFuture.Await()
- for _, res := range cronJobUptimeResult {
- cronJob, ok := cronJobMap[res.UID]
- if !ok {
- log.Warnf("cronjob with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- cronJob.Start = s
- cronJob.End = e
- }
- cronJobLabelsResult, _ := cronJobLabelsResultFuture.Await()
- for _, res := range cronJobLabelsResult {
- cronJob, ok := cronJobMap[res.UID]
- if !ok {
- log.Warnf("cronjob with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- cronJob.Labels = res.Labels
- }
- cronJobAnnotationsResult, _ := cronJobAnnotationsResultFuture.Await()
- for _, res := range cronJobAnnotationsResult {
- cronJob, ok := cronJobMap[res.UID]
- if !ok {
- log.Warnf("cronjob with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- cronJob.Annotations = res.Annotations
- }
- for _, cronJob := range cronJobMap {
- err := kms.RegisterCronJob(cronJob)
- if err != nil {
- log.Warnf("Failed to register cronjob: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeReplicaSets(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- replicaSetInfoResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetInfo(start, end))
- replicaSetUptimeResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetUptime(start, end))
- replicaSetOwnerResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetOwners(start, end))
- replicaSetLabelsResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetLabels(start, end))
- replicaSetAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryReplicaSetAnnotations(start, end))
- replicaSetMap := make(map[string]*kubemodel.ReplicaSet)
- replicaSetInfoResult, _ := replicaSetInfoResultFuture.Await()
- for _, res := range replicaSetInfoResult {
- replicaSetMap[res.UID] = &kubemodel.ReplicaSet{
- UID: res.UID,
- Name: res.ReplicaSet,
- NamespaceUID: res.NamespaceUID,
- }
- }
- replicaSetUptimeResult, _ := replicaSetUptimeResultFuture.Await()
- for _, res := range replicaSetUptimeResult {
- replicaSet, ok := replicaSetMap[res.UID]
- if !ok {
- log.Warnf("replicaset with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- replicaSet.Start = s
- replicaSet.End = e
- }
- replicaSetOwnersResult, _ := replicaSetOwnerResultFuture.Await()
- for _, res := range replicaSetOwnersResult {
- replicaSet, ok := replicaSetMap[res.UID]
- if !ok {
- log.Warnf("replicaset with UID '%s' has not been initialized to add owner", res.UID)
- continue
- }
- replicaSet.Owners = append(replicaSet.Owners, kubemodel.Owner{
- UID: res.OwnerUID,
- Kind: kubemodel.ParseOwnerKind(res.OwnerKind),
- Controller: res.Controller,
- })
- }
- replicaSetLabelsResult, _ := replicaSetLabelsResultFuture.Await()
- for _, res := range replicaSetLabelsResult {
- replicaSet, ok := replicaSetMap[res.UID]
- if !ok {
- log.Warnf("replicaset with UID '%s' has not been initialized to add labels", res.UID)
- continue
- }
- replicaSet.Labels = res.Labels
- }
- replicaSetAnnotationsResult, _ := replicaSetAnnotationsResultFuture.Await()
- for _, res := range replicaSetAnnotationsResult {
- replicaSet, ok := replicaSetMap[res.UID]
- if !ok {
- log.Warnf("replicaset with UID '%s' has not been initialized to add annotations", res.UID)
- continue
- }
- replicaSet.Annotations = res.Annotations
- }
- for _, replicaSet := range replicaSetMap {
- err := kms.RegisterReplicaSet(replicaSet)
- if err != nil {
- log.Warnf("Failed to register replicaset: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeContainers(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- containerUptimeFuture := source.WithGroup(grp, metrics.QueryContainerUptime(start, end))
- containerResourceRequestsFuture := source.WithGroup(grp, metrics.QueryContainerResourceRequests(start, end))
- containerResourceLimitsFuture := source.WithGroup(grp, metrics.QueryContainerResourceLimits(start, end))
- cpuCoresAllocatedFuture := source.WithGroup(grp, metrics.QueryCPUCoresAllocated(start, end))
- cpuUsageAvgFuture := source.WithGroup(grp, metrics.QueryCPUUsageAvg(start, end))
- cpuUsageMaxFuture := source.WithGroup(grp, metrics.QueryCPUUsageMax(start, end))
- ramBytesAllocatedFuture := source.WithGroup(grp, metrics.QueryRAMBytesAllocated(start, end))
- ramUsageAvgFuture := source.WithGroup(grp, metrics.QueryRAMUsageAvg(start, end))
- ramUsageMaxFuture := source.WithGroup(grp, metrics.QueryRAMUsageMax(start, end))
- type containerKey struct {
- podUID string
- name string
- }
- containerMap := make(map[containerKey]*kubemodel.Container)
- containerUptimeResult, _ := containerUptimeFuture.Await()
- for _, res := range containerUptimeResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- containerMap[key] = &kubemodel.Container{
- PodUID: res.UID,
- Name: res.Container,
- ResourceRequests: make(kubemodel.ResourceQuantities),
- ResourceLimits: make(kubemodel.ResourceQuantities),
- Start: s,
- End: e,
- }
- }
- containerResourceRequestsResult, _ := containerResourceRequestsFuture.Await()
- for _, res := range containerResourceRequestsResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add resource requests", res.UID, res.Container)
- continue
- }
- resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
- container.ResourceRequests.Set(resource, unit, kubemodel.StatAvg, value)
- }
- containerResourceLimitsResult, _ := containerResourceLimitsFuture.Await()
- for _, res := range containerResourceLimitsResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add resource limits", res.UID, res.Container)
- continue
- }
- resource, unit, value := resourceUnitValue(res.Resource, res.Unit, res.Value)
- container.ResourceLimits.Set(resource, unit, kubemodel.StatAvg, value)
- }
- cpuCoresAllocatedResult, _ := cpuCoresAllocatedFuture.Await()
- for _, res := range cpuCoresAllocatedResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add CPU cores allocated", res.UID, res.Container)
- continue
- }
- if len(res.Data) > 0 {
- container.CPUCoresAllocated = res.Data[0].Value
- }
- }
- ramBytesAllocatedResult, _ := ramBytesAllocatedFuture.Await()
- for _, res := range ramBytesAllocatedResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add RAM bytes allocated", res.UID, res.Container)
- continue
- }
- if len(res.Data) > 0 {
- container.RAMBytesAllocated = res.Data[0].Value
- }
- }
- cpuUsageAvgResult, _ := cpuUsageAvgFuture.Await()
- for _, res := range cpuUsageAvgResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add CPU usage avg", res.UID, res.Container)
- continue
- }
- if len(res.Data) > 0 {
- container.CPUCoreUsageAvg = res.Data[0].Value
- }
- }
- cpuUsageMaxResult, _ := cpuUsageMaxFuture.Await()
- for _, res := range cpuUsageMaxResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add CPU usage max", res.UID, res.Container)
- continue
- }
- if len(res.Data) > 0 {
- container.CPUCoreUsageMax = res.Data[0].Value
- }
- }
- ramUsageAvgResult, _ := ramUsageAvgFuture.Await()
- for _, res := range ramUsageAvgResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add RAM usage avg", res.UID, res.Container)
- continue
- }
- if len(res.Data) > 0 {
- container.RAMBytesUsageAvg = res.Data[0].Value
- }
- }
- ramUsageMaxResult, _ := ramUsageMaxFuture.Await()
- for _, res := range ramUsageMaxResult {
- key := containerKey{podUID: res.UID, name: res.Container}
- container, ok := containerMap[key]
- if !ok {
- log.Warnf("container %s/%s has not been initialized to add RAM usage max", res.UID, res.Container)
- continue
- }
- if len(res.Data) > 0 {
- container.RAMBytesUsageMax = res.Data[0].Value
- }
- }
- for _, container := range containerMap {
- err := kms.RegisterContainer(container)
- if err != nil {
- log.Warnf("Failed to register container: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeResourceQuotas(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- rqInfoResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaInfo(start, end))
- rqUptimeResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaUptime(start, end))
- // spec.hard.requests
- rqSpecCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestAverage(start, end))
- rqSpecCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestMax(start, end))
- rqSpecRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestAverage(start, end))
- rqSpecRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestMax(start, end))
- // spec.hard.limits
- rqSpecCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitAverage(start, end))
- rqSpecCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitMax(start, end))
- rqSpecRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitAverage(start, end))
- rqSpecRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitMax(start, end))
- // status.used.requests
- rqStatusUsedCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestAverage(start, end))
- rqStatusUsedCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestMax(start, end))
- rqStatusUsedRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestAverage(start, end))
- rqStatusUsedRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestMax(start, end))
- // status.used.limits
- rqStatusUsedCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitAverage(start, end))
- rqStatusUsedCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitMax(start, end))
- rqStatusUsedRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
- rqStatusUsedRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))
- rqMap := make(map[string]*kubemodel.ResourceQuota)
- // Initialize resource quotas from info
- rqInfoResult, _ := rqInfoResultFuture.Await()
- for _, res := range rqInfoResult {
- rqMap[res.UID] = &kubemodel.ResourceQuota{
- UID: res.UID,
- Name: res.ResourceQuota,
- NamespaceUID: res.NamespaceUID,
- Spec: &kubemodel.ResourceQuotaSpec{Hard: &kubemodel.ResourceQuotaSpecHard{}},
- Status: &kubemodel.ResourceQuotaStatus{Used: &kubemodel.ResourceQuotaStatusUsed{}},
- }
- }
- rqUptimeResult, _ := rqUptimeResultFuture.Await()
- for _, res := range rqUptimeResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- rq.Start = s
- rq.End = e
- }
- rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
- for _, res := range rqSpecCPURequestAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU request average", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Spec.Hard.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
- }
- rqSpecCPURequestMaxResult, _ := rqSpecCPURequestMaxResultFuture.Await()
- for _, res := range rqSpecCPURequestMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU request max", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Spec.Hard.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
- }
- rqSpecRAMRequestAverageResult, _ := rqSpecRAMRequestAverageResultFuture.Await()
- for _, res := range rqSpecRAMRequestAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM request average", res.UID)
- continue
- }
- rq.Spec.Hard.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
- }
- rqSpecRAMRequestMaxResult, _ := rqSpecRAMRequestMaxResultFuture.Await()
- for _, res := range rqSpecRAMRequestMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM request max", res.UID)
- continue
- }
- rq.Spec.Hard.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
- }
- rqSpecCPULimitAverageResult, _ := rqSpecCPULimitAverageResultFuture.Await()
- for _, res := range rqSpecCPULimitAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU limit average", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Spec.Hard.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
- }
- rqSpecCPULimitMaxResult, _ := rqSpecCPULimitMaxResultFuture.Await()
- for _, res := range rqSpecCPULimitMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec CPU limit max", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Spec.Hard.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
- }
- rqSpecRAMLimitAverageResult, _ := rqSpecRAMLimitAverageResultFuture.Await()
- for _, res := range rqSpecRAMLimitAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM limit average", res.UID)
- continue
- }
- rq.Spec.Hard.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
- }
- rqSpecRAMLimitMaxResult, _ := rqSpecRAMLimitMaxResultFuture.Await()
- for _, res := range rqSpecRAMLimitMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add spec RAM limit max", res.UID)
- continue
- }
- rq.Spec.Hard.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
- }
- rqStatusUsedCPURequestAverageResult, _ := rqStatusUsedCPURequestAverageResultFuture.Await()
- for _, res := range rqStatusUsedCPURequestAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU request average", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Status.Used.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
- }
- rqStatusUsedCPURequestMaxResult, _ := rqStatusUsedCPURequestMaxResultFuture.Await()
- for _, res := range rqStatusUsedCPURequestMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU request max", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Status.Used.SetRequest(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
- }
- rqStatusUsedRAMRequestAverageResult, _ := rqStatusUsedRAMRequestAverageResultFuture.Await()
- for _, res := range rqStatusUsedRAMRequestAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM request average", res.UID)
- continue
- }
- rq.Status.Used.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
- }
- rqStatusUsedRAMRequestMaxResult, _ := rqStatusUsedRAMRequestMaxResultFuture.Await()
- for _, res := range rqStatusUsedRAMRequestMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM request max", res.UID)
- continue
- }
- rq.Status.Used.SetRequest(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
- }
- rqStatusUsedCPULimitAverageResult, _ := rqStatusUsedCPULimitAverageResultFuture.Await()
- for _, res := range rqStatusUsedCPULimitAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU limit average", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Status.Used.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatAvg, mcpu)
- }
- rqStatusUsedCPULimitMaxResult, _ := rqStatusUsedCPULimitMaxResultFuture.Await()
- for _, res := range rqStatusUsedCPULimitMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status CPU limit max", res.UID)
- continue
- }
- mcpu := res.Value * 1000
- rq.Status.Used.SetLimit(kubemodel.ResourceCPU, kubemodel.UnitMillicore, kubemodel.StatMax, mcpu)
- }
- rqStatusUsedRAMLimitAverageResult, _ := rqStatusUsedRAMLimitAverageResultFuture.Await()
- for _, res := range rqStatusUsedRAMLimitAverageResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM limit average", res.UID)
- continue
- }
- rq.Status.Used.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatAvg, res.Value)
- }
- rqStatusUsedRAMLimitMaxResult, _ := rqStatusUsedRAMLimitMaxResultFuture.Await()
- for _, res := range rqStatusUsedRAMLimitMaxResult {
- rq, ok := rqMap[res.UID]
- if !ok {
- log.Warnf("resource quota with UID '%s' has not been initialized to add status RAM limit max", res.UID)
- continue
- }
- rq.Status.Used.SetLimit(kubemodel.ResourceMemory, kubemodel.UnitByte, kubemodel.StatMax, res.Value)
- }
- for _, resourceQuota := range rqMap {
- err := kms.RegisterResourceQuota(resourceQuota)
- if err != nil {
- log.Warnf("Failed to register resource quota: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeServices(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- serviceInfoResultFuture := source.WithGroup(grp, metrics.QueryServiceInfo(start, end))
- serviceUptimeResultFuture := source.WithGroup(grp, metrics.QueryServiceUptime(start, end))
- serviceSelectorLabelsResultFuture := source.WithGroup(grp, metrics.QueryServiceSelectorLabels(start, end))
- serviceMap := make(map[string]*kubemodel.Service)
- // Initialize services from info
- serviceInfoResult, _ := serviceInfoResultFuture.Await()
- for _, res := range serviceInfoResult {
- serviceMap[res.UID] = &kubemodel.Service{
- UID: res.UID,
- NamespaceUID: res.NamespaceUID,
- Name: res.Service,
- Type: kubemodel.ParseServiceType(res.ServiceType),
- LBIngressAddress: res.LBIngressAddress,
- }
- }
- serviceUptimeResult, _ := serviceUptimeResultFuture.Await()
- for _, res := range serviceUptimeResult {
- service, ok := serviceMap[res.UID]
- if !ok {
- log.Warnf("service with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- service.Start = s
- service.End = e
- }
- serviceSelectorLabelsResult, _ := serviceSelectorLabelsResultFuture.Await()
- for _, res := range serviceSelectorLabelsResult {
- service, ok := serviceMap[res.UID]
- if !ok {
- log.Warnf("service with UID '%s' has not been initialized to add selector labels", res.UID)
- continue
- }
- service.Selector = res.Labels
- }
- for _, service := range serviceMap {
- err := kms.RegisterService(service)
- if err != nil {
- log.Warnf("Failed to register service: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computePersistentVolumes(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- pvInfoResultFuture := source.WithGroup(grp, metrics.QueryKMPVInfo(start, end))
- pvUptimeResultFuture := source.WithGroup(grp, metrics.QueryPVUptime(start, end))
- pvBytesResultFuture := source.WithGroup(grp, metrics.QueryPVBytes(start, end))
- pvMap := make(map[string]*kubemodel.PersistentVolume)
- pvInfoResult, _ := pvInfoResultFuture.Await()
- for _, res := range pvInfoResult {
- pvMap[res.UID] = &kubemodel.PersistentVolume{
- UID: res.UID,
- Name: res.PersistentVolume,
- StorageClass: res.StorageClass,
- CSIVolumeHandle: res.CSIVolumeHandle,
- }
- }
- pvUptimeResult, _ := pvUptimeResultFuture.Await()
- for _, res := range pvUptimeResult {
- pv, ok := pvMap[res.UID]
- if !ok {
- log.Warnf("persistent volume with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- pv.Start = s
- pv.End = e
- }
- pvBytesResult, _ := pvBytesResultFuture.Await()
- for _, res := range pvBytesResult {
- pv, ok := pvMap[res.UID]
- if !ok {
- log.Warnf("persistent volume with UID '%s' has not been initialized to add bytes", res.UID)
- continue
- }
- pv.SizeBytes = res.Value
- }
- for _, pv := range pvMap {
- err := kms.RegisterPersistentVolume(pv)
- if err != nil {
- log.Warnf("Failed to register persistent volume: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computePersistentVolumeClaims(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- pvcInfoResultFuture := source.WithGroup(grp, metrics.QueryKMPVCInfo(start, end))
- pvcUptimeResultFuture := source.WithGroup(grp, metrics.QueryPVCUptime(start, end))
- pvcBytesRequestedResultFuture := source.WithGroup(grp, metrics.QueryPVCBytesRequested(start, end))
- pvcBytesUsedAvgResultFuture := source.WithGroup(grp, metrics.QueryPVCBytesUsedAverage(start, end))
- pvcBytesUsedMaxResultFuture := source.WithGroup(grp, metrics.QueryPVCBytesUsedMax(start, end))
- pvcMap := make(map[string]*kubemodel.PersistentVolumeClaim)
- pvcInfoResult, _ := pvcInfoResultFuture.Await()
- for _, res := range pvcInfoResult {
- pvcMap[res.UID] = &kubemodel.PersistentVolumeClaim{
- UID: res.UID,
- Name: res.PersistentVolumeClaim,
- NamespaceUID: res.NamespaceUID,
- PersistentVolumeUID: res.PVUID,
- StorageClass: res.StorageClass,
- }
- }
- pvcUptimeResult, _ := pvcUptimeResultFuture.Await()
- for _, res := range pvcUptimeResult {
- pvc, ok := pvcMap[res.UID]
- if !ok {
- log.Warnf("persistent volume claim with UID '%s' has not been initialized to add uptime", res.UID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- pvc.Start = s
- pvc.End = e
- }
- pvcBytesRequestedResult, _ := pvcBytesRequestedResultFuture.Await()
- for _, res := range pvcBytesRequestedResult {
- pvc, ok := pvcMap[res.UID]
- if !ok {
- log.Warnf("persistent volume claim with UID '%s' has not been initialized to add requested bytes", res.UID)
- continue
- }
- if len(res.Data) > 0 {
- pvc.RequestedBytes = res.Data[0].Value
- }
- }
- pvcBytesUsedAvgResult, _ := pvcBytesUsedAvgResultFuture.Await()
- for _, res := range pvcBytesUsedAvgResult {
- pvc, ok := pvcMap[res.UID]
- if !ok {
- log.Warnf("persistent volume claim with UID '%s' has not been initialized to add bytes used average", res.UID)
- continue
- }
- pvc.UsageBytesAvg = res.Value
- }
- pvcBytesUsedMaxResult, _ := pvcBytesUsedMaxResultFuture.Await()
- for _, res := range pvcBytesUsedMaxResult {
- pvc, ok := pvcMap[res.UID]
- if !ok {
- log.Warnf("persistent volume claim with UID '%s' has not been initialized to add bytes used max", res.UID)
- continue
- }
- pvc.UsageBytesMax = res.Value
- }
- for _, pvc := range pvcMap {
- err := kms.RegisterPVC(pvc)
- if err != nil {
- log.Warnf("Failed to register persistent volume claim: %s", err.Error())
- }
- }
- return nil
- }
- func (km *KubeModel) computeDCGMDevices(kms *kubemodel.KubeModelSet, start, end time.Time) error {
- grp := source.NewQueryGroup()
- metrics := km.ds.Metrics()
- dcgmInfoFuture := source.WithGroup(grp, metrics.QueryDCGMDeviceInfo(start, end))
- dcgmUptimeFuture := source.WithGroup(grp, metrics.QueryDCGMDeviceUptime(start, end))
- dcgmUsageAvgFuture := source.WithGroup(grp, metrics.QueryDCGMContainerUsageAvg(start, end))
- dcgmUsageMaxFuture := source.WithGroup(grp, metrics.QueryDCGMContainerUsageMax(start, end))
- deviceMap := make(map[string]*kubemodel.DCGMDevice)
- dcgmInfoResult, _ := dcgmInfoFuture.Await()
- for _, res := range dcgmInfoResult {
- if res.UUID == "" {
- continue
- }
- if _, ok := deviceMap[res.UUID]; ok {
- continue
- }
- deviceMap[res.UUID] = &kubemodel.DCGMDevice{
- UUID: res.UUID,
- Device: res.Device,
- ModelName: res.ModelName,
- PodUsages: make(map[string]kubemodel.DCGMPod),
- }
- }
- dcgmUptimeResult, _ := dcgmUptimeFuture.Await()
- for _, res := range dcgmUptimeResult {
- d, ok := deviceMap[res.UUID]
- if !ok {
- log.Warnf("DCGM uptime result for unknown device UUID '%s'", res.UUID)
- continue
- }
- s, e := res.GetStartEnd(start, end, km.ds.Resolution())
- d.Start = s
- d.End = e
- }
- dcgmUsageAvgResult, _ := dcgmUsageAvgFuture.Await()
- for _, res := range dcgmUsageAvgResult {
- device, ok := deviceMap[res.UUID]
- if !ok || res.PodUID == "" || res.Container == "" {
- continue
- }
- pod, ok := device.PodUsages[res.PodUID]
- if !ok {
- pod = kubemodel.DCGMPod{ContainerUsages: make(map[string]kubemodel.DCGMContainer)}
- }
- c := pod.ContainerUsages[res.Container]
- c.UsageAvg = res.Value
- pod.ContainerUsages[res.Container] = c
- device.PodUsages[res.PodUID] = pod
- }
- dcgmUsageMaxResult, _ := dcgmUsageMaxFuture.Await()
- for _, res := range dcgmUsageMaxResult {
- device, ok := deviceMap[res.UUID]
- if !ok || res.PodUID == "" || res.Container == "" {
- continue
- }
- pod, ok := device.PodUsages[res.PodUID]
- if !ok {
- pod = kubemodel.DCGMPod{ContainerUsages: make(map[string]kubemodel.DCGMContainer)}
- }
- c := pod.ContainerUsages[res.Container]
- c.UsageMax = res.Value
- pod.ContainerUsages[res.Container] = c
- device.PodUsages[res.PodUID] = pod
- }
- for _, device := range deviceMap {
- if err := kms.RegisterDCGMDevice(device); err != nil {
- log.Warnf("Failed to register DCGM device: %s", err.Error())
- }
- }
- return nil
- }
|