kubemodel.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. package metrics
  import (
	"fmt"
	"sort"

	"github.com/opencost/opencost/core/pkg/clustercache"
	"github.com/opencost/opencost/core/pkg/clusters"
	"github.com/opencost/opencost/core/pkg/log"
	coreutil "github.com/opencost/opencost/core/pkg/util"
	"github.com/opencost/opencost/core/pkg/util/promutil"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
)
  //--------------------------------------------------------------------------
// KubeModelCollector
//--------------------------------------------------------------------------

// kubeModelMetricNames lists every metric name KubeModelCollector can emit, in
// a fixed order. Describe walks this list and skips any name present in the
// disabled-metrics map; the per-resource scrape helpers consult the same map
// for each of these names individually.
var kubeModelMetricNames = func() []string {
	names := []string{
		"node_info",
		"cluster_info",
		"pod_info",
		"pod_pvc_volume",
		"namespace_info",
	}
	// Every workload kind contributes an _info, a _labels and an _annotations
	// metric, in that order.
	workloadKinds := []string{"deployment", "statefulset", "daemonset", "job", "cronjob", "replicaset"}
	for _, kind := range workloadKinds {
		names = append(names, kind+"_info", kind+"_labels", kind+"_annotations")
	}
	return append(names, "resourcequota_info")
}()
  46. // KubeModelCollector emits a unified set of info/labels/annotations metrics for
  47. // all Kubernetes resource types. It mirrors the collector-source ClusterCacheScraper:
  48. // indexes are built once per Collect call and per-resource scrapes run concurrently.
  49. type KubeModelCollector struct {
  50. KubeClusterCache clustercache.ClusterCache
  51. ClusterInfo clusters.ClusterInfoProvider
  52. metricsConfig MetricsConfig
  53. }
  54. // Describe sends a generic descriptor for each metric emitted by this collector.
  55. func (c KubeModelCollector) Describe(ch chan<- *prometheus.Desc) {
  56. disabled := c.metricsConfig.GetDisabledMetricsMap()
  57. for _, name := range kubeModelMetricNames {
  58. if _, ok := disabled[name]; ok {
  59. continue
  60. }
  61. ch <- prometheus.NewDesc(name, name, []string{}, nil)
  62. }
  63. }
  64. // Collect fetches all cluster resources, builds cross-reference indexes, then
  65. // emits info/labels/annotations metrics concurrently per resource type.
  66. func (c KubeModelCollector) Collect(ch chan<- prometheus.Metric) {
  67. disabled := c.metricsConfig.GetDisabledMetricsMap()
  68. // Fetch all resources from the cache up front.
  69. nodes := c.KubeClusterCache.GetAllNodes()
  70. namespaces := c.KubeClusterCache.GetAllNamespaces()
  71. pods := c.KubeClusterCache.GetAllPods()
  72. pvcs := c.KubeClusterCache.GetAllPersistentVolumeClaims()
  73. deployments := c.KubeClusterCache.GetAllDeployments()
  74. statefulSets := c.KubeClusterCache.GetAllStatefulSets()
  75. daemonSets := c.KubeClusterCache.GetAllDaemonSets()
  76. jobs := c.KubeClusterCache.GetAllJobs()
  77. cronJobs := c.KubeClusterCache.GetAllCronJobs()
  78. replicaSets := c.KubeClusterCache.GetAllReplicaSets()
  79. resourceQuotas := c.KubeClusterCache.GetAllResourceQuotas()
  80. // Build cross-reference indexes.
  81. nsIndex := make(map[string]types.UID, len(namespaces))
  82. for _, ns := range namespaces {
  83. nsIndex[ns.Name] = ns.UID
  84. }
  85. nodeIndex := make(map[string]types.UID, len(nodes))
  86. for _, node := range nodes {
  87. nodeIndex[node.Name] = node.UID
  88. }
  89. pvcIndex := make(map[string]types.UID, len(pvcs))
  90. for _, pvc := range pvcs {
  91. pvcIndex[pvcIndexKey(pvc.Namespace, pvc.Name)] = pvc.UID
  92. }
  93. // Collect concurrently using a channel.
  94. type scrapeFn func() []kubeModelMetric
  95. fns := []scrapeFn{
  96. func() []kubeModelMetric { return c.scrapeClusterInfo(disabled) },
  97. func() []kubeModelMetric { return c.scrapeNodes(nodes, disabled) },
  98. func() []kubeModelMetric { return c.scrapeNamespaces(namespaces, disabled) },
  99. func() []kubeModelMetric { return c.scrapePods(pods, nsIndex, nodeIndex, pvcIndex, disabled) },
  100. func() []kubeModelMetric { return c.scrapeDeployments(deployments, nsIndex, disabled) },
  101. func() []kubeModelMetric { return c.scrapeStatefulSets(statefulSets, nsIndex, disabled) },
  102. func() []kubeModelMetric { return c.scrapeDaemonSets(daemonSets, nsIndex, disabled) },
  103. func() []kubeModelMetric { return c.scrapeJobs(jobs, nsIndex, disabled) },
  104. func() []kubeModelMetric { return c.scrapeCronJobs(cronJobs, nsIndex, disabled) },
  105. func() []kubeModelMetric { return c.scrapeReplicaSets(replicaSets, nsIndex, disabled) },
  106. func() []kubeModelMetric { return c.scrapeResourceQuotas(resourceQuotas, nsIndex, disabled) },
  107. }
  108. results := make(chan []kubeModelMetric, len(fns))
  109. for _, fn := range fns {
  110. fn := fn
  111. go func() { results <- fn() }()
  112. }
  113. for range fns {
  114. for _, m := range <-results {
  115. ch <- m
  116. }
  117. }
  118. }
  // pvcIndexKey builds the "namespace/name" key under which a
// PersistentVolumeClaim's UID is stored in the PVC lookup table.
// fmt.Sprint inserts no separators between adjacent string operands, so the
// result is exactly namespace + "/" + name.
func pvcIndexKey(namespace, name string) string {
	return fmt.Sprint(namespace, "/", name)
}
  //--------------------------------------------------------------------------
// kubeModelMetric — generic prometheus.Metric for kube-model emissions
//--------------------------------------------------------------------------

// kubeModelMetric is the single prometheus.Metric implementation used for
// every kube-model emission. The label set is kept in a plain map and
// written out by Write; the gauge value is whatever the constructor stored
// (newInfoMetric stores 1).
type kubeModelMetric struct {
	name, help string            // metric name and help text
	labels     map[string]string // label name -> label value
	value      float64           // gauge value
}
  134. func newInfoMetric(name string, labels map[string]string) kubeModelMetric {
  135. return kubeModelMetric{name: name, help: name, labels: labels, value: 1}
  136. }
  137. func newValueMetric(name string, labels map[string]string, value float64) kubeModelMetric {
  138. return kubeModelMetric{name: name, help: name, labels: labels, value: value}
  139. }
  140. func (m kubeModelMetric) Desc() *prometheus.Desc {
  141. return prometheus.NewDesc(m.name, m.help, promutil.LabelNamesFrom(m.labels), prometheus.Labels{})
  142. }
  143. func (m kubeModelMetric) Write(pb *dto.Metric) error {
  144. pb.Gauge = &dto.Gauge{Value: &m.value}
  145. pairs := make([]*dto.LabelPair, 0, len(m.labels))
  146. for k, v := range m.labels {
  147. pairs = append(pairs, &dto.LabelPair{
  148. Name: toStringPtr(k),
  149. Value: toStringPtr(v),
  150. })
  151. }
  152. pb.Label = pairs
  153. return nil
  154. }
  155. //--------------------------------------------------------------------------
  156. // Per-resource scrape helpers
  157. //--------------------------------------------------------------------------
  158. func (c KubeModelCollector) scrapeClusterInfo(disabled map[string]struct{}) []kubeModelMetric {
  159. if _, ok := disabled["cluster_info"]; ok {
  160. return nil
  161. }
  162. if c.ClusterInfo == nil {
  163. return nil
  164. }
  165. info := c.ClusterInfo.GetClusterInfo()
  166. labels := map[string]string{
  167. "uid": info[clusters.ClusterInfoIdKey],
  168. "provider": info[clusters.ClusterInfoProviderKey],
  169. "account_id": info[clusters.ClusterInfoAccountKey],
  170. "provisioner_name": info[clusters.ClusterInfoProvisionerKey],
  171. "region": info[clusters.ClusterInfoRegionKey],
  172. }
  173. // GCP uses "project" instead of "account"
  174. if labels["account_id"] == "" {
  175. labels["account_id"] = info[clusters.ClusterInfoProjectKey]
  176. }
  177. return []kubeModelMetric{newInfoMetric("cluster_info", labels)}
  178. }
  179. func (c KubeModelCollector) scrapeNodes(nodes []*clustercache.Node, disabled map[string]struct{}) []kubeModelMetric {
  180. var out []kubeModelMetric
  181. emitInfo := !isDisabled(disabled, "node_info")
  182. for _, node := range nodes {
  183. nodeInfo := map[string]string{
  184. "node": node.Name,
  185. "uid": string(node.UID),
  186. "provider_id": node.SpecProviderID,
  187. }
  188. if instanceType, ok := coreutil.GetInstanceType(node.Labels); ok {
  189. nodeInfo["instance_type"] = instanceType
  190. }
  191. if emitInfo {
  192. out = append(out, newInfoMetric("node_info", nodeInfo))
  193. }
  194. }
  195. return out
  196. }
  197. func (c KubeModelCollector) scrapeNamespaces(namespaces []*clustercache.Namespace, disabled map[string]struct{}) []kubeModelMetric {
  198. var out []kubeModelMetric
  199. emitInfo := !isDisabled(disabled, "namespace_info")
  200. for _, ns := range namespaces {
  201. if emitInfo {
  202. out = append(out, newInfoMetric("namespace_info", map[string]string{
  203. "uid": string(ns.UID),
  204. "namespace": ns.Name,
  205. }))
  206. }
  207. }
  208. return out
  209. }
  210. func (c KubeModelCollector) scrapePods(
  211. pods []*clustercache.Pod,
  212. nsIndex map[string]types.UID,
  213. nodeIndex map[string]types.UID,
  214. pvcIndex map[string]types.UID,
  215. disabled map[string]struct{},
  216. ) []kubeModelMetric {
  217. var out []kubeModelMetric
  218. emitInfo := !isDisabled(disabled, "pod_info")
  219. emitPVC := !isDisabled(disabled, "pod_pvc_volume")
  220. for _, pod := range pods {
  221. nsUID, ok := nsIndex[pod.Namespace]
  222. if !ok {
  223. log.Debugf("KubeModelCollector: pod namespace uid missing for namespace '%s'", pod.Namespace)
  224. }
  225. nodeUID, ok := nodeIndex[pod.Spec.NodeName]
  226. if !ok && pod.Spec.NodeName != "" {
  227. log.Debugf("KubeModelCollector: pod node uid missing for node '%s'", pod.Spec.NodeName)
  228. }
  229. if emitInfo {
  230. out = append(out, newInfoMetric("pod_info", map[string]string{
  231. "uid": string(pod.UID),
  232. "pod": pod.Name,
  233. "namespace_uid": string(nsUID),
  234. "node_uid": string(nodeUID),
  235. }))
  236. }
  237. if emitPVC {
  238. for _, vol := range pod.Spec.Volumes {
  239. if vol.PersistentVolumeClaim == nil {
  240. continue
  241. }
  242. pvcUID := pvcIndex[pvcIndexKey(pod.Namespace, vol.PersistentVolumeClaim.ClaimName)]
  243. out = append(out, newInfoMetric("pod_pvc_volume", map[string]string{
  244. "uid": string(pod.UID),
  245. "persistentvolumeclaim_uid": string(pvcUID),
  246. "pod_volume_name": vol.Name,
  247. }))
  248. }
  249. }
  250. }
  251. return out
  252. }
  253. func (c KubeModelCollector) scrapeDeployments(
  254. deployments []*clustercache.Deployment,
  255. nsIndex map[string]types.UID,
  256. disabled map[string]struct{},
  257. ) []kubeModelMetric {
  258. var out []kubeModelMetric
  259. emitInfo := !isDisabled(disabled, "deployment_info")
  260. emitLabels := !isDisabled(disabled, "deployment_labels")
  261. emitAnno := !isDisabled(disabled, "deployment_annotations")
  262. for _, d := range deployments {
  263. nsUID, ok := nsIndex[d.Namespace]
  264. if !ok {
  265. log.Debugf("KubeModelCollector: deployment namespace uid missing for namespace '%s'", d.Namespace)
  266. }
  267. if emitInfo {
  268. out = append(out, newInfoMetric("deployment_info", map[string]string{
  269. "uid": string(d.UID),
  270. "namespace_uid": string(nsUID),
  271. "deployment": d.Name,
  272. }))
  273. }
  274. if emitLabels {
  275. out = append(out, kubeLabelsMetric("deployment_labels", string(d.UID), d.Labels))
  276. }
  277. if emitAnno {
  278. out = append(out, kubeAnnotationsMetric("deployment_annotations", string(d.UID), d.Annotations))
  279. }
  280. }
  281. return out
  282. }
  283. func (c KubeModelCollector) scrapeStatefulSets(
  284. sets []*clustercache.StatefulSet,
  285. nsIndex map[string]types.UID,
  286. disabled map[string]struct{},
  287. ) []kubeModelMetric {
  288. var out []kubeModelMetric
  289. emitInfo := !isDisabled(disabled, "statefulset_info")
  290. emitLabels := !isDisabled(disabled, "statefulset_labels")
  291. emitAnno := !isDisabled(disabled, "statefulset_annotations")
  292. for _, s := range sets {
  293. nsUID, ok := nsIndex[s.Namespace]
  294. if !ok {
  295. log.Debugf("KubeModelCollector: statefulset namespace uid missing for namespace '%s'", s.Namespace)
  296. }
  297. if emitInfo {
  298. out = append(out, newInfoMetric("statefulset_info", map[string]string{
  299. "uid": string(s.UID),
  300. "namespace_uid": string(nsUID),
  301. "statefulSet": s.Name,
  302. }))
  303. }
  304. if emitLabels {
  305. out = append(out, kubeLabelsMetric("statefulset_labels", string(s.UID), s.Labels))
  306. }
  307. if emitAnno {
  308. out = append(out, kubeAnnotationsMetric("statefulset_annotations", string(s.UID), s.Annotations))
  309. }
  310. }
  311. return out
  312. }
  313. func (c KubeModelCollector) scrapeDaemonSets(
  314. sets []*clustercache.DaemonSet,
  315. nsIndex map[string]types.UID,
  316. disabled map[string]struct{},
  317. ) []kubeModelMetric {
  318. var out []kubeModelMetric
  319. emitInfo := !isDisabled(disabled, "daemonset_info")
  320. emitLabels := !isDisabled(disabled, "daemonset_labels")
  321. emitAnno := !isDisabled(disabled, "daemonset_annotations")
  322. for _, ds := range sets {
  323. nsUID, ok := nsIndex[ds.Namespace]
  324. if !ok {
  325. log.Debugf("KubeModelCollector: daemonset namespace uid missing for namespace '%s'", ds.Namespace)
  326. }
  327. if emitInfo {
  328. out = append(out, newInfoMetric("daemonset_info", map[string]string{
  329. "uid": string(ds.UID),
  330. "namespace_uid": string(nsUID),
  331. "daemonset": ds.Name,
  332. }))
  333. }
  334. if emitLabels {
  335. out = append(out, kubeLabelsMetric("daemonset_labels", string(ds.UID), ds.Labels))
  336. }
  337. if emitAnno {
  338. out = append(out, kubeAnnotationsMetric("daemonset_annotations", string(ds.UID), ds.Annotations))
  339. }
  340. }
  341. return out
  342. }
  343. func (c KubeModelCollector) scrapeJobs(
  344. jobs []*clustercache.Job,
  345. nsIndex map[string]types.UID,
  346. disabled map[string]struct{},
  347. ) []kubeModelMetric {
  348. var out []kubeModelMetric
  349. emitInfo := !isDisabled(disabled, "job_info")
  350. emitLabels := !isDisabled(disabled, "job_labels")
  351. emitAnno := !isDisabled(disabled, "job_annotations")
  352. for _, j := range jobs {
  353. nsUID, ok := nsIndex[j.Namespace]
  354. if !ok {
  355. log.Debugf("KubeModelCollector: job namespace uid missing for namespace '%s'", j.Namespace)
  356. }
  357. if emitInfo {
  358. out = append(out, newInfoMetric("job_info", map[string]string{
  359. "uid": string(j.UID),
  360. "namespace_uid": string(nsUID),
  361. "job": j.Name,
  362. }))
  363. }
  364. if emitLabels {
  365. out = append(out, kubeLabelsMetric("job_labels", string(j.UID), j.Labels))
  366. }
  367. if emitAnno {
  368. out = append(out, kubeAnnotationsMetric("job_annotations", string(j.UID), j.Annotations))
  369. }
  370. }
  371. return out
  372. }
  373. func (c KubeModelCollector) scrapeCronJobs(
  374. cronJobs []*clustercache.CronJob,
  375. nsIndex map[string]types.UID,
  376. disabled map[string]struct{},
  377. ) []kubeModelMetric {
  378. var out []kubeModelMetric
  379. emitInfo := !isDisabled(disabled, "cronjob_info")
  380. emitLabels := !isDisabled(disabled, "cronjob_labels")
  381. emitAnno := !isDisabled(disabled, "cronjob_annotations")
  382. for _, cj := range cronJobs {
  383. nsUID, ok := nsIndex[cj.Namespace]
  384. if !ok {
  385. log.Debugf("KubeModelCollector: cronjob namespace uid missing for namespace '%s'", cj.Namespace)
  386. }
  387. if emitInfo {
  388. out = append(out, newInfoMetric("cronjob_info", map[string]string{
  389. "uid": string(cj.UID),
  390. "namespace_uid": string(nsUID),
  391. "cronjob": cj.Name,
  392. }))
  393. }
  394. if emitLabels {
  395. out = append(out, kubeLabelsMetric("cronjob_labels", string(cj.UID), cj.Labels))
  396. }
  397. if emitAnno {
  398. out = append(out, kubeAnnotationsMetric("cronjob_annotations", string(cj.UID), cj.Annotations))
  399. }
  400. }
  401. return out
  402. }
  403. func (c KubeModelCollector) scrapeReplicaSets(
  404. sets []*clustercache.ReplicaSet,
  405. nsIndex map[string]types.UID,
  406. disabled map[string]struct{},
  407. ) []kubeModelMetric {
  408. var out []kubeModelMetric
  409. emitInfo := !isDisabled(disabled, "replicaset_info")
  410. emitLabels := !isDisabled(disabled, "replicaset_labels")
  411. emitAnno := !isDisabled(disabled, "replicaset_annotations")
  412. for _, rs := range sets {
  413. nsUID, ok := nsIndex[rs.Namespace]
  414. if !ok {
  415. log.Debugf("KubeModelCollector: replicaset namespace uid missing for namespace '%s'", rs.Namespace)
  416. }
  417. if emitInfo {
  418. out = append(out, newInfoMetric("replicaset_info", map[string]string{
  419. "uid": string(rs.UID),
  420. "namespace_uid": string(nsUID),
  421. "replicaset": rs.Name,
  422. }))
  423. }
  424. if emitLabels {
  425. out = append(out, kubeLabelsMetric("replicaset_labels", string(rs.UID), rs.Labels))
  426. }
  427. if emitAnno {
  428. out = append(out, kubeAnnotationsMetric("replicaset_annotations", string(rs.UID), rs.Annotations))
  429. }
  430. }
  431. return out
  432. }
  433. func (c KubeModelCollector) scrapeResourceQuotas(
  434. quotas []*clustercache.ResourceQuota,
  435. nsIndex map[string]types.UID,
  436. disabled map[string]struct{},
  437. ) []kubeModelMetric {
  438. if isDisabled(disabled, "resourcequota_info") {
  439. return nil
  440. }
  441. var out []kubeModelMetric
  442. for _, rq := range quotas {
  443. nsUID, ok := nsIndex[rq.Namespace]
  444. if !ok {
  445. log.Debugf("KubeModelCollector: resourcequota namespace uid missing for namespace '%s'", rq.Namespace)
  446. }
  447. out = append(out, newInfoMetric("resourcequota_info", map[string]string{
  448. "uid": string(rq.UID),
  449. "namespace_uid": string(nsUID),
  450. "resourcequota": rq.Name,
  451. }))
  452. }
  453. return out
  454. }
  //--------------------------------------------------------------------------
// Helpers
//--------------------------------------------------------------------------

// isDisabled reports whether name is a key of the disabled-metrics set.
// Looking up a key in a nil map is allowed in Go, so a nil set disables
// nothing.
func isDisabled(disabled map[string]struct{}, name string) bool {
	if _, found := disabled[name]; found {
		return true
	}
	return false
}
  463. // kubeLabelsMetric builds a labels metric for a resource, adding the resource
  464. // uid as a fixed label alongside the k8s labels (prefixed with "label_").
  465. func kubeLabelsMetric(name, uid string, k8sLabels map[string]string) kubeModelMetric {
  466. labelNames, labelValues := promutil.KubeLabelsToLabels(promutil.SanitizeLabels(k8sLabels))
  467. m := make(map[string]string, len(labelNames)+1)
  468. m["uid"] = uid
  469. for i, k := range labelNames {
  470. m[k] = labelValues[i]
  471. }
  472. return newInfoMetric(name, m)
  473. }
  474. // kubeAnnotationsMetric builds an annotations metric for a resource.
  475. func kubeAnnotationsMetric(name, uid string, k8sAnnotations map[string]string) kubeModelMetric {
  476. annoNames, annoValues := promutil.KubeAnnotationsToLabels(k8sAnnotations)
  477. m := make(map[string]string, len(annoNames)+1)
  478. m["uid"] = uid
  479. for i, k := range annoNames {
  480. m[k] = annoValues[i]
  481. }
  482. return newInfoMetric(name, m)
  483. }
  484. // kubeModelResourceValue converts a Kubernetes resource quantity to a float64 value.
  485. // It mirrors the collector-source toResourceUnitValue logic for the cases we need.
  486. func kubeModelResourceValue(resourceName v1.ResourceName, quantity resource.Quantity) float64 {
  487. switch resourceName {
  488. case v1.ResourceCPU:
  489. return float64(quantity.MilliValue()) / 1000.0
  490. default:
  491. return float64(quantity.Value())
  492. }
  493. }