Просмотр исходного кода

Merge branch 'develop' into kaelan-disable-metrics

Kaelan Patel 4 лет назад
Родитель
Commit
6263463d09

+ 0 - 5
cmd/costmodel/main.go

@@ -1,7 +1,6 @@
 package main
 package main
 
 
 import (
 import (
-	"flag"
 	"os"
 	"os"
 
 
 	"github.com/kubecost/cost-model/pkg/cmd"
 	"github.com/kubecost/cost-model/pkg/cmd"
@@ -9,10 +8,6 @@ import (
 )
 )
 
 
 func main() {
 func main() {
-	klog.InitFlags(nil)
-	flag.Set("v", "3")
-	flag.Parse()
-
 	// runs the appropriate application mode using the default cost-model command
 	// runs the appropriate application mode using the default cost-model command
 	// see: github.com/kubecost/cost-model/pkg/cmd package for details
 	// see: github.com/kubecost/cost-model/pkg/cmd package for details
 	if err := cmd.Execute(nil); err != nil {
 	if err := cmd.Execute(nil); err != nil {

+ 4 - 2
go.mod

@@ -6,7 +6,6 @@ require (
 	cloud.google.com/go v0.81.0
 	cloud.google.com/go v0.81.0
 	cloud.google.com/go/bigquery v1.8.0
 	cloud.google.com/go/bigquery v1.8.0
 	github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
 	github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
-	github.com/Azure/azure-storage-blob-go v0.13.0
 	github.com/Azure/go-autorest/autorest v0.11.17
 	github.com/Azure/go-autorest/autorest v0.11.17
 	github.com/Azure/go-autorest/autorest/azure/auth v0.5.6
 	github.com/Azure/go-autorest/autorest/azure/auth v0.5.6
 	github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
 	github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
@@ -20,8 +19,9 @@ require (
 	github.com/jszwec/csvutil v1.2.1
 	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/lib/pq v1.2.0
 	github.com/lib/pq v1.2.0
-	github.com/microcosm-cc/bluemonday v1.0.5
+	github.com/microcosm-cc/bluemonday v1.0.16
 	github.com/minio/minio-go/v7 v7.0.15
 	github.com/minio/minio-go/v7 v7.0.15
+	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/pkg/errors v0.9.1
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.0.0
 	github.com/prometheus/client_golang v1.0.0
@@ -30,10 +30,12 @@ require (
 	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
 	github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
 	github.com/spf13/cobra v1.2.1
 	github.com/spf13/cobra v1.2.1
+	github.com/spf13/pflag v1.0.5
 	go.etcd.io/bbolt v1.3.5
 	go.etcd.io/bbolt v1.3.5
 	golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
 	golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	google.golang.org/api v0.44.0
 	google.golang.org/api v0.44.0
+	gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
 	gopkg.in/yaml.v2 v2.4.0
 	gopkg.in/yaml.v2 v2.4.0
 	k8s.io/api v0.20.4
 	k8s.io/api v0.20.4
 	k8s.io/apimachinery v0.20.4
 	k8s.io/apimachinery v0.20.4

+ 7 - 16
go.sum

@@ -41,19 +41,14 @@ cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
-github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
-github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
-github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc=
-github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs=
 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
 github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
 github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
 github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
 github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
 github.com/Azure/go-autorest/autorest v0.11.17 h1:2zCdHwNgRH+St1J+ZMf66xI8aLr/5KMy+wWLH97zwYM=
 github.com/Azure/go-autorest/autorest v0.11.17 h1:2zCdHwNgRH+St1J+ZMf66xI8aLr/5KMy+wWLH97zwYM=
 github.com/Azure/go-autorest/autorest v0.11.17/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw=
 github.com/Azure/go-autorest/autorest v0.11.17/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw=
 github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
 github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
-github.com/Azure/go-autorest/autorest/adal v0.9.2/go.mod h1:/3SMAM86bP6wC9Ev35peQDUeqFZBMH07vvUOmg4z/fE=
 github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
 github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
 github.com/Azure/go-autorest/autorest/adal v0.9.10 h1:r6fZHMaHD8B6LDCn0o5vyBFHIHrM6Ywwx7mb49lPItI=
 github.com/Azure/go-autorest/autorest/adal v0.9.10 h1:r6fZHMaHD8B6LDCn0o5vyBFHIHrM6Ywwx7mb49lPItI=
 github.com/Azure/go-autorest/autorest/adal v0.9.10/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
 github.com/Azure/go-autorest/autorest/adal v0.9.10/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
@@ -108,8 +103,6 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
 github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
 github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
-github.com/chris-ramon/douceur v0.2.0 h1:IDMEdxlEUUBYBKE4z/mJnFyVXox+MjuEVDJNN27glkU=
-github.com/chris-ramon/douceur v0.2.0/go.mod h1:wDW5xjJdeoMm1mRt4sD4c/LbF/mWdEpRXQKjTR8nIBE=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -368,8 +361,6 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
 github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
 github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
-github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
-github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
@@ -380,8 +371,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
 github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
 github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
 github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
 github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
 github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
 github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
-github.com/microcosm-cc/bluemonday v1.0.5 h1:cF59UCKMmmUgqN1baLvqU/B1ZsMori+duLVTLpgiG3w=
-github.com/microcosm-cc/bluemonday v1.0.5/go.mod h1:8iwZnFn2CDDNZ0r6UXhF4xawGvzaqzCRa1n3/lO3W2w=
+github.com/microcosm-cc/bluemonday v1.0.16 h1:kHmAq2t7WPWLjiGvzKa5o3HzSfahUKiOq7fAPUiMNIc=
+github.com/microcosm-cc/bluemonday v1.0.16/go.mod h1:Z0r70sCuXHig8YpBzCc5eGHAap2K7e/u082ZUpDRRqM=
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
 github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
 github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
 github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
@@ -611,7 +602,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -633,8 +623,9 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
-golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
+golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -682,7 +673,6 @@ golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -699,7 +689,6 @@ golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -713,6 +702,7 @@ golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
@@ -724,8 +714,9 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

+ 1 - 1
pkg/cloud/csvprovider.go

@@ -54,6 +54,7 @@ func GetCsv(location string) (io.Reader, error) {
 
 
 func (c *CSVProvider) DownloadPricingData() error {
 func (c *CSVProvider) DownloadPricingData() error {
 	c.DownloadPricingDataLock.Lock()
 	c.DownloadPricingDataLock.Lock()
+	defer time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
 	defer c.DownloadPricingDataLock.Unlock()
 	defer c.DownloadPricingDataLock.Unlock()
 	pricing := make(map[string]*price)
 	pricing := make(map[string]*price)
 	nodeclasspricing := make(map[string]float64)
 	nodeclasspricing := make(map[string]float64)
@@ -177,7 +178,6 @@ func (c *CSVProvider) DownloadPricingData() error {
 	} else {
 	} else {
 		log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
 		log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
 	}
 	}
-	time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
 	return nil
 	return nil
 }
 }
 
 

+ 23 - 1
pkg/cmd/agent/agent.go

@@ -85,8 +85,30 @@ func newPrometheusClient() (prometheus.Client, error) {
 
 
 	timeout := 120 * time.Second
 	timeout := 120 * time.Second
 	keepAlive := 120 * time.Second
 	keepAlive := 120 * time.Second
+	tlsHandshakeTimeout := 10 * time.Second
 
 
-	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
+	if env.IsPrometheusRetryOnRateLimitResponse() {
+		rateLimitRetryOpts = &prom.RateLimitRetryOpts{
+			MaxRetries:       env.GetPrometheusRetryOnRateLimitMaxRetries(),
+			DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
+		}
+	}
+
+	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
+		Auth: &prom.ClientAuth{
+			Username:    env.GetDBBasicAuthUsername(),
+			Password:    env.GetDBBasicAuthUserPassword(),
+			BearerToken: env.GetDBBearerToken(),
+		},
+		QueryConcurrency: queryConcurrency,
+		QueryLogFile:     "",
+	})
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
 		return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
 	}
 	}

+ 9 - 0
pkg/cmd/commands.go

@@ -1,12 +1,15 @@
 package cmd
 package cmd
 
 
 import (
 import (
+	"flag"
 	"fmt"
 	"fmt"
 	"os"
 	"os"
 
 
 	"github.com/kubecost/cost-model/pkg/cmd/agent"
 	"github.com/kubecost/cost-model/pkg/cmd/agent"
 	"github.com/kubecost/cost-model/pkg/cmd/costmodel"
 	"github.com/kubecost/cost-model/pkg/cmd/costmodel"
 	"github.com/spf13/cobra"
 	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
+	"k8s.io/klog"
 )
 )
 
 
 const (
 const (
@@ -38,6 +41,12 @@ func Execute(costModelCmd *cobra.Command) error {
 
 
 	rootCmd := newRootCommand(costModelCmd)
 	rootCmd := newRootCommand(costModelCmd)
 
 
+	// initialize klog and make cobra aware of all the go flags
+	klog.InitFlags(nil)
+	pflag.CommandLine.AddGoFlag(flag.CommandLine.Lookup("v"))
+	pflag.CommandLine.AddGoFlag(flag.CommandLine.Lookup("logtostderr"))
+	pflag.CommandLine.Set("v", "3")
+
 	// in the event that no directive/command is passed, we want to default to using the cost-model command
 	// in the event that no directive/command is passed, we want to default to using the cost-model command
 	// cobra doesn't provide a way within the API to do this, so we'll prepend the command if it is omitted.
 	// cobra doesn't provide a way within the API to do this, so we'll prepend the command if it is omitted.
 	if len(os.Args) > 1 {
 	if len(os.Args) > 1 {

+ 79 - 12
pkg/costmodel/allocation.go

@@ -15,7 +15,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/klog"
 )
 )
 
 
 const (
 const (
@@ -375,6 +374,67 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	podPVCMap := map[podKey][]*PVC{}
 	podPVCMap := map[podKey][]*PVC{}
 	buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
 	buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
 
 
+	// Because PVCs can be shared among pods, the respective PV cost
+	// needs to be evenly distributed to those pods based on time
+	// running, as well as the amount of time the PVC was shared.
+
+	// Build a relation between every PVC to the pods that mount it
+	// and a window representing the interval during which they
+	// were associated.
+	pvcPodIntervalMap := make(map[pvcKey]map[podKey]kubecost.Window)
+
+	for _, pod := range podMap {
+
+		for _, alloc := range pod.Allocations {
+
+			cluster := alloc.Properties.Cluster
+			namespace := alloc.Properties.Namespace
+			pod := alloc.Properties.Pod
+			thisPodKey := newPodKey(cluster, namespace, pod)
+
+			if pvcs, ok := podPVCMap[thisPodKey]; ok {
+				for _, pvc := range pvcs {
+
+					// Determine the (start, end) of the relationship between the
+					// given PVC and the associated Allocation so that a precise
+					// number of hours can be used to compute cumulative cost.
+					s, e := alloc.Start, alloc.End
+					if pvc.Start.After(alloc.Start) {
+						s = pvc.Start
+					}
+					if pvc.End.Before(alloc.End) {
+						e = pvc.End
+					}
+
+					thisPVCKey := newPVCKey(cluster, namespace, pvc.Name)
+					if pvcPodIntervalMap[thisPVCKey] == nil {
+						pvcPodIntervalMap[thisPVCKey] = make(map[podKey]kubecost.Window)
+					}
+
+					pvcPodIntervalMap[thisPVCKey][thisPodKey] = kubecost.NewWindow(&s, &e)
+				}
+			}
+
+			// We only need to look at one alloc per pod
+			break
+		}
+
+	}
+
+	// Build out a PV price coefficient for each pod with a PVC. Each
+	// PVC-pod relation needs a coefficient which modifies the PV cost
+	// such that PV costs can be shared between all pods using that PVC.
+	sharedPVCCostCoefficientMap := make(map[pvcKey]map[podKey][]CoefficientComponent)
+	for pvcKey, podIntervalMap := range pvcPodIntervalMap {
+
+		// Get single-point intervals from alloc-PVC relation windows.
+		intervals := getIntervalPointsFromWindows(podIntervalMap)
+
+		// Determine coefficients for each PVC-pod relation.
+		sharedPVCCostCoefficientMap[pvcKey] = getPVCCostCoefficients(intervals, podIntervalMap)
+
+	}
+
 	// Identify unmounted PVs (PVs without PVCs) and add one Allocation per
 	// Identify unmounted PVs (PVs without PVCs) and add one Allocation per
 	// cluster representing each cluster's unmounted PVs (if necessary).
 	// cluster representing each cluster's unmounted PVs (if necessary).
 	applyUnmountedPVs(window, podMap, pvMap, pvcMap)
 	applyUnmountedPVs(window, podMap, pvMap, pvcMap)
@@ -402,16 +462,16 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 			alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
 			alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
 			if pvcs, ok := podPVCMap[podKey]; ok {
 			if pvcs, ok := podPVCMap[podKey]; ok {
 				for _, pvc := range pvcs {
 				for _, pvc := range pvcs {
-					// Determine the (start, end) of the relationship between the
-					// given PVC and the associated Allocation so that a precise
-					// number of hours can be used to compute cumulative cost.
+
+					pvcKey := newPVCKey(cluster, namespace, pvc.Name)
+
 					s, e := alloc.Start, alloc.End
 					s, e := alloc.Start, alloc.End
-					if pvc.Start.After(alloc.Start) {
-						s = pvc.Start
-					}
-					if pvc.End.Before(alloc.End) {
-						e = pvc.End
+					if pvcInterval, ok := pvcPodIntervalMap[pvcKey][podKey]; ok {
+						s, e = *pvcInterval.Start(), *pvcInterval.End()
+					} else {
+						log.Warningf("CostModel.ComputeAllocation: allocation %s and PVC %s have no associated active window", alloc.Name, pvc.Name)
 					}
 					}
+
 					minutes := e.Sub(s).Minutes()
 					minutes := e.Sub(s).Minutes()
 					hrs := minutes / 60.0
 					hrs := minutes / 60.0
 
 
@@ -423,6 +483,13 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 					gib := pvc.Bytes / 1024 / 1024 / 1024
 					gib := pvc.Bytes / 1024 / 1024 / 1024
 					cost := pvc.Volume.CostPerGiBHour * gib * hrs
 					cost := pvc.Volume.CostPerGiBHour * gib * hrs
 
 
+					// Scale PV cost by PVC sharing coefficient.
+					if coeffComponents, ok := sharedPVCCostCoefficientMap[pvcKey][podKey]; ok {
+						cost *= getCoefficientFromComponents(coeffComponents)
+					} else {
+						log.Warningf("CostModel.ComputeAllocation: allocation %s and PVC %s have relation but no coeff", alloc.Name, pvc.Name)
+					}
+
 					// Apply the size and cost of the PV to the allocation, each
 					// Apply the size and cost of the PV to the allocation, each
 					// weighted by count (i.e. the number of containers in the pod)
 					// weighted by count (i.e. the number of containers in the pod)
 					// record the amount of total PVBytes Hours attributable to a given PV
 					// record the amount of total PVBytes Hours attributable to a given PV
@@ -678,7 +745,7 @@ func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom
 
 
 		cpuCores := res.Values[0].Value
 		cpuCores := res.Values[0].Value
 		if cpuCores > MAX_CPU_CAP {
 		if cpuCores > MAX_CPU_CAP {
-			klog.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
+			log.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
 			cpuCores = 0.0
 			cpuCores = 0.0
 		}
 		}
 		hours := pod.Allocations[container].Minutes() / 60.0
 		hours := pod.Allocations[container].Minutes() / 60.0
@@ -724,7 +791,7 @@ func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom
 			pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
 			pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
 		}
 		}
 		if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
 		if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
-			klog.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
+			log.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
 			pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
 			pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
 		}
 		}
 
 
@@ -764,7 +831,7 @@ func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.Que
 
 
 		pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
 		pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
 		if res.Values[0].Value > MAX_CPU_CAP {
 		if res.Values[0].Value > MAX_CPU_CAP {
-			klog.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
+			log.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
 			pod.Allocations[container].CPUCoreUsageAverage = 0.0
 			pod.Allocations[container].CPUCoreUsageAverage = 0.0
 		}
 		}
 	}
 	}

+ 148 - 0
pkg/costmodel/intervals.go

@@ -0,0 +1,148 @@
+package costmodel
+
+import (
+	"sort"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/kubecost"
+)
+
+// IntervalPoint describes a start or end of a window of time
+// Currently, this used in PVC-pod relations to detect/calculate
+// coefficients for PV cost when a PVC is shared between pods.
+type IntervalPoint struct {
+	Time      time.Time
+	PointType string
+	Key       podKey
+}
+
+// IntervalPoints describes a slice of IntervalPoint structs
+type IntervalPoints []IntervalPoint
+
+// Requisite functions for implementing sort.Sort for
+// IntervalPointList
+func (ips IntervalPoints) Len() int {
+	return len(ips)
+}
+
+func (ips IntervalPoints) Less(i, j int) bool {
+	if ips[i].Time.Equal(ips[j].Time) {
+		return ips[i].PointType == "start" && ips[j].PointType == "end"
+	}
+	return ips[i].Time.Before(ips[j].Time)
+}
+
+func (ips IntervalPoints) Swap(i, j int) {
+	ips[i], ips[j] = ips[j], ips[i]
+}
+
+// NewIntervalPoint creates and returns a new IntervalPoint instance with given parameters.
+func NewIntervalPoint(time time.Time, pointType string, key podKey) IntervalPoint {
+	return IntervalPoint{
+		Time:      time,
+		PointType: pointType,
+		Key:       key,
+	}
+}
+
+// CoefficientComponent is a representitive struct holding two fields which describe an interval
+// as part of a single number cost coefficient calculation:
+// 1. Proportion: The division of cost based on how many pods were running between those points
+// 2. Time: The ratio of the time between those points to the total time that pod was running
+type CoefficientComponent struct {
+	Proportion float64
+	Time       float64
+}
+
+// getIntervalPointFromWindows takes a map of podKeys to windows
+// and returns a sorted list of IntervalPoints representing the
+// starts and ends of all those windows.
+func getIntervalPointsFromWindows(windows map[podKey]kubecost.Window) IntervalPoints {
+
+	var intervals IntervalPoints
+
+	for podKey, podInterval := range windows {
+
+		start := NewIntervalPoint(*podInterval.Start(), "start", podKey)
+		end := NewIntervalPoint(*podInterval.End(), "end", podKey)
+
+		intervals = append(intervals, []IntervalPoint{start, end}...)
+
+	}
+
+	sort.Sort(intervals)
+
+	return intervals
+
+}
+
+// getPVCCostCoefficients gets a coefficient which represents the scale
+// factor that each PVC in a pvcIntervalMap and corresponding slice of
+// IntervalPoints intervals uses to calculate a cost for that PVC's PV.
+func getPVCCostCoefficients(intervals IntervalPoints, pvcIntervalMap map[podKey]kubecost.Window) map[podKey][]CoefficientComponent {
+
+	pvcCostCoefficientMap := make(map[podKey][]CoefficientComponent)
+
+	// pvcCostCoefficientMap is mutated in this function. The format is
+	// such that the individual coefficient components are preserved for
+	// testing purposes.
+
+	activeKeys := map[podKey]struct{}{
+		intervals[0].Key: struct{}{},
+	}
+
+	// For each interval i.e. for any time a pod-PVC relation ends or starts...
+	for i := 1; i < len(intervals); i++ {
+
+		// intervals will always have at least two IntervalPoints (one start/end)
+		point := intervals[i]
+		prevPoint := intervals[i-1]
+
+		// If the current point happens at a later time than the previous point
+		if !point.Time.Equal(prevPoint.Time) {
+			for key := range activeKeys {
+				if pvcIntervalMap[key].Duration().Minutes() != 0 {
+					pvcCostCoefficientMap[key] = append(
+						pvcCostCoefficientMap[key],
+						CoefficientComponent{
+							Time:       point.Time.Sub(prevPoint.Time).Minutes() / pvcIntervalMap[key].Duration().Minutes(),
+							Proportion: 1.0 / float64(len(activeKeys)),
+						},
+					)
+				}
+			}
+		}
+
+		// If the point was a start, increment and track
+		if point.PointType == "start" {
+			activeKeys[point.Key] = struct{}{}
+		}
+
+		// If the point was an end, decrement and stop tracking
+		if point.PointType == "end" {
+			delete(activeKeys, point.Key)
+		}
+
+	}
+
+	return pvcCostCoefficientMap
+}
+
+// getCoefficientFromComponents takes the components of a PVC-pod PV cost coefficient
+// determined by getPVCCostCoefficient and gets the resulting single
+// floating point coefficient.
+func getCoefficientFromComponents(coefficientComponents []CoefficientComponent) float64 {
+
+	coefficient := 0.0
+
+	for i := range coefficientComponents {
+
+		proportion := coefficientComponents[i].Proportion
+		time := coefficientComponents[i].Time
+
+		coefficient += proportion * time
+
+	}
+
+	return coefficient
+}

+ 336 - 0
pkg/costmodel/intervals_test.go

@@ -0,0 +1,336 @@
+package costmodel
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/kubecost"
+)
+
+func TestGetIntervalPointsFromWindows(t *testing.T) {
+	cases := []struct {
+		name           string
+		pvcIntervalMap map[podKey]kubecost.Window
+		expected       []IntervalPoint
+	}{
+		{
+			name: "four pods w/ various overlaps",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8 am to 9 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod2",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:45 am to 9 am
+				podKey{
+					Pod: "Pod3",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 45, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8 am to 8:15 am
+				podKey{
+					Pod: "Pod4",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 8, 15, 0, 0, time.UTC),
+				)),
+			},
+			expected: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod4"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 15, 0, 0, time.UTC), "end", podKey{Pod: "Pod4"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 45, 0, 0, time.UTC), "start", podKey{Pod: "Pod3"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod3"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+			},
+		},
+		{
+			name: "two pods no overlap",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8 am to 8:30 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod2",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+			},
+			expected: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod2"}),
+			},
+		},
+		{
+			name: "two pods total overlap",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod2",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+			},
+			expected: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod2"}),
+			},
+		},
+		{
+			name: "one pod",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8 am to 9 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+			},
+			expected: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+			},
+		},
+	}
+
+	for _, testCase := range cases {
+		t.Run(testCase.name, func(t *testing.T) {
+
+			result := getIntervalPointsFromWindows(testCase.pvcIntervalMap)
+
+			if len(result) != len(testCase.expected) {
+				t.Errorf("getIntervalPointsFromWindows test failed: %s: Got %+v but expected %+v", testCase.name, result, testCase.expected)
+			}
+
+			for i := range testCase.expected {
+
+				// For correctness in terms of individual position of IntervalPoints, we only need to check the time/type.
+				// Key is used in other associated calculations, so it must exist, but order does not matter if other sorting
+				// logic is obeyed.
+				if !testCase.expected[i].Time.Equal(result[i].Time) || testCase.expected[i].PointType != result[i].PointType {
+					t.Errorf("getIntervalPointsFromWindows test failed: %s: Got point %s:%s but expected %s:%s", testCase.name, testCase.expected[i].PointType, testCase.expected[i].Time, result[i].PointType, result[i].Time)
+				}
+
+			}
+
+		})
+	}
+}
+
+func TestGetPVCCostCoefficients(t *testing.T) {
+	cases := []struct {
+		name           string
+		pvcIntervalMap map[podKey]kubecost.Window
+		intervals      []IntervalPoint
+		expected       map[podKey][]CoefficientComponent
+	}{
+		{
+			name: "four pods w/ various overlaps",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8 am to 9 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod2",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:45 am to 9 am
+				podKey{
+					Pod: "Pod3",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 45, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8 am to 8:15 am
+				podKey{
+					Pod: "Pod4",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 8, 15, 0, 0, time.UTC),
+				)),
+			},
+			intervals: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod4"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 15, 0, 0, time.UTC), "end", podKey{Pod: "Pod4"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 45, 0, 0, time.UTC), "start", podKey{Pod: "Pod3"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod3"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+			},
+			expected: map[podKey][]CoefficientComponent{
+				podKey{
+					Pod: "Pod1",
+				}: []CoefficientComponent{
+					CoefficientComponent{0.5, 0.25},
+					CoefficientComponent{1, 0.25},
+					CoefficientComponent{0.5, 0.25},
+					CoefficientComponent{1.0 / 3.0, 0.25},
+				},
+				podKey{
+					Pod: "Pod2",
+				}: []CoefficientComponent{
+					CoefficientComponent{0.5, 0.50},
+					CoefficientComponent{1.0 / 3.0, 0.50},
+				},
+				podKey{
+					Pod: "Pod3",
+				}: []CoefficientComponent{
+					CoefficientComponent{1.0 / 3.0, 1.0},
+				},
+				podKey{
+					Pod: "Pod4",
+				}: []CoefficientComponent{
+					CoefficientComponent{0.5, 1.0},
+				},
+			},
+		},
+		{
+			name: "two pods no overlap",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8 am to 8:30 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod2",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+			},
+			intervals: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod2"}),
+			},
+			expected: map[podKey][]CoefficientComponent{
+				podKey{
+					Pod: "Pod1",
+				}: []CoefficientComponent{
+					CoefficientComponent{1.0, 1.0},
+				},
+				podKey{
+					Pod: "Pod2",
+				}: []CoefficientComponent{
+					CoefficientComponent{1.0, 1.0},
+				},
+			},
+		},
+		{
+			name: "two pods total overlap",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+				// Pod running from 8:30 am to 9 am
+				podKey{
+					Pod: "Pod2",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+			},
+			intervals: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 30, 0, 0, time.UTC), "start", podKey{Pod: "Pod2"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod2"}),
+			},
+			expected: map[podKey][]CoefficientComponent{
+				podKey{
+					Pod: "Pod1",
+				}: []CoefficientComponent{
+					CoefficientComponent{0.5, 1.0},
+				},
+				podKey{
+					Pod: "Pod2",
+				}: []CoefficientComponent{
+					CoefficientComponent{0.5, 1.0},
+				},
+			},
+		},
+		{
+			name: "one pod",
+			pvcIntervalMap: map[podKey]kubecost.Window{
+				// Pod running from 8 am to 9 am
+				podKey{
+					Pod: "Pod1",
+				}: kubecost.Window(kubecost.NewClosedWindow(
+					time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC),
+					time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC),
+				)),
+			},
+			intervals: []IntervalPoint{
+				NewIntervalPoint(time.Date(2021, 2, 19, 8, 0, 0, 0, time.UTC), "start", podKey{Pod: "Pod1"}),
+				NewIntervalPoint(time.Date(2021, 2, 19, 9, 0, 0, 0, time.UTC), "end", podKey{Pod: "Pod1"}),
+			},
+			expected: map[podKey][]CoefficientComponent{
+				podKey{
+					Pod: "Pod1",
+				}: []CoefficientComponent{
+					CoefficientComponent{1.0, 1.0},
+				},
+			},
+		},
+	}
+
+	for _, testCase := range cases {
+		t.Run(testCase.name, func(t *testing.T) {
+			result := getPVCCostCoefficients(testCase.intervals, testCase.pvcIntervalMap)
+
+			if !reflect.DeepEqual(result, testCase.expected) {
+				t.Errorf("getPVCCostCoefficients test failed: %s: Got %+v but expected %+v", testCase.name, result, testCase.expected)
+			}
+		})
+	}
+}

+ 37 - 2
pkg/costmodel/router.go

@@ -1343,9 +1343,31 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 
 
 	timeout := 120 * time.Second
 	timeout := 120 * time.Second
 	keepAlive := 120 * time.Second
 	keepAlive := 120 * time.Second
+	tlsHandshakeTimeout := 10 * time.Second
 	scrapeInterval := time.Minute
 	scrapeInterval := time.Minute
 
 
-	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
+	if env.IsPrometheusRetryOnRateLimitResponse() {
+		rateLimitRetryOpts = &prom.RateLimitRetryOpts{
+			MaxRetries:       env.GetPrometheusRetryOnRateLimitMaxRetries(),
+			DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
+		}
+	}
+
+	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
+		Auth: &prom.ClientAuth{
+			Username:    env.GetDBBasicAuthUsername(),
+			Password:    env.GetDBBasicAuthUserPassword(),
+			BearerToken: env.GetDBBearerToken(),
+		},
+		QueryConcurrency: queryConcurrency,
+		QueryLogFile:     "",
+	})
 	if err != nil {
 	if err != nil {
 		klog.Fatalf("Failed to create prometheus client, Error: %v", err)
 		klog.Fatalf("Failed to create prometheus client, Error: %v", err)
 	}
 	}
@@ -1455,7 +1477,20 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		thanosAddress := thanos.QueryURL()
 		thanosAddress := thanos.QueryURL()
 
 
 		if thanosAddress != "" {
 		if thanosAddress != "" {
-			thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
+			thanosCli, _ := thanos.NewThanosClient(thanosAddress, &prom.PrometheusClientConfig{
+				Timeout:               timeout,
+				KeepAlive:             keepAlive,
+				TLSHandshakeTimeout:   tlsHandshakeTimeout,
+				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+				RateLimitRetryOpts:    rateLimitRetryOpts,
+				Auth: &prom.ClientAuth{
+					Username:    env.GetMultiClusterBasicAuthUsername(),
+					Password:    env.GetMultiClusterBasicAuthPassword(),
+					BearerToken: env.GetMultiClusterBearerToken(),
+				},
+				QueryConcurrency: queryConcurrency,
+				QueryLogFile:     env.GetQueryLoggingFile(),
+			})
 
 
 			_, err = prom.Validate(thanosCli)
 			_, err = prom.Validate(thanosCli)
 			if err != nil {
 			if err != nil {

+ 24 - 2
pkg/env/costmodelenv.go

@@ -82,7 +82,11 @@ const (
 	KubecostConfigBucketEnvVar    = "KUBECOST_CONFIG_BUCKET"
 	KubecostConfigBucketEnvVar    = "KUBECOST_CONFIG_BUCKET"
 	ClusterInfoFileEnabledEnvVar  = "CLUSTER_INFO_FILE_ENABLED"
 	ClusterInfoFileEnabledEnvVar  = "CLUSTER_INFO_FILE_ENABLED"
 	ClusterCacheFileEnabledEnvVar = "CLUSTER_CACHE_FILE_ENABLED"
 	ClusterCacheFileEnabledEnvVar = "CLUSTER_CACHE_FILE_ENABLED"
-	PrometheusQueryOffsetEnvVar   = "PROMETHEUS_QUERY_OFFSET"
+
+	PrometheusQueryOffsetEnvVar                 = "PROMETHEUS_QUERY_OFFSET"
+	PrometheusRetryOnRateLimitResponseEnvVar    = "PROMETHEUS_RETRY_ON_RATE_LIMIT"
+	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
+	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 )
 )
 
 
 // GetKubecostConfigBucket returns a file location for a mounted bucket configuration which is used to store
 // GetKubecostConfigBucket returns a file location for a mounted bucket configuration which is used to store
@@ -103,6 +107,24 @@ func IsClusterCacheFileEnabled() bool {
 	return GetBool(ClusterCacheFileEnabledEnvVar, false)
 	return GetBool(ClusterCacheFileEnabledEnvVar, false)
 }
 }
 
 
+// IsPrometheusRetryOnRateLimitResponse will attempt to retry if a 429 response is received OR a 400 with a body containing
+// ThrottleException (common in AWS services like AMP)
+func IsPrometheusRetryOnRateLimitResponse() bool {
+	return GetBool(PrometheusRetryOnRateLimitResponseEnvVar, true)
+}
+
+// GetPrometheusRetryOnRateLimitMaxRetries returns the maximum number of retries that should be attempted prior to failing.
+// Only used if IsPrometheusRetryOnRateLimitResponse() is true.
+func GetPrometheusRetryOnRateLimitMaxRetries() int {
+	return GetInt(PrometheusRetryOnRateLimitMaxRetriesEnvVar, 5)
+}
+
+// GetPrometheusRetryOnRateLimitDefaultWait returns the default wait time for a retriable rate limit response without a
+// Retry-After header.
+func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
+	return GetDuration(PrometheusRetryOnRateLimitDefaultWaitEnvVar, 100*time.Millisecond)
+}
+
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
 // to all non-range queries made via our query context. This should only be applied when there is a significant delay in
 // to all non-range queries made via our query context. This should only be applied when there is a significant delay in
 // data arriving in the target prom db. For example, if supplying a thanos or cortex querier for the prometheus server, using
 // data arriving in the target prom db. For example, if supplying a thanos or cortex querier for the prometheus server, using
@@ -134,7 +156,7 @@ func GetMetricsConfigmapName() string {
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
 // the AWS access key for authentication
 // the AWS access key for authentication
 func GetAppVersion() string {
 func GetAppVersion() string {
-	return Get(AppVersionEnvVar, "1.90.0")
+	return Get(AppVersionEnvVar, "1.90.1")
 }
 }
 
 
 // IsEmitNamespaceAnnotationsMetric returns true if cost-model is configured to emit the kube_namespace_annotations metric
 // IsEmitNamespaceAnnotationsMetric returns true if cost-model is configured to emit the kube_namespace_annotations metric

+ 12 - 0
pkg/env/env.go

@@ -2,6 +2,7 @@ package env
 
 
 import (
 import (
 	"os"
 	"os"
+	"time"
 
 
 	"github.com/kubecost/cost-model/pkg/util/mapper"
 	"github.com/kubecost/cost-model/pkg/util/mapper"
 )
 )
@@ -116,6 +117,12 @@ func GetBool(key string, defaultValue bool) bool {
 	return envMapper.GetBool(key, defaultValue)
 	return envMapper.GetBool(key, defaultValue)
 }
 }
 
 
+// GetDuration parses a time.Duration from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetDuration(key string, defaultValue time.Duration) time.Duration {
+	return envMapper.GetDuration(key, defaultValue)
+}
+
 // Set sets the environment variable for the key provided using the value provided.
 // Set sets the environment variable for the key provided using the value provided.
 func Set(key string, value string) error {
 func Set(key string, value string) error {
 	return envMapper.Set(key, value)
 	return envMapper.Set(key, value)
@@ -175,3 +182,8 @@ func SetUInt64(key string, value uint64) error {
 func SetBool(key string, value bool) error {
 func SetBool(key string, value bool) error {
 	return envMapper.SetBool(key, value)
 	return envMapper.SetBool(key, value)
 }
 }
+
+// SetDuration sets the environment variable to a string formatted time.Duration
+func SetDuration(key string, value time.Duration) error {
+	return envMapper.SetDuration(key, value)
+}

+ 168 - 62
pkg/kubecost/totals.go

@@ -54,6 +54,21 @@ func (art *AllocationTotals) TotalGPUCost() float64 {
 	return art.GPUCost + art.GPUCostAdjustment
 	return art.GPUCost + art.GPUCostAdjustment
 }
 }
 
 
+// TotalLoadBalancerCost returns LoadBalancer cost with adjustment.
+func (art *AllocationTotals) TotalLoadBalancerCost() float64 {
+	return art.LoadBalancerCost + art.LoadBalancerCostAdjustment
+}
+
+// TotalNetworkCost returns Network cost with adjustment.
+func (art *AllocationTotals) TotalNetworkCost() float64 {
+	return art.NetworkCost + art.NetworkCostAdjustment
+}
+
+// TotalPersistentVolumeCost returns PersistentVolume cost with adjustment.
+func (art *AllocationTotals) TotalPersistentVolumeCost() float64 {
+	return art.PersistentVolumeCost + art.PersistentVolumeCostAdjustment
+}
+
 // TotalRAMCost returns RAM cost with adjustment.
 // TotalRAMCost returns RAM cost with adjustment.
 func (art *AllocationTotals) TotalRAMCost() float64 {
 func (art *AllocationTotals) TotalRAMCost() float64 {
 	return art.RAMCost + art.RAMCostAdjustment
 	return art.RAMCost + art.RAMCostAdjustment
@@ -61,8 +76,8 @@ func (art *AllocationTotals) TotalRAMCost() float64 {
 
 
 // TotalCost returns the sum of all costs.
 // TotalCost returns the sum of all costs.
 func (art *AllocationTotals) TotalCost() float64 {
 func (art *AllocationTotals) TotalCost() float64 {
-	return art.TotalCPUCost() + art.TotalGPUCost() + art.LoadBalancerCost +
-		art.NetworkCost + art.PersistentVolumeCost + art.TotalRAMCost()
+	return art.TotalCPUCost() + art.TotalGPUCost() + art.TotalLoadBalancerCost() +
+		art.TotalNetworkCost() + art.TotalPersistentVolumeCost() + art.TotalRAMCost()
 }
 }
 
 
 // ComputeAllocationTotals totals the resource costs of the given AllocationSet
 // ComputeAllocationTotals totals the resource costs of the given AllocationSet
@@ -104,13 +119,22 @@ func ComputeAllocationTotals(as *AllocationSet, prop string) map[string]*Allocat
 		}
 		}
 
 
 		arts[key].Count++
 		arts[key].Count++
+
 		arts[key].CPUCost += alloc.CPUCost
 		arts[key].CPUCost += alloc.CPUCost
 		arts[key].CPUCostAdjustment += alloc.CPUCostAdjustment
 		arts[key].CPUCostAdjustment += alloc.CPUCostAdjustment
+
 		arts[key].GPUCost += alloc.GPUCost
 		arts[key].GPUCost += alloc.GPUCost
 		arts[key].GPUCostAdjustment += alloc.GPUCostAdjustment
 		arts[key].GPUCostAdjustment += alloc.GPUCostAdjustment
-		arts[key].LoadBalancerCost += alloc.LBTotalCost()
-		arts[key].NetworkCost += alloc.NetworkTotalCost()
-		arts[key].PersistentVolumeCost += alloc.PVCost()
+
+		arts[key].LoadBalancerCost += alloc.LoadBalancerCost
+		arts[key].LoadBalancerCostAdjustment += alloc.LoadBalancerCostAdjustment
+
+		arts[key].NetworkCost += alloc.NetworkCost
+		arts[key].NetworkCostAdjustment += alloc.NetworkCostAdjustment
+
+		arts[key].PersistentVolumeCost += alloc.PVCost() // NOTE: PVCost() does not include adjustment
+		arts[key].PersistentVolumeCostAdjustment += alloc.PVCostAdjustment
+
 		arts[key].RAMCost += alloc.RAMCost
 		arts[key].RAMCost += alloc.RAMCost
 		arts[key].RAMCostAdjustment += alloc.RAMCostAdjustment
 		arts[key].RAMCostAdjustment += alloc.RAMCostAdjustment
 	})
 	})
@@ -125,29 +149,48 @@ func ComputeAllocationTotals(as *AllocationSet, prop string) map[string]*Allocat
 // knowledge is required to carry out a task, but computing totals on-the-fly
 // knowledge is required to carry out a task, but computing totals on-the-fly
 // would be expensive; e.g. idle allocation, shared tenancy costs
 // would be expensive; e.g. idle allocation, shared tenancy costs
 type AssetTotals struct {
 type AssetTotals struct {
-	Start                 time.Time `json:"start"`
-	End                   time.Time `json:"end"`
-	Cluster               string    `json:"cluster"`
-	Node                  string    `json:"node"`
-	Count                 int       `json:"count"`
-	AttachedVolumeCost    float64   `json:"attachedVolumeCost"`
-	ClusterManagementCost float64   `json:"clusterManagementCost"`
-	CPUCost               float64   `json:"cpuCost"`
-	CPUCostAdjustment     float64   `json:"cpuCostAdjustment"`
-	GPUCost               float64   `json:"gpuCost"`
-	GPUCostAdjustment     float64   `json:"gpuCostAdjustment"`
-	PersistentVolumeCost  float64   `json:"persistentVolumeCost"`
-	RAMCost               float64   `json:"ramCost"`
-	RAMCostAdjustment     float64   `json:"ramCostAdjustment"`
+	Start                           time.Time `json:"start"`
+	End                             time.Time `json:"end"`
+	Cluster                         string    `json:"cluster"`
+	Node                            string    `json:"node"`
+	Count                           int       `json:"count"`
+	AttachedVolumeCost              float64   `json:"attachedVolumeCost"`
+	AttachedVolumeCostAdjustment    float64   `json:"attachedVolumeCostAdjustment"`
+	ClusterManagementCost           float64   `json:"clusterManagementCost"`
+	ClusterManagementCostAdjustment float64   `json:"clusterManagementCostAdjustment"`
+	CPUCost                         float64   `json:"cpuCost"`
+	CPUCostAdjustment               float64   `json:"cpuCostAdjustment"`
+	GPUCost                         float64   `json:"gpuCost"`
+	GPUCostAdjustment               float64   `json:"gpuCostAdjustment"`
+	LoadBalancerCost                float64   `json:"loadBalancerCost"`
+	LoadBalancerCostAdjustment      float64   `json:"loadBalancerCostAdjustment"`
+	PersistentVolumeCost            float64   `json:"persistentVolumeCost"`
+	PersistentVolumeCostAdjustment  float64   `json:"persistentVolumeCostAdjustment"`
+	RAMCost                         float64   `json:"ramCost"`
+	RAMCostAdjustment               float64   `json:"ramCostAdjustment"`
 }
 }
 
 
 // ClearAdjustments sets all adjustment fields to 0.0
 // ClearAdjustments sets all adjustment fields to 0.0
 func (art *AssetTotals) ClearAdjustments() {
 func (art *AssetTotals) ClearAdjustments() {
+	art.AttachedVolumeCostAdjustment = 0.0
+	art.ClusterManagementCostAdjustment = 0.0
 	art.CPUCostAdjustment = 0.0
 	art.CPUCostAdjustment = 0.0
 	art.GPUCostAdjustment = 0.0
 	art.GPUCostAdjustment = 0.0
+	art.LoadBalancerCostAdjustment = 0.0
+	art.PersistentVolumeCostAdjustment = 0.0
 	art.RAMCostAdjustment = 0.0
 	art.RAMCostAdjustment = 0.0
 }
 }
 
 
+// TotalAttachedVolumeCost returns CPU cost with adjustment.
+func (art *AssetTotals) TotalAttachedVolumeCost() float64 {
+	return art.AttachedVolumeCost + art.AttachedVolumeCostAdjustment
+}
+
+// TotalClusterManagementCost returns ClusterManagement cost with adjustment.
+func (art *AssetTotals) TotalClusterManagementCost() float64 {
+	return art.ClusterManagementCost + art.ClusterManagementCostAdjustment
+}
+
 // TotalCPUCost returns CPU cost with adjustment.
 // TotalCPUCost returns CPU cost with adjustment.
 func (art *AssetTotals) TotalCPUCost() float64 {
 func (art *AssetTotals) TotalCPUCost() float64 {
 	return art.CPUCost + art.CPUCostAdjustment
 	return art.CPUCost + art.CPUCostAdjustment
@@ -158,6 +201,16 @@ func (art *AssetTotals) TotalGPUCost() float64 {
 	return art.GPUCost + art.GPUCostAdjustment
 	return art.GPUCost + art.GPUCostAdjustment
 }
 }
 
 
+// TotalLoadBalancerCost returns LoadBalancer cost with adjustment.
+func (art *AssetTotals) TotalLoadBalancerCost() float64 {
+	return art.LoadBalancerCost + art.LoadBalancerCostAdjustment
+}
+
+// TotalPersistentVolumeCost returns PersistentVolume cost with adjustment.
+func (art *AssetTotals) TotalPersistentVolumeCost() float64 {
+	return art.PersistentVolumeCost + art.PersistentVolumeCostAdjustment
+}
+
 // TotalRAMCost returns RAM cost with adjustment.
 // TotalRAMCost returns RAM cost with adjustment.
 func (art *AssetTotals) TotalRAMCost() float64 {
 func (art *AssetTotals) TotalRAMCost() float64 {
 	return art.RAMCost + art.RAMCostAdjustment
 	return art.RAMCost + art.RAMCostAdjustment
@@ -165,8 +218,9 @@ func (art *AssetTotals) TotalRAMCost() float64 {
 
 
 // TotalCost returns the sum of all costs
 // TotalCost returns the sum of all costs
 func (art *AssetTotals) TotalCost() float64 {
 func (art *AssetTotals) TotalCost() float64 {
-	return art.AttachedVolumeCost + art.ClusterManagementCost + art.TotalCPUCost() +
-		art.TotalGPUCost() + art.PersistentVolumeCost + art.TotalRAMCost()
+	return art.TotalAttachedVolumeCost() + art.TotalClusterManagementCost() +
+		art.TotalCPUCost() + art.TotalGPUCost() + art.TotalLoadBalancerCost() +
+		art.TotalPersistentVolumeCost() + art.TotalRAMCost()
 }
 }
 
 
 // ComputeAssetTotals totals the resource costs of the given AssetSet,
 // ComputeAssetTotals totals the resource costs of the given AssetSet,
@@ -198,9 +252,9 @@ func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotal
 			// by the adjustment. This is necessary because we only get one
 			// by the adjustment. This is necessary because we only get one
 			// adjustment per Node, not one per-resource-per-Node.
 			// adjustment per Node, not one per-resource-per-Node.
 			//
 			//
-			// e.g. total cost = $90, adjustment = -$10 => 0.9
-			// e.g. total cost = $150, adjustment = -$300 => 0.3333
-			// e.g. total cost = $150, adjustment = $50 => 1.5
+			// e.g. total cost =  $90 (cost = $100, adjustment = -$10)  => 0.9000 ( 90 / 100)
+			// e.g. total cost = $150 (cost = $450, adjustment = -$300) => 0.3333 (150 / 450)
+			// e.g. total cost = $150 (cost = $100, adjustment = $50)   => 1.5000 (150 / 100)
 			adjustmentRate := 1.0
 			adjustmentRate := 1.0
 			if node.TotalCost()-node.Adjustment() == 0 {
 			if node.TotalCost()-node.Adjustment() == 0 {
 				// If (totalCost - adjustment) is 0.0 then adjustment cancels
 				// If (totalCost - adjustment) is 0.0 then adjustment cancels
@@ -214,17 +268,34 @@ func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotal
 				adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
 				adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
 			}
 			}
 
 
-			totalCPUCost := node.CPUCost * (1.0 - node.Discount)
-			cpuCost := totalCPUCost * adjustmentRate
-			cpuCostAdjustment := totalCPUCost - cpuCost
+			// 1. Start with raw, measured resource cost
+			// 2. Apply discount to get discounted resource cost
+			// 3. Apply adjustment to get final "adjusted" resource cost
+			// 4. Subtract (3 - 2) to get adjustment in dollar-terms
+			// 5. Use (2 + 4) as total cost, so (2) is "cost" and (4) is "adjustment"
 
 
-			totalGPUCost := node.GPUCost * (1.0 - node.Discount)
-			gpuCost := totalGPUCost * adjustmentRate
-			gpuCostAdjustment := totalGPUCost - gpuCost
+			// Example:
+			// - node.CPUCost   = 10.00
+			// - node.Discount  =  0.20  // We assume a 20% discount
+			// - adjustmentRate =  0.75  // CUR says we need to reduce to 75% of our post-discount node cost
+			//
+			// 1. See above
+			// 2. discountedCPUCost = 10.00 * (1.0 - 0.2) =  8.00
+			// 3. adjustedCPUCost   =  8.00 * 0.75        =  6.00  // this is the actual cost according to the CUR
+			// 4. adjustment        =  6.00 - 8.00        = -2.00
+			// 5. totalCost = 6.00, which is the sum of (2) cost = 8.00 and (4) adjustment = -2.00
+
+			discountedCPUCost := node.CPUCost * (1.0 - node.Discount)
+			adjustedCPUCost := discountedCPUCost * adjustmentRate
+			cpuCostAdjustment := adjustedCPUCost - discountedCPUCost
+
+			discountedGPUCost := node.GPUCost * (1.0 - node.Discount)
+			adjustedGPUCost := discountedGPUCost * adjustmentRate
+			gpuCostAdjustment := adjustedGPUCost - discountedGPUCost
 
 
-			totalRAMCost := node.RAMCost * (1.0 - node.Discount)
-			ramCost := totalRAMCost * adjustmentRate
-			ramCostAdjustment := totalRAMCost - ramCost
+			discountedRAMCost := node.RAMCost * (1.0 - node.Discount)
+			adjustedRAMCost := discountedRAMCost * adjustmentRate
+			ramCostAdjustment := adjustedRAMCost - discountedRAMCost
 
 
 			if _, ok := arts[key]; !ok {
 			if _, ok := arts[key]; !ok {
 				arts[key] = &AssetTotals{
 				arts[key] = &AssetTotals{
@@ -247,15 +318,34 @@ func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotal
 			}
 			}
 
 
 			arts[key].Count++
 			arts[key].Count++
-			arts[key].CPUCost += cpuCost
+
+			// TotalCPUCost will be discounted cost + adjustment
+			arts[key].CPUCost += discountedCPUCost
 			arts[key].CPUCostAdjustment += cpuCostAdjustment
 			arts[key].CPUCostAdjustment += cpuCostAdjustment
-			arts[key].RAMCost += ramCost
+
+			// TotalRAMCost will be discounted cost + adjustment
+			arts[key].RAMCost += discountedRAMCost
 			arts[key].RAMCostAdjustment += ramCostAdjustment
 			arts[key].RAMCostAdjustment += ramCostAdjustment
-			arts[key].GPUCost += gpuCost
+
+			// TotalGPUCost will be discounted cost + adjustment
+			arts[key].GPUCost += discountedGPUCost
 			arts[key].GPUCostAdjustment += gpuCostAdjustment
 			arts[key].GPUCostAdjustment += gpuCostAdjustment
-		} else if disk, ok := asset.(*Disk); ok {
-			key := fmt.Sprintf("%s/%s", disk.Properties().Cluster, disk.Properties().Name)
-			disks[key] = disk
+		} else if lb, ok := asset.(*LoadBalancer); ok && prop == AssetClusterProp {
+			// Only record load balancers when prop is Cluster because we
+			// can't break down LoadBalancer by node.
+			key := lb.Properties().Cluster
+
+			if _, ok := arts[key]; !ok {
+				arts[key] = &AssetTotals{
+					Start:   lb.Start(),
+					End:     lb.End(),
+					Cluster: lb.Properties().Cluster,
+				}
+			}
+
+			arts[key].Count++
+			arts[key].LoadBalancerCost += lb.Cost
+			arts[key].LoadBalancerCostAdjustment += lb.adjustment
 		} else if cm, ok := asset.(*ClusterManagement); ok && prop == AssetClusterProp {
 		} else if cm, ok := asset.(*ClusterManagement); ok && prop == AssetClusterProp {
 			// Only record cluster management when prop is Cluster because we
 			// Only record cluster management when prop is Cluster because we
 			// can't break down ClusterManagement by node.
 			// can't break down ClusterManagement by node.
@@ -271,34 +361,50 @@ func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotal
 
 
 			arts[key].Count++
 			arts[key].Count++
 			arts[key].ClusterManagementCost += cm.TotalCost()
 			arts[key].ClusterManagementCost += cm.TotalCost()
+		} else if disk, ok := asset.(*Disk); ok {
+			// Record disks in an intermediate structure, which will be
+			// processed after all assets have been seen.
+			key := fmt.Sprintf("%s/%s", disk.Properties().Cluster, disk.Properties().Name)
+
+			disks[key] = disk
 		}
 		}
 	})
 	})
 
 
-	// Identify attached volumes as disks with names matching a node's name
-	for name := range nodeNames {
-		if disk, ok := disks[name]; ok {
-			// By default, the key will be the name, which is the tuple of
-			// cluster/node. But if we're aggregating by cluster only, then
-			// reset the key to just the cluster.
-			key := name
-			if prop == AssetClusterProp {
-				key = disk.Properties().Cluster
-			}
+	// Record all disks as either attached volumes or persistent volumes.
+	for name, disk := range disks {
+		// By default, the key will be the name, which is the tuple of
+		// cluster/node. But if we're aggregating by cluster only, then
+		// reset the key to just the cluster.
+		key := name
+		if prop == AssetClusterProp {
+			key = disk.Properties().Cluster
+		}
 
 
-			if _, ok := arts[key]; !ok {
-				arts[key] = &AssetTotals{
-					Start:   disk.Start(),
-					End:     disk.End(),
-					Cluster: disk.Properties().Cluster,
-				}
+		if _, ok := arts[key]; !ok {
+			arts[key] = &AssetTotals{
+				Start:   disk.Start(),
+				End:     disk.End(),
+				Cluster: disk.Properties().Cluster,
+			}
 
 
-				if prop == AssetNodeProp {
-					arts[key].Node = disk.Properties().Name
-				}
+			if prop == AssetNodeProp {
+				arts[key].Node = disk.Properties().Name
 			}
 			}
+		}
 
 
+		_, isAttached := nodeNames[name]
+		if isAttached {
+			// Record attached volume data at the cluster and node level, using
+			// name matching to distinguish from PersistentVolumes.
+			// TODO can we make a stronger match at the underlying ETL layer?
+			arts[key].Count++
+			arts[key].AttachedVolumeCost += disk.Cost
+			arts[key].AttachedVolumeCostAdjustment += disk.adjustment
+		} else if prop == AssetClusterProp {
+			// Only record PersistentVolume data at the cluster level
 			arts[key].Count++
 			arts[key].Count++
-			arts[key].AttachedVolumeCost += disk.TotalCost()
+			arts[key].PersistentVolumeCost += disk.Cost
+			arts[key].PersistentVolumeCostAdjustment += disk.adjustment
 		}
 		}
 	}
 	}
 
 
@@ -328,15 +434,15 @@ func ComputeIdleCoefficients(shareSplit, key string, cpuCost, gpuCost, ramCost f
 	}
 	}
 
 
 	if allocationTotals[key].CPUCost > 0 {
 	if allocationTotals[key].CPUCost > 0 {
-		cpuCoeff = cpuCost / allocationTotals[key].CPUCost
+		cpuCoeff = cpuCost / allocationTotals[key].TotalCPUCost()
 	}
 	}
 
 
 	if allocationTotals[key].GPUCost > 0 {
 	if allocationTotals[key].GPUCost > 0 {
-		gpuCoeff = cpuCost / allocationTotals[key].GPUCost
+		gpuCoeff = gpuCost / allocationTotals[key].TotalGPUCost()
 	}
 	}
 
 
 	if allocationTotals[key].RAMCost > 0 {
 	if allocationTotals[key].RAMCost > 0 {
-		ramCoeff = ramCost / allocationTotals[key].RAMCost
+		ramCoeff = ramCost / allocationTotals[key].TotalRAMCost()
 	}
 	}
 
 
 	return cpuCoeff, gpuCoeff, ramCoeff
 	return cpuCoeff, gpuCoeff, ramCoeff

+ 154 - 32
pkg/prom/prom.go

@@ -3,14 +3,15 @@ package prom
 import (
 import (
 	"context"
 	"context"
 	"crypto/tls"
 	"crypto/tls"
+	"fmt"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"os"
 	"os"
+	"strings"
 	"time"
 	"time"
 
 
 	"github.com/kubecost/cost-model/pkg/collections"
 	"github.com/kubecost/cost-model/pkg/collections"
-	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util/atomic"
 	"github.com/kubecost/cost-model/pkg/util/atomic"
 	"github.com/kubecost/cost-model/pkg/util/fileutil"
 	"github.com/kubecost/cost-model/pkg/util/fileutil"
@@ -56,6 +57,53 @@ func (auth *ClientAuth) Apply(req *http.Request) {
 	}
 	}
 }
 }
 
 
+//--------------------------------------------------------------------------
+//  Rate Limit Options
+//--------------------------------------------------------------------------
+
+// MaxRetryAfterDuration is the maximum amount of time we should ever wait
+// during a retry. This is to prevent starvation on the request threads
+const MaxRetryAfterDuration = 10 * time.Second
+
+// RateLimitRetryOpts contains retry options
+type RateLimitRetryOpts struct {
+	MaxRetries       int
+	DefaultRetryWait time.Duration
+}
+
+// RateLimitResponseStatus contains the status of the rate limited retries
+type RateLimitResponseStatus struct {
+	RetriesRemaining int
+	WaitTime         time.Duration
+}
+
+// String creates a string representation of the rate limit status
+func (rtrs *RateLimitResponseStatus) String() string {
+	return fmt.Sprintf("Wait Time: %.2f seconds, Retries Remaining: %d", rtrs.WaitTime.Seconds(), rtrs.RetriesRemaining)
+}
+
+// RateLimitedResponseError contains a list of retry statuses that occurred during
+// retries on a rate limited response
+type RateLimitedResponseError struct {
+	RateLimitStatus []*RateLimitResponseStatus
+}
+
+// Error returns a string representation of the error, including the rate limit
+// status reports
+func (rlre *RateLimitedResponseError) Error() string {
+	var sb strings.Builder
+
+	sb.WriteString("Request was Rate Limited and Retries Exhausted:\n")
+
+	for _, rls := range rlre.RateLimitStatus {
+		sb.WriteString(" * ")
+		sb.WriteString(rls.String())
+		sb.WriteString("\n")
+	}
+
+	return sb.String()
+}
+
 //--------------------------------------------------------------------------
 //--------------------------------------------------------------------------
 //  RateLimitedPrometheusClient
 //  RateLimitedPrometheusClient
 //--------------------------------------------------------------------------
 //--------------------------------------------------------------------------
@@ -63,13 +111,14 @@ func (auth *ClientAuth) Apply(req *http.Request) {
 // RateLimitedPrometheusClient is a prometheus client which limits the total number of
 // RateLimitedPrometheusClient is a prometheus client which limits the total number of
 // concurrent outbound requests allowed at a given moment.
 // concurrent outbound requests allowed at a given moment.
 type RateLimitedPrometheusClient struct {
 type RateLimitedPrometheusClient struct {
-	id         string
-	client     prometheus.Client
-	auth       *ClientAuth
-	queue      collections.BlockingQueue
-	decorator  QueryParamsDecorator
-	outbound   *atomic.AtomicInt32
-	fileLogger *golog.Logger
+	id             string
+	client         prometheus.Client
+	auth           *ClientAuth
+	queue          collections.BlockingQueue
+	decorator      QueryParamsDecorator
+	rateLimitRetry *RateLimitRetryOpts
+	outbound       *atomic.AtomicInt32
+	fileLogger     *golog.Logger
 }
 }
 
 
 // requestCounter is used to determine if the prometheus client keeps track of
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -81,11 +130,14 @@ type requestCounter interface {
 
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
 // prometheus requests.
-func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, decorator QueryParamsDecorator, queryLogFile string) (prometheus.Client, error) {
-	c, err := prometheus.NewClient(config)
-	if err != nil {
-		return nil, err
-	}
+func NewRateLimitedClient(
+	id string,
+	client prometheus.Client,
+	maxConcurrency int,
+	auth *ClientAuth,
+	decorator QueryParamsDecorator,
+	rateLimitRetryOpts *RateLimitRetryOpts,
+	queryLogFile string) (prometheus.Client, error) {
 
 
 	queue := collections.NewBlockingQueue()
 	queue := collections.NewBlockingQueue()
 	outbound := atomic.NewAtomicInt32(0)
 	outbound := atomic.NewAtomicInt32(0)
@@ -105,14 +157,24 @@ func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency in
 		}
 		}
 	}
 	}
 
 
+	// default authentication
+	if auth == nil {
+		auth = &ClientAuth{
+			Username:    "",
+			Password:    "",
+			BearerToken: "",
+		}
+	}
+
 	rlpc := &RateLimitedPrometheusClient{
 	rlpc := &RateLimitedPrometheusClient{
-		id:         id,
-		client:     c,
-		queue:      queue,
-		decorator:  decorator,
-		outbound:   outbound,
-		auth:       auth,
-		fileLogger: logger,
+		id:             id,
+		client:         client,
+		queue:          queue,
+		decorator:      decorator,
+		rateLimitRetry: rateLimitRetryOpts,
+		outbound:       outbound,
+		auth:           auth,
+		fileLogger:     logger,
 	}
 	}
 
 
 	// Start concurrent request processing
 	// Start concurrent request processing
@@ -168,6 +230,9 @@ type workResponse struct {
 
 
 // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
 // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
 func (rlpc *RateLimitedPrometheusClient) worker() {
 func (rlpc *RateLimitedPrometheusClient) worker() {
+	retryOpts := rlpc.rateLimitRetry
+	retryRateLimit := retryOpts != nil
+
 	for {
 	for {
 		// blocks until there is an item available
 		// blocks until there is an item available
 		item := rlpc.queue.Dequeue()
 		item := rlpc.queue.Dequeue()
@@ -198,6 +263,43 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 			roundTripStart := time.Now()
 			roundTripStart := time.Now()
 			res, body, warnings, err := rlpc.client.Do(ctx, req)
 			res, body, warnings, err := rlpc.client.Do(ctx, req)
 
 
+			// If retries on rate limited response is enabled:
+			// * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
+			// * Attempt to parse a Retry-After from response headers (common on 429)
+			// * If we couldn't determine how long to wait for a retry, use 1 second by default
+			if retryRateLimit {
+				var status []*RateLimitResponseStatus
+				var retries int = retryOpts.MaxRetries
+				var defaultWait time.Duration = retryOpts.DefaultRetryWait
+
+				for httputil.IsRateLimited(res, body) && retries > 0 {
+					// calculate amount of time to wait before retry, in the event the default wait is used,
+					// an exponential backoff is applied based on the number of times we've retried.
+					retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
+					retries--
+
+					// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
+					// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
+					// don't want to block for 10 days.
+					if retryAfter > MaxRetryAfterDuration {
+						retryAfter = MaxRetryAfterDuration
+					}
+
+					// Record and log the wait that will actually be applied (after capping).
+					status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
+					log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
+
+					// execute wait and retry
+					time.Sleep(retryAfter)
+					res, body, warnings, err = rlpc.client.Do(ctx, req)
+				}
+
+				// if we've broken out of our retry loop and the resp is still rate limited,
+				// then let's generate a meaningful error to pass back
+				if retries == 0 && httputil.IsRateLimited(res, body) {
+					err = &RateLimitedResponseError{RateLimitStatus: status}
+				}
+			}
+
 			// Decrement outbound counter
 			// Decrement outbound counter
 			rlpc.outbound.Decrement()
 			rlpc.outbound.Decrement()
 			LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
 			LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
@@ -245,30 +347,50 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 //  Client Helpers
 //  Client Helpers
 //--------------------------------------------------------------------------
 //--------------------------------------------------------------------------
 
 
-func NewPrometheusClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
-	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
+// PrometheusClientConfig contains all configurable options for creating a new prometheus client
+type PrometheusClientConfig struct {
+	Timeout               time.Duration
+	KeepAlive             time.Duration
+	TLSHandshakeTimeout   time.Duration
+	TLSInsecureSkipVerify bool
+	RateLimitRetryOpts    *RateLimitRetryOpts
+	Auth                  *ClientAuth
+	QueryConcurrency      int
+	QueryLogFile          string
+}
 
 
-	// may be necessary for long prometheus queries. TODO: make this configurable
+// NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
+func NewPrometheusClient(address string, config *PrometheusClientConfig) (prometheus.Client, error) {
+	// may be necessary for long prometheus queries
 	pc := prometheus.Config{
 	pc := prometheus.Config{
 		Address: address,
 		Address: address,
 		RoundTripper: &http.Transport{
 		RoundTripper: &http.Transport{
 			Proxy: http.ProxyFromEnvironment,
 			Proxy: http.ProxyFromEnvironment,
 			DialContext: (&net.Dialer{
 			DialContext: (&net.Dialer{
-				Timeout:   timeout,
-				KeepAlive: keepAlive,
+				Timeout:   config.Timeout,
+				KeepAlive: config.KeepAlive,
 			}).DialContext,
 			}).DialContext,
-			TLSHandshakeTimeout: 10 * time.Second,
-			TLSClientConfig:     tlsConfig,
+			TLSHandshakeTimeout: config.TLSHandshakeTimeout,
+			TLSClientConfig: &tls.Config{
+				InsecureSkipVerify: config.TLSInsecureSkipVerify,
+			},
 		},
 		},
 	}
 	}
 
 
-	auth := &ClientAuth{
-		Username:    env.GetDBBasicAuthUsername(),
-		Password:    env.GetDBBasicAuthUserPassword(),
-		BearerToken: env.GetDBBearerToken(),
+	client, err := prometheus.NewClient(pc)
+	if err != nil {
+		return nil, err
 	}
 	}
 
 
-	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, nil, queryLogFile)
+	return NewRateLimitedClient(
+		PrometheusClientID,
+		client,
+		config.QueryConcurrency,
+		config.Auth,
+		nil,
+		config.RateLimitRetryOpts,
+		config.QueryLogFile,
+	)
 }
 }
 
 
 // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
 // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent

+ 382 - 0
pkg/prom/ratelimitedclient_test.go

@@ -0,0 +1,382 @@
+package prom
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"math"
+	"net/http"
+
+	"net/url"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/httputil"
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+// ResponseAndBody is just a test object used to hold predefined responses
+// and response bodies
+type ResponseAndBody struct {
+	Response *http.Response
+	Body     []byte
+}
+
+// MockPromClient accepts a slice of responses and bodies to return on requests made.
+// It will cycle these responses linearly, then reset back to the first.
+// Also works with concurrent requests.
+type MockPromClient struct {
+	sync.Mutex
+	responses []*ResponseAndBody
+	current   int
+}
+
+// prometheus.Client URL()
+func (mpc *MockPromClient) URL(ep string, args map[string]string) *url.URL {
+	return nil
+}
+
+// prometheus.Client Do
+func (mpc *MockPromClient) Do(context.Context, *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
+	// fake latency
+	time.Sleep(250 * time.Millisecond)
+
+	mpc.Lock()
+	defer mpc.Unlock()
+	rnb := mpc.responses[mpc.current]
+	mpc.current++
+	if mpc.current >= len(mpc.responses) {
+		mpc.current = 0
+	}
+
+	return rnb.Response, rnb.Body, nil, nil
+}
+
+// Creates a new mock prometheus client
+func newMockPromClientWith(responses []*ResponseAndBody) prometheus.Client {
+	return &MockPromClient{
+		responses: responses,
+		current:   0,
+	}
+}
+
+// creates a ResponseAndBody representing a 200 status code
+func newSuccessfulResponse() *ResponseAndBody {
+	body := []byte("Success")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 200,
+			Body:       io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+// creates a ResponseAndBody representing a 400 status code
+func newFailureResponse() *ResponseAndBody {
+	body := []byte("Fail")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 400,
+			Body:       io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+// creates a ResponseAndBody representing a 429 status code and 'Retry-After' header
+func newNormalRateLimitedResponse(retryAfter string) *ResponseAndBody {
+	body := []byte("Rate Limited")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 429,
+			Header: http.Header{
+				"Retry-After": []string{retryAfter},
+			},
+			Body: io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+// creates a ResponseAndBody representing some amazon services ThrottlingException 400 status
+func newHackyAmazonRateLimitedResponse() *ResponseAndBody {
+	body := []byte("<ThrottlingException>\n  <Message>Rate exceeded</Message>\n</ThrottlingException>\n")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 400,
+			Body:       io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+func newTestRetryOpts() *RateLimitRetryOpts {
+	return &RateLimitRetryOpts{
+		MaxRetries:       5,
+		DefaultRetryWait: 100 * time.Millisecond,
+	}
+}
+
+func TestRateLimitedOnceAndSuccess(t *testing.T) {
+	t.Parallel()
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newSuccessfulResponse(),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		1,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req, err := http.NewRequest(http.MethodPost, "", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// we just need to execute this  once to see retries in effect
+	res, body, _, err := client.Do(context.Background(), req)
+
+	if res.StatusCode != 200 {
+		t.Fatalf("200 StatusCode expected. Got: %d", res.StatusCode)
+	}
+
+	if string(body) != "Success" {
+		t.Fatalf("Expected 'Success' message body. Got: %s", string(body))
+	}
+}
+
+func TestRateLimitedOnceAndFail(t *testing.T) {
+	t.Parallel()
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newFailureResponse(),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		1,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req, err := http.NewRequest(http.MethodPost, "", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// we just need to execute this  once to see retries in effect
+	res, body, _, err := client.Do(context.Background(), req)
+
+	if res.StatusCode != 400 {
+		t.Fatalf("400 StatusCode expected. Got: %d", res.StatusCode)
+	}
+
+	if string(body) != "Fail" {
+		t.Fatalf("Expected 'fail' message body. Got: %s", string(body))
+	}
+}
+
+func TestRateLimitedResponses(t *testing.T) {
+	t.Parallel()
+
+	dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newNormalRateLimitedResponse(dateRetry),
+		newHackyAmazonRateLimitedResponse(),
+		newHackyAmazonRateLimitedResponse(),
+		newNormalRateLimitedResponse("3"),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		1,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req, err := http.NewRequest(http.MethodPost, "", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// we just need to execute this  once to see retries in effect
+	_, _, _, err = client.Do(context.Background(), req)
+
+	if err == nil {
+		t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
+	}
+
+	rateLimitErr, ok := err.(*RateLimitedResponseError)
+	if !ok {
+		t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
+	}
+
+	t.Logf("%s\n", rateLimitErr.Error())
+
+	// RateLimitedResponseStatus checks just ensure that wait times were close configuration
+	rateLimitRetries := rateLimitErr.RateLimitStatus
+
+	if len(rateLimitRetries) != 5 {
+		t.Fatalf("Expected 5 retries. Got: %d", len(rateLimitRetries))
+	}
+
+	// check 2s wait after
+	seconds := rateLimitRetries[0].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 2.0) {
+		t.Fatalf("Expected 2.0 seconds. Got %.2f", seconds)
+	}
+
+	// check to see if fuzzed wait time for datetime parsing
+	seconds = rateLimitRetries[1].WaitTime.Seconds()
+	if math.Abs(seconds-2.0) > 3.0 {
+		t.Fatalf("Expected delta between 2s and resulting wait time to be within 3s. Seconds: %.2f, Delta: %.2f", seconds, math.Abs(seconds-2.0))
+	}
+
+	// check 0.4s exponential-backoff wait (100ms default * 2^2)
+	seconds = rateLimitRetries[2].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 0.4) {
+		t.Fatalf("Expected 0.4 seconds. Got %.2f", seconds)
+	}
+
+	// check 0.8s exponential-backoff wait (100ms default * 2^3)
+	seconds = rateLimitRetries[3].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 0.8) {
+		t.Fatalf("Expected 0.8 seconds. Got %.2f", seconds)
+	}
+
+	// check 3s wait
+	seconds = rateLimitRetries[4].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 3.0) {
+		t.Fatalf("Expected 3.0 seconds. Got %.2f", seconds)
+	}
+
+}
+
+// AssertDurationEqual fails the test when actual does not equal expected.
+func AssertDurationEqual(t *testing.T, expected, actual time.Duration) {
+	if actual != expected {
+		t.Fatalf("Expected: %dms, Got: %dms", expected.Milliseconds(), actual.Milliseconds())
+	}
+}
+
+func TestExponentialBackOff(t *testing.T) {
+	var ExpectedResults = []time.Duration{
+		100 * time.Millisecond,
+		200 * time.Millisecond,
+		400 * time.Millisecond,
+		800 * time.Millisecond,
+		1600 * time.Millisecond,
+	}
+
+	w := 100 * time.Millisecond
+
+	for retry := 0; retry < 5; retry++ {
+		AssertDurationEqual(t, ExpectedResults[retry], httputil.ExponentialBackoffWaitFor(w, retry))
+	}
+}
+
+func TestConcurrentRateLimiting(t *testing.T) {
+	t.Parallel()
+
+	// Set QueryConcurrency to 3 here, then add a few for total requests
+	const QueryConcurrency = 3
+	const TotalRequests = QueryConcurrency + 2
+
+	dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newNormalRateLimitedResponse(dateRetry),
+		newHackyAmazonRateLimitedResponse(),
+		newHackyAmazonRateLimitedResponse(),
+		newNormalRateLimitedResponse("3"),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		QueryConcurrency,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	errs := make(chan error, TotalRequests)
+
+	for i := 0; i < TotalRequests; i++ {
+		go func() {
+			req, err := http.NewRequest(http.MethodPost, "", nil)
+			if err != nil {
+				errs <- err
+				return
+			}
+
+			// we just need to execute this  once to see retries in effect
+			_, _, _, err = client.Do(context.Background(), req)
+
+			errs <- err
+		}()
+	}
+
+	for i := 0; i < TotalRequests; i++ {
+		err := <-errs
+		if err == nil {
+			t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
+		}
+
+		rateLimitErr, ok := err.(*RateLimitedResponseError)
+		if !ok {
+			t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
+		}
+
+		t.Logf("%s\n", rateLimitErr.Error())
+	}
+}

+ 19 - 12
pkg/thanos/thanos.go

@@ -67,26 +67,25 @@ func QueryOffset() string {
 	return queryOffset
 	return queryOffset
 }
 }
 
 
-func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
-	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
-
+func NewThanosClient(address string, config *prom.PrometheusClientConfig) (prometheus.Client, error) {
 	tc := prometheus.Config{
 	tc := prometheus.Config{
 		Address: address,
 		Address: address,
 		RoundTripper: &http.Transport{
 		RoundTripper: &http.Transport{
 			Proxy: http.ProxyFromEnvironment,
 			Proxy: http.ProxyFromEnvironment,
 			DialContext: (&net.Dialer{
 			DialContext: (&net.Dialer{
-				Timeout:   timeout,
-				KeepAlive: keepAlive,
+				Timeout:   config.Timeout,
+				KeepAlive: config.KeepAlive,
 			}).DialContext,
 			}).DialContext,
-			TLSHandshakeTimeout: 10 * time.Second,
-			TLSClientConfig:     tlsConfig,
+			TLSHandshakeTimeout: config.TLSHandshakeTimeout,
+			TLSClientConfig: &tls.Config{
+				InsecureSkipVerify: config.TLSInsecureSkipVerify,
+			},
 		},
 		},
 	}
 	}
 
 
-	auth := &prom.ClientAuth{
-		Username:    env.GetMultiClusterBasicAuthUsername(),
-		Password:    env.GetMultiClusterBasicAuthPassword(),
-		BearerToken: env.GetMultiClusterBearerToken(),
+	client, err := prometheus.NewClient(tc)
+	if err != nil {
+		return nil, err
 	}
 	}
 
 
 	// max source resolution decorator
 	// max source resolution decorator
@@ -97,5 +96,13 @@ func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConc
 		return queryParams
 		return queryParams
 	}
 	}
 
 
-	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, maxSourceDecorator, queryLogFile)
+	return prom.NewRateLimitedClient(
+		prom.ThanosClientID,
+		client,
+		config.QueryConcurrency,
+		config.Auth,
+		maxSourceDecorator,
+		config.RateLimitRetryOpts,
+		config.QueryLogFile,
+	)
 }
 }

+ 65 - 0
pkg/util/httputil/httputil.go

@@ -3,9 +3,12 @@ package httputil
 import (
 import (
 	"context"
 	"context"
 	"fmt"
 	"fmt"
+	"math"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
+	"strconv"
 	"strings"
 	"strings"
+	"time"
 
 
 	"github.com/kubecost/cost-model/pkg/util/mapper"
 	"github.com/kubecost/cost-model/pkg/util/mapper"
 )
 )
@@ -90,6 +93,68 @@ func SetQuery(r *http.Request, query string) *http.Request {
 //  Package Funcs
 //  Package Funcs
 //--------------------------------------------------------------------------
 //--------------------------------------------------------------------------
 
 
+// IsRateLimited accepts a response and body to determine if either indicate
+// a rate limited return
+func IsRateLimited(resp *http.Response, body []byte) bool {
+	return IsRateLimitedResponse(resp) || IsRateLimitedBody(resp, body)
+}
+
+// RateLimitedRetryFor returns the wait duration parsed from the Retry-After
+// header relative to the current time. If the response carries no headers, an
+// exponential backoff computed from defaultWait and retry is returned; if the
+// Retry-After header is absent or unparseable, defaultWait is returned.
+func RateLimitedRetryFor(resp *http.Response, defaultWait time.Duration, retry int) time.Duration {
+	if resp.Header == nil {
+		return ExponentialBackoffWaitFor(defaultWait, retry)
+	}
+
+	// Retry-After is either the number of seconds to wait or a target datetime (RFC1123)
+	value := resp.Header.Get("Retry-After")
+	if value == "" {
+		return defaultWait
+	}
+
+	seconds, err := strconv.ParseInt(value, 10, 64)
+	if err == nil {
+		return time.Duration(seconds) * time.Second
+	}
+
+	// failed to parse an integer, try datetime RFC1123
+	t, err := time.Parse(time.RFC1123, value)
+	if err == nil {
+		// return 0 if the datetime has already elapsed
+		result := t.Sub(time.Now())
+		if result < 0 {
+			return 0
+		}
+		return result
+	}
+
+	// failed to parse datetime, return default
+	return defaultWait
+}
+
+// ExponentialBackoffWaitFor accepts a default wait duration and the current retry count
+// and returns a new, exponentially scaled duration
+func ExponentialBackoffWaitFor(defaultWait time.Duration, retry int) time.Duration {
+	return time.Duration(math.Pow(2, float64(retry))*float64(defaultWait.Milliseconds())) * time.Millisecond
+}
+
+// IsRateLimitedResponse returns true if the status code is a 429 (TooManyRequests)
+func IsRateLimitedResponse(resp *http.Response) bool {
+	return resp.StatusCode == http.StatusTooManyRequests
+}
+
+// IsRateLimitedBody attempts to determine if a response body indicates throttling
+// has occurred. This function is a result of some API providers (AWS) returning
+// a 400 status code instead of 429 for rate limit exceptions.
+func IsRateLimitedBody(resp *http.Response, body []byte) bool {
+	// ignore non-400 status
+	if resp.StatusCode < http.StatusBadRequest || resp.StatusCode >= http.StatusInternalServerError {
+		return false
+	}
+	return strings.Contains(string(body), "ThrottlingException")
+}
+
 // HeaderString writes the request/response http.Header to a string.
 // HeaderString writes the request/response http.Header to a string.
 func HeaderString(h http.Header) string {
 func HeaderString(h http.Header) string {
 	var sb strings.Builder
 	var sb strings.Builder

+ 220 - 25
pkg/util/timeutil/timeutil.go

@@ -1,6 +1,7 @@
 package timeutil
 package timeutil
 
 
 import (
 import (
+	"errors"
 	"fmt"
 	"fmt"
 	"regexp"
 	"regexp"
 	"strconv"
 	"strconv"
@@ -86,35 +87,129 @@ func FormatStoreResolution(dur time.Duration) string {
 	return fmt.Sprint(dur)
 	return fmt.Sprint(dur)
 }
 }
 
 
-// ParseDuration converts a Prometheus-style duration string into a Duration
+// ParseDuration parses a duration string.
+// A duration string is a possibly signed sequence of
+// decimal numbers, each with optional fraction and a unit suffix,
+// such as "300ms", "-1.5h" or "2h45m".
+// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d"
 func ParseDuration(duration string) (time.Duration, error) {
 func ParseDuration(duration string) (time.Duration, error) {
-	// Trim prefix of Prometheus format duration
 	duration = CleanDurationString(duration)
 	duration = CleanDurationString(duration)
-	if len(duration) < 2 {
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-	unitStr := duration[len(duration)-1:]
-	var unit time.Duration
-	switch unitStr {
-	case "s":
-		unit = time.Second
-	case "m":
-		unit = time.Minute
-	case "h":
-		unit = time.Hour
-	case "d":
-		unit = 24.0 * time.Hour
-	default:
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-
-	amountStr := duration[:len(duration)-1]
-	amount, err := strconv.ParseInt(amountStr, 10, 64)
-	if err != nil {
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	return goParseDuration(duration)
+}
+
+// unitMap contains a list of units that can be parsed by ParseDuration
+var unitMap = map[string]int64{
+	"ns": int64(time.Nanosecond),
+	"us": int64(time.Microsecond),
+	"µs": int64(time.Microsecond), // U+00B5 = micro symbol
+	"μs": int64(time.Microsecond), // U+03BC = Greek letter mu
+	"ms": int64(time.Millisecond),
+	"s":  int64(time.Second),
+	"m":  int64(time.Minute),
+	"h":  int64(time.Hour),
+	"d":  int64(time.Hour * 24),
+}
+
+// goParseDuration is time.ParseDuration lifted from the go std library and enhanced with the ability to
+// handle the "d" (day) unit. The contents of the function itself are identical to the std library, it is
+// only the unitMap above that contains the added unit.
+func goParseDuration(s string) (time.Duration, error) {
+	// [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+
+	orig := s
+	var d int64
+	neg := false
+
+	// Consume [-+]?
+	if s != "" {
+		c := s[0]
+		if c == '-' || c == '+' {
+			neg = c == '-'
+			s = s[1:]
+		}
+	}
+	// Special case: if all that is left is "0", this is zero.
+	if s == "0" {
+		return 0, nil
+	}
+	if s == "" {
+		return 0, errors.New("time: invalid duration " + quote(orig))
+	}
+	for s != "" {
+		var (
+			v, f  int64       // integers before, after decimal point
+			scale float64 = 1 // value = v + f/scale
+		)
+
+		var err error
+
+		// The next character must be [0-9.]
+		if !(s[0] == '.' || '0' <= s[0] && s[0] <= '9') {
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+		// Consume [0-9]*
+		pl := len(s)
+		v, s, err = leadingInt(s)
+		if err != nil {
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+		pre := pl != len(s) // whether we consumed anything before a period
+
+		// Consume (\.[0-9]*)?
+		post := false
+		if s != "" && s[0] == '.' {
+			s = s[1:]
+			pl := len(s)
+			f, scale, s = leadingFraction(s)
+			post = pl != len(s)
+		}
+		if !pre && !post {
+			// no digits (e.g. ".s" or "-.s")
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+
+		// Consume unit.
+		i := 0
+		for ; i < len(s); i++ {
+			c := s[i]
+			if c == '.' || '0' <= c && c <= '9' {
+				break
+			}
+		}
+		if i == 0 {
+			return 0, errors.New("time: missing unit in duration " + quote(orig))
+		}
+		u := s[:i]
+		s = s[i:]
+		unit, ok := unitMap[u]
+		if !ok {
+			return 0, errors.New("time: unknown unit " + quote(u) + " in duration " + quote(orig))
+		}
+		if v > (1<<63-1)/unit {
+			// overflow
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+		v *= unit
+		if f > 0 {
+			// float64 is needed to be nanosecond accurate for fractions of hours.
+			// v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest unit)
+			v += int64(float64(f) * (float64(unit) / scale))
+			if v < 0 {
+				// overflow
+				return 0, errors.New("time: invalid duration " + quote(orig))
+			}
+		}
+		d += v
+		if d < 0 {
+			// overflow
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
 	}
 	}
 
 
-	return time.Duration(amount) * unit, nil
+	if neg {
+		d = -d
+	}
+
+	return time.Duration(d), nil
 }
 }
 
 
 // CleanDurationString removes prometheus formatted prefix "offset " allong with leading a trailing whitespace
 // CleanDurationString removes prometheus formatted prefix "offset " allong with leading a trailing whitespace
@@ -238,3 +333,103 @@ func (jt *JobTicker) TickIn(d time.Duration) {
 		}
 		}
 	}(d)
 	}(d)
 }
 }
+
+// NOTE: The following functions were lifted from the go std library to support the ParseDuration enhancement
+// NOTE: described above.
+
+const (
+	lowerhex  = "0123456789abcdef"
+	runeSelf  = 0x80
+	runeError = '\uFFFD'
+)
+
+// quote is lifted from the go std library to support the custom ParseDuration enhancement
+func quote(s string) string {
+	buf := make([]byte, 1, len(s)+2) // slice will be at least len(s) + quotes
+	buf[0] = '"'
+	for i, c := range s {
+		if c >= runeSelf || c < ' ' {
+			// This means you are asking us to parse a time.Duration or
+			// time.Location with unprintable or non-ASCII characters in it.
+			// We don't expect to hit this case very often. We could try to
+			// reproduce strconv.Quote's behavior with full fidelity but
+			// given how rarely we expect to hit these edge cases, speed and
+			// conciseness are better.
+			var width int
+			if c == runeError {
+				width = 1
+				if i+2 < len(s) && s[i:i+3] == string(runeError) {
+					width = 3
+				}
+			} else {
+				width = len(string(c))
+			}
+			for j := 0; j < width; j++ {
+				buf = append(buf, `\x`...)
+				buf = append(buf, lowerhex[s[i+j]>>4])
+				buf = append(buf, lowerhex[s[i+j]&0xF])
+			}
+		} else {
+			if c == '"' || c == '\\' {
+				buf = append(buf, '\\')
+			}
+			buf = append(buf, string(c)...)
+		}
+	}
+	buf = append(buf, '"')
+	return string(buf)
+}
+
+// leadingFraction consumes the leading [0-9]* from s.
+// It is used only for fractions, so does not return an error on overflow,
+// it just stops accumulating precision.
+func leadingFraction(s string) (x int64, scale float64, rem string) {
+	i := 0
+	scale = 1
+	overflow := false
+	for ; i < len(s); i++ {
+		c := s[i]
+		if c < '0' || c > '9' {
+			break
+		}
+		if overflow {
+			continue
+		}
+		if x > (1<<63-1)/10 {
+			// It's possible for overflow to give a positive number, so take care.
+			overflow = true
+			continue
+		}
+		y := x*10 + int64(c) - '0'
+		if y < 0 {
+			overflow = true
+			continue
+		}
+		x = y
+		scale *= 10
+	}
+	return x, scale, s[i:]
+}
+
+var errLeadingInt = errors.New("time: bad [0-9]*") // never printed
+
+// leadingInt consumes the leading [0-9]* from s.
+func leadingInt(s string) (x int64, rem string, err error) {
+	i := 0
+	for ; i < len(s); i++ {
+		c := s[i]
+		if c < '0' || c > '9' {
+			break
+		}
+		if x > (1<<63-1)/10 {
+			// overflow
+			return 0, "", errLeadingInt
+		}
+		x = x*10 + int64(c) - '0'
+		if x < 0 {
+			// overflow
+			return 0, "", errLeadingInt
+		}
+	}
+	return x, s[i:], nil
+}

+ 12 - 0
pkg/util/timeutil/timeutil_test.go

@@ -262,6 +262,18 @@ func Test_ParseDuration(t *testing.T) {
 			input:    " offset 3d ",
 			input:    " offset 3d ",
 			expected: 24.0 * time.Hour * 3,
 			expected: 24.0 * time.Hour * 3,
 		},
 		},
+		"ms duration": {
+			input:    "100ms",
+			expected: 100 * time.Millisecond,
+		},
+		"complex duration": {
+			input:    "2d3h14m2s",
+			expected: (24 * time.Hour * 2) + (3 * time.Hour) + (14 * time.Minute) + (2 * time.Second),
+		},
+		"negative duration": {
+			input:    "-2d",
+			expected: -48 * time.Hour,
+		},
 		"zero": {
 		"zero": {
 			input:    "0h",
 			input:    "0h",
 			expected: time.Duration(0),
 			expected: time.Duration(0),

+ 3 - 3
ui/package-lock.json

@@ -3073,9 +3073,9 @@
       }
       }
     },
     },
     "follow-redirects": {
     "follow-redirects": {
-      "version": "1.14.7",
-      "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.14.7.tgz",
-      "integrity": "sha512-+hbxoLbFMbRKDwohX8GkTataGqO6Jb7jGwpAlwgy2bIz25XtRm7KEzJM76R1WiNT5SwZkX4Y75SwBolkpmE7iQ=="
+      "version": "1.14.8",
+      "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.14.8.tgz",
+      "integrity": "sha512-1x0S9UVJHsQprFcEC/qnNzBLcIxsjAV905f/UkQxbclCsoTWlacCNOpQa/anodLl2uaEKFhfWOvM2Qg77+15zA=="
     },
     },
     "foreach": {
     "foreach": {
       "version": "2.0.5",
       "version": "2.0.5",