clustercache.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  1. package scrape
  2. import (
  3. "fmt"
  4. "slices"
  5. "strings"
  6. "github.com/kubecost/events"
  7. "github.com/opencost/opencost/core/pkg/clustercache"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/source"
  10. "github.com/opencost/opencost/core/pkg/util/promutil"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/event"
  12. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  13. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  14. "golang.org/x/exp/maps"
  15. v1 "k8s.io/api/core/v1"
  16. "k8s.io/apimachinery/pkg/api/resource"
  17. "k8s.io/apimachinery/pkg/util/validation"
  18. )
  19. type ClusterCacheScraper struct {
  20. clusterCache clustercache.ClusterCache
  21. }
  22. func newClusterCacheScraper(clusterCache clustercache.ClusterCache) Scraper {
  23. return &ClusterCacheScraper{
  24. clusterCache: clusterCache,
  25. }
  26. }
  27. func (ccs *ClusterCacheScraper) Scrape() []metric.Update {
  28. scrapeFuncs := []ScrapeFunc{
  29. ccs.ScrapeNodes,
  30. ccs.ScrapeDeployments,
  31. ccs.ScrapeNamespaces,
  32. ccs.ScrapePods,
  33. ccs.ScrapePVCs,
  34. ccs.ScrapePVs,
  35. ccs.ScrapeServices,
  36. ccs.ScrapeStatefulSets,
  37. ccs.ScrapeReplicaSets,
  38. }
  39. return concurrentScrape(scrapeFuncs...)
  40. }
  41. func (ccs *ClusterCacheScraper) ScrapeNodes() []metric.Update {
  42. nodes := ccs.clusterCache.GetAllNodes()
  43. return ccs.scrapeNodes(nodes)
  44. }
  45. func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node) []metric.Update {
  46. var scrapeResults []metric.Update
  47. for _, node := range nodes {
  48. nodeInfo := map[string]string{
  49. source.NodeLabel: node.Name,
  50. source.ProviderIDLabel: node.SpecProviderID,
  51. }
  52. // Node Capacity
  53. if node.Status.Capacity != nil {
  54. if quantity, ok := node.Status.Capacity[v1.ResourceCPU]; ok {
  55. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  56. scrapeResults = append(scrapeResults, metric.Update{
  57. Name: metric.KubeNodeStatusCapacityCPUCores,
  58. Labels: nodeInfo,
  59. Value: value,
  60. })
  61. }
  62. if quantity, ok := node.Status.Capacity[v1.ResourceMemory]; ok {
  63. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  64. scrapeResults = append(scrapeResults, metric.Update{
  65. Name: metric.KubeNodeStatusCapacityMemoryBytes,
  66. Labels: nodeInfo,
  67. Value: value,
  68. })
  69. }
  70. }
  71. // Node Allocatable Resources
  72. if node.Status.Allocatable != nil {
  73. if quantity, ok := node.Status.Allocatable[v1.ResourceCPU]; ok {
  74. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  75. scrapeResults = append(scrapeResults, metric.Update{
  76. Name: metric.KubeNodeStatusAllocatableCPUCores,
  77. Labels: nodeInfo,
  78. Value: value,
  79. })
  80. }
  81. if quantity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok {
  82. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  83. scrapeResults = append(scrapeResults, metric.Update{
  84. Name: metric.KubeNodeStatusAllocatableMemoryBytes,
  85. Labels: nodeInfo,
  86. Value: value,
  87. })
  88. }
  89. }
  90. // node labels
  91. labelNames, labelValues := promutil.KubeLabelsToLabels(node.Labels)
  92. nodeLabels := util.ToMap(labelNames, labelValues)
  93. scrapeResults = append(scrapeResults, metric.Update{
  94. Name: metric.KubeNodeLabels,
  95. Labels: nodeInfo,
  96. Value: 0,
  97. AdditionalInfo: nodeLabels,
  98. })
  99. }
  100. events.Dispatch(event.ScrapeEvent{
  101. ScraperName: event.KubernetesClusterScraperName,
  102. ScrapeType: event.NodeScraperType,
  103. Targets: len(nodes),
  104. Errors: nil,
  105. })
  106. return scrapeResults
  107. }
  108. func (ccs *ClusterCacheScraper) ScrapeDeployments() []metric.Update {
  109. deployments := ccs.clusterCache.GetAllDeployments()
  110. return ccs.scrapeDeployments(deployments)
  111. }
  112. func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment) []metric.Update {
  113. var scrapeResults []metric.Update
  114. for _, deployment := range deployments {
  115. deploymentInfo := map[string]string{
  116. source.DeploymentLabel: deployment.Name,
  117. source.NamespaceLabel: deployment.Namespace,
  118. }
  119. // deployment labels
  120. labelNames, labelValues := promutil.KubeLabelsToLabels(deployment.MatchLabels)
  121. deploymentLabels := util.ToMap(labelNames, labelValues)
  122. scrapeResults = append(scrapeResults, metric.Update{
  123. Name: metric.DeploymentMatchLabels,
  124. Labels: deploymentInfo,
  125. Value: 0,
  126. AdditionalInfo: deploymentLabels,
  127. })
  128. }
  129. events.Dispatch(event.ScrapeEvent{
  130. ScraperName: event.KubernetesClusterScraperName,
  131. ScrapeType: event.DeploymentScraperType,
  132. Targets: len(deployments),
  133. Errors: nil,
  134. })
  135. return scrapeResults
  136. }
  137. func (ccs *ClusterCacheScraper) ScrapeNamespaces() []metric.Update {
  138. namespaces := ccs.clusterCache.GetAllNamespaces()
  139. return ccs.scrapeNamespaces(namespaces)
  140. }
  141. func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Namespace) []metric.Update {
  142. var scrapeResults []metric.Update
  143. for _, namespace := range namespaces {
  144. namespaceInfo := map[string]string{
  145. source.NamespaceLabel: namespace.Name,
  146. }
  147. // namespace labels
  148. labelNames, labelValues := promutil.KubeLabelsToLabels(namespace.Labels)
  149. namespaceLabels := util.ToMap(labelNames, labelValues)
  150. scrapeResults = append(scrapeResults, metric.Update{
  151. Name: metric.KubeNamespaceLabels,
  152. Labels: namespaceInfo,
  153. Value: 0,
  154. AdditionalInfo: namespaceLabels,
  155. })
  156. // namespace annotations
  157. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(namespace.Annotations)
  158. namespaceAnnotations := util.ToMap(annotationNames, annotationValues)
  159. scrapeResults = append(scrapeResults, metric.Update{
  160. Name: metric.KubeNamespaceAnnotations,
  161. Labels: namespaceInfo,
  162. Value: 0,
  163. AdditionalInfo: namespaceAnnotations,
  164. })
  165. }
  166. events.Dispatch(event.ScrapeEvent{
  167. ScraperName: event.KubernetesClusterScraperName,
  168. ScrapeType: event.NamespaceScraperType,
  169. Targets: len(namespaces),
  170. Errors: nil,
  171. })
  172. return scrapeResults
  173. }
  174. func (ccs *ClusterCacheScraper) ScrapePods() []metric.Update {
  175. pods := ccs.clusterCache.GetAllPods()
  176. return ccs.scrapePods(pods)
  177. }
  178. func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Update {
  179. var scrapeResults []metric.Update
  180. for _, pod := range pods {
  181. podInfo := map[string]string{
  182. source.PodLabel: pod.Name,
  183. source.NamespaceLabel: pod.Namespace,
  184. source.UIDLabel: string(pod.UID),
  185. source.NodeLabel: pod.Spec.NodeName,
  186. source.InstanceLabel: pod.Spec.NodeName,
  187. }
  188. // pod labels
  189. labelNames, labelValues := promutil.KubeLabelsToLabels(pod.Labels)
  190. podLabels := util.ToMap(labelNames, labelValues)
  191. scrapeResults = append(scrapeResults, metric.Update{
  192. Name: metric.KubePodLabels,
  193. Labels: podInfo,
  194. Value: 0,
  195. AdditionalInfo: podLabels,
  196. })
  197. // pod annotations
  198. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(pod.Annotations)
  199. podAnnotations := util.ToMap(annotationNames, annotationValues)
  200. scrapeResults = append(scrapeResults, metric.Update{
  201. Name: metric.KubePodAnnotations,
  202. Labels: podInfo,
  203. Value: 0,
  204. AdditionalInfo: podAnnotations,
  205. })
  206. // Pod owner metric
  207. for _, owner := range pod.OwnerReferences {
  208. ownerInfo := maps.Clone(podInfo)
  209. ownerInfo[source.OwnerKindLabel] = owner.Kind
  210. ownerInfo[source.OwnerNameLabel] = owner.Name
  211. scrapeResults = append(scrapeResults, metric.Update{
  212. Name: metric.KubePodOwner,
  213. Labels: ownerInfo,
  214. Value: 0,
  215. })
  216. }
  217. // Container Status
  218. for _, status := range pod.Status.ContainerStatuses {
  219. if status.State.Running != nil {
  220. containerInfo := maps.Clone(podInfo)
  221. containerInfo[source.ContainerLabel] = status.Name
  222. scrapeResults = append(scrapeResults, metric.Update{
  223. Name: metric.KubePodContainerStatusRunning,
  224. Labels: containerInfo,
  225. Value: 0,
  226. })
  227. }
  228. }
  229. for _, container := range pod.Spec.Containers {
  230. containerInfo := maps.Clone(podInfo)
  231. containerInfo[source.ContainerLabel] = container.Name
  232. // Requests
  233. if container.Resources.Requests != nil {
  234. // sorting keys here for testing purposes
  235. keys := maps.Keys(container.Resources.Requests)
  236. slices.Sort(keys)
  237. for _, resourceName := range keys {
  238. quantity := container.Resources.Requests[resourceName]
  239. resource, unit, value := toResourceUnitValue(resourceName, quantity)
  240. // failed to parse the resource type
  241. if resource == "" {
  242. log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", resourceName)
  243. continue
  244. }
  245. resourceRequestInfo := maps.Clone(containerInfo)
  246. resourceRequestInfo[source.ResourceLabel] = resource
  247. resourceRequestInfo[source.UnitLabel] = unit
  248. scrapeResults = append(scrapeResults, metric.Update{
  249. Name: metric.KubePodContainerResourceRequests,
  250. Labels: resourceRequestInfo,
  251. Value: value,
  252. })
  253. }
  254. }
  255. }
  256. }
  257. events.Dispatch(event.ScrapeEvent{
  258. ScraperName: event.KubernetesClusterScraperName,
  259. ScrapeType: event.PodScraperType,
  260. Targets: len(pods),
  261. Errors: nil,
  262. })
  263. return scrapeResults
  264. }
  265. func (ccs *ClusterCacheScraper) ScrapePVCs() []metric.Update {
  266. pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims()
  267. return ccs.scrapePVCs(pvcs)
  268. }
  269. func (ccs *ClusterCacheScraper) scrapePVCs(pvcs []*clustercache.PersistentVolumeClaim) []metric.Update {
  270. var scrapeResults []metric.Update
  271. for _, pvc := range pvcs {
  272. pvcInfo := map[string]string{
  273. source.PVCLabel: pvc.Name,
  274. source.NamespaceLabel: pvc.Namespace,
  275. source.VolumeNameLabel: pvc.Spec.VolumeName,
  276. source.StorageClassLabel: getPersistentVolumeClaimClass(pvc),
  277. }
  278. scrapeResults = append(scrapeResults, metric.Update{
  279. Name: metric.KubePersistentVolumeClaimInfo,
  280. Labels: pvcInfo,
  281. Value: 0,
  282. })
  283. if storage, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]; ok {
  284. scrapeResults = append(scrapeResults, metric.Update{
  285. Name: metric.KubePersistentVolumeClaimResourceRequestsStorageBytes,
  286. Labels: pvcInfo,
  287. Value: float64(storage.Value()),
  288. })
  289. }
  290. }
  291. events.Dispatch(event.ScrapeEvent{
  292. ScraperName: event.KubernetesClusterScraperName,
  293. ScrapeType: event.PvcScraperType,
  294. Targets: len(pvcs),
  295. Errors: nil,
  296. })
  297. return scrapeResults
  298. }
  299. func (ccs *ClusterCacheScraper) ScrapePVs() []metric.Update {
  300. pvs := ccs.clusterCache.GetAllPersistentVolumes()
  301. return ccs.scrapePVs(pvs)
  302. }
  303. func (ccs *ClusterCacheScraper) scrapePVs(pvs []*clustercache.PersistentVolume) []metric.Update {
  304. var scrapeResults []metric.Update
  305. for _, pv := range pvs {
  306. providerID := pv.Name
  307. // if a more accurate provider ID is available, use that
  308. if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
  309. providerID = pv.Spec.CSI.VolumeHandle
  310. }
  311. pvInfo := map[string]string{
  312. source.PVLabel: pv.Name,
  313. source.StorageClassLabel: pv.Spec.StorageClassName,
  314. source.ProviderIDLabel: providerID,
  315. }
  316. scrapeResults = append(scrapeResults, metric.Update{
  317. Name: metric.KubecostPVInfo,
  318. Labels: pvInfo,
  319. Value: 0,
  320. })
  321. if storage, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok {
  322. scrapeResults = append(scrapeResults, metric.Update{
  323. Name: metric.KubePersistentVolumeCapacityBytes,
  324. Labels: pvInfo,
  325. Value: float64(storage.Value()),
  326. })
  327. }
  328. }
  329. events.Dispatch(event.ScrapeEvent{
  330. ScraperName: event.KubernetesClusterScraperName,
  331. ScrapeType: event.PvScraperType,
  332. Targets: len(pvs),
  333. Errors: nil,
  334. })
  335. return scrapeResults
  336. }
  337. func (ccs *ClusterCacheScraper) ScrapeServices() []metric.Update {
  338. services := ccs.clusterCache.GetAllServices()
  339. return ccs.scrapeServices(services)
  340. }
  341. func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service) []metric.Update {
  342. var scrapeResults []metric.Update
  343. for _, service := range services {
  344. serviceInfo := map[string]string{
  345. source.ServiceLabel: service.Name,
  346. source.NamespaceLabel: service.Namespace,
  347. }
  348. // service labels
  349. labelNames, labelValues := promutil.KubeLabelsToLabels(service.SpecSelector)
  350. serviceLabels := util.ToMap(labelNames, labelValues)
  351. scrapeResults = append(scrapeResults, metric.Update{
  352. Name: metric.ServiceSelectorLabels,
  353. Labels: serviceInfo,
  354. Value: 0,
  355. AdditionalInfo: serviceLabels,
  356. })
  357. }
  358. events.Dispatch(event.ScrapeEvent{
  359. ScraperName: event.KubernetesClusterScraperName,
  360. ScrapeType: event.ServiceScraperType,
  361. Targets: len(services),
  362. Errors: nil,
  363. })
  364. return scrapeResults
  365. }
  366. func (ccs *ClusterCacheScraper) ScrapeStatefulSets() []metric.Update {
  367. statefulSets := ccs.clusterCache.GetAllStatefulSets()
  368. return ccs.scrapeStatefulSets(statefulSets)
  369. }
  370. func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet) []metric.Update {
  371. var scrapeResults []metric.Update
  372. for _, statefulSet := range statefulSets {
  373. statefulSetInfo := map[string]string{
  374. source.StatefulSetLabel: statefulSet.Name,
  375. source.NamespaceLabel: statefulSet.Namespace,
  376. }
  377. // statefulSet labels
  378. labelNames, labelValues := promutil.KubeLabelsToLabels(statefulSet.SpecSelector.MatchLabels)
  379. statefulSetLabels := util.ToMap(labelNames, labelValues)
  380. scrapeResults = append(scrapeResults, metric.Update{
  381. Name: metric.StatefulSetMatchLabels,
  382. Labels: statefulSetInfo,
  383. Value: 0,
  384. AdditionalInfo: statefulSetLabels,
  385. })
  386. }
  387. events.Dispatch(event.ScrapeEvent{
  388. ScraperName: event.KubernetesClusterScraperName,
  389. ScrapeType: event.StatefulSetScraperType,
  390. Targets: len(statefulSets),
  391. Errors: nil,
  392. })
  393. return scrapeResults
  394. }
  395. func (ccs *ClusterCacheScraper) ScrapeReplicaSets() []metric.Update {
  396. replicaSets := ccs.clusterCache.GetAllReplicaSets()
  397. return ccs.scrapeReplicaSets(replicaSets)
  398. }
  399. func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet) []metric.Update {
  400. var scrapeResults []metric.Update
  401. for _, replicaSet := range replicaSets {
  402. replicaSetInfo := map[string]string{
  403. source.ReplicaSetLabel: replicaSet.Name,
  404. source.NamespaceLabel: replicaSet.Namespace,
  405. }
  406. // this specific metric exports a special <none> value for name and kind
  407. // if there are no owners
  408. if len(replicaSet.OwnerReferences) == 0 {
  409. ownerInfo := maps.Clone(replicaSetInfo)
  410. ownerInfo[source.OwnerKindLabel] = source.NoneLabelValue
  411. ownerInfo[source.OwnerNameLabel] = source.NoneLabelValue
  412. scrapeResults = append(scrapeResults, metric.Update{
  413. Name: metric.KubeReplicasetOwner,
  414. Labels: ownerInfo,
  415. Value: 0,
  416. })
  417. } else {
  418. for _, owner := range replicaSet.OwnerReferences {
  419. ownerInfo := maps.Clone(replicaSetInfo)
  420. ownerInfo[source.OwnerKindLabel] = owner.Kind
  421. ownerInfo[source.OwnerNameLabel] = owner.Name
  422. scrapeResults = append(scrapeResults, metric.Update{
  423. Name: metric.KubeReplicasetOwner,
  424. Labels: ownerInfo,
  425. Value: 0,
  426. })
  427. }
  428. }
  429. }
  430. events.Dispatch(event.ScrapeEvent{
  431. ScraperName: event.KubernetesClusterScraperName,
  432. ScrapeType: event.ReplicaSetScraperType,
  433. Targets: len(replicaSets),
  434. Errors: nil,
  435. })
  436. return scrapeResults
  437. }
  438. // getPersistentVolumeClaimClass returns StorageClassName. If no storage class was
  439. // requested, it returns "".
  440. func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string {
  441. // Use beta annotation first
  442. if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found {
  443. return class
  444. }
  445. if claim.Spec.StorageClassName != nil {
  446. return *claim.Spec.StorageClassName
  447. }
  448. // Special non-empty string to indicate absence of storage class.
  449. return ""
  450. }
  451. // toResourceUnitValue accepts a resource name and quantity and returns the sanitized resource, the unit, and the value in the units.
  452. // Returns an empty string for resource and unit if there was a failure.
  453. func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantity) (resource string, unit string, value float64) {
  454. resource = promutil.SanitizeLabelName(string(resourceName))
  455. switch resourceName {
  456. case v1.ResourceCPU:
  457. unit = "core"
  458. value = float64(quantity.MilliValue()) / 1000
  459. return
  460. case v1.ResourceStorage:
  461. fallthrough
  462. case v1.ResourceEphemeralStorage:
  463. fallthrough
  464. case v1.ResourceMemory:
  465. unit = "byte"
  466. value = float64(quantity.Value())
  467. return
  468. case v1.ResourcePods:
  469. unit = "integer"
  470. value = float64(quantity.Value())
  471. return
  472. default:
  473. if isHugePageResourceName(resourceName) || isAttachableVolumeResourceName(resourceName) {
  474. unit = "byte"
  475. value = float64(quantity.Value())
  476. return
  477. }
  478. if isExtendedResourceName(resourceName) {
  479. unit = "integer"
  480. value = float64(quantity.Value())
  481. return
  482. }
  483. }
  484. resource = ""
  485. unit = ""
  486. value = 0.0
  487. return
  488. }
  489. // isHugePageResourceName checks for a huge page container resource name
  490. func isHugePageResourceName(name v1.ResourceName) bool {
  491. return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix)
  492. }
  493. // isAttachableVolumeResourceName checks for attached volume container resource name
  494. func isAttachableVolumeResourceName(name v1.ResourceName) bool {
  495. return strings.HasPrefix(string(name), v1.ResourceAttachableVolumesPrefix)
  496. }
  497. // isExtendedResourceName checks for extended container resource name
  498. func isExtendedResourceName(name v1.ResourceName) bool {
  499. if isNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) {
  500. return false
  501. }
  502. // Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name
  503. nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
  504. if errs := validation.IsQualifiedName(nameForQuota); len(errs) != 0 {
  505. return false
  506. }
  507. return true
  508. }
  509. // isNativeResource checks for a kubernetes.io/ prefixed resource name
  510. func isNativeResource(name v1.ResourceName) bool {
  511. return !strings.Contains(string(name), "/") || isPrefixedNativeResource(name)
  512. }
  513. func isPrefixedNativeResource(name v1.ResourceName) bool {
  514. return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix)
  515. }