metrics.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109
  1. package costmodel
  2. import (
  3. "math"
  4. "strconv"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/clustercache"
  10. "github.com/kubecost/cost-model/pkg/env"
  11. "github.com/kubecost/cost-model/pkg/errors"
  12. "github.com/kubecost/cost-model/pkg/log"
  13. "github.com/kubecost/cost-model/pkg/prom"
  14. "github.com/kubecost/cost-model/pkg/util"
  15. promclient "github.com/prometheus/client_golang/api"
  16. "github.com/prometheus/client_golang/prometheus"
  17. dto "github.com/prometheus/client_model/go"
  18. v1 "k8s.io/api/core/v1"
  19. "k8s.io/client-go/kubernetes"
  20. "k8s.io/klog"
  21. )
  22. //--------------------------------------------------------------------------
  23. // StatefulsetCollector
  24. //--------------------------------------------------------------------------
  25. // StatefulsetCollector is a prometheus collector that generates StatefulsetMetrics
  26. type StatefulsetCollector struct {
  27. KubeClusterCache clustercache.ClusterCache
  28. }
  29. // Describe sends the super-set of all possible descriptors of metrics
  30. // collected by this Collector.
  31. func (sc StatefulsetCollector) Describe(ch chan<- *prometheus.Desc) {
  32. ch <- prometheus.NewDesc("statefulSet_match_labels", "statfulSet match labels", []string{}, nil)
  33. }
  34. // Collect is called by the Prometheus registry when collecting metrics.
  35. func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
  36. ds := sc.KubeClusterCache.GetAllStatefulSets()
  37. for _, statefulset := range ds {
  38. labels, values := prom.KubeLabelsToLabels(statefulset.Spec.Selector.MatchLabels)
  39. if len(labels) > 0 {
  40. m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
  41. ch <- m
  42. }
  43. }
  44. }
  45. //--------------------------------------------------------------------------
  46. // StatefulsetMetric
  47. //--------------------------------------------------------------------------
  // StatefulsetMetric is a prometheus.Metric that encodes the match labels of
  // a single StatefulSet.
  type StatefulsetMetric struct {
  	// fqName and help feed the descriptor built by Desc.
  	fqName, help string
  	// labelNames and labelValues are paired by index when written.
  	labelNames, labelValues []string
  	// statefulsetName and namespace identify the StatefulSet.
  	statefulsetName, namespace string
  }
  57. // Creates a new StatefulsetMetric, implementation of prometheus.Metric
  58. func newStatefulsetMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) StatefulsetMetric {
  59. return StatefulsetMetric{
  60. fqName: fqname,
  61. labelNames: labelNames,
  62. labelValues: labelvalues,
  63. help: "statefulSet_match_labels StatefulSet Match Labels",
  64. statefulsetName: name,
  65. namespace: namespace,
  66. }
  67. }
  68. // Desc returns the descriptor for the Metric. This method idempotently
  69. // returns the same descriptor throughout the lifetime of the Metric.
  70. func (s StatefulsetMetric) Desc() *prometheus.Desc {
  71. l := prometheus.Labels{"statefulSet": s.statefulsetName, "namespace": s.namespace}
  72. return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
  73. }
  74. // Write encodes the Metric into a "Metric" Protocol Buffer data
  75. // transmission object.
  76. func (s StatefulsetMetric) Write(m *dto.Metric) error {
  77. h := float64(1)
  78. m.Gauge = &dto.Gauge{
  79. Value: &h,
  80. }
  81. var labels []*dto.LabelPair
  82. for i := range s.labelNames {
  83. labels = append(labels, &dto.LabelPair{
  84. Name: &s.labelNames[i],
  85. Value: &s.labelValues[i],
  86. })
  87. }
  88. n := "namespace"
  89. labels = append(labels, &dto.LabelPair{
  90. Name: &n,
  91. Value: &s.namespace,
  92. })
  93. r := "statefulSet"
  94. labels = append(labels, &dto.LabelPair{
  95. Name: &r,
  96. Value: &s.statefulsetName,
  97. })
  98. m.Label = labels
  99. return nil
  100. }
  101. //--------------------------------------------------------------------------
  102. // DeploymentCollector
  103. //--------------------------------------------------------------------------
  104. // DeploymentCollector is a prometheus collector that generates DeploymentMetrics
  105. type DeploymentCollector struct {
  106. KubeClusterCache clustercache.ClusterCache
  107. }
  108. // Describe sends the super-set of all possible descriptors of metrics
  109. // collected by this Collector.
  110. func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
  111. ch <- prometheus.NewDesc("deployment_match_labels", "deployment match labels", []string{}, nil)
  112. }
  113. // Collect is called by the Prometheus registry when collecting metrics.
  114. func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
  115. ds := sc.KubeClusterCache.GetAllDeployments()
  116. for _, deployment := range ds {
  117. labels, values := prom.KubeLabelsToLabels(deployment.Spec.Selector.MatchLabels)
  118. if len(labels) > 0 {
  119. m := newDeploymentMetric(deployment.GetName(), deployment.GetNamespace(), "deployment_match_labels", labels, values)
  120. ch <- m
  121. }
  122. }
  123. }
  124. //--------------------------------------------------------------------------
  125. // DeploymentMetric
  126. //--------------------------------------------------------------------------
  // DeploymentMetric is a prometheus.Metric that encodes the match labels of
  // a single Deployment.
  type DeploymentMetric struct {
  	// fqName and help feed the descriptor built by Desc.
  	fqName, help string
  	// labelNames and labelValues are paired by index when written.
  	labelNames, labelValues []string
  	// deploymentName and namespace identify the Deployment.
  	deploymentName, namespace string
  }
  136. // Creates a new DeploymentMetric, implementation of prometheus.Metric
  137. func newDeploymentMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) DeploymentMetric {
  138. return DeploymentMetric{
  139. fqName: fqname,
  140. labelNames: labelNames,
  141. labelValues: labelvalues,
  142. help: "deployment_match_labels Deployment Match Labels",
  143. deploymentName: name,
  144. namespace: namespace,
  145. }
  146. }
  147. // Desc returns the descriptor for the Metric. This method idempotently
  148. // returns the same descriptor throughout the lifetime of the Metric.
  149. func (s DeploymentMetric) Desc() *prometheus.Desc {
  150. l := prometheus.Labels{"deployment": s.deploymentName, "namespace": s.namespace}
  151. return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
  152. }
  153. // Write encodes the Metric into a "Metric" Protocol Buffer data
  154. // transmission object.
  155. func (s DeploymentMetric) Write(m *dto.Metric) error {
  156. h := float64(1)
  157. m.Gauge = &dto.Gauge{
  158. Value: &h,
  159. }
  160. var labels []*dto.LabelPair
  161. for i := range s.labelNames {
  162. labels = append(labels, &dto.LabelPair{
  163. Name: &s.labelNames[i],
  164. Value: &s.labelValues[i],
  165. })
  166. }
  167. n := "namespace"
  168. labels = append(labels, &dto.LabelPair{
  169. Name: &n,
  170. Value: &s.namespace,
  171. })
  172. r := "deployment"
  173. labels = append(labels, &dto.LabelPair{
  174. Name: &r,
  175. Value: &s.deploymentName,
  176. })
  177. m.Label = labels
  178. return nil
  179. }
  180. //--------------------------------------------------------------------------
  181. // ServiceCollector
  182. //--------------------------------------------------------------------------
  183. // ServiceCollector is a prometheus collector that generates ServiceMetrics
  184. type ServiceCollector struct {
  185. KubeClusterCache clustercache.ClusterCache
  186. }
  187. // Describe sends the super-set of all possible descriptors of metrics
  188. // collected by this Collector.
  189. func (sc ServiceCollector) Describe(ch chan<- *prometheus.Desc) {
  190. ch <- prometheus.NewDesc("service_selector_labels", "service selector labels", []string{}, nil)
  191. }
  192. // Collect is called by the Prometheus registry when collecting metrics.
  193. func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
  194. svcs := sc.KubeClusterCache.GetAllServices()
  195. for _, svc := range svcs {
  196. labels, values := prom.KubeLabelsToLabels(svc.Spec.Selector)
  197. if len(labels) > 0 {
  198. m := newServiceMetric(svc.GetName(), svc.GetNamespace(), "service_selector_labels", labels, values)
  199. ch <- m
  200. }
  201. }
  202. }
  203. //--------------------------------------------------------------------------
  204. // ServiceMetric
  205. //--------------------------------------------------------------------------
  // ServiceMetric is a prometheus.Metric that encodes the selector labels of
  // a single Service.
  type ServiceMetric struct {
  	// fqName and help feed the descriptor built by Desc.
  	fqName, help string
  	// labelNames and labelValues are paired by index when written.
  	labelNames, labelValues []string
  	// serviceName and namespace identify the Service.
  	serviceName, namespace string
  }
  215. // Creates a new ServiceMetric, implementation of prometheus.Metric
  216. func newServiceMetric(name, namespace, fqname string, labelNames []string, labelvalues []string) ServiceMetric {
  217. return ServiceMetric{
  218. fqName: fqname,
  219. labelNames: labelNames,
  220. labelValues: labelvalues,
  221. help: "service_selector_labels Service Selector Labels",
  222. serviceName: name,
  223. namespace: namespace,
  224. }
  225. }
  226. // Desc returns the descriptor for the Metric. This method idempotently
  227. // returns the same descriptor throughout the lifetime of the Metric.
  228. func (s ServiceMetric) Desc() *prometheus.Desc {
  229. l := prometheus.Labels{"service": s.serviceName, "namespace": s.namespace}
  230. return prometheus.NewDesc(s.fqName, s.help, s.labelNames, l)
  231. }
  232. // Write encodes the Metric into a "Metric" Protocol Buffer data
  233. // transmission object.
  234. func (s ServiceMetric) Write(m *dto.Metric) error {
  235. h := float64(1)
  236. m.Gauge = &dto.Gauge{
  237. Value: &h,
  238. }
  239. var labels []*dto.LabelPair
  240. for i := range s.labelNames {
  241. labels = append(labels, &dto.LabelPair{
  242. Name: &s.labelNames[i],
  243. Value: &s.labelValues[i],
  244. })
  245. }
  246. n := "namespace"
  247. labels = append(labels, &dto.LabelPair{
  248. Name: &n,
  249. Value: &s.namespace,
  250. })
  251. r := "service"
  252. labels = append(labels, &dto.LabelPair{
  253. Name: &r,
  254. Value: &s.serviceName,
  255. })
  256. m.Label = labels
  257. return nil
  258. }
  259. //--------------------------------------------------------------------------
  260. // NamespaceAnnotationCollector
  261. //--------------------------------------------------------------------------
  262. // NamespaceAnnotationCollector is a prometheus collector that generates NamespaceAnnotationMetrics
  263. type NamespaceAnnotationCollector struct {
  264. KubeClusterCache clustercache.ClusterCache
  265. }
  266. // Describe sends the super-set of all possible descriptors of metrics
  267. // collected by this Collector.
  268. func (nsac NamespaceAnnotationCollector) Describe(ch chan<- *prometheus.Desc) {
  269. ch <- prometheus.NewDesc("kube_namespace_annotations", "namespace annotations", []string{}, nil)
  270. }
  271. // Collect is called by the Prometheus registry when collecting metrics.
  272. func (nsac NamespaceAnnotationCollector) Collect(ch chan<- prometheus.Metric) {
  273. namespaces := nsac.KubeClusterCache.GetAllNamespaces()
  274. for _, namespace := range namespaces {
  275. labels, values := prom.KubeAnnotationsToLabels(namespace.Annotations)
  276. if len(labels) > 0 {
  277. m := newNamespaceAnnotationsMetric(namespace.GetName(), "kube_namespace_annotations", labels, values)
  278. ch <- m
  279. }
  280. }
  281. }
  282. //--------------------------------------------------------------------------
  283. // NamespaceAnnotationsMetric
  284. //--------------------------------------------------------------------------
  // NamespaceAnnotationsMetric is a prometheus.Metric that encodes the
  // annotations of a single Namespace.
  type NamespaceAnnotationsMetric struct {
  	// fqName and help feed the descriptor built by Desc.
  	fqName, help string
  	// labelNames and labelValues are paired by index when written.
  	labelNames, labelValues []string
  	// namespace is the name of the Namespace the annotations belong to.
  	namespace string
  }
  293. // Creates a new NamespaceAnnotationsMetric, implementation of prometheus.Metric
  294. func newNamespaceAnnotationsMetric(namespace, fqname string, labelNames []string, labelValues []string) NamespaceAnnotationsMetric {
  295. return NamespaceAnnotationsMetric{
  296. namespace: namespace,
  297. fqName: fqname,
  298. labelNames: labelNames,
  299. labelValues: labelValues,
  300. help: "kube_namespace_annotations Namespace Annotations",
  301. }
  302. }
  303. // Desc returns the descriptor for the Metric. This method idempotently
  304. // returns the same descriptor throughout the lifetime of the Metric.
  305. func (nam NamespaceAnnotationsMetric) Desc() *prometheus.Desc {
  306. l := prometheus.Labels{"namespace": nam.namespace}
  307. return prometheus.NewDesc(nam.fqName, nam.help, nam.labelNames, l)
  308. }
  309. // Write encodes the Metric into a "Metric" Protocol Buffer data
  310. // transmission object.
  311. func (nam NamespaceAnnotationsMetric) Write(m *dto.Metric) error {
  312. h := float64(1)
  313. m.Gauge = &dto.Gauge{
  314. Value: &h,
  315. }
  316. var labels []*dto.LabelPair
  317. for i := range nam.labelNames {
  318. labels = append(labels, &dto.LabelPair{
  319. Name: &nam.labelNames[i],
  320. Value: &nam.labelValues[i],
  321. })
  322. }
  323. n := "namespace"
  324. labels = append(labels, &dto.LabelPair{
  325. Name: &n,
  326. Value: &nam.namespace,
  327. })
  328. m.Label = labels
  329. return nil
  330. }
  331. //--------------------------------------------------------------------------
  332. // PodAnnotationCollector
  333. //--------------------------------------------------------------------------
  334. // PodAnnotationCollector is a prometheus collector that generates PodAnnotationMetrics
  335. type PodAnnotationCollector struct {
  336. KubeClusterCache clustercache.ClusterCache
  337. }
  338. // Describe sends the super-set of all possible descriptors of metrics
  339. // collected by this Collector.
  340. func (pac PodAnnotationCollector) Describe(ch chan<- *prometheus.Desc) {
  341. ch <- prometheus.NewDesc("kube_pod_annotations", "pod annotations", []string{}, nil)
  342. }
  343. // Collect is called by the Prometheus registry when collecting metrics.
  344. func (pac PodAnnotationCollector) Collect(ch chan<- prometheus.Metric) {
  345. pods := pac.KubeClusterCache.GetAllPods()
  346. for _, pod := range pods {
  347. labels, values := prom.KubeAnnotationsToLabels(pod.Annotations)
  348. if len(labels) > 0 {
  349. m := newPodAnnotationMetric(pod.GetNamespace(), pod.GetName(), "kube_pod_annotations", labels, values)
  350. ch <- m
  351. }
  352. }
  353. }
  354. //--------------------------------------------------------------------------
  355. // PodAnnotationsMetric
  356. //--------------------------------------------------------------------------
  // PodAnnotationsMetric is a prometheus.Metric that encodes the annotations
  // of a single Pod.
  type PodAnnotationsMetric struct {
  	// name is the pod name; fqName and help feed the descriptor built by Desc.
  	name, fqName, help string
  	// labelNames and labelValues are paired by index when written.
  	labelNames, labelValues []string
  	// namespace is the namespace the pod lives in.
  	namespace string
  }
  366. // Creates a new PodAnnotationsMetric, implementation of prometheus.Metric
  367. func newPodAnnotationMetric(namespace, name, fqname string, labelNames []string, labelValues []string) PodAnnotationsMetric {
  368. return PodAnnotationsMetric{
  369. namespace: namespace,
  370. name: name,
  371. fqName: fqname,
  372. labelNames: labelNames,
  373. labelValues: labelValues,
  374. help: "kube_pod_annotations Pod Annotations",
  375. }
  376. }
  377. // Desc returns the descriptor for the Metric. This method idempotently
  378. // returns the same descriptor throughout the lifetime of the Metric.
  379. func (pam PodAnnotationsMetric) Desc() *prometheus.Desc {
  380. l := prometheus.Labels{"namespace": pam.namespace, "pod": pam.name}
  381. return prometheus.NewDesc(pam.fqName, pam.help, pam.labelNames, l)
  382. }
  383. // Write encodes the Metric into a "Metric" Protocol Buffer data
  384. // transmission object.
  385. func (pam PodAnnotationsMetric) Write(m *dto.Metric) error {
  386. h := float64(1)
  387. m.Gauge = &dto.Gauge{
  388. Value: &h,
  389. }
  390. var labels []*dto.LabelPair
  391. for i := range pam.labelNames {
  392. labels = append(labels, &dto.LabelPair{
  393. Name: &pam.labelNames[i],
  394. Value: &pam.labelValues[i],
  395. })
  396. }
  397. n := "namespace"
  398. labels = append(labels, &dto.LabelPair{
  399. Name: &n,
  400. Value: &pam.namespace,
  401. })
  402. r := "pod"
  403. labels = append(labels, &dto.LabelPair{
  404. Name: &r,
  405. Value: &pam.name,
  406. })
  407. m.Label = labels
  408. return nil
  409. }
  410. //--------------------------------------------------------------------------
  411. // ClusterInfoCollector
  412. //--------------------------------------------------------------------------
  413. // ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
  414. type ClusterInfoCollector struct {
  415. Cloud cloud.Provider
  416. KubeClientSet kubernetes.Interface
  417. }
  418. // Describe sends the super-set of all possible descriptors of metrics
  419. // collected by this Collector.
  420. func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
  421. ch <- prometheus.NewDesc("kubecost_cluster_info", "Kubecost Cluster Info", []string{}, nil)
  422. }
  423. // Collect is called by the Prometheus registry when collecting metrics.
  424. func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
  425. clusterInfo := GetClusterInfo(cic.KubeClientSet, cic.Cloud)
  426. labels := prom.MapToLabels(clusterInfo)
  427. m := newClusterInfoMetric("kubecost_cluster_info", labels)
  428. ch <- m
  429. }
  430. //--------------------------------------------------------------------------
  431. // ClusterInfoMetric
  432. //--------------------------------------------------------------------------
  // ClusterInfoMetric is a prometheus.Metric that encodes the local cluster
  // info as labels.
  type ClusterInfoMetric struct {
  	// fqName and help feed the descriptor built by Desc.
  	fqName, help string
  	// labels maps each label name to its value.
  	labels map[string]string
  }
  439. // Creates a new ClusterInfoMetric, implementation of prometheus.Metric
  440. func newClusterInfoMetric(fqName string, labels map[string]string) ClusterInfoMetric {
  441. return ClusterInfoMetric{
  442. fqName: fqName,
  443. labels: labels,
  444. help: "kubecost_cluster_info ClusterInfo",
  445. }
  446. }
  447. // Desc returns the descriptor for the Metric. This method idempotently
  448. // returns the same descriptor throughout the lifetime of the Metric.
  449. func (cim ClusterInfoMetric) Desc() *prometheus.Desc {
  450. l := prometheus.Labels{}
  451. return prometheus.NewDesc(cim.fqName, cim.help, prom.LabelNamesFrom(cim.labels), l)
  452. }
  453. // Write encodes the Metric into a "Metric" Protocol Buffer data
  454. // transmission object.
  455. func (cim ClusterInfoMetric) Write(m *dto.Metric) error {
  456. h := float64(1)
  457. m.Gauge = &dto.Gauge{
  458. Value: &h,
  459. }
  460. var labels []*dto.LabelPair
  461. for k, v := range cim.labels {
  462. labels = append(labels, &dto.LabelPair{
  463. Name: toStringPtr(k),
  464. Value: toStringPtr(v),
  465. })
  466. }
  467. m.Label = labels
  468. return nil
  469. }
  // toStringPtr returns a pointer to a fresh copy of s, so callers can take
  // the address of a loop iteration variable's value safely.
  func toStringPtr(s string) *string {
  	p := new(string)
  	*p = s
  	return p
  }
  474. //--------------------------------------------------------------------------
  475. // Cost Model Metrics Initialization
  476. //--------------------------------------------------------------------------
  477. // Only allow the metrics to be instantiated and registered once
  478. var metricsInit sync.Once
  479. var (
  480. cpuGv *prometheus.GaugeVec
  481. ramGv *prometheus.GaugeVec
  482. gpuGv *prometheus.GaugeVec
  483. gpuCountGv *prometheus.GaugeVec
  484. pvGv *prometheus.GaugeVec
  485. spotGv *prometheus.GaugeVec
  486. totalGv *prometheus.GaugeVec
  487. ramAllocGv *prometheus.GaugeVec
  488. cpuAllocGv *prometheus.GaugeVec
  489. gpuAllocGv *prometheus.GaugeVec
  490. pvAllocGv *prometheus.GaugeVec
  491. networkZoneEgressCostG prometheus.Gauge
  492. networkRegionEgressCostG prometheus.Gauge
  493. networkInternetEgressCostG prometheus.Gauge
  494. clusterManagementCostGv *prometheus.GaugeVec
  495. lbCostGv *prometheus.GaugeVec
  496. )
  497. // initCostModelMetrics uses a sync.Once to ensure that these metrics are only created once
  498. func initCostModelMetrics(clusterCache clustercache.ClusterCache, provider cloud.Provider) {
  499. metricsInit.Do(func() {
  500. cpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  501. Name: "node_cpu_hourly_cost",
  502. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  503. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  504. ramGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  505. Name: "node_ram_hourly_cost",
  506. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  507. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  508. gpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  509. Name: "node_gpu_hourly_cost",
  510. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  511. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  512. gpuCountGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  513. Name: "node_gpu_count",
  514. Help: "node_gpu_count count of gpu on this node",
  515. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  516. pvGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  517. Name: "pv_hourly_cost",
  518. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  519. }, []string{"volumename", "persistentvolume", "provider_id"})
  520. spotGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  521. Name: "kubecost_node_is_spot",
  522. Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
  523. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  524. totalGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  525. Name: "node_total_hourly_cost",
  526. Help: "node_total_hourly_cost Total node cost per hour",
  527. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  528. ramAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  529. Name: "container_memory_allocation_bytes",
  530. Help: "container_memory_allocation_bytes Bytes of RAM used",
  531. }, []string{"namespace", "pod", "container", "instance", "node"})
  532. cpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  533. Name: "container_cpu_allocation",
  534. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  535. }, []string{"namespace", "pod", "container", "instance", "node"})
  536. gpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  537. Name: "container_gpu_allocation",
  538. Help: "container_gpu_allocation GPU used",
  539. }, []string{"namespace", "pod", "container", "instance", "node"})
  540. pvAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  541. Name: "pod_pvc_allocation",
  542. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  543. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  544. networkZoneEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  545. Name: "kubecost_network_zone_egress_cost",
  546. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  547. })
  548. networkRegionEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  549. Name: "kubecost_network_region_egress_cost",
  550. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  551. })
  552. networkInternetEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  553. Name: "kubecost_network_internet_egress_cost",
  554. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  555. })
  556. clusterManagementCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  557. Name: "kubecost_cluster_management_cost",
  558. Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
  559. }, []string{"provisioner_name"})
  560. lbCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
  561. Name: "kubecost_load_balancer_cost",
  562. Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
  563. }, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
  564. // Register cost-model metrics for emission
  565. prometheus.MustRegister(cpuGv, ramGv, gpuGv, gpuCountGv, totalGv, pvGv, spotGv)
  566. prometheus.MustRegister(ramAllocGv, cpuAllocGv, gpuAllocGv, pvAllocGv)
  567. prometheus.MustRegister(networkZoneEgressCostG, networkRegionEgressCostG, networkInternetEgressCostG)
  568. prometheus.MustRegister(clusterManagementCostGv, lbCostGv)
  569. // General Metric Collectors
  570. prometheus.MustRegister(ServiceCollector{
  571. KubeClusterCache: clusterCache,
  572. })
  573. prometheus.MustRegister(DeploymentCollector{
  574. KubeClusterCache: clusterCache,
  575. })
  576. prometheus.MustRegister(StatefulsetCollector{
  577. KubeClusterCache: clusterCache,
  578. })
  579. prometheus.MustRegister(ClusterInfoCollector{
  580. KubeClientSet: clusterCache.GetClient(),
  581. Cloud: provider,
  582. })
  583. if env.IsEmitNamespaceAnnotationsMetric() {
  584. prometheus.MustRegister(NamespaceAnnotationCollector{
  585. KubeClusterCache: clusterCache,
  586. })
  587. }
  588. if env.IsEmitPodAnnotationsMetric() {
  589. prometheus.MustRegister(PodAnnotationCollector{
  590. KubeClusterCache: clusterCache,
  591. })
  592. }
  593. })
  594. }
  595. //--------------------------------------------------------------------------
  596. // CostModelMetricsEmitter
  597. //--------------------------------------------------------------------------
  598. // CostModelMetricsEmitter emits all cost-model specific metrics calculated by
  599. // the CostModel.ComputeCostData() method.
  600. type CostModelMetricsEmitter struct {
  601. PrometheusClient promclient.Client
  602. KubeClusterCache clustercache.ClusterCache
  603. CloudProvider cloud.Provider
  604. Model *CostModel
  605. // Metrics
  606. CPUPriceRecorder *prometheus.GaugeVec
  607. RAMPriceRecorder *prometheus.GaugeVec
  608. PersistentVolumePriceRecorder *prometheus.GaugeVec
  609. GPUPriceRecorder *prometheus.GaugeVec
  610. GPUCountRecorder *prometheus.GaugeVec
  611. PVAllocationRecorder *prometheus.GaugeVec
  612. NodeSpotRecorder *prometheus.GaugeVec
  613. NodeTotalPriceRecorder *prometheus.GaugeVec
  614. RAMAllocationRecorder *prometheus.GaugeVec
  615. CPUAllocationRecorder *prometheus.GaugeVec
  616. GPUAllocationRecorder *prometheus.GaugeVec
  617. ClusterManagementCostRecorder *prometheus.GaugeVec
  618. LBCostRecorder *prometheus.GaugeVec
  619. NetworkZoneEgressRecorder prometheus.Gauge
  620. NetworkRegionEgressRecorder prometheus.Gauge
  621. NetworkInternetEgressRecorder prometheus.Gauge
  622. // Flow Control
  623. recordingLock *sync.Mutex
  624. recordingStopping bool
  625. recordingStop chan bool
  626. }
  627. // NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
  628. func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clustercache.ClusterCache, provider cloud.Provider, model *CostModel) *CostModelMetricsEmitter {
  629. // init will only actually execute once to register the custom gauges
  630. initCostModelMetrics(clusterCache, provider)
  631. return &CostModelMetricsEmitter{
  632. PrometheusClient: promClient,
  633. KubeClusterCache: clusterCache,
  634. CloudProvider: provider,
  635. Model: model,
  636. CPUPriceRecorder: cpuGv,
  637. RAMPriceRecorder: ramGv,
  638. GPUPriceRecorder: gpuGv,
  639. GPUCountRecorder: gpuCountGv,
  640. PersistentVolumePriceRecorder: pvGv,
  641. NodeSpotRecorder: spotGv,
  642. NodeTotalPriceRecorder: totalGv,
  643. RAMAllocationRecorder: ramAllocGv,
  644. CPUAllocationRecorder: cpuAllocGv,
  645. GPUAllocationRecorder: gpuAllocGv,
  646. PVAllocationRecorder: pvAllocGv,
  647. NetworkZoneEgressRecorder: networkZoneEgressCostG,
  648. NetworkRegionEgressRecorder: networkRegionEgressCostG,
  649. NetworkInternetEgressRecorder: networkInternetEgressCostG,
  650. ClusterManagementCostRecorder: clusterManagementCostGv,
  651. LBCostRecorder: lbCostGv,
  652. recordingLock: new(sync.Mutex),
  653. recordingStopping: false,
  654. recordingStop: nil,
  655. }
  656. }
  657. // Checks to see if there is a metric recording stop channel. If it exists, a new
  658. // channel is not created and false is returned. If it doesn't exist, a new channel
  659. // is created and true is returned.
  660. func (cmme *CostModelMetricsEmitter) checkOrCreateRecordingChan() bool {
  661. cmme.recordingLock.Lock()
  662. defer cmme.recordingLock.Unlock()
  663. if cmme.recordingStop != nil {
  664. return false
  665. }
  666. cmme.recordingStop = make(chan bool, 1)
  667. return true
  668. }
  669. // IsRunning returns true if metric recording is running.
  670. func (cmme *CostModelMetricsEmitter) IsRunning() bool {
  671. cmme.recordingLock.Lock()
  672. defer cmme.recordingLock.Unlock()
  673. return cmme.recordingStop != nil
  674. }
  675. // StartCostModelMetricRecording starts the go routine that emits metrics used to determine
  676. // cluster costs.
  677. func (cmme *CostModelMetricsEmitter) Start() bool {
  678. // Check to see if we're already recording
  679. // This function will create the stop recording channel and return true
  680. // if it doesn't exist.
  681. if !cmme.checkOrCreateRecordingChan() {
  682. log.Errorf("Attempted to start cost model metric recording when it's already running.")
  683. return false
  684. }
  685. go func() {
  686. defer errors.HandlePanic()
  687. containerSeen := make(map[string]bool)
  688. nodeSeen := make(map[string]bool)
  689. loadBalancerSeen := make(map[string]bool)
  690. pvSeen := make(map[string]bool)
  691. pvcSeen := make(map[string]bool)
  692. getKeyFromLabelStrings := func(labels ...string) string {
  693. return strings.Join(labels, ",")
  694. }
  695. getLabelStringsFromKey := func(key string) []string {
  696. return strings.Split(key, ",")
  697. }
  698. var defaultRegion string = ""
  699. nodeList := cmme.KubeClusterCache.GetAllNodes()
  700. if len(nodeList) > 0 {
  701. var ok bool
  702. defaultRegion, ok = util.GetRegion(nodeList[0].Labels)
  703. if !ok {
  704. log.DedupedWarningf(5, "Failed to locate default region")
  705. }
  706. }
  707. for {
  708. klog.V(4).Info("Recording prices...")
  709. podlist := cmme.KubeClusterCache.GetAllPods()
  710. podStatus := make(map[string]v1.PodPhase)
  711. for _, pod := range podlist {
  712. podStatus[pod.Name] = pod.Status.Phase
  713. }
  714. cfg, _ := cmme.CloudProvider.GetConfig()
  715. provisioner, clusterManagementCost, err := cmme.CloudProvider.ClusterManagementPricing()
  716. if err != nil {
  717. klog.V(1).Infof("Error getting cluster management cost %s", err.Error())
  718. }
  719. cmme.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
  720. // Record network pricing at global scope
  721. networkCosts, err := cmme.CloudProvider.NetworkPricing()
  722. if err != nil {
  723. klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  724. } else {
  725. cmme.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  726. cmme.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  727. cmme.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  728. }
  729. // TODO: Pass PrometheusClient and CloudProvider into CostModel on instantiation so this isn't so awkward
  730. data, err := cmme.Model.ComputeCostData(cmme.PrometheusClient, cmme.CloudProvider, "2m", "", "")
  731. if err != nil {
  732. // For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
  733. // actual errors)
  734. if prom.IsErrorCollection(err) {
  735. if ec, ok := err.(prom.QueryErrorCollection); ok {
  736. log.Errorf("Error in price recording: %d errors occurred", len(ec.Errors()))
  737. }
  738. } else {
  739. log.Errorf("Error in price recording: " + err.Error())
  740. }
  741. // zero the for loop so the time.Sleep will still work
  742. data = map[string]*CostData{}
  743. }
  744. // TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
  745. nodes, err := cmme.Model.GetNodeCost(cmme.CloudProvider)
  746. for nodeName, node := range nodes {
  747. // Emit costs, guarding against NaN inputs for custom pricing.
  748. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  749. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  750. cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
  751. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  752. cpuCost = 0
  753. }
  754. }
  755. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  756. if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
  757. cpu = 1 // Assume 1 CPU
  758. }
  759. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  760. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  761. ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
  762. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  763. ramCost = 0
  764. }
  765. }
  766. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  767. if math.IsNaN(ram) || math.IsInf(ram, 0) {
  768. ram = 0
  769. }
  770. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  771. if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
  772. gpu = 0
  773. }
  774. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  775. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  776. gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
  777. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  778. gpuCost = 0
  779. }
  780. }
  781. nodeType := node.InstanceType
  782. nodeRegion := node.Region
  783. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  784. cmme.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
  785. cmme.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
  786. cmme.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
  787. cmme.GPUCountRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpu)
  788. cmme.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
  789. if node.IsSpot() {
  790. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(1.0)
  791. } else {
  792. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(0.0)
  793. }
  794. labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
  795. nodeSeen[labelKey] = true
  796. }
  797. // TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
  798. loadBalancers, err := cmme.Model.GetLBCost(cmme.CloudProvider)
  799. for lbKey, lb := range loadBalancers {
  800. // TODO: parse (if necessary) and calculate cost associated with loadBalancer based on dynamic cloud prices fetched into each lb struct on GetLBCost() call
  801. keyParts := getLabelStringsFromKey(lbKey)
  802. namespace := keyParts[0]
  803. serviceName := keyParts[1]
  804. ingressIP := ""
  805. if len(lb.IngressIPAddresses) > 0 {
  806. ingressIP = lb.IngressIPAddresses[0] // assumes one ingress IP per load balancer
  807. }
  808. cmme.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName).Set(lb.Cost)
  809. labelKey := getKeyFromLabelStrings(namespace, serviceName)
  810. loadBalancerSeen[labelKey] = true
  811. }
  812. for _, costs := range data {
  813. nodeName := costs.NodeName
  814. namespace := costs.Namespace
  815. podName := costs.PodName
  816. containerName := costs.Name
  817. if costs.PVCData != nil {
  818. for _, pvc := range costs.PVCData {
  819. if pvc.Volume != nil {
  820. timesClaimed := pvc.TimesClaimed
  821. if timesClaimed == 0 {
  822. timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
  823. }
  824. cmme.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value / float64(timesClaimed))
  825. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  826. pvcSeen[labelKey] = true
  827. }
  828. }
  829. }
  830. if len(costs.RAMAllocation) > 0 {
  831. cmme.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  832. }
  833. if len(costs.CPUAllocation) > 0 {
  834. cmme.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  835. }
  836. if len(costs.GPUReq) > 0 {
  837. // allocation here is set to the request because shared GPU usage not yet supported.
  838. cmme.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  839. }
  840. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  841. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  842. containerSeen[labelKey] = true
  843. } else {
  844. containerSeen[labelKey] = false
  845. }
  846. }
  847. storageClasses := cmme.KubeClusterCache.GetAllStorageClasses()
  848. storageClassMap := make(map[string]map[string]string)
  849. for _, storageClass := range storageClasses {
  850. params := storageClass.Parameters
  851. storageClassMap[storageClass.ObjectMeta.Name] = params
  852. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  853. storageClassMap["default"] = params
  854. storageClassMap[""] = params
  855. }
  856. }
  857. pvs := cmme.KubeClusterCache.GetAllPersistentVolumes()
  858. for _, pv := range pvs {
  859. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  860. if !ok {
  861. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  862. }
  863. var region string
  864. if r, ok := util.GetRegion(pv.Labels); ok {
  865. region = r
  866. } else {
  867. region = defaultRegion
  868. }
  869. cacPv := &cloud.PV{
  870. Class: pv.Spec.StorageClassName,
  871. Region: region,
  872. Parameters: parameters,
  873. }
  874. // TODO: GetPVCost should be a method in CostModel?
  875. GetPVCost(cacPv, pv, cmme.CloudProvider, region)
  876. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  877. cmme.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID).Set(c)
  878. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  879. pvSeen[labelKey] = true
  880. }
  881. for labelString, seen := range nodeSeen {
  882. if !seen {
  883. klog.V(4).Infof("Removing %s from nodes", labelString)
  884. labels := getLabelStringsFromKey(labelString)
  885. ok := cmme.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  886. if ok {
  887. klog.V(4).Infof("removed %s from totalprice", labelString)
  888. } else {
  889. klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
  890. }
  891. ok = cmme.NodeSpotRecorder.DeleteLabelValues(labels...)
  892. if ok {
  893. klog.V(4).Infof("removed %s from spot records", labelString)
  894. } else {
  895. klog.Infof("FAILURE TO REMOVE %s from spot records", labelString)
  896. }
  897. ok = cmme.CPUPriceRecorder.DeleteLabelValues(labels...)
  898. if ok {
  899. klog.V(4).Infof("removed %s from cpuprice", labelString)
  900. } else {
  901. klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
  902. }
  903. ok = cmme.GPUPriceRecorder.DeleteLabelValues(labels...)
  904. if ok {
  905. klog.V(4).Infof("removed %s from gpuprice", labelString)
  906. } else {
  907. klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
  908. }
  909. ok = cmme.GPUCountRecorder.DeleteLabelValues(labels...)
  910. if ok {
  911. klog.V(4).Infof("removed %s from gpucount", labelString)
  912. } else {
  913. klog.Infof("FAILURE TO REMOVE %s from gpucount", labelString)
  914. }
  915. ok = cmme.RAMPriceRecorder.DeleteLabelValues(labels...)
  916. if ok {
  917. klog.V(4).Infof("removed %s from ramprice", labelString)
  918. } else {
  919. klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
  920. }
  921. delete(nodeSeen, labelString)
  922. } else {
  923. nodeSeen[labelString] = false
  924. }
  925. }
  926. for labelString, seen := range loadBalancerSeen {
  927. if !seen {
  928. labels := getLabelStringsFromKey(labelString)
  929. cmme.LBCostRecorder.DeleteLabelValues(labels...)
  930. } else {
  931. loadBalancerSeen[labelString] = false
  932. }
  933. }
  934. for labelString, seen := range containerSeen {
  935. if !seen {
  936. labels := getLabelStringsFromKey(labelString)
  937. cmme.RAMAllocationRecorder.DeleteLabelValues(labels...)
  938. cmme.CPUAllocationRecorder.DeleteLabelValues(labels...)
  939. cmme.GPUAllocationRecorder.DeleteLabelValues(labels...)
  940. delete(containerSeen, labelString)
  941. } else {
  942. containerSeen[labelString] = false
  943. }
  944. }
  945. for labelString, seen := range pvSeen {
  946. if !seen {
  947. labels := getLabelStringsFromKey(labelString)
  948. cmme.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  949. delete(pvSeen, labelString)
  950. } else {
  951. pvSeen[labelString] = false
  952. }
  953. }
  954. for labelString, seen := range pvcSeen {
  955. if !seen {
  956. labels := getLabelStringsFromKey(labelString)
  957. cmme.PVAllocationRecorder.DeleteLabelValues(labels...)
  958. delete(pvcSeen, labelString)
  959. } else {
  960. pvcSeen[labelString] = false
  961. }
  962. }
  963. select {
  964. case <-time.After(time.Minute):
  965. case <-cmme.recordingStop:
  966. cmme.recordingLock.Lock()
  967. cmme.recordingStopping = false
  968. cmme.recordingStop = nil
  969. cmme.recordingLock.Unlock()
  970. return
  971. }
  972. }
  973. }()
  974. return true
  975. }
  976. // Stop halts the metrics emission loop after the current emission is completed
  977. // or if the emission is paused.
  978. func (cmme *CostModelMetricsEmitter) Stop() {
  979. cmme.recordingLock.Lock()
  980. defer cmme.recordingLock.Unlock()
  981. if !cmme.recordingStopping && cmme.recordingStop != nil {
  982. cmme.recordingStopping = true
  983. close(cmme.recordingStop)
  984. }
  985. }