metrics.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876
  1. package costmodel
  import (
	"math"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/opencost/opencost/core/pkg/clustercache"
	"github.com/opencost/opencost/core/pkg/clusters"
	coreenv "github.com/opencost/opencost/core/pkg/env"
	"github.com/opencost/opencost/core/pkg/errors"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/source"
	"github.com/opencost/opencost/core/pkg/util"
	"github.com/opencost/opencost/core/pkg/util/atomic"
	"github.com/opencost/opencost/core/pkg/util/promutil"
	"github.com/opencost/opencost/pkg/cloud/models"
	"github.com/opencost/opencost/pkg/env"
	"github.com/opencost/opencost/pkg/metrics"
	promclient "github.com/prometheus/client_golang/api"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	v1 "k8s.io/api/core/v1"
)
  25. //--------------------------------------------------------------------------
  26. // ClusterInfoCollector
  27. //--------------------------------------------------------------------------
  28. // ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
  29. type ClusterInfoCollector struct {
  30. ClusterInfo clusters.ClusterInfoProvider
  31. metricsConfig metrics.MetricsConfig
  32. }
  33. // Describe sends the super-set of all possible descriptors of metrics
  34. // collected by this Collector.
  35. func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
  36. disabledMetrics := cic.metricsConfig.GetDisabledMetricsMap()
  37. if _, disabled := disabledMetrics["kubecost_cluster_info"]; disabled {
  38. return
  39. }
  40. ch <- prometheus.NewDesc("kubecost_cluster_info", "Kubecost Cluster Info", []string{}, nil)
  41. }
  42. // Collect is called by the Prometheus registry when collecting metrics.
  43. func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
  44. disabledMetrics := cic.metricsConfig.GetDisabledMetricsMap()
  45. if _, disabled := disabledMetrics["kubecost_cluster_info"]; disabled {
  46. return
  47. }
  48. clusterInfo := cic.ClusterInfo.GetClusterInfo()
  49. labels := promutil.MapToLabels(clusterInfo)
  50. m := newClusterInfoMetric("kubecost_cluster_info", labels)
  51. ch <- m
  52. }
  //--------------------------------------------------------------------------
// ClusterInfoMetric
//--------------------------------------------------------------------------

// ClusterInfoMetric is a prometheus.Metric used to encode the local cluster
// info. The cluster info itself lives entirely in the labels.
type ClusterInfoMetric struct {
	// fqName and help feed the metric descriptor.
	fqName, help string
	// labels maps label name to label value.
	labels map[string]string
}
  62. // Creates a new ClusterInfoMetric, implementation of prometheus.Metric
  63. func newClusterInfoMetric(fqName string, labels map[string]string) ClusterInfoMetric {
  64. return ClusterInfoMetric{
  65. fqName: fqName,
  66. labels: labels,
  67. help: "kubecost_cluster_info ClusterInfo",
  68. }
  69. }
  70. // Desc returns the descriptor for the Metric. This method idempotently
  71. // returns the same descriptor throughout the lifetime of the Metric.
  72. func (cim ClusterInfoMetric) Desc() *prometheus.Desc {
  73. l := prometheus.Labels{}
  74. return prometheus.NewDesc(cim.fqName, cim.help, promutil.LabelNamesFrom(cim.labels), l)
  75. }
  76. // Write encodes the Metric into a "Metric" Protocol Buffer data
  77. // transmission object.
  78. func (cim ClusterInfoMetric) Write(m *dto.Metric) error {
  79. h := float64(1)
  80. m.Gauge = &dto.Gauge{
  81. Value: &h,
  82. }
  83. var labels []*dto.LabelPair
  84. for k, v := range cim.labels {
  85. labels = append(labels, &dto.LabelPair{
  86. Name: toStringPtr(k),
  87. Value: toStringPtr(v),
  88. })
  89. }
  90. m.Label = labels
  91. return nil
  92. }
  // toStringPtr returns a pointer to a fresh copy of s. It is used to fill the
// *string fields of protobuf label pairs. Each call yields a distinct pointer.
func toStringPtr(s string) *string {
	p := new(string)
	*p = s
	return p
}
  95. //--------------------------------------------------------------------------
  96. // Cost Model Metrics Initialization
  97. //--------------------------------------------------------------------------
  98. // Only allow the metrics to be instantiated and registered once
  99. var metricsInit sync.Once
  100. var (
  101. cpuGv *prometheus.GaugeVec
  102. ramGv *prometheus.GaugeVec
  103. gpuGv *prometheus.GaugeVec
  104. gpuCountGv *prometheus.GaugeVec
  105. pvGv *prometheus.GaugeVec
  106. spotGv *prometheus.GaugeVec
  107. totalGv *prometheus.GaugeVec
  108. ramAllocGv *prometheus.GaugeVec
  109. cpuAllocGv *prometheus.GaugeVec
  110. gpuAllocGv *prometheus.GaugeVec
  111. pvAllocGv *prometheus.GaugeVec
  112. networkZoneEgressCostG prometheus.Gauge
  113. networkRegionEgressCostG prometheus.Gauge
  114. networkInternetEgressCostG prometheus.Gauge
  115. networkNatGatewayEgressCostG prometheus.Gauge
  116. networkNatGatewayIngressCostG prometheus.Gauge
  117. clusterManagementCostGv *prometheus.GaugeVec
  118. lbCostGv *prometheus.GaugeVec
  119. )
  120. // initCostModelMetrics uses a sync.Once to ensure that these metrics are only created once
  121. func initCostModelMetrics(clusterInfo clusters.ClusterInfoProvider, metricsConfig *metrics.MetricsConfig) {
  122. disabledMetrics := metricsConfig.GetDisabledMetricsMap()
  123. var toRegisterGV []*prometheus.GaugeVec
  124. var toRegisterGauge []prometheus.Gauge
  125. metricsInit.Do(func() {
  126. cpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  127. Name: "node_cpu_hourly_cost",
  128. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  129. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  130. if _, disabled := disabledMetrics["node_cpu_hourly_cost"]; !disabled {
  131. toRegisterGV = append(toRegisterGV, cpuGv)
  132. }
  133. ramGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  134. Name: "node_ram_hourly_cost",
  135. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  136. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  137. if _, disabled := disabledMetrics["node_ram_hourly_cost"]; !disabled {
  138. toRegisterGV = append(toRegisterGV, ramGv)
  139. }
  140. gpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  141. Name: "node_gpu_hourly_cost",
  142. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  143. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  144. if _, disabled := disabledMetrics["node_gpu_hourly_cost"]; !disabled {
  145. toRegisterGV = append(toRegisterGV, gpuGv)
  146. }
  147. gpuCountGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  148. Name: "node_gpu_count",
  149. Help: "node_gpu_count count of gpu on this node",
  150. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  151. if _, disabled := disabledMetrics["node_gpu_count"]; !disabled {
  152. toRegisterGV = append(toRegisterGV, gpuCountGv)
  153. }
  154. pvGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  155. Name: "pv_hourly_cost",
  156. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  157. }, []string{"volumename", "persistentvolume", "provider_id", "uid"})
  158. if _, disabled := disabledMetrics["pv_hourly_cost"]; !disabled {
  159. toRegisterGV = append(toRegisterGV, pvGv)
  160. }
  161. spotGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  162. Name: "kubecost_node_is_spot",
  163. Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
  164. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  165. if _, disabled := disabledMetrics["kubecost_node_is_spot"]; !disabled {
  166. toRegisterGV = append(toRegisterGV, spotGv)
  167. }
  168. totalGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  169. Name: "node_total_hourly_cost",
  170. Help: "node_total_hourly_cost Total node cost per hour",
  171. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  172. if _, disabled := disabledMetrics["node_total_hourly_cost"]; !disabled {
  173. toRegisterGV = append(toRegisterGV, totalGv)
  174. }
  175. ramAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  176. Name: "container_memory_allocation_bytes",
  177. Help: "container_memory_allocation_bytes Bytes of RAM used",
  178. }, []string{"namespace", "pod", "container", "instance", "node", "uid"})
  179. if _, disabled := disabledMetrics["container_memory_allocation_bytes"]; !disabled {
  180. toRegisterGV = append(toRegisterGV, ramAllocGv)
  181. }
  182. cpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  183. Name: "container_cpu_allocation",
  184. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  185. }, []string{"namespace", "pod", "container", "instance", "node", "uid"})
  186. if _, disabled := disabledMetrics["container_cpu_allocation"]; !disabled {
  187. toRegisterGV = append(toRegisterGV, cpuAllocGv)
  188. }
  189. gpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  190. Name: "container_gpu_allocation",
  191. Help: "container_gpu_allocation GPU used",
  192. }, []string{"namespace", "pod", "container", "instance", "node", "uid"})
  193. if _, disabled := disabledMetrics["container_gpu_allocation"]; !disabled {
  194. toRegisterGV = append(toRegisterGV, gpuAllocGv)
  195. }
  196. pvAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  197. Name: "pod_pvc_allocation",
  198. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  199. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume", "uid"})
  200. if _, disabled := disabledMetrics["pod_pvc_allocation"]; !disabled {
  201. toRegisterGV = append(toRegisterGV, pvAllocGv)
  202. }
  203. networkZoneEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  204. Name: "kubecost_network_zone_egress_cost",
  205. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  206. })
  207. if _, disabled := disabledMetrics["kubecost_network_zone_egress_cost"]; !disabled {
  208. toRegisterGauge = append(toRegisterGauge, networkZoneEgressCostG)
  209. }
  210. networkRegionEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  211. Name: "kubecost_network_region_egress_cost",
  212. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  213. })
  214. if _, disabled := disabledMetrics["kubecost_network_region_egress_cost"]; !disabled {
  215. toRegisterGauge = append(toRegisterGauge, networkRegionEgressCostG)
  216. }
  217. networkInternetEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  218. Name: "kubecost_network_internet_egress_cost",
  219. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  220. })
  221. if _, disabled := disabledMetrics["kubecost_network_internet_egress_cost"]; !disabled {
  222. toRegisterGauge = append(toRegisterGauge, networkInternetEgressCostG)
  223. }
  224. networkNatGatewayEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  225. Name: "kubecost_network_nat_gateway_egress_cost",
  226. Help: "kubecost_network_nat_gateway_egress_cost Total cost per GB of nat gateway egress.",
  227. })
  228. if _, disabled := disabledMetrics["kubecost_network_nat_gateway_egress_cost"]; !disabled {
  229. toRegisterGauge = append(toRegisterGauge, networkNatGatewayEgressCostG)
  230. }
  231. networkNatGatewayIngressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  232. Name: "kubecost_network_nat_gateway_ingress_cost",
  233. Help: "kubecost_network_nat_gateway_ingress_cost Total cost per GB of nat gateway ingress.",
  234. })
  235. if _, disabled := disabledMetrics["kubecost_network_nat_gateway_ingress_cost"]; !disabled {
  236. toRegisterGauge = append(toRegisterGauge, networkNatGatewayIngressCostG)
  237. }
  238. clusterManagementCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  239. Name: "kubecost_cluster_management_cost",
  240. Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
  241. }, []string{"provisioner_name"})
  242. if _, disabled := disabledMetrics["kubecost_cluster_management_cost"]; !disabled {
  243. toRegisterGV = append(toRegisterGV, clusterManagementCostGv)
  244. }
  245. lbCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
  246. Name: "kubecost_load_balancer_cost",
  247. Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
  248. }, []string{"ingress_ip", "namespace", "service_name", "uid"}) // assumes one ingress IP per load balancer
  249. if _, disabled := disabledMetrics["kubecost_load_balancer_cost"]; !disabled {
  250. toRegisterGV = append(toRegisterGV, lbCostGv)
  251. }
  252. // Register cost-model metrics for emission
  253. for _, gv := range toRegisterGV {
  254. prometheus.MustRegister(gv)
  255. }
  256. for _, g := range toRegisterGauge {
  257. prometheus.MustRegister(g)
  258. }
  259. // General Metric Collectors
  260. prometheus.MustRegister(ClusterInfoCollector{
  261. ClusterInfo: clusterInfo,
  262. metricsConfig: *metricsConfig,
  263. })
  264. })
  265. }
  266. //--------------------------------------------------------------------------
  267. // CostModelMetricsEmitter
  268. //--------------------------------------------------------------------------
  269. // CostModelMetricsEmitter emits all cost-model specific metrics calculated by
  270. // the CostModel.ComputeCostData() method.
  271. type CostModelMetricsEmitter struct {
  272. PrometheusClient promclient.Client
  273. KubeClusterCache clustercache.ClusterCache
  274. CloudProvider models.Provider
  275. Model *CostModel
  276. // Metrics
  277. CPUPriceRecorder *prometheus.GaugeVec
  278. RAMPriceRecorder *prometheus.GaugeVec
  279. PersistentVolumePriceRecorder *prometheus.GaugeVec
  280. GPUPriceRecorder *prometheus.GaugeVec
  281. GPUCountRecorder *prometheus.GaugeVec
  282. PVAllocationRecorder *prometheus.GaugeVec
  283. NodeSpotRecorder *prometheus.GaugeVec
  284. NodeTotalPriceRecorder *prometheus.GaugeVec
  285. RAMAllocationRecorder *prometheus.GaugeVec
  286. CPUAllocationRecorder *prometheus.GaugeVec
  287. GPUAllocationRecorder *prometheus.GaugeVec
  288. ClusterManagementCostRecorder *prometheus.GaugeVec
  289. LBCostRecorder *prometheus.GaugeVec
  290. NetworkZoneEgressRecorder prometheus.Gauge
  291. NetworkRegionEgressRecorder prometheus.Gauge
  292. NetworkInternetEgressRecorder prometheus.Gauge
  293. NetworkNatGatewayEgressRecorder prometheus.Gauge
  294. NetworkNatGatewayIngressRecorder prometheus.Gauge
  295. // Concurrent Flow Control - Manages the run state of the metric emitter
  296. runState atomic.AtomicRunState
  297. }
  298. // NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
  299. func NewCostModelMetricsEmitter(clusterCache clustercache.ClusterCache, provider models.Provider, clusterInfo clusters.ClusterInfoProvider, model *CostModel) *CostModelMetricsEmitter {
  300. // Get metric configurations, if any
  301. metricsConfig, err := metrics.GetMetricsConfig()
  302. if err != nil {
  303. log.Infof("Failed to get metrics config before init: %s", err)
  304. }
  305. if len(metricsConfig.DisabledMetrics) > 0 {
  306. log.Infof("Starting metrics init with disabled metrics: %v", metricsConfig.DisabledMetrics)
  307. }
  308. // init will only actually execute once to register the custom gauges
  309. initCostModelMetrics(clusterInfo, metricsConfig)
  310. metrics.InitKubeMetrics(clusterInfo, clusterCache, metricsConfig, &metrics.KubeMetricsOpts{
  311. EmitKubecostControllerMetrics: true,
  312. EmitNamespaceAnnotations: coreenv.IsEmitNamespaceAnnotationsMetric(),
  313. EmitPodAnnotations: coreenv.IsEmitPodAnnotationsMetric(),
  314. EmitKubeStateMetrics: coreenv.IsEmitKsmV1Metrics(),
  315. EmitKubeStateMetricsV1Only: coreenv.IsEmitKsmV1MetricsOnly(),
  316. EmitDeprecatedMetrics: coreenv.IsEmitDeprecatedMetrics(),
  317. })
  318. metrics.InitOpencostTelemetry(metricsConfig)
  319. return &CostModelMetricsEmitter{
  320. KubeClusterCache: clusterCache,
  321. CloudProvider: provider,
  322. Model: model,
  323. CPUPriceRecorder: cpuGv,
  324. RAMPriceRecorder: ramGv,
  325. GPUPriceRecorder: gpuGv,
  326. GPUCountRecorder: gpuCountGv,
  327. PersistentVolumePriceRecorder: pvGv,
  328. NodeSpotRecorder: spotGv,
  329. NodeTotalPriceRecorder: totalGv,
  330. RAMAllocationRecorder: ramAllocGv,
  331. CPUAllocationRecorder: cpuAllocGv,
  332. GPUAllocationRecorder: gpuAllocGv,
  333. PVAllocationRecorder: pvAllocGv,
  334. NetworkZoneEgressRecorder: networkZoneEgressCostG,
  335. NetworkRegionEgressRecorder: networkRegionEgressCostG,
  336. NetworkInternetEgressRecorder: networkInternetEgressCostG,
  337. NetworkNatGatewayEgressRecorder: networkNatGatewayEgressCostG,
  338. NetworkNatGatewayIngressRecorder: networkNatGatewayIngressCostG,
  339. ClusterManagementCostRecorder: clusterManagementCostGv,
  340. LBCostRecorder: lbCostGv,
  341. }
  342. }
  343. // IsRunning returns true if metric recording is running.
  344. func (cmme *CostModelMetricsEmitter) IsRunning() bool {
  345. return cmme.runState.IsRunning()
  346. }
  // NodeCostAverages tracks a running average of a node's cost attributes.
// The averages are used to detect and discard spurious outliers: a new CPU or
// RAM cost far above the running average is not recorded.
type NodeCostAverages struct {
	// Running means of the node's CPU and RAM hourly cost.
	CpuCostAverage, RamCostAverage float64
	// Number of samples folded into each running mean. These are kept as
	// float64 so they can be used directly in the averaging arithmetic.
	NumCpuDataPoints, NumRamDataPoints float64
}
  355. // StartCostModelMetricRecording starts the go routine that emits metrics used to determine
  356. // cluster costs.
  357. func (cmme *CostModelMetricsEmitter) Start() bool {
  358. // wait for a reset to prevent a race between start and stop calls
  359. cmme.runState.WaitForReset()
  360. // Check to see if we're already recording, and atomically advance the run state to start if we're not
  361. if !cmme.runState.Start() {
  362. log.Errorf("Attempted to start cost model metric recording when it's already running.")
  363. return false
  364. }
  365. go func() {
  366. defer errors.HandlePanic()
  367. containerSeen := make(map[string]bool)
  368. nodeSeen := make(map[string]bool)
  369. loadBalancerSeen := make(map[string]bool)
  370. pvSeen := make(map[string]bool)
  371. pvcSeen := make(map[string]bool)
  372. nodeCostAverages := make(map[string]NodeCostAverages)
  373. getKeyFromLabelStrings := func(labels ...string) string {
  374. return strings.Join(labels, ",")
  375. }
  376. getLabelStringsFromKey := func(key string) []string {
  377. return strings.Split(key, ",")
  378. }
  379. var defaultRegion string = ""
  380. nodeList := cmme.KubeClusterCache.GetAllNodes()
  381. if len(nodeList) > 0 {
  382. var ok bool
  383. defaultRegion, ok = util.GetRegion(nodeList[0].Labels)
  384. if !ok {
  385. log.DedupedWarningf(5, "Failed to read default region from labels on node %s", nodeList[0].Name)
  386. }
  387. }
  388. for {
  389. log.Debugf("Recording prices...")
  390. podlist := cmme.KubeClusterCache.GetAllPods()
  391. podStatus := make(map[string]v1.PodPhase)
  392. podUIDs := make(map[string]string)
  393. for _, pod := range podlist {
  394. podStatus[pod.Name] = pod.Status.Phase
  395. podUIDs[pod.Name] = string(pod.UID)
  396. }
  397. // Create node UID lookup map
  398. nodeList := cmme.KubeClusterCache.GetAllNodes()
  399. nodeUIDs := make(map[string]string)
  400. for _, node := range nodeList {
  401. nodeUIDs[node.Name] = string(node.UID)
  402. }
  403. // Create PV UID lookup map
  404. pvList := cmme.KubeClusterCache.GetAllPersistentVolumes()
  405. pvUIDs := make(map[string]string)
  406. for _, pv := range pvList {
  407. pvUIDs[pv.Name] = string(pv.UID)
  408. }
  409. // Create service UID lookup map
  410. serviceList := cmme.KubeClusterCache.GetAllServices()
  411. serviceUIDs := make(map[string]string)
  412. for _, service := range serviceList {
  413. serviceKey := service.Namespace + "/" + service.Name
  414. serviceUIDs[serviceKey] = string(service.UID)
  415. }
  416. cfg, _ := cmme.CloudProvider.GetConfig()
  417. provisioner, clusterManagementCost, err := cmme.CloudProvider.ClusterManagementPricing()
  418. if err != nil {
  419. log.Errorf("Error getting cluster management cost %s", err.Error())
  420. }
  421. cmme.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
  422. // Record network pricing at global scope
  423. networkCosts, err := cmme.CloudProvider.NetworkPricing()
  424. if err != nil {
  425. log.Debugf("Failed to retrieve network costs: %s", err.Error())
  426. } else {
  427. cmme.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  428. cmme.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  429. cmme.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  430. cmme.NetworkNatGatewayEgressRecorder.Set(networkCosts.NatGatewayEgressCost)
  431. cmme.NetworkNatGatewayIngressRecorder.Set(networkCosts.NatGatewayIngressCost)
  432. }
  433. end := time.Now()
  434. queryWindow := env.GetMetricsEmitterQueryWindow()
  435. start := end.Add(-queryWindow)
  436. data, err := cmme.Model.ComputeCostData(start, end)
  437. if err != nil {
  438. // For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
  439. // actual errors)
  440. if source.IsErrorCollection(err) {
  441. if ec, ok := err.(source.QueryErrorCollection); ok {
  442. log.Errorf("Error in price recording: %d errors occurred", len(ec.Errors()))
  443. }
  444. } else {
  445. log.Errorf("Error in price recording: %s", err)
  446. }
  447. // zero the for loop so the time.Sleep will still work
  448. data = map[string]*CostData{}
  449. }
  450. nodes, err := cmme.Model.GetNodeCost()
  451. if err != nil {
  452. log.Warnf("Error getting Node cost: %s", err)
  453. }
  454. for nodeName, node := range nodes {
  455. // Get node UID first
  456. nodeUID := nodeUIDs[nodeName]
  457. // Emit costs, guarding against NaN inputs for custom pricing.
  458. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  459. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  460. cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
  461. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  462. cpuCost = 0
  463. }
  464. }
  465. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  466. if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
  467. cpu = 1 // Assume 1 CPU
  468. }
  469. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  470. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  471. ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
  472. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  473. ramCost = 0
  474. }
  475. }
  476. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  477. if math.IsNaN(ram) || math.IsInf(ram, 0) {
  478. ram = 0
  479. }
  480. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  481. if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
  482. gpu = 0
  483. }
  484. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  485. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  486. gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
  487. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  488. gpuCost = 0
  489. }
  490. }
  491. nodeType := node.InstanceType
  492. nodeRegion := node.Region
  493. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  494. labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID)
  495. avgCosts, ok := nodeCostAverages[labelKey]
  496. // initialize average cost tracking for this node if there is none
  497. if !ok {
  498. avgCosts = NodeCostAverages{
  499. CpuCostAverage: cpuCost,
  500. RamCostAverage: ramCost,
  501. NumCpuDataPoints: 1,
  502. NumRamDataPoints: 1,
  503. }
  504. nodeCostAverages[labelKey] = avgCosts
  505. }
  506. cmme.GPUCountRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(gpu)
  507. cmme.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(gpuCost)
  508. const outlierFactor float64 = 30
  509. // don't record cpuCost, ramCost, or gpuCost in the case of wild outliers
  510. // k8s api sometimes causes cost spikes as described here:
  511. // https://github.com/opencost/opencost/issues/927
  512. cpuOutlierCutoff := outlierFactor * avgCosts.CpuCostAverage
  513. if cpuCost < cpuOutlierCutoff {
  514. cmme.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(cpuCost)
  515. avgCosts.CpuCostAverage = (avgCosts.CpuCostAverage*avgCosts.NumCpuDataPoints + cpuCost) / (avgCosts.NumCpuDataPoints + 1)
  516. avgCosts.NumCpuDataPoints += 1
  517. } else {
  518. log.Debugf("CPU cost outlier detected; skipping data point: %s had %f as cost, which is above %f.", nodeName, cpuCost, cpuOutlierCutoff)
  519. }
  520. ramOutlierCutoff := outlierFactor * avgCosts.RamCostAverage
  521. if ramCost < ramOutlierCutoff {
  522. cmme.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(ramCost)
  523. avgCosts.RamCostAverage = (avgCosts.RamCostAverage*avgCosts.NumRamDataPoints + ramCost) / (avgCosts.NumRamDataPoints + 1)
  524. avgCosts.NumRamDataPoints += 1
  525. } else {
  526. log.Debugf("RAM cost outlier detected; skipping data point: %s had %f as cost, which is above %f.", nodeName, ramCost, ramOutlierCutoff)
  527. }
  528. // skip redording totalCost if any constituent costs were outliers
  529. if cpuCost < cpuOutlierCutoff && ramCost < ramOutlierCutoff {
  530. cmme.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(totalCost)
  531. } else {
  532. log.Debugf("CPU and RAM outlier detected, not recording node %s total cost %f", nodeName, totalCost)
  533. }
  534. nodeCostAverages[labelKey] = avgCosts
  535. if node.IsSpot() {
  536. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(1.0)
  537. } else {
  538. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(0.0)
  539. }
  540. nodeSeen[labelKey] = true
  541. }
  542. loadBalancers, err := cmme.Model.GetLBCost()
  543. if err != nil {
  544. log.Warnf("Error getting LoadBalancer cost: %s", err)
  545. }
  546. for lbKey, lb := range loadBalancers {
  547. // TODO: parse (if necessary) and calculate cost associated with loadBalancer based on dynamic cloud prices fetched into each lb struct on GetLBCost() call
  548. namespace := lbKey.Namespace
  549. serviceName := lbKey.Service
  550. ingressIP := ""
  551. if len(lb.IngressIPAddresses) > 0 {
  552. ingressIP = lb.IngressIPAddresses[0] // assumes one ingress IP per load balancer
  553. }
  554. serviceKey := namespace + "/" + serviceName
  555. serviceUID := serviceUIDs[serviceKey]
  556. cmme.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName, serviceUID).Set(lb.Cost)
  557. labelKey := getKeyFromLabelStrings(ingressIP, namespace, serviceName, serviceUID)
  558. loadBalancerSeen[labelKey] = true
  559. }
  560. for _, costs := range data {
  561. nodeName := costs.NodeName
  562. namespace := costs.Namespace
  563. podName := costs.PodName
  564. containerName := costs.Name
  565. if costs.PVCData != nil {
  566. for _, pvc := range costs.PVCData {
  567. if pvc.Volume != nil {
  568. timesClaimed := pvc.TimesClaimed
  569. if timesClaimed == 0 {
  570. timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
  571. }
  572. podUID := podUIDs[podName]
  573. cmme.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName, podUID).Set(pvc.Values[0].Value / float64(timesClaimed))
  574. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName, podUID)
  575. pvcSeen[labelKey] = true
  576. }
  577. }
  578. }
  579. if len(costs.RAMAllocation) > 0 {
  580. podUID := podUIDs[podName]
  581. cmme.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName, podUID).Set(costs.RAMAllocation[0].Value)
  582. }
  583. if len(costs.CPUAllocation) > 0 {
  584. podUID := podUIDs[podName]
  585. cmme.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName, podUID).Set(costs.CPUAllocation[0].Value)
  586. }
  587. if len(costs.GPUReq) > 0 {
  588. // allocation here is set to the request because shared GPU usage not yet supported.
  589. // if VPGUs, request x (actual/virtual)
  590. vgpu := 0.0
  591. gpu := 0.0
  592. var err, verr error
  593. if matchedNode, found := nodes[nodeName]; found {
  594. vgpu, verr = strconv.ParseFloat(matchedNode.VGPU, 64)
  595. gpu, err = strconv.ParseFloat(matchedNode.GPU, 64)
  596. } else {
  597. log.Tracef("cost data for node %s had GPUReq, but there was no cost data available for the node", nodeName)
  598. log.Trace("defaulting GPU to 0 cost")
  599. }
  600. gpualloc := costs.GPUReq[0].Value
  601. if verr != nil && err != nil && vgpu != 0 {
  602. gpualloc = gpualloc * (gpu / vgpu)
  603. }
  604. podUID := podUIDs[podName]
  605. cmme.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName, podUID).Set(gpualloc)
  606. }
  607. podUID := podUIDs[podName]
  608. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName, podUID)
  609. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  610. containerSeen[labelKey] = true
  611. } else {
  612. containerSeen[labelKey] = false
  613. }
  614. }
  615. storageClasses := cmme.KubeClusterCache.GetAllStorageClasses()
  616. storageClassMap := make(map[string]map[string]string)
  617. for _, storageClass := range storageClasses {
  618. params := storageClass.Parameters
  619. storageClassMap[storageClass.Name] = params
  620. if storageClass.Annotations["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.Annotations["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  621. storageClassMap["default"] = params
  622. storageClassMap[""] = params
  623. }
  624. }
  625. pvs := cmme.KubeClusterCache.GetAllPersistentVolumes()
  626. for _, pv := range pvs {
  627. // Omit pv_hourly_cost if the volume status is failed
  628. if pv.Status.Phase == v1.VolumeFailed {
  629. continue
  630. }
  631. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  632. if !ok {
  633. log.Debugf("Unable to find parameters for storage class \"%s\". Pv \"%s\" might have an empty or invalid storageClassName.", pv.Spec.StorageClassName, pv.Name)
  634. }
  635. var region string
  636. if r, ok := util.GetRegion(pv.Labels); ok {
  637. region = r
  638. } else {
  639. region = defaultRegion
  640. }
  641. cacPv := &models.PV{
  642. Class: pv.Spec.StorageClassName,
  643. Region: region,
  644. Parameters: parameters,
  645. }
  646. cmme.Model.GetPVCost(cacPv, pv, region)
  647. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  648. pvUID := pvUIDs[pv.Name]
  649. cmme.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID, pvUID).Set(c)
  650. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name, cacPv.ProviderID, pvUID)
  651. pvSeen[labelKey] = true
  652. }
  653. // Remove metrics on Nodes/LoadBalancers/Containers/PVs that no
  654. // longer exist
  655. for labelString, seen := range nodeSeen {
  656. if !seen {
  657. log.Debugf("Removing metrics for %s, no data observed recently", labelString)
  658. labels := getLabelStringsFromKey(labelString)
  659. ok := cmme.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  660. if ok {
  661. log.Debugf("No data observed for node with labels %v, removed from totalprice", labels)
  662. } else {
  663. log.Warnf("Failed to remove label set %v from metric node_total_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  664. }
  665. ok = cmme.NodeSpotRecorder.DeleteLabelValues(labels...)
  666. if ok {
  667. log.Debugf("No data observed for node with labels %v, removed from spot records", labels)
  668. } else {
  669. log.Warnf("Failed to remove label set %v from metric kubecost_node_is_spot. Failure to remove stale metrics may result in inaccurate data.", labels)
  670. }
  671. ok = cmme.CPUPriceRecorder.DeleteLabelValues(labels...)
  672. if ok {
  673. log.Debugf("No data observed for node with labels %v, removed from cpuprice", labels)
  674. } else {
  675. log.Warnf("Failed to remove label set %v from metric node_cpu_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  676. }
  677. ok = cmme.GPUPriceRecorder.DeleteLabelValues(labels...)
  678. if ok {
  679. log.Debugf("No data observed for node with labels %v, removed from gpuprice", labels)
  680. } else {
  681. log.Warnf("Failed to remove label set %v from metric node_gpu_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  682. }
  683. ok = cmme.GPUCountRecorder.DeleteLabelValues(labels...)
  684. if ok {
  685. log.Debugf("No data observed for node with labels %v, removed from gpucount", labels)
  686. } else {
  687. log.Warnf("Failed to remove label set %v from metric node_gpu_count. Failure to remove stale metrics may result in inaccurate data.", labels)
  688. }
  689. ok = cmme.RAMPriceRecorder.DeleteLabelValues(labels...)
  690. if ok {
  691. log.Debugf("No data observed for node with labels %v, removed from ramprice", labels)
  692. } else {
  693. log.Warnf("Failed to remove label set %v from metric node_ram_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  694. }
  695. delete(nodeSeen, labelString)
  696. delete(nodeCostAverages, labelString)
  697. } else {
  698. nodeSeen[labelString] = false
  699. }
  700. }
  701. for labelString, seen := range loadBalancerSeen {
  702. if !seen {
  703. labels := getLabelStringsFromKey(labelString)
  704. ok := cmme.LBCostRecorder.DeleteLabelValues(labels...)
  705. if !ok {
  706. log.Warnf("Failed to remove label set %v from metric kubecost_load_balancer_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  707. }
  708. delete(loadBalancerSeen, labelString)
  709. } else {
  710. loadBalancerSeen[labelString] = false
  711. }
  712. }
  713. for labelString, seen := range containerSeen {
  714. if !seen {
  715. labels := getLabelStringsFromKey(labelString)
  716. if len(labels) >= 2 && labels[1] != unmountedPVsContainer { // special "pod" to contain the unmounted PVs - does not have RAM/CPU/...
  717. ok := cmme.RAMAllocationRecorder.DeleteLabelValues(labels...)
  718. if !ok {
  719. log.Warnf("Failed to remove label set %v from metric container_memory_allocation_bytes. Failure to remove stale metrics may result in inaccurate data.", labels)
  720. }
  721. ok = cmme.CPUAllocationRecorder.DeleteLabelValues(labels...)
  722. if !ok {
  723. log.Warnf("Failed to remove label set %v from metric container_cpu_allocation. Failure to remove stale metrics may result in inaccurate data.", labels)
  724. }
  725. ok = cmme.GPUAllocationRecorder.DeleteLabelValues(labels...)
  726. if !ok {
  727. log.Warnf("Failed to remove label set %v from metric container_gpu_allocation. Failure to remove stale metrics may result in inaccurate data.", labels)
  728. }
  729. } else {
  730. log.Debugf("Did not try to delete RAM/CPU/GPU for fake '%s' container: %v", unmountedPVsContainer, labels)
  731. }
  732. delete(containerSeen, labelString)
  733. } else {
  734. containerSeen[labelString] = false
  735. }
  736. }
  737. for labelString, seen := range pvSeen {
  738. if !seen {
  739. labels := getLabelStringsFromKey(labelString)
  740. ok := cmme.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  741. if !ok {
  742. log.Warnf("Failed to remove label set %v from metric pv_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  743. }
  744. delete(pvSeen, labelString)
  745. } else {
  746. pvSeen[labelString] = false
  747. }
  748. }
  749. for labelString, seen := range pvcSeen {
  750. if !seen {
  751. labels := getLabelStringsFromKey(labelString)
  752. ok := cmme.PVAllocationRecorder.DeleteLabelValues(labels...)
  753. if !ok {
  754. log.Warnf("Failed to remove label set %v from metric pod_pvc_allocation. Failure to remove stale metrics may result in inaccurate data.", labels)
  755. }
  756. delete(pvcSeen, labelString)
  757. } else {
  758. pvcSeen[labelString] = false
  759. }
  760. }
  761. select {
  762. case <-time.After(time.Minute):
  763. case <-cmme.runState.OnStop():
  764. cmme.runState.Reset()
  765. return
  766. }
  767. }
  768. }()
  769. return true
  770. }
  771. // Stop halts the metrics emission loop after the current emission is completed
  772. // or if the emission is paused.
  773. func (cmme *CostModelMetricsEmitter) Stop() {
  774. cmme.runState.Stop()
  775. }