metrics.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875
  1. package costmodel
import (
	"math"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/opencost/opencost/core/pkg/clustercache"
	"github.com/opencost/opencost/core/pkg/clusters"
	"github.com/opencost/opencost/core/pkg/errors"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/source"
	"github.com/opencost/opencost/core/pkg/util"
	"github.com/opencost/opencost/core/pkg/util/atomic"
	"github.com/opencost/opencost/core/pkg/util/promutil"
	"github.com/opencost/opencost/pkg/cloud/models"
	"github.com/opencost/opencost/pkg/env"
	"github.com/opencost/opencost/pkg/metrics"
	promclient "github.com/prometheus/client_golang/api"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	v1 "k8s.io/api/core/v1"
)
  24. //--------------------------------------------------------------------------
  25. // ClusterInfoCollector
  26. //--------------------------------------------------------------------------
  27. // ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
  28. type ClusterInfoCollector struct {
  29. ClusterInfo clusters.ClusterInfoProvider
  30. metricsConfig metrics.MetricsConfig
  31. }
  32. // Describe sends the super-set of all possible descriptors of metrics
  33. // collected by this Collector.
  34. func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
  35. disabledMetrics := cic.metricsConfig.GetDisabledMetricsMap()
  36. if _, disabled := disabledMetrics["kubecost_cluster_info"]; disabled {
  37. return
  38. }
  39. ch <- prometheus.NewDesc("kubecost_cluster_info", "Kubecost Cluster Info", []string{}, nil)
  40. }
  41. // Collect is called by the Prometheus registry when collecting metrics.
  42. func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
  43. disabledMetrics := cic.metricsConfig.GetDisabledMetricsMap()
  44. if _, disabled := disabledMetrics["kubecost_cluster_info"]; disabled {
  45. return
  46. }
  47. clusterInfo := cic.ClusterInfo.GetClusterInfo()
  48. labels := promutil.MapToLabels(clusterInfo)
  49. m := newClusterInfoMetric("kubecost_cluster_info", labels)
  50. ch <- m
  51. }
  52. //--------------------------------------------------------------------------
  53. // ClusterInfoMetric
  54. //--------------------------------------------------------------------------
  55. // ClusterInfoMetric is a prometheus.Metric used to encode the local cluster info
  56. type ClusterInfoMetric struct {
  57. fqName string
  58. help string
  59. labels map[string]string
  60. }
  61. // Creates a new ClusterInfoMetric, implementation of prometheus.Metric
  62. func newClusterInfoMetric(fqName string, labels map[string]string) ClusterInfoMetric {
  63. return ClusterInfoMetric{
  64. fqName: fqName,
  65. labels: labels,
  66. help: "kubecost_cluster_info ClusterInfo",
  67. }
  68. }
  69. // Desc returns the descriptor for the Metric. This method idempotently
  70. // returns the same descriptor throughout the lifetime of the Metric.
  71. func (cim ClusterInfoMetric) Desc() *prometheus.Desc {
  72. l := prometheus.Labels{}
  73. return prometheus.NewDesc(cim.fqName, cim.help, promutil.LabelNamesFrom(cim.labels), l)
  74. }
  75. // Write encodes the Metric into a "Metric" Protocol Buffer data
  76. // transmission object.
  77. func (cim ClusterInfoMetric) Write(m *dto.Metric) error {
  78. h := float64(1)
  79. m.Gauge = &dto.Gauge{
  80. Value: &h,
  81. }
  82. var labels []*dto.LabelPair
  83. for k, v := range cim.labels {
  84. labels = append(labels, &dto.LabelPair{
  85. Name: toStringPtr(k),
  86. Value: toStringPtr(v),
  87. })
  88. }
  89. m.Label = labels
  90. return nil
  91. }
  92. // returns a pointer to the string provided
  93. func toStringPtr(s string) *string { return &s }
  94. //--------------------------------------------------------------------------
  95. // Cost Model Metrics Initialization
  96. //--------------------------------------------------------------------------
  97. // Only allow the metrics to be instantiated and registered once
  98. var metricsInit sync.Once
  99. var (
  100. cpuGv *prometheus.GaugeVec
  101. ramGv *prometheus.GaugeVec
  102. gpuGv *prometheus.GaugeVec
  103. gpuCountGv *prometheus.GaugeVec
  104. pvGv *prometheus.GaugeVec
  105. spotGv *prometheus.GaugeVec
  106. totalGv *prometheus.GaugeVec
  107. ramAllocGv *prometheus.GaugeVec
  108. cpuAllocGv *prometheus.GaugeVec
  109. gpuAllocGv *prometheus.GaugeVec
  110. pvAllocGv *prometheus.GaugeVec
  111. networkZoneEgressCostG prometheus.Gauge
  112. networkRegionEgressCostG prometheus.Gauge
  113. networkInternetEgressCostG prometheus.Gauge
  114. networkNatGatewayEgressCostG prometheus.Gauge
  115. networkNatGatewayIngressCostG prometheus.Gauge
  116. clusterManagementCostGv *prometheus.GaugeVec
  117. lbCostGv *prometheus.GaugeVec
  118. )
  119. // initCostModelMetrics uses a sync.Once to ensure that these metrics are only created once
  120. func initCostModelMetrics(clusterInfo clusters.ClusterInfoProvider, metricsConfig *metrics.MetricsConfig) {
  121. disabledMetrics := metricsConfig.GetDisabledMetricsMap()
  122. var toRegisterGV []*prometheus.GaugeVec
  123. var toRegisterGauge []prometheus.Gauge
  124. metricsInit.Do(func() {
  125. cpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  126. Name: "node_cpu_hourly_cost",
  127. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  128. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  129. if _, disabled := disabledMetrics["node_cpu_hourly_cost"]; !disabled {
  130. toRegisterGV = append(toRegisterGV, cpuGv)
  131. }
  132. ramGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  133. Name: "node_ram_hourly_cost",
  134. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  135. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  136. if _, disabled := disabledMetrics["node_ram_hourly_cost"]; !disabled {
  137. toRegisterGV = append(toRegisterGV, ramGv)
  138. }
  139. gpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  140. Name: "node_gpu_hourly_cost",
  141. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  142. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  143. if _, disabled := disabledMetrics["node_gpu_hourly_cost"]; !disabled {
  144. toRegisterGV = append(toRegisterGV, gpuGv)
  145. }
  146. gpuCountGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  147. Name: "node_gpu_count",
  148. Help: "node_gpu_count count of gpu on this node",
  149. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  150. if _, disabled := disabledMetrics["node_gpu_count"]; !disabled {
  151. toRegisterGV = append(toRegisterGV, gpuCountGv)
  152. }
  153. pvGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  154. Name: "pv_hourly_cost",
  155. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  156. }, []string{"volumename", "persistentvolume", "provider_id", "uid"})
  157. if _, disabled := disabledMetrics["pv_hourly_cost"]; !disabled {
  158. toRegisterGV = append(toRegisterGV, pvGv)
  159. }
  160. spotGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  161. Name: "kubecost_node_is_spot",
  162. Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
  163. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  164. if _, disabled := disabledMetrics["kubecost_node_is_spot"]; !disabled {
  165. toRegisterGV = append(toRegisterGV, spotGv)
  166. }
  167. totalGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  168. Name: "node_total_hourly_cost",
  169. Help: "node_total_hourly_cost Total node cost per hour",
  170. }, []string{"instance", "node", "instance_type", "region", "provider_id", "arch", "uid"})
  171. if _, disabled := disabledMetrics["node_total_hourly_cost"]; !disabled {
  172. toRegisterGV = append(toRegisterGV, totalGv)
  173. }
  174. ramAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  175. Name: "container_memory_allocation_bytes",
  176. Help: "container_memory_allocation_bytes Bytes of RAM used",
  177. }, []string{"namespace", "pod", "container", "instance", "node", "uid"})
  178. if _, disabled := disabledMetrics["container_memory_allocation_bytes"]; !disabled {
  179. toRegisterGV = append(toRegisterGV, ramAllocGv)
  180. }
  181. cpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  182. Name: "container_cpu_allocation",
  183. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  184. }, []string{"namespace", "pod", "container", "instance", "node", "uid"})
  185. if _, disabled := disabledMetrics["container_cpu_allocation"]; !disabled {
  186. toRegisterGV = append(toRegisterGV, cpuAllocGv)
  187. }
  188. gpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  189. Name: "container_gpu_allocation",
  190. Help: "container_gpu_allocation GPU used",
  191. }, []string{"namespace", "pod", "container", "instance", "node", "uid"})
  192. if _, disabled := disabledMetrics["container_gpu_allocation"]; !disabled {
  193. toRegisterGV = append(toRegisterGV, gpuAllocGv)
  194. }
  195. pvAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  196. Name: "pod_pvc_allocation",
  197. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  198. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume", "uid"})
  199. if _, disabled := disabledMetrics["pod_pvc_allocation"]; !disabled {
  200. toRegisterGV = append(toRegisterGV, pvAllocGv)
  201. }
  202. networkZoneEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  203. Name: "kubecost_network_zone_egress_cost",
  204. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  205. })
  206. if _, disabled := disabledMetrics["kubecost_network_zone_egress_cost"]; !disabled {
  207. toRegisterGauge = append(toRegisterGauge, networkZoneEgressCostG)
  208. }
  209. networkRegionEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  210. Name: "kubecost_network_region_egress_cost",
  211. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  212. })
  213. if _, disabled := disabledMetrics["kubecost_network_region_egress_cost"]; !disabled {
  214. toRegisterGauge = append(toRegisterGauge, networkRegionEgressCostG)
  215. }
  216. networkInternetEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  217. Name: "kubecost_network_internet_egress_cost",
  218. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  219. })
  220. if _, disabled := disabledMetrics["kubecost_network_internet_egress_cost"]; !disabled {
  221. toRegisterGauge = append(toRegisterGauge, networkInternetEgressCostG)
  222. }
  223. networkNatGatewayEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  224. Name: "kubecost_network_nat_gateway_egress_cost",
  225. Help: "kubecost_network_nat_gateway_egress_cost Total cost per GB of nat gateway egress.",
  226. })
  227. if _, disabled := disabledMetrics["kubecost_network_nat_gateway_egress_cost"]; !disabled {
  228. toRegisterGauge = append(toRegisterGauge, networkNatGatewayEgressCostG)
  229. }
  230. networkNatGatewayIngressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  231. Name: "kubecost_network_nat_gateway_ingress_cost",
  232. Help: "kubecost_network_nat_gateway_ingress_cost Total cost per GB of nat gateway ingress.",
  233. })
  234. if _, disabled := disabledMetrics["kubecost_network_nat_gateway_ingress_cost"]; !disabled {
  235. toRegisterGauge = append(toRegisterGauge, networkNatGatewayIngressCostG)
  236. }
  237. clusterManagementCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  238. Name: "kubecost_cluster_management_cost",
  239. Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
  240. }, []string{"provisioner_name"})
  241. if _, disabled := disabledMetrics["kubecost_cluster_management_cost"]; !disabled {
  242. toRegisterGV = append(toRegisterGV, clusterManagementCostGv)
  243. }
  244. lbCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
  245. Name: "kubecost_load_balancer_cost",
  246. Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
  247. }, []string{"ingress_ip", "namespace", "service_name", "uid"}) // assumes one ingress IP per load balancer
  248. if _, disabled := disabledMetrics["kubecost_load_balancer_cost"]; !disabled {
  249. toRegisterGV = append(toRegisterGV, lbCostGv)
  250. }
  251. // Register cost-model metrics for emission
  252. for _, gv := range toRegisterGV {
  253. prometheus.MustRegister(gv)
  254. }
  255. for _, g := range toRegisterGauge {
  256. prometheus.MustRegister(g)
  257. }
  258. // General Metric Collectors
  259. prometheus.MustRegister(ClusterInfoCollector{
  260. ClusterInfo: clusterInfo,
  261. metricsConfig: *metricsConfig,
  262. })
  263. })
  264. }
  265. //--------------------------------------------------------------------------
  266. // CostModelMetricsEmitter
  267. //--------------------------------------------------------------------------
  268. // CostModelMetricsEmitter emits all cost-model specific metrics calculated by
  269. // the CostModel.ComputeCostData() method.
  270. type CostModelMetricsEmitter struct {
  271. PrometheusClient promclient.Client
  272. KubeClusterCache clustercache.ClusterCache
  273. CloudProvider models.Provider
  274. Model *CostModel
  275. // Metrics
  276. CPUPriceRecorder *prometheus.GaugeVec
  277. RAMPriceRecorder *prometheus.GaugeVec
  278. PersistentVolumePriceRecorder *prometheus.GaugeVec
  279. GPUPriceRecorder *prometheus.GaugeVec
  280. GPUCountRecorder *prometheus.GaugeVec
  281. PVAllocationRecorder *prometheus.GaugeVec
  282. NodeSpotRecorder *prometheus.GaugeVec
  283. NodeTotalPriceRecorder *prometheus.GaugeVec
  284. RAMAllocationRecorder *prometheus.GaugeVec
  285. CPUAllocationRecorder *prometheus.GaugeVec
  286. GPUAllocationRecorder *prometheus.GaugeVec
  287. ClusterManagementCostRecorder *prometheus.GaugeVec
  288. LBCostRecorder *prometheus.GaugeVec
  289. NetworkZoneEgressRecorder prometheus.Gauge
  290. NetworkRegionEgressRecorder prometheus.Gauge
  291. NetworkInternetEgressRecorder prometheus.Gauge
  292. NetworkNatGatewayEgressRecorder prometheus.Gauge
  293. NetworkNatGatewayIngressRecorder prometheus.Gauge
  294. // Concurrent Flow Control - Manages the run state of the metric emitter
  295. runState atomic.AtomicRunState
  296. }
  297. // NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
  298. func NewCostModelMetricsEmitter(clusterCache clustercache.ClusterCache, provider models.Provider, clusterInfo clusters.ClusterInfoProvider, model *CostModel) *CostModelMetricsEmitter {
  299. // Get metric configurations, if any
  300. metricsConfig, err := metrics.GetMetricsConfig()
  301. if err != nil {
  302. log.Infof("Failed to get metrics config before init: %s", err)
  303. }
  304. if len(metricsConfig.DisabledMetrics) > 0 {
  305. log.Infof("Starting metrics init with disabled metrics: %v", metricsConfig.DisabledMetrics)
  306. }
  307. // init will only actually execute once to register the custom gauges
  308. initCostModelMetrics(clusterInfo, metricsConfig)
  309. metrics.InitKubeMetrics(clusterCache, metricsConfig, &metrics.KubeMetricsOpts{
  310. EmitKubecostControllerMetrics: true,
  311. EmitNamespaceAnnotations: env.IsEmitNamespaceAnnotationsMetric(),
  312. EmitPodAnnotations: env.IsEmitPodAnnotationsMetric(),
  313. EmitKubeStateMetrics: env.IsEmitKsmV1Metrics(),
  314. EmitKubeStateMetricsV1Only: env.IsEmitKsmV1MetricsOnly(),
  315. EmitDeprecatedMetrics: env.IsEmitDeprecatedMetrics(),
  316. })
  317. metrics.InitOpencostTelemetry(metricsConfig)
  318. return &CostModelMetricsEmitter{
  319. KubeClusterCache: clusterCache,
  320. CloudProvider: provider,
  321. Model: model,
  322. CPUPriceRecorder: cpuGv,
  323. RAMPriceRecorder: ramGv,
  324. GPUPriceRecorder: gpuGv,
  325. GPUCountRecorder: gpuCountGv,
  326. PersistentVolumePriceRecorder: pvGv,
  327. NodeSpotRecorder: spotGv,
  328. NodeTotalPriceRecorder: totalGv,
  329. RAMAllocationRecorder: ramAllocGv,
  330. CPUAllocationRecorder: cpuAllocGv,
  331. GPUAllocationRecorder: gpuAllocGv,
  332. PVAllocationRecorder: pvAllocGv,
  333. NetworkZoneEgressRecorder: networkZoneEgressCostG,
  334. NetworkRegionEgressRecorder: networkRegionEgressCostG,
  335. NetworkInternetEgressRecorder: networkInternetEgressCostG,
  336. NetworkNatGatewayEgressRecorder: networkNatGatewayEgressCostG,
  337. NetworkNatGatewayIngressRecorder: networkNatGatewayIngressCostG,
  338. ClusterManagementCostRecorder: clusterManagementCostGv,
  339. LBCostRecorder: lbCostGv,
  340. }
  341. }
  342. // IsRunning returns true if metric recording is running.
  343. func (cmme *CostModelMetricsEmitter) IsRunning() bool {
  344. return cmme.runState.IsRunning()
  345. }
  346. // NodeCostAverages tracks a running average of a node's cost attributes.
  347. // The averages are used to detect and discard spurrious outliers.
  348. type NodeCostAverages struct {
  349. CpuCostAverage float64
  350. RamCostAverage float64
  351. NumCpuDataPoints float64
  352. NumRamDataPoints float64
  353. }
  354. // StartCostModelMetricRecording starts the go routine that emits metrics used to determine
  355. // cluster costs.
  356. func (cmme *CostModelMetricsEmitter) Start() bool {
  357. // wait for a reset to prevent a race between start and stop calls
  358. cmme.runState.WaitForReset()
  359. // Check to see if we're already recording, and atomically advance the run state to start if we're not
  360. if !cmme.runState.Start() {
  361. log.Errorf("Attempted to start cost model metric recording when it's already running.")
  362. return false
  363. }
  364. go func() {
  365. defer errors.HandlePanic()
  366. containerSeen := make(map[string]bool)
  367. nodeSeen := make(map[string]bool)
  368. loadBalancerSeen := make(map[string]bool)
  369. pvSeen := make(map[string]bool)
  370. pvcSeen := make(map[string]bool)
  371. nodeCostAverages := make(map[string]NodeCostAverages)
  372. getKeyFromLabelStrings := func(labels ...string) string {
  373. return strings.Join(labels, ",")
  374. }
  375. getLabelStringsFromKey := func(key string) []string {
  376. return strings.Split(key, ",")
  377. }
  378. var defaultRegion string = ""
  379. nodeList := cmme.KubeClusterCache.GetAllNodes()
  380. if len(nodeList) > 0 {
  381. var ok bool
  382. defaultRegion, ok = util.GetRegion(nodeList[0].Labels)
  383. if !ok {
  384. log.DedupedWarningf(5, "Failed to read default region from labels on node %s", nodeList[0].Name)
  385. }
  386. }
  387. for {
  388. log.Debugf("Recording prices...")
  389. podlist := cmme.KubeClusterCache.GetAllPods()
  390. podStatus := make(map[string]v1.PodPhase)
  391. podUIDs := make(map[string]string)
  392. for _, pod := range podlist {
  393. podStatus[pod.Name] = pod.Status.Phase
  394. podUIDs[pod.Name] = string(pod.UID)
  395. }
  396. // Create node UID lookup map
  397. nodeList := cmme.KubeClusterCache.GetAllNodes()
  398. nodeUIDs := make(map[string]string)
  399. for _, node := range nodeList {
  400. nodeUIDs[node.Name] = string(node.UID)
  401. }
  402. // Create PV UID lookup map
  403. pvList := cmme.KubeClusterCache.GetAllPersistentVolumes()
  404. pvUIDs := make(map[string]string)
  405. for _, pv := range pvList {
  406. pvUIDs[pv.Name] = string(pv.UID)
  407. }
  408. // Create service UID lookup map
  409. serviceList := cmme.KubeClusterCache.GetAllServices()
  410. serviceUIDs := make(map[string]string)
  411. for _, service := range serviceList {
  412. serviceKey := service.Namespace + "/" + service.Name
  413. serviceUIDs[serviceKey] = string(service.UID)
  414. }
  415. cfg, _ := cmme.CloudProvider.GetConfig()
  416. provisioner, clusterManagementCost, err := cmme.CloudProvider.ClusterManagementPricing()
  417. if err != nil {
  418. log.Errorf("Error getting cluster management cost %s", err.Error())
  419. }
  420. cmme.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
  421. // Record network pricing at global scope
  422. networkCosts, err := cmme.CloudProvider.NetworkPricing()
  423. if err != nil {
  424. log.Debugf("Failed to retrieve network costs: %s", err.Error())
  425. } else {
  426. cmme.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  427. cmme.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  428. cmme.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  429. cmme.NetworkNatGatewayEgressRecorder.Set(networkCosts.NatGatewayEgressCost)
  430. cmme.NetworkNatGatewayIngressRecorder.Set(networkCosts.NatGatewayIngressCost)
  431. }
  432. end := time.Now()
  433. queryWindow := env.GetMetricsEmitterQueryWindow()
  434. start := end.Add(-queryWindow)
  435. data, err := cmme.Model.ComputeCostData(start, end)
  436. if err != nil {
  437. // For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
  438. // actual errors)
  439. if source.IsErrorCollection(err) {
  440. if ec, ok := err.(source.QueryErrorCollection); ok {
  441. log.Errorf("Error in price recording: %d errors occurred", len(ec.Errors()))
  442. }
  443. } else {
  444. log.Errorf("Error in price recording: %s", err)
  445. }
  446. // zero the for loop so the time.Sleep will still work
  447. data = map[string]*CostData{}
  448. }
  449. nodes, err := cmme.Model.GetNodeCost()
  450. if err != nil {
  451. log.Warnf("Error getting Node cost: %s", err)
  452. }
  453. for nodeName, node := range nodes {
  454. // Get node UID first
  455. nodeUID := nodeUIDs[nodeName]
  456. // Emit costs, guarding against NaN inputs for custom pricing.
  457. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  458. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  459. cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
  460. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  461. cpuCost = 0
  462. }
  463. }
  464. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  465. if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
  466. cpu = 1 // Assume 1 CPU
  467. }
  468. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  469. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  470. ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
  471. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  472. ramCost = 0
  473. }
  474. }
  475. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  476. if math.IsNaN(ram) || math.IsInf(ram, 0) {
  477. ram = 0
  478. }
  479. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  480. if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
  481. gpu = 0
  482. }
  483. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  484. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  485. gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
  486. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  487. gpuCost = 0
  488. }
  489. }
  490. nodeType := node.InstanceType
  491. nodeRegion := node.Region
  492. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  493. labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID)
  494. avgCosts, ok := nodeCostAverages[labelKey]
  495. // initialize average cost tracking for this node if there is none
  496. if !ok {
  497. avgCosts = NodeCostAverages{
  498. CpuCostAverage: cpuCost,
  499. RamCostAverage: ramCost,
  500. NumCpuDataPoints: 1,
  501. NumRamDataPoints: 1,
  502. }
  503. nodeCostAverages[labelKey] = avgCosts
  504. }
  505. cmme.GPUCountRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(gpu)
  506. cmme.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(gpuCost)
  507. const outlierFactor float64 = 30
  508. // don't record cpuCost, ramCost, or gpuCost in the case of wild outliers
  509. // k8s api sometimes causes cost spikes as described here:
  510. // https://github.com/opencost/opencost/issues/927
  511. cpuOutlierCutoff := outlierFactor * avgCosts.CpuCostAverage
  512. if cpuCost < cpuOutlierCutoff {
  513. cmme.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(cpuCost)
  514. avgCosts.CpuCostAverage = (avgCosts.CpuCostAverage*avgCosts.NumCpuDataPoints + cpuCost) / (avgCosts.NumCpuDataPoints + 1)
  515. avgCosts.NumCpuDataPoints += 1
  516. } else {
  517. log.Debugf("CPU cost outlier detected; skipping data point: %s had %f as cost, which is above %f.", nodeName, cpuCost, cpuOutlierCutoff)
  518. }
  519. ramOutlierCutoff := outlierFactor * avgCosts.RamCostAverage
  520. if ramCost < ramOutlierCutoff {
  521. cmme.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(ramCost)
  522. avgCosts.RamCostAverage = (avgCosts.RamCostAverage*avgCosts.NumRamDataPoints + ramCost) / (avgCosts.NumRamDataPoints + 1)
  523. avgCosts.NumRamDataPoints += 1
  524. } else {
  525. log.Debugf("RAM cost outlier detected; skipping data point: %s had %f as cost, which is above %f.", nodeName, ramCost, ramOutlierCutoff)
  526. }
  527. // skip redording totalCost if any constituent costs were outliers
  528. if cpuCost < cpuOutlierCutoff && ramCost < ramOutlierCutoff {
  529. cmme.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(totalCost)
  530. } else {
  531. log.Debugf("CPU and RAM outlier detected, not recording node %s total cost %f", nodeName, totalCost)
  532. }
  533. nodeCostAverages[labelKey] = avgCosts
  534. if node.IsSpot() {
  535. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(1.0)
  536. } else {
  537. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID, node.ArchType, nodeUID).Set(0.0)
  538. }
  539. nodeSeen[labelKey] = true
  540. }
  541. loadBalancers, err := cmme.Model.GetLBCost()
  542. if err != nil {
  543. log.Warnf("Error getting LoadBalancer cost: %s", err)
  544. }
  545. for lbKey, lb := range loadBalancers {
  546. // TODO: parse (if necessary) and calculate cost associated with loadBalancer based on dynamic cloud prices fetched into each lb struct on GetLBCost() call
  547. namespace := lbKey.Namespace
  548. serviceName := lbKey.Service
  549. ingressIP := ""
  550. if len(lb.IngressIPAddresses) > 0 {
  551. ingressIP = lb.IngressIPAddresses[0] // assumes one ingress IP per load balancer
  552. }
  553. serviceKey := namespace + "/" + serviceName
  554. serviceUID := serviceUIDs[serviceKey]
  555. cmme.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName, serviceUID).Set(lb.Cost)
  556. labelKey := getKeyFromLabelStrings(ingressIP, namespace, serviceName, serviceUID)
  557. loadBalancerSeen[labelKey] = true
  558. }
  559. for _, costs := range data {
  560. nodeName := costs.NodeName
  561. namespace := costs.Namespace
  562. podName := costs.PodName
  563. containerName := costs.Name
  564. if costs.PVCData != nil {
  565. for _, pvc := range costs.PVCData {
  566. if pvc.Volume != nil {
  567. timesClaimed := pvc.TimesClaimed
  568. if timesClaimed == 0 {
  569. timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
  570. }
  571. podUID := podUIDs[podName]
  572. cmme.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName, podUID).Set(pvc.Values[0].Value / float64(timesClaimed))
  573. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName, podUID)
  574. pvcSeen[labelKey] = true
  575. }
  576. }
  577. }
  578. if len(costs.RAMAllocation) > 0 {
  579. podUID := podUIDs[podName]
  580. cmme.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName, podUID).Set(costs.RAMAllocation[0].Value)
  581. }
  582. if len(costs.CPUAllocation) > 0 {
  583. podUID := podUIDs[podName]
  584. cmme.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName, podUID).Set(costs.CPUAllocation[0].Value)
  585. }
  586. if len(costs.GPUReq) > 0 {
  587. // allocation here is set to the request because shared GPU usage not yet supported.
  588. // if VPGUs, request x (actual/virtual)
  589. vgpu := 0.0
  590. gpu := 0.0
  591. var err, verr error
  592. if matchedNode, found := nodes[nodeName]; found {
  593. vgpu, verr = strconv.ParseFloat(matchedNode.VGPU, 64)
  594. gpu, err = strconv.ParseFloat(matchedNode.GPU, 64)
  595. } else {
  596. log.Tracef("cost data for node %s had GPUReq, but there was no cost data available for the node", nodeName)
  597. log.Trace("defaulting GPU to 0 cost")
  598. }
  599. gpualloc := costs.GPUReq[0].Value
  600. if verr != nil && err != nil && vgpu != 0 {
  601. gpualloc = gpualloc * (gpu / vgpu)
  602. }
  603. podUID := podUIDs[podName]
  604. cmme.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName, podUID).Set(gpualloc)
  605. }
  606. podUID := podUIDs[podName]
  607. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName, podUID)
  608. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  609. containerSeen[labelKey] = true
  610. } else {
  611. containerSeen[labelKey] = false
  612. }
  613. }
  614. storageClasses := cmme.KubeClusterCache.GetAllStorageClasses()
  615. storageClassMap := make(map[string]map[string]string)
  616. for _, storageClass := range storageClasses {
  617. params := storageClass.Parameters
  618. storageClassMap[storageClass.Name] = params
  619. if storageClass.Annotations["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.Annotations["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  620. storageClassMap["default"] = params
  621. storageClassMap[""] = params
  622. }
  623. }
  624. pvs := cmme.KubeClusterCache.GetAllPersistentVolumes()
  625. for _, pv := range pvs {
  626. // Omit pv_hourly_cost if the volume status is failed
  627. if pv.Status.Phase == v1.VolumeFailed {
  628. continue
  629. }
  630. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  631. if !ok {
  632. log.Debugf("Unable to find parameters for storage class \"%s\". Pv \"%s\" might have an empty or invalid storageClassName.", pv.Spec.StorageClassName, pv.Name)
  633. }
  634. var region string
  635. if r, ok := util.GetRegion(pv.Labels); ok {
  636. region = r
  637. } else {
  638. region = defaultRegion
  639. }
  640. cacPv := &models.PV{
  641. Class: pv.Spec.StorageClassName,
  642. Region: region,
  643. Parameters: parameters,
  644. }
  645. cmme.Model.GetPVCost(cacPv, pv, region)
  646. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  647. pvUID := pvUIDs[pv.Name]
  648. cmme.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID, pvUID).Set(c)
  649. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name, cacPv.ProviderID, pvUID)
  650. pvSeen[labelKey] = true
  651. }
  652. // Remove metrics on Nodes/LoadBalancers/Containers/PVs that no
  653. // longer exist
  654. for labelString, seen := range nodeSeen {
  655. if !seen {
  656. log.Debugf("Removing metrics for %s, no data observed recently", labelString)
  657. labels := getLabelStringsFromKey(labelString)
  658. ok := cmme.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  659. if ok {
  660. log.Debugf("No data observed for node with labels %v, removed from totalprice", labels)
  661. } else {
  662. log.Warnf("Failed to remove label set %v from metric node_total_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  663. }
  664. ok = cmme.NodeSpotRecorder.DeleteLabelValues(labels...)
  665. if ok {
  666. log.Debugf("No data observed for node with labels %v, removed from spot records", labels)
  667. } else {
  668. log.Warnf("Failed to remove label set %v from metric kubecost_node_is_spot. Failure to remove stale metrics may result in inaccurate data.", labels)
  669. }
  670. ok = cmme.CPUPriceRecorder.DeleteLabelValues(labels...)
  671. if ok {
  672. log.Debugf("No data observed for node with labels %v, removed from cpuprice", labels)
  673. } else {
  674. log.Warnf("Failed to remove label set %v from metric node_cpu_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  675. }
  676. ok = cmme.GPUPriceRecorder.DeleteLabelValues(labels...)
  677. if ok {
  678. log.Debugf("No data observed for node with labels %v, removed from gpuprice", labels)
  679. } else {
  680. log.Warnf("Failed to remove label set %v from metric node_gpu_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  681. }
  682. ok = cmme.GPUCountRecorder.DeleteLabelValues(labels...)
  683. if ok {
  684. log.Debugf("No data observed for node with labels %v, removed from gpucount", labels)
  685. } else {
  686. log.Warnf("Failed to remove label set %v from metric node_gpu_count. Failure to remove stale metrics may result in inaccurate data.", labels)
  687. }
  688. ok = cmme.RAMPriceRecorder.DeleteLabelValues(labels...)
  689. if ok {
  690. log.Debugf("No data observed for node with labels %v, removed from ramprice", labels)
  691. } else {
  692. log.Warnf("Failed to remove label set %v from metric node_ram_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  693. }
  694. delete(nodeSeen, labelString)
  695. delete(nodeCostAverages, labelString)
  696. } else {
  697. nodeSeen[labelString] = false
  698. }
  699. }
  700. for labelString, seen := range loadBalancerSeen {
  701. if !seen {
  702. labels := getLabelStringsFromKey(labelString)
  703. ok := cmme.LBCostRecorder.DeleteLabelValues(labels...)
  704. if !ok {
  705. log.Warnf("Failed to remove label set %v from metric kubecost_load_balancer_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  706. }
  707. delete(loadBalancerSeen, labelString)
  708. } else {
  709. loadBalancerSeen[labelString] = false
  710. }
  711. }
  712. for labelString, seen := range containerSeen {
  713. if !seen {
  714. labels := getLabelStringsFromKey(labelString)
  715. if len(labels) >= 2 && labels[1] != unmountedPVsContainer { // special "pod" to contain the unmounted PVs - does not have RAM/CPU/...
  716. ok := cmme.RAMAllocationRecorder.DeleteLabelValues(labels...)
  717. if !ok {
  718. log.Warnf("Failed to remove label set %v from metric container_memory_allocation_bytes. Failure to remove stale metrics may result in inaccurate data.", labels)
  719. }
  720. ok = cmme.CPUAllocationRecorder.DeleteLabelValues(labels...)
  721. if !ok {
  722. log.Warnf("Failed to remove label set %v from metric container_cpu_allocation. Failure to remove stale metrics may result in inaccurate data.", labels)
  723. }
  724. ok = cmme.GPUAllocationRecorder.DeleteLabelValues(labels...)
  725. if !ok {
  726. log.Warnf("Failed to remove label set %v from metric container_gpu_allocation. Failure to remove stale metrics may result in inaccurate data.", labels)
  727. }
  728. } else {
  729. log.Debugf("Did not try to delete RAM/CPU/GPU for fake '%s' container: %v", unmountedPVsContainer, labels)
  730. }
  731. delete(containerSeen, labelString)
  732. } else {
  733. containerSeen[labelString] = false
  734. }
  735. }
  736. for labelString, seen := range pvSeen {
  737. if !seen {
  738. labels := getLabelStringsFromKey(labelString)
  739. ok := cmme.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  740. if !ok {
  741. log.Warnf("Failed to remove label set %v from metric pv_hourly_cost. Failure to remove stale metrics may result in inaccurate data.", labels)
  742. }
  743. delete(pvSeen, labelString)
  744. } else {
  745. pvSeen[labelString] = false
  746. }
  747. }
  748. for labelString, seen := range pvcSeen {
  749. if !seen {
  750. labels := getLabelStringsFromKey(labelString)
  751. ok := cmme.PVAllocationRecorder.DeleteLabelValues(labels...)
  752. if !ok {
  753. log.Warnf("Failed to remove label set %v from metric pod_pvc_allocation. Failure to remove stale metrics may result in inaccurate data.", labels)
  754. }
  755. delete(pvcSeen, labelString)
  756. } else {
  757. pvcSeen[labelString] = false
  758. }
  759. }
  760. select {
  761. case <-time.After(time.Minute):
  762. case <-cmme.runState.OnStop():
  763. cmme.runState.Reset()
  764. return
  765. }
  766. }
  767. }()
  768. return true
  769. }
  770. // Stop halts the metrics emission loop after the current emission is completed
  771. // or if the emission is paused. The loop only observes the request in its end-of-pass select (which waits up to one minute or until runState.OnStop() fires); on stop it calls runState.Reset() and returns. NOTE(review): whether runState.Stop blocks until the loop has exited is not visible here — confirm.
  772. func (cmme *CostModelMetricsEmitter) Stop() {
  773. cmme.runState.Stop() // request stop; the emission goroutine itself calls runState.Reset() when it exits
  774. }