2
0

clustercache.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. package scrape
  2. import (
  3. "fmt"
  4. "slices"
  5. "strings"
  6. "github.com/opencost/opencost/core/pkg/clustercache"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/core/pkg/util/promutil"
  10. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  12. "golang.org/x/exp/maps"
  13. v1 "k8s.io/api/core/v1"
  14. "k8s.io/apimachinery/pkg/api/resource"
  15. "k8s.io/apimachinery/pkg/util/validation"
  16. )
  17. type ClusterCacheScraper struct {
  18. clusterCache clustercache.ClusterCache
  19. }
  20. func newClusterCacheScraper(clusterCache clustercache.ClusterCache) Scraper {
  21. return &ClusterCacheScraper{
  22. clusterCache: clusterCache,
  23. }
  24. }
  25. func (ccs *ClusterCacheScraper) Scrape() []metric.Update {
  26. scrapeFuncs := []ScrapeFunc{
  27. ccs.ScrapeNodes,
  28. ccs.ScrapeDeployments,
  29. ccs.ScrapeNamespaces,
  30. ccs.ScrapePods,
  31. ccs.ScrapePVCs,
  32. ccs.ScrapePVs,
  33. ccs.ScrapeServices,
  34. ccs.ScrapeStatefulSets,
  35. ccs.ScrapeReplicaSets,
  36. }
  37. return concurrentScrape(scrapeFuncs...)
  38. }
  39. func (ccs *ClusterCacheScraper) ScrapeNodes() []metric.Update {
  40. nodes := ccs.clusterCache.GetAllNodes()
  41. return ccs.scrapeNodes(nodes)
  42. }
  43. func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node) []metric.Update {
  44. var scrapeResults []metric.Update
  45. for _, node := range nodes {
  46. nodeInfo := map[string]string{
  47. source.NodeLabel: node.Name,
  48. source.ProviderIDLabel: node.SpecProviderID,
  49. }
  50. // Node Capacity
  51. if node.Status.Capacity != nil {
  52. if quantity, ok := node.Status.Capacity[v1.ResourceCPU]; ok {
  53. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  54. scrapeResults = append(scrapeResults, metric.Update{
  55. Name: metric.KubeNodeStatusCapacityCPUCores,
  56. Labels: nodeInfo,
  57. Value: value,
  58. })
  59. }
  60. if quantity, ok := node.Status.Capacity[v1.ResourceMemory]; ok {
  61. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  62. scrapeResults = append(scrapeResults, metric.Update{
  63. Name: metric.KubeNodeStatusCapacityMemoryBytes,
  64. Labels: nodeInfo,
  65. Value: value,
  66. })
  67. }
  68. }
  69. // Node Allocatable Resources
  70. if node.Status.Allocatable != nil {
  71. if quantity, ok := node.Status.Allocatable[v1.ResourceCPU]; ok {
  72. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  73. scrapeResults = append(scrapeResults, metric.Update{
  74. Name: metric.KubeNodeStatusAllocatableCPUCores,
  75. Labels: nodeInfo,
  76. Value: value,
  77. })
  78. }
  79. if quantity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok {
  80. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  81. scrapeResults = append(scrapeResults, metric.Update{
  82. Name: metric.KubeNodeStatusAllocatableMemoryBytes,
  83. Labels: nodeInfo,
  84. Value: value,
  85. })
  86. }
  87. }
  88. // node labels
  89. labelNames, labelValues := promutil.KubeLabelsToLabels(node.Labels)
  90. nodeLabels := util.ToMap(labelNames, labelValues)
  91. scrapeResults = append(scrapeResults, metric.Update{
  92. Name: metric.KubeNodeLabels,
  93. Labels: nodeInfo,
  94. Value: 0,
  95. AdditionalInfo: nodeLabels,
  96. })
  97. }
  98. return scrapeResults
  99. }
  100. func (ccs *ClusterCacheScraper) ScrapeDeployments() []metric.Update {
  101. deployments := ccs.clusterCache.GetAllDeployments()
  102. return ccs.scrapeDeployments(deployments)
  103. }
  104. func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment) []metric.Update {
  105. var scrapeResults []metric.Update
  106. for _, deployment := range deployments {
  107. deploymentInfo := map[string]string{
  108. source.DeploymentLabel: deployment.Name,
  109. source.NamespaceLabel: deployment.Namespace,
  110. }
  111. // deployment labels
  112. labelNames, labelValues := promutil.KubeLabelsToLabels(deployment.MatchLabels)
  113. deploymentLabels := util.ToMap(labelNames, labelValues)
  114. scrapeResults = append(scrapeResults, metric.Update{
  115. Name: metric.DeploymentMatchLabels,
  116. Labels: deploymentInfo,
  117. Value: 0,
  118. AdditionalInfo: deploymentLabels,
  119. })
  120. }
  121. return scrapeResults
  122. }
  123. func (ccs *ClusterCacheScraper) ScrapeNamespaces() []metric.Update {
  124. namespaces := ccs.clusterCache.GetAllNamespaces()
  125. return ccs.scrapeNamespaces(namespaces)
  126. }
  127. func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Namespace) []metric.Update {
  128. var scrapeResults []metric.Update
  129. for _, namespace := range namespaces {
  130. namespaceInfo := map[string]string{
  131. source.NamespaceLabel: namespace.Name,
  132. }
  133. // namespace labels
  134. labelNames, labelValues := promutil.KubeLabelsToLabels(namespace.Labels)
  135. namespaceLabels := util.ToMap(labelNames, labelValues)
  136. scrapeResults = append(scrapeResults, metric.Update{
  137. Name: metric.KubeNamespaceLabels,
  138. Labels: namespaceInfo,
  139. Value: 0,
  140. AdditionalInfo: namespaceLabels,
  141. })
  142. // namespace annotations
  143. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(namespace.Annotations)
  144. namespaceAnnotations := util.ToMap(annotationNames, annotationValues)
  145. scrapeResults = append(scrapeResults, metric.Update{
  146. Name: metric.KubeNamespaceAnnotations,
  147. Labels: namespaceInfo,
  148. Value: 0,
  149. AdditionalInfo: namespaceAnnotations,
  150. })
  151. }
  152. return scrapeResults
  153. }
  154. func (ccs *ClusterCacheScraper) ScrapePods() []metric.Update {
  155. pods := ccs.clusterCache.GetAllPods()
  156. return ccs.scrapePods(pods)
  157. }
  158. func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Update {
  159. var scrapeResults []metric.Update
  160. for _, pod := range pods {
  161. podInfo := map[string]string{
  162. source.PodLabel: pod.Name,
  163. source.NamespaceLabel: pod.Namespace,
  164. source.UIDLabel: string(pod.UID),
  165. source.NodeLabel: pod.Spec.NodeName,
  166. source.InstanceLabel: pod.Spec.NodeName,
  167. }
  168. // pod labels
  169. labelNames, labelValues := promutil.KubeLabelsToLabels(pod.Labels)
  170. podLabels := util.ToMap(labelNames, labelValues)
  171. scrapeResults = append(scrapeResults, metric.Update{
  172. Name: metric.KubePodLabels,
  173. Labels: podInfo,
  174. Value: 0,
  175. AdditionalInfo: podLabels,
  176. })
  177. // pod annotations
  178. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(pod.Annotations)
  179. podAnnotations := util.ToMap(annotationNames, annotationValues)
  180. scrapeResults = append(scrapeResults, metric.Update{
  181. Name: metric.KubePodAnnotations,
  182. Labels: podInfo,
  183. Value: 0,
  184. AdditionalInfo: podAnnotations,
  185. })
  186. // Pod owner metric
  187. for _, owner := range pod.OwnerReferences {
  188. ownerInfo := maps.Clone(podInfo)
  189. ownerInfo[source.OwnerKindLabel] = owner.Kind
  190. ownerInfo[source.OwnerNameLabel] = owner.Name
  191. scrapeResults = append(scrapeResults, metric.Update{
  192. Name: metric.KubePodOwner,
  193. Labels: ownerInfo,
  194. Value: 0,
  195. })
  196. }
  197. // Container Status
  198. for _, status := range pod.Status.ContainerStatuses {
  199. if status.State.Running != nil {
  200. containerInfo := maps.Clone(podInfo)
  201. containerInfo[source.ContainerLabel] = status.Name
  202. scrapeResults = append(scrapeResults, metric.Update{
  203. Name: metric.KubePodContainerStatusRunning,
  204. Labels: containerInfo,
  205. Value: 0,
  206. })
  207. }
  208. }
  209. for _, container := range pod.Spec.Containers {
  210. containerInfo := maps.Clone(podInfo)
  211. containerInfo[source.ContainerLabel] = container.Name
  212. // Requests
  213. if container.Resources.Requests != nil {
  214. // sorting keys here for testing purposes
  215. keys := maps.Keys(container.Resources.Requests)
  216. slices.Sort(keys)
  217. for _, resourceName := range keys {
  218. quantity := container.Resources.Requests[resourceName]
  219. resource, unit, value := toResourceUnitValue(resourceName, quantity)
  220. // failed to parse the resource type
  221. if resource == "" {
  222. log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", resourceName)
  223. continue
  224. }
  225. resourceRequestInfo := maps.Clone(containerInfo)
  226. resourceRequestInfo[source.ResourceLabel] = resource
  227. resourceRequestInfo[source.UnitLabel] = unit
  228. scrapeResults = append(scrapeResults, metric.Update{
  229. Name: metric.KubePodContainerResourceRequests,
  230. Labels: resourceRequestInfo,
  231. Value: value,
  232. })
  233. }
  234. }
  235. }
  236. }
  237. return scrapeResults
  238. }
  239. func (ccs *ClusterCacheScraper) ScrapePVCs() []metric.Update {
  240. pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims()
  241. return ccs.scrapePVCs(pvcs)
  242. }
  243. func (ccs *ClusterCacheScraper) scrapePVCs(pvcs []*clustercache.PersistentVolumeClaim) []metric.Update {
  244. var scrapeResults []metric.Update
  245. for _, pvc := range pvcs {
  246. pvcInfo := map[string]string{
  247. source.PVCLabel: pvc.Name,
  248. source.NamespaceLabel: pvc.Namespace,
  249. source.VolumeNameLabel: pvc.Spec.VolumeName,
  250. source.StorageClassLabel: getPersistentVolumeClaimClass(pvc),
  251. }
  252. scrapeResults = append(scrapeResults, metric.Update{
  253. Name: metric.KubePersistentVolumeClaimInfo,
  254. Labels: pvcInfo,
  255. Value: 0,
  256. })
  257. if storage, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]; ok {
  258. scrapeResults = append(scrapeResults, metric.Update{
  259. Name: metric.KubePersistentVolumeClaimResourceRequestsStorageBytes,
  260. Labels: pvcInfo,
  261. Value: float64(storage.Value()),
  262. })
  263. }
  264. }
  265. return scrapeResults
  266. }
  267. func (ccs *ClusterCacheScraper) ScrapePVs() []metric.Update {
  268. pvs := ccs.clusterCache.GetAllPersistentVolumes()
  269. return ccs.scrapePVs(pvs)
  270. }
  271. func (ccs *ClusterCacheScraper) scrapePVs(pvs []*clustercache.PersistentVolume) []metric.Update {
  272. var scrapeResults []metric.Update
  273. for _, pv := range pvs {
  274. providerID := pv.Name
  275. // if a more accurate provider ID is available, use that
  276. if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
  277. providerID = pv.Spec.CSI.VolumeHandle
  278. }
  279. pvInfo := map[string]string{
  280. source.PVLabel: pv.Name,
  281. source.StorageClassLabel: pv.Spec.StorageClassName,
  282. source.ProviderIDLabel: providerID,
  283. }
  284. scrapeResults = append(scrapeResults, metric.Update{
  285. Name: metric.KubecostPVInfo,
  286. Labels: pvInfo,
  287. Value: 0,
  288. })
  289. if storage, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok {
  290. scrapeResults = append(scrapeResults, metric.Update{
  291. Name: metric.KubePersistentVolumeCapacityBytes,
  292. Labels: pvInfo,
  293. Value: float64(storage.Value()),
  294. })
  295. }
  296. }
  297. return scrapeResults
  298. }
  299. func (ccs *ClusterCacheScraper) ScrapeServices() []metric.Update {
  300. services := ccs.clusterCache.GetAllServices()
  301. return ccs.scrapeServices(services)
  302. }
  303. func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service) []metric.Update {
  304. var scrapeResults []metric.Update
  305. for _, service := range services {
  306. serviceInfo := map[string]string{
  307. source.ServiceLabel: service.Name,
  308. source.NamespaceLabel: service.Namespace,
  309. }
  310. // service labels
  311. labelNames, labelValues := promutil.KubeLabelsToLabels(service.SpecSelector)
  312. serviceLabels := util.ToMap(labelNames, labelValues)
  313. scrapeResults = append(scrapeResults, metric.Update{
  314. Name: metric.ServiceSelectorLabels,
  315. Labels: serviceInfo,
  316. Value: 0,
  317. AdditionalInfo: serviceLabels,
  318. })
  319. }
  320. return scrapeResults
  321. }
  322. func (ccs *ClusterCacheScraper) ScrapeStatefulSets() []metric.Update {
  323. statefulSets := ccs.clusterCache.GetAllStatefulSets()
  324. return ccs.scrapeStatefulSets(statefulSets)
  325. }
  326. func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet) []metric.Update {
  327. var scrapeResults []metric.Update
  328. for _, statefulSet := range statefulSets {
  329. statefulSetInfo := map[string]string{
  330. source.StatefulSetLabel: statefulSet.Name,
  331. source.NamespaceLabel: statefulSet.Namespace,
  332. }
  333. // statefulSet labels
  334. labelNames, labelValues := promutil.KubeLabelsToLabels(statefulSet.SpecSelector.MatchLabels)
  335. statefulSetLabels := util.ToMap(labelNames, labelValues)
  336. scrapeResults = append(scrapeResults, metric.Update{
  337. Name: metric.StatefulSetMatchLabels,
  338. Labels: statefulSetInfo,
  339. Value: 0,
  340. AdditionalInfo: statefulSetLabels,
  341. })
  342. }
  343. return scrapeResults
  344. }
  345. func (ccs *ClusterCacheScraper) ScrapeReplicaSets() []metric.Update {
  346. replicaSets := ccs.clusterCache.GetAllReplicaSets()
  347. return ccs.scrapeReplicaSets(replicaSets)
  348. }
  349. func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet) []metric.Update {
  350. var scrapeResults []metric.Update
  351. for _, replicaSet := range replicaSets {
  352. replicaSetInfo := map[string]string{
  353. source.ReplicaSetLabel: replicaSet.Name,
  354. source.NamespaceLabel: replicaSet.Namespace,
  355. }
  356. for _, owner := range replicaSet.OwnerReferences {
  357. ownerInfo := maps.Clone(replicaSetInfo)
  358. ownerInfo[source.OwnerKindLabel] = owner.Kind
  359. ownerInfo[source.OwnerNameLabel] = owner.Name
  360. scrapeResults = append(scrapeResults, metric.Update{
  361. Name: metric.KubeReplicasetOwner,
  362. Labels: ownerInfo,
  363. Value: 0,
  364. })
  365. }
  366. }
  367. return scrapeResults
  368. }
  369. // getPersistentVolumeClaimClass returns StorageClassName. If no storage class was
  370. // requested, it returns "".
  371. func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string {
  372. // Use beta annotation first
  373. if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found {
  374. return class
  375. }
  376. if claim.Spec.StorageClassName != nil {
  377. return *claim.Spec.StorageClassName
  378. }
  379. // Special non-empty string to indicate absence of storage class.
  380. return ""
  381. }
  382. // toResourceUnitValue accepts a resource name and quantity and returns the sanitized resource, the unit, and the value in the units.
  383. // Returns an empty string for resource and unit if there was a failure.
  384. func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantity) (resource string, unit string, value float64) {
  385. resource = promutil.SanitizeLabelName(string(resourceName))
  386. switch resourceName {
  387. case v1.ResourceCPU:
  388. unit = "core"
  389. value = float64(quantity.MilliValue()) / 1000
  390. return
  391. case v1.ResourceStorage:
  392. fallthrough
  393. case v1.ResourceEphemeralStorage:
  394. fallthrough
  395. case v1.ResourceMemory:
  396. unit = "byte"
  397. value = float64(quantity.Value())
  398. return
  399. case v1.ResourcePods:
  400. unit = "integer"
  401. value = float64(quantity.Value())
  402. return
  403. default:
  404. if isHugePageResourceName(resourceName) || isAttachableVolumeResourceName(resourceName) {
  405. unit = "byte"
  406. value = float64(quantity.Value())
  407. return
  408. }
  409. if isExtendedResourceName(resourceName) {
  410. unit = "integer"
  411. value = float64(quantity.Value())
  412. return
  413. }
  414. }
  415. resource = ""
  416. unit = ""
  417. value = 0.0
  418. return
  419. }
  420. // isHugePageResourceName checks for a huge page container resource name
  421. func isHugePageResourceName(name v1.ResourceName) bool {
  422. return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix)
  423. }
  424. // isAttachableVolumeResourceName checks for attached volume container resource name
  425. func isAttachableVolumeResourceName(name v1.ResourceName) bool {
  426. return strings.HasPrefix(string(name), v1.ResourceAttachableVolumesPrefix)
  427. }
  428. // isExtendedResourceName checks for extended container resource name
  429. func isExtendedResourceName(name v1.ResourceName) bool {
  430. if isNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) {
  431. return false
  432. }
  433. // Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name
  434. nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
  435. if errs := validation.IsQualifiedName(nameForQuota); len(errs) != 0 {
  436. return false
  437. }
  438. return true
  439. }
  440. // isNativeResource checks for a kubernetes.io/ prefixed resource name
  441. func isNativeResource(name v1.ResourceName) bool {
  442. return !strings.Contains(string(name), "/") || isPrefixedNativeResource(name)
  443. }
  444. func isPrefixedNativeResource(name v1.ResourceName) bool {
  445. return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix)
  446. }