Kaynağa Gözat

Move the cost-model metrics to a CostModelMetricsEmitter struct, Remove unused parameter from ComputeCostData and ComputeCostDataRange

Matt Bolt 5 yıl önce
ebeveyn
işleme
7491d62113

+ 1 - 1
pkg/costmodel/aggregation.go

@@ -1296,7 +1296,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 		start := startTime.Format(RFC3339Milli)
 		end := endTime.Format(RFC3339Milli)
 
-		costData, err = a.Model.ComputeCostDataRange(promClient, a.KubeClientSet, a.CloudProvider, start, end, window, resolutionHours, "", "", remoteEnabled, offset)
+		costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, start, end, window, resolutionHours, "", "", remoteEnabled, offset)
 		if err != nil {
 			if prom.IsErrorCollection(err) {
 				return nil, "", err

+ 4 - 5
pkg/costmodel/costmodel.go

@@ -19,7 +19,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
 
 	"github.com/google/uuid"
@@ -222,7 +221,7 @@ const (
 	normalizationStr          = `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[%s] %s))`
 )
 
-func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
+func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, window, offset, window, offset)
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
 	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, window, offset, window, offset)
@@ -1462,7 +1461,7 @@ func requestKeyFor(startString string, endString string, windowString string, fi
 
 // ComputeCostDataRange executes a range query for cost data.
 // Note that "offset" represents the time between the function call and "endString", and is also passed for convenience
-func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
+func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider,
 	startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool, offset string) (map[string]*CostData, error) {
 	// Create a request key for request grouping. This key will be used to represent the cost-model result
 	// for the specific inputs to prevent multiple queries for identical data.
@@ -1473,7 +1472,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	// If there is already a request out that uses the same data, wait for it to return to share the results.
 	// Otherwise, start executing.
 	result, err, _ := cm.RequestGroup.Do(key, func() (interface{}, error) {
-		return cm.costDataRange(cli, clientset, cp, startString, endString, windowString, resolutionHours, filterNamespace, filterCluster, remoteEnabled, offset)
+		return cm.costDataRange(cli, cp, startString, endString, windowString, resolutionHours, filterNamespace, filterCluster, remoteEnabled, offset)
 	})
 
 	data, ok := result.(map[string]*CostData)
@@ -1484,7 +1483,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return data, err
 }
 
-func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool, offset string) (map[string]*CostData, error) {
+func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool, offset string) (map[string]*CostData, error) {
 	layout := "2006-01-02T15:04:05.000Z"
 
 	start, err := time.Parse(layout, startString)

+ 256 - 68
pkg/costmodel/metrics.go

@@ -7,12 +7,13 @@ import (
 	"sync"
 	"time"
 
-	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 
+	promclient "github.com/prometheus/client_golang/api"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
 	v1 "k8s.io/api/core/v1"
@@ -451,7 +452,7 @@ func (pam PodAnnotationsMetric) Write(m *dto.Metric) error {
 
 // ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
 type ClusterInfoCollector struct {
-	Cloud         costAnalyzerCloud.Provider
+	Cloud         cloud.Provider
 	KubeClientSet kubernetes.Interface
 }
 
@@ -521,45 +522,227 @@ func toStringPtr(s string) *string {
 }
 
 //--------------------------------------------------------------------------
-//  Package Functions
+//  Cost Model Metrics Initialization
 //--------------------------------------------------------------------------
 
+// Only allow the metrics to be instantiated and registered once
+var metricsInit sync.Once
+
 var (
-	recordingLock     sync.Mutex
+	cpuGv                      *prometheus.GaugeVec
+	ramGv                      *prometheus.GaugeVec
+	gpuGv                      *prometheus.GaugeVec
+	pvGv                       *prometheus.GaugeVec
+	spotGv                     *prometheus.GaugeVec
+	totalGv                    *prometheus.GaugeVec
+	ramAllocGv                 *prometheus.GaugeVec
+	cpuAllocGv                 *prometheus.GaugeVec
+	gpuAllocGv                 *prometheus.GaugeVec
+	pvAllocGv                  *prometheus.GaugeVec
+	networkZoneEgressCostG     prometheus.Gauge
+	networkRegionEgressCostG   prometheus.Gauge
+	networkInternetEgressCostG prometheus.Gauge
+	clusterManagementCostGv    *prometheus.GaugeVec
+	lbCostGv                   *prometheus.GaugeVec
+)
+
+// initCostModelMetrics uses a sync.Once to ensure that these metrics are only created once
+func initCostModelMetrics(clusterCache clustercache.ClusterCache, provider cloud.Provider) {
+	metricsInit.Do(func() {
+		cpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "node_cpu_hourly_cost",
+			Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
+		}, []string{"instance", "node", "instance_type", "region", "provider_id"})
+
+		ramGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "node_ram_hourly_cost",
+			Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
+		}, []string{"instance", "node", "instance_type", "region", "provider_id"})
+
+		gpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "node_gpu_hourly_cost",
+			Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
+		}, []string{"instance", "node", "instance_type", "region", "provider_id"})
+
+		pvGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "pv_hourly_cost",
+			Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
+		}, []string{"volumename", "persistentvolume", "provider_id"})
+
+		spotGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "kubecost_node_is_spot",
+			Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
+		}, []string{"instance", "node", "instance_type", "region", "provider_id"})
+
+		totalGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "node_total_hourly_cost",
+			Help: "node_total_hourly_cost Total node cost per hour",
+		}, []string{"instance", "node", "instance_type", "region", "provider_id"})
+
+		ramAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "container_memory_allocation_bytes",
+			Help: "container_memory_allocation_bytes Bytes of RAM used",
+		}, []string{"namespace", "pod", "container", "instance", "node"})
+
+		cpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "container_cpu_allocation",
+			Help: "container_cpu_allocation Percent of a single CPU used in a minute",
+		}, []string{"namespace", "pod", "container", "instance", "node"})
+
+		gpuAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "container_gpu_allocation",
+			Help: "container_gpu_allocation GPU used",
+		}, []string{"namespace", "pod", "container", "instance", "node"})
+
+		pvAllocGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "pod_pvc_allocation",
+			Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
+		}, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
+
+		networkZoneEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "kubecost_network_zone_egress_cost",
+			Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
+		})
+
+		networkRegionEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "kubecost_network_region_egress_cost",
+			Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
+		})
+
+		networkInternetEgressCostG = prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "kubecost_network_internet_egress_cost",
+			Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
+		})
+
+		clusterManagementCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "kubecost_cluster_management_cost",
+			Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
+		}, []string{"provisioner_name"})
+
+		lbCostGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
+			Name: "kubecost_load_balancer_cost",
+			Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
+		}, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
+
+		// Register cost-model metrics for emission
+		prometheus.MustRegister(cpuGv, ramGv, gpuGv, totalGv, pvGv, spotGv)
+		prometheus.MustRegister(ramAllocGv, cpuAllocGv, gpuAllocGv, pvAllocGv)
+		prometheus.MustRegister(networkZoneEgressCostG, networkRegionEgressCostG, networkInternetEgressCostG)
+		prometheus.MustRegister(clusterManagementCostGv, lbCostGv)
+
+		// General Metric Collectors
+		prometheus.MustRegister(ServiceCollector{
+			KubeClusterCache: clusterCache,
+		})
+		prometheus.MustRegister(DeploymentCollector{
+			KubeClusterCache: clusterCache,
+		})
+		prometheus.MustRegister(StatefulsetCollector{
+			KubeClusterCache: clusterCache,
+		})
+		prometheus.MustRegister(ClusterInfoCollector{
+			KubeClientSet: clusterCache.GetClient(),
+			Cloud:         provider,
+		})
+	})
+}
+
+//--------------------------------------------------------------------------
+//  CostModelMetricsEmitter
+//--------------------------------------------------------------------------
+
+// CostModelMetricsEmitter emits all cost-model specific metrics calculated by
+// the CostModel.ComputeCostData() method.
+type CostModelMetricsEmitter struct {
+	PrometheusClient promclient.Client
+	KubeClusterCache clustercache.ClusterCache
+	CloudProvider    cloud.Provider
+	Model            *CostModel
+
+	// Metrics
+	CPUPriceRecorder              *prometheus.GaugeVec
+	RAMPriceRecorder              *prometheus.GaugeVec
+	PersistentVolumePriceRecorder *prometheus.GaugeVec
+	GPUPriceRecorder              *prometheus.GaugeVec
+	PVAllocationRecorder          *prometheus.GaugeVec
+	NodeSpotRecorder              *prometheus.GaugeVec
+	NodeTotalPriceRecorder        *prometheus.GaugeVec
+	RAMAllocationRecorder         *prometheus.GaugeVec
+	CPUAllocationRecorder         *prometheus.GaugeVec
+	GPUAllocationRecorder         *prometheus.GaugeVec
+	ClusterManagementCostRecorder *prometheus.GaugeVec
+	LBCostRecorder                *prometheus.GaugeVec
+	NetworkZoneEgressRecorder     prometheus.Gauge
+	NetworkRegionEgressRecorder   prometheus.Gauge
+	NetworkInternetEgressRecorder prometheus.Gauge
+
+	// Flow Control
+	recordingLock     *sync.Mutex
 	recordingStopping bool
 	recordingStop     chan bool
-)
+}
+
+// NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
+func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clustercache.ClusterCache, provider cloud.Provider, model *CostModel) *CostModelMetricsEmitter {
+	// init will only actually execute once to register the custom gauges
+	initCostModelMetrics(clusterCache, provider)
+
+	return &CostModelMetricsEmitter{
+		PrometheusClient:              promClient,
+		KubeClusterCache:              clusterCache,
+		CloudProvider:                 provider,
+		Model:                         model,
+		CPUPriceRecorder:              cpuGv,
+		RAMPriceRecorder:              ramGv,
+		GPUPriceRecorder:              gpuGv,
+		PersistentVolumePriceRecorder: pvGv,
+		NodeSpotRecorder:              spotGv,
+		NodeTotalPriceRecorder:        totalGv,
+		RAMAllocationRecorder:         ramAllocGv,
+		CPUAllocationRecorder:         cpuAllocGv,
+		GPUAllocationRecorder:         gpuAllocGv,
+		PVAllocationRecorder:          pvAllocGv,
+		NetworkZoneEgressRecorder:     networkZoneEgressCostG,
+		NetworkRegionEgressRecorder:   networkRegionEgressCostG,
+		NetworkInternetEgressRecorder: networkInternetEgressCostG,
+		ClusterManagementCostRecorder: clusterManagementCostGv,
+		LBCostRecorder:                lbCostGv,
+		recordingLock:                 new(sync.Mutex),
+		recordingStopping:             false,
+		recordingStop:                 nil,
+	}
+}
 
 // Checks to see if there is a metric recording stop channel. If it exists, a new
 // channel is not created and false is returned. If it doesn't exist, a new channel
 // is created and true is returned.
-func checkOrCreateRecordingChan() bool {
-	recordingLock.Lock()
-	defer recordingLock.Unlock()
+func (cmme *CostModelMetricsEmitter) checkOrCreateRecordingChan() bool {
+	cmme.recordingLock.Lock()
+	defer cmme.recordingLock.Unlock()
 
-	if recordingStop != nil {
+	if cmme.recordingStop != nil {
 		return false
 	}
 
-	recordingStop = make(chan bool, 1)
+	cmme.recordingStop = make(chan bool, 1)
 	return true
 }
 
-// IsCostModelMetricRecordingRunning returns true if metric recording is still running.
-func IsCostModelMetricRecordingRunning() bool {
-	recordingLock.Lock()
-	defer recordingLock.Unlock()
+// IsRunning returns true if metric recording is running.
+func (cmme *CostModelMetricsEmitter) IsRunning() bool {
+	cmme.recordingLock.Lock()
+	defer cmme.recordingLock.Unlock()
 
-	return recordingStop != nil
+	return cmme.recordingStop != nil
 }
 
 // StartCostModelMetricRecording starts the go routine that emits metrics used to determine
 // cluster costs.
-func StartCostModelMetricRecording(a *Accesses) bool {
+func (cmme *CostModelMetricsEmitter) Start() bool {
 	// Check to see if we're already recording
 	// This function will create the stop recording channel and return true
 	// if it doesn't exist.
-	if !checkOrCreateRecordingChan() {
+	if !cmme.checkOrCreateRecordingChan() {
 		log.Errorf("Attempted to start cost model metric recording when it's already running.")
 		return false
 	}
@@ -581,38 +764,39 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 		}
 
 		var defaultRegion string = ""
-		nodeList := a.Model.Cache.GetAllNodes()
+		nodeList := cmme.KubeClusterCache.GetAllNodes()
 		if len(nodeList) > 0 {
 			defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
 		}
 
 		for {
 			klog.V(4).Info("Recording prices...")
-			podlist := a.Model.Cache.GetAllPods()
+			podlist := cmme.KubeClusterCache.GetAllPods()
 			podStatus := make(map[string]v1.PodPhase)
 			for _, pod := range podlist {
 				podStatus[pod.Name] = pod.Status.Phase
 			}
 
-			cfg, _ := a.CloudProvider.GetConfig()
+			cfg, _ := cmme.CloudProvider.GetConfig()
 
-			provisioner, clusterManagementCost, err := a.CloudProvider.ClusterManagementPricing()
+			provisioner, clusterManagementCost, err := cmme.CloudProvider.ClusterManagementPricing()
 			if err != nil {
 				klog.V(1).Infof("Error getting cluster management cost %s", err.Error())
 			}
-			a.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
+			cmme.ClusterManagementCostRecorder.WithLabelValues(provisioner).Set(clusterManagementCost)
 
 			// Record network pricing at global scope
-			networkCosts, err := a.CloudProvider.NetworkPricing()
+			networkCosts, err := cmme.CloudProvider.NetworkPricing()
 			if err != nil {
 				klog.V(4).Infof("Failed to retrieve network costs: %s", err.Error())
 			} else {
-				a.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
-				a.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
-				a.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
+				cmme.NetworkZoneEgressRecorder.Set(networkCosts.ZoneNetworkEgressCost)
+				cmme.NetworkRegionEgressRecorder.Set(networkCosts.RegionNetworkEgressCost)
+				cmme.NetworkInternetEgressRecorder.Set(networkCosts.InternetNetworkEgressCost)
 			}
 
-			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.CloudProvider, "2m", "", "")
+			// TODO: Pass PrometheusClient and CloudProvider into CostModel on instantiation so this isn't so awkward
+			data, err := cmme.Model.ComputeCostData(cmme.PrometheusClient, cmme.CloudProvider, "2m", "", "")
 			if err != nil {
 				// For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
 				// actual errors)
@@ -628,7 +812,8 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 				data = map[string]*CostData{}
 			}
 
-			nodes, err := a.Model.GetNodeCost(a.CloudProvider)
+			// TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
+			nodes, err := cmme.Model.GetNodeCost(cmme.CloudProvider)
 			for nodeName, node := range nodes {
 				// Emit costs, guarding against NaN inputs for custom pricing.
 				cpuCost, _ := strconv.ParseFloat(node.VCPUCost, 64)
@@ -669,20 +854,21 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 
 				totalCost := cpu*cpuCost + ramCost*(ram/1024/1024/1024) + gpu*gpuCost
 
-				a.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
-				a.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
-				a.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
-				a.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
+				cmme.CPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(cpuCost)
+				cmme.RAMPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(ramCost)
+				cmme.GPUPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(gpuCost)
+				cmme.NodeTotalPriceRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(totalCost)
 				if node.IsSpot() {
-					a.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(1.0)
+					cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(1.0)
 				} else {
-					a.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(0.0)
+					cmme.NodeSpotRecorder.WithLabelValues(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID).Set(0.0)
 				}
 				labelKey := getKeyFromLabelStrings(nodeName, nodeName, nodeType, nodeRegion, node.ProviderID)
 				nodeSeen[labelKey] = true
 			}
 
-			loadBalancers, err := a.Model.GetLBCost(a.CloudProvider)
+			// TODO: Pass CloudProvider into CostModel on instantiation so this isn't so awkward
+			loadBalancers, err := cmme.Model.GetLBCost(cmme.CloudProvider)
 			for lbKey, lb := range loadBalancers {
 				// TODO: parse (if necessary) and calculate cost associated with loadBalancer based on dynamic cloud prices fetched into each lb struct on GetLBCost() call
 				keyParts := getLabelStringsFromKey(lbKey)
@@ -692,7 +878,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 				if len(lb.IngressIPAddresses) > 0 {
 					ingressIP = lb.IngressIPAddresses[0] // assumes one ingress IP per load balancer
 				}
-				a.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName).Set(lb.Cost)
+				cmme.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName).Set(lb.Cost)
 
 				labelKey := getKeyFromLabelStrings(namespace, serviceName)
 				loadBalancerSeen[labelKey] = true
@@ -712,7 +898,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 							if timesClaimed == 0 {
 								timesClaimed = 1 // unallocated PVs are unclaimed but have a full allocation
 							}
-							a.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value / float64(timesClaimed))
+							cmme.PVAllocationRecorder.WithLabelValues(namespace, podName, pvc.Claim, pvc.VolumeName).Set(pvc.Values[0].Value / float64(timesClaimed))
 							labelKey := getKeyFromLabelStrings(namespace, podName, pvc.Claim, pvc.VolumeName)
 							pvcSeen[labelKey] = true
 						}
@@ -720,14 +906,14 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 				}
 
 				if len(costs.RAMAllocation) > 0 {
-					a.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
+					cmme.RAMAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.RAMAllocation[0].Value)
 				}
 				if len(costs.CPUAllocation) > 0 {
-					a.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
+					cmme.CPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.CPUAllocation[0].Value)
 				}
 				if len(costs.GPUReq) > 0 {
 					// allocation here is set to the request because shared GPU usage not yet supported.
-					a.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
+					cmme.GPUAllocationRecorder.WithLabelValues(namespace, podName, containerName, nodeName, nodeName).Set(costs.GPUReq[0].Value)
 				}
 				labelKey := getKeyFromLabelStrings(namespace, podName, containerName, nodeName, nodeName)
 				if podStatus[podName] == v1.PodRunning { // Only report data for current pods
@@ -736,7 +922,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 					containerSeen[labelKey] = false
 				}
 
-				storageClasses := a.Model.Cache.GetAllStorageClasses()
+				storageClasses := cmme.KubeClusterCache.GetAllStorageClasses()
 				storageClassMap := make(map[string]map[string]string)
 				for _, storageClass := range storageClasses {
 					params := storageClass.Parameters
@@ -747,7 +933,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 					}
 				}
 
-				pvs := a.Model.Cache.GetAllPersistentVolumes()
+				pvs := cmme.KubeClusterCache.GetAllPersistentVolumes()
 				for _, pv := range pvs {
 					parameters, ok := storageClassMap[pv.Spec.StorageClassName]
 					if !ok {
@@ -759,14 +945,16 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 					} else {
 						region = defaultRegion
 					}
-					cacPv := &costAnalyzerCloud.PV{
+					cacPv := &cloud.PV{
 						Class:      pv.Spec.StorageClassName,
 						Region:     region,
 						Parameters: parameters,
 					}
-					GetPVCost(cacPv, pv, a.CloudProvider, region)
+
+					// TODO: GetPVCost should be a method in CostModel?
+					GetPVCost(cacPv, pv, cmme.CloudProvider, region)
 					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
-					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID).Set(c)
+					cmme.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID).Set(c)
 					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
 					pvSeen[labelKey] = true
 				}
@@ -775,31 +963,31 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 				if !seen {
 					klog.V(4).Infof("Removing %s from nodes", labelString)
 					labels := getLabelStringsFromKey(labelString)
-					ok := a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
+					ok := cmme.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 						klog.V(4).Infof("removed %s from totalprice", labelString)
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
 					}
-					ok = a.NodeSpotRecorder.DeleteLabelValues(labels...)
+					ok = cmme.NodeSpotRecorder.DeleteLabelValues(labels...)
 					if ok {
 						klog.V(4).Infof("removed %s from spot records", labelString)
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from spot records", labelString)
 					}
-					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
+					ok = cmme.CPUPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 						klog.V(4).Infof("removed %s from cpuprice", labelString)
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
 					}
-					ok = a.GPUPriceRecorder.DeleteLabelValues(labels...)
+					ok = cmme.GPUPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 						klog.V(4).Infof("removed %s from gpuprice", labelString)
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
 					}
-					ok = a.RAMPriceRecorder.DeleteLabelValues(labels...)
+					ok = cmme.RAMPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 						klog.V(4).Infof("removed %s from ramprice", labelString)
 					} else {
@@ -813,7 +1001,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 			for labelString, seen := range loadBalancerSeen {
 				if !seen {
 					labels := getLabelStringsFromKey(labelString)
-					a.LBCostRecorder.DeleteLabelValues(labels...)
+					cmme.LBCostRecorder.DeleteLabelValues(labels...)
 				} else {
 					loadBalancerSeen[labelString] = false
 				}
@@ -821,9 +1009,9 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 			for labelString, seen := range containerSeen {
 				if !seen {
 					labels := getLabelStringsFromKey(labelString)
-					a.RAMAllocationRecorder.DeleteLabelValues(labels...)
-					a.CPUAllocationRecorder.DeleteLabelValues(labels...)
-					a.GPUAllocationRecorder.DeleteLabelValues(labels...)
+					cmme.RAMAllocationRecorder.DeleteLabelValues(labels...)
+					cmme.CPUAllocationRecorder.DeleteLabelValues(labels...)
+					cmme.GPUAllocationRecorder.DeleteLabelValues(labels...)
 					delete(containerSeen, labelString)
 				} else {
 					containerSeen[labelString] = false
@@ -832,7 +1020,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 			for labelString, seen := range pvSeen {
 				if !seen {
 					labels := getLabelStringsFromKey(labelString)
-					a.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
+					cmme.PersistentVolumePriceRecorder.DeleteLabelValues(labels...)
 					delete(pvSeen, labelString)
 				} else {
 					pvSeen[labelString] = false
@@ -841,7 +1029,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 			for labelString, seen := range pvcSeen {
 				if !seen {
 					labels := getLabelStringsFromKey(labelString)
-					a.PVAllocationRecorder.DeleteLabelValues(labels...)
+					cmme.PVAllocationRecorder.DeleteLabelValues(labels...)
 					delete(pvcSeen, labelString)
 				} else {
 					pvcSeen[labelString] = false
@@ -850,11 +1038,11 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 
 			select {
 			case <-time.After(time.Minute):
-			case <-recordingStop:
-				recordingLock.Lock()
-				recordingStopping = false
-				recordingStop = nil
-				recordingLock.Unlock()
+			case <-cmme.recordingStop:
+				cmme.recordingLock.Lock()
+				cmme.recordingStopping = false
+				cmme.recordingStop = nil
+				cmme.recordingLock.Unlock()
 				return
 			}
 		}
@@ -863,14 +1051,14 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 	return true
 }
 
-// StopCostModelMetricRecording halts the metrics emission loop after the current emission is completed
+// Stop halts the metrics emission loop after the current emission is completed
 // or if the emission is paused.
-func StopCostModelMetricRecording() {
-	recordingLock.Lock()
-	defer recordingLock.Unlock()
+func (cmme *CostModelMetricsEmitter) Stop() {
+	cmme.recordingLock.Lock()
+	defer cmme.recordingLock.Unlock()
 
-	if !recordingStopping && recordingStop != nil {
-		recordingStopping = true
-		close(recordingStop)
+	if !cmme.recordingStopping && cmme.recordingStop != nil {
+		cmme.recordingStopping = true
+		close(cmme.recordingStop)
 	}
 }

+ 35 - 158
pkg/costmodel/router.go

@@ -31,7 +31,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/patrickmn/go-cache"
-	"github.com/prometheus/client_golang/prometheus"
 
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -55,37 +54,21 @@ var (
 // Accesses defines a singleton application instance, providing access to
 // Prometheus, Kubernetes, the cloud provider, and caches.
 type Accesses struct {
-	Router                        *httprouter.Router
-	PrometheusClient              prometheusClient.Client
-	ThanosClient                  prometheusClient.Client
-	KubeClientSet                 kubernetes.Interface
-	ClusterManager                *cm.ClusterManager
-	ClusterMap                    clusters.ClusterMap
-	CloudProvider                 cloud.Provider
-	CPUPriceRecorder              *prometheus.GaugeVec
-	RAMPriceRecorder              *prometheus.GaugeVec
-	PersistentVolumePriceRecorder *prometheus.GaugeVec
-	GPUPriceRecorder              *prometheus.GaugeVec
-	NodeTotalPriceRecorder        *prometheus.GaugeVec
-	NodeSpotRecorder              *prometheus.GaugeVec
-	RAMAllocationRecorder         *prometheus.GaugeVec
-	CPUAllocationRecorder         *prometheus.GaugeVec
-	GPUAllocationRecorder         *prometheus.GaugeVec
-	PVAllocationRecorder          *prometheus.GaugeVec
-	ClusterManagementCostRecorder *prometheus.GaugeVec
-	LBCostRecorder                *prometheus.GaugeVec
-	NetworkZoneEgressRecorder     prometheus.Gauge
-	NetworkRegionEgressRecorder   prometheus.Gauge
-	NetworkInternetEgressRecorder prometheus.Gauge
-	ServiceSelectorRecorder       *prometheus.GaugeVec
-	DeploymentSelectorRecorder    *prometheus.GaugeVec
-	Model                         *CostModel
-	OutOfClusterCache             *cache.Cache
-	AggregateCache                *cache.Cache
-	CostDataCache                 *cache.Cache
-	ClusterCostsCache             *cache.Cache
-	CacheExpiration               map[string]time.Duration
-	AggAPI                        CostModelAggregator
+	Router            *httprouter.Router
+	PrometheusClient  prometheusClient.Client
+	ThanosClient      prometheusClient.Client
+	KubeClientSet     kubernetes.Interface
+	ClusterManager    *cm.ClusterManager
+	ClusterMap        clusters.ClusterMap
+	CloudProvider     cloud.Provider
+	Model             *CostModel
+	MetricsEmitter    *CostModelMetricsEmitter
+	OutOfClusterCache *cache.Cache
+	AggregateCache    *cache.Cache
+	CostDataCache     *cache.Cache
+	ClusterCostsCache *cache.Cache
+	CacheExpiration   map[string]time.Duration
+	AggAPI            CostModelAggregator
 }
 
 // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
@@ -424,7 +407,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		offset = "offset " + offset
 	}
 
-	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.CloudProvider, window, offset, namespace)
+	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
 
 	if fields != "" {
 		filteredData := filterFields(fields, data)
@@ -497,7 +480,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	}
 
 	resolutionHours := 1.0
-	data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.CloudProvider, start, end, window, resolutionHours, namespace, cluster, remoteEnabled, "")
+	data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, start, end, window, resolutionHours, namespace, cluster, remoteEnabled, "")
 	if err != nil {
 		w.Write(WrapData(nil, err))
 	}
@@ -1022,102 +1005,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	// TODO: implement a builder -> controller for stitching new features and other dependencies.
 	clusterManager := newClusterManager()
 
-	cpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "node_cpu_hourly_cost",
-		Help: "node_cpu_hourly_cost hourly cost for each cpu on this node",
-	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
-
-	ramGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "node_ram_hourly_cost",
-		Help: "node_ram_hourly_cost hourly cost for each gb of ram on this node",
-	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
-
-	gpuGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "node_gpu_hourly_cost",
-		Help: "node_gpu_hourly_cost hourly cost for each gpu on this node",
-	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
-
-	totalGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "node_total_hourly_cost",
-		Help: "node_total_hourly_cost Total node cost per hour",
-	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
-
-	spotGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "kubecost_node_is_spot",
-		Help: "kubecost_node_is_spot Cloud provider info about node preemptibility",
-	}, []string{"instance", "node", "instance_type", "region", "provider_id"})
-
-	pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "pv_hourly_cost",
-		Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
-	}, []string{"volumename", "persistentvolume", "provider_id"})
-
-	RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "container_memory_allocation_bytes",
-		Help: "container_memory_allocation_bytes Bytes of RAM used",
-	}, []string{"namespace", "pod", "container", "instance", "node"})
-
-	CPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "container_cpu_allocation",
-		Help: "container_cpu_allocation Percent of a single CPU used in a minute",
-	}, []string{"namespace", "pod", "container", "instance", "node"})
-
-	GPUAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "container_gpu_allocation",
-		Help: "container_gpu_allocation GPU used",
-	}, []string{"namespace", "pod", "container", "instance", "node"})
-	PVAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "pod_pvc_allocation",
-		Help: "pod_pvc_allocation Bytes used by a PVC attached to a pod",
-	}, []string{"namespace", "pod", "persistentvolumeclaim", "persistentvolume"})
-
-	NetworkZoneEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "kubecost_network_zone_egress_cost",
-		Help: "kubecost_network_zone_egress_cost Total cost per GB egress across zones",
-	})
-	NetworkRegionEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "kubecost_network_region_egress_cost",
-		Help: "kubecost_network_region_egress_cost Total cost per GB egress across regions",
-	})
-	NetworkInternetEgressRecorder := prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "kubecost_network_internet_egress_cost",
-		Help: "kubecost_network_internet_egress_cost Total cost per GB of internet egress.",
-	})
-	ClusterManagementCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "kubecost_cluster_management_cost",
-		Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
-	}, []string{"provisioner_name"})
-	LBCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
-		Name: "kubecost_load_balancer_cost",
-		Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
-	}, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
-
-	prometheus.MustRegister(cpuGv)
-	prometheus.MustRegister(ramGv)
-	prometheus.MustRegister(gpuGv)
-	prometheus.MustRegister(totalGv)
-	prometheus.MustRegister(pvGv)
-	prometheus.MustRegister(spotGv)
-	prometheus.MustRegister(RAMAllocation)
-	prometheus.MustRegister(CPUAllocation)
-	prometheus.MustRegister(PVAllocation)
-	prometheus.MustRegister(GPUAllocation)
-	prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
-	prometheus.MustRegister(ClusterManagementCostRecorder)
-	prometheus.MustRegister(LBCostRecorder)
-	prometheus.MustRegister(ServiceCollector{
-		KubeClusterCache: k8sCache,
-	})
-	prometheus.MustRegister(DeploymentCollector{
-		KubeClusterCache: k8sCache,
-	})
-	prometheus.MustRegister(StatefulsetCollector{
-		KubeClusterCache: k8sCache,
-	})
-	prometheus.MustRegister(ClusterInfoCollector{
-		KubeClientSet: kubeClientset,
-		Cloud:         cloudProvider,
-	})
+	// Initialize metrics here
 
 	remoteEnabled := env.IsRemoteEnabled()
 	if remoteEnabled {
@@ -1187,34 +1075,23 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	// 	log.Infof("Init: AggregateCostModel cache warming disabled")
 	// }
 
+	costModel := NewCostModel(k8sCache, clusterMap, scrapeInterval)
+	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
+
 	a := Accesses{
-		Router:                        httprouter.New(),
-		PrometheusClient:              promCli,
-		ThanosClient:                  thanosClient,
-		KubeClientSet:                 kubeClientset,
-		ClusterManager:                clusterManager,
-		ClusterMap:                    clusterMap,
-		CloudProvider:                 cloudProvider,
-		CPUPriceRecorder:              cpuGv,
-		RAMPriceRecorder:              ramGv,
-		GPUPriceRecorder:              gpuGv,
-		NodeTotalPriceRecorder:        totalGv,
-		NodeSpotRecorder:              spotGv,
-		RAMAllocationRecorder:         RAMAllocation,
-		CPUAllocationRecorder:         CPUAllocation,
-		GPUAllocationRecorder:         GPUAllocation,
-		PVAllocationRecorder:          PVAllocation,
-		NetworkZoneEgressRecorder:     NetworkZoneEgressRecorder,
-		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
-		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
-		PersistentVolumePriceRecorder: pvGv,
-		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
-		LBCostRecorder:                LBCostRecorder,
-		Model:                         NewCostModel(k8sCache, clusterMap, scrapeInterval),
-		AggregateCache:                aggregateCache,
-		CostDataCache:                 costDataCache,
-		ClusterCostsCache:             clusterCostsCache,
-		CacheExpiration:               cacheExpiration,
+		Router:            httprouter.New(),
+		PrometheusClient:  promCli,
+		ThanosClient:      thanosClient,
+		KubeClientSet:     kubeClientset,
+		ClusterManager:    clusterManager,
+		ClusterMap:        clusterMap,
+		CloudProvider:     cloudProvider,
+		Model:             costModel,
+		MetricsEmitter:    metricsEmitter,
+		AggregateCache:    aggregateCache,
+		CostDataCache:     costDataCache,
+		ClusterCostsCache: clusterCostsCache,
+		CacheExpiration:   cacheExpiration,
 	}
 	a.AggAPI = &a
 
@@ -1223,7 +1100,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())
 	}
 
-	StartCostModelMetricRecording(&a)
+	a.MetricsEmitter.Start()
 
 	managerEndpoints := cm.NewClusterManagerEndpoints(a.ClusterManager)