| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546 |
- package metrics
- import (
- "fmt"
- "github.com/opencost/opencost/core/pkg/clustercache"
- "github.com/opencost/opencost/core/pkg/clusters"
- "github.com/opencost/opencost/core/pkg/log"
- coreutil "github.com/opencost/opencost/core/pkg/util"
- "github.com/opencost/opencost/core/pkg/util/promutil"
- "github.com/prometheus/client_golang/prometheus"
- dto "github.com/prometheus/client_model/go"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- "k8s.io/apimachinery/pkg/types"
- )
- //--------------------------------------------------------------------------
- // KubeModelCollector
- //--------------------------------------------------------------------------
// kubeModelMetricNames enumerates the name of every metric that
// KubeModelCollector can emit. Describe walks this list and skips any name
// that is present in the disabled-metrics map.
var kubeModelMetricNames = []string{
	// Cluster, node, pod and namespace level metrics.
	"node_info", "cluster_info",
	"pod_info", "pod_pvc_volume",
	"namespace_info",

	// Workload controllers: info, labels and annotations for each kind.
	"deployment_info", "deployment_labels", "deployment_annotations",
	"statefulset_info", "statefulset_labels", "statefulset_annotations",
	"daemonset_info", "daemonset_labels", "daemonset_annotations",
	"job_info", "job_labels", "job_annotations",
	"cronjob_info", "cronjob_labels", "cronjob_annotations",
	"replicaset_info", "replicaset_labels", "replicaset_annotations",

	// Resource quotas.
	"resourcequota_info",
}
- // KubeModelCollector emits a unified set of info/labels/annotations metrics for
- // all Kubernetes resource types. It mirrors the collector-source ClusterCacheScraper:
- // indexes are built once per Collect call and per-resource scrapes run concurrently.
- type KubeModelCollector struct {
- KubeClusterCache clustercache.ClusterCache
- ClusterInfo clusters.ClusterInfoProvider
- metricsConfig MetricsConfig
- }
- // Describe sends a generic descriptor for each metric emitted by this collector.
- func (c KubeModelCollector) Describe(ch chan<- *prometheus.Desc) {
- disabled := c.metricsConfig.GetDisabledMetricsMap()
- for _, name := range kubeModelMetricNames {
- if _, ok := disabled[name]; ok {
- continue
- }
- ch <- prometheus.NewDesc(name, name, []string{}, nil)
- }
- }
- // Collect fetches all cluster resources, builds cross-reference indexes, then
- // emits info/labels/annotations metrics concurrently per resource type.
- func (c KubeModelCollector) Collect(ch chan<- prometheus.Metric) {
- disabled := c.metricsConfig.GetDisabledMetricsMap()
- // Fetch all resources from the cache up front.
- nodes := c.KubeClusterCache.GetAllNodes()
- namespaces := c.KubeClusterCache.GetAllNamespaces()
- pods := c.KubeClusterCache.GetAllPods()
- pvcs := c.KubeClusterCache.GetAllPersistentVolumeClaims()
- deployments := c.KubeClusterCache.GetAllDeployments()
- statefulSets := c.KubeClusterCache.GetAllStatefulSets()
- daemonSets := c.KubeClusterCache.GetAllDaemonSets()
- jobs := c.KubeClusterCache.GetAllJobs()
- cronJobs := c.KubeClusterCache.GetAllCronJobs()
- replicaSets := c.KubeClusterCache.GetAllReplicaSets()
- resourceQuotas := c.KubeClusterCache.GetAllResourceQuotas()
- // Build cross-reference indexes.
- nsIndex := make(map[string]types.UID, len(namespaces))
- for _, ns := range namespaces {
- nsIndex[ns.Name] = ns.UID
- }
- nodeIndex := make(map[string]types.UID, len(nodes))
- for _, node := range nodes {
- nodeIndex[node.Name] = node.UID
- }
- pvcIndex := make(map[string]types.UID, len(pvcs))
- for _, pvc := range pvcs {
- pvcIndex[pvcIndexKey(pvc.Namespace, pvc.Name)] = pvc.UID
- }
- // Collect concurrently using a channel.
- type scrapeFn func() []kubeModelMetric
- fns := []scrapeFn{
- func() []kubeModelMetric { return c.scrapeClusterInfo(disabled) },
- func() []kubeModelMetric { return c.scrapeNodes(nodes, disabled) },
- func() []kubeModelMetric { return c.scrapeNamespaces(namespaces, disabled) },
- func() []kubeModelMetric { return c.scrapePods(pods, nsIndex, nodeIndex, pvcIndex, disabled) },
- func() []kubeModelMetric { return c.scrapeDeployments(deployments, nsIndex, disabled) },
- func() []kubeModelMetric { return c.scrapeStatefulSets(statefulSets, nsIndex, disabled) },
- func() []kubeModelMetric { return c.scrapeDaemonSets(daemonSets, nsIndex, disabled) },
- func() []kubeModelMetric { return c.scrapeJobs(jobs, nsIndex, disabled) },
- func() []kubeModelMetric { return c.scrapeCronJobs(cronJobs, nsIndex, disabled) },
- func() []kubeModelMetric { return c.scrapeReplicaSets(replicaSets, nsIndex, disabled) },
- func() []kubeModelMetric { return c.scrapeResourceQuotas(resourceQuotas, nsIndex, disabled) },
- }
- results := make(chan []kubeModelMetric, len(fns))
- for _, fn := range fns {
- fn := fn
- go func() { results <- fn() }()
- }
- for range fns {
- for _, m := range <-results {
- ch <- m
- }
- }
- }
// pvcIndexKey builds the "<namespace>/<name>" key under which a
// PersistentVolumeClaim is stored in, and looked up from, the PVC index.
func pvcIndexKey(namespace, name string) string {
	const keyFormat = "%s/%s"
	return fmt.Sprintf(keyFormat, namespace, name)
}
- //--------------------------------------------------------------------------
- // kubeModelMetric — generic prometheus.Metric for kube-model emissions
- //--------------------------------------------------------------------------
// kubeModelMetric is the single prometheus.Metric implementation used for all
// kube-model info, labels and annotations metrics. Labels are kept as a plain
// name -> value map and turned into label pairs by Write; value is the gauge
// value that Write reports.
type kubeModelMetric struct {
	name, help string            // metric name and help text
	labels     map[string]string // label name -> label value
	value      float64           // gauge value reported by Write
}
- func newInfoMetric(name string, labels map[string]string) kubeModelMetric {
- return kubeModelMetric{name: name, help: name, labels: labels, value: 1}
- }
- func newValueMetric(name string, labels map[string]string, value float64) kubeModelMetric {
- return kubeModelMetric{name: name, help: name, labels: labels, value: value}
- }
- func (m kubeModelMetric) Desc() *prometheus.Desc {
- return prometheus.NewDesc(m.name, m.help, promutil.LabelNamesFrom(m.labels), prometheus.Labels{})
- }
- func (m kubeModelMetric) Write(pb *dto.Metric) error {
- pb.Gauge = &dto.Gauge{Value: &m.value}
- pairs := make([]*dto.LabelPair, 0, len(m.labels))
- for k, v := range m.labels {
- pairs = append(pairs, &dto.LabelPair{
- Name: toStringPtr(k),
- Value: toStringPtr(v),
- })
- }
- pb.Label = pairs
- return nil
- }
- //--------------------------------------------------------------------------
- // Per-resource scrape helpers
- //--------------------------------------------------------------------------
- func (c KubeModelCollector) scrapeClusterInfo(disabled map[string]struct{}) []kubeModelMetric {
- if _, ok := disabled["cluster_info"]; ok {
- return nil
- }
- if c.ClusterInfo == nil {
- return nil
- }
- info := c.ClusterInfo.GetClusterInfo()
- labels := map[string]string{
- "uid": info[clusters.ClusterInfoIdKey],
- "provider": info[clusters.ClusterInfoProviderKey],
- "account_id": info[clusters.ClusterInfoAccountKey],
- "provisioner_name": info[clusters.ClusterInfoProvisionerKey],
- "region": info[clusters.ClusterInfoRegionKey],
- }
- // GCP uses "project" instead of "account"
- if labels["account_id"] == "" {
- labels["account_id"] = info[clusters.ClusterInfoProjectKey]
- }
- return []kubeModelMetric{newInfoMetric("cluster_info", labels)}
- }
- func (c KubeModelCollector) scrapeNodes(nodes []*clustercache.Node, disabled map[string]struct{}) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "node_info")
- for _, node := range nodes {
- nodeInfo := map[string]string{
- "node": node.Name,
- "uid": string(node.UID),
- "provider_id": node.SpecProviderID,
- }
- if instanceType, ok := coreutil.GetInstanceType(node.Labels); ok {
- nodeInfo["instance_type"] = instanceType
- }
- if emitInfo {
- out = append(out, newInfoMetric("node_info", nodeInfo))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeNamespaces(namespaces []*clustercache.Namespace, disabled map[string]struct{}) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "namespace_info")
- for _, ns := range namespaces {
- if emitInfo {
- out = append(out, newInfoMetric("namespace_info", map[string]string{
- "uid": string(ns.UID),
- "namespace": ns.Name,
- }))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapePods(
- pods []*clustercache.Pod,
- nsIndex map[string]types.UID,
- nodeIndex map[string]types.UID,
- pvcIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "pod_info")
- emitPVC := !isDisabled(disabled, "pod_pvc_volume")
- for _, pod := range pods {
- nsUID, ok := nsIndex[pod.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: pod namespace uid missing for namespace '%s'", pod.Namespace)
- }
- nodeUID, ok := nodeIndex[pod.Spec.NodeName]
- if !ok && pod.Spec.NodeName != "" {
- log.Debugf("KubeModelCollector: pod node uid missing for node '%s'", pod.Spec.NodeName)
- }
- if emitInfo {
- out = append(out, newInfoMetric("pod_info", map[string]string{
- "uid": string(pod.UID),
- "pod": pod.Name,
- "namespace_uid": string(nsUID),
- "node_uid": string(nodeUID),
- }))
- }
- if emitPVC {
- for _, vol := range pod.Spec.Volumes {
- if vol.PersistentVolumeClaim == nil {
- continue
- }
- pvcUID := pvcIndex[pvcIndexKey(pod.Namespace, vol.PersistentVolumeClaim.ClaimName)]
- out = append(out, newInfoMetric("pod_pvc_volume", map[string]string{
- "uid": string(pod.UID),
- "persistentvolumeclaim_uid": string(pvcUID),
- "pod_volume_name": vol.Name,
- }))
- }
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeDeployments(
- deployments []*clustercache.Deployment,
- nsIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "deployment_info")
- emitLabels := !isDisabled(disabled, "deployment_labels")
- emitAnno := !isDisabled(disabled, "deployment_annotations")
- for _, d := range deployments {
- nsUID, ok := nsIndex[d.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: deployment namespace uid missing for namespace '%s'", d.Namespace)
- }
- if emitInfo {
- out = append(out, newInfoMetric("deployment_info", map[string]string{
- "uid": string(d.UID),
- "namespace_uid": string(nsUID),
- "deployment": d.Name,
- }))
- }
- if emitLabels {
- out = append(out, kubeLabelsMetric("deployment_labels", string(d.UID), d.Labels))
- }
- if emitAnno {
- out = append(out, kubeAnnotationsMetric("deployment_annotations", string(d.UID), d.Annotations))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeStatefulSets(
- sets []*clustercache.StatefulSet,
- nsIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "statefulset_info")
- emitLabels := !isDisabled(disabled, "statefulset_labels")
- emitAnno := !isDisabled(disabled, "statefulset_annotations")
- for _, s := range sets {
- nsUID, ok := nsIndex[s.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: statefulset namespace uid missing for namespace '%s'", s.Namespace)
- }
- if emitInfo {
- out = append(out, newInfoMetric("statefulset_info", map[string]string{
- "uid": string(s.UID),
- "namespace_uid": string(nsUID),
- "statefulSet": s.Name,
- }))
- }
- if emitLabels {
- out = append(out, kubeLabelsMetric("statefulset_labels", string(s.UID), s.Labels))
- }
- if emitAnno {
- out = append(out, kubeAnnotationsMetric("statefulset_annotations", string(s.UID), s.Annotations))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeDaemonSets(
- sets []*clustercache.DaemonSet,
- nsIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "daemonset_info")
- emitLabels := !isDisabled(disabled, "daemonset_labels")
- emitAnno := !isDisabled(disabled, "daemonset_annotations")
- for _, ds := range sets {
- nsUID, ok := nsIndex[ds.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: daemonset namespace uid missing for namespace '%s'", ds.Namespace)
- }
- if emitInfo {
- out = append(out, newInfoMetric("daemonset_info", map[string]string{
- "uid": string(ds.UID),
- "namespace_uid": string(nsUID),
- "daemonset": ds.Name,
- }))
- }
- if emitLabels {
- out = append(out, kubeLabelsMetric("daemonset_labels", string(ds.UID), ds.Labels))
- }
- if emitAnno {
- out = append(out, kubeAnnotationsMetric("daemonset_annotations", string(ds.UID), ds.Annotations))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeJobs(
- jobs []*clustercache.Job,
- nsIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "job_info")
- emitLabels := !isDisabled(disabled, "job_labels")
- emitAnno := !isDisabled(disabled, "job_annotations")
- for _, j := range jobs {
- nsUID, ok := nsIndex[j.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: job namespace uid missing for namespace '%s'", j.Namespace)
- }
- if emitInfo {
- out = append(out, newInfoMetric("job_info", map[string]string{
- "uid": string(j.UID),
- "namespace_uid": string(nsUID),
- "job": j.Name,
- }))
- }
- if emitLabels {
- out = append(out, kubeLabelsMetric("job_labels", string(j.UID), j.Labels))
- }
- if emitAnno {
- out = append(out, kubeAnnotationsMetric("job_annotations", string(j.UID), j.Annotations))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeCronJobs(
- cronJobs []*clustercache.CronJob,
- nsIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "cronjob_info")
- emitLabels := !isDisabled(disabled, "cronjob_labels")
- emitAnno := !isDisabled(disabled, "cronjob_annotations")
- for _, cj := range cronJobs {
- nsUID, ok := nsIndex[cj.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: cronjob namespace uid missing for namespace '%s'", cj.Namespace)
- }
- if emitInfo {
- out = append(out, newInfoMetric("cronjob_info", map[string]string{
- "uid": string(cj.UID),
- "namespace_uid": string(nsUID),
- "cronjob": cj.Name,
- }))
- }
- if emitLabels {
- out = append(out, kubeLabelsMetric("cronjob_labels", string(cj.UID), cj.Labels))
- }
- if emitAnno {
- out = append(out, kubeAnnotationsMetric("cronjob_annotations", string(cj.UID), cj.Annotations))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeReplicaSets(
- sets []*clustercache.ReplicaSet,
- nsIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- var out []kubeModelMetric
- emitInfo := !isDisabled(disabled, "replicaset_info")
- emitLabels := !isDisabled(disabled, "replicaset_labels")
- emitAnno := !isDisabled(disabled, "replicaset_annotations")
- for _, rs := range sets {
- nsUID, ok := nsIndex[rs.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: replicaset namespace uid missing for namespace '%s'", rs.Namespace)
- }
- if emitInfo {
- out = append(out, newInfoMetric("replicaset_info", map[string]string{
- "uid": string(rs.UID),
- "namespace_uid": string(nsUID),
- "replicaset": rs.Name,
- }))
- }
- if emitLabels {
- out = append(out, kubeLabelsMetric("replicaset_labels", string(rs.UID), rs.Labels))
- }
- if emitAnno {
- out = append(out, kubeAnnotationsMetric("replicaset_annotations", string(rs.UID), rs.Annotations))
- }
- }
- return out
- }
- func (c KubeModelCollector) scrapeResourceQuotas(
- quotas []*clustercache.ResourceQuota,
- nsIndex map[string]types.UID,
- disabled map[string]struct{},
- ) []kubeModelMetric {
- if isDisabled(disabled, "resourcequota_info") {
- return nil
- }
- var out []kubeModelMetric
- for _, rq := range quotas {
- nsUID, ok := nsIndex[rq.Namespace]
- if !ok {
- log.Debugf("KubeModelCollector: resourcequota namespace uid missing for namespace '%s'", rq.Namespace)
- }
- out = append(out, newInfoMetric("resourcequota_info", map[string]string{
- "uid": string(rq.UID),
- "namespace_uid": string(nsUID),
- "resourcequota": rq.Name,
- }))
- }
- return out
- }
- //--------------------------------------------------------------------------
- // Helpers
- //--------------------------------------------------------------------------
// isDisabled reports whether name is a key of the disabled-metrics set.
// A nil set disables nothing.
func isDisabled(disabled map[string]struct{}, name string) bool {
	if _, found := disabled[name]; found {
		return true
	}
	return false
}
- // kubeLabelsMetric builds a labels metric for a resource, adding the resource
- // uid as a fixed label alongside the k8s labels (prefixed with "label_").
- func kubeLabelsMetric(name, uid string, k8sLabels map[string]string) kubeModelMetric {
- labelNames, labelValues := promutil.KubeLabelsToLabels(promutil.SanitizeLabels(k8sLabels))
- m := make(map[string]string, len(labelNames)+1)
- m["uid"] = uid
- for i, k := range labelNames {
- m[k] = labelValues[i]
- }
- return newInfoMetric(name, m)
- }
- // kubeAnnotationsMetric builds an annotations metric for a resource.
- func kubeAnnotationsMetric(name, uid string, k8sAnnotations map[string]string) kubeModelMetric {
- annoNames, annoValues := promutil.KubeAnnotationsToLabels(k8sAnnotations)
- m := make(map[string]string, len(annoNames)+1)
- m["uid"] = uid
- for i, k := range annoNames {
- m[k] = annoValues[i]
- }
- return newInfoMetric(name, m)
- }
- // kubeModelResourceValue converts a Kubernetes resource quantity to a float64 value.
- // It mirrors the collector-source toResourceUnitValue logic for the cases we need.
- func kubeModelResourceValue(resourceName v1.ResourceName, quantity resource.Quantity) float64 {
- switch resourceName {
- case v1.ResourceCPU:
- return float64(quantity.MilliValue()) / 1000.0
- default:
- return float64(quantity.Value())
- }
- }
|