package scrape import ( "fmt" "slices" "strings" "github.com/opencost/opencost/core/pkg/clustercache" "github.com/opencost/opencost/core/pkg/log" "github.com/opencost/opencost/core/pkg/source" "github.com/opencost/opencost/core/pkg/util/promutil" "github.com/opencost/opencost/modules/collector-source/pkg/metric" "github.com/opencost/opencost/modules/collector-source/pkg/util" "golang.org/x/exp/maps" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/validation" ) // Cluster Cache Metrics const ( KubeNodeStatusCapacityCPUCores = "kube_node_status_capacity_cpu_cores" KubeNodeStatusCapacityMemoryBytes = "kube_node_status_capacity_memory_bytes" KubeNodeStatusAllocatableCPUCores = "kube_node_status_allocatable_cpu_cores" KubeNodeStatusAllocatableMemoryBytes = "kube_node_status_allocatable_memory_bytes" KubeNodeLabels = "kube_node_labels" KubePodLabels = "kube_pod_labels" KubePodAnnotations = "kube_pod_annotations" KubePodOwner = "kube_pod_owner" KubePodContainerStatusRunning = "kube_pod_container_status_running" KubePodContainerResourceRequests = "kube_pod_container_resource_requests" KubePersistentVolumeClaimInfo = "kube_persistentvolumeclaim_info" KubePersistentVolumeClaimResourceRequestsStorageBytes = "kube_persistentvolumeclaim_resource_requests_storage_bytes" KubecostPVInfo = "kubecost_pv_info" KubePersistentVolumeCapacityBytes = "kube_persistentvolume_capacity_bytes" DeploymentMatchLabels = "deployment_match_labels" KubeNamespaceLabels = "kube_namespace_labels" KubeNamespaceAnnotations = "kube_namespace_annotations" ServiceSelectorLabels = "service_selector_labels" StatefulSetMatchLabels = "statefulSet_match_labels" KubeReplicasetOwner = "kube_replicaset_owner" ) type ClusterCacheScraper struct { clusterCache clustercache.ClusterCache } func newClusterCacheScraper(clusterCache clustercache.ClusterCache) Scraper { return &ClusterCacheScraper{ clusterCache: clusterCache, } } func (ccs *ClusterCacheScraper) Scrape() []metric.Update { scrapeFuncs := []ScrapeFunc{ ccs.ScrapeNodes, ccs.ScrapeDeployments, ccs.ScrapeNamespaces, ccs.ScrapePods, ccs.ScrapePVCs, ccs.ScrapePVs, ccs.ScrapeServices, ccs.ScrapeStatefulSets, ccs.ScrapeReplicaSets, } return concurrentScrape(scrapeFuncs...) } func (ccs *ClusterCacheScraper) ScrapeNodes() []metric.Update { nodes := ccs.clusterCache.GetAllNodes() return ccs.scrapeNodes(nodes) } func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node) []metric.Update { var scrapeResults []metric.Update for _, node := range nodes { nodeInfo := map[string]string{ source.NodeLabel: node.Name, source.ProviderIDLabel: node.SpecProviderID, } // Node Capacity if node.Status.Capacity != nil { if quantity, ok := node.Status.Capacity[v1.ResourceCPU]; ok { _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity) scrapeResults = append(scrapeResults, metric.Update{ Name: KubeNodeStatusCapacityCPUCores, Labels: nodeInfo, Value: value, }) } if quantity, ok := node.Status.Capacity[v1.ResourceMemory]; ok { _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity) scrapeResults = append(scrapeResults, metric.Update{ Name: KubeNodeStatusCapacityMemoryBytes, Labels: nodeInfo, Value: value, }) } } // Node Allocatable Resources if node.Status.Allocatable != nil { if quantity, ok := node.Status.Allocatable[v1.ResourceCPU]; ok { _, _, value := toResourceUnitValue(v1.ResourceCPU, quantity) scrapeResults = append(scrapeResults, metric.Update{ Name: KubeNodeStatusAllocatableCPUCores, Labels: nodeInfo, Value: value, }) } if quantity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok { _, _, value := toResourceUnitValue(v1.ResourceMemory, quantity) scrapeResults = append(scrapeResults, metric.Update{ Name: KubeNodeStatusAllocatableMemoryBytes, Labels: nodeInfo, Value: value, }) } } // node labels labelNames, labelValues := promutil.KubeLabelsToLabels(node.Labels) nodeLabels := util.ToMap(labelNames, labelValues) scrapeResults = append(scrapeResults, metric.Update{ Name: KubeNodeLabels, Labels: nodeInfo, Value: 0, AdditionalInfo: nodeLabels, }) } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapeDeployments() []metric.Update { deployments := ccs.clusterCache.GetAllDeployments() return ccs.scrapeDeployments(deployments) } func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.Deployment) []metric.Update { var scrapeResults []metric.Update for _, deployment := range deployments { deploymentInfo := map[string]string{ source.DeploymentLabel: deployment.Name, source.NamespaceLabel: deployment.Namespace, } // deployment labels labelNames, labelValues := promutil.KubeLabelsToLabels(deployment.MatchLabels) deploymentLabels := util.ToMap(labelNames, labelValues) scrapeResults = append(scrapeResults, metric.Update{ Name: DeploymentMatchLabels, Labels: deploymentInfo, Value: 0, AdditionalInfo: deploymentLabels, }) } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapeNamespaces() []metric.Update { namespaces := ccs.clusterCache.GetAllNamespaces() return ccs.scrapeNamespaces(namespaces) } func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Namespace) []metric.Update { var scrapeResults []metric.Update for _, namespace := range namespaces { namespaceInfo := map[string]string{ source.NamespaceLabel: namespace.Name, } // namespace labels labelNames, labelValues := promutil.KubeLabelsToLabels(namespace.Labels) namespaceLabels := util.ToMap(labelNames, labelValues) scrapeResults = append(scrapeResults, metric.Update{ Name: KubeNamespaceLabels, Labels: namespaceInfo, Value: 0, AdditionalInfo: namespaceLabels, }) // namespace annotations annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(namespace.Annotations) namespaceAnnotations := util.ToMap(annotationNames, annotationValues) scrapeResults = append(scrapeResults, metric.Update{ Name: KubeNamespaceAnnotations, Labels: namespaceInfo, Value: 0, AdditionalInfo: namespaceAnnotations, }) } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapePods() []metric.Update { pods := ccs.clusterCache.GetAllPods() return ccs.scrapePods(pods) } func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Update { var scrapeResults []metric.Update for _, pod := range pods { podInfo := map[string]string{ source.PodLabel: pod.Name, source.NamespaceLabel: pod.Namespace, source.UIDLabel: string(pod.UID), source.NodeLabel: pod.Spec.NodeName, source.InstanceLabel: pod.Spec.NodeName, } // pod labels labelNames, labelValues := promutil.KubeLabelsToLabels(pod.Labels) podLabels := util.ToMap(labelNames, labelValues) scrapeResults = append(scrapeResults, metric.Update{ Name: KubePodLabels, Labels: podInfo, Value: 0, AdditionalInfo: podLabels, }) // pod annotations annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(pod.Annotations) podAnnotations := util.ToMap(annotationNames, annotationValues) scrapeResults = append(scrapeResults, metric.Update{ Name: KubePodAnnotations, Labels: podInfo, Value: 0, AdditionalInfo: podAnnotations, }) // Pod owner metric for _, owner := range pod.OwnerReferences { ownerInfo := maps.Clone(podInfo) ownerInfo[source.OwnerKindLabel] = owner.Kind ownerInfo[source.OwnerNameLabel] = owner.Name scrapeResults = append(scrapeResults, metric.Update{ Name: KubePodOwner, Labels: ownerInfo, Value: 0, }) } // Container Status for _, status := range pod.Status.ContainerStatuses { if status.State.Running != nil { containerInfo := maps.Clone(podInfo) containerInfo[source.ContainerLabel] = status.Name scrapeResults = append(scrapeResults, metric.Update{ Name: KubePodContainerStatusRunning, Labels: containerInfo, Value: 0, }) } } for _, container := range pod.Spec.Containers { containerInfo := maps.Clone(podInfo) containerInfo[source.ContainerLabel] = container.Name // Requests if container.Resources.Requests != nil { // sorting keys here for testing purposes keys := maps.Keys(container.Resources.Requests) slices.Sort(keys) for _, resourceName := range keys { quantity := container.Resources.Requests[resourceName] resource, unit, value := toResourceUnitValue(resourceName, quantity) // failed to parse the resource type if resource == "" { log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", resourceName) continue } resourceRequestInfo := maps.Clone(containerInfo) resourceRequestInfo[source.ResourceLabel] = resource resourceRequestInfo[source.UnitLabel] = unit scrapeResults = append(scrapeResults, metric.Update{ Name: KubePodContainerResourceRequests, Labels: resourceRequestInfo, Value: value, }) } } } } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapePVCs() []metric.Update { pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims() return ccs.scrapePVCs(pvcs) } func (ccs *ClusterCacheScraper) scrapePVCs(pvcs []*clustercache.PersistentVolumeClaim) []metric.Update { var scrapeResults []metric.Update for _, pvc := range pvcs { pvcInfo := map[string]string{ source.PVCLabel: pvc.Name, source.NamespaceLabel: pvc.Namespace, source.VolumeNameLabel: pvc.Spec.VolumeName, source.StorageClassLabel: getPersistentVolumeClaimClass(pvc), } scrapeResults = append(scrapeResults, metric.Update{ Name: KubePersistentVolumeClaimInfo, Labels: pvcInfo, Value: 0, }) if storage, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]; ok { scrapeResults = append(scrapeResults, metric.Update{ Name: KubePersistentVolumeClaimResourceRequestsStorageBytes, Labels: pvcInfo, Value: float64(storage.Value()), }) } } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapePVs() []metric.Update { pvs := ccs.clusterCache.GetAllPersistentVolumes() return ccs.scrapePVs(pvs) } func (ccs *ClusterCacheScraper) scrapePVs(pvs []*clustercache.PersistentVolume) []metric.Update { var scrapeResults []metric.Update for _, pv := range pvs { providerID := pv.Name // if a more accurate provider ID is available, use that if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" { providerID = pv.Spec.CSI.VolumeHandle } pvInfo := map[string]string{ source.PVLabel: pv.Name, source.StorageClassLabel: pv.Spec.StorageClassName, source.ProviderIDLabel: providerID, } scrapeResults = append(scrapeResults, metric.Update{ Name: KubecostPVInfo, Labels: pvInfo, Value: 0, }) if storage, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok { scrapeResults = append(scrapeResults, metric.Update{ Name: KubePersistentVolumeCapacityBytes, Labels: pvInfo, Value: float64(storage.Value()), }) } } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapeServices() []metric.Update { services := ccs.clusterCache.GetAllServices() return ccs.scrapeServices(services) } func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service) []metric.Update { var scrapeResults []metric.Update for _, service := range services { serviceInfo := map[string]string{ source.ServiceLabel: service.Name, source.NamespaceLabel: service.Namespace, } // service labels labelNames, labelValues := promutil.KubeLabelsToLabels(service.SpecSelector) serviceLabels := util.ToMap(labelNames, labelValues) scrapeResults = append(scrapeResults, metric.Update{ Name: ServiceSelectorLabels, Labels: serviceInfo, Value: 0, AdditionalInfo: serviceLabels, }) } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapeStatefulSets() []metric.Update { statefulSets := ccs.clusterCache.GetAllStatefulSets() return ccs.scrapeStatefulSets(statefulSets) } func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet) []metric.Update { var scrapeResults []metric.Update for _, statefulSet := range statefulSets { statefulSetInfo := map[string]string{ source.StatefulSetLabel: statefulSet.Name, source.NamespaceLabel: statefulSet.Namespace, } // statefulSet labels labelNames, labelValues := promutil.KubeLabelsToLabels(statefulSet.SpecSelector.MatchLabels) statefulSetLabels := util.ToMap(labelNames, labelValues) scrapeResults = append(scrapeResults, metric.Update{ Name: StatefulSetMatchLabels, Labels: statefulSetInfo, Value: 0, AdditionalInfo: statefulSetLabels, }) } return scrapeResults } func (ccs *ClusterCacheScraper) ScrapeReplicaSets() []metric.Update { replicaSets := ccs.clusterCache.GetAllReplicaSets() return ccs.scrapeReplicaSets(replicaSets) } func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.ReplicaSet) []metric.Update { var scrapeResults []metric.Update for _, replicaSet := range replicaSets { replicaSetInfo := map[string]string{ source.ReplicaSetLabel: replicaSet.Name, source.NamespaceLabel: replicaSet.Namespace, } for _, owner := range replicaSet.OwnerReferences { ownerInfo := maps.Clone(replicaSetInfo) ownerInfo[source.OwnerKindLabel] = owner.Kind ownerInfo[source.OwnerNameLabel] = owner.Name scrapeResults = append(scrapeResults, metric.Update{ Name: KubeReplicasetOwner, Labels: ownerInfo, Value: 0, }) } } return scrapeResults } // getPersistentVolumeClaimClass returns StorageClassName. If no storage class was // requested, it returns "". func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string { // Use beta annotation first if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found { return class } if claim.Spec.StorageClassName != nil { return *claim.Spec.StorageClassName } // Special non-empty string to indicate absence of storage class. return "" } // toResourceUnitValue accepts a resource name and quantity and returns the sanitized resource, the unit, and the value in the units. // Returns an empty string for resource and unit if there was a failure. func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantity) (resource string, unit string, value float64) { resource = promutil.SanitizeLabelName(string(resourceName)) switch resourceName { case v1.ResourceCPU: unit = "core" value = float64(quantity.MilliValue()) / 1000 return case v1.ResourceStorage: fallthrough case v1.ResourceEphemeralStorage: fallthrough case v1.ResourceMemory: unit = "byte" value = float64(quantity.Value()) return case v1.ResourcePods: unit = "integer" value = float64(quantity.Value()) return default: if isHugePageResourceName(resourceName) || isAttachableVolumeResourceName(resourceName) { unit = "byte" value = float64(quantity.Value()) return } if isExtendedResourceName(resourceName) { unit = "integer" value = float64(quantity.Value()) return } } resource = "" unit = "" value = 0.0 return } // isHugePageResourceName checks for a huge page container resource name func isHugePageResourceName(name v1.ResourceName) bool { return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix) } // isAttachableVolumeResourceName checks for attached volume container resource name func isAttachableVolumeResourceName(name v1.ResourceName) bool { return strings.HasPrefix(string(name), v1.ResourceAttachableVolumesPrefix) } // isExtendedResourceName checks for extended container resource name func isExtendedResourceName(name v1.ResourceName) bool { if isNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) { return false } // Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name)) if errs := validation.IsQualifiedName(nameForQuota); len(errs) != 0 { return false } return true } // isNativeResource checks for a kubernetes.io/ prefixed resource name func isNativeResource(name v1.ResourceName) bool { return !strings.Contains(string(name), "/") || isPrefixedNativeResource(name) } func isPrefixedNativeResource(name v1.ResourceName) bool { return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix) }