2
0
Niko Kovacevic 6 жил өмнө
parent
commit
db32263ade

+ 267 - 101
pkg/cloud/awsprovider.go

@@ -44,6 +44,9 @@ const AthenaInfoUpdateType = "athenainfo"
 type AWS struct {
 type AWS struct {
 	Pricing                 map[string]*AWSProductTerms
 	Pricing                 map[string]*AWSProductTerms
 	SpotPricingByInstanceID map[string]*spotInfo
 	SpotPricingByInstanceID map[string]*spotInfo
+	RIPricingByInstanceID   map[string]*RIData
+	RIDataRunning           bool
+	RIDataLock              sync.RWMutex
 	ValidPricingKeys        map[string]bool
 	ValidPricingKeys        map[string]bool
 	Clientset               clustercache.ClusterCache
 	Clientset               clustercache.ClusterCache
 	BaseCPUPrice            string
 	BaseCPUPrice            string
@@ -60,7 +63,6 @@ type AWS struct {
 	SpotDataPrefix          string
 	SpotDataPrefix          string
 	ProjectID               string
 	ProjectID               string
 	DownloadPricingDataLock sync.RWMutex
 	DownloadPricingDataLock sync.RWMutex
-	ReservedInstances       []*AWSReservedInstance
 	Config                  *ProviderConfig
 	Config                  *ProviderConfig
 	*CustomProvider
 	*CustomProvider
 }
 }
@@ -207,7 +209,7 @@ var regionToBillingRegionCode = map[string]string{
 	"us-gov-west-1":  "UGW1",
 	"us-gov-west-1":  "UGW1",
 }
 }
 
 
-func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool) string {
+func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 	return ""
 	return ""
 }
 }
 
 
@@ -510,15 +512,22 @@ func (aws *AWS) DownloadPricingData() error {
 		key := aws.GetPVKey(pv, params, "")
 		key := aws.GetPVKey(pv, params, "")
 		pvkeys[key.Features()] = key
 		pvkeys[key.Features()] = key
 	}
 	}
-
-	reserved, err := aws.getReservedInstances()
-	if err != nil {
-		klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
-	} else {
-		klog.V(1).Infof("Found %d reserved instances", len(reserved))
-		aws.ReservedInstances = reserved
-		for _, r := range reserved {
-			klog.V(1).Infof("%s", r)
+	if !aws.RIDataRunning && c.AthenaBucketName != "" {
+		err = aws.GetReservationDataFromAthena() // Block until one run has completed.
+		if err != nil {
+			klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
+		} else { // If we make one successful run, check on new reservation data every hour
+			go func() {
+				for {
+					aws.RIDataRunning = true
+					klog.Infof("Reserved Instance watcher running... next update in 1h")
+					time.Sleep(time.Hour)
+					err := aws.GetReservationDataFromAthena()
+					if err != nil {
+						klog.Infof("Error updating RI data: %s", err.Error())
+					}
+				}
+			}()
 		}
 		}
 	}
 	}
 
 
@@ -714,6 +723,8 @@ func (aws *AWS) AllNodePricing() (interface{}, error) {
 
 
 func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
 func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
 	key := k.Features()
 	key := k.Features()
+	aws.RIDataLock.RLock()
+	defer aws.RIDataLock.RUnlock()
 	if aws.isPreemptible(key) {
 	if aws.isPreemptible(key) {
 		if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
 		if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
 			var spotcost string
 			var spotcost string
@@ -747,6 +758,20 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseGPUPrice: aws.BaseGPUPrice,
 			BaseGPUPrice: aws.BaseGPUPrice,
 			UsageType:    usageType,
 			UsageType:    usageType,
 		}, nil
 		}, nil
+	} else if ri, ok := aws.RIPricingByInstanceID[k.ID()]; ok {
+		strCost := fmt.Sprintf("%f", ri.EffectiveCost)
+		return &Node{
+			Cost:         strCost,
+			VCPU:         terms.VCpu,
+			RAM:          terms.Memory,
+			GPU:          terms.GPU,
+			Storage:      terms.Storage,
+			BaseCPUPrice: aws.BaseCPUPrice,
+			BaseRAMPrice: aws.BaseRAMPrice,
+			BaseGPUPrice: aws.BaseGPUPrice,
+			UsageType:    usageType,
+		}, nil
+
 	}
 	}
 	c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
 	c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
 	if !ok {
 	if !ok {
@@ -1051,6 +1076,146 @@ func generateAWSGroupBy(lastIdx int) string {
 	return strings.Join(sequence, ",")
 	return strings.Join(sequence, ",")
 }
 }
 
 
+func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
+	customPricing, err := a.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	if customPricing.ServiceKeyName != "" {
+		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		if err != nil {
+			return nil, err
+		}
+	}
+	region := aws.String(customPricing.AthenaRegion)
+	resultsBucket := customPricing.AthenaBucketName
+	database := customPricing.AthenaDatabase
+	c := &aws.Config{
+		Region: region,
+	}
+	s := session.Must(session.NewSession(c))
+	svc := athena.New(s)
+
+	var e athena.StartQueryExecutionInput
+
+	var r athena.ResultConfiguration
+	r.SetOutputLocation(resultsBucket)
+	e.SetResultConfiguration(&r)
+
+	e.SetQueryString(query)
+	var q athena.QueryExecutionContext
+	q.SetDatabase(database)
+	e.SetQueryExecutionContext(&q)
+
+	res, err := svc.StartQueryExecution(&e)
+	if err != nil {
+		return nil, err
+	}
+
+	klog.V(2).Infof("StartQueryExecution result:")
+	klog.V(2).Infof(res.GoString())
+
+	var qri athena.GetQueryExecutionInput
+	qri.SetQueryExecutionId(*res.QueryExecutionId)
+
+	var qrop *athena.GetQueryExecutionOutput
+	duration := time.Duration(2) * time.Second // Pause for 2 seconds
+
+	for {
+		qrop, err = svc.GetQueryExecution(&qri)
+		if err != nil {
+			return nil, err
+		}
+		if *qrop.QueryExecution.Status.State != "RUNNING" {
+			break
+		}
+		time.Sleep(duration)
+	}
+	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
+
+		var ip athena.GetQueryResultsInput
+		ip.SetQueryExecutionId(*res.QueryExecutionId)
+
+		return svc.GetQueryResults(&ip)
+	} else {
+		return nil, fmt.Errorf("No results available for %s", query)
+	}
+}
+
+type RIData struct {
+	ResourceID     string
+	EffectiveCost  float64
+	ReservationARN string
+	MostRecentDate string
+}
+
+func (a *AWS) GetReservationDataFromAthena() error {
+	cfg, err := a.GetConfig()
+	if err != nil {
+		return err
+	}
+	if cfg.AthenaBucketName == "" {
+		return fmt.Errorf("No Athena Bucket configured")
+	}
+	if a.RIPricingByInstanceID == nil {
+		a.RIPricingByInstanceID = make(map[string]*RIData)
+	}
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+	start := tOneDayAgo.Format("2006-01-02")
+	end := tNow.Format("2006-01-02")
+	q := `SELECT   
+		line_item_usage_start_date,
+		reservation_reservation_a_r_n,
+		line_item_resource_id,
+		reservation_effective_cost
+	FROM athena_test as cost_data
+	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+	AND reservation_reservation_a_r_n <> '' ORDER BY 
+	line_item_usage_start_date DESC`
+	query := fmt.Sprintf(q, start, end)
+	op, err := a.QueryAthenaBillingData(query)
+	if err != nil {
+		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
+	}
+	klog.Infof("Fetching RI data...")
+	if len(op.ResultSet.Rows) > 1 {
+		a.RIDataLock.Lock()
+		mostRecentDate := ""
+		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+			d := *r.Data[0].VarCharValue
+			if mostRecentDate == "" {
+				mostRecentDate = d
+			} else if mostRecentDate != d { // Get all most recent assignments
+				break
+			}
+			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+			}
+			r := &RIData{
+				ResourceID:     *r.Data[2].VarCharValue,
+				EffectiveCost:  cost,
+				ReservationARN: *r.Data[1].VarCharValue,
+				MostRecentDate: d,
+			}
+			a.RIPricingByInstanceID[r.ResourceID] = r
+		}
+		klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
+		for k, r := range a.RIPricingByInstanceID {
+			klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
+		}
+		a.RIDataLock.Unlock()
+	} else {
+		klog.Infof("No reserved instance data found")
+	}
+	return nil
+}
+
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
@@ -1482,119 +1647,120 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 }
 }
 
 
 func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
 func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
-	numReserved := len(a.ReservedInstances)
+	/*
+		numReserved := len(a.ReservedInstances)
 
 
-	// Early return if no reserved instance data loaded
-	if numReserved == 0 {
-		klog.V(4).Infof("[Reserved] No Reserved Instances")
-		return
-	}
+		// Early return if no reserved instance data loaded
+		if numReserved == 0 {
+			klog.V(4).Infof("[Reserved] No Reserved Instances")
+			return
+		}
 
 
-	cfg, err := a.GetConfig()
-	defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
-	if err != nil {
-		klog.V(3).Infof("Could not parse default cpu price")
-		defaultCPU = 0.031611
-	}
+		cfg, err := a.GetConfig()
+		defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
+		if err != nil {
+			klog.V(3).Infof("Could not parse default cpu price")
+			defaultCPU = 0.031611
+		}
 
 
-	defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
-	if err != nil {
-		klog.V(3).Infof("Could not parse default ram price")
-		defaultRAM = 0.004237
-	}
+		defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
+		if err != nil {
+			klog.V(3).Infof("Could not parse default ram price")
+			defaultRAM = 0.004237
+		}
 
 
-	cpuToRAMRatio := defaultCPU / defaultRAM
+		cpuToRAMRatio := defaultCPU / defaultRAM
 
 
-	now := time.Now()
+		now := time.Now()
 
 
-	instances := make(map[string][]*AWSReservedInstance)
-	for _, r := range a.ReservedInstances {
-		if now.Before(r.StartDate) || now.After(r.EndDate) {
-			klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
-			continue
-		}
+		instances := make(map[string][]*AWSReservedInstance)
+		for _, r := range a.ReservedInstances {
+			if now.Before(r.StartDate) || now.After(r.EndDate) {
+				klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
+				continue
+			}
 
 
-		_, ok := instances[r.Region]
-		if !ok {
-			instances[r.Region] = []*AWSReservedInstance{r}
-		} else {
-			instances[r.Region] = append(instances[r.Region], r)
+			_, ok := instances[r.Region]
+			if !ok {
+				instances[r.Region] = []*AWSReservedInstance{r}
+			} else {
+				instances[r.Region] = append(instances[r.Region], r)
+			}
 		}
 		}
-	}
 
 
-	awsNodes := make(map[string]*v1.Node)
-	currentNodes := a.Clientset.GetAllNodes()
+		awsNodes := make(map[string]*v1.Node)
+		currentNodes := a.Clientset.GetAllNodes()
 
 
-	// Create a node name -> node map
-	for _, awsNode := range currentNodes {
-		awsNodes[awsNode.GetName()] = awsNode
-	}
+		// Create a node name -> node map
+		for _, awsNode := range currentNodes {
+			awsNodes[awsNode.GetName()] = awsNode
+		}
 
 
-	// go through all provider nodes using k8s nodes for region
-	for nodeName, node := range nodes {
-		// Reset reserved allocation to prevent double allocation
-		node.Reserved = nil
+		// go through all provider nodes using k8s nodes for region
+		for nodeName, node := range nodes {
+			// Reset reserved allocation to prevent double allocation
+			node.Reserved = nil
 
 
-		kNode, ok := awsNodes[nodeName]
-		if !ok {
-			klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
-			continue
-		}
+			kNode, ok := awsNodes[nodeName]
+			if !ok {
+				klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
+				continue
+			}
 
 
-		nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
-		if !ok {
-			klog.V(1).Infof("[Reserved] Could not find node region")
-			continue
-		}
+			nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
+			if !ok {
+				klog.V(1).Infof("[Reserved] Could not find node region")
+				continue
+			}
 
 
-		reservedInstances, ok := instances[nodeRegion]
-		if !ok {
-			klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
-			continue
-		}
+			reservedInstances, ok := instances[nodeRegion]
+			if !ok {
+				klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
+				continue
+			}
 
 
-		// Determine the InstanceType of the node
-		instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
-		if !ok {
-			continue
-		}
+			// Determine the InstanceType of the node
+			instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
+			if !ok {
+				continue
+			}
 
 
-		ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
-		if err != nil {
-			continue
-		}
-		ramGB := ramBytes / 1024 / 1024 / 1024
+			ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
+			if err != nil {
+				continue
+			}
+			ramGB := ramBytes / 1024 / 1024 / 1024
 
 
-		cpu, err := strconv.ParseFloat(node.VCPU, 64)
-		if err != nil {
-			continue
-		}
+			cpu, err := strconv.ParseFloat(node.VCPU, 64)
+			if err != nil {
+				continue
+			}
 
 
-		ramMultiple := cpu*cpuToRAMRatio + ramGB
+			ramMultiple := cpu*cpuToRAMRatio + ramGB
 
 
-		node.Reserved = &ReservedInstanceData{
-			ReservedCPU: 0,
-			ReservedRAM: 0,
-		}
+			node.Reserved = &ReservedInstanceData{
+				ReservedCPU: 0,
+				ReservedRAM: 0,
+			}
 
 
-		for i, reservedInstance := range reservedInstances {
-			if reservedInstance.InstanceType == instanceType {
-				// Use < 0 to mark as ALL
-				node.Reserved.ReservedCPU = -1
-				node.Reserved.ReservedRAM = -1
+			for i, reservedInstance := range reservedInstances {
+				if reservedInstance.InstanceType == instanceType {
+					// Use < 0 to mark as ALL
+					node.Reserved.ReservedCPU = -1
+					node.Reserved.ReservedRAM = -1
 
 
-				// Set Costs based on CPU/RAM ratios
-				ramPrice := reservedInstance.PricePerHour / ramMultiple
-				node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
-				node.Reserved.RAMCost = ramPrice
+					// Set Costs based on CPU/RAM ratios
+					ramPrice := reservedInstance.PricePerHour / ramMultiple
+					node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
+					node.Reserved.RAMCost = ramPrice
 
 
-				// Remove the reserve from the temporary slice to prevent
-				// being reallocated
-				instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
-				break
+					// Remove the reserve from the temporary slice to prevent
+					// being reallocated
+					instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
+					break
+				}
 			}
 			}
-		}
-	}
+		}*/
 }
 }
 
 
 type AWSReservedInstance struct {
 type AWSReservedInstance struct {

+ 3 - 3
pkg/cloud/azureprovider.go

@@ -408,8 +408,8 @@ func (az *Azure) DownloadPricingData() error {
 						for _, rate := range v.MeterRates {
 						for _, rate := range v.MeterRates {
 							priceInUsd += *rate
 							priceInUsd += *rate
 						}
 						}
-						// rate is in GB per month, resolve to GB per hour
-						pricePerHour := priceInUsd / 730.0
+						// rate is in disk per month, resolve price per hour, then GB per hour
+						pricePerHour := priceInUsd / 730.0 / 32.0
 						priceStr := fmt.Sprintf("%f", pricePerHour)
 						priceStr := fmt.Sprintf("%f", pricePerHour)
 
 
 						key := region + "," + storageClass
 						key := region + "," + storageClass
@@ -688,6 +688,6 @@ func (az *Azure) PVPricing(pvk PVKey) (*PV, error) {
 	return pricing.PV, nil
 	return pricing.PV, nil
 }
 }
 
 
-func (az *Azure) GetLocalStorageQuery(window, offset string, rate bool) string {
+func (az *Azure) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 	return ""
 	return ""
 }
 }

+ 1 - 1
pkg/cloud/customprovider.go

@@ -37,7 +37,7 @@ type customProviderKey struct {
 	Labels         map[string]string
 	Labels         map[string]string
 }
 }
 
 
-func (*CustomProvider) GetLocalStorageQuery(window, offset string, rate bool) string {
+func (*CustomProvider) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 	return ""
 	return ""
 }
 }
 
 

+ 11 - 4
pkg/cloud/gcpprovider.go

@@ -115,22 +115,29 @@ func gcpAllocationToOutOfClusterAllocation(gcpAlloc gcpAllocation) *OutOfCluster
 	}
 	}
 }
 }
 
 
-func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool) string {
+// GetLocalStorageQuery returns the cost of local storage for the given window. Setting rate=true
+// returns hourly spend. Setting used=true only tracks used storage, not total.
+func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 	// TODO Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
 	// TODO Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
 	// See https://cloud.google.com/compute/disks-image-pricing#persistentdisk
 	// See https://cloud.google.com/compute/disks-image-pricing#persistentdisk
 	localStorageCost := 0.04
 	localStorageCost := 0.04
 
 
+	baseMetric := "container_fs_limit_bytes"
+	if used {
+		baseMetric = "container_fs_usage_bytes"
+	}
+
 	fmtOffset := ""
 	fmtOffset := ""
 	if offset != "" {
 	if offset != "" {
 		fmtOffset = fmt.Sprintf("offset %s", offset)
 		fmtOffset = fmt.Sprintf("offset %s", offset)
 	}
 	}
 
 
 	fmtCumulativeQuery := `sum(
 	fmtCumulativeQuery := `sum(
-		sum_over_time(container_fs_limit_bytes{device!="tmpfs", id="/"}[%s:1m]%s)
+		sum_over_time(%s{device!="tmpfs", id="/"}[%s:1m]%s)
 	) by (cluster_id) / 60 / 730 / 1024 / 1024 / 1024 * %f`
 	) by (cluster_id) / 60 / 730 / 1024 / 1024 / 1024 * %f`
 
 
 	fmtMonthlyQuery := `sum(
 	fmtMonthlyQuery := `sum(
-		avg_over_time(container_fs_limit_bytes{device!="tmpfs", id="/"}[%s:1m]%s)
+		avg_over_time(%s{device!="tmpfs", id="/"}[%s:1m]%s)
 	) by (cluster_id) / 1024 / 1024 / 1024 * %f`
 	) by (cluster_id) / 1024 / 1024 / 1024 * %f`
 
 
 	fmtQuery := fmtCumulativeQuery
 	fmtQuery := fmtCumulativeQuery
@@ -138,7 +145,7 @@ func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool) string {
 		fmtQuery = fmtMonthlyQuery
 		fmtQuery = fmtMonthlyQuery
 	}
 	}
 
 
-	return fmt.Sprintf(fmtQuery, window, fmtOffset, localStorageCost)
+	return fmt.Sprintf(fmtQuery, baseMetric, window, fmtOffset, localStorageCost)
 }
 }
 
 
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {

+ 1 - 1
pkg/cloud/provider.go

@@ -175,7 +175,7 @@ type Provider interface {
 	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
 	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
 	GetManagementPlatform() (string, error)
-	GetLocalStorageQuery(string, string, bool) string
+	GetLocalStorageQuery(string, string, bool, bool) string
 	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ApplyReservedInstancePricing(map[string]*Node)
 }
 }

+ 188 - 67
pkg/costmodel/cluster.go

@@ -38,23 +38,23 @@ const (
 
 
 // TODO move this to a package-accessible helper
 // TODO move this to a package-accessible helper
 type PromQueryContext struct {
 type PromQueryContext struct {
-	client prometheus.Client
-	ec     *util.ErrorCollector
-	wg     *sync.WaitGroup
+	Client         prometheus.Client
+	ErrorCollector *util.ErrorCollector
+	WaitGroup      *sync.WaitGroup
 }
 }
 
 
 // TODO move this to a package-accessible helper function once dependencies are able to
 // TODO move this to a package-accessible helper function once dependencies are able to
 // be extricated from costmodel package (PromQueryResult -> util.Vector). Otherwise, circular deps.
 // be extricated from costmodel package (PromQueryResult -> util.Vector). Otherwise, circular deps.
 func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
 func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
-	if ctx.wg != nil {
-		defer ctx.wg.Done()
+	if ctx.WaitGroup != nil {
+		defer ctx.WaitGroup.Done()
 	}
 	}
 
 
-	raw, promErr := Query(ctx.client, query)
-	ctx.ec.Report(promErr)
+	raw, promErr := Query(ctx.Client, query)
+	ctx.ErrorCollector.Report(promErr)
 
 
 	results, parseErr := NewQueryResults(raw)
 	results, parseErr := NewQueryResults(raw)
-	ctx.ec.Report(parseErr)
+	ctx.ErrorCollector.Report(parseErr)
 
 
 	resultCh <- results
 	resultCh <- results
 }
 }
@@ -62,18 +62,30 @@ func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQuer
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
 // are broken down by cores, memory, and storage.
 // are broken down by cores, memory, and storage.
 type ClusterCosts struct {
 type ClusterCosts struct {
-	Start             *time.Time `json:"startTime"`
-	End               *time.Time `json:"endTime"`
-	CPUCumulative     float64    `json:"cpuCumulativeCost"`
-	CPUMonthly        float64    `json:"cpuMonthlyCost"`
-	GPUCumulative     float64    `json:"gpuCumulativeCost"`
-	GPUMonthly        float64    `json:"gpuMonthlyCost"`
-	RAMCumulative     float64    `json:"ramCumulativeCost"`
-	RAMMonthly        float64    `json:"ramMonthlyCost"`
-	StorageCumulative float64    `json:"storageCumulativeCost"`
-	StorageMonthly    float64    `json:"storageMonthlyCost"`
-	TotalCumulative   float64    `json:"totalCumulativeCost"`
-	TotalMonthly      float64    `json:"totalMonthlyCost"`
+	Start             *time.Time             `json:"startTime"`
+	End               *time.Time             `json:"endTime"`
+	CPUCumulative     float64                `json:"cpuCumulativeCost"`
+	CPUMonthly        float64                `json:"cpuMonthlyCost"`
+	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
+	GPUCumulative     float64                `json:"gpuCumulativeCost"`
+	GPUMonthly        float64                `json:"gpuMonthlyCost"`
+	RAMCumulative     float64                `json:"ramCumulativeCost"`
+	RAMMonthly        float64                `json:"ramMonthlyCost"`
+	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
+	StorageCumulative float64                `json:"storageCumulativeCost"`
+	StorageMonthly    float64                `json:"storageMonthlyCost"`
+	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
+	TotalCumulative   float64                `json:"totalCumulativeCost"`
+	TotalMonthly      float64                `json:"totalMonthlyCost"`
+}
+
+// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
+// categories: user for user-space (i.e. non-system) usage, system, and idle.
+type ClusterCostsBreakdown struct {
+	Idle   float64 `json:"idle"`
+	Other  float64 `json:"other"`
+	System float64 `json:"system"`
+	User   float64 `json:"user"`
 }
 }
 
 
 // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
 // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
@@ -112,42 +124,6 @@ func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offse
 	return cc, nil
 	return cc, nil
 }
 }
 
 
-// NewClusterCostsFromMonthly takes monthly-rate cost data over a given time range, computes
-// the associated cumulative cost data, and returns the Costs.
-func NewClusterCostsFromMonthly(cpuMonthly, gpuMonthly, ramMonthly, storageMonthly float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
-	start, end, err := util.ParseTimeRange(window, offset)
-	if err != nil {
-		return nil, err
-	}
-
-	// If the number of hours is not given (i.e. is zero) compute one from the window and offset
-	if dataHours == 0 {
-		dataHours = end.Sub(*start).Hours()
-	}
-
-	// Do not allow zero-length windows to prevent divide-by-zero issues
-	if dataHours == 0 {
-		return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
-	}
-
-	cc := &ClusterCosts{
-		Start:             start,
-		End:               end,
-		CPUMonthly:        cpuMonthly,
-		GPUMonthly:        gpuMonthly,
-		RAMMonthly:        ramMonthly,
-		StorageMonthly:    storageMonthly,
-		TotalMonthly:      cpuMonthly + gpuMonthly + ramMonthly + storageMonthly,
-		CPUCumulative:     cpuMonthly / util.HoursPerMonth * dataHours,
-		GPUCumulative:     gpuMonthly / util.HoursPerMonth * dataHours,
-		RAMCumulative:     ramMonthly / util.HoursPerMonth * dataHours,
-		StorageCumulative: storageMonthly / util.HoursPerMonth * dataHours,
-	}
-	cc.TotalCumulative = cc.CPUCumulative + cc.GPUCumulative + cc.RAMCumulative + cc.StorageCumulative
-
-	return cc, nil
-}
-
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
 func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
 func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
 	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
 	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
@@ -178,7 +154,22 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
 		avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
 	) by (cluster_id) %s`
 	) by (cluster_id) %s`
 
 
-	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false)
+	const fmtQueryCPUModePct = `sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
+	group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)`
+
+	const fmtQueryRAMSystemPct = `sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:1m]%s)) by (cluster_id)
+	/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s)) by (cluster_id)`
+
+	const fmtQueryRAMUserPct = `sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:1m]%s)) by (cluster_id)
+	/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s)) by (cluster_id)`
+
+	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
+	// const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
+	// group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
+
+	queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
+
+	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
 	if queryTotalLocalStorage != "" {
 	if queryTotalLocalStorage != "" {
 		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
 		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
 	}
 	}
@@ -193,19 +184,27 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
 	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
 	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
 	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
 	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
 	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
-	numQueries := 5
+	queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
+	queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, fmtOffset, window, fmtOffset)
+	queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, fmtOffset, window, fmtOffset)
+
+	numQueries := 9
 
 
 	klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
 	klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
 	klog.V(4).Infof("[Debug] queryTotalGPU: %s", queryTotalGPU)
 	klog.V(4).Infof("[Debug] queryTotalGPU: %s", queryTotalGPU)
 	klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
 	klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
 	klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
 	klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
 	klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
 	klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
+	klog.V(4).Infof("[Debug] queryCPUModePct: %s", queryCPUModePct)
+	klog.V(4).Infof("[Debug] queryRAMSystemPct: %s", queryRAMSystemPct)
+	klog.V(4).Infof("[Debug] queryRAMUserPct: %s", queryRAMUserPct)
+	klog.V(4).Infof("[Debug] queryUsedLocalStorage: %s", queryUsedLocalStorage)
 
 
 	// Submit queries to Prometheus asynchronously
 	// Submit queries to Prometheus asynchronously
 	var ec util.ErrorCollector
 	var ec util.ErrorCollector
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	ctx := PromQueryContext{client, &ec, &wg}
 	ctx := PromQueryContext{client, &ec, &wg}
-	ctx.wg.Add(numQueries)
+	ctx.WaitGroup.Add(numQueries)
 
 
 	chDataCount := make(chan []*PromQueryResult, 1)
 	chDataCount := make(chan []*PromQueryResult, 1)
 	go AsyncPromQuery(queryDataCount, chDataCount, ctx)
 	go AsyncPromQuery(queryDataCount, chDataCount, ctx)
@@ -222,6 +221,18 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	chTotalStorage := make(chan []*PromQueryResult, 1)
 	chTotalStorage := make(chan []*PromQueryResult, 1)
 	go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
 	go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
 
 
+	chCPUModePct := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryCPUModePct, chCPUModePct, ctx)
+
+	chRAMSystemPct := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryRAMSystemPct, chRAMSystemPct, ctx)
+
+	chRAMUserPct := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryRAMUserPct, chRAMUserPct, ctx)
+
+	chUsedLocalStorage := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryUsedLocalStorage, chUsedLocalStorage, ctx)
+
 	// After queries complete, retrieve results
 	// After queries complete, retrieve results
 	wg.Wait()
 	wg.Wait()
 
 
@@ -240,6 +251,18 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	resultsTotalStorage := <-chTotalStorage
 	resultsTotalStorage := <-chTotalStorage
 	close(chTotalStorage)
 	close(chTotalStorage)
 
 
+	resultsCPUModePct := <-chCPUModePct
+	close(chCPUModePct)
+
+	resultsRAMSystemPct := <-chRAMSystemPct
+	close(chRAMSystemPct)
+
+	resultsRAMUserPct := <-chRAMUserPct
+	close(chRAMUserPct)
+
+	resultsUsedLocalStorage := <-chUsedLocalStorage
+	close(chUsedLocalStorage)
+
 	defaultClusterID := os.Getenv(clusterIDKey)
 	defaultClusterID := os.Getenv(clusterIDKey)
 
 
 	dataMinsByCluster := map[string]float64{}
 	dataMinsByCluster := map[string]float64{}
@@ -257,12 +280,26 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		dataMinsByCluster[clusterID] = dataMins
 		dataMinsByCluster[clusterID] = dataMins
 	}
 	}
 
 
+	// Determine combined discount
+	discount, customDiscount := 0.0, 0.0
+	c, err := A.Cloud.GetConfig()
+	if err == nil {
+		discount, err = ParsePercentString(c.Discount)
+		if err != nil {
+			discount = 0.0
+		}
+		customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
+		if err != nil {
+			customDiscount = 0.0
+		}
+	}
+
 	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
 	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
 	costData := make(map[string]map[string]float64)
 	costData := make(map[string]map[string]float64)
 
 
 	// Helper function to iterate over Prom query results, parsing the raw values into
 	// Helper function to iterate over Prom query results, parsing the raw values into
 	// the intermediate costData structure.
 	// the intermediate costData structure.
-	setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string) {
+	setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string, discount float64, customDiscount float64) {
 		for _, result := range results {
 		for _, result := range results {
 			clusterID, _ := result.GetString("cluster_id")
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 			if clusterID == "" {
@@ -272,15 +309,86 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 				costData[clusterID] = map[string]float64{}
 				costData[clusterID] = map[string]float64{}
 			}
 			}
 			if len(result.Values) > 0 {
 			if len(result.Values) > 0 {
-				costData[clusterID][name] += result.Values[0].Value
-				costData[clusterID]["total"] += result.Values[0].Value
+				costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
+				costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
 			}
 			}
 		}
 		}
 	}
 	}
-	setCostsFromResults(costData, resultsTotalGPU, "gpu")
-	setCostsFromResults(costData, resultsTotalCPU, "cpu")
-	setCostsFromResults(costData, resultsTotalRAM, "ram")
-	setCostsFromResults(costData, resultsTotalStorage, "storage")
+	// Apply both sustained use and custom discounts to RAM and CPU
+	setCostsFromResults(costData, resultsTotalCPU, "cpu", discount, customDiscount)
+	setCostsFromResults(costData, resultsTotalRAM, "ram", discount, customDiscount)
+	// Apply only custom discount to GPU and storage
+	setCostsFromResults(costData, resultsTotalGPU, "gpu", 0.0, customDiscount)
+	setCostsFromResults(costData, resultsTotalStorage, "storage", 0.0, customDiscount)
+
+	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
+	for _, result := range resultsCPUModePct {
+		clusterID, _ := result.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+		if _, ok := cpuBreakdownMap[clusterID]; !ok {
+			cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
+		}
+		cpuBD := cpuBreakdownMap[clusterID]
+
+		mode, err := result.GetString("mode")
+		if err != nil {
+			klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
+			mode = "other"
+		}
+
+		switch mode {
+		case "idle":
+			cpuBD.Idle += result.Values[0].Value
+		case "system":
+			cpuBD.System += result.Values[0].Value
+		case "user":
+			cpuBD.User += result.Values[0].Value
+		default:
+			cpuBD.Other += result.Values[0].Value
+		}
+	}
+
+	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
+	for _, result := range resultsRAMSystemPct {
+		clusterID, _ := result.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+		if _, ok := ramBreakdownMap[clusterID]; !ok {
+			ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
+		}
+		ramBD := ramBreakdownMap[clusterID]
+		ramBD.System += result.Values[0].Value
+	}
+	for _, result := range resultsRAMUserPct {
+		clusterID, _ := result.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+		if _, ok := ramBreakdownMap[clusterID]; !ok {
+			ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
+		}
+		ramBD := ramBreakdownMap[clusterID]
+		ramBD.User += result.Values[0].Value
+	}
+	for _, ramBD := range ramBreakdownMap {
+		remaining := 1.0
+		remaining -= ramBD.Other
+		remaining -= ramBD.System
+		remaining -= ramBD.User
+		ramBD.Idle = remaining
+	}
+
+	pvUsedCostMap := map[string]float64{}
+	for _, result := range resultsUsedLocalStorage {
+		clusterID, _ := result.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+		pvUsedCostMap[clusterID] += result.Values[0].Value
+	}
 
 
 	// Convert intermediate structure to Costs instances
 	// Convert intermediate structure to Costs instances
 	costsByCluster := map[string]*ClusterCosts{}
 	costsByCluster := map[string]*ClusterCosts{}
@@ -295,6 +403,19 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
 			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
 			return nil, err
 			return nil, err
 		}
 		}
+
+		if cpuBD, ok := cpuBreakdownMap[id]; ok {
+			costs.CPUBreakdown = cpuBD
+		}
+		if ramBD, ok := ramBreakdownMap[id]; ok {
+			costs.RAMBreakdown = ramBD
+		}
+		costs.StorageBreakdown = &ClusterCostsBreakdown{}
+		if pvUC, ok := pvUsedCostMap[id]; ok {
+			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
+			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
+		}
+
 		costsByCluster[id] = costs
 		costsByCluster[id] = costs
 	}
 	}
 
 
@@ -334,7 +455,7 @@ func resultToTotals(qr interface{}) ([][]string, error) {
 
 
 // ClusterCostsOverTime gives the full cluster costs over time
 // ClusterCostsOverTime gives the full cluster costs over time
 func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
 func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
-	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
+	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
 	if localStorageQuery != "" {
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}
 	}