metrics.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725
  1. package costmodel
import (
	"math"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/kubecost/cost-model/pkg/cloud"
	"github.com/kubecost/cost-model/pkg/clustercache"
	"github.com/kubecost/cost-model/pkg/env"
	"github.com/kubecost/cost-model/pkg/errors"
	"github.com/kubecost/cost-model/pkg/log"
	"github.com/kubecost/cost-model/pkg/metrics"
	"github.com/kubecost/cost-model/pkg/prom"
	"github.com/kubecost/cost-model/pkg/util"
	promclient "github.com/prometheus/client_golang/api"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog"
)
  23. //--------------------------------------------------------------------------
  24. // ClusterInfoCollector
  25. //--------------------------------------------------------------------------
  26. // ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
  27. type ClusterInfoCollector struct {
  28. Cloud cloud.Provider
  29. KubeClientSet kubernetes.Interface
  30. }
  31. // Describe sends the super-set of all possible descriptors of metrics
  32. // collected by this Collector.
  33. func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
  34. ch <- prometheus.NewDesc("kubecost_cluster_info", "Kubecost Cluster Info", []string{}, nil)
  35. }
  36. // Collect is called by the Prometheus registry when collecting metrics.
  37. func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
  38. clusterInfo := GetClusterInfo(cic.KubeClientSet, cic.Cloud)
  39. labels := prom.MapToLabels(clusterInfo)
  40. m := newClusterInfoMetric("kubecost_cluster_info", labels)
  41. ch <- m
  42. }
  43. //--------------------------------------------------------------------------
  44. // ClusterInfoMetric
  45. //--------------------------------------------------------------------------
  46. // ClusterInfoMetric is a prometheus.Metric used to encode the local cluster info
  47. type ClusterInfoMetric struct {
  48. fqName string
  49. help string
  50. labels map[string]string
  51. }
  52. // Creates a new ClusterInfoMetric, implementation of prometheus.Metric
  53. func newClusterInfoMetric(fqName string, labels map[string]string) ClusterInfoMetric {
  54. return ClusterInfoMetric{
  55. fqName: fqName,
  56. labels: labels,
  57. help: "kubecost_cluster_info ClusterInfo",
  58. }
  59. }
  60. // Desc returns the descriptor for the Metric. This method idempotently
  61. // returns the same descriptor throughout the lifetime of the Metric.
  62. func (cim ClusterInfoMetric) Desc() *prometheus.Desc {
  63. l := prometheus.Labels{}
  64. return prometheus.NewDesc(cim.fqName, cim.help, prom.LabelNamesFrom(cim.labels), l)
  65. }
  66. // Write encodes the Metric into a "Metric" Protocol Buffer data
  67. // transmission object.
  68. func (cim ClusterInfoMetric) Write(m *dto.Metric) error {
  69. h := float64(1)
  70. m.Gauge = &dto.Gauge{
  71. Value: &h,
  72. }
  73. var labels []*dto.LabelPair
  74. for k, v := range cim.labels {
  75. labels = append(labels, &dto.LabelPair{
  76. Name: toStringPtr(k),
  77. Value: toStringPtr(v),
  78. })
  79. }
  80. m.Label = labels
  81. return nil
  82. }
  83. // returns a pointer to the string provided
  84. func toStringPtr(s string) *string { return &s }
  85. //--------------------------------------------------------------------------
  86. // Cost Model Metrics Initialization
  87. //--------------------------------------------------------------------------
  88. // Only allow the metrics to be instantiated and registered once
  89. var metricsInit sync.Once
  90. var (
  91. cpuGv *prometheus.GaugeVec
  92. ramGv *prometheus.GaugeVec
  93. gpuGv *prometheus.GaugeVec
  94. gpuCountGv *prometheus.GaugeVec
  95. pvGv *prometheus.GaugeVec
  96. spotGv *prometheus.GaugeVec
  97. totalGv *prometheus.GaugeVec
  98. ramAllocGv *prometheus.GaugeVec
  99. cpuAllocGv *prometheus.GaugeVec
  100. gpuAllocGv *prometheus.GaugeVec
  101. pvAllocGv *prometheus.GaugeVec
  102. networkZoneEgressCostG prometheus.Gauge
  103. networkRegionEgressCostG prometheus.Gauge
  104. networkInternetEgressCostG prometheus.Gauge
  105. clusterManagementCostGv *prometheus.GaugeVec
  106. lbCostGv *prometheus.GaugeVec
  107. )
  108. // initCostModelMetrics uses a sync.Once to ensure that these metrics are only created once
  109. func initCostModelMetrics(clusterCache clustercache.ClusterCache, provider cloud.Provider) {
  110. metricsInit.Do(func() {
  111. cpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  112. Name: "node_cpu_hourly_cost",
  113. Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
  114. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  115. ramGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  116. Name: "node_ram_hourly_cost",
  117. Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
  118. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  119. gpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  120. Name: "node_gpu_hourly_cost",
  121. Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
  122. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  123. gpuCountGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  124. Name: "node_gpu_count",
  125. Help: "node_gpu_count count of gpu on this node",
  126. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  127. pvGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  128. Name: "pv_hourly_cost",
  129. Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
  130. }, []string{"volumename", "persistentvolume", "provider_id"})
  131. spotGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  132. Name: "kubecost_node_is_spot",
  133. Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
  134. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  135. totalGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  136. Name: "node_total_hourly_cost",
  137. Help: "node_total_hourly_cost Total node cost per hour",
  138. }, []string{"instance", "node", "instance_type", "region", "provider_id"})
  139. ramAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  140. Name: "container_memory_allocation_bytes",
  141. Help: "container_memory_allocation_bytes Bytes of RAM used",
  142. }, []string{"namespace", "pod", "container", "instance", "node"})
  143. cpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  144. Name: "container_cpu_allocation",
  145. Help: "container_cpu_allocation Percent of a single CPU used in a minute",
  146. }, []string{"namespace", "pod", "container", "instance", "node"})
  147. gpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  148. Name: "container_gpu_allocation",
  149. Help: "container_gpu_allocation GPU used",
  150. }, []string{"namespace", "pod", "container", "instance", "node"})
  151. pvAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  152. Name: "pod_pvc_allocation",
  153. Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
  154. }, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
  155. networkZoneEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  156. Name: "kubecost_network_zone_egress_cost",
  157. Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
  158. })
  159. networkRegionEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  160. Name: "kubecost_network_region_egress_cost",
  161. Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
  162. })
  163. networkInternetEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
  164. Name: "kubecost_network_internet_egress_cost",
  165. Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
  166. })
  167. clusterManagementCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  168. Name: "kubecost_cluster_management_cost",
  169. Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
  170. }, []string{"provisioner_name"})
  171. lbCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
  172. Name: "kubecost_load_balancer_cost",
  173. Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
  174. }, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
  175. // Register cost-model metrics for emission
  176. prometheus.MustRegister(cpuGv, ramGv, gpuGv, gpuCountGv, totalGv, pvGv, spotGv)
  177. prometheus.MustRegister(ramAllocGv, cpuAllocGv, gpuAllocGv, pvAllocGv)
  178. prometheus.MustRegister(networkZoneEgressCostG, networkRegionEgressCostG, networkInternetEgressCostG)
  179. prometheus.MustRegister(clusterManagementCostGv, lbCostGv)
  180. // General Metric Collectors
  181. prometheus.MustRegister(ClusterInfoCollector{
  182. KubeClientSet: clusterCache.GetClient(),
  183. Cloud: provider,
  184. })
  185. })
  186. }
  187. //--------------------------------------------------------------------------
  188. // CostModelMetricsEmitter
  189. //--------------------------------------------------------------------------
  190. // CostModelMetricsEmitter emits all cost-model specific metrics calculated by
  191. // the CostModel.ComputeCostData() method.
  192. type CostModelMetricsEmitter struct {
  193. PrometheusClient promclient.Client
  194. KubeClusterCache clustercache.ClusterCache
  195. CloudProvider cloud.Provider
  196. Model *CostModel
  197. // Metrics
  198. CPUPriceRecorder *prometheus.GaugeVec
  199. RAMPriceRecorder *prometheus.GaugeVec
  200. PersistentVolumePriceRecorder *prometheus.GaugeVec
  201. GPUPriceRecorder *prometheus.GaugeVec
  202. GPUCountRecorder *prometheus.GaugeVec
  203. PVAllocationRecorder *prometheus.GaugeVec
  204. NodeSpotRecorder *prometheus.GaugeVec
  205. NodeTotalPriceRecorder *prometheus.GaugeVec
  206. RAMAllocationRecorder *prometheus.GaugeVec
  207. CPUAllocationRecorder *prometheus.GaugeVec
  208. GPUAllocationRecorder *prometheus.GaugeVec
  209. ClusterManagementCostRecorder *prometheus.GaugeVec
  210. LBCostRecorder *prometheus.GaugeVec
  211. NetworkZoneEgressRecorder prometheus.Gauge
  212. NetworkRegionEgressRecorder prometheus.Gauge
  213. NetworkInternetEgressRecorder prometheus.Gauge
  214. // Flow Control
  215. recordingLock *sync.Mutex
  216. recordingStopping bool
  217. recordingStop chan bool
  218. }
  219. // NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
  220. func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clustercache.ClusterCache, provider cloud.Provider, model *CostModel) *CostModelMetricsEmitter {
  221. // init will only actually execute once to register the custom gauges
  222. initCostModelMetrics(clusterCache, provider)
  223. metrics.InitKubeMetrics(clusterCache, &metrics.KubeMetricsOpts{
  224. EmitKubecostControllerMetrics: true,
  225. EmitNamespaceAnnotations: env.IsEmitNamespaceAnnotationsMetric(),
  226. EmitPodAnnotations: env.IsEmitPodAnnotationsMetric(),
  227. EmitKubeStateMetrics: env.IsEmitKsmV1Metrics(),
  228. EmitKubeStateMetricsV1Only: env.IsEmitKsmV1MetricsOnly(),
  229. })
  230. return &CostModelMetricsEmitter{
  231. PrometheusClient: promClient,
  232. KubeClusterCache: clusterCache,
  233. CloudProvider: provider,
  234. Model: model,
  235. CPUPriceRecorder: cpuGv,
  236. RAMPriceRecorder: ramGv,
  237. GPUPriceRecorder: gpuGv,
  238. GPUCountRecorder: gpuCountGv,
  239. PersistentVolumePriceRecorder: pvGv,
  240. NodeSpotRecorder: spotGv,
  241. NodeTotalPriceRecorder: totalGv,
  242. RAMAllocationRecorder: ramAllocGv,
  243. CPUAllocationRecorder: cpuAllocGv,
  244. GPUAllocationRecorder: gpuAllocGv,
  245. PVAllocationRecorder: pvAllocGv,
  246. NetworkZoneEgressRecorder: networkZoneEgressCostG,
  247. NetworkRegionEgressRecorder: networkRegionEgressCostG,
  248. NetworkInternetEgressRecorder: networkInternetEgressCostG,
  249. ClusterManagementCostRecorder: clusterManagementCostGv,
  250. LBCostRecorder: lbCostGv,
  251. recordingLock: new(sync.Mutex),
  252. recordingStopping: false,
  253. recordingStop: nil,
  254. }
  255. }
  256. // Checks to see if there is a metric recording stop channel. If it exists, a new
  257. // channel is not created and false is returned. If it doesn't exist, a new channel
  258. // is created and true is returned.
  259. func (cmme *CostModelMetricsEmitter) checkOrCreateRecordingChan() bool {
  260. cmme.recordingLock.Lock()
  261. defer cmme.recordingLock.Unlock()
  262. if cmme.recordingStop != nil {
  263. return false
  264. }
  265. cmme.recordingStop = make(chan bool, 1)
  266. return true
  267. }
  268. // IsRunning returns true if metric recording is running.
  269. func (cmme *CostModelMetricsEmitter) IsRunning() bool {
  270. cmme.recordingLock.Lock()
  271. defer cmme.recordingLock.Unlock()
  272. return cmme.recordingStop != nil
  273. }
  274. // NodeCostAverages tracks a running average of a node's cost attributes.
  275. // The averages are used to detect and discard spurrious outliers.
  276. type NodeCostAverages struct {
  277. CpuCostAverage float64
  278. RamCostAverage float64
  279. NumCpuDataPoints float64
  280. NumRamDataPoints float64
  281. }
  282. // StartCostModelMetricRecording starts the go routine that emits metrics used to determine
  283. // cluster costs.
  284. func (cmme *CostModelMetricsEmitter) Start() bool {
  285. // Check to see if we're already recording
  286. // This function will create the stop recording channel and return true
  287. // if it doesn't exist.
  288. if !cmme.checkOrCreateRecordingChan() {
  289. log.Errorf("Attempted to start cost model metric recording when it's already running.")
  290. return false
  291. }
  292. go func() {
  293. defer errors.HandlePanic()
  294. containerSeen := make(map[string]bool)
  295. nodeSeen := make(map[string]bool)
  296. loadBalancerSeen := make(map[string]bool)
  297. pvSeen := make(map[string]bool)
  298. pvcSeen := make(map[string]bool)
  299. nodeCostAverages := make(map[string]NodeCostAverages)
  300. getKeyFromLabelStrings := func(labels ...string) string {
  301. return strings.Join(labels, ",")
  302. }
  303. getLabelStringsFromKey := func(key string) []string {
  304. return strings.Split(key, ",")
  305. }
  306. var defaultRegion string = ""
  307. nodeList := cmme.KubeClusterCache.GetAllNodes()
  308. if len(nodeList) > 0 {
  309. var ok bool
  310. defaultRegion, ok = util.GetRegion(nodeList[0].Labels)
  311. if !ok {
  312. log.DedupedWarningf(5, "Failed to locate default region")
  313. }
  314. }
  315. for {
  316. klog.V(4).Info("Recording prices...")
  317. podlist := cmme.KubeClusterCache.GetAllPods()
  318. podStatus := make(map[string]v1.PodPhase)
  319. for _, pod := range podlist {
  320. podStatus[pod.Name] = pod.Status.Phase
  321. }
  322. cfg, _ := cmme.CloudProvider.GetConfig()
  323. provisioner, clusterManagementCost, err := cmme.CloudProvider.ClusterManagementPricing()
  324. if err != nil {
  325. klog.V(1).Infof("Error getting cluster management cost %s", err.Error())
  326. }
  327. cmme.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
  328. // Record network pricing at global scope
  329. networkCosts, err := cmme.CloudProvider.NetworkPricing()
  330. if err != nil {
  331. klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
  332. } else {
  333. cmme.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
  334. cmme.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
  335. cmme.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
  336. }
  337. // TODO: Pass PrometheusClient and CloudProvider into CostModel on instantiation so this isn't so awkward
  338. data, err := cmme.Model.ComputeCostData(cmme.PrometheusClient, cmme.CloudProvider, "2m", "", "")
  339. if err != nil {
  340. // For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
  341. // actual errors)
  342. if prom.IsErrorCollection(err) {
  343. if ec, ok := err.(prom.QueryErrorCollection); ok {
  344. log.Errorf("Error in price recording: %d errors occurred", len(ec.Errors()))
  345. }
  346. } else {
  347. log.Errorf("Error in price recording: " + err.Error())
  348. }
  349. // zero the for loop so the time.Sleep will still work
  350. data = map[string]*CostData{}
  351. }
  352. // TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
  353. nodes, err := cmme.Model.GetNodeCost(cmme.CloudProvider)
  354. if err != nil {
  355. log.Warningf("Metric emission: error getting Node cost: %s", err)
  356. }
  357. for nodeName, node := range nodes {
  358. // Emit costs, guarding against NaN inputs for custom pricing.
  359. cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
  360. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  361. cpuCost, _ = strconv.ParseFloat(cfg.CPU, 64)
  362. if math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  363. cpuCost = 0
  364. }
  365. }
  366. cpu, _ := strconv.ParseFloat(node.VCPU, 64)
  367. if math.IsNaN(cpu) || math.IsInf(cpu, 0) {
  368. cpu = 1 // Assume 1 CPU
  369. }
  370. ramCost, _ := strconv.ParseFloat(node.RAMCost, 64)
  371. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  372. ramCost, _ = strconv.ParseFloat(cfg.RAM, 64)
  373. if math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  374. ramCost = 0
  375. }
  376. }
  377. ram, _ := strconv.ParseFloat(node.RAMBytes, 64)
  378. if math.IsNaN(ram) || math.IsInf(ram, 0) {
  379. ram = 0
  380. }
  381. gpu, _ := strconv.ParseFloat(node.GPU, 64)
  382. if math.IsNaN(gpu) || math.IsInf(gpu, 0) {
  383. gpu = 0
  384. }
  385. gpuCost, _ := strconv.ParseFloat(node.GPUCost, 64)
  386. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  387. gpuCost, _ = strconv.ParseFloat(cfg.GPU, 64)
  388. if math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  389. gpuCost = 0
  390. }
  391. }
  392. nodeType := node.InstanceType
  393. nodeRegion := node.Region
  394. totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
  395. labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
  396. avgCosts, ok := nodeCostAverages[labelKey]
  397. // initialize average cost tracking for this node if there is none
  398. if !ok {
  399. avgCosts = NodeCostAverages{
  400. CpuCostAverage: cpuCost,
  401. RamCostAverage: ramCost,
  402. NumCpuDataPoints: 1,
  403. NumRamDataPoints: 1,
  404. }
  405. nodeCostAverages[labelKey] = avgCosts
  406. }
  407. cmme.GPUCountRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpu)
  408. cmme.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
  409. const outlierFactor float64 = 30
  410. // don't record cpuCost, ramCost, or gpuCost in the case of wild outliers
  411. // k8s api sometimes causes cost spikes as described here:
  412. // https://github.com/kubecost/cost-model/issues/927
  413. if cpuCost < outlierFactor*avgCosts.CpuCostAverage {
  414. cmme.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
  415. avgCosts.CpuCostAverage = (avgCosts.CpuCostAverage*avgCosts.NumCpuDataPoints + cpuCost) / (avgCosts.NumCpuDataPoints + 1)
  416. avgCosts.NumCpuDataPoints += 1
  417. } else {
  418. log.Warningf("CPU cost outlier detected; skipping data point.")
  419. }
  420. if ramCost < outlierFactor*avgCosts.RamCostAverage {
  421. cmme.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
  422. avgCosts.RamCostAverage = (avgCosts.RamCostAverage*avgCosts.NumRamDataPoints + ramCost) / (avgCosts.NumRamDataPoints + 1)
  423. avgCosts.NumRamDataPoints += 1
  424. } else {
  425. log.Warningf("RAM cost outlier detected; skipping data point.")
  426. }
  427. // skip redording totalCost if any constituent costs were outliers
  428. if cpuCost < outlierFactor*avgCosts.CpuCostAverage &&
  429. ramCost < outlierFactor*avgCosts.RamCostAverage {
  430. cmme.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
  431. }
  432. nodeCostAverages[labelKey] = avgCosts
  433. if node.IsSpot() {
  434. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(1.0)
  435. } else {
  436. cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(0.0)
  437. }
  438. nodeSeen[labelKey] = true
  439. }
  440. // TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
  441. loadBalancers, err := cmme.Model.GetLBCost(cmme.CloudProvider)
  442. if err != nil {
  443. log.Warningf("Metric emission: error getting LoadBalancer cost: %s", err)
  444. }
  445. for lbKey, lb := range loadBalancers {
  446. // TODO: parse (if necessary) and calculate cost associated with loadBalancer based on dynamic cloud prices fetched into each lb struct on GetLBCost() call
  447. keyParts := getLabelStringsFromKey(lbKey)
  448. namespace := keyParts[0]
  449. serviceName := keyParts[1]
  450. ingressIP := ""
  451. if len(lb.IngressIPAddresses) > 0 {
  452. ingressIP = lb.IngressIPAddresses[0] // assumes one ingress IP per load balancer
  453. }
  454. cmme.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName).Set(lb.Cost)
  455. labelKey := getKeyFromLabelStrings(ingressIP, namespace, serviceName)
  456. loadBalancerSeen[labelKey] = true
  457. }
  458. for _, costs := range data {
  459. nodeName := costs.NodeName
  460. namespace := costs.Namespace
  461. podName := costs.PodName
  462. containerName := costs.Name
  463. if costs.PVCData != nil {
  464. for _, pvc := range costs.PVCData {
  465. if pvc.Volume != nil {
  466. timesClaimed := pvc.TimesClaimed
  467. if timesClaimed == 0 {
  468. timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
  469. }
  470. cmme.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value / float64(timesClaimed))
  471. labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
  472. pvcSeen[labelKey] = true
  473. }
  474. }
  475. }
  476. if len(costs.RAMAllocation) > 0 {
  477. cmme.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
  478. }
  479. if len(costs.CPUAllocation) > 0 {
  480. cmme.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
  481. }
  482. if len(costs.GPUReq) > 0 {
  483. // allocation here is set to the request because shared GPU usage not yet supported.
  484. cmme.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
  485. }
  486. labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
  487. if podStatus[podName] == v1.PodRunning { // Only report data for current pods
  488. containerSeen[labelKey] = true
  489. } else {
  490. containerSeen[labelKey] = false
  491. }
  492. }
  493. storageClasses := cmme.KubeClusterCache.GetAllStorageClasses()
  494. storageClassMap := make(map[string]map[string]string)
  495. for _, storageClass := range storageClasses {
  496. params := storageClass.Parameters
  497. storageClassMap[storageClass.ObjectMeta.Name] = params
  498. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  499. storageClassMap["default"] = params
  500. storageClassMap[""] = params
  501. }
  502. }
  503. pvs := cmme.KubeClusterCache.GetAllPersistentVolumes()
  504. for _, pv := range pvs {
  505. // Omit pv_hourly_cost if the volume status is failed
  506. if pv.Status.Phase == v1.VolumeFailed {
  507. continue
  508. }
  509. parameters, ok := storageClassMap[pv.Spec.StorageClassName]
  510. if !ok {
  511. klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
  512. }
  513. var region string
  514. if r, ok := util.GetRegion(pv.Labels); ok {
  515. region = r
  516. } else {
  517. region = defaultRegion
  518. }
  519. cacPv := &cloud.PV{
  520. Class: pv.Spec.StorageClassName,
  521. Region: region,
  522. Parameters: parameters,
  523. }
  524. // TODO: GetPVCost should be a method in CostModel?
  525. GetPVCost(cacPv, pv, cmme.CloudProvider, region)
  526. c, _ := strconv.ParseFloat(cacPv.Cost, 64)
  527. cmme.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID).Set(c)
  528. labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
  529. pvSeen[labelKey] = true
  530. }
  531. for labelString, seen := range nodeSeen {
  532. if !seen {
  533. klog.V(4).Infof("Removing %s from nodes", labelString)
  534. labels := getLabelStringsFromKey(labelString)
  535. ok := cmme.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
  536. if ok {
  537. klog.V(4).Infof("removed %s from totalprice", labelString)
  538. } else {
  539. klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
  540. }
  541. ok = cmme.NodeSpotRecorder.DeleteLabelValues(labels...)
  542. if ok {
  543. klog.V(4).Infof("removed %s from spot records", labelString)
  544. } else {
  545. klog.Infof("FAILURE TO REMOVE %s from spot records", labelString)
  546. }
  547. ok = cmme.CPUPriceRecorder.DeleteLabelValues(labels...)
  548. if ok {
  549. klog.V(4).Infof("removed %s from cpuprice", labelString)
  550. } else {
  551. klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
  552. }
  553. ok = cmme.GPUPriceRecorder.DeleteLabelValues(labels...)
  554. if ok {
  555. klog.V(4).Infof("removed %s from gpuprice", labelString)
  556. } else {
  557. klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
  558. }
  559. ok = cmme.GPUCountRecorder.DeleteLabelValues(labels...)
  560. if ok {
  561. klog.V(4).Infof("removed %s from gpucount", labelString)
  562. } else {
  563. klog.Infof("FAILURE TO REMOVE %s from gpucount", labelString)
  564. }
  565. ok = cmme.RAMPriceRecorder.DeleteLabelValues(labels...)
  566. if ok {
  567. klog.V(4).Infof("removed %s from ramprice", labelString)
  568. } else {
  569. klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
  570. }
  571. delete(nodeSeen, labelString)
  572. delete(nodeCostAverages, labelString)
  573. } else {
  574. nodeSeen[labelString] = false
  575. }
  576. }
  577. for labelString, seen := range loadBalancerSeen {
  578. if !seen {
  579. labels := getLabelStringsFromKey(labelString)
  580. ok := cmme.LBCostRecorder.DeleteLabelValues(labels...)
  581. if !ok {
  582. log.Warningf("Metric emission: failed to delete LoadBalancer with labels: %v", labels)
  583. }
  584. delete(loadBalancerSeen, labelString)
  585. } else {
  586. loadBalancerSeen[labelString] = false
  587. }
  588. }
  589. for labelString, seen := range containerSeen {
  590. if !seen {
  591. labels := getLabelStringsFromKey(labelString)
  592. cmme.RAMAllocationRecorder.DeleteLabelValues(labels...)
  593. cmme.CPUAllocationRecorder.DeleteLabelValues(labels...)
  594. cmme.GPUAllocationRecorder.DeleteLabelValues(labels...)
  595. delete(containerSeen, labelString)
  596. } else {
  597. containerSeen[labelString] = false
  598. }
  599. }
  600. for labelString, seen := range pvSeen {
  601. if !seen {
  602. labels := getLabelStringsFromKey(labelString)
  603. cmme.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
  604. delete(pvSeen, labelString)
  605. } else {
  606. pvSeen[labelString] = false
  607. }
  608. }
  609. for labelString, seen := range pvcSeen {
  610. if !seen {
  611. labels := getLabelStringsFromKey(labelString)
  612. cmme.PVAllocationRecorder.DeleteLabelValues(labels...)
  613. delete(pvcSeen, labelString)
  614. } else {
  615. pvcSeen[labelString] = false
  616. }
  617. }
  618. select {
  619. case <-time.After(time.Minute):
  620. case <-cmme.recordingStop:
  621. cmme.recordingLock.Lock()
  622. cmme.recordingStopping = false
  623. cmme.recordingStop = nil
  624. cmme.recordingLock.Unlock()
  625. return
  626. }
  627. }
  628. }()
  629. return true
  630. }
  631. // Stop halts the metrics emission loop after the current emission is completed
  632. // or if the emission is paused.
  633. func (cmme *CostModelMetricsEmitter) Stop() {
  634. cmme.recordingLock.Lock()
  635. defer cmme.recordingLock.Unlock()
  636. if !cmme.recordingStopping && cmme.recordingStop != nil {
  637. cmme.recordingStopping = true
  638. close(cmme.recordingStop)
  639. }
  640. }