clustercache.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. package scrape
  2. import (
  3. "fmt"
  4. "slices"
  5. "strings"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/clustercache"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/source"
  10. "github.com/opencost/opencost/core/pkg/util/promutil"
  11. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  12. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  13. "golang.org/x/exp/maps"
  14. v1 "k8s.io/api/core/v1"
  15. "k8s.io/apimachinery/pkg/api/resource"
  16. "k8s.io/apimachinery/pkg/util/validation"
  17. )
// Cluster Cache Metrics
//
// Names of the metrics published by ClusterCacheScraper. Each constant is
// passed as the metric name to the updater in one of the scrape* methods of
// this file; the groups below follow those methods.
const (
	// Emitted by scrapeNodes.
	KubeNodeStatusCapacityCPUCores = "kube_node_status_capacity_cpu_cores"
	KubeNodeStatusCapacityMemoryBytes = "kube_node_status_capacity_memory_bytes"
	KubeNodeStatusAllocatableCPUCores = "kube_node_status_allocatable_cpu_cores"
	KubeNodeStatusAllocatableMemoryBytes = "kube_node_status_allocatable_memory_bytes"
	KubeNodeLabels = "kube_node_labels"
	// Emitted by scrapePods.
	KubePodLabels = "kube_pod_labels"
	KubePodAnnotations = "kube_pod_annotations"
	KubePodOwner = "kube_pod_owner"
	KubePodContainerStatusRunning = "kube_pod_container_status_running"
	KubePodContainerResourceRequests = "kube_pod_container_resource_requests"
	// Emitted by scrapePVCs.
	KubePersistentVolumeClaimInfo = "kube_persistentvolumeclaim_info"
	KubePersistentVolumeClaimResourceRequestsStorageBytes = "kube_persistentvolumeclaim_resource_requests_storage_bytes"
	// Emitted by scrapePVs.
	KubecostPVInfo = "kubecost_pv_info"
	KubePersistentVolumeCapacityBytes = "kube_persistentvolume_capacity_bytes"
	// Emitted by scrapeDeployments.
	DeploymentMatchLabels = "deployment_match_labels"
	// Emitted by scrapeNamespaces.
	KubeNamespaceLabels = "kube_namespace_labels"
	KubeNamespaceAnnotations = "kube_namespace_annotations"
	// Emitted by scrapeServices.
	ServiceSelectorLabels = "service_selector_labels"
	// Emitted by scrapeStatefulSets.
	// NOTE(review): unlike the other names this value is not all lower case;
	// it is a runtime string, so confirm with consumers before normalizing it.
	StatefulSetMatchLabels = "statefulSet_match_labels"
	// Emitted by scrapeReplicaSets.
	KubeReplicasetOwner = "kube_replicaset_owner"
)
// ClusterCacheScraper reads Kubernetes objects from a cluster cache and
// publishes metrics describing them through a metric updater. It is handed
// out as a Scraper by newClusterCacheScraper.
type ClusterCacheScraper struct {
	// clusterCache is queried for nodes, workloads, pods and volumes on every Scrape.
	clusterCache clustercache.ClusterCache
	// updater receives one Update call per metric sample produced by a scrape.
	updater metric.MetricUpdater
}
  45. func newClusterCacheScraper(clusterCache clustercache.ClusterCache, updater metric.MetricUpdater) Scraper {
  46. return &ClusterCacheScraper{
  47. clusterCache: clusterCache,
  48. updater: updater,
  49. }
  50. }
  51. func (ccs *ClusterCacheScraper) Scrape() {
  52. timestamp := time.Now().UTC()
  53. nodes := ccs.clusterCache.GetAllNodes()
  54. deployments := ccs.clusterCache.GetAllDeployments()
  55. namespaces := ccs.clusterCache.GetAllNamespaces()
  56. pods := ccs.clusterCache.GetAllPods()
  57. pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims()
  58. pvs := ccs.clusterCache.GetAllPersistentVolumes()
  59. services := ccs.clusterCache.GetAllServices()
  60. statefulSets := ccs.clusterCache.GetAllStatefulSets()
  61. replicaSets := ccs.clusterCache.GetAllReplicaSets()
  62. ccs.scrapeNodes(nodes, timestamp)
  63. ccs.scrapeDeployments(deployments, timestamp)
  64. ccs.scrapeNamespaces(namespaces, timestamp)
  65. ccs.scrapePods(pods, timestamp)
  66. ccs.scrapePVCs(pvcs, timestamp)
  67. ccs.scrapePVs(pvs, timestamp)
  68. ccs.scrapeServices(services, timestamp)
  69. ccs.scrapeStatefulSets(statefulSets, timestamp)
  70. ccs.scrapeReplicaSets(replicaSets, timestamp)
  71. }
  72. func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node, timestamp time.Time) {
  73. for _, node := range nodes {
  74. nodeInfo := map[string]string{
  75. source.NodeLabel: node.Name,
  76. source.ProviderIDLabel: node.SpecProviderID,
  77. }
  78. // Node Capacity
  79. if node.Status.Capacity != nil {
  80. if quantity, ok := node.Status.Capacity[v1.ResourceCPU]; ok {
  81. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  82. ccs.updater.Update(KubeNodeStatusCapacityCPUCores, nodeInfo, value, &timestamp, nil)
  83. }
  84. if quantity, ok := node.Status.Capacity[v1.ResourceMemory]; ok {
  85. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  86. ccs.updater.Update(KubeNodeStatusCapacityMemoryBytes, nodeInfo, value, &timestamp, nil)
  87. }
  88. }
  89. // Node Allocatable Resources
  90. if node.Status.Allocatable != nil {
  91. if quantity, ok := node.Status.Allocatable[v1.ResourceCPU]; ok {
  92. _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
  93. ccs.updater.Update(KubeNodeStatusAllocatableCPUCores, nodeInfo, value, &timestamp, nil)
  94. }
  95. if quantity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok {
  96. _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
  97. ccs.updater.Update(KubeNodeStatusAllocatableMemoryBytes, nodeInfo, value, &timestamp, nil)
  98. }
  99. }
  100. // node labels
  101. labelNames, labelValues := promutil.KubeLabelsToLabels(node.Labels)
  102. nodeLabels := util.ToMap(labelNames, labelValues)
  103. ccs.updater.Update(KubeNodeLabels, nodeInfo, 0, &timestamp, nodeLabels)
  104. }
  105. }
  106. func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment, timestamp time.Time) {
  107. for _, deployment := range deployments {
  108. deploymentInfo := map[string]string{
  109. source.DeploymentLabel: deployment.Name,
  110. source.NamespaceLabel: deployment.Namespace,
  111. }
  112. // deployment labels
  113. labelNames, labelValues := promutil.KubeLabelsToLabels(deployment.MatchLabels)
  114. deploymentLabels := util.ToMap(labelNames, labelValues)
  115. ccs.updater.Update(DeploymentMatchLabels, deploymentInfo, 0, &timestamp, deploymentLabels)
  116. }
  117. }
  118. func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Namespace, timestamp time.Time) {
  119. for _, namespace := range namespaces {
  120. namespaceInfo := map[string]string{
  121. source.NamespaceLabel: namespace.Name,
  122. }
  123. // namespace labels
  124. labelNames, labelValues := promutil.KubeLabelsToLabels(namespace.Labels)
  125. namespaceLabels := util.ToMap(labelNames, labelValues)
  126. ccs.updater.Update(KubeNamespaceLabels, namespaceInfo, 0, &timestamp, namespaceLabels)
  127. // namespace annotations
  128. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(namespace.Annotations)
  129. namespaceAnnotations := util.ToMap(annotationNames, annotationValues)
  130. ccs.updater.Update(KubeNamespaceAnnotations, namespaceInfo, 0, &timestamp, namespaceAnnotations)
  131. }
  132. }
  133. func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod, timestamp time.Time) {
  134. for _, pod := range pods {
  135. podInfo := map[string]string{
  136. source.PodLabel: pod.Name,
  137. source.NamespaceLabel: pod.Namespace,
  138. source.UIDLabel: string(pod.UID),
  139. source.NodeLabel: pod.Spec.NodeName,
  140. source.InstanceLabel: pod.Spec.NodeName,
  141. }
  142. // pod labels
  143. labelNames, labelValues := promutil.KubeLabelsToLabels(pod.Labels)
  144. podLabels := util.ToMap(labelNames, labelValues)
  145. ccs.updater.Update(KubePodLabels, podInfo, 0, &timestamp, podLabels)
  146. // pod annotations
  147. annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(pod.Annotations)
  148. podAnnotations := util.ToMap(annotationNames, annotationValues)
  149. ccs.updater.Update(KubePodAnnotations, podInfo, 0, &timestamp, podAnnotations)
  150. // Pod owner metric
  151. for _, owner := range pod.OwnerReferences {
  152. ownerInfo := maps.Clone(podInfo)
  153. ownerInfo[source.OwnerKindLabel] = owner.Kind
  154. ownerInfo[source.OwnerNameLabel] = owner.Name
  155. ccs.updater.Update(KubePodOwner, ownerInfo, 0, &timestamp, nil)
  156. }
  157. // Container Status
  158. for _, status := range pod.Status.ContainerStatuses {
  159. if status.State.Running != nil {
  160. containerInfo := maps.Clone(podInfo)
  161. containerInfo[source.ContainerLabel] = status.Name
  162. ccs.updater.Update(KubePodContainerStatusRunning, containerInfo, 0, &timestamp, nil)
  163. }
  164. }
  165. for _, container := range pod.Spec.Containers {
  166. containerInfo := maps.Clone(podInfo)
  167. containerInfo[source.ContainerLabel] = container.Name
  168. // Requests
  169. if container.Resources.Requests != nil {
  170. // sorting keys here for testing purposes
  171. keys := maps.Keys(container.Resources.Requests)
  172. slices.Sort(keys)
  173. for _, resourceName := range keys {
  174. quantity := container.Resources.Requests[resourceName]
  175. resource, unit, value := toResourceUnitValue(resourceName, quantity)
  176. // failed to parse the resource type
  177. if resource == "" {
  178. log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", resourceName)
  179. continue
  180. }
  181. resourceRequestInfo := maps.Clone(containerInfo)
  182. resourceRequestInfo[source.ResourceLabel] = resource
  183. resourceRequestInfo[source.UnitLabel] = unit
  184. ccs.updater.Update(KubePodContainerResourceRequests, resourceRequestInfo, value, &timestamp, nil)
  185. }
  186. }
  187. }
  188. }
  189. }
  190. func (ccs *ClusterCacheScraper) scrapePVCs(pvcs []*clustercache.PersistentVolumeClaim, timestamp time.Time) {
  191. for _, pvc := range pvcs {
  192. pvcInfo := map[string]string{
  193. source.PVCLabel: pvc.Name,
  194. source.NamespaceLabel: pvc.Namespace,
  195. source.VolumeNameLabel: pvc.Spec.VolumeName,
  196. source.StorageClassLabel: getPersistentVolumeClaimClass(pvc),
  197. }
  198. ccs.updater.Update(KubePersistentVolumeClaimInfo, pvcInfo, 0, &timestamp, nil)
  199. if storage, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]; ok {
  200. ccs.updater.Update(KubePersistentVolumeClaimResourceRequestsStorageBytes, pvcInfo, float64(storage.Value()), &timestamp, nil)
  201. }
  202. }
  203. }
  204. func (ccs *ClusterCacheScraper) scrapePVs(pvs []*clustercache.PersistentVolume, timestamp time.Time) {
  205. for _, pv := range pvs {
  206. providerID := pv.Name
  207. // if a more accurate provider ID is available, use that
  208. if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
  209. providerID = pv.Spec.CSI.VolumeHandle
  210. }
  211. pvInfo := map[string]string{
  212. source.PVLabel: pv.Name,
  213. source.StorageClassLabel: pv.Spec.StorageClassName,
  214. source.ProviderIDLabel: providerID,
  215. }
  216. ccs.updater.Update(KubecostPVInfo, pvInfo, 0, &timestamp, nil)
  217. if storage, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok {
  218. ccs.updater.Update(KubePersistentVolumeCapacityBytes, pvInfo, float64(storage.Value()), &timestamp, nil)
  219. }
  220. }
  221. }
  222. func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service, timestamp time.Time) {
  223. for _, service := range services {
  224. serviceInfo := map[string]string{
  225. source.ServiceLabel: service.Name,
  226. source.NamespaceLabel: service.Namespace,
  227. }
  228. // service labels
  229. labelNames, labelValues := promutil.KubeLabelsToLabels(service.SpecSelector)
  230. serviceLabels := util.ToMap(labelNames, labelValues)
  231. ccs.updater.Update(ServiceSelectorLabels, serviceInfo, 0, &timestamp, serviceLabels)
  232. }
  233. }
  234. func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet, timestamp time.Time) {
  235. for _, statefulSet := range statefulSets {
  236. statefulSetInfo := map[string]string{
  237. source.StatefulSetLabel: statefulSet.Name,
  238. source.NamespaceLabel: statefulSet.Namespace,
  239. }
  240. // statefulSet labels
  241. labelNames, labelValues := promutil.KubeLabelsToLabels(statefulSet.SpecSelector.MatchLabels)
  242. statefulSetLabels := util.ToMap(labelNames, labelValues)
  243. ccs.updater.Update(StatefulSetMatchLabels, statefulSetInfo, 0, &timestamp, statefulSetLabels)
  244. }
  245. }
  246. func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet, timestamp time.Time) {
  247. for _, replicaSet := range replicaSets {
  248. replicaSetInfo := map[string]string{
  249. source.ReplicaSetLabel: replicaSet.Name,
  250. source.NamespaceLabel: replicaSet.Namespace,
  251. }
  252. for _, owner := range replicaSet.OwnerReferences {
  253. ownerInfo := maps.Clone(replicaSetInfo)
  254. ownerInfo[source.OwnerKindLabel] = owner.Kind
  255. ownerInfo[source.OwnerNameLabel] = owner.Name
  256. ccs.updater.Update(KubeReplicasetOwner, ownerInfo, 0, &timestamp, nil)
  257. }
  258. }
  259. }
  260. // getPersistentVolumeClaimClass returns StorageClassName. If no storage class was
  261. // requested, it returns "".
  262. func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string {
  263. // Use beta annotation first
  264. if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found {
  265. return class
  266. }
  267. if claim.Spec.StorageClassName != nil {
  268. return *claim.Spec.StorageClassName
  269. }
  270. // Special non-empty string to indicate absence of storage class.
  271. return ""
  272. }
  273. // toResourceUnitValue accepts a resource name and quantity and returns the sanitized resource, the unit, and the value in the units.
  274. // Returns an empty string for resource and unit if there was a failure.
  275. func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantity) (resource string, unit string, value float64) {
  276. resource = promutil.SanitizeLabelName(string(resourceName))
  277. switch resourceName {
  278. case v1.ResourceCPU:
  279. unit = "core"
  280. value = float64(quantity.MilliValue()) / 1000
  281. return
  282. case v1.ResourceStorage:
  283. fallthrough
  284. case v1.ResourceEphemeralStorage:
  285. fallthrough
  286. case v1.ResourceMemory:
  287. unit = "byte"
  288. value = float64(quantity.Value())
  289. return
  290. case v1.ResourcePods:
  291. unit = "integer"
  292. value = float64(quantity.Value())
  293. return
  294. default:
  295. if isHugePageResourceName(resourceName) || isAttachableVolumeResourceName(resourceName) {
  296. unit = "byte"
  297. value = float64(quantity.Value())
  298. return
  299. }
  300. if isExtendedResourceName(resourceName) {
  301. unit = "integer"
  302. value = float64(quantity.Value())
  303. return
  304. }
  305. }
  306. resource = ""
  307. unit = ""
  308. value = 0.0
  309. return
  310. }
  311. // isHugePageResourceName checks for a huge page container resource name
  312. func isHugePageResourceName(name v1.ResourceName) bool {
  313. return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix)
  314. }
  315. // isAttachableVolumeResourceName checks for attached volume container resource name
  316. func isAttachableVolumeResourceName(name v1.ResourceName) bool {
  317. return strings.HasPrefix(string(name), v1.ResourceAttachableVolumesPrefix)
  318. }
  319. // isExtendedResourceName checks for extended container resource name
  320. func isExtendedResourceName(name v1.ResourceName) bool {
  321. if isNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) {
  322. return false
  323. }
  324. // Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name
  325. nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
  326. if errs := validation.IsQualifiedName(nameForQuota); len(errs) != 0 {
  327. return false
  328. }
  329. return true
  330. }
  331. // isNativeResource checks for a kubernetes.io/ prefixed resource name
  332. func isNativeResource(name v1.ResourceName) bool {
  333. return !strings.Contains(string(name), "/") || isPrefixedNativeResource(name)
  334. }
  335. func isPrefixedNativeResource(name v1.ResourceName) bool {
  336. return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix)
  337. }