Browse Source

Merge pull request #561 from kubecost/develop

Merge develop into master 1.67.0
Ajay Tripathy 5 years ago
parent
commit
6bbfa8c26c

+ 9 - 0
CONTRIBUTING.md

@@ -22,6 +22,15 @@ To test, build the cost-model docker container and then push it to a Kubernetes
 
 
 To confirm that the server is running, you can hit [http://localhost:9003/costDataModel?timeWindow=1d](http://localhost:9003/costDataModel?timeWindow=1d)
 To confirm that the server is running, you can hit [http://localhost:9003/costDataModel?timeWindow=1d](http://localhost:9003/costDataModel?timeWindow=1d)
 
 
+## Running locally ##
+
+In order to run cost-model locally, or outside of the runtime of a Kubernetes cluster, you can set the environment variable `KUBECONFIG_PATH`.
+
+Example:
+```bash
+export KUBECONFIG_PATH=~/.kube/config
+```
+
 ## Running the integration tests ##
 ## Running the integration tests ##
 To run these tests:
 To run these tests:
 * Make sure you have a kubeconfig that can point to your cluster, and have permissions to create/modify a namespace called "test"
 * Make sure you have a kubeconfig that can point to your cluster, and have permissions to create/modify a namespace called "test"

+ 1 - 1
Dockerfile

@@ -15,7 +15,7 @@ RUN set -e ;\
     CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
     CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
     go build -a -installsuffix cgo -o /go/bin/app
     go build -a -installsuffix cgo -o /go/bin/app
 
 
-FROM alpine:3.10.2
+FROM alpine:3.11.6
 RUN apk add --update --no-cache ca-certificates
 RUN apk add --update --no-cache ca-certificates
 COPY --from=build-env /go/bin/app /go/bin/app
 COPY --from=build-env /go/bin/app /go/bin/app
 ADD ./configs/default.json /models/default.json
 ADD ./configs/default.json /models/default.json

+ 2 - 0
go.sum

@@ -559,6 +559,8 @@ k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6 h1:tGU1C/vMoUV2ZakSH6wQq2
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6/go.mod h1:nL6pwRT8NgfF8TT68DBI8uEePRt89cSvoXUVqbkWHq4=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6/go.mod h1:nL6pwRT8NgfF8TT68DBI8uEePRt89cSvoXUVqbkWHq4=
 k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
 k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
 k8s.io/apimachinery v0.19.0 h1:gjKnAda/HZp5k4xQYjL0K/Yb66IvNqjthCb03QlKpaQ=
 k8s.io/apimachinery v0.19.0 h1:gjKnAda/HZp5k4xQYjL0K/Yb66IvNqjthCb03QlKpaQ=
+k8s.io/apimachinery v0.19.1 h1:cwsxZazM/LA9aUsBaL4bRS5ygoM6bYp8dFk22DSYQa4=
+k8s.io/apimachinery v0.19.2 h1:5Gy9vQpAGTKHPVOh5c4plE274X8D/6cuEiTO2zve7tc=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5 h1:BwY2C//EoWktJi74O6R2REBonrhsfhRI0qfVwOjOPp8=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5 h1:BwY2C//EoWktJi74O6R2REBonrhsfhRI0qfVwOjOPp8=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5/go.mod h1:bIEHXHbykaOlj+pgLllzLJ2RPGdzkjtqdk0Il07KPEM=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5/go.mod h1:bIEHXHbykaOlj+pgLllzLJ2RPGdzkjtqdk0Il07KPEM=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g=

+ 38 - 0
pkg/cloud/awsprovider.go

@@ -460,6 +460,7 @@ type awsPVKey struct {
 	StorageClassName       string
 	StorageClassName       string
 	Name                   string
 	Name                   string
 	DefaultRegion          string
 	DefaultRegion          string
+	ProviderID             string
 }
 }
 
 
 func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
 func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
@@ -469,9 +470,14 @@ func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string,
 		StorageClassParameters: parameters,
 		StorageClassParameters: parameters,
 		Name:                   pv.Name,
 		Name:                   pv.Name,
 		DefaultRegion:          defaultRegion,
 		DefaultRegion:          defaultRegion,
+		ProviderID:             pv.Spec.AWSElasticBlockStore.VolumeID,
 	}
 	}
 }
 }
 
 
+func (key *awsPVKey) ID() string {
+	return key.ProviderID
+}
+
 func (key *awsPVKey) GetStorageClass() string {
 func (key *awsPVKey) GetStorageClass() string {
 	return key.StorageClassName
 	return key.StorageClassName
 }
 }
@@ -848,6 +854,25 @@ func (aws *AWS) NetworkPricing() (*Network, error) {
 	}, nil
 	}, nil
 }
 }
 
 
+func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
+	fffrc := 0.025
+	afrc := 0.010
+	lbidc := 0.008
+
+	numForwardingRules := 1.0
+	dataIngressGB := 0.0
+
+	var totalCost float64
+	if numForwardingRules < 5 {
+		totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
+	} else {
+		totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
+	}
+	return &LoadBalancer{
+		Cost: totalCost,
+	}, nil
+}
+
 // AllNodePricing returns all the billing data fetched.
 // AllNodePricing returns all the billing data fetched.
 func (aws *AWS) AllNodePricing() (interface{}, error) {
 func (aws *AWS) AllNodePricing() (interface{}, error) {
 	aws.DownloadPricingDataLock.RLock()
 	aws.DownloadPricingDataLock.RLock()
@@ -2427,3 +2452,16 @@ func (aws *AWS) ParseID(id string) string {
 
 
 	return match[1]
 	return match[1]
 }
 }
+
+func (aws *AWS) ParsePVID(id string) string {
+	rx := regexp.MustCompile("aws:/[^/]*/[^/]*/([^/]+)") // Capture "vol-0fc54c5e83b8d2b76" from "aws://us-east-2a/vol-0fc54c5e83b8d2b76"
+	match := rx.FindStringSubmatch(id)
+	if len(match) < 2 {
+		if id != "" {
+			log.Infof("awsprovider.ParseID: failed to parse %s", id)
+		}
+		return id
+	}
+
+	return match[1]
+}

+ 27 - 0
pkg/cloud/azureprovider.go

@@ -650,6 +650,25 @@ func (az *Azure) NetworkPricing() (*Network, error) {
 	}, nil
 	}, nil
 }
 }
 
 
+func (azr *Azure) LoadBalancerPricing() (*LoadBalancer, error) {
+	fffrc := 0.025
+	afrc := 0.010
+	lbidc := 0.008
+
+	numForwardingRules := 1.0
+	dataIngressGB := 0.0
+
+	var totalCost float64
+	if numForwardingRules < 5 {
+		totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
+	} else {
+		totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
+	}
+	return &LoadBalancer{
+		Cost: totalCost,
+	}, nil
+}
+
 type azurePvKey struct {
 type azurePvKey struct {
 	Labels                 map[string]string
 	Labels                 map[string]string
 	StorageClass           string
 	StorageClass           string
@@ -666,6 +685,10 @@ func (az *Azure) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string,
 	}
 	}
 }
 }
 
 
+func (key *azurePvKey) ID() string {
+	return ""
+}
+
 func (key *azurePvKey) GetStorageClass() string {
 func (key *azurePvKey) GetStorageClass() string {
 	return key.StorageClass
 	return key.StorageClass
 }
 }
@@ -823,3 +846,7 @@ func (az *Azure) CombinedDiscountForNode(instanceType string, isPreemptible bool
 func (az *Azure) ParseID(id string) string {
 func (az *Azure) ParseID(id string) string {
 	return id
 	return id
 }
 }
+
+func (az *Azure) ParsePVID(id string) string {
+	return id
+}

+ 8 - 0
pkg/cloud/csvprovider.go

@@ -256,6 +256,10 @@ type csvPVKey struct {
 	DefaultRegion          string
 	DefaultRegion          string
 }
 }
 
 
+func (key *csvPVKey) ID() string {
+	return ""
+}
+
 func (key *csvPVKey) GetStorageClass() string {
 func (key *csvPVKey) GetStorageClass() string {
 	return key.StorageClassName
 	return key.StorageClassName
 }
 }
@@ -306,3 +310,7 @@ func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible
 func (c *CSVProvider) ParseID(id string) string {
 func (c *CSVProvider) ParseID(id string) string {
 	return id
 	return id
 }
 }
+
+func (c *CSVProvider) ParsePVID(id string) string {
+	return id
+}

+ 35 - 0
pkg/cloud/customprovider.go

@@ -240,6 +240,37 @@ func (cp *CustomProvider) NetworkPricing() (*Network, error) {
 	}, nil
 	}, nil
 }
 }
 
 
+func (cp *CustomProvider) LoadBalancerPricing() (*LoadBalancer, error) {
+	cpricing, err := cp.Config.GetCustomPricingData()
+	if err != nil {
+		return nil, err
+	}
+	fffrc, err := strconv.ParseFloat(cpricing.FirstFiveForwardingRulesCost, 64)
+	if err != nil {
+		return nil, err
+	}
+	afrc, err := strconv.ParseFloat(cpricing.AdditionalForwardingRuleCost, 64)
+	if err != nil {
+		return nil, err
+	}
+	lbidc, err := strconv.ParseFloat(cpricing.LBIngressDataCost, 64)
+	if err != nil {
+		return nil, err
+	}
+	var totalCost float64
+	numForwardingRules := 1.0 // hard-code at 1 for now
+	dataIngressGB := 0.0      // hard-code at 0 for now
+
+	if numForwardingRules < 5 {
+		totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
+	} else {
+		totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
+	}
+	return &LoadBalancer{
+		Cost: totalCost,
+	}, nil
+}
+
 func (*CustomProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
 func (*CustomProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
 	return &awsPVKey{
 	return &awsPVKey{
 		Labels:                 pv.Labels,
 		Labels:                 pv.Labels,
@@ -280,3 +311,7 @@ func (cp *CustomProvider) CombinedDiscountForNode(instanceType string, isPreempt
 func (cp *CustomProvider) ParseID(id string) string {
 func (cp *CustomProvider) ParseID(id string) string {
 	return id
 	return id
 }
 }
+
+func (cp *CustomProvider) ParsePVID(id string) string {
+	return id
+}

+ 27 - 0
pkg/cloud/gcpprovider.go

@@ -1048,6 +1048,25 @@ func (gcp *GCP) NetworkPricing() (*Network, error) {
 	}, nil
 	}, nil
 }
 }
 
 
+func (gcp *GCP) LoadBalancerPricing() (*LoadBalancer, error) {
+	fffrc := 0.025
+	afrc := 0.010
+	lbidc := 0.008
+
+	numForwardingRules := 1.0
+	dataIngressGB := 0.0
+
+	var totalCost float64
+	if numForwardingRules < 5 {
+		totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
+	} else {
+		totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
+	}
+	return &LoadBalancer{
+		Cost: totalCost,
+	}, nil
+}
+
 const (
 const (
 	GCPReservedInstanceResourceTypeRAM string = "MEMORY"
 	GCPReservedInstanceResourceTypeRAM string = "MEMORY"
 	GCPReservedInstanceResourceTypeCPU string = "VCPU"
 	GCPReservedInstanceResourceTypeCPU string = "VCPU"
@@ -1278,6 +1297,10 @@ type pvKey struct {
 	DefaultRegion          string
 	DefaultRegion          string
 }
 }
 
 
+func (key *pvKey) ID() string {
+	return ""
+}
+
 func (key *pvKey) GetStorageClass() string {
 func (key *pvKey) GetStorageClass() string {
 	return key.StorageClass
 	return key.StorageClass
 }
 }
@@ -1439,3 +1462,7 @@ func (gcp *GCP) ParseID(id string) string {
 
 
 	return match[1]
 	return match[1]
 }
 }
+
+func (gcp *GCP) ParsePVID(id string) string {
+	return id
+}

+ 65 - 46
pkg/cloud/provider.go

@@ -70,6 +70,18 @@ func (n *Node) IsSpot() bool {
 	}
 	}
 }
 }
 
 
+// LoadBalancer is the interface by which the provider and cost model communicate LoadBalancer prices.
+// The provider will best-effort try to fill out this struct.
+type LoadBalancer struct {
+	IngressIPAddresses []string `json:"IngressIPAddresses"`
+	Cost               float64  `json:"hourlyCost"`
+}
+
+// TODO: used for dynamic cloud provider price fetching.
+// determine what identifies a load balancer in the json returned from the cloud provider pricing API call
+// type LBKey interface {
+// }
+
 // Network is the interface by which the provider and cost model communicate network egress prices.
 // Network is the interface by which the provider and cost model communicate network egress prices.
 // The provider will best-effort try to fill out this struct.
 // The provider will best-effort try to fill out this struct.
 type Network struct {
 type Network struct {
@@ -86,6 +98,7 @@ type PV struct {
 	Class      string            `json:"storageClass"`
 	Class      string            `json:"storageClass"`
 	Size       string            `json:"size"`
 	Size       string            `json:"size"`
 	Region     string            `json:"region"`
 	Region     string            `json:"region"`
+	ProviderID string            `json:"providerID,omitempty"`
 	Parameters map[string]string `json:"parameters"`
 	Parameters map[string]string `json:"parameters"`
 }
 }
 
 
@@ -99,6 +112,7 @@ type Key interface {
 type PVKey interface {
 type PVKey interface {
 	Features() string
 	Features() string
 	GetStorageClass() string
 	GetStorageClass() string
+	ID() string
 }
 }
 
 
 // OutOfClusterAllocation represents a cloud provider cost not associated with kubernetes
 // OutOfClusterAllocation represents a cloud provider cost not associated with kubernetes
@@ -111,51 +125,54 @@ type OutOfClusterAllocation struct {
 }
 }
 
 
 type CustomPricing struct {
 type CustomPricing struct {
-	Provider              string            `json:"provider"`
-	Description           string            `json:"description"`
-	CPU                   string            `json:"CPU"`
-	SpotCPU               string            `json:"spotCPU"`
-	RAM                   string            `json:"RAM"`
-	SpotRAM               string            `json:"spotRAM"`
-	GPU                   string            `json:"GPU"`
-	SpotGPU               string            `json:"spotGPU"`
-	Storage               string            `json:"storage"`
-	ZoneNetworkEgress     string            `json:"zoneNetworkEgress"`
-	RegionNetworkEgress   string            `json:"regionNetworkEgress"`
-	InternetNetworkEgress string            `json:"internetNetworkEgress"`
-	SpotLabel             string            `json:"spotLabel,omitempty"`
-	SpotLabelValue        string            `json:"spotLabelValue,omitempty"`
-	GpuLabel              string            `json:"gpuLabel,omitempty"`
-	GpuLabelValue         string            `json:"gpuLabelValue,omitempty"`
-	ServiceKeyName        string            `json:"awsServiceKeyName,omitempty"`
-	ServiceKeySecret      string            `json:"awsServiceKeySecret,omitempty"`
-	SpotDataRegion        string            `json:"awsSpotDataRegion,omitempty"`
-	SpotDataBucket        string            `json:"awsSpotDataBucket,omitempty"`
-	SpotDataPrefix        string            `json:"awsSpotDataPrefix,omitempty"`
-	ProjectID             string            `json:"projectID,omitempty"`
-	AthenaProjectID       string            `json:"athenaProjectID,omitempty"`
-	AthenaBucketName      string            `json:"athenaBucketName"`
-	AthenaRegion          string            `json:"athenaRegion"`
-	AthenaDatabase        string            `json:"athenaDatabase"`
-	AthenaTable           string            `json:"athenaTable"`
-	MasterPayerARN        string            `json:"masterPayerARN"`
-	BillingDataDataset    string            `json:"billingDataDataset,omitempty"`
-	CustomPricesEnabled   string            `json:"customPricesEnabled"`
-	DefaultIdle           string            `json:"defaultIdle"`
-	AzureSubscriptionID   string            `json:"azureSubscriptionID"`
-	AzureClientID         string            `json:"azureClientID"`
-	AzureClientSecret     string            `json:"azureClientSecret"`
-	AzureTenantID         string            `json:"azureTenantID"`
-	AzureBillingRegion    string            `json:"azureBillingRegion"`
-	CurrencyCode          string            `json:"currencyCode"`
-	Discount              string            `json:"discount"`
-	NegotiatedDiscount    string            `json:"negotiatedDiscount"`
-	SharedCosts           map[string]string `json:"sharedCost"`
-	ClusterName           string            `json:"clusterName"`
-	SharedNamespaces      string            `json:"sharedNamespaces"`
-	SharedLabelNames      string            `json:"sharedLabelNames"`
-	SharedLabelValues     string            `json:"sharedLabelValues"`
-	ReadOnly              string            `json:"readOnly"`
+	Provider                     string            `json:"provider"`
+	Description                  string            `json:"description"`
+	CPU                          string            `json:"CPU"`
+	SpotCPU                      string            `json:"spotCPU"`
+	RAM                          string            `json:"RAM"`
+	SpotRAM                      string            `json:"spotRAM"`
+	GPU                          string            `json:"GPU"`
+	SpotGPU                      string            `json:"spotGPU"`
+	Storage                      string            `json:"storage"`
+	ZoneNetworkEgress            string            `json:"zoneNetworkEgress"`
+	RegionNetworkEgress          string            `json:"regionNetworkEgress"`
+	InternetNetworkEgress        string            `json:"internetNetworkEgress"`
+	FirstFiveForwardingRulesCost string            `json:"firstFiveForwardingRulesCost"`
+	AdditionalForwardingRuleCost string            `json:"additionalForwardingRuleCost"`
+	LBIngressDataCost            string            `json:"LBIngressDataCost"`
+	SpotLabel                    string            `json:"spotLabel,omitempty"`
+	SpotLabelValue               string            `json:"spotLabelValue,omitempty"`
+	GpuLabel                     string            `json:"gpuLabel,omitempty"`
+	GpuLabelValue                string            `json:"gpuLabelValue,omitempty"`
+	ServiceKeyName               string            `json:"awsServiceKeyName,omitempty"`
+	ServiceKeySecret             string            `json:"awsServiceKeySecret,omitempty"`
+	SpotDataRegion               string            `json:"awsSpotDataRegion,omitempty"`
+	SpotDataBucket               string            `json:"awsSpotDataBucket,omitempty"`
+	SpotDataPrefix               string            `json:"awsSpotDataPrefix,omitempty"`
+	ProjectID                    string            `json:"projectID,omitempty"`
+	AthenaProjectID              string            `json:"athenaProjectID,omitempty"`
+	AthenaBucketName             string            `json:"athenaBucketName"`
+	AthenaRegion                 string            `json:"athenaRegion"`
+	AthenaDatabase               string            `json:"athenaDatabase"`
+	AthenaTable                  string            `json:"athenaTable"`
+	MasterPayerARN               string            `json:"masterPayerARN"`
+	BillingDataDataset           string            `json:"billingDataDataset,omitempty"`
+	CustomPricesEnabled          string            `json:"customPricesEnabled"`
+	DefaultIdle                  string            `json:"defaultIdle"`
+	AzureSubscriptionID          string            `json:"azureSubscriptionID"`
+	AzureClientID                string            `json:"azureClientID"`
+	AzureClientSecret            string            `json:"azureClientSecret"`
+	AzureTenantID                string            `json:"azureTenantID"`
+	AzureBillingRegion           string            `json:"azureBillingRegion"`
+	CurrencyCode                 string            `json:"currencyCode"`
+	Discount                     string            `json:"discount"`
+	NegotiatedDiscount           string            `json:"negotiatedDiscount"`
+	SharedCosts                  map[string]string `json:"sharedCost"`
+	ClusterName                  string            `json:"clusterName"`
+	SharedNamespaces             string            `json:"sharedNamespaces"`
+	SharedLabelNames             string            `json:"sharedLabelNames"`
+	SharedLabelValues            string            `json:"sharedLabelValues"`
+	ReadOnly                     string            `json:"readOnly"`
 }
 }
 
 
 type ServiceAccountStatus struct {
 type ServiceAccountStatus struct {
@@ -175,7 +192,8 @@ type Provider interface {
 	GetDisks() ([]byte, error)
 	GetDisks() ([]byte, error)
 	NodePricing(Key) (*Node, error)
 	NodePricing(Key) (*Node, error)
 	PVPricing(PVKey) (*PV, error)
 	PVPricing(PVKey) (*PV, error)
-	NetworkPricing() (*Network, error)
+	NetworkPricing() (*Network, error)           // TODO: add key interface arg for dynamic price fetching
+	LoadBalancerPricing() (*LoadBalancer, error) // TODO: add key interface arg for dynamic price fetching
 	AllNodePricing() (interface{}, error)
 	AllNodePricing() (interface{}, error)
 	DownloadPricingData() error
 	DownloadPricingData() error
 	GetKey(map[string]string, *v1.Node) Key
 	GetKey(map[string]string, *v1.Node) Key
@@ -191,6 +209,7 @@ type Provider interface {
 	ClusterManagementPricing() (string, float64, error)
 	ClusterManagementPricing() (string, float64, error)
 	CombinedDiscountForNode(string, bool, float64, float64) float64
 	CombinedDiscountForNode(string, bool, float64, float64) float64
 	ParseID(string) string
 	ParseID(string) string
+	ParsePVID(string) string
 }
 }
 
 
 // ClusterName returns the name defined in cluster info, defaulting to the
 // ClusterName returns the name defined in cluster info, defaulting to the

+ 114 - 5
pkg/costmodel/cluster.go

@@ -139,7 +139,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	costPerGBHr := 0.04 / 730.0
 	costPerGBHr := 0.04 / 730.0
 
 
 	ctx := prom.NewContext(client)
 	ctx := prom.NewContext(client)
-	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume)  * on(cluster_id, persistentvolume) group_right avg(pv_hourly_cost) by (cluster_id, persistentvolume,provider_id))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 
@@ -193,6 +193,10 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 			}
 			}
 		}
 		}
 		diskMap[key].Cost += cost
 		diskMap[key].Cost += cost
+		providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
+		if providerID != "" {
+			diskMap[key].ProviderID = provider.ParsePVID(providerID)
+		}
 	}
 	}
 
 
 	for _, result := range resPVSize {
 	for _, result := range resPVSize {
@@ -287,9 +291,10 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 			diskMap[key] = &Disk{
-				Cluster: cluster,
-				Name:    name,
-				Local:   true,
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
+				Local:     true,
 			}
 			}
 		}
 		}
 		diskMap[key].Bytes = bytes
 		diskMap[key].Bytes = bytes
@@ -417,7 +422,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
 	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
@@ -783,6 +788,110 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	return nodeMap, nil
 	return nodeMap, nil
 }
 }
 
 
+type LoadBalancer struct {
+	Cluster    string
+	Name       string
+	ProviderID string
+	Cost       float64
+	Start      time.Time
+	Minutes    float64
+}
+
+func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, []error) {
+	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
+	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
+	if offset < time.Minute {
+		offsetStr = ""
+	}
+
+	// minsPerResolution determines accuracy and resource use for the following
+	// queries. Smaller values (higher resolution) result in better accuracy,
+	// but more expensive queries, and vice-a-versa.
+	minsPerResolution := 5
+
+	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
+	// value, converts it to a cumulative value; i.e.
+	// [$/hr] * [min/res]*[hr/min] = [$/res]
+	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
+
+	ctx := prom.NewContext(client)
+	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+
+	resChLBCost := ctx.Query(queryLBCost)
+	resChActiveMins := ctx.Query(queryActiveMins)
+
+	resLBCost, _ := resChLBCost.Await()
+	resActiveMins, _ := resChActiveMins.Await()
+
+	if ctx.ErrorCollector.IsError() {
+		return nil, ctx.Errors()
+	}
+
+	loadBalancerMap := map[string]*LoadBalancer{}
+
+	for _, result := range resLBCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+		namespace, err := result.GetString("namespace")
+		if err != nil {
+			log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
+			continue
+		}
+		serviceName, err := result.GetString("service_name")
+		if err != nil {
+			log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
+			continue
+		}
+		providerID := ""
+		lbCost := result.Values[0].Value
+
+		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
+		if _, ok := loadBalancerMap[key]; !ok {
+			loadBalancerMap[key] = &LoadBalancer{
+				Cluster:    cluster,
+				Name:       namespace + "/" + serviceName,
+				ProviderID: providerID, // cp.ParseID(providerID) if providerID does get recorded later
+			}
+		}
+		loadBalancerMap[key].Cost += lbCost
+	}
+
+	for _, result := range resActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+		namespace, err := result.GetString("namespace")
+		if err != nil {
+			log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
+			continue
+		}
+		serviceName, err := result.GetString("service_name")
+		if err != nil {
+			log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
+			continue
+		}
+		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		loadBalancerMap[key].Start = s
+		loadBalancerMap[key].Minutes = mins
+	}
+	return loadBalancerMap, nil
+}
+
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
 func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
 func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
 	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
 	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data

+ 32 - 0
pkg/costmodel/costmodel.go

@@ -913,6 +913,7 @@ func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cp costAnalyz
 		return nil // set default cost
 		return nil // set default cost
 	}
 	}
 	pv.Cost = pvWithCost.Cost
 	pv.Cost = pvWithCost.Cost
+	pv.ProviderID = key.ID()
 	return nil
 	return nil
 }
 }
 
 
@@ -1183,6 +1184,37 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 	return nodes, nil
 	return nodes, nil
 }
 }
 
 
+// TODO: drop some logs
+func (cm *CostModel) GetLBCost(cp costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.LoadBalancer, error) {
+	// for fetching prices from cloud provider
+	// cfg, err := cp.GetConfig()
+	// if err != nil {
+	// 	return nil, err
+	// }
+
+	servicesList := cm.Cache.GetAllServices()
+	loadBalancerMap := make(map[string]*costAnalyzerCloud.LoadBalancer)
+
+	for _, service := range servicesList {
+		namespace := service.GetObjectMeta().GetNamespace()
+		name := service.GetObjectMeta().GetName()
+		key := namespace + "," + name // + "," + clusterID?
+
+		if service.Spec.Type == "LoadBalancer" {
+			loadBalancer, err := cp.LoadBalancerPricing()
+			if err != nil {
+				return nil, err
+			}
+			newLoadBalancer := *loadBalancer
+			for _, loadBalancerIngress := range service.Status.LoadBalancer.Ingress {
+				newLoadBalancer.IngressIPAddresses = append(newLoadBalancer.IngressIPAddresses, loadBalancerIngress.IP)
+			}
+			loadBalancerMap[key] = &newLoadBalancer
+		}
+	}
+	return loadBalancerMap, nil
+}
+
 func getPodServices(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 func getPodServices(cache clustercache.ClusterCache, podList []*v1.Pod, clusterID string) (map[string]map[string][]string, error) {
 	servicesList := cache.GetAllServices()
 	servicesList := cache.GetAllServices()
 	podServicesMapping := make(map[string]map[string][]string)
 	podServicesMapping := make(map[string]map[string][]string)

+ 29 - 7
pkg/costmodel/metrics.go

@@ -408,6 +408,7 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 
 
 		containerSeen := make(map[string]bool)
 		containerSeen := make(map[string]bool)
 		nodeSeen := make(map[string]bool)
 		nodeSeen := make(map[string]bool)
+		loadBalancerSeen := make(map[string]bool)
 		pvSeen := make(map[string]bool)
 		pvSeen := make(map[string]bool)
 		pvcSeen := make(map[string]bool)
 		pvcSeen := make(map[string]bool)
 
 
@@ -511,6 +512,19 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 				nodeSeen[labelKey] = true
 				nodeSeen[labelKey] = true
 			}
 			}
 
 
+			loadBalancers, err := a.Model.GetLBCost(a.Cloud)
+			for lbKey, lb := range loadBalancers {
+				// TODO: parse (if necessary) and calculate cost associated with loadBalancer based on dynamic cloud prices fetched into each lb struct on GetLBCost() call
+				keyParts := getLabelStringsFromKey(lbKey)
+				namespace := keyParts[0]
+				serviceName := keyParts[1]
+				ingressIP := lb.IngressIPAddresses[0] // assumes one ingress IP per load balancer
+				a.LBCostRecorder.WithLabelValues(ingressIP, namespace, serviceName).Set(lb.Cost)
+
+				labelKey := getKeyFromLabelStrings(namespace, serviceName)
+				loadBalancerSeen[labelKey] = true
+			}
+
 			for _, costs := range data {
 			for _, costs := range data {
 				nodeName := costs.NodeName
 				nodeName := costs.NodeName
 
 
@@ -579,42 +593,42 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 					}
 					}
 					GetPVCost(cacPv, pv, a.Cloud, region)
 					GetPVCost(cacPv, pv, a.Cloud, region)
 					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
 					c, _ := strconv.ParseFloat(cacPv.Cost, 64)
-					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name).Set(c)
+					a.PersistentVolumePriceRecorder.WithLabelValues(pv.Name, pv.Name, cacPv.ProviderID).Set(c)
 					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
 					labelKey := getKeyFromLabelStrings(pv.Name, pv.Name)
 					pvSeen[labelKey] = true
 					pvSeen[labelKey] = true
 				}
 				}
 			}
 			}
 			for labelString, seen := range nodeSeen {
 			for labelString, seen := range nodeSeen {
 				if !seen {
 				if !seen {
-					klog.Infof("Removing %s from nodes", labelString)
+					klog.V(4).Infof("Removing %s from nodes", labelString)
 					labels := getLabelStringsFromKey(labelString)
 					labels := getLabelStringsFromKey(labelString)
 					ok := a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
 					ok := a.NodeTotalPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 					if ok {
-						klog.Infof("removed %s from totalprice", labelString)
+						klog.V(4).Infof("removed %s from totalprice", labelString)
 					} else {
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
 						klog.Infof("FAILURE TO REMOVE %s from totalprice", labelString)
 					}
 					}
 					ok = a.NodeSpotRecorder.DeleteLabelValues(labels...)
 					ok = a.NodeSpotRecorder.DeleteLabelValues(labels...)
 					if ok {
 					if ok {
-						klog.Infof("removed %s from spot records", labelString)
+						klog.V(4).Infof("removed %s from spot records", labelString)
 					} else {
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from spot records", labelString)
 						klog.Infof("FAILURE TO REMOVE %s from spot records", labelString)
 					}
 					}
 					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
 					ok = a.CPUPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 					if ok {
-						klog.Infof("removed %s from cpuprice", labelString)
+						klog.V(4).Infof("removed %s from cpuprice", labelString)
 					} else {
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
 						klog.Infof("FAILURE TO REMOVE %s from cpuprice", labelString)
 					}
 					}
 					ok = a.GPUPriceRecorder.DeleteLabelValues(labels...)
 					ok = a.GPUPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 					if ok {
-						klog.Infof("removed %s from gpuprice", labelString)
+						klog.V(4).Infof("removed %s from gpuprice", labelString)
 					} else {
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
 						klog.Infof("FAILURE TO REMOVE %s from gpuprice", labelString)
 					}
 					}
 					ok = a.RAMPriceRecorder.DeleteLabelValues(labels...)
 					ok = a.RAMPriceRecorder.DeleteLabelValues(labels...)
 					if ok {
 					if ok {
-						klog.Infof("removed %s from ramprice", labelString)
+						klog.V(4).Infof("removed %s from ramprice", labelString)
 					} else {
 					} else {
 						klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
 						klog.Infof("FAILURE TO REMOVE %s from ramprice", labelString)
 					}
 					}
@@ -623,6 +637,14 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 					nodeSeen[labelString] = false
 					nodeSeen[labelString] = false
 				}
 				}
 			}
 			}
+			for labelString, seen := range loadBalancerSeen {
+				if !seen {
+					labels := getLabelStringsFromKey(labelString)
+					a.LBCostRecorder.DeleteLabelValues(labels...)
+				} else {
+					loadBalancerSeen[labelString] = false
+				}
+			}
 			for labelString, seen := range containerSeen {
 			for labelString, seen := range containerSeen {
 				if !seen {
 				if !seen {
 					labels := getLabelStringsFromKey(labelString)
 					labels := getLabelStringsFromKey(labelString)

+ 19 - 5
pkg/costmodel/router.go

@@ -36,6 +36,7 @@ import (
 
 
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
 )
 )
 
 
 const (
 const (
@@ -74,6 +75,7 @@ type Accesses struct {
 	GPUAllocationRecorder         *prometheus.GaugeVec
 	GPUAllocationRecorder         *prometheus.GaugeVec
 	PVAllocationRecorder          *prometheus.GaugeVec
 	PVAllocationRecorder          *prometheus.GaugeVec
 	ClusterManagementCostRecorder *prometheus.GaugeVec
 	ClusterManagementCostRecorder *prometheus.GaugeVec
+	LBCostRecorder                *prometheus.GaugeVec
 	NetworkZoneEgressRecorder     prometheus.Gauge
 	NetworkZoneEgressRecorder     prometheus.Gauge
 	NetworkRegionEgressRecorder   prometheus.Gauge
 	NetworkRegionEgressRecorder   prometheus.Gauge
 	NetworkInternetEgressRecorder prometheus.Gauge
 	NetworkInternetEgressRecorder prometheus.Gauge
@@ -743,7 +745,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Address:      address,
 		Address:      address,
 		RoundTripper: LongTimeoutRoundTripper,
 		RoundTripper: LongTimeoutRoundTripper,
 	}
 	}
-	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, dbBasicAuthUsername, dbBasicAuthPW, dbBearerToken)
+	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, dbBasicAuthUsername, dbBasicAuthPW, dbBearerToken, env.GetQueryLoggingFile())
 
 
 	m, err := ValidatePrometheus(promCli, false)
 	m, err := ValidatePrometheus(promCli, false)
 	if err != nil || m.Running == false {
 	if err != nil || m.Running == false {
@@ -764,7 +766,13 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	}
 	}
 
 
 	// Kubernetes API setup
 	// Kubernetes API setup
-	kc, err := rest.InClusterConfig()
+	var kc *rest.Config
+	if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
+		kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+	} else {
+		kc, err = rest.InClusterConfig()
+	}
+
 	if err != nil {
 	if err != nil {
 		panic(err.Error())
 		panic(err.Error())
 	}
 	}
@@ -822,7 +830,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 
 
 	// TODO: General Architecture Note: Several passes have been made to modularize a lot of
 	// TODO: General Architecture Note: Several passes have been made to modularize a lot of
 	// TODO: our code, but the router still continues to be the obvious entry point for new \
 	// TODO: our code, but the router still continues to be the obvious entry point for new \
-	// TODO: features. We should look to spliting out the actual "router" functionality and
+	// TODO: features. We should look to split out the actual "router" functionality and
 	// TODO: implement a builder -> controller for stitching new features and other dependencies.
 	// TODO: implement a builder -> controller for stitching new features and other dependencies.
 	clusterManager := newClusterManager()
 	clusterManager := newClusterManager()
 
 
@@ -854,7 +862,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 	pvGv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "pv_hourly_cost",
 		Name: "pv_hourly_cost",
 		Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
 		Help: "pv_hourly_cost Cost per GB per hour on a persistent disk",
-	}, []string{"volumename", "persistentvolume"})
+	}, []string{"volumename", "persistentvolume", "provider_id"})
 
 
 	RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 	RAMAllocation := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "container_memory_allocation_bytes",
 		Name: "container_memory_allocation_bytes",
@@ -891,6 +899,10 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Name: "kubecost_cluster_management_cost",
 		Name: "kubecost_cluster_management_cost",
 		Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
 		Help: "kubecost_cluster_management_cost Hourly cost paid as a cluster management fee.",
 	}, []string{"provisioner_name"})
 	}, []string{"provisioner_name"})
+	LBCostRecorder := prometheus.NewGaugeVec(prometheus.GaugeOpts{ // no differentiation between ELB and ALB right now
+		Name: "kubecost_load_balancer_cost",
+		Help: "kubecost_load_balancer_cost Hourly cost of load balancer",
+	}, []string{"ingress_ip", "namespace", "service_name"}) // assumes one ingress IP per load balancer
 
 
 	prometheus.MustRegister(cpuGv)
 	prometheus.MustRegister(cpuGv)
 	prometheus.MustRegister(ramGv)
 	prometheus.MustRegister(ramGv)
@@ -904,6 +916,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	prometheus.MustRegister(GPUAllocation)
 	prometheus.MustRegister(GPUAllocation)
 	prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
 	prometheus.MustRegister(NetworkZoneEgressRecorder, NetworkRegionEgressRecorder, NetworkInternetEgressRecorder)
 	prometheus.MustRegister(ClusterManagementCostRecorder)
 	prometheus.MustRegister(ClusterManagementCostRecorder)
+	prometheus.MustRegister(LBCostRecorder)
 	prometheus.MustRegister(ServiceCollector{
 	prometheus.MustRegister(ServiceCollector{
 		KubeClientSet: kubeClientset,
 		KubeClientSet: kubeClientset,
 	})
 	})
@@ -940,6 +953,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
 		PersistentVolumePriceRecorder: pvGv,
 		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
 		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
+		LBCostRecorder:                LBCostRecorder,
 		Model:                         NewCostModel(k8sCache),
 		Model:                         NewCostModel(k8sCache),
 		OutOfClusterCache:             outOfClusterCache,
 		OutOfClusterCache:             outOfClusterCache,
 	}
 	}
@@ -977,7 +991,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 				RoundTripper: thanosRT,
 				RoundTripper: thanosRT,
 			}
 			}
 
 
-			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW, multiClusterBearerToken)
+			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW, multiClusterBearerToken, env.GetQueryLoggingFile())
 
 
 			_, err = ValidatePrometheus(thanosCli, true)
 			_, err = ValidatePrometheus(thanosCli, true)
 			if err != nil {
 			if err != nil {

+ 13 - 0
pkg/env/costmodelenv.go

@@ -10,6 +10,7 @@ const (
 	ClusterProfileEnvVar           = "CLUSTER_PROFILE"
 	ClusterProfileEnvVar           = "CLUSTER_PROFILE"
 	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 	MaxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
 	MaxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
+	QueryLoggingFileEnvVar         = "QUERY_LOGGING_FILE"
 	RemoteEnabledEnvVar            = "REMOTE_WRITE_ENABLED"
 	RemoteEnabledEnvVar            = "REMOTE_WRITE_ENABLED"
 	RemotePWEnvVar                 = "REMOTE_WRITE_PASSWORD"
 	RemotePWEnvVar                 = "REMOTE_WRITE_PASSWORD"
 	SQLAddressEnvVar               = "SQL_ADDRESS"
 	SQLAddressEnvVar               = "SQL_ADDRESS"
@@ -37,6 +38,8 @@ const (
 	MultiClusterBearerToken       = "MC_BEARER_TOKEN"
 	MultiClusterBearerToken       = "MC_BEARER_TOKEN"
 
 
 	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
 	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
+
+	KubeConfigPathEnvVar = "KUBECONFIG_PATH"
 )
 )
 
 
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
@@ -183,6 +186,11 @@ func GetMaxQueryConcurrency() int {
 	return GetInt(MaxQueryConcurrencyEnvVar, 5)
 	return GetInt(MaxQueryConcurrencyEnvVar, 5)
 }
 }
 
 
+// GetQueryLoggingFile returns a file location if query logging is enabled. Otherwise, empty string
+func GetQueryLoggingFile() string {
+	return Get(QueryLoggingFileEnvVar, "")
+}
+
 func GetDBBasicAuthUsername() string {
 func GetDBBasicAuthUsername() string {
 	return Get(DBBasicAuthUsername, "")
 	return Get(DBBasicAuthUsername, "")
 }
 }
@@ -209,3 +217,8 @@ func GetMultiClusterBasicAuthPassword() string {
 func GetMultiClusterBearerToken() string {
 func GetMultiClusterBearerToken() string {
 	return Get(MultiClusterBearerToken, "")
 	return Get(MultiClusterBearerToken, "")
 }
 }
+
+// GetKubeConfigPath returns the environment variable value for KubeConfigPathEnvVar
+func GetKubeConfigPath() string {
+	return Get(KubeConfigPathEnvVar, "")
+}

+ 122 - 24
pkg/prom/prom.go

@@ -4,6 +4,10 @@ import (
 	"context"
 	"context"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
+	"os"
+	"time"
+
+	golog "log"
 
 
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -15,41 +19,63 @@ import (
 type RateLimitedPrometheusClient struct {
 type RateLimitedPrometheusClient struct {
 	client      prometheus.Client
 	client      prometheus.Client
 	limiter     *util.Semaphore
 	limiter     *util.Semaphore
-	requests    *util.AtomicInt32
+	queue       util.BlockingQueue
 	outbound    *util.AtomicInt32
 	outbound    *util.AtomicInt32
 	username    string
 	username    string
 	password    string
 	password    string
 	bearerToken string
 	bearerToken string
+	fileLogger  *golog.Logger
 }
 }
 
 
 // requestCounter is used to determine if the prometheus client keeps track of
 // requestCounter is used to determine if the prometheus client keeps track of
 // the concurrent outbound requests
 // the concurrent outbound requests
 type requestCounter interface {
 type requestCounter interface {
-	TotalRequests() int32
-	TotalOutboundRequests() int32
+	TotalRequests() int
+	TotalOutboundRequests() int
 }
 }
 
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
 // prometheus requests.
-func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password, bearerToken string) (prometheus.Client, error) {
+func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password, bearerToken string, queryLogFile string) (prometheus.Client, error) {
 	c, err := prometheus.NewClient(config)
 	c, err := prometheus.NewClient(config)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	limiter := util.NewSemaphore(maxConcurrency)
-	requests := util.NewAtomicInt32(0)
+	queue := util.NewBlockingQueue()
 	outbound := util.NewAtomicInt32(0)
 	outbound := util.NewAtomicInt32(0)
 
 
-	return &RateLimitedPrometheusClient{
+	var logger *golog.Logger
+	if queryLogFile != "" {
+		exists, err := util.FileExists(queryLogFile)
+		if exists {
+			os.Remove(queryLogFile)
+		}
+
+		f, err := os.OpenFile(queryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+		if err != nil {
+			log.Infof("Failed to open queryLogFile: %s for query logging: %s", queryLogFile, err)
+		} else {
+			logger = golog.New(f, "query-log", golog.LstdFlags)
+		}
+	}
+
+	rlpc := &RateLimitedPrometheusClient{
 		client:      c,
 		client:      c,
-		limiter:     limiter,
-		requests:    requests,
+		queue:       queue,
 		outbound:    outbound,
 		outbound:    outbound,
 		username:    username,
 		username:    username,
 		password:    password,
 		password:    password,
 		bearerToken: bearerToken,
 		bearerToken: bearerToken,
-	}, nil
+		fileLogger:  logger,
+	}
+
+	// Start concurrent request processing
+	for i := 0; i < maxConcurrency; i++ {
+		go rlpc.worker()
+	}
+
+	return rlpc, nil
 }
 }
 
 
 // LogPrometheusClientState logs the current state, with respect to outbound requests, if that
 // LogPrometheusClientState logs the current state, with respect to outbound requests, if that
@@ -64,16 +90,27 @@ func LogPrometheusClientState(client prometheus.Client) {
 	}
 	}
 }
 }
 
 
+// LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
+func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
+	if l == nil {
+		return
+	}
+	qp := util.NewQueryParams(req.URL.Query())
+	query := qp.Get("query", "<Unknown>")
+
+	l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
+}
+
 // TotalRequests returns the total number of requests that are either waiting to be sent and/or
 // TotalRequests returns the total number of requests that are either waiting to be sent and/or
 // are currently outbound.
 // are currently outbound.
-func (rlpc *RateLimitedPrometheusClient) TotalRequests() int32 {
-	return rlpc.requests.Get()
+func (rlpc *RateLimitedPrometheusClient) TotalRequests() int {
+	return rlpc.queue.Length()
 }
 }
 
 
 // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
 // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
 // sent to the server and are awaiting response.
 // sent to the server and are awaiting response.
-func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int32 {
-	return rlpc.outbound.Get()
+func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
+	return int(rlpc.outbound.Get())
 }
 }
 
 
 // Passthrough to the prometheus client API
 // Passthrough to the prometheus client API
@@ -81,6 +118,66 @@ func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string)
 	return rlpc.client.URL(ep, args)
 	return rlpc.client.URL(ep, args)
 }
 }
 
 
+// workRequest is used to queue requests
+type workRequest struct {
+	ctx      context.Context
+	req      *http.Request
+	start    time.Time
+	respChan chan *workResponse
+	// used as a sentinel value to close the worker goroutine
+	closer bool
+}
+
+// workResponse is the response payload returned to the Do method
+type workResponse struct {
+	res      *http.Response
+	body     []byte
+	warnings prometheus.Warnings
+	err      error
+}
+
+// worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
+func (rlpc *RateLimitedPrometheusClient) worker() {
+	for {
+		// blocks until there is an item available
+		item := rlpc.queue.Dequeue()
+
+		// Ensure the dequeued item was a workRequest
+		if we, ok := item.(*workRequest); ok {
+			// if we need to shut down all workers, we'll need to submit sentinel values
+			// that will force the worker to return
+			if we.closer {
+				return
+			}
+
+			ctx := we.ctx
+			req := we.req
+
+			// measure time in queue
+			timeInQueue := time.Since(we.start)
+
+			// Increment outbound counter
+			rlpc.outbound.Increment()
+
+			// Execute Request
+			roundTripStart := time.Now()
+			res, body, warnings, err := rlpc.client.Do(ctx, req)
+
+			// Decrement outbound counter
+			rlpc.outbound.Decrement()
+			LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
+
+			// Pass back response data over channel to caller
+			we.respChan <- &workResponse{
+				res:      res,
+				body:     body,
+				warnings: warnings,
+				err:      err,
+			}
+		}
+	}
+}
+
 // Rate limit and passthrough to prometheus client API
 // Rate limit and passthrough to prometheus client API
 func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
 func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
 	if rlpc.username != "" {
 	if rlpc.username != "" {
@@ -90,17 +187,18 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 		token := "Bearer " + rlpc.bearerToken
 		token := "Bearer " + rlpc.bearerToken
 		req.Header.Add("Authorization", token)
 		req.Header.Add("Authorization", token)
 	}
 	}
-	// Increment the total request counter first
-	rlpc.requests.Increment()
-	defer rlpc.requests.Decrement()
 
 
-	// Acquire mutex based on concurrency limiter
-	rlpc.limiter.Acquire()
-	defer rlpc.limiter.Return()
+	respChan := make(chan *workResponse)
+	defer close(respChan)
 
 
-	// Increment outbound once mutex acquired
-	rlpc.outbound.Increment()
-	defer rlpc.outbound.Decrement()
+	rlpc.queue.Enqueue(&workRequest{
+		ctx:      ctx,
+		req:      req,
+		start:    time.Now(),
+		respChan: respChan,
+		closer:   false,
+	})
 
 
-	return rlpc.client.Do(ctx, req)
+	workRes := <-respChan
+	return workRes.res, workRes.body, workRes.warnings, workRes.err
 }
 }

+ 2 - 2
pkg/prom/result.go

@@ -143,7 +143,7 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 				return qrs
 				return qrs
 			}
 			}
 			if warn != nil {
 			if warn != nil {
-				log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
+				log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
 			}
 			}
 
 
 			vectors = append(vectors, v)
 			vectors = append(vectors, v)
@@ -165,7 +165,7 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 					if labelString == "" {
 					if labelString == "" {
 						labelString = labelsForMetric(metricMap)
 						labelString = labelsForMetric(metricMap)
 					}
 					}
-					log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
+					log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
 				}
 				}
 
 
 				vectors = append(vectors, v)
 				vectors = append(vectors, v)

+ 81 - 0
pkg/util/blockingqueue.go

@@ -0,0 +1,81 @@
+package util
+
+import (
+	"sync"
+)
+
+//--------------------------------------------------------------------------
+//  BlockingQueue
+//--------------------------------------------------------------------------
+
+// BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
+// This data structure should use a pool of worker goroutines to await work.
+type BlockingQueue interface {
+	// Enqueue pushes an item onto the queue
+	Enqueue(item interface{})
+
+	// Dequeue removes the first item from the queue and returns it.
+	Dequeue() interface{}
+
+	// Length returns the length of the queue
+	Length() int
+
+	// IsEmpty returns true if the queue is empty
+	IsEmpty() bool
+}
+
+// blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
+type blockingSliceQueue struct {
+	q        []interface{}
+	l        *sync.Mutex
+	nonEmpty *sync.Cond
+}
+
+// NewBlockingQueue returns a new BlockingQueue implementation
+func NewBlockingQueue() BlockingQueue {
+	l := new(sync.Mutex)
+
+	return &blockingSliceQueue{
+		q:        []interface{}{},
+		l:        l,
+		nonEmpty: sync.NewCond(l),
+	}
+}
+
+// Enqueue pushes an item onto the queue
+func (q *blockingSliceQueue) Enqueue(item interface{}) {
+	q.l.Lock()
+	defer q.l.Unlock()
+
+	q.q = append(q.q, item)
+	q.nonEmpty.Broadcast()
+}
+
+// Dequeue removes the first item from the queue and returns it.
+func (q *blockingSliceQueue) Dequeue() interface{} {
+	q.l.Lock()
+	defer q.l.Unlock()
+
+	// need to tight loop here to ensure only one thread wins and
+	// others wait again
+	for len(q.q) == 0 {
+		q.nonEmpty.Wait()
+	}
+
+	e := q.q[0]
+	q.q = q.q[1:]
+	return e
+}
+
+// Length returns the length of the queue
+func (q *blockingSliceQueue) Length() int {
+	q.l.Lock()
+	defer q.l.Unlock()
+
+	return len(q.q)
+}
+
+// IsEmpty returns true if the queue is empty
+func (q *blockingSliceQueue) IsEmpty() bool {
+	return q.Length() == 0
+}