clustercache.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009
  1. package scrape
  2. import (
  3. "fmt"
  4. "slices"
  5. "strconv"
  6. "strings"
  7. "github.com/kubecost/events"
  8. "github.com/opencost/opencost/core/pkg/clustercache"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/source"
  11. "github.com/opencost/opencost/core/pkg/util/promutil"
  12. "github.com/opencost/opencost/modules/collector-source/pkg/event"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  14. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  15. "golang.org/x/exp/maps"
  16. v1 "k8s.io/api/core/v1"
  17. "k8s.io/apimachinery/pkg/api/resource"
  18. "k8s.io/apimachinery/pkg/util/validation"
  19. )
// unmountedPVsContainer is the synthetic pod name under which allocation
// metrics are emitted for PersistentVolumeClaims that no pod mounts.
const unmountedPVsContainer = "unmounted-pvs"

// ClusterCacheScraper converts the Kubernetes objects held in a
// clustercache.ClusterCache into metric.Update values.
type ClusterCacheScraper struct {
	// clusterCache provides cached access to cluster objects
	// (nodes, pods, PVs, PVCs, workloads, quotas, ...).
	clusterCache clustercache.ClusterCache
}
  24. func newClusterCacheScraper(clusterCache clustercache.ClusterCache) Scraper {
  25. return &ClusterCacheScraper{
  26. clusterCache: clusterCache,
  27. }
  28. }
  29. func (ccs *ClusterCacheScraper) Scrape() []metric.Update {
  30. scrapeFuncs := []ScrapeFunc{
  31. ccs.ScrapeNodes,
  32. ccs.ScrapeDeployments,
  33. ccs.ScrapeNamespaces,
  34. ccs.ScrapePods,
  35. ccs.ScrapePVCs,
  36. ccs.ScrapePVs,
  37. ccs.ScrapeServices,
  38. ccs.ScrapeStatefulSets,
  39. ccs.ScrapeReplicaSets,
  40. ccs.ScrapeResourceQuotas,
  41. }
  42. return concurrentScrape(scrapeFuncs...)
  43. }
  44. func (ccs *ClusterCacheScraper) ScrapeNodes() []metric.Update {
  45. nodes := ccs.clusterCache.GetAllNodes()
  46. return ccs.scrapeNodes(nodes)
  47. }
  48. func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node) []metric.Update {
  49. var scrapeResults []metric.Update
  50. for _, node := range nodes {
  51. nodeInfo := map[string]string{
  52. source.NodeLabel: node.Name,
  53. source.ProviderIDLabel: node.SpecProviderID,
  54. source.UIDLabel: string(node.UID),
  55. }
  56. // Node Capacity
  57. if node.Status.Capacity != nil {
  58. if quantity, ok := node.Status.Capacity[v1.ResourceCPU]; ok {
  59. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  60. scrapeResults = append(scrapeResults, metric.Update{
  61. Name: metric.KubeNodeStatusCapacityCPUCores,
  62. Labels: nodeInfo,
  63. Value: value,
  64. })
  65. }
  66. if quantity, ok := node.Status.Capacity[v1.ResourceMemory]; ok {
  67. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  68. scrapeResults = append(scrapeResults, metric.Update{
  69. Name: metric.KubeNodeStatusCapacityMemoryBytes,
  70. Labels: nodeInfo,
  71. Value: value,
  72. })
  73. }
  74. }
  75. // Node Allocatable Resources
  76. if node.Status.Allocatable != nil {
  77. if quantity, ok := node.Status.Allocatable[v1.ResourceCPU]; ok {
  78. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  79. scrapeResults = append(scrapeResults, metric.Update{
  80. Name: metric.KubeNodeStatusAllocatableCPUCores,
  81. Labels: nodeInfo,
  82. Value: value,
  83. })
  84. }
  85. if quantity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok {
  86. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  87. scrapeResults = append(scrapeResults, metric.Update{
  88. Name: metric.KubeNodeStatusAllocatableMemoryBytes,
  89. Labels: nodeInfo,
  90. Value: value,
  91. })
  92. }
  93. }
  94. // node labels
  95. labelNames, labelValues := promutil.KubeLabelsToLabels(node.Labels)
  96. nodeLabels := util.ToMap(labelNames, labelValues)
  97. scrapeResults = append(scrapeResults, metric.Update{
  98. Name: metric.KubeNodeLabels,
  99. Labels: nodeInfo,
  100. Value: 0,
  101. AdditionalInfo: nodeLabels,
  102. })
  103. }
  104. events.Dispatch(event.ScrapeEvent{
  105. ScraperName: event.KubernetesClusterScraperName,
  106. ScrapeType: event.NodeScraperType,
  107. Targets: len(nodes),
  108. Errors: nil,
  109. })
  110. return scrapeResults
  111. }
  112. func (ccs *ClusterCacheScraper) ScrapeDeployments() []metric.Update {
  113. deployments := ccs.clusterCache.GetAllDeployments()
  114. return ccs.scrapeDeployments(deployments)
  115. }
  116. func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment) []metric.Update {
  117. var scrapeResults []metric.Update
  118. for _, deployment := range deployments {
  119. deploymentInfo := map[string]string{
  120. source.DeploymentLabel: deployment.Name,
  121. source.NamespaceLabel: deployment.Namespace,
  122. source.UIDLabel: string(deployment.UID),
  123. }
  124. // deployment labels
  125. labelNames, labelValues := promutil.KubeLabelsToLabels(deployment.MatchLabels)
  126. deploymentLabels := util.ToMap(labelNames, labelValues)
  127. scrapeResults = append(scrapeResults, metric.Update{
  128. Name: metric.DeploymentMatchLabels,
  129. Labels: deploymentInfo,
  130. Value: 0,
  131. AdditionalInfo: deploymentLabels,
  132. })
  133. }
  134. events.Dispatch(event.ScrapeEvent{
  135. ScraperName: event.KubernetesClusterScraperName,
  136. ScrapeType: event.DeploymentScraperType,
  137. Targets: len(deployments),
  138. Errors: nil,
  139. })
  140. return scrapeResults
  141. }
  142. func (ccs *ClusterCacheScraper) ScrapeNamespaces() []metric.Update {
  143. namespaces := ccs.clusterCache.GetAllNamespaces()
  144. return ccs.scrapeNamespaces(namespaces)
  145. }
  146. func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Namespace) []metric.Update {
  147. var scrapeResults []metric.Update
  148. for _, namespace := range namespaces {
  149. namespaceInfo := map[string]string{
  150. source.NamespaceLabel: namespace.Name,
  151. source.UIDLabel: string(namespace.UID),
  152. }
  153. scrapeResults = append(scrapeResults, metric.Update{
  154. Name: metric.NamespaceInfo,
  155. Labels: namespaceInfo,
  156. AdditionalInfo: namespaceInfo,
  157. Value: 0,
  158. })
  159. // namespace labels
  160. labelNames, labelValues := promutil.KubeLabelsToLabels(namespace.Labels)
  161. namespaceLabels := util.ToMap(labelNames, labelValues)
  162. scrapeResults = append(scrapeResults, metric.Update{
  163. Name: metric.KubeNamespaceLabels,
  164. Labels: namespaceInfo,
  165. Value: 0,
  166. AdditionalInfo: namespaceLabels,
  167. })
  168. // namespace annotations
  169. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(namespace.Annotations)
  170. namespaceAnnotations := util.ToMap(annotationNames, annotationValues)
  171. scrapeResults = append(scrapeResults, metric.Update{
  172. Name: metric.KubeNamespaceAnnotations,
  173. Labels: namespaceInfo,
  174. Value: 0,
  175. AdditionalInfo: namespaceAnnotations,
  176. })
  177. }
  178. events.Dispatch(event.ScrapeEvent{
  179. ScraperName: event.KubernetesClusterScraperName,
  180. ScrapeType: event.NamespaceScraperType,
  181. Targets: len(namespaces),
  182. Errors: nil,
  183. })
  184. return scrapeResults
  185. }
  186. func (ccs *ClusterCacheScraper) ScrapePods() []metric.Update {
  187. pods := ccs.clusterCache.GetAllPods()
  188. pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims()
  189. return ccs.scrapePods(pods, pvcs)
  190. }
// scrapePods emits per-pod metrics: labels, annotations, owner references,
// running-container statuses, container resource requests/limits, container
// GPU allocation, and per-pod PVC allocation. PVC allocation is computed in
// two passes: the pod loop tallies which pods claim each PVC, then a second
// loop over the PVC info divides each claim's requested storage among the
// claiming pods (or emits a full allocation under the synthetic
// unmounted-pvs pod when nothing claims it). Finally a scrape event is
// dispatched describing the run.
func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod, pvcs []*clustercache.PersistentVolumeClaim) []metric.Update {
	// this is only populated if we find gpu resources being requested
	var nodesGpuInfo map[string]*NodeGpuInfo
	// pv allocation and unmounted pvs
	pvcInfo := getPvcsInfo(pvcs)
	// pod info by uid; used by the PVC allocation pass below to recover a
	// pod's label set from the UID recorded in PodsClaimed
	podInfoByUid := make(map[string]map[string]string)
	var scrapeResults []metric.Update
	for _, pod := range pods {
		podInfo := map[string]string{
			source.PodLabel:       pod.Name,
			source.NamespaceLabel: pod.Namespace,
			source.UIDLabel:       string(pod.UID),
			source.NodeLabel:      pod.Spec.NodeName,
			source.InstanceLabel:  pod.Spec.NodeName,
		}
		podInfoByUid[string(pod.UID)] = podInfo
		// pod labels
		labelNames, labelValues := promutil.KubeLabelsToLabels(pod.Labels)
		podLabels := util.ToMap(labelNames, labelValues)
		scrapeResults = append(scrapeResults, metric.Update{
			Name:           metric.KubePodLabels,
			Labels:         podInfo,
			Value:          0,
			AdditionalInfo: podLabels,
		})
		// pod annotations
		annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(pod.Annotations)
		podAnnotations := util.ToMap(annotationNames, annotationValues)
		scrapeResults = append(scrapeResults, metric.Update{
			Name:           metric.KubePodAnnotations,
			Labels:         podInfo,
			Value:          0,
			AdditionalInfo: podAnnotations,
		})
		// Determine PVC use data for Pod. The claimed set dedupes multiple
		// volumes in the same pod referencing the same claim, so each pod is
		// counted at most once per PVC.
		claimed := make(map[string]struct{})
		for _, volume := range pod.Spec.Volumes {
			if volume.PersistentVolumeClaim != nil {
				name := volume.PersistentVolumeClaim.ClaimName
				// key format matches getPvcsInfo: "<namespace>,<claim name>"
				key := pod.Namespace + "," + name
				if _, seen := claimed[key]; seen {
					continue
				}
				if pvc, ok := pvcInfo[key]; ok {
					pvc.PodsClaimed = append(pvc.PodsClaimed, string(pod.UID))
					claimed[key] = struct{}{}
				}
			}
		}
		// Pod owner metric
		for _, owner := range pod.OwnerReferences {
			ownerInfo := maps.Clone(podInfo)
			ownerInfo[source.OwnerKindLabel] = owner.Kind
			ownerInfo[source.OwnerNameLabel] = owner.Name
			scrapeResults = append(scrapeResults, metric.Update{
				Name:   metric.KubePodOwner,
				Labels: ownerInfo,
				Value:  0,
			})
		}
		// Container Status: only running containers are exported
		for _, status := range pod.Status.ContainerStatuses {
			if status.State.Running != nil {
				containerInfo := maps.Clone(podInfo)
				containerInfo[source.ContainerLabel] = status.Name
				scrapeResults = append(scrapeResults, metric.Update{
					Name:   metric.KubePodContainerStatusRunning,
					Labels: containerInfo,
					Value:  0,
				})
			}
		}
		for _, container := range pod.Spec.Containers {
			// gpu "requests" is either the request or limit if it exists
			var gpuRequest *float64
			containerInfo := maps.Clone(podInfo)
			containerInfo[source.ContainerLabel] = container.Name
			// Requests
			if container.Resources.Requests != nil {
				// sorting keys here for testing purposes
				keys := maps.Keys(container.Resources.Requests)
				slices.Sort(keys)
				for _, resourceName := range keys {
					quantity := container.Resources.Requests[resourceName]
					resource, unit, value := toResourceUnitValue(resourceName, quantity)
					// failed to parse the resource type
					if resource == "" {
						log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", resourceName)
						continue
					}
					resourceRequestInfo := maps.Clone(containerInfo)
					resourceRequestInfo[source.ResourceLabel] = resource
					resourceRequestInfo[source.UnitLabel] = unit
					scrapeResults = append(scrapeResults, metric.Update{
						Name:   metric.KubePodContainerResourceRequests,
						Labels: resourceRequestInfo,
						Value:  value,
					})
					// set gpu request if it exists
					if isGpuResourceName(resourceName) {
						gpuRequestValue := value
						gpuRequest = &gpuRequestValue
					}
				}
			}
			// Limits
			if container.Resources.Limits != nil {
				// sorting keys here for testing purposes
				keys := maps.Keys(container.Resources.Limits)
				slices.Sort(keys)
				for _, resourceName := range keys {
					quantity := container.Resources.Limits[resourceName]
					resource, unit, value := toResourceUnitValue(resourceName, quantity)
					// failed to parse the resource type
					if resource == "" {
						log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", resourceName)
						continue
					}
					resourceLimitInfo := maps.Clone(containerInfo)
					resourceLimitInfo[source.ResourceLabel] = resource
					resourceLimitInfo[source.UnitLabel] = unit
					scrapeResults = append(scrapeResults, metric.Update{
						Name:   metric.KubePodContainerResourceLimits,
						Labels: resourceLimitInfo,
						Value:  value,
					})
					// if we didn't set a gpuRequest previously and the limit is a gpu resource,
					// set it to the limit
					if gpuRequest == nil && isGpuResourceName(resourceName) {
						gpuRequestValue := value
						gpuRequest = &gpuRequestValue
					}
				}
			}
			// handle the GPU allocation metric here IFF there exists a request/limit for GPUs
			// we only load the node gpu data map if we run into a container with gpu requests/limits
			if gpuRequest != nil {
				if nodesGpuInfo == nil {
					nodesGpuInfo = ccs.getNodesGpuInfo()
				}
				gpuAlloc := *gpuRequest
				// scale vGPU requests down to physical GPU terms when the
				// node reports a non-zero vGPU count
				if nodeGpuInfo, ok := nodesGpuInfo[pod.Spec.NodeName]; ok {
					if nodeGpuInfo != nil && nodeGpuInfo.VGPU != 0 {
						gpuAlloc = gpuAlloc * (nodeGpuInfo.GPU / nodeGpuInfo.VGPU)
					}
				}
				scrapeResults = append(scrapeResults, metric.Update{
					Name:   metric.ContainerGPUAllocation,
					Labels: maps.Clone(containerInfo),
					Value:  gpuAlloc,
				})
			}
		}
	}
	// Iterate through PVC Info after the pods have been tallied and export
	// allocation metrics based on the number of other pods claiming the volume
	for _, pvc := range pvcInfo {
		// unmounted pvs get full allocation, attributed to the synthetic
		// unmounted-pvs pod with empty node/instance/uid labels
		if len(pvc.PodsClaimed) == 0 {
			labels := map[string]string{
				source.PodLabel:       unmountedPVsContainer,
				source.NamespaceLabel: pvc.Namespace,
				source.UIDLabel:       "",
				source.NodeLabel:      "",
				source.InstanceLabel:  "",
				source.PVCLabel:       pvc.Claim,
				source.PVLabel:        pvc.VolumeName,
			}
			scrapeResults = append(scrapeResults, metric.Update{
				Name:   metric.PodPVCAllocation,
				Labels: labels,
				Value:  pvc.Requests,
			})
			continue
		}
		// pods get a proportion of pv allocation (split evenly)
		value := pvc.Requests / float64(len(pvc.PodsClaimed))
		for _, podUid := range pvc.PodsClaimed {
			podInfo, ok := podInfoByUid[podUid]
			if !ok {
				continue
			}
			pvcLabels := maps.Clone(podInfo)
			pvcLabels[source.PVCLabel] = pvc.Claim
			pvcLabels[source.PVLabel] = pvc.VolumeName
			scrapeResults = append(scrapeResults, metric.Update{
				Name:   metric.PodPVCAllocation,
				Labels: pvcLabels,
				Value:  value,
			})
		}
	}
	events.Dispatch(event.ScrapeEvent{
		ScraperName: event.KubernetesClusterScraperName,
		ScrapeType:  event.PodScraperType,
		Targets:     len(pods),
		Errors:      nil,
	})
	return scrapeResults
}
  392. func (ccs *ClusterCacheScraper) ScrapePVCs() []metric.Update {
  393. pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims()
  394. return ccs.scrapePVCs(pvcs)
  395. }
  396. func (ccs *ClusterCacheScraper) scrapePVCs(pvcs []*clustercache.PersistentVolumeClaim) []metric.Update {
  397. var scrapeResults []metric.Update
  398. for _, pvc := range pvcs {
  399. pvcInfo := map[string]string{
  400. source.PVCLabel: pvc.Name,
  401. source.NamespaceLabel: pvc.Namespace,
  402. source.UIDLabel: string(pvc.UID),
  403. source.VolumeNameLabel: pvc.Spec.VolumeName,
  404. source.StorageClassLabel: getPersistentVolumeClaimClass(pvc),
  405. }
  406. scrapeResults = append(scrapeResults, metric.Update{
  407. Name: metric.KubePersistentVolumeClaimInfo,
  408. Labels: pvcInfo,
  409. Value: 0,
  410. })
  411. if storage, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]; ok {
  412. scrapeResults = append(scrapeResults, metric.Update{
  413. Name: metric.KubePersistentVolumeClaimResourceRequestsStorageBytes,
  414. Labels: pvcInfo,
  415. Value: float64(storage.Value()),
  416. })
  417. }
  418. }
  419. events.Dispatch(event.ScrapeEvent{
  420. ScraperName: event.KubernetesClusterScraperName,
  421. ScrapeType: event.PvcScraperType,
  422. Targets: len(pvcs),
  423. Errors: nil,
  424. })
  425. return scrapeResults
  426. }
  427. func (ccs *ClusterCacheScraper) ScrapePVs() []metric.Update {
  428. pvs := ccs.clusterCache.GetAllPersistentVolumes()
  429. return ccs.scrapePVs(pvs)
  430. }
  431. func (ccs *ClusterCacheScraper) scrapePVs(pvs []*clustercache.PersistentVolume) []metric.Update {
  432. var scrapeResults []metric.Update
  433. for _, pv := range pvs {
  434. providerID := pv.Name
  435. // if a more accurate provider ID is available, use that
  436. if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
  437. providerID = pv.Spec.CSI.VolumeHandle
  438. }
  439. pvInfo := map[string]string{
  440. source.PVLabel: pv.Name,
  441. source.UIDLabel: string(pv.UID),
  442. source.StorageClassLabel: pv.Spec.StorageClassName,
  443. source.ProviderIDLabel: providerID,
  444. }
  445. scrapeResults = append(scrapeResults, metric.Update{
  446. Name: metric.KubecostPVInfo,
  447. Labels: pvInfo,
  448. Value: 0,
  449. })
  450. if storage, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok {
  451. scrapeResults = append(scrapeResults, metric.Update{
  452. Name: metric.KubePersistentVolumeCapacityBytes,
  453. Labels: pvInfo,
  454. Value: float64(storage.Value()),
  455. })
  456. }
  457. }
  458. events.Dispatch(event.ScrapeEvent{
  459. ScraperName: event.KubernetesClusterScraperName,
  460. ScrapeType: event.PvScraperType,
  461. Targets: len(pvs),
  462. Errors: nil,
  463. })
  464. return scrapeResults
  465. }
  466. func (ccs *ClusterCacheScraper) ScrapeServices() []metric.Update {
  467. services := ccs.clusterCache.GetAllServices()
  468. return ccs.scrapeServices(services)
  469. }
  470. func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service) []metric.Update {
  471. var scrapeResults []metric.Update
  472. for _, service := range services {
  473. serviceInfo := map[string]string{
  474. source.ServiceLabel: service.Name,
  475. source.NamespaceLabel: service.Namespace,
  476. source.UIDLabel: string(service.UID),
  477. }
  478. // service labels
  479. labelNames, labelValues := promutil.KubeLabelsToLabels(service.SpecSelector)
  480. serviceLabels := util.ToMap(labelNames, labelValues)
  481. scrapeResults = append(scrapeResults, metric.Update{
  482. Name: metric.ServiceSelectorLabels,
  483. Labels: serviceInfo,
  484. Value: 0,
  485. AdditionalInfo: serviceLabels,
  486. })
  487. }
  488. events.Dispatch(event.ScrapeEvent{
  489. ScraperName: event.KubernetesClusterScraperName,
  490. ScrapeType: event.ServiceScraperType,
  491. Targets: len(services),
  492. Errors: nil,
  493. })
  494. return scrapeResults
  495. }
  496. func (ccs *ClusterCacheScraper) ScrapeStatefulSets() []metric.Update {
  497. statefulSets := ccs.clusterCache.GetAllStatefulSets()
  498. return ccs.scrapeStatefulSets(statefulSets)
  499. }
  500. func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet) []metric.Update {
  501. var scrapeResults []metric.Update
  502. for _, statefulSet := range statefulSets {
  503. statefulSetInfo := map[string]string{
  504. source.StatefulSetLabel: statefulSet.Name,
  505. source.NamespaceLabel: statefulSet.Namespace,
  506. source.UIDLabel: string(statefulSet.UID),
  507. }
  508. // statefulSet labels
  509. labelNames, labelValues := promutil.KubeLabelsToLabels(statefulSet.SpecSelector.MatchLabels)
  510. statefulSetLabels := util.ToMap(labelNames, labelValues)
  511. scrapeResults = append(scrapeResults, metric.Update{
  512. Name: metric.StatefulSetMatchLabels,
  513. Labels: statefulSetInfo,
  514. Value: 0,
  515. AdditionalInfo: statefulSetLabels,
  516. })
  517. }
  518. events.Dispatch(event.ScrapeEvent{
  519. ScraperName: event.KubernetesClusterScraperName,
  520. ScrapeType: event.StatefulSetScraperType,
  521. Targets: len(statefulSets),
  522. Errors: nil,
  523. })
  524. return scrapeResults
  525. }
  526. func (ccs *ClusterCacheScraper) ScrapeReplicaSets() []metric.Update {
  527. replicaSets := ccs.clusterCache.GetAllReplicaSets()
  528. return ccs.scrapeReplicaSets(replicaSets)
  529. }
  530. func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet) []metric.Update {
  531. var scrapeResults []metric.Update
  532. for _, replicaSet := range replicaSets {
  533. replicaSetInfo := map[string]string{
  534. source.ReplicaSetLabel: replicaSet.Name,
  535. source.NamespaceLabel: replicaSet.Namespace,
  536. source.UIDLabel: string(replicaSet.UID),
  537. }
  538. // this specific metric exports a special <none> value for name and kind
  539. // if there are no owners
  540. if len(replicaSet.OwnerReferences) == 0 {
  541. ownerInfo := maps.Clone(replicaSetInfo)
  542. ownerInfo[source.OwnerKindLabel] = source.NoneLabelValue
  543. ownerInfo[source.OwnerNameLabel] = source.NoneLabelValue
  544. scrapeResults = append(scrapeResults, metric.Update{
  545. Name: metric.KubeReplicasetOwner,
  546. Labels: ownerInfo,
  547. Value: 0,
  548. })
  549. } else {
  550. for _, owner := range replicaSet.OwnerReferences {
  551. ownerInfo := maps.Clone(replicaSetInfo)
  552. ownerInfo[source.OwnerKindLabel] = owner.Kind
  553. ownerInfo[source.OwnerNameLabel] = owner.Name
  554. scrapeResults = append(scrapeResults, metric.Update{
  555. Name: metric.KubeReplicasetOwner,
  556. Labels: ownerInfo,
  557. Value: 0,
  558. })
  559. }
  560. }
  561. }
  562. events.Dispatch(event.ScrapeEvent{
  563. ScraperName: event.KubernetesClusterScraperName,
  564. ScrapeType: event.ReplicaSetScraperType,
  565. Targets: len(replicaSets),
  566. Errors: nil,
  567. })
  568. return scrapeResults
  569. }
  570. func (ccs *ClusterCacheScraper) ScrapeResourceQuotas() []metric.Update {
  571. resourceQuotas := ccs.clusterCache.GetAllResourceQuotas()
  572. return ccs.scrapeResourceQuotas(resourceQuotas)
  573. }
  574. func (ccs *ClusterCacheScraper) scrapeResourceQuotas(resourceQuotas []*clustercache.ResourceQuota) []metric.Update {
  575. var scrapeResults []metric.Update
  576. processResource := func(baseLabels map[string]string, name v1.ResourceName, quantity resource.Quantity, metricName string) metric.Update {
  577. resource, unit, value := toResourceUnitValue(name, quantity)
  578. labels := maps.Clone(baseLabels)
  579. labels[source.ResourceLabel] = resource
  580. labels[source.UnitLabel] = unit
  581. return metric.Update{
  582. Name: metricName,
  583. Labels: labels,
  584. Value: value,
  585. }
  586. }
  587. for _, resourceQuota := range resourceQuotas {
  588. resourceQuotaInfo := map[string]string{
  589. source.ResourceQuotaLabel: resourceQuota.Name,
  590. source.NamespaceLabel: resourceQuota.Namespace,
  591. source.UIDLabel: string(resourceQuota.UID),
  592. }
  593. scrapeResults = append(scrapeResults, metric.Update{
  594. Name: metric.ResourceQuotaInfo,
  595. Labels: resourceQuotaInfo,
  596. AdditionalInfo: resourceQuotaInfo,
  597. Value: 0,
  598. })
  599. if resourceQuota.Spec.Hard != nil {
  600. // CPU/memory requests can also be aliased as "cpu" and "memory". For now, however, only scrape the complete names
  601. // https://kubernetes.io/docs/concepts/policy/resource-quotas/#compute-resource-quota
  602. if quantity, ok := resourceQuota.Spec.Hard[v1.ResourceRequestsCPU]; ok {
  603. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceCPU, quantity, metric.KubeResourceQuotaSpecResourceRequests))
  604. }
  605. if quantity, ok := resourceQuota.Spec.Hard[v1.ResourceRequestsMemory]; ok {
  606. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceMemory, quantity, metric.KubeResourceQuotaSpecResourceRequests))
  607. }
  608. if quantity, ok := resourceQuota.Spec.Hard[v1.ResourceLimitsCPU]; ok {
  609. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceCPU, quantity, metric.KubeResourceQuotaSpecResourceLimits))
  610. }
  611. if quantity, ok := resourceQuota.Spec.Hard[v1.ResourceLimitsMemory]; ok {
  612. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceMemory, quantity, metric.KubeResourceQuotaSpecResourceLimits))
  613. }
  614. }
  615. if resourceQuota.Status.Used != nil {
  616. if quantity, ok := resourceQuota.Status.Used[v1.ResourceRequestsCPU]; ok {
  617. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceCPU, quantity, metric.KubeResourceQuotaStatusUsedResourceRequests))
  618. }
  619. if quantity, ok := resourceQuota.Status.Used[v1.ResourceRequestsMemory]; ok {
  620. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceMemory, quantity, metric.KubeResourceQuotaStatusUsedResourceRequests))
  621. }
  622. if quantity, ok := resourceQuota.Status.Used[v1.ResourceLimitsCPU]; ok {
  623. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceCPU, quantity, metric.KubeResourceQuotaStatusUsedResourceLimits))
  624. }
  625. if quantity, ok := resourceQuota.Status.Used[v1.ResourceLimitsMemory]; ok {
  626. scrapeResults = append(scrapeResults, processResource(resourceQuotaInfo, v1.ResourceMemory, quantity, metric.KubeResourceQuotaStatusUsedResourceLimits))
  627. }
  628. }
  629. }
  630. events.Dispatch(event.ScrapeEvent{
  631. ScraperName: event.KubernetesClusterScraperName,
  632. ScrapeType: event.ResourceQuotaScraperType,
  633. Targets: len(resourceQuotas),
  634. Errors: nil,
  635. })
  636. return scrapeResults
  637. }
// PvcInfo is used to store information about a pvc for tracking volume usage.
type PvcInfo struct {
	// Class is the storage class requested by the claim ("" when none).
	Class string
	// Claim is the PVC's name.
	Claim string
	// Namespace is the PVC's namespace.
	Namespace string
	// VolumeName is the bound persistent volume's name.
	VolumeName string
	// Requests is the claim's requested storage, in bytes.
	Requests float64
	// PodsClaimed holds the UIDs of pods that mount this claim; populated
	// while scraping pods and used to split allocation among claimants.
	PodsClaimed []string
}
  647. func getPvcsInfo(pvcs []*clustercache.PersistentVolumeClaim) map[string]*PvcInfo {
  648. toReturn := make(map[string]*PvcInfo)
  649. for _, pvc := range pvcs {
  650. ns := pvc.Namespace
  651. pvcName := pvc.Name
  652. volumeName := pvc.Spec.VolumeName
  653. pvClass := getPersistentVolumeClaimClass(pvc)
  654. requests := float64(pvc.Spec.Resources.Requests.Storage().Value())
  655. key := ns + "," + pvcName
  656. toReturn[key] = &PvcInfo{
  657. Class: pvClass,
  658. Claim: pvcName,
  659. Namespace: ns,
  660. VolumeName: volumeName,
  661. Requests: requests,
  662. }
  663. }
  664. return toReturn
  665. }
// NodeGpuInfo contains the gpu count and vgpu counts for nodes.
type NodeGpuInfo struct {
	// GPU is the node's physical GPU count.
	GPU float64
	// VGPU is the node's virtual GPU count; 0 means no vGPU scaling applies.
	VGPU float64
}
  671. func (ccs *ClusterCacheScraper) getNodesGpuInfo() map[string]*NodeGpuInfo {
  672. // use a closure to cache allocatableVGPU result instead of calculating
  673. // it every time we need it
  674. var allocatableVGPUs *float64
  675. allocVGPUs := func() (float64, error) {
  676. if allocatableVGPUs != nil {
  677. return *allocatableVGPUs, nil
  678. }
  679. vgpu, err := getAllocatableVGPUs(ccs.clusterCache.GetAllDaemonSets())
  680. if err != nil {
  681. return vgpu, err
  682. }
  683. allocatableVGPUs = &vgpu
  684. return *allocatableVGPUs, nil
  685. }
  686. var nodeGpuMap map[string]*NodeGpuInfo = make(map[string]*NodeGpuInfo)
  687. for _, node := range ccs.clusterCache.GetAllNodes() {
  688. info, err := gpuInfoFor(node, allocVGPUs)
  689. if err != nil {
  690. log.Warnf("Failed to retrieve GPU Info for Node: %s - %s", node.Name, err)
  691. continue
  692. }
  693. nodeGpuMap[node.Name] = info
  694. }
  695. return nodeGpuMap
  696. }
// getPersistentVolumeClaimClass returns the storage class requested by the
// claim, preferring the legacy beta annotation over Spec.StorageClassName.
// If no storage class was requested, it returns "".
func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string {
	// Use beta annotation first
	if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found {
		return class
	}
	if claim.Spec.StorageClassName != nil {
		return *claim.Spec.StorageClassName
	}
	// Empty string indicates absence of a storage class. (The previous
	// comment claimed a "special non-empty string" was returned; the code
	// has always returned "".)
	return ""
}
  710. // toResourceUnitValue accepts a resource name and quantity and returns the sanitized resource, the unit, and the value in the units.
  711. // Returns an empty string for resource and unit if there was a failure.
  712. func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantity) (resource string, unit string, value float64) {
  713. resource = promutil.SanitizeLabelName(string(resourceName))
  714. switch resourceName {
  715. case v1.ResourceCPU:
  716. unit = "core"
  717. value = float64(quantity.MilliValue()) / 1000
  718. return
  719. case v1.ResourceStorage:
  720. fallthrough
  721. case v1.ResourceEphemeralStorage:
  722. fallthrough
  723. case v1.ResourceMemory:
  724. unit = "byte"
  725. value = float64(quantity.Value())
  726. return
  727. case v1.ResourcePods:
  728. unit = "integer"
  729. value = float64(quantity.Value())
  730. return
  731. default:
  732. if isHugePageResourceName(resourceName) || isAttachableVolumeResourceName(resourceName) {
  733. unit = "byte"
  734. value = float64(quantity.Value())
  735. return
  736. }
  737. if isExtendedResourceName(resourceName) {
  738. unit = "integer"
  739. value = float64(quantity.Value())
  740. return
  741. }
  742. }
  743. resource = ""
  744. unit = ""
  745. value = 0.0
  746. return
  747. }
  748. func isGpuResourceName(name v1.ResourceName) bool {
  749. return name == "nvidia.com/gpu" || name == "k8s.amazonaws.com/vgpu"
  750. }
  751. // isHugePageResourceName checks for a huge page container resource name
  752. func isHugePageResourceName(name v1.ResourceName) bool {
  753. return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix)
  754. }
  755. // isAttachableVolumeResourceName checks for attached volume container resource name
  756. func isAttachableVolumeResourceName(name v1.ResourceName) bool {
  757. return strings.HasPrefix(string(name), v1.ResourceAttachableVolumesPrefix)
  758. }
  759. // isExtendedResourceName checks for extended container resource name
  760. func isExtendedResourceName(name v1.ResourceName) bool {
  761. if isNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) {
  762. return false
  763. }
  764. // Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name
  765. nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
  766. if errs := validation.IsQualifiedName(nameForQuota); len(errs) != 0 {
  767. return false
  768. }
  769. return true
  770. }
  771. // isNativeResource checks for a kubernetes.io/ prefixed resource name
  772. func isNativeResource(name v1.ResourceName) bool {
  773. return !strings.Contains(string(name), "/") || isPrefixedNativeResource(name)
  774. }
  775. func isPrefixedNativeResource(name v1.ResourceName) bool {
  776. return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix)
  777. }
  778. // gets the Node GPUs and VGPUs using the node data from k8s. Returns nil if GPUs could not be located for the node.
  779. func gpuInfoFor(
  780. n *clustercache.Node,
  781. allocatedVGPUs func() (float64, error),
  782. ) (*NodeGpuInfo, error) {
  783. g, hasGpu := n.Status.Capacity["nvidia.com/gpu"]
  784. _, hasReplicas := n.Labels["nvidia.com/gpu.replicas"]
  785. // Case 1: Standard NVIDIA GPU
  786. if hasGpu && g.Value() != 0 && !hasReplicas {
  787. return &NodeGpuInfo{
  788. GPU: float64(g.Value()),
  789. VGPU: float64(g.Value()),
  790. }, nil
  791. }
  792. // Case 2: NVIDIA GPU with GPU Feature Discovery (GFD) Pod enabled.
  793. // Ref: https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/latest/gpu-sharing.html#verifying-the-gpu-time-slicing-configuration
  794. // Ref: https://github.com/NVIDIA/k8s-device-plugin/blob/d899752a424818428f744a946d32b132ea2c0cf1/internal/lm/resource_test.go#L44-L45
  795. // Ref: https://github.com/NVIDIA/k8s-device-plugin/blob/d899752a424818428f744a946d32b132ea2c0cf1/internal/lm/resource_test.go#L103-L118
  796. if hasReplicas {
  797. resultGPU := 0.0
  798. resultVGPU := 0.0
  799. if c, ok := n.Labels["nvidia.com/gpu.count"]; ok {
  800. var err error
  801. resultGPU, err = strconv.ParseFloat(c, 64)
  802. if err != nil {
  803. return nil, fmt.Errorf("could not parse label \"nvidia.com/gpu.count\": %v", err)
  804. }
  805. }
  806. if s, ok := n.Status.Capacity["nvidia.com/gpu.shared"]; ok { // GFD configured `renameByDefault=true`
  807. resultVGPU = float64(s.Value())
  808. } else if g, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // GFD configured `renameByDefault=false`
  809. resultVGPU = float64(g.Value())
  810. } else {
  811. resultVGPU = resultGPU
  812. }
  813. return &NodeGpuInfo{
  814. GPU: resultGPU,
  815. VGPU: resultVGPU,
  816. }, nil
  817. }
  818. // Case 3: AWS vGPU
  819. if vgpu, ok := n.Status.Capacity["k8s.amazonaws.com/vgpu"]; ok {
  820. vgpuCount, err := allocatedVGPUs()
  821. if err != nil {
  822. return nil, err
  823. }
  824. vgpuCoeff := 10.0
  825. if vgpuCount > 0.0 {
  826. vgpuCoeff = vgpuCount
  827. }
  828. if vgpu.Value() != 0 {
  829. resultGPU := float64(vgpu.Value()) / vgpuCoeff
  830. resultVGPU := float64(vgpu.Value())
  831. return &NodeGpuInfo{
  832. GPU: resultGPU,
  833. VGPU: resultVGPU,
  834. }, nil
  835. }
  836. }
  837. // No GPU found
  838. return nil, nil
  839. }
  840. func getAllocatableVGPUs(daemonsets []*clustercache.DaemonSet) (float64, error) {
  841. vgpuCount := 0.0
  842. for _, ds := range daemonsets {
  843. dsContainerList := &ds.SpecContainers
  844. for _, ctnr := range *dsContainerList {
  845. if ctnr.Args != nil {
  846. for _, arg := range ctnr.Args {
  847. if strings.Contains(arg, "--vgpu=") {
  848. vgpus, err := strconv.ParseFloat(arg[strings.IndexByte(arg, '=')+1:], 64)
  849. if err != nil {
  850. log.Errorf("failed to parse vgpu allocation string %s: %v", arg, err)
  851. continue
  852. }
  853. vgpuCount = vgpus
  854. return vgpuCount, nil
  855. }
  856. }
  857. }
  858. }
  859. }
  860. return vgpuCount, nil
  861. }