metrics.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773
  1. package costmodel
  2. import (
  3. "math"
  4. "strconv"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/pkg/cloud"
  9. "github.com/opencost/opencost/pkg/clustercache"
  10. "github.com/opencost/opencost/pkg/costmodel/clusters"
  11. "github.com/opencost/opencost/pkg/env"
  12. "github.com/opencost/opencost/pkg/errors"
  13. "github.com/opencost/opencost/pkg/log"
  14. "github.com/opencost/opencost/pkg/metrics"
  15. "github.com/opencost/opencost/pkg/prom"
  16. "github.com/opencost/opencost/pkg/util"
  17. "github.com/opencost/opencost/pkg/util/atomic"
  18. promclient "github.com/prometheus/client_golang/api"
  19. "github.com/prometheus/client_golang/prometheus"
  20. dto "github.com/prometheus/client_model/go"
  21. v1 "k8s.io/api/core/v1"
  22. )
  23. //--------------------------------------------------------------------------
  24. // ClusterInfoCollector
  25. //--------------------------------------------------------------------------
  // ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
  type ClusterInfoCollector struct {
  	// ClusterInfo supplies the cluster info that Collect converts into metric labels.
  	ClusterInfo clusters.ClusterInfoProvider
  	// metricsConfig is read by Describe and Collect to look up the disabled metrics.
  	metricsConfig metrics.MetricsConfig
  }
  31. // Describe sends the super-set of all possible descriptors of metrics
  32. // collected by this Collector.
  33. func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
  34. disabledMetrics := cic.metricsConfig.GetDisabledMetricsMap()
  35. if _, disabled := disabledMetrics["kube_pod_annotations"]; disabled {
  36. return
  37. }
  38. ch <- prometheus.NewDesc("kubecost_cluster_info", "Kubecost Cluster Info", []string{}, nil)
  39. }
  40. // Collect is called by the Prometheus registry when collecting metrics.
  41. func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
  42. disabledMetrics := cic.metricsConfig.GetDisabledMetricsMap()
  43. if _, disabled := disabledMetrics["kube_pod_annotations"]; disabled {
  44. return
  45. }
  46. clusterInfo := cic.ClusterInfo.GetClusterInfo()
  47. labels := prom.MapToLabels(clusterInfo)
  48. m := newClusterInfoMetric("kubecost_cluster_info", labels)
  49. ch <- m
  50. }
  51. //--------------------------------------------------------------------------
  52. // ClusterInfoMetric
  53. //--------------------------------------------------------------------------
  // ClusterInfoMetric is the prometheus.Metric implementation that carries the
  // local cluster info as labels.
  type ClusterInfoMetric struct {
  	// fqName is the metric name and help is its help text; both are passed to
  	// prometheus.NewDesc by Desc.
  	fqName, help string
  	// labels holds the label name/value pairs written out by Write.
  	labels map[string]string
  }
  60. // Creates a new ClusterInfoMetric, implementation of prometheus.Metric
  61. func newClusterInfoMetric(fqName string, labels map[string]string) ClusterInfoMetric {
  62. return ClusterInfoMetric{
  63. fqName: fqName,
  64. labels: labels,
  65. help: "kubecost_cluster_info ClusterInfo",
  66. }
  67. }
  68. // Desc returns the descriptor for the Metric. This method idempotently
  69. // returns the same descriptor throughout the lifetime of the Metric.
  70. func (cim ClusterInfoMetric) Desc() *prometheus.Desc {
  71. l := prometheus.Labels{}
  72. return prometheus.NewDesc(cim.fqName, cim.help, prom.LabelNamesFrom(cim.labels), l)
  73. }
  74. // Write encodes the Metric into a "Metric" Protocol Buffer data
  75. // transmission object.
  76. func (cim ClusterInfoMetric) Write(m *dto.Metric) error {
  77. h := float64(1)
  78. m.Gauge = &dto.Gauge{
  79. Value: &h,
  80. }
  81. var labels []*dto.LabelPair
  82. for k, v := range cim.labels {
  83. labels = append(labels, &dto.LabelPair{
  84. Name: toStringPtr(k),
  85. Value: toStringPtr(v),
  86. })
  87. }
  88. m.Label = labels
  89. return nil
  90. }
  // toStringPtr returns a pointer to a fresh copy of the string provided.
  func toStringPtr(s string) *string {
  	p := new(string)
  	*p = s
  	return p
  }
  93. //--------------------------------------------------------------------------
  94. // Cost Model Metrics Initialization
  95. //--------------------------------------------------------------------------
  96. // Only allow the metrics to be instantiated and registered once
  97. var metricsInit sync.Once
  98. var (
  99. cpuGv *prometheus.GaugeVec
  100. ramGv *prometheus.GaugeVec
  101. gpuGv *prometheus.GaugeVec
  102. gpuCountGv *prometheus.GaugeVec
  103. pvGv *prometheus.GaugeVec
  104. spotGv *prometheus.GaugeVec
  105. totalGv *prometheus.GaugeVec
  106. ramAllocGv *prometheus.GaugeVec
  107. cpuAllocGv *prometheus.GaugeVec
  108. gpuAllocGv *prometheus.GaugeVec
  109. pvAllocGv *prometheus.GaugeVec
  110. networkZoneEgressCostG prometheus.Gauge
  111. networkRegionEgressCostG prometheus.Gauge
  112. networkInternetEgressCostG prometheus.Gauge
  113. clusterManagementCostGv *prometheus.GaugeVec
  114. lbCostGv *prometheus.GaugeVec
  115. )
  116. // initCostModelMetrics uses a sync.Once to ensure that these metrics are only created once
  117. func initCostModelMetrics(clusterCache clustercache.ClusterCache, provider cloud.Provider, clusterInfo clusters.ClusterInfoProvider, metricsConfig *metrics.MetricsConfig) {
  118. disabledMetrics := metricsConfig.GetDisabledMetricsMap()
  119. var toRegisterGV []*prometheus.GaugeVec
  120. var toRegisterGauge []prometheus.Gauge
  121. metricsInit.Do(func() {
  122. if _, disabled := disabledMetrics["node_cpu_hourly_cost"]; !disabled {
  123. cpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  124. Name: "node_cpu_hourly_cost",
  125. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  126. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  127. toRegisterGV = append(toRegisterGV, cpuGv)
  128. }
  129. if _, disabled := disabledMetrics["node_ram_hourly_cost"]; !disabled {
  130. ramGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  131. Name: "node_ram_hourly_cost",
  132. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  133. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  134. toRegisterGV = append(toRegisterGV, ramGv)
  135. }
  136. if _, disabled := disabledMetrics["node_gpu_hourly_cost"]; !disabled {
  137. gpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  138. Name: "node_gpu_hourly_cost",
  139. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  140. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  141. toRegisterGV = append(toRegisterGV, gpuGv)
  142. }
  143. if _, disabled := disabledMetrics["node_gpu_count"]; !disabled {
  144. gpuCountGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  145. Name: "node_gpu_count",
  146. Help: "node_gpu_count count of gpu on this node",
  147. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  148. toRegisterGV = append(toRegisterGV, gpuCountGv)
  149. }
  150. if _, disabled := disabledMetrics["pv_hourly_cost"]; !disabled {
  151. pvGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  152. Name: "pv_hourly_cost",
  153. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  154. }, []string{"volumename", "persistentvolume", "provider_id"})
  155. toRegisterGV = append(toRegisterGV, pvGv)
  156. }
  157. if _, disabled := disabledMetrics["kubecost_node_is_spot"]; !disabled {
  158. spotGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  159. Name: "kubecost_node_is_spot",
  160. Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
  161. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  162. toRegisterGV = append(toRegisterGV, spotGv)
  163. }
  164. if _, disabled := disabledMetrics["node_total_hourly_cost"]; !disabled {
  165. totalGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  166. Name: "node_total_hourly_cost",
  167. Help: "node_total_hourly_cost Total node cost per hour",
  168. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  169. toRegisterGV = append(toRegisterGV, totalGv)
  170. }
  171. if _, disabled := disabledMetrics["container_memory_allocation_bytes"]; !disabled {
  172. ramAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  173. Name: "container_memory_allocation_bytes",
  174. Help: "container_memory_allocation_bytes Bytes of RAM used",
  175. }, []string{"namespace", "pod", "container", "instance", "node"})
  176. toRegisterGV = append(toRegisterGV, ramAllocGv)
  177. }
  178. if _, disabled := disabledMetrics["container_cpu_allocation"]; !disabled {
  179. cpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  180. Name: "container_cpu_allocation",
  181. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  182. }, []string{"namespace", "pod", "container", "instance", "node"})
  183. toRegisterGV = append(toRegisterGV, cpuAllocGv)
  184. }
  185. if _, disabled := disabledMetrics["container_gpu_allocation"]; !disabled {
  186. gpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  187. Name: "container_gpu_allocation",
  188. Help: "container_gpu_allocation GPU used",
  189. }, []string{"namespace", "pod", "container", "instance", "node"})
  190. toRegisterGV = append(toRegisterGV, gpuAllocGv)
  191. }
  192. if _, disabled := disabledMetrics["pod_pvc_allocation"]; !disabled {
  193. pvAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  194. Name: "pod_pvc_allocation",
  195. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  196. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  197. toRegisterGV = append(toRegisterGV, pvAllocGv)
  198. }
  199. if _, disabled := disabledMetrics["kubecost_network_zone_egress_cost"]; !disabled {
  200. networkZoneEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  201. Name: "kubecost_network_zone_egress_cost",
  202. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  203. })
  204. toRegisterGauge = append(toRegisterGauge, networkZoneEgressCostG)
  205. }
  206. if _, disabled := disabledMetrics["kubecost_network_region_egress_cost"]; !disabled {
  207. networkRegionEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  208. Name: "kubecost_network_region_egress_cost",
  209. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  210. })
  211. toRegisterGauge = append(toRegisterGauge, networkRegionEgressCostG)
  212. }
  213. if _, disabled := disabledMetrics["kubecost_network_internet_egress_cost"]; !disabled {
  214. networkInternetEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  215. Name: "kubecost_network_internet_egress_cost",
  216. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  217. })
  218. toRegisterGauge = append(toRegisterGauge, networkInternetEgressCostG)
  219. }
  220. if _, disabled := disabledMetrics["kubecost_cluster_management_cost"]; !disabled {
  221. clusterManagementCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  222. Name: "kubecost_cluster_management_cost",
  223. Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
  224. }, []string{"provisioner_name"})
  225. toRegisterGV = append(toRegisterGV, clusterManagementCostGv)
  226. }
  227. if _, disabled := disabledMetrics["kubecost_load_balancer_cost"]; !disabled {
  228. lbCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
  229. Name: "kubecost_load_balancer_cost",
  230. Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
  231. }, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
  232. toRegisterGV = append(toRegisterGV, lbCostGv)
  233. }
  234. // Register cost-model metrics for emission
  235. for _, gv := range toRegisterGV {
  236. prometheus.MustRegister(gv)
  237. }
  238. for _, g := range toRegisterGauge {
  239. prometheus.MustRegister(g)
  240. }
  241. // General Metric Collectors
  242. prometheus.MustRegister(ClusterInfoCollector{
  243. ClusterInfo: clusterInfo,
  244. metricsConfig: *metricsConfig,
  245. })
  246. })
  247. }
  248. //--------------------------------------------------------------------------
  249. // CostModelMetricsEmitter
  250. //--------------------------------------------------------------------------
  // CostModelMetricsEmitter emits all cost-model specific metrics calculated by
  // the CostModel.ComputeCostData() method.
  type CostModelMetricsEmitter struct {
  	// PrometheusClient is passed to Model.ComputeCostData on every emission cycle.
  	PrometheusClient promclient.Client
  	// KubeClusterCache is the source of nodes, pods, storage classes and persistent volumes.
  	KubeClusterCache clustercache.ClusterCache
  	// CloudProvider supplies pricing configuration, network pricing and cluster management pricing.
  	CloudProvider cloud.Provider
  	// Model computes the cost data, node costs and load balancer costs that are emitted.
  	Model *CostModel

  	// Metrics

  	// Per-node price and attribute recorders.
  	CPUPriceRecorder *prometheus.GaugeVec
  	RAMPriceRecorder *prometheus.GaugeVec
  	// PersistentVolumePriceRecorder records the cost of each persistent volume.
  	PersistentVolumePriceRecorder *prometheus.GaugeVec
  	GPUPriceRecorder *prometheus.GaugeVec
  	GPUCountRecorder *prometheus.GaugeVec
  	// PVAllocationRecorder records per-pod PVC allocation.
  	PVAllocationRecorder *prometheus.GaugeVec
  	// NodeSpotRecorder is set to 1 for spot nodes and 0 otherwise.
  	NodeSpotRecorder *prometheus.GaugeVec
  	NodeTotalPriceRecorder *prometheus.GaugeVec
  	// Per-container allocation recorders.
  	RAMAllocationRecorder *prometheus.GaugeVec
  	CPUAllocationRecorder *prometheus.GaugeVec
  	GPUAllocationRecorder *prometheus.GaugeVec
  	// ClusterManagementCostRecorder records the cluster management cost per provisioner.
  	ClusterManagementCostRecorder *prometheus.GaugeVec
  	// LBCostRecorder records the cost of each load balancer.
  	LBCostRecorder *prometheus.GaugeVec
  	// Network egress price recorders (unlabelled gauges).
  	NetworkZoneEgressRecorder prometheus.Gauge
  	NetworkRegionEgressRecorder prometheus.Gauge
  	NetworkInternetEgressRecorder prometheus.Gauge

  	// Concurrent Flow Control - Manages the run state of the metric emitter
  	runState atomic.AtomicRunState
  }
  278. // NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
  279. func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clustercache.ClusterCache, provider cloud.Provider, clusterInfo clusters.ClusterInfoProvider, model *CostModel) *CostModelMetricsEmitter {
  280. // Get metric configurations, if any
  281. metricsConfig, err := metrics.GetMetricsConfig()
  282. if err != nil {
  283. log.Infof("Failed to get metrics config before init: %s", err)
  284. }
  285. if len(metricsConfig.DisabledMetrics) > 0 {
  286. log.Infof("Starting metrics init with disabled metrics: %v", metricsConfig.DisabledMetrics)
  287. }
  288. // init will only actually execute once to register the custom gauges
  289. initCostModelMetrics(clusterCache, provider, clusterInfo, metricsConfig)
  290. metrics.InitKubeMetrics(clusterCache, metricsConfig, &metrics.KubeMetricsOpts{
  291. EmitKubecostControllerMetrics: true,
  292. EmitNamespaceAnnotations: env.IsEmitNamespaceAnnotationsMetric(),
  293. EmitPodAnnotations: env.IsEmitPodAnnotationsMetric(),
  294. EmitKubeStateMetrics: env.IsEmitKsmV1Metrics(),
  295. EmitKubeStateMetricsV1Only: env.IsEmitKsmV1MetricsOnly(),
  296. })
  297. metrics.InitKubecostTelemetry(metricsConfig)
  298. return &CostModelMetricsEmitter{
  299. PrometheusClient: promClient,
  300. KubeClusterCache: clusterCache,
  301. CloudProvider: provider,
  302. Model: model,
  303. CPUPriceRecorder: cpuGv,
  304. RAMPriceRecorder: ramGv,
  305. GPUPriceRecorder: gpuGv,
  306. GPUCountRecorder: gpuCountGv,
  307. PersistentVolumePriceRecorder: pvGv,
  308. NodeSpotRecorder: spotGv,
  309. NodeTotalPriceRecorder: totalGv,
  310. RAMAllocationRecorder: ramAllocGv,
  311. CPUAllocationRecorder: cpuAllocGv,
  312. GPUAllocationRecorder: gpuAllocGv,
  313. PVAllocationRecorder: pvAllocGv,
  314. NetworkZoneEgressRecorder: networkZoneEgressCostG,
  315. NetworkRegionEgressRecorder: networkRegionEgressCostG,
  316. NetworkInternetEgressRecorder: networkInternetEgressCostG,
  317. ClusterManagementCostRecorder: clusterManagementCostGv,
  318. LBCostRecorder: lbCostGv,
  319. }
  320. }
  321. // IsRunning returns true if metric recording is running.
  322. func (cmme *CostModelMetricsEmitter) IsRunning() bool {
  323. return cmme.runState.IsRunning()
  324. }
  // NodeCostAverages tracks a running average of a node's cost attributes.
  // The averages are used to detect and discard spurious outliers.
  type NodeCostAverages struct {
  	// Running averages of the node's CPU and RAM cost.
  	CpuCostAverage, RamCostAverage float64
  	// Number of data points folded into each average so far.
  	NumCpuDataPoints, NumRamDataPoints float64
  }
  333. // StartCostModelMetricRecording starts the go routine that emits metrics used to determine
  334. // cluster costs.
  335. func (cmme *CostModelMetricsEmitter) Start() bool {
  336. // wait for a reset to prevent a race between start and stop calls
  337. cmme.runState.WaitForReset()
  338. // Check to see if we're already recording, and atomically advance the run state to start if we're not
  339. if !cmme.runState.Start() {
  340. log.Errorf("Attempted to start cost model metric recording when it's already running.")
  341. return false
  342. }
  343. go func() {
  344. defer errors.HandlePanic()
  345. containerSeen := make(map[string]bool)
  346. nodeSeen := make(map[string]bool)
  347. loadBalancerSeen := make(map[string]bool)
  348. pvSeen := make(map[string]bool)
  349. pvcSeen := make(map[string]bool)
  350. nodeCostAverages := make(map[string]NodeCostAverages)
  351. getKeyFromLabelStrings := func(labels ...string) string {
  352. return strings.Join(labels, ",")
  353. }
  354. getLabelStringsFromKey := func(key string) []string {
  355. return strings.Split(key, ",")
  356. }
  357. var defaultRegion string = ""
  358. nodeList := cmme.KubeClusterCache.GetAllNodes()
  359. if len(nodeList) > 0 {
  360. var ok bool
  361. defaultRegion, ok = util.GetRegion(nodeList[0].Labels)
  362. if !ok {
  363. log.DedupedWarningf(5, "Failed to locate default region")
  364. }
  365. }
  366. for {
  367. log.Debugf("Recording prices...")
  368. podlist := cmme.KubeClusterCache.GetAllPods()
  369. podStatus := make(map[string]v1.PodPhase)
  370. for _, pod := range podlist {
  371. podStatus[pod.Name] = pod.Status.Phase
  372. }
  373. cfg, _ := cmme.CloudProvider.GetConfig()
  374. provisioner, clusterManagementCost, err := cmme.CloudProvider.ClusterManagementPricing()
  375. if err != nil {
  376. log.Errorf("Error getting cluster management cost %s", err.Error())
  377. }
  378. cmme.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
  379. // Record network pricing at global scope
  380. networkCosts, err := cmme.CloudProvider.NetworkPricing()
  381. if err != nil {
  382. log.Debugf("Failed to retrieve network costs: %s", err.Error())
  383. } else {
  384. cmme.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  385. cmme.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  386. cmme.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  387. }
  388. // TODO: Pass PrometheusClient and CloudProvider into CostModel on instantiation so this isn't so awkward
  389. data, err := cmme.Model.ComputeCostData(cmme.PrometheusClient, cmme.CloudProvider, "2m", "", "")
  390. if err != nil {
  391. // For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
  392. // actual errors)
  393. if prom.IsErrorCollection(err) {
  394. if ec, ok := err.(prom.QueryErrorCollection); ok {
  395. log.Errorf("Error in price recording: %d errors occurred", len(ec.Errors()))
  396. }
  397. } else {
  398. log.Errorf("Error in price recording: " + err.Error())
  399. }
  400. // zero the for loop so the time.Sleep will still work
  401. data = map[string]*CostData{}
  402. }
  403. // TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
  404. nodes, err := cmme.Model.GetNodeCost(cmme.CloudProvider)
  405. if err != nil {
  406. log.Warnf("Metric emission: error getting Node cost: %s", err)
  407. }
  408. for nodeName, node := range nodes {
  409. // Emit costs, guarding against NaN inputs for custom pricing.
  410. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  411. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  412. cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
  413. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  414. cpuCost = 0
  415. }
  416. }
  417. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  418. if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
  419. cpu = 1 // Assume 1 CPU
  420. }
  421. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  422. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  423. ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
  424. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  425. ramCost = 0
  426. }
  427. }
  428. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  429. if math.IsNaN(ram) || math.IsInf(ram, 0) {
  430. ram = 0
  431. }
  432. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  433. if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
  434. gpu = 0
  435. }
  436. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  437. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  438. gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
  439. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  440. gpuCost = 0
  441. }
  442. }
  443. nodeType := node.InstanceType
  444. nodeRegion := node.Region
  445. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  446. labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
  447. avgCosts, ok := nodeCostAverages[labelKey]
  448. // initialize average cost tracking for this node if there is none
  449. if !ok {
  450. avgCosts = NodeCostAverages{
  451. CpuCostAverage: cpuCost,
  452. RamCostAverage: ramCost,
  453. NumCpuDataPoints: 1,
  454. NumRamDataPoints: 1,
  455. }
  456. nodeCostAverages[labelKey] = avgCosts
  457. }
  458. cmme.GPUCountRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpu)
  459. cmme.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
  460. const outlierFactor float64 = 30
  461. // don't record cpuCost, ramCost, or gpuCost in the case of wild outliers
  462. // k8s api sometimes causes cost spikes as described here:
  463. // https://github.com/opencost/opencost/issues/927
  464. if cpuCost < outlierFactor*avgCosts.CpuCostAverage {
  465. cmme.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
  466. avgCosts.CpuCostAverage = (avgCosts.CpuCostAverage*avgCosts.NumCpuDataPoints + cpuCost) / (avgCosts.NumCpuDataPoints + 1)
  467. avgCosts.NumCpuDataPoints += 1
  468. } else {
  469. log.Warnf("CPU cost outlier detected; skipping data point.")
  470. }
  471. if ramCost < outlierFactor*avgCosts.RamCostAverage {
  472. cmme.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
  473. avgCosts.RamCostAverage = (avgCosts.RamCostAverage*avgCosts.NumRamDataPoints + ramCost) / (avgCosts.NumRamDataPoints + 1)
  474. avgCosts.NumRamDataPoints += 1
  475. } else {
  476. log.Warnf("RAM cost outlier detected; skipping data point.")
  477. }
  478. // skip redording totalCost if any constituent costs were outliers
  479. if cpuCost < outlierFactor*avgCosts.CpuCostAverage &&
  480. ramCost < outlierFactor*avgCosts.RamCostAverage {
  481. cmme.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
  482. }
  483. nodeCostAverages[labelKey] = avgCosts
  484. if node.IsSpot() {
  485. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(1.0)
  486. } else {
  487. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(0.0)
  488. }
  489. nodeSeen[labelKey] = true
  490. }
  491. // TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
  492. loadBalancers, err := cmme.Model.GetLBCost(cmme.CloudProvider)
  493. if err != nil {
  494. log.Warnf("Metric emission: error getting LoadBalancer cost: %s", err)
  495. }
  496. for lbKey, lb := range loadBalancers {
  497. // TODO: parse (if necessary) and calculate cost associated with loadBalancer based on dynamic cloud prices fetched into each lb struct on GetLBCost() call
  498. namespace := lbKey.Namespace
  499. serviceName := lbKey.Service
  500. ingressIP := ""
  501. if len(lb.IngressIPAddresses) > 0 {
  502. ingressIP = lb.IngressIPAddresses[0] // assumes one ingress IP per load balancer
  503. }
  504. cmme.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName).Set(lb.Cost)
  505. labelKey := getKeyFromLabelStrings(ingressIP, namespace, serviceName)
  506. loadBalancerSeen[labelKey] = true
  507. }
  508. for _, costs := range data {
  509. nodeName := costs.NodeName
  510. namespace := costs.Namespace
  511. podName := costs.PodName
  512. containerName := costs.Name
  513. if costs.PVCData != nil {
  514. for _, pvc := range costs.PVCData {
  515. if pvc.Volume != nil {
  516. timesClaimed := pvc.TimesClaimed
  517. if timesClaimed == 0 {
  518. timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
  519. }
  520. cmme.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value / float64(timesClaimed))
  521. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  522. pvcSeen[labelKey] = true
  523. }
  524. }
  525. }
  526. if len(costs.RAMAllocation) > 0 {
  527. cmme.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  528. }
  529. if len(costs.CPUAllocation) > 0 {
  530. cmme.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  531. }
  532. if len(costs.GPUReq) > 0 {
  533. // allocation here is set to the request because shared GPU usage not yet supported.
  534. cmme.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  535. }
  536. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  537. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  538. containerSeen[labelKey] = true
  539. } else {
  540. containerSeen[labelKey] = false
  541. }
  542. }
  543. storageClasses := cmme.KubeClusterCache.GetAllStorageClasses()
  544. storageClassMap := make(map[string]map[string]string)
  545. for _, storageClass := range storageClasses {
  546. params := storageClass.Parameters
  547. storageClassMap[storageClass.ObjectMeta.Name] = params
  548. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  549. storageClassMap["default"] = params
  550. storageClassMap[""] = params
  551. }
  552. }
  553. pvs := cmme.KubeClusterCache.GetAllPersistentVolumes()
  554. for _, pv := range pvs {
  555. // Omit pv_hourly_cost if the volume status is failed
  556. if pv.Status.Phase == v1.VolumeFailed {
  557. continue
  558. }
  559. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  560. if !ok {
  561. log.Debugf("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  562. }
  563. var region string
  564. if r, ok := util.GetRegion(pv.Labels); ok {
  565. region = r
  566. } else {
  567. region = defaultRegion
  568. }
  569. cacPv := &cloud.PV{
  570. Class: pv.Spec.StorageClassName,
  571. Region: region,
  572. Parameters: parameters,
  573. }
  574. // TODO: GetPVCost should be a method in CostModel?
  575. GetPVCost(cacPv, pv, cmme.CloudProvider, region)
  576. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  577. cmme.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID).Set(c)
  578. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  579. pvSeen[labelKey] = true
  580. }
  581. for labelString, seen := range nodeSeen {
  582. if !seen {
  583. log.Debugf("Removing %s from nodes", labelString)
  584. labels := getLabelStringsFromKey(labelString)
  585. ok := cmme.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  586. if ok {
  587. log.Debugf("removed %s from totalprice", labelString)
  588. } else {
  589. log.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
  590. }
  591. ok = cmme.NodeSpotRecorder.DeleteLabelValues(labels...)
  592. if ok {
  593. log.Debugf("removed %s from spot records", labelString)
  594. } else {
  595. log.Infof("FAILURE TO REMOVE %s from spot records", labelString)
  596. }
  597. ok = cmme.CPUPriceRecorder.DeleteLabelValues(labels...)
  598. if ok {
  599. log.Debugf("removed %s from cpuprice", labelString)
  600. } else {
  601. log.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
  602. }
  603. ok = cmme.GPUPriceRecorder.DeleteLabelValues(labels...)
  604. if ok {
  605. log.Debugf("removed %s from gpuprice", labelString)
  606. } else {
  607. log.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
  608. }
  609. ok = cmme.GPUCountRecorder.DeleteLabelValues(labels...)
  610. if ok {
  611. log.Debugf("removed %s from gpucount", labelString)
  612. } else {
  613. log.Infof("FAILURE TO REMOVE %s from gpucount", labelString)
  614. }
  615. ok = cmme.RAMPriceRecorder.DeleteLabelValues(labels...)
  616. if ok {
  617. log.Debugf("removed %s from ramprice", labelString)
  618. } else {
  619. log.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
  620. }
  621. delete(nodeSeen, labelString)
  622. delete(nodeCostAverages, labelString)
  623. } else {
  624. nodeSeen[labelString] = false
  625. }
  626. }
  627. for labelString, seen := range loadBalancerSeen {
  628. if !seen {
  629. labels := getLabelStringsFromKey(labelString)
  630. ok := cmme.LBCostRecorder.DeleteLabelValues(labels...)
  631. if !ok {
  632. log.Warnf("Metric emission: failed to delete LoadBalancer with labels: %v", labels)
  633. }
  634. delete(loadBalancerSeen, labelString)
  635. } else {
  636. loadBalancerSeen[labelString] = false
  637. }
  638. }
  639. for labelString, seen := range containerSeen {
  640. if !seen {
  641. labels := getLabelStringsFromKey(labelString)
  642. cmme.RAMAllocationRecorder.DeleteLabelValues(labels...)
  643. cmme.CPUAllocationRecorder.DeleteLabelValues(labels...)
  644. cmme.GPUAllocationRecorder.DeleteLabelValues(labels...)
  645. delete(containerSeen, labelString)
  646. } else {
  647. containerSeen[labelString] = false
  648. }
  649. }
  650. for labelString, seen := range pvSeen {
  651. if !seen {
  652. labels := getLabelStringsFromKey(labelString)
  653. cmme.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  654. delete(pvSeen, labelString)
  655. } else {
  656. pvSeen[labelString] = false
  657. }
  658. }
  659. for labelString, seen := range pvcSeen {
  660. if !seen {
  661. labels := getLabelStringsFromKey(labelString)
  662. cmme.PVAllocationRecorder.DeleteLabelValues(labels...)
  663. delete(pvcSeen, labelString)
  664. } else {
  665. pvcSeen[labelString] = false
  666. }
  667. }
  668. select {
  669. case <-time.After(time.Minute):
  670. case <-cmme.runState.OnStop():
  671. cmme.runState.Reset()
  672. return
  673. }
  674. }
  675. }()
  676. return true
  677. }
  // Stop halts the metrics emission loop after the current emission is completed
  // or if the emission is paused. It only signals the emitter's run state; the
  // emission goroutine observes that signal between cycles and then returns.
  func (cmme *CostModelMetricsEmitter) Stop() {
  	cmme.runState.Stop()
  }
  682. }