clustercache.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. package scrape
  2. import (
  3. "fmt"
  4. "slices"
  5. "strings"
  6. "github.com/opencost/opencost/core/pkg/clustercache"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/core/pkg/util/promutil"
  10. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  12. "golang.org/x/exp/maps"
  13. v1 "k8s.io/api/core/v1"
  14. "k8s.io/apimachinery/pkg/api/resource"
  15. "k8s.io/apimachinery/pkg/util/validation"
  16. )
  // Cluster Cache Metrics: names written to metric.Update.Name by the
// ClusterCacheScraper scrape methods, grouped by the object kind they describe.

// Node metrics, emitted by scrapeNodes.
const (
	// KubeNodeStatusCapacityCPUCores is a node's CPU capacity, in cores.
	KubeNodeStatusCapacityCPUCores = "kube_node_status_capacity_cpu_cores"
	// KubeNodeStatusCapacityMemoryBytes is a node's memory capacity, in bytes.
	KubeNodeStatusCapacityMemoryBytes = "kube_node_status_capacity_memory_bytes"
	// KubeNodeStatusAllocatableCPUCores is a node's allocatable CPU, in cores.
	KubeNodeStatusAllocatableCPUCores = "kube_node_status_allocatable_cpu_cores"
	// KubeNodeStatusAllocatableMemoryBytes is a node's allocatable memory, in bytes.
	KubeNodeStatusAllocatableMemoryBytes = "kube_node_status_allocatable_memory_bytes"
	// KubeNodeLabels carries a node's labels in AdditionalInfo.
	KubeNodeLabels = "kube_node_labels"
)

// Namespace metrics, emitted by scrapeNamespaces.
const (
	// KubeNamespaceLabels carries a namespace's labels in AdditionalInfo.
	KubeNamespaceLabels = "kube_namespace_labels"
	// KubeNamespaceAnnotations carries a namespace's annotations in AdditionalInfo.
	KubeNamespaceAnnotations = "kube_namespace_annotations"
)

// Pod metrics, emitted by scrapePods.
const (
	// KubePodLabels carries a pod's labels in AdditionalInfo.
	KubePodLabels = "kube_pod_labels"
	// KubePodAnnotations carries a pod's annotations in AdditionalInfo.
	KubePodAnnotations = "kube_pod_annotations"
	// KubePodOwner is emitted once per owner reference of a pod.
	KubePodOwner = "kube_pod_owner"
	// KubePodContainerStatusRunning is emitted for each container in the Running state.
	KubePodContainerStatusRunning = "kube_pod_container_status_running"
	// KubePodContainerResourceRequests is emitted per parseable container resource request.
	KubePodContainerResourceRequests = "kube_pod_container_resource_requests"
)

// Storage metrics, emitted by scrapePVCs and scrapePVs.
const (
	// KubePersistentVolumeClaimInfo identifies a PVC, its volume and storage class.
	KubePersistentVolumeClaimInfo = "kube_persistentvolumeclaim_info"
	// KubePersistentVolumeClaimResourceRequestsStorageBytes is a PVC's storage request, in bytes.
	KubePersistentVolumeClaimResourceRequestsStorageBytes = "kube_persistentvolumeclaim_resource_requests_storage_bytes"
	// KubecostPVInfo identifies a PV, its storage class and provider ID.
	KubecostPVInfo = "kubecost_pv_info"
	// KubePersistentVolumeCapacityBytes is a PV's storage capacity, in bytes.
	KubePersistentVolumeCapacityBytes = "kube_persistentvolume_capacity_bytes"
)

// Workload and selector metrics, emitted by the deployment, service,
// stateful set and replica set scrapers.
const (
	// DeploymentMatchLabels carries a deployment's match labels in AdditionalInfo.
	DeploymentMatchLabels = "deployment_match_labels"
	// ServiceSelectorLabels carries a service's selector in AdditionalInfo.
	ServiceSelectorLabels = "service_selector_labels"
	// StatefulSetMatchLabels carries a stateful set's selector match labels in
	// AdditionalInfo. NOTE: the mixed-case "statefulSet" is part of the emitted
	// metric name; changing it would rename the metric for consumers.
	StatefulSetMatchLabels = "statefulSet_match_labels"
	// KubeReplicasetOwner is emitted once per owner reference of a replica set.
	KubeReplicasetOwner = "kube_replicaset_owner"
)
  40. type ClusterCacheScraper struct {
  41. clusterCache clustercache.ClusterCache
  42. }
  43. func newClusterCacheScraper(clusterCache clustercache.ClusterCache) Scraper {
  44. return &ClusterCacheScraper{
  45. clusterCache: clusterCache,
  46. }
  47. }
  48. func (ccs *ClusterCacheScraper) Scrape() []metric.Update {
  49. scrapeFuncs := []ScrapeFunc{
  50. ccs.ScrapeNodes,
  51. ccs.ScrapeDeployments,
  52. ccs.ScrapeNamespaces,
  53. ccs.ScrapePods,
  54. ccs.ScrapePVCs,
  55. ccs.ScrapePVs,
  56. ccs.ScrapeServices,
  57. ccs.ScrapeStatefulSets,
  58. ccs.ScrapeReplicaSets,
  59. }
  60. return concurrentScrape(scrapeFuncs...)
  61. }
  62. func (ccs *ClusterCacheScraper) ScrapeNodes() []metric.Update {
  63. nodes := ccs.clusterCache.GetAllNodes()
  64. return ccs.scrapeNodes(nodes)
  65. }
  66. func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node) []metric.Update {
  67. var scrapeResults []metric.Update
  68. for _, node := range nodes {
  69. nodeInfo := map[string]string{
  70. source.NodeLabel: node.Name,
  71. source.ProviderIDLabel: node.SpecProviderID,
  72. }
  73. // Node Capacity
  74. if node.Status.Capacity != nil {
  75. if quantity, ok := node.Status.Capacity[v1.ResourceCPU]; ok {
  76. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  77. scrapeResults = append(scrapeResults, metric.Update{
  78. Name: KubeNodeStatusCapacityCPUCores,
  79. Labels: nodeInfo,
  80. Value: value,
  81. })
  82. }
  83. if quantity, ok := node.Status.Capacity[v1.ResourceMemory]; ok {
  84. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  85. scrapeResults = append(scrapeResults, metric.Update{
  86. Name: KubeNodeStatusCapacityMemoryBytes,
  87. Labels: nodeInfo,
  88. Value: value,
  89. })
  90. }
  91. }
  92. // Node Allocatable Resources
  93. if node.Status.Allocatable != nil {
  94. if quantity, ok := node.Status.Allocatable[v1.ResourceCPU]; ok {
  95. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  96. scrapeResults = append(scrapeResults, metric.Update{
  97. Name: KubeNodeStatusAllocatableCPUCores,
  98. Labels: nodeInfo,
  99. Value: value,
  100. })
  101. }
  102. if quantity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok {
  103. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  104. scrapeResults = append(scrapeResults, metric.Update{
  105. Name: KubeNodeStatusAllocatableMemoryBytes,
  106. Labels: nodeInfo,
  107. Value: value,
  108. })
  109. }
  110. }
  111. // node labels
  112. labelNames, labelValues := promutil.KubeLabelsToLabels(node.Labels)
  113. nodeLabels := util.ToMap(labelNames, labelValues)
  114. scrapeResults = append(scrapeResults, metric.Update{
  115. Name: KubeNodeLabels,
  116. Labels: nodeInfo,
  117. Value: 0,
  118. AdditionalInfo: nodeLabels,
  119. })
  120. }
  121. return scrapeResults
  122. }
  123. func (ccs *ClusterCacheScraper) ScrapeDeployments() []metric.Update {
  124. deployments := ccs.clusterCache.GetAllDeployments()
  125. return ccs.scrapeDeployments(deployments)
  126. }
  127. func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment) []metric.Update {
  128. var scrapeResults []metric.Update
  129. for _, deployment := range deployments {
  130. deploymentInfo := map[string]string{
  131. source.DeploymentLabel: deployment.Name,
  132. source.NamespaceLabel: deployment.Namespace,
  133. }
  134. // deployment labels
  135. labelNames, labelValues := promutil.KubeLabelsToLabels(deployment.MatchLabels)
  136. deploymentLabels := util.ToMap(labelNames, labelValues)
  137. scrapeResults = append(scrapeResults, metric.Update{
  138. Name: DeploymentMatchLabels,
  139. Labels: deploymentInfo,
  140. Value: 0,
  141. AdditionalInfo: deploymentLabels,
  142. })
  143. }
  144. return scrapeResults
  145. }
  146. func (ccs *ClusterCacheScraper) ScrapeNamespaces() []metric.Update {
  147. namespaces := ccs.clusterCache.GetAllNamespaces()
  148. return ccs.scrapeNamespaces(namespaces)
  149. }
  150. func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Namespace) []metric.Update {
  151. var scrapeResults []metric.Update
  152. for _, namespace := range namespaces {
  153. namespaceInfo := map[string]string{
  154. source.NamespaceLabel: namespace.Name,
  155. }
  156. // namespace labels
  157. labelNames, labelValues := promutil.KubeLabelsToLabels(namespace.Labels)
  158. namespaceLabels := util.ToMap(labelNames, labelValues)
  159. scrapeResults = append(scrapeResults, metric.Update{
  160. Name: KubeNamespaceLabels,
  161. Labels: namespaceInfo,
  162. Value: 0,
  163. AdditionalInfo: namespaceLabels,
  164. })
  165. // namespace annotations
  166. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(namespace.Annotations)
  167. namespaceAnnotations := util.ToMap(annotationNames, annotationValues)
  168. scrapeResults = append(scrapeResults, metric.Update{
  169. Name: KubeNamespaceAnnotations,
  170. Labels: namespaceInfo,
  171. Value: 0,
  172. AdditionalInfo: namespaceAnnotations,
  173. })
  174. }
  175. return scrapeResults
  176. }
  177. func (ccs *ClusterCacheScraper) ScrapePods() []metric.Update {
  178. pods := ccs.clusterCache.GetAllPods()
  179. return ccs.scrapePods(pods)
  180. }
  // scrapePods converts pods into metric updates. For each pod it emits, in order:
//   - KubePodLabels and KubePodAnnotations, sharing the pod's identity labels;
//   - one KubePodOwner per owner reference (owner kind/name added to a copy);
//   - one KubePodContainerStatusRunning per container status in Running state;
//   - one KubePodContainerResourceRequests per resource request of each spec
//     container that toResourceUnitValue can classify. Unclassifiable requests
//     are skipped with a deduplicated warning.
//
// Request resource names are sorted so the output order is deterministic.
func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Update {
	var updates []metric.Update
	for _, p := range pods {
		// Identity labels; owner/container/request updates get cloned copies so
		// their extra keys do not leak into the shared map.
		podIDs := map[string]string{
			source.PodLabel:       p.Name,
			source.NamespaceLabel: p.Namespace,
			source.UIDLabel:       string(p.UID),
			source.NodeLabel:      p.Spec.NodeName,
			source.InstanceLabel:  p.Spec.NodeName,
		}

		labelNames, labelValues := promutil.KubeLabelsToLabels(p.Labels)
		podLabels := util.ToMap(labelNames, labelValues)
		annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(p.Annotations)
		podAnnotations := util.ToMap(annotationNames, annotationValues)
		updates = append(updates,
			metric.Update{Name: KubePodLabels, Labels: podIDs, Value: 0, AdditionalInfo: podLabels},
			metric.Update{Name: KubePodAnnotations, Labels: podIDs, Value: 0, AdditionalInfo: podAnnotations},
		)

		for _, ref := range p.OwnerReferences {
			ownerIDs := maps.Clone(podIDs)
			ownerIDs[source.OwnerKindLabel] = ref.Kind
			ownerIDs[source.OwnerNameLabel] = ref.Name
			updates = append(updates, metric.Update{Name: KubePodOwner, Labels: ownerIDs, Value: 0})
		}

		for _, cs := range p.Status.ContainerStatuses {
			if cs.State.Running == nil {
				continue
			}
			runningIDs := maps.Clone(podIDs)
			runningIDs[source.ContainerLabel] = cs.Name
			updates = append(updates, metric.Update{Name: KubePodContainerStatusRunning, Labels: runningIDs, Value: 0})
		}

		for _, c := range p.Spec.Containers {
			requests := c.Resources.Requests
			if requests == nil {
				continue
			}
			containerIDs := maps.Clone(podIDs)
			containerIDs[source.ContainerLabel] = c.Name

			// Sorted so that tests can rely on a stable emission order.
			resourceNames := maps.Keys(requests)
			slices.Sort(resourceNames)
			for _, rn := range resourceNames {
				resName, unit, amount := toResourceUnitValue(rn, requests[rn])
				if resName == "" {
					// toResourceUnitValue could not classify this resource.
					log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", rn)
					continue
				}
				requestIDs := maps.Clone(containerIDs)
				requestIDs[source.ResourceLabel] = resName
				requestIDs[source.UnitLabel] = unit
				updates = append(updates, metric.Update{
					Name:   KubePodContainerResourceRequests,
					Labels: requestIDs,
					Value:  amount,
				})
			}
		}
	}
	return updates
}
  262. func (ccs *ClusterCacheScraper) ScrapePVCs() []metric.Update {
  263. pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims()
  264. return ccs.scrapePVCs(pvcs)
  265. }
  266. func (ccs *ClusterCacheScraper) scrapePVCs(pvcs []*clustercache.PersistentVolumeClaim) []metric.Update {
  267. var scrapeResults []metric.Update
  268. for _, pvc := range pvcs {
  269. pvcInfo := map[string]string{
  270. source.PVCLabel: pvc.Name,
  271. source.NamespaceLabel: pvc.Namespace,
  272. source.VolumeNameLabel: pvc.Spec.VolumeName,
  273. source.StorageClassLabel: getPersistentVolumeClaimClass(pvc),
  274. }
  275. scrapeResults = append(scrapeResults, metric.Update{
  276. Name: KubePersistentVolumeClaimInfo,
  277. Labels: pvcInfo,
  278. Value: 0,
  279. })
  280. if storage, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]; ok {
  281. scrapeResults = append(scrapeResults, metric.Update{
  282. Name: KubePersistentVolumeClaimResourceRequestsStorageBytes,
  283. Labels: pvcInfo,
  284. Value: float64(storage.Value()),
  285. })
  286. }
  287. }
  288. return scrapeResults
  289. }
  290. func (ccs *ClusterCacheScraper) ScrapePVs() []metric.Update {
  291. pvs := ccs.clusterCache.GetAllPersistentVolumes()
  292. return ccs.scrapePVs(pvs)
  293. }
  294. func (ccs *ClusterCacheScraper) scrapePVs(pvs []*clustercache.PersistentVolume) []metric.Update {
  295. var scrapeResults []metric.Update
  296. for _, pv := range pvs {
  297. providerID := pv.Name
  298. // if a more accurate provider ID is available, use that
  299. if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
  300. providerID = pv.Spec.CSI.VolumeHandle
  301. }
  302. pvInfo := map[string]string{
  303. source.PVLabel: pv.Name,
  304. source.StorageClassLabel: pv.Spec.StorageClassName,
  305. source.ProviderIDLabel: providerID,
  306. }
  307. scrapeResults = append(scrapeResults, metric.Update{
  308. Name: KubecostPVInfo,
  309. Labels: pvInfo,
  310. Value: 0,
  311. })
  312. if storage, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok {
  313. scrapeResults = append(scrapeResults, metric.Update{
  314. Name: KubePersistentVolumeCapacityBytes,
  315. Labels: pvInfo,
  316. Value: float64(storage.Value()),
  317. })
  318. }
  319. }
  320. return scrapeResults
  321. }
  322. func (ccs *ClusterCacheScraper) ScrapeServices() []metric.Update {
  323. services := ccs.clusterCache.GetAllServices()
  324. return ccs.scrapeServices(services)
  325. }
  326. func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service) []metric.Update {
  327. var scrapeResults []metric.Update
  328. for _, service := range services {
  329. serviceInfo := map[string]string{
  330. source.ServiceLabel: service.Name,
  331. source.NamespaceLabel: service.Namespace,
  332. }
  333. // service labels
  334. labelNames, labelValues := promutil.KubeLabelsToLabels(service.SpecSelector)
  335. serviceLabels := util.ToMap(labelNames, labelValues)
  336. scrapeResults = append(scrapeResults, metric.Update{
  337. Name: ServiceSelectorLabels,
  338. Labels: serviceInfo,
  339. Value: 0,
  340. AdditionalInfo: serviceLabels,
  341. })
  342. }
  343. return scrapeResults
  344. }
  345. func (ccs *ClusterCacheScraper) ScrapeStatefulSets() []metric.Update {
  346. statefulSets := ccs.clusterCache.GetAllStatefulSets()
  347. return ccs.scrapeStatefulSets(statefulSets)
  348. }
  349. func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet) []metric.Update {
  350. var scrapeResults []metric.Update
  351. for _, statefulSet := range statefulSets {
  352. statefulSetInfo := map[string]string{
  353. source.StatefulSetLabel: statefulSet.Name,
  354. source.NamespaceLabel: statefulSet.Namespace,
  355. }
  356. // statefulSet labels
  357. labelNames, labelValues := promutil.KubeLabelsToLabels(statefulSet.SpecSelector.MatchLabels)
  358. statefulSetLabels := util.ToMap(labelNames, labelValues)
  359. scrapeResults = append(scrapeResults, metric.Update{
  360. Name: StatefulSetMatchLabels,
  361. Labels: statefulSetInfo,
  362. Value: 0,
  363. AdditionalInfo: statefulSetLabels,
  364. })
  365. }
  366. return scrapeResults
  367. }
  368. func (ccs *ClusterCacheScraper) ScrapeReplicaSets() []metric.Update {
  369. replicaSets := ccs.clusterCache.GetAllReplicaSets()
  370. return ccs.scrapeReplicaSets(replicaSets)
  371. }
  372. func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet) []metric.Update {
  373. var scrapeResults []metric.Update
  374. for _, replicaSet := range replicaSets {
  375. replicaSetInfo := map[string]string{
  376. source.ReplicaSetLabel: replicaSet.Name,
  377. source.NamespaceLabel: replicaSet.Namespace,
  378. }
  379. for _, owner := range replicaSet.OwnerReferences {
  380. ownerInfo := maps.Clone(replicaSetInfo)
  381. ownerInfo[source.OwnerKindLabel] = owner.Kind
  382. ownerInfo[source.OwnerNameLabel] = owner.Name
  383. scrapeResults = append(scrapeResults, metric.Update{
  384. Name: KubeReplicasetOwner,
  385. Labels: ownerInfo,
  386. Value: 0,
  387. })
  388. }
  389. }
  390. return scrapeResults
  391. }
  392. // getPersistentVolumeClaimClass returns StorageClassName. If no storage class was
  393. // requested, it returns "".
  394. func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string {
  395. // Use beta annotation first
  396. if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found {
  397. return class
  398. }
  399. if claim.Spec.StorageClassName != nil {
  400. return *claim.Spec.StorageClassName
  401. }
  402. // Special non-empty string to indicate absence of storage class.
  403. return ""
  404. }
  405. // toResourceUnitValue accepts a resource name and quantity and returns the sanitized resource, the unit, and the value in the units.
  406. // Returns an empty string for resource and unit if there was a failure.
  407. func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantity) (resource string, unit string, value float64) {
  408. resource = promutil.SanitizeLabelName(string(resourceName))
  409. switch resourceName {
  410. case v1.ResourceCPU:
  411. unit = "core"
  412. value = float64(quantity.MilliValue()) / 1000
  413. return
  414. case v1.ResourceStorage:
  415. fallthrough
  416. case v1.ResourceEphemeralStorage:
  417. fallthrough
  418. case v1.ResourceMemory:
  419. unit = "byte"
  420. value = float64(quantity.Value())
  421. return
  422. case v1.ResourcePods:
  423. unit = "integer"
  424. value = float64(quantity.Value())
  425. return
  426. default:
  427. if isHugePageResourceName(resourceName) || isAttachableVolumeResourceName(resourceName) {
  428. unit = "byte"
  429. value = float64(quantity.Value())
  430. return
  431. }
  432. if isExtendedResourceName(resourceName) {
  433. unit = "integer"
  434. value = float64(quantity.Value())
  435. return
  436. }
  437. }
  438. resource = ""
  439. unit = ""
  440. value = 0.0
  441. return
  442. }
  // isHugePageResourceName reports whether name begins with the Kubernetes
// huge-pages prefix v1.ResourceHugePagesPrefix.
func isHugePageResourceName(name v1.ResourceName) bool {
	s := string(name)
	return strings.HasPrefix(s, v1.ResourceHugePagesPrefix)
}
  // isAttachableVolumeResourceName reports whether name begins with the
// Kubernetes attachable-volumes prefix v1.ResourceAttachableVolumesPrefix.
func isAttachableVolumeResourceName(name v1.ResourceName) bool {
	s := string(name)
	return strings.HasPrefix(s, v1.ResourceAttachableVolumesPrefix)
}
  // isExtendedResourceName reports whether name is an extended resource. That
// requires all of the following:
//   - it is not native (see isNativeResource);
//   - it does not already start with v1.DefaultResourceRequestsPrefix;
//   - prefixing it with v1.DefaultResourceRequestsPrefix (its quota resource
//     name) still yields a valid qualified name.
func isExtendedResourceName(name v1.ResourceName) bool {
	switch {
	case isNativeResource(name):
		return false
	case strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix):
		return false
	}
	// The quota form of the name must satisfy IsQualifiedName.
	quotaName := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
	return len(validation.IsQualifiedName(quotaName)) == 0
}
  // isNativeResource reports whether name is a native Kubernetes resource. That
// is the case when the name has no domain part (contains no "/"), or when
// isPrefixedNativeResource accepts it.
func isNativeResource(name v1.ResourceName) bool {
	if !strings.Contains(string(name), "/") {
		return true
	}
	return isPrefixedNativeResource(name)
}
  // isPrefixedNativeResource reports whether name contains the default
// Kubernetes resource namespace v1.ResourceDefaultNamespacePrefix. It is a
// substring match, not a prefix match, so sub-domains of that namespace also
// qualify.
func isPrefixedNativeResource(name v1.ResourceName) bool {
	nativeNamespace := v1.ResourceDefaultNamespacePrefix
	return strings.Contains(string(name), nativeNamespace)
}