Просмотр исходного кода

Merge pull request #863 from kubecost/develop

Merge develop into master
Ajay Tripathy 4 лет назад
Родитель
Сommit
25bad4e701

+ 1 - 40
pkg/cloud/awsprovider.go

@@ -314,7 +314,7 @@ var regionToBillingRegionCode = map[string]string{
 var loadedAWSSecret bool = false
 var awsSecret *AWSAccessKey = nil
 
-func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
+func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
 	return ""
 }
 
@@ -2359,42 +2359,3 @@ func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
 func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
 }
-
-func (aws *AWS) ParseID(id string) string {
-	// It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
-	rx := regexp.MustCompile("aws://[^/]*/[^/]*/([^/]+)")
-	match := rx.FindStringSubmatch(id)
-	if len(match) < 2 {
-		if id != "" {
-			log.Infof("awsprovider.ParseID: failed to parse %s", id)
-		}
-		return id
-	}
-
-	return match[1]
-}
-
-func (aws *AWS) ParsePVID(id string) string {
-	rx := regexp.MustCompile("aws:/[^/]*/[^/]*/([^/]+)") // Capture "vol-0fc54c5e83b8d2b76" from "aws://us-east-2a/vol-0fc54c5e83b8d2b76"
-	match := rx.FindStringSubmatch(id)
-	if len(match) < 2 {
-		if id != "" {
-			log.Infof("awsprovider.ParseID: failed to parse %s", id)
-		}
-		return id
-	}
-
-	return match[1]
-}
-
-func (aws *AWS) ParseLBID(id string) string {
-	rx := regexp.MustCompile("^([^-]+)-.+$") // Capture "ad9d88195b52a47c89b5055120f28c58" from "ad9d88195b52a47c89b5055120f28c58-1037804914.us-east-2.elb.amazonaws.com"
-	match := rx.FindStringSubmatch(id)
-	if len(match) < 2 {
-		if id != "" {
-			log.Infof("awsprovider.ParseLBID: failed to parse %s, %v", id, match)
-		}
-		return id
-	}
-	return match[1]
-}

+ 26 - 26
pkg/cloud/azureprovider.go

@@ -210,6 +210,7 @@ func (k *azureKey) Features() string {
 	return fmt.Sprintf("%s,%s,%s", region, instance, usageType)
 }
 
+// GPUType returns value of GPULabel if present
 func (k *azureKey) GPUType() string {
 	if t, ok := k.Labels[k.GPULabel]; ok {
 		return t
@@ -217,6 +218,10 @@ func (k *azureKey) GPUType() string {
 	return ""
 }
 
+func (k *azureKey) isValidGPUNode() bool {
+	return k.GPUType() == k.GPULabelValue && k.GetGPUCount() != "0"
+}
+
 func (k *azureKey) ID() string {
 	return ""
 }
@@ -260,9 +265,10 @@ func (k *azureKey) GetGPUCount() string {
 
 // Represents an azure storage config
 type AzureStorageConfig struct {
-	AccountName   string `json:"azureStorageAccount"`
-	AccessKey     string `json:"azureStorageAccessKey"`
-	ContainerName string `json:"azureStorageContainer"`
+	SubscriptionId string `json:"azureSubscriptionID"`
+	AccountName    string `json:"azureStorageAccount"`
+	AccessKey      string `json:"azureStorageAccessKey"`
+	ContainerName  string `json:"azureStorageContainer"`
 }
 
 // Represents an azure app key
@@ -734,24 +740,30 @@ func (az *Azure) AllNodePricing() (interface{}, error) {
 func (az *Azure) NodePricing(key Key) (*Node, error) {
 	az.DownloadPricingDataLock.RLock()
 	defer az.DownloadPricingDataLock.RUnlock()
-	if n, ok := az.Pricing[key.Features()]; ok {
-		klog.V(4).Infof("Returning pricing for node %s: %+v from key %s", key, n, key.Features())
-		if key.GPUType() != "" {
-			n.Node.GPU = key.(*azureKey).GetGPUCount()
+	azKey, ok := key.(*azureKey)
+	if !ok {
+		return nil, fmt.Errorf("azure: NodePricing: key is of type %T", key)
+	}
+
+	if n, ok := az.Pricing[azKey.Features()]; ok {
+		klog.V(4).Infof("Returning pricing for node %s: %+v from key %s", azKey, n, azKey.Features())
+		if azKey.isValidGPUNode() {
+			n.Node.GPU = azKey.GetGPUCount()
 		}
 		return n.Node, nil
 	}
-	klog.V(1).Infof("[Warning] no pricing data found for %s: %s", key.Features(), key)
+	klog.V(1).Infof("[Warning] no pricing data found for %s: %s", azKey.Features(), azKey)
 	c, err := az.GetConfig()
 	if err != nil {
 		return nil, fmt.Errorf("No default pricing data available")
 	}
-	if key.GPUType() != "" {
+	if azKey.isValidGPUNode() {
 		return &Node{
-			VCPUCost: c.CPU,
-			RAMCost:  c.RAM,
-			GPUCost:  c.GPU,
-			GPU:      key.(*azureKey).GetGPUCount(),
+			VCPUCost:         c.CPU,
+			RAMCost:          c.RAM,
+			UsesBaseCPUPrice: true,
+			GPUCost:          c.GPU,
+			GPU:              azKey.GetGPUCount(),
 		}, nil
 	}
 	return &Node{
@@ -1116,7 +1128,7 @@ func (az *Azure) PVPricing(pvk PVKey) (*PV, error) {
 	return pricing.PV, nil
 }
 
-func (az *Azure) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
+func (az *Azure) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
 	return ""
 }
 
@@ -1141,15 +1153,3 @@ func (*Azure) ClusterManagementPricing() (string, float64, error) {
 func (az *Azure) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
 }
-
-func (az *Azure) ParseID(id string) string {
-	return id
-}
-
-func (az *Azure) ParsePVID(id string) string {
-	return id
-}
-
-func (az *Azure) ParseLBID(id string) string {
-	return id
-}

+ 0 - 12
pkg/cloud/csvprovider.go

@@ -366,15 +366,3 @@ func (*CSVProvider) ClusterManagementPricing() (string, float64, error) {
 func (c *CSVProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
 }
-
-func (c *CSVProvider) ParseID(id string) string {
-	return id
-}
-
-func (c *CSVProvider) ParsePVID(id string) string {
-	return id
-}
-
-func (c *CSVProvider) ParseLBID(id string) string {
-	return id
-}

+ 2 - 13
pkg/cloud/customprovider.go

@@ -5,6 +5,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/env"
@@ -42,7 +43,7 @@ func (*CustomProvider) ClusterManagementPricing() (string, float64, error) {
 	return "", 0.0, nil
 }
 
-func (*CustomProvider) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
+func (*CustomProvider) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
 	return ""
 }
 
@@ -311,15 +312,3 @@ func (cp *CustomProvider) PricingSourceStatus() map[string]*PricingSource {
 func (cp *CustomProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
 }
-
-func (cp *CustomProvider) ParseID(id string) string {
-	return id
-}
-
-func (cp *CustomProvider) ParsePVID(id string) string {
-	return id
-}
-
-func (cp *CustomProvider) ParseLBID(id string) string {
-	return id
-}

+ 5 - 29
pkg/cloud/gcpprovider.go

@@ -3,6 +3,7 @@ package cloud
 import (
 	"context"
 	"fmt"
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"io"
 	"io/ioutil"
 	"math"
@@ -124,7 +125,7 @@ func gcpAllocationToOutOfClusterAllocation(gcpAlloc gcpAllocation) *OutOfCluster
 
 // GetLocalStorageQuery returns the cost of local storage for the given window. Setting rate=true
 // returns hourly spend. Setting used=true only tracks used storage, not total.
-func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
+func (gcp *GCP) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
 	// TODO Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
 	// See https://cloud.google.com/compute/disks-image-pricing#persistentdisk
 	localStorageCost := 0.04
@@ -134,10 +135,7 @@ func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool, used bool
 		baseMetric = "container_fs_usage_bytes"
 	}
 
-	fmtOffset := ""
-	if offset != "" {
-		fmtOffset = fmt.Sprintf("offset %s", offset)
-	}
+	fmtOffset := timeutil.DurationToPromOffsetString(offset)
 
 	fmtCumulativeQuery := `sum(
 		sum_over_time(%s{device!="tmpfs", id="/"}[%s:1m]%s)
@@ -151,8 +149,9 @@ func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool, used bool
 	if rate {
 		fmtQuery = fmtMonthlyQuery
 	}
+	fmtWindow := timeutil.DurationString(window)
 
-	return fmt.Sprintf(fmtQuery, baseMetric, window, fmtOffset, env.GetPromClusterLabel(), localStorageCost)
+	return fmt.Sprintf(fmtQuery, baseMetric, fmtWindow, fmtOffset, env.GetPromClusterLabel(), localStorageCost)
 }
 
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {
@@ -1472,26 +1471,3 @@ func sustainedUseDiscount(class string, defaultDiscount float64, isPreemptible b
 	}
 	return discount
 }
-
-func (gcp *GCP) ParseID(id string) string {
-	// gce://guestbook-227502/us-central1-a/gke-niko-n1-standard-2-wljla-8df8e58a-hfy7
-	//  => gke-niko-n1-standard-2-wljla-8df8e58a-hfy7
-	rx := regexp.MustCompile("gce://[^/]*/[^/]*/([^/]+)")
-	match := rx.FindStringSubmatch(id)
-	if len(match) < 2 {
-		if id != "" {
-			log.Infof("gcpprovider.ParseID: failed to parse %s", id)
-		}
-		return id
-	}
-
-	return match[1]
-}
-
-func (gcp *GCP) ParsePVID(id string) string {
-	return id
-}
-
-func (gcp *GCP) ParseLBID(id string) string {
-	return id
-}

+ 58 - 5
pkg/cloud/provider.go

@@ -5,7 +5,9 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"regexp"
 	"strings"
+	"time"
 
 	"k8s.io/klog"
 
@@ -185,7 +187,7 @@ type ServiceAccountStatus struct {
 type ServiceAccountCheck struct {
 	Message        string `json:"message"`
 	Status         bool   `json:"status"`
-	AdditionalInfo string `json:additionalInfo`
+	AdditionalInfo string `json:"additionalInfo"`
 }
 
 type PricingSources struct {
@@ -232,16 +234,13 @@ type Provider interface {
 	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
-	GetLocalStorageQuery(string, string, bool, bool) string
+	GetLocalStorageQuery(time.Duration, time.Duration, bool, bool) string
 	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ServiceAccountStatus() *ServiceAccountStatus
 	PricingSourceStatus() map[string]*PricingSource
 	ClusterManagementPricing() (string, float64, error)
 	CombinedDiscountForNode(string, bool, float64, float64) float64
-	ParseID(string) string
-	ParsePVID(string) string
-	ParseLBID(string) string
 }
 
 // ClusterName returns the name defined in cluster info, defaulting to the
@@ -346,6 +345,11 @@ func NewCrossClusterProvider(ctype string, overrideConfigPath string, cache clus
 			Clientset: cache,
 			Config:    NewProviderConfig(overrideConfigPath),
 		}, nil
+	} else if ctype == "azure" {
+		return &Azure{
+			Clientset: cache,
+			Config:    NewProviderConfig(overrideConfigPath),
+		}, nil
 	}
 	return &CustomProvider{
 		Clientset: cache,
@@ -501,3 +505,52 @@ func GetOrCreateClusterMeta(cluster_id, cluster_name string) (string, string, er
 
 	return id, name, nil
 }
+
+// ParseID attempts to parse a ProviderId from a string based on formats from the various providers and
+// returns the string as is if it cannot find a match
+func ParseID(id string) string {
+	// It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
+	rx := regexp.MustCompile("aws://[^/]*/[^/]*/([^/]+)")
+	match := rx.FindStringSubmatch(id)
+	if len(match) >= 2 {
+		return match[1]
+	}
+
+	// gce://guestbook-227502/us-central1-a/gke-niko-n1-standard-2-wljla-8df8e58a-hfy7
+	//  => gke-niko-n1-standard-2-wljla-8df8e58a-hfy7
+	rx = regexp.MustCompile("gce://[^/]*/[^/]*/([^/]+)")
+	match = rx.FindStringSubmatch(id)
+	if len(match) >= 2 {
+		return match[1]
+	}
+
+	// Return id for Azure Provider, CSV Provider and Custom Provider
+	return id
+}
+
+// ParsePVID attempts to parse a PV ProviderId from a string based on formats from the various providers and
+// returns the string as is if it cannot find a match
+func ParsePVID(id string) string {
+	// Capture "vol-0fc54c5e83b8d2b76" from "aws://us-east-2a/vol-0fc54c5e83b8d2b76"
+	rx := regexp.MustCompile("aws:/[^/]*/[^/]*/([^/]+)")
+	match := rx.FindStringSubmatch(id)
+	if len(match) >= 2 {
+		return match[1]
+	}
+
+	// Return id for GCP Provider, Azure Provider, CSV Provider and Custom Provider
+	return id
+}
+
+// ParseLBID attempts to parse a LB ProviderId from a string based on formats from the various providers and
+// returns the string as is if it cannot find a match
+func ParseLBID(id string) string {
+	rx := regexp.MustCompile("^([^-]+)-.+amazonaws\\.com$") // Capture "ad9d88195b52a47c89b5055120f28c58" from "ad9d88195b52a47c89b5055120f28c58-1037804914.us-east-2.elb.amazonaws.com"
+	match := rx.FindStringSubmatch(id)
+	if len(match) >= 2 {
+		return match[1]
+	}
+
+	// Return id for GCP Provider, Azure Provider, CSV Provider and Custom Provider
+	return id
+}

+ 44 - 49
pkg/costmodel/aggregation.go

@@ -2,6 +2,7 @@ package costmodel
 
 import (
 	"fmt"
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"math"
 	"net/http"
 	"regexp"
@@ -118,9 +119,9 @@ func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) f
 	coeff := 1.0
 	switch rateStr {
 	case "daily":
-		coeff = util.HoursPerDay
+		coeff = timeutil.HoursPerDay
 	case "monthly":
-		coeff = util.HoursPerMonth
+		coeff = timeutil.HoursPerMonth
 	}
 
 	return coeff / a.TotalHours(resolutionHours)
@@ -195,7 +196,7 @@ func GetTotalContainerCost(costData map[string]*CostData, rate string, cp cloud.
 	return totalContainerCost
 }
 
-func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, windowString, offset string) (map[string]float64, error) {
+func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
 	coefficients := make(map[string]float64)
 
 	profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
@@ -203,12 +204,12 @@ func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli pro
 
 	var clusterCosts map[string]*ClusterCosts
 	var err error
-
-	key := fmt.Sprintf("%s:%s", windowString, offset)
+	fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
+	key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
 	if data, valid := a.ClusterCostsCache.Get(key); valid {
 		clusterCosts = data.(map[string]*ClusterCosts)
 	} else {
-		clusterCosts, err = a.ComputeClusterCosts(cli, cp, windowString, offset, false)
+		clusterCosts, err = a.ComputeClusterCosts(cli, cp, window, offset, false)
 		if err != nil {
 			return nil, err
 		}
@@ -224,7 +225,7 @@ func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli pro
 		}
 
 		if costs.TotalCumulative == 0 {
-			return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, windowString, offset)
+			return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
 		}
 
 		totalContainerCost := 0.0
@@ -1452,7 +1453,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 	if !disableSharedOverhead {
 		for key, val := range c.SharedCosts {
 			cost, err := strconv.ParseFloat(val, 64)
-			durationCoefficient := window.Hours() / util.HoursPerMonth
+			durationCoefficient := window.Hours() / timeutil.HoursPerMonth
 			if err != nil {
 				return nil, "", fmt.Errorf("unable to parse shared cost %s: %s", val, err)
 			}
@@ -1491,11 +1492,9 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 			}
 		}
 
-		// Convert to Prometheus-compatible strings
-		durStr, offStr := util.DurationOffsetStrings(dur, off)
-
-		idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, durStr, offStr)
+		idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, dur, off)
 		if err != nil {
+			durStr, offStr := timeutil.DurationOffsetStrings(dur, off)
 			log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", durStr, offStr, err)
 			return nil, "", err
 		}
@@ -1743,10 +1742,16 @@ func (a *Accesses) warmAggregateCostModelCache() {
 	// for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
 	// if the default parameters change, the old cached defaults with eventually expire. Thus, the
 	// timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
-	warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
+	warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) (error, error) {
+		if a.ThanosClient != nil {
+			duration = thanos.OffsetDuration()
+			log.Infof("Setting Offset to %s", duration)
+		}
+		fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
+		durationHrs, err := timeutil.FormatDurationStringDaysToHours(fmtDuration)
 		promClient := a.GetPrometheusClient(true)
 
-		windowStr := fmt.Sprintf("%s offset %s", duration, offset)
+		windowStr := fmt.Sprintf("%s offset %s", fmtDuration, fmtOffset)
 		window, err := kubecost.ParseWindowUTC(windowStr)
 		if err != nil {
 			return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
@@ -1777,17 +1782,15 @@ func (a *Accesses) warmAggregateCostModelCache() {
 
 		aggKey := GenerateAggKey(window, field, subfields, aggOpts)
 		log.Infof("aggregation: cache warming defaults: %s", aggKey)
-		key := fmt.Sprintf("%s:%s", durationHrs, offset)
+		key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
 
 		_, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
 		if aggErr != nil {
 			log.Infof("Error building cache %s: %s", window, aggErr)
 		}
-		if a.ThanosClient != nil {
-			offset = thanos.Offset()
-			log.Infof("Setting offset to %s", offset)
-		}
-		totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, durationHrs, offset, cacheEfficiencyData)
+
+
+		totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, duration, offset, cacheEfficiencyData)
 		if err != nil {
 			log.Infof("Error building cluster costs cache %s", key)
 		}
@@ -1799,9 +1802,9 @@ func (a *Accesses) warmAggregateCostModelCache() {
 		}
 		if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
 			a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
-			log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(window.Duration()))
+			log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
 		} else {
-			log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", duration, clusterCostsCacheMinutes)
+			log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
 		}
 		return aggErr, err
 	}
@@ -1810,18 +1813,16 @@ func (a *Accesses) warmAggregateCostModelCache() {
 	go func(sem *util.Semaphore) {
 		defer errors.HandlePanic()
 
-		duration := "1d"
-		offset := "1m"
-		durHrs := "24h"
-		dur := 24 * time.Hour
+		offset := time.Minute
+		duration := 24 * time.Hour
 
 		for {
 			sem.Acquire()
-			warmFunc(duration, durHrs, offset, true)
+			warmFunc(duration, offset, true)
 			sem.Return()
 
-			log.Infof("aggregation: warm cache: %s", duration)
-			time.Sleep(a.GetCacheRefresh(dur))
+			log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
+			time.Sleep(a.GetCacheRefresh(duration))
 		}
 	}(sem)
 
@@ -1830,18 +1831,16 @@ func (a *Accesses) warmAggregateCostModelCache() {
 		go func(sem *util.Semaphore) {
 			defer errors.HandlePanic()
 
-			duration := "2d"
-			offset := "1m"
-			durHrs := "48h"
-			dur := 2 * 24 * time.Hour
+			offset := time.Minute
+			duration := 2 * 24 * time.Hour
 
 			for {
 				sem.Acquire()
-				warmFunc(duration, durHrs, offset, false)
+				warmFunc(duration, offset, false)
 				sem.Return()
 
-				log.Infof("aggregation: warm cache: %s", duration)
-				time.Sleep(a.GetCacheRefresh(dur))
+				log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
+				time.Sleep(a.GetCacheRefresh(duration))
 			}
 		}(sem)
 
@@ -1849,19 +1848,17 @@ func (a *Accesses) warmAggregateCostModelCache() {
 		go func(sem *util.Semaphore) {
 			defer errors.HandlePanic()
 
-			duration := "7d"
-			offset := "1m"
-			durHrs := "168h"
-			dur := 7 * 24 * time.Hour
+			offset := time.Minute
+			duration := 7 * 24 * time.Hour
 
 			for {
 				sem.Acquire()
-				aggErr, err := warmFunc(duration, durHrs, offset, false)
+				aggErr, err := warmFunc(duration, offset, false)
 				sem.Return()
 
-				log.Infof("aggregation: warm cache: %s", duration)
+				log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
 				if aggErr == nil && err == nil {
-					time.Sleep(a.GetCacheRefresh(dur))
+					time.Sleep(a.GetCacheRefresh(duration))
 				} else {
 					time.Sleep(5 * time.Minute)
 				}
@@ -1873,16 +1870,14 @@ func (a *Accesses) warmAggregateCostModelCache() {
 			defer errors.HandlePanic()
 
 			for {
-				duration := "30d"
-				offset := "1m"
-				durHrs := "720h"
-				dur := 30 * 24 * time.Hour
+				offset := time.Minute
+				duration := 30 * 24 * time.Hour
 
 				sem.Acquire()
-				aggErr, err := warmFunc(duration, durHrs, offset, false)
+				aggErr, err := warmFunc(duration, offset, false)
 				sem.Return()
 				if aggErr == nil && err == nil {
-					time.Sleep(a.GetCacheRefresh(dur))
+					time.Sleep(a.GetCacheRefresh(duration))
 				} else {
 					time.Sleep(5 * time.Minute)
 				}

+ 58 - 15
pkg/costmodel/allocation.go

@@ -2,6 +2,7 @@ package costmodel
 
 import (
 	"fmt"
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"math"
 	"strconv"
 	"strings"
@@ -12,7 +13,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/kubecost"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
-	"github.com/kubecost/cost-model/pkg/util"
 	"k8s.io/apimachinery/pkg/labels"
 )
 
@@ -51,6 +51,8 @@ const (
 	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
 	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
+	queryFmtNetReceiveBytes       = `sum(increase(container_network_receive_bytes_total{pod_name!=""}[%s]%s)) by (pod_name, namespace, %s)`
+	queryFmtNetTransferBytes      = `sum(increase(container_network_transmit_bytes_total{pod_name!=""}[%s]%s)) by (pod_name, namespace, %s)`
 	queryFmtNamespaceLabels       = `avg_over_time(kube_namespace_labels[%s]%s)`
 	queryFmtNamespaceAnnotations  = `avg_over_time(kube_namespace_annotations[%s]%s)`
 	queryFmtPodLabels             = `avg_over_time(kube_pod_labels[%s]%s)`
@@ -122,7 +124,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	}
 
 	// Convert resolution duration to a query-ready string
-	resStr := util.DurationString(resolution)
+	resStr := timeutil.DurationString(resolution)
 
 	ctx := prom.NewContext(cm.PrometheusClient)
 
@@ -180,6 +182,12 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
 	resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
 
+	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, offStr, env.GetPromClusterLabel())
+	resChNetTransferBytes := ctx.Query(queryNetTransferBytes)
+
+	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, offStr, env.GetPromClusterLabel())
+	resChNetReceiveBytes := ctx.Query(queryNetReceiveBytes)
+
 	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
 	resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
 
@@ -253,6 +261,8 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
 	resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
 
+	resNetTransferBytes, _ := resChNetTransferBytes.Await()
+	resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
 	resNetZoneGiB, _ := resChNetZoneGiB.Await()
 	resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
 	resNetRegionGiB, _ := resChNetRegionGiB.Await()
@@ -292,6 +302,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
 	applyRAMBytesUsedMax(podMap, resRAMUsageMax)
 	applyGPUsRequested(podMap, resGPUsRequested)
+	applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes)
 	applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
 	applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
 	applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
@@ -322,9 +333,9 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	// for converting resource allocation data to cumulative costs.
 	nodeMap := map[nodeKey]*NodePricing{}
 
-	applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr, cm.Provider.ParseID)
-	applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr, cm.Provider.ParseID)
-	applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr, cm.Provider.ParseID)
+	applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
+	applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
+	applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
 	applyNodeSpot(nodeMap, resNodeIsSpot)
 	applyNodeDiscount(nodeMap, cm)
 
@@ -430,7 +441,7 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
 	start, end := *window.Start(), *window.End()
 
 	// Convert resolution duration to a query-ready string
-	resStr := util.DurationString(resolution)
+	resStr := timeutil.DurationString(resolution)
 
 	ctx := prom.NewContext(cm.PrometheusClient)
 
@@ -906,6 +917,41 @@ func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryRe
 	}
 }
 
+func applyNetworkTotals(podMap map[podKey]*Pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult) {
+	for _, res := range resNetworkTransferBytes {
+		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
+		if err != nil {
+			log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
+			continue
+		}
+
+		pod, ok := podMap[podKey]
+		if !ok {
+			continue
+		}
+
+		for _, alloc := range pod.Allocations {
+			alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(pod.Allocations))
+		}
+	}
+	for _, res := range resNetworkReceiveBytes {
+		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
+		if err != nil {
+			log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
+			continue
+		}
+
+		pod, ok := podMap[podKey]
+		if !ok {
+			continue
+		}
+
+		for _, alloc := range pod.Allocations {
+			alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(pod.Allocations))
+		}
+	}
+}
+
 func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
 	costPerGiBByCluster := map[string]float64{}
 
@@ -1306,8 +1352,7 @@ func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]
 	}
 }
 
-func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult,
-	providerIDParser func(string) string) {
+func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
 	for _, res := range resNodeCostPerCPUHr {
 		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
@@ -1337,7 +1382,7 @@ func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr
 			nodeMap[key] = &NodePricing{
 				Name:       node,
 				NodeType:   instanceType,
-				ProviderID: providerIDParser(providerID),
+				ProviderID: cloud.ParseID(providerID),
 			}
 		}
 
@@ -1345,8 +1390,7 @@ func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr
 	}
 }
 
-func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult,
-	providerIDParser func(string) string) {
+func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
 	for _, res := range resNodeCostPerRAMGiBHr {
 		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
@@ -1376,7 +1420,7 @@ func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRA
 			nodeMap[key] = &NodePricing{
 				Name:       node,
 				NodeType:   instanceType,
-				ProviderID: providerIDParser(providerID),
+				ProviderID: cloud.ParseID(providerID),
 			}
 		}
 
@@ -1384,8 +1428,7 @@ func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRA
 	}
 }
 
-func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult,
-	providerIDParser func(string) string) {
+func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
 	for _, res := range resNodeCostPerGPUHr {
 		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
@@ -1415,7 +1458,7 @@ func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr
 			nodeMap[key] = &NodePricing{
 				Name:       node,
 				NodeType:   instanceType,
-				ProviderID: providerIDParser(providerID),
+				ProviderID: cloud.ParseID(providerID),
 			}
 		}
 

+ 39 - 48
pkg/costmodel/cluster.go

@@ -2,13 +2,13 @@ package costmodel
 
 import (
 	"fmt"
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
-	"github.com/kubecost/cost-model/pkg/util"
 
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
@@ -70,15 +70,12 @@ type ClusterCostsBreakdown struct {
 
 // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
 // the associated monthly rate data, and returns the Costs.
-func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
-	start, end, err := util.ParseTimeRange(window, offset)
-	if err != nil {
-		return nil, err
-	}
+func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
+	start, end := timeutil.ParseTimeRange(window, offset)
 
 	// If the number of hours is not given (i.e. is zero) compute one from the window and offset
 	if dataHours == 0 {
-		dataHours = end.Sub(*start).Hours()
+		dataHours = end.Sub(start).Hours()
 	}
 
 	// Do not allow zero-length windows to prevent divide-by-zero issues
@@ -87,17 +84,17 @@ func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offse
 	}
 
 	cc := &ClusterCosts{
-		Start:             start,
-		End:               end,
+		Start:             &start,
+		End:               &end,
 		CPUCumulative:     cpu,
 		GPUCumulative:     gpu,
 		RAMCumulative:     ram,
 		StorageCumulative: storage,
 		TotalCumulative:   cpu + gpu + ram + storage,
-		CPUMonthly:        cpu / dataHours * (util.HoursPerMonth),
-		GPUMonthly:        gpu / dataHours * (util.HoursPerMonth),
-		RAMMonthly:        ram / dataHours * (util.HoursPerMonth),
-		StorageMonthly:    storage / dataHours * (util.HoursPerMonth),
+		CPUMonthly:        cpu / dataHours * (timeutil.HoursPerMonth),
+		GPUMonthly:        gpu / dataHours * (timeutil.HoursPerMonth),
+		RAMMonthly:        ram / dataHours * (timeutil.HoursPerMonth),
+		StorageMonthly:    storage / dataHours * (timeutil.HoursPerMonth),
 	}
 	cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
 
@@ -195,7 +192,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		diskMap[key].Cost += cost
 		providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
 		if providerID != "" {
-			diskMap[key].ProviderID = provider.ParsePVID(providerID)
+			diskMap[key].ProviderID = cloud.ParsePVID(providerID)
 		}
 	}
 
@@ -522,18 +519,18 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		return nil, requiredCtx.ErrorCollection()
 	}
 
-	activeDataMap := buildActiveDataMap(resActiveMins, resolution, cp.ParseID)
+	activeDataMap := buildActiveDataMap(resActiveMins, resolution)
 
-	gpuCountMap := buildGPUCountMap(resNodeGPUCount, cp.ParseID)
+	gpuCountMap := buildGPUCountMap(resNodeGPUCount)
 
-	cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp.ParseID)
-	ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp.ParseID)
-	gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp.ParseID)
+	cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost)
+	ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost)
+	gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap)
 
 	clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
 	clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
 
-	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores, clusterAndNameToType)
+	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
 
 	ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
 
@@ -541,7 +538,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
 
 	cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
-	preemptibleMap := buildPreemptibleMap(resIsSpot, cp.ParseID)
+	preemptibleMap := buildPreemptibleMap(resIsSpot)
 	labelsMap := buildLabelsMap(resLabels)
 
 	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
@@ -595,7 +592,7 @@ type LoadBalancer struct {
 	Minutes    float64
 }
 
-func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
+func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
 	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
 	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
 	if offset < time.Minute {
@@ -655,7 +652,7 @@ func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration,
 			loadBalancerMap[key] = &LoadBalancer{
 				Cluster:    cluster,
 				Name:       namespace + "/" + serviceName,
-				ProviderID: cp.ParseLBID(providerID),
+				ProviderID: cloud.ParseLBID(providerID),
 			}
 		}
 		// Fill in Provider ID if it is available and missing in the loadBalancerMap
@@ -702,13 +699,11 @@ func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration,
 }
 
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
-func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
+func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
 	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
-	start, end, err := util.ParseTimeRange(window, offset)
-	if err != nil {
-		return nil, err
-	}
-	mins := end.Sub(*start).Minutes()
+	start, end := timeutil.ParseTimeRange(window, offset)
+
+	mins := end.Sub(start).Minutes()
 
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
@@ -777,10 +772,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
 	}
 
-	fmtOffset := ""
-	if offset != "" {
-		fmtOffset = fmt.Sprintf("offset %s", offset)
-	}
+	fmtOffset := timeutil.DurationToPromOffsetString(offset)
 
 	queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, minsPerResolution)
 	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
@@ -1003,7 +995,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 			dataMins = mins
 			klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
 		}
-		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/util.MinsPerHour)
+		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
 		if err != nil {
 			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
 			return nil, err
@@ -1054,8 +1046,8 @@ func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
 }
 
 // ClusterCostsOverTime gives the full cluster costs over time
-func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
-	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
+func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
+	localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}
@@ -1064,28 +1056,27 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 
 	start, err := time.Parse(layout, startString)
 	if err != nil {
-		klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
+		klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
 		return nil, err
 	}
 	end, err := time.Parse(layout, endString)
 	if err != nil {
-		klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
+		klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
 		return nil, err
 	}
-	window, err := time.ParseDuration(windowString)
-	if err != nil {
-		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
+	fmtWindow := timeutil.DurationString(window)
+
+	if fmtWindow == "" {
+		err := fmt.Errorf("window value invalid or missing")
+		klog.V(1).Infof("Error parsing time %v. Error: %s", window, err.Error())
 		return nil, err
 	}
 
-	// turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
-	if offset != "" {
-		offset = fmt.Sprintf("offset %s", offset)
-	}
+	fmtOffset := timeutil.DurationToPromOffsetString(offset)
 
-	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	qStorage := fmt.Sprintf(queryStorage, windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
+	qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
 
 	ctx := prom.NewContext(cli)

+ 9 - 14
pkg/costmodel/cluster_helpers.go

@@ -1,6 +1,7 @@
 package costmodel
 
 import (
+	"github.com/kubecost/cost-model/pkg/cloud"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/env"
@@ -27,7 +28,6 @@ func mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2 map[nodeIdentifi
 
 func buildCPUCostMap(
 	resNodeCPUCost []*prom.QueryResult,
-	providerIDParser func(string) string,
 ) (
 	map[NodeIdentifier]float64,
 	map[nodeIdentifierNoProviderID]string,
@@ -56,7 +56,7 @@ func buildCPUCostMap(
 		key := NodeIdentifier{
 			Cluster:    cluster,
 			Name:       name,
-			ProviderID: providerIDParser(providerID),
+			ProviderID: cloud.ParseID(providerID),
 		}
 		keyNon := nodeIdentifierNoProviderID{
 			Cluster: cluster,
@@ -73,7 +73,6 @@ func buildCPUCostMap(
 
 func buildRAMCostMap(
 	resNodeRAMCost []*prom.QueryResult,
-	providerIDParser func(string) string,
 ) (
 	map[NodeIdentifier]float64,
 	map[nodeIdentifierNoProviderID]string,
@@ -102,7 +101,7 @@ func buildRAMCostMap(
 		key := NodeIdentifier{
 			Cluster:    cluster,
 			Name:       name,
-			ProviderID: providerIDParser(providerID),
+			ProviderID: cloud.ParseID(providerID),
 		}
 		keyNon := nodeIdentifierNoProviderID{
 			Cluster: cluster,
@@ -119,7 +118,6 @@ func buildRAMCostMap(
 func buildGPUCostMap(
 	resNodeGPUCost []*prom.QueryResult,
 	gpuCountMap map[NodeIdentifier]float64,
-	providerIDParser func(string) string,
 ) (
 	map[NodeIdentifier]float64,
 	map[nodeIdentifierNoProviderID]string,
@@ -148,7 +146,7 @@ func buildGPUCostMap(
 		key := NodeIdentifier{
 			Cluster:    cluster,
 			Name:       name,
-			ProviderID: providerIDParser(providerID),
+			ProviderID: cloud.ParseID(providerID),
 		}
 		keyNon := nodeIdentifierNoProviderID{
 			Cluster: cluster,
@@ -171,7 +169,6 @@ func buildGPUCostMap(
 
 func buildGPUCountMap(
 	resNodeGPUCount []*prom.QueryResult,
-	providerIDParser func(string) string,
 ) map[NodeIdentifier]float64 {
 
 	gpuCountMap := make(map[NodeIdentifier]float64)
@@ -194,7 +191,7 @@ func buildGPUCountMap(
 		key := NodeIdentifier{
 			Cluster:    cluster,
 			Name:       name,
-			ProviderID: providerIDParser(providerID),
+			ProviderID: cloud.ParseID(providerID),
 		}
 		gpuCountMap[key] = gpuCount
 	}
@@ -204,7 +201,6 @@ func buildGPUCountMap(
 
 func buildCPUCoresMap(
 	resNodeCPUCores []*prom.QueryResult,
-	clusterAndNameToType map[nodeIdentifierNoProviderID]string,
 ) map[nodeIdentifierNoProviderID]float64 {
 
 	m := make(map[nodeIdentifierNoProviderID]float64)
@@ -403,7 +399,7 @@ type activeData struct {
 	minutes float64
 }
 
-func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Duration, providerIDParser func(string) string) map[NodeIdentifier]activeData {
+func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Duration) map[NodeIdentifier]activeData {
 
 	m := make(map[NodeIdentifier]activeData)
 
@@ -424,7 +420,7 @@ func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Durat
 		key := NodeIdentifier{
 			Cluster:    cluster,
 			Name:       name,
-			ProviderID: providerIDParser(providerID),
+			ProviderID: cloud.ParseID(providerID),
 		}
 
 		if len(result.Values) == 0 {
@@ -450,7 +446,6 @@ func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Durat
 // node id -> is preemptible?
 func buildPreemptibleMap(
 	resIsSpot []*prom.QueryResult,
-	providerIDParser func(string) string,
 ) map[NodeIdentifier]bool {
 
 	m := make(map[NodeIdentifier]bool)
@@ -474,7 +469,7 @@ func buildPreemptibleMap(
 		key := NodeIdentifier{
 			Cluster:    cluster,
 			Name:       nodeName,
-			ProviderID: providerIDParser(providerID),
+			ProviderID: cloud.ParseID(providerID),
 		}
 
 		// TODO(michaelmdresser): check this condition at merge time?
@@ -503,7 +498,7 @@ func buildLabelsMap(
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
-		node, err := result.GetString("kubernetes_node")
+		node, err := result.GetString("node")
 		if err != nil {
 			log.DedupedWarningf(5, "ClusterNodes: label data missing node")
 			continue

+ 1 - 2
pkg/costmodel/cluster_helpers_test.go

@@ -699,7 +699,6 @@ func TestBuildNodeMap(t *testing.T) {
 }
 
 func TestBuildGPUCostMap(t *testing.T) {
-	providerIDParser := func(s string) string { return s }
 	cases := []struct {
 		name       string
 		promResult []*prom.QueryResult
@@ -850,7 +849,7 @@ func TestBuildGPUCostMap(t *testing.T) {
 
 	for _, testCase := range cases {
 		t.Run(testCase.name, func(t *testing.T) {
-			result, _ := buildGPUCostMap(testCase.promResult, testCase.countMap, providerIDParser)
+			result, _ := buildGPUCostMap(testCase.promResult, testCase.countMap)
 			if !reflect.DeepEqual(result, testCase.expected) {
 				t.Errorf("buildGPUCostMap case %s failed. Got %+v but expected %+v", testCase.name, result, testCase.expected)
 			}

+ 24 - 20
pkg/costmodel/costmodel.go

@@ -233,7 +233,6 @@ const (
 func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset, env.GetPromClusterLabel())
 	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, window, offset, env.GetPromClusterLabel())
-	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, window, offset, window, offset, 1.0, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), window, offset, env.GetPromClusterLabel())
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel())
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, window, "", env.GetPromClusterLabel())
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, window, "", env.GetPromClusterLabel())
@@ -247,7 +246,6 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 	ctx := prom.NewContext(cli)
 	resChRAMUsage := ctx.Query(queryRAMUsage)
 	resChCPUUsage := ctx.Query(queryCPUUsage)
-	resChGPURequests := ctx.Query(queryGPURequests)
 	resChPVRequests := ctx.Query(queryPVRequests)
 	resChNetZoneRequests := ctx.Query(queryNetZoneRequests)
 	resChNetRegionRequests := ctx.Query(queryNetRegionRequests)
@@ -280,7 +278,6 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 	// Process Prometheus query results. Handle errors using ctx.Errors.
 	resRAMUsage, _ := resChRAMUsage.Await()
 	resCPUUsage, _ := resChCPUUsage.Await()
-	resGPURequests, _ := resChGPURequests.Await()
 	resPVRequests, _ := resChPVRequests.Await()
 	resNetZoneRequests, _ := resChNetZoneRequests.Await()
 	resNetRegionRequests, _ := resChNetRegionRequests.Await()
@@ -353,13 +350,6 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 	for key := range RAMUsedMap {
 		containers[key] = true
 	}
-	GPUReqMap, err := GetContainerMetricVector(resGPURequests, true, normalizationValue, clusterID)
-	if err != nil {
-		return nil, err
-	}
-	for key := range GPUReqMap {
-		containers[key] = true
-	}
 	CPUUsedMap, err := GetContainerMetricVector(resCPUUsage, false, 0, clusterID) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
@@ -504,17 +494,29 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 					},
 				}
 
+				gpuReqCount := 0.0
+				if g, ok := container.Resources.Requests["nvidia.com/gpu"]; ok {
+					gpuReqCount = float64(g.Value())
+				} else if g, ok := container.Resources.Limits["nvidia.com/gpu"]; ok {
+					gpuReqCount = float64(g.Value())
+				} else if g, ok := container.Resources.Requests["k8s.amazonaws.com/vgpu"]; ok {
+					gpuReqCount = float64(g.Value())
+				} else if g, ok := container.Resources.Limits["k8s.amazonaws.com/vgpu"]; ok {
+					gpuReqCount = float64(g.Value())
+				}
+				GPUReqV := []*util.Vector{
+					{
+						Value:     float64(gpuReqCount),
+						Timestamp: float64(time.Now().UTC().Unix()),
+					},
+				}
+
 				RAMUsedV, ok := RAMUsedMap[newKey]
 				if !ok {
 					klog.V(4).Info("no RAM usage for " + newKey)
 					RAMUsedV = []*util.Vector{{}}
 				}
 
-				GPUReqV, ok := GPUReqMap[newKey]
-				if !ok {
-					klog.V(4).Info("no GPU requests for " + newKey)
-					GPUReqV = []*util.Vector{{}}
-				}
 				CPUUsedV, ok := CPUUsedMap[newKey]
 				if !ok {
 					klog.V(4).Info("no CPU usage for " + newKey)
@@ -576,6 +578,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 			// with very short-lived pods that over-request resources.
 			RAMReqV := []*util.Vector{{}}
 			CPUReqV := []*util.Vector{{}}
+			GPUReqV := []*util.Vector{{}}
 
 			RAMUsedV, ok := RAMUsedMap[key]
 			if !ok {
@@ -583,11 +586,6 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyze
 				RAMUsedV = []*util.Vector{{}}
 			}
 
-			GPUReqV, ok := GPUReqMap[key]
-			if !ok {
-				klog.V(4).Info("no GPU requests for " + key)
-				GPUReqV = []*util.Vector{{}}
-			}
 			CPUUsedV, ok := CPUUsedMap[key]
 			if !ok {
 				klog.V(4).Info("no CPU usage for " + key)
@@ -1010,6 +1008,12 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 				newCnode.GPU = fmt.Sprintf("%d", q.Value())
 				gpuc = float64(gpuCount)
 			}
+		} else if g, ok := n.Status.Capacity["k8s.amazonaws.com/vgpu"]; ok {
+			gpuCount := g.Value()
+			if gpuCount != 0 {
+				newCnode.GPU = fmt.Sprintf("%d", q.Value())
+				gpuc = float64(gpuCount)
+			}
 		} else {
 			gpuc, err = strconv.ParseFloat(newCnode.GPU, 64)
 			if err != nil {

+ 52 - 67
pkg/costmodel/router.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"net/http"
 	"reflect"
 	"strconv"
@@ -123,16 +124,18 @@ func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
 func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 
+	duration := 24 * time.Hour
+	offset := time.Minute
 	durationHrs := "24h"
-	offset := "1m"
+	fmtOffset := "1m"
 	pClient := a.GetPrometheusClient(true)
 
-	key := fmt.Sprintf("%s:%s", durationHrs, offset)
+	key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
 	if data, valid := a.ClusterCostsCache.Get(key); valid {
 		clusterCosts := data.(map[string]*ClusterCosts)
 		w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
 	} else {
-		data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, durationHrs, offset, true)
+		data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
 		w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
 	}
 }
@@ -224,7 +227,7 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
-// parsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
+// ParsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
 // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
 // return 0.0.
 func ParsePercentString(percentStr string) (float64, error) {
@@ -243,66 +246,6 @@ func ParsePercentString(percentStr string) (float64, error) {
 	return discount, nil
 }
 
-// parseDuration converts a Prometheus-style duration string into a Duration
-// TODO:CLEANUP delete this. do it now.
-func ParseDuration(duration string) (*time.Duration, error) {
-	unitStr := duration[len(duration)-1:]
-	var unit time.Duration
-	switch unitStr {
-	case "s":
-		unit = time.Second
-	case "m":
-		unit = time.Minute
-	case "h":
-		unit = time.Hour
-	case "d":
-		unit = 24.0 * time.Hour
-	default:
-		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-
-	amountStr := duration[:len(duration)-1]
-	amount, err := strconv.ParseInt(amountStr, 10, 64)
-	if err != nil {
-		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-
-	dur := time.Duration(amount) * unit
-	return &dur, nil
-}
-
-// ParseTimeRange returns a start and end time, respectively, which are converted from
-// a duration and offset, defined as strings with Prometheus-style syntax.
-func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
-	// endTime defaults to the current time, unless an offset is explicity declared,
-	// in which case it shifts endTime back by given duration
-	endTime := time.Now()
-	if offset != "" {
-		o, err := ParseDuration(offset)
-		if err != nil {
-			return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
-		}
-		endTime = endTime.Add(-1 * *o)
-	}
-
-	// if duration is defined in terms of days, convert to hours
-	// e.g. convert "2d" to "48h"
-	durationNorm, err := normalizeTimeParam(duration)
-	if err != nil {
-		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
-	}
-
-	// convert time duration into start and end times, formatted
-	// as ISO datetime strings
-	dur, err := time.ParseDuration(durationNorm)
-	if err != nil {
-		return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
-	}
-	startTime := endTime.Add(-1 * dur)
-
-	return &startTime, &endTime, nil
-}
-
 func WrapData(data interface{}, err error) []byte {
 	var resp []byte
 
@@ -438,6 +381,27 @@ func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httpr
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 
+	if window == "" {
+		w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
+		return
+	}
+	windowDur, err := timeutil.ParseDuration(window)
+	if err != nil {
+		w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
+		return
+	}
+
+	// offset is not a required parameter
+	var offsetDur time.Duration
+	if offset != "" {
+		offsetDur, err = timeutil.ParseDuration(offset)
+		if err != nil {
+			w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
+			return
+		}
+	}
+
+
 	useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
 
 	if useThanos && !thanos.IsEnabled() {
@@ -448,12 +412,13 @@ func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httpr
 	var client prometheusClient.Client
 	if useThanos {
 		client = a.ThanosClient
-		offset = thanos.Offset()
+		offsetDur = thanos.OffsetDuration()
+
 	} else {
 		client = a.PrometheusClient
 	}
 
-	data, err := a.ComputeClusterCosts(client, a.CloudProvider, window, offset, true)
+	data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
 	w.Write(WrapData(data, err))
 }
 
@@ -466,7 +431,27 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 
-	data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, window, offset)
+	if window == "" {
+		w.Write(WrapData(nil, fmt.Errorf("missing window arguement")))
+		return
+	}
+	windowDur, err := timeutil.ParseDuration(window)
+	if err != nil {
+		w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
+		return
+	}
+
+	// offset is not a required parameter
+	var offsetDur time.Duration
+	if offset != "" {
+		offsetDur, err = timeutil.ParseDuration(offset)
+		if err != nil {
+			w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
+			return
+		}
+	}
+
+	data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
 	w.Write(WrapData(data, err))
 }
 

+ 27 - 2
pkg/kubecost/allocation.go

@@ -63,6 +63,8 @@ type Allocation struct {
 	GPUHours                   float64               `json:"gpuHours"`
 	GPUCost                    float64               `json:"gpuCost"`
 	GPUCostAdjustment          float64               `json:"gpuCostAdjustment"`
+	NetworkTransferBytes       float64               `json:"networkTransferBytes"`
+	NetworkReceiveBytes        float64               `json:"networkReceiveBytes"`
 	NetworkCost                float64               `json:"networkCost"`
 	NetworkCostAdjustment      float64               `json:"networkCostAdjustment"`
 	LoadBalancerCost           float64               `json:"loadBalancerCost"`
@@ -205,6 +207,8 @@ func (a *Allocation) Clone() *Allocation {
 		GPUHours:                   a.GPUHours,
 		GPUCost:                    a.GPUCost,
 		GPUCostAdjustment:          a.GPUCostAdjustment,
+		NetworkTransferBytes:       a.NetworkTransferBytes,
+		NetworkReceiveBytes:        a.NetworkReceiveBytes,
 		NetworkCost:                a.NetworkCost,
 		NetworkCostAdjustment:      a.NetworkCostAdjustment,
 		LoadBalancerCost:           a.LoadBalancerCost,
@@ -276,6 +280,12 @@ func (a *Allocation) Equal(that *Allocation) bool {
 	if !util.IsApproximately(a.GPUCostAdjustment, that.GPUCostAdjustment) {
 		return false
 	}
+	if !util.IsApproximately(a.NetworkTransferBytes, that.NetworkTransferBytes) {
+		return false
+	}
+	if !util.IsApproximately(a.NetworkReceiveBytes, that.NetworkReceiveBytes) {
+		return false
+	}
 	if !util.IsApproximately(a.NetworkCost, that.NetworkCost) {
 		return false
 	}
@@ -500,6 +510,8 @@ func (a *Allocation) MarshalJSON() ([]byte, error) {
 	jsonEncodeFloat64(buffer, "gpuHours", a.GPUHours, ",")
 	jsonEncodeFloat64(buffer, "gpuCost", a.GPUCost, ",")
 	jsonEncodeFloat64(buffer, "gpuCostAdjustment", a.GPUCostAdjustment, ",")
+	jsonEncodeFloat64(buffer, "networkTransferBytes", a.NetworkTransferBytes, ",")
+	jsonEncodeFloat64(buffer, "networkReceiveBytes", a.NetworkReceiveBytes, ",")
 	jsonEncodeFloat64(buffer, "networkCost", a.NetworkCost, ",")
 	jsonEncodeFloat64(buffer, "networkCostAdjustment", a.NetworkCostAdjustment, ",")
 	jsonEncodeFloat64(buffer, "loadBalancerCost", a.LoadBalancerCost, ",")
@@ -551,6 +563,11 @@ func (a *Allocation) IsUnallocated() bool {
 	return strings.Contains(a.Name, UnallocatedSuffix)
 }
 
+// IsUnmounted is true if the given Allocation represents unmounted volume costs.
+func (a *Allocation) IsUnmounted() bool {
+	return strings.Contains(a.Name, UnmountedSuffix)
+}
+
 // Minutes returns the number of minutes the Allocation represents, as defined
 // by the difference between the end and start times.
 func (a *Allocation) Minutes() float64 {
@@ -632,6 +649,8 @@ func (a *Allocation) add(that *Allocation) {
 	a.CPUCoreHours += that.CPUCoreHours
 	a.GPUHours += that.GPUHours
 	a.RAMByteHours += that.RAMByteHours
+	a.NetworkTransferBytes += that.NetworkTransferBytes
+	a.NetworkReceiveBytes += that.NetworkReceiveBytes
 
 	// Sum all cumulative cost fields
 	a.CPUCost += that.CPUCost
@@ -934,7 +953,7 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 	for _, alloc := range as.allocations {
 		idleId, err := alloc.getIdleId(options)
 		if err != nil {
-			log.DedupedWarningf(3,"AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
+			log.DedupedWarningf(3, "AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
 		}
 
 		skip := false
@@ -1110,7 +1129,9 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 		for _, alloc := range aggSet.allocations {
 			for _, sharedAlloc := range shareSet.allocations {
 				if _, ok := shareCoefficients[alloc.Name]; !ok {
-					log.Warningf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
+					if !alloc.IsIdle() {
+						log.Warningf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
+					}
 					continue
 				}
 
@@ -1170,6 +1191,10 @@ func computeShareCoeffs(aggregateBy []string, options *AllocationAggregationOpti
 			// Skip idle allocations in coefficient calculation
 			continue
 		}
+		if alloc.IsUnmounted() {
+			// Skip unmounted allocations in coefficient calculation
+			continue
+		}
 
 		// Determine the post-aggregation key under which the allocation will
 		// be shared.

+ 2 - 2
pkg/kubecost/allocation_test.go

@@ -1498,8 +1498,8 @@ func TestAllocationSet_AggregateBy(t *testing.T) {
 			start: start,
 			aggBy: []string{AllocationNamespaceProp},
 			aggOpts: &AllocationAggregationOptions{
-				ShareIdle:   ShareEven,
-				IdleByNode:  true,
+				ShareIdle:  ShareEven,
+				IdleByNode: true,
 			},
 			numResults: 3,
 			totalCost:  112.00,

+ 1 - 1
pkg/kubecost/bingen.go

@@ -29,4 +29,4 @@ package kubecost
 // @bingen:generate:PVKey
 // @bingen:generate:PVAllocation
 
-//go:generate bingen -package=kubecost -version=14 -buffer=github.com/kubecost/cost-model/pkg/util
+//go:generate bingen -package=kubecost -version=15 -buffer=github.com/kubecost/cost-model/pkg/util

+ 48 - 40
pkg/kubecost/kubecost_codecs.go

@@ -26,7 +26,7 @@ const (
 	GeneratorPackageName string = "kubecost"
 
 	// CodecVersion is the version passed into the generator
-	CodecVersion uint8 = 14
+	CodecVersion uint8 = 15
 )
 
 //--------------------------------------------------------------------------
@@ -171,6 +171,8 @@ func (target *Allocation) MarshalBinary() (data []byte, err error) {
 	buff.WriteFloat64(target.GPUHours)                   // write float64
 	buff.WriteFloat64(target.GPUCost)                    // write float64
 	buff.WriteFloat64(target.GPUCostAdjustment)          // write float64
+	buff.WriteFloat64(target.NetworkTransferBytes)       // write float64
+	buff.WriteFloat64(target.NetworkReceiveBytes)        // write float64
 	buff.WriteFloat64(target.NetworkCost)                // write float64
 	buff.WriteFloat64(target.NetworkCostAdjustment)      // write float64
 	buff.WriteFloat64(target.LoadBalancerCost)           // write float64
@@ -340,35 +342,41 @@ func (target *Allocation) UnmarshalBinary(data []byte) (err error) {
 	target.GPUCostAdjustment = x
 
 	y := buff.ReadFloat64() // read float64
-	target.NetworkCost = y
+	target.NetworkTransferBytes = y
 
 	aa := buff.ReadFloat64() // read float64
-	target.NetworkCostAdjustment = aa
+	target.NetworkReceiveBytes = aa
 
 	bb := buff.ReadFloat64() // read float64
-	target.LoadBalancerCost = bb
+	target.NetworkCost = bb
 
 	cc := buff.ReadFloat64() // read float64
-	target.LoadBalancerCostAdjustment = cc
+	target.NetworkCostAdjustment = cc
+
+	dd := buff.ReadFloat64() // read float64
+	target.LoadBalancerCost = dd
+
+	ee := buff.ReadFloat64() // read float64
+	target.LoadBalancerCostAdjustment = ee
 
 	// --- [begin][read][alias](PVAllocations) ---
-	var dd map[PVKey]*PVAllocation
+	var ff map[PVKey]*PVAllocation
 	if buff.ReadUInt8() == uint8(0) {
-		dd = nil
+		ff = nil
 	} else {
 		// --- [begin][read][map](map[PVKey]*PVAllocation) ---
-		ff := buff.ReadInt() // map len
-		ee := make(map[PVKey]*PVAllocation, ff)
-		for i := 0; i < ff; i++ {
+		hh := buff.ReadInt() // map len
+		gg := make(map[PVKey]*PVAllocation, hh)
+		for i := 0; i < hh; i++ {
 			// --- [begin][read][struct](PVKey) ---
-			gg := &PVKey{}
-			hh := buff.ReadInt()     // byte array length
-			kk := buff.ReadBytes(hh) // byte array
-			errE := gg.UnmarshalBinary(kk)
+			kk := &PVKey{}
+			ll := buff.ReadInt()     // byte array length
+			mm := buff.ReadBytes(ll) // byte array
+			errE := kk.UnmarshalBinary(mm)
 			if errE != nil {
 				return errE
 			}
-			v := *gg
+			v := *kk
 			// --- [end][read][struct](PVKey) ---
 
 			var z *PVAllocation
@@ -376,62 +384,62 @@ func (target *Allocation) UnmarshalBinary(data []byte) (err error) {
 				z = nil
 			} else {
 				// --- [begin][read][struct](PVAllocation) ---
-				ll := &PVAllocation{}
-				mm := buff.ReadInt()     // byte array length
-				nn := buff.ReadBytes(mm) // byte array
-				errF := ll.UnmarshalBinary(nn)
+				nn := &PVAllocation{}
+				oo := buff.ReadInt()     // byte array length
+				pp := buff.ReadBytes(oo) // byte array
+				errF := nn.UnmarshalBinary(pp)
 				if errF != nil {
 					return errF
 				}
-				z = ll
+				z = nn
 				// --- [end][read][struct](PVAllocation) ---
 
 			}
-			ee[v] = z
+			gg[v] = z
 		}
-		dd = ee
+		ff = gg
 		// --- [end][read][map](map[PVKey]*PVAllocation) ---
 
 	}
-	target.PVs = PVAllocations(dd)
+	target.PVs = PVAllocations(ff)
 	// --- [end][read][alias](PVAllocations) ---
 
-	oo := buff.ReadFloat64() // read float64
-	target.PVCostAdjustment = oo
-
-	pp := buff.ReadFloat64() // read float64
-	target.RAMByteHours = pp
-
 	qq := buff.ReadFloat64() // read float64
-	target.RAMBytesRequestAverage = qq
+	target.PVCostAdjustment = qq
 
 	rr := buff.ReadFloat64() // read float64
-	target.RAMBytesUsageAverage = rr
+	target.RAMByteHours = rr
 
 	ss := buff.ReadFloat64() // read float64
-	target.RAMCost = ss
+	target.RAMBytesRequestAverage = ss
 
 	tt := buff.ReadFloat64() // read float64
-	target.RAMCostAdjustment = tt
+	target.RAMBytesUsageAverage = tt
 
 	uu := buff.ReadFloat64() // read float64
-	target.SharedCost = uu
+	target.RAMCost = uu
 
 	ww := buff.ReadFloat64() // read float64
-	target.ExternalCost = ww
+	target.RAMCostAdjustment = ww
+
+	xx := buff.ReadFloat64() // read float64
+	target.SharedCost = xx
+
+	yy := buff.ReadFloat64() // read float64
+	target.ExternalCost = yy
 
 	if buff.ReadUInt8() == uint8(0) {
 		target.RawAllocationOnly = nil
 	} else {
 		// --- [begin][read][struct](RawAllocationOnlyData) ---
-		xx := &RawAllocationOnlyData{}
-		yy := buff.ReadInt()      // byte array length
-		aaa := buff.ReadBytes(yy) // byte array
-		errG := xx.UnmarshalBinary(aaa)
+		aaa := &RawAllocationOnlyData{}
+		bbb := buff.ReadInt()      // byte array length
+		ccc := buff.ReadBytes(bbb) // byte array
+		errG := aaa.UnmarshalBinary(ccc)
 		if errG != nil {
 			return errG
 		}
-		target.RawAllocationOnly = xx
+		target.RawAllocationOnly = aaa
 		// --- [end][read][struct](RawAllocationOnlyData) ---
 
 	}

+ 3 - 1
pkg/kubecost/mock.go

@@ -4,8 +4,10 @@ import (
 	"fmt"
 	"time"
 )
+
 const gb = 1024 * 1024 * 1024
 const day = 24 * time.Hour
+
 var disk = PVKey{}
 
 // NewMockUnitAllocation creates an *Allocation with all of its float64 values set to 1 and generic properties if not provided in arg
@@ -304,7 +306,7 @@ func GenerateMockAllocationSet(start time.Time) *AllocationSet {
 }
 
 // GenerateMockAllocationSetWithAssetProperties with no idle and connections to Assets in properties
-func GenerateMockAllocationSetWithAssetProperties(start time.Time)  *AllocationSet {
+func GenerateMockAllocationSetWithAssetProperties(start time.Time) *AllocationSet {
 	as := GenerateMockAllocationSet(start)
 	disk1 := PVKey{
 		Cluster: "cluster2",

+ 3 - 3
pkg/kubecost/window.go

@@ -3,6 +3,7 @@ package kubecost
 import (
 	"bytes"
 	"fmt"
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"math"
 	"regexp"
 	"strconv"
@@ -10,7 +11,6 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/thanos"
-	"github.com/kubecost/cost-model/pkg/util"
 )
 
 const (
@@ -611,7 +611,7 @@ func (w Window) DurationOffsetForPrometheus() (string, string, error) {
 		offset = 0
 	}
 
-	durStr, offStr := util.DurationOffsetStrings(duration, offset)
+	durStr, offStr := timeutil.DurationOffsetStrings(duration, offset)
 	if offset < time.Minute {
 		offStr = ""
 	} else {
@@ -630,7 +630,7 @@ func (w Window) DurationOffsetStrings() (string, string) {
 		return "", ""
 	}
 
-	return util.DurationOffsetStrings(dur, off)
+	return timeutil.DurationOffsetStrings(dur, off)
 }
 
 type BoundaryError struct {

+ 1 - 1
pkg/prom/query.go

@@ -190,7 +190,7 @@ func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error
 	statusCode := resp.StatusCode
 	statusText := http.StatusText(statusCode)
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-		return nil, warnings, CommErrorf("%d (%s) URL: '%s' Headers: '%s', Body: '%s' Query: '%s'", statusCode, statusText, req.URL, util.HeaderString(resp.Header), body, query)
+		return nil, warnings, CommErrorf("%d (%s) URL: '%s', Request Headers: '%s', Headers: '%s', Body: '%s' Query: '%s'", statusCode, statusText, req.URL, req.Header, util.HeaderString(resp.Header), body, query)
 	}
 
 	var toReturn interface{}

+ 3 - 64
pkg/util/mapper/mapper.go

@@ -1,7 +1,7 @@
 package mapper
 
 import (
-	"fmt"
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"strconv"
 	"strings"
 	"time"
@@ -397,7 +397,7 @@ func (rom *readOnlyMapper) GetBool(key string, defaultValue bool) bool {
 func (rom *readOnlyMapper) GetDuration(key string, defaultValue time.Duration) time.Duration {
 	r := rom.getter.Get(key)
 
-	d, err := parseDuration(r)
+	d, err := timeutil.ParseDuration(r)
 	if err != nil {
 		return defaultValue
 	}
@@ -488,7 +488,7 @@ func (wom *writeOnlyMapper) SetBool(key string, value bool) error {
 
 // SetDuration sets the map to a string formatted bool value.
 func (wom *writeOnlyMapper) SetDuration(key string, value time.Duration) error {
-	return wom.setter.Set(key, durationString(value))
+	return wom.setter.Set(key, timeutil.DurationString(value))
 }
 
 // SetList sets the map's value at key to a string consistent of each value in the list separated
@@ -497,66 +497,5 @@ func (wom *writeOnlyMapper) SetList(key string, values []string, delimiter strin
 	return wom.setter.Set(key, strings.Join(values, delimiter))
 }
 
-const (
-	secsPerMin  = 60.0
-	secsPerHour = 3600.0
-	secsPerDay  = 86400.0
-)
-
-// durationString converts duration to a string of the form "4d", "4h", "4m", or "4s" if
-// the number of seconds in the string is evenly divisible into an integer number of
-// days, hours, minutes, or seconds respectively.
-func durationString(duration time.Duration) string {
-	durSecs := int64(duration.Seconds())
-
-	durStr := ""
-	if durSecs > 0 {
-		if durSecs%secsPerDay == 0 {
-			// convert to days
-			durStr = fmt.Sprintf("%dd", durSecs/secsPerDay)
-		} else if durSecs%secsPerHour == 0 {
-			// convert to hours
-			durStr = fmt.Sprintf("%dh", durSecs/secsPerHour)
-		} else if durSecs%secsPerMin == 0 {
-			// convert to mins
-			durStr = fmt.Sprintf("%dm", durSecs/secsPerMin)
-		} else if durSecs > 0 {
-			// default to mins, as long as duration is positive
-			durStr = fmt.Sprintf("%ds", durSecs)
-		}
-	}
 
-	return durStr
-}
-
-func parseDuration(duration string) (time.Duration, error) {
-	var amountStr string
-	var unit time.Duration
-	switch {
-	case strings.HasSuffix(duration, "s"):
-		unit = time.Second
-		amountStr = strings.TrimSuffix(duration, "s")
-	case strings.HasSuffix(duration, "m"):
-		unit = time.Minute
-		amountStr = strings.TrimSuffix(duration, "m")
-	case strings.HasSuffix(duration, "h"):
-		unit = time.Hour
-		amountStr = strings.TrimSuffix(duration, "h")
-	case strings.HasSuffix(duration, "d"):
-		unit = 24.0 * time.Hour
-		amountStr = strings.TrimSuffix(duration, "d")
-	default:
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
 
-	if len(amountStr) == 0 {
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-
-	amount, err := strconv.ParseInt(amountStr, 10, 64)
-	if err != nil {
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-
-	return time.Duration(amount) * unit, nil
-}

+ 1 - 1
pkg/util/retry/retry.go

@@ -36,7 +36,7 @@ func Retry(ctx context.Context, f func() (interface{}, error), attempts uint, de
 
 		time.Sleep(d)
 
-		jitter := time.Duration(rand.Int63n(int64(d)))
+		jitter := time.Duration(rand.Int63n(int64(d))) // #nosec No need for a cryptographic strength random here
 		d = d + jitter/2
 	}
 

+ 1 - 1
pkg/util/strings.go

@@ -30,7 +30,7 @@ const (
 func RandSeq(n int) string {
 	b := make([]rune, n)
 	for i := range b {
-		b[i] = alpha[rand.Intn(len(alpha))]
+		b[i] = alpha[rand.Intn(len(alpha))] // #nosec No need for a cryptographic strength random here
 	}
 	return string(b)
 }

+ 0 - 56
pkg/util/time_test.go

@@ -1,56 +0,0 @@
-package util
-
-import (
-	"testing"
-	"time"
-)
-
-func TestDurationOffsetStrings(t *testing.T) {
-	dur, off := "", ""
-
-	dur, off = DurationOffsetStrings(0, 0)
-	if dur != "" || off != "" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "", "", dur, off)
-	}
-
-	dur, off = DurationOffsetStrings(24*time.Hour, 0)
-	if dur != "1d" || off != "" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "1d", "", dur, off)
-	}
-
-	dur, off = DurationOffsetStrings(24*time.Hour+5*time.Minute, 0)
-	if dur != "1445m" || off != "" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "1445m", "", dur, off)
-	}
-
-	dur, off = DurationOffsetStrings(25*time.Hour, 5*time.Minute)
-	if dur != "25h" || off != "5m" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "25h", "5m", dur, off)
-	}
-
-	dur, off = DurationOffsetStrings(25*time.Hour, 60*time.Minute)
-	if dur != "25h" || off != "1h" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "25h", "1h", dur, off)
-	}
-
-	dur, off = DurationOffsetStrings(72*time.Hour, 1440*time.Minute)
-	if dur != "3d" || off != "1d" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "3d", "1d", dur, off)
-	}
-
-	dur, off = DurationOffsetStrings(25*time.Hour, 1*time.Second)
-	if dur != "25h" || off != "1s" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "25h", "1s", dur, off)
-	}
-
-	dur, off = DurationOffsetStrings(24*time.Hour+time.Second, 1*time.Second)
-	if dur != "86401s" || off != "1s" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "86401s", "1s", dur, off)
-	}
-
-	// Expect empty strings if durations are negative
-	dur, off = DurationOffsetStrings(-25*time.Hour, -1*time.Second)
-	if dur != "" || off != "" {
-		t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", "", "", dur, off)
-	}
-}

+ 57 - 42
pkg/util/time.go → pkg/util/timeutil/timeutil.go

@@ -1,8 +1,10 @@
-package util
+package timeutil
 
 import (
 	"fmt"
+	"regexp"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 )
@@ -50,7 +52,7 @@ func DurationString(duration time.Duration) string {
 			// convert to mins
 			durStr = fmt.Sprintf("%dm", durSecs/SecsPerMin)
 		} else if durSecs > 0 {
-			// default to mins, as long as duration is positive
+			// default to secs, as long as duration is positive
 			durStr = fmt.Sprintf("%ds", durSecs)
 		}
 	}
@@ -58,14 +60,39 @@ func DurationString(duration time.Duration) string {
 	return durStr
 }
 
+// DurationToPromOffsetString returns a Prometheus formatted string with leading offset or empty string if given a negative duration
+func DurationToPromOffsetString(duration time.Duration) string {
+	dirStr := DurationString(duration)
+	if dirStr != "" {
+		dirStr = fmt.Sprintf("offset %s", dirStr)
+	}
+	return dirStr
+}
+
 // DurationOffsetStrings converts a (duration, offset) pair to Prometheus-
 // compatible strings in terms of days, hours, minutes, or seconds.
 func DurationOffsetStrings(duration, offset time.Duration) (string, string) {
 	return DurationString(duration), DurationString(offset)
 }
 
+// FormatStoreResolution provides a clean notation for ETL store resolutions.
+// e.g. daily => 1d; hourly => 1h
+func FormatStoreResolution(dur time.Duration) string {
+	if dur >= 24*time.Hour {
+		return fmt.Sprintf("%dd", int(dur.Hours()/24.0))
+	} else if dur >= time.Hour {
+		return fmt.Sprintf("%dh", int(dur.Hours()))
+	}
+	return fmt.Sprint(dur)
+}
+
 // ParseDuration converts a Prometheus-style duration string into a Duration
-func ParseDuration(duration string) (*time.Duration, error) {
+func ParseDuration(duration string) (time.Duration, error) {
+	// Trim prefix of Prometheus format duration
+	duration = CleanDurationString(duration)
+	if len(duration) < 2 {
+		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
 	unitStr := duration[len(duration)-1:]
 	var unit time.Duration
 	switch unitStr {
@@ -78,52 +105,51 @@ func ParseDuration(duration string) (*time.Duration, error) {
 	case "d":
 		unit = 24.0 * time.Hour
 	default:
-		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
 	}
 
 	amountStr := duration[:len(duration)-1]
 	amount, err := strconv.ParseInt(amountStr, 10, 64)
 	if err != nil {
-		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
 	}
 
-	dur := time.Duration(amount) * unit
-	return &dur, nil
+	return time.Duration(amount) * unit, nil
+}
+
+// CleanDurationString removes prometheus formatted prefix "offset " allong with leading a trailing whitespace
+// from duration string, leaving behind a string with format [0-9+](s|m|d|h)
+func CleanDurationString(duration string) string {
+	duration = strings.TrimSpace(duration)
+	duration = strings.TrimPrefix(duration, "offset ")
+	return duration
 }
 
 // ParseTimeRange returns a start and end time, respectively, which are converted from
 // a duration and offset, defined as strings with Prometheus-style syntax.
-func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
+func ParseTimeRange(duration, offset time.Duration) (time.Time, time.Time) {
 	// endTime defaults to the current time, unless an offset is explicity declared,
 	// in which case it shifts endTime back by given duration
 	endTime := time.Now()
-	if offset != "" {
-		o, err := ParseDuration(offset)
-		if err != nil {
-			return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
-		}
-		endTime = endTime.Add(-1 * *o)
+	if offset > 0 {
+		endTime = endTime.Add(-1 * offset)
 	}
 
-	// if duration is defined in terms of days, convert to hours
-	// e.g. convert "2d" to "48h"
-	durationNorm, err := normalizeTimeParam(duration)
-	if err != nil {
-		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
-	}
+	startTime := endTime.Add(-1 * duration)
 
-	// convert time duration into start and end times, formatted
-	// as ISO datetime strings
-	dur, err := time.ParseDuration(durationNorm)
-	if err != nil {
-		return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
-	}
-	startTime := endTime.Add(-1 * dur)
-
-	return &startTime, &endTime, nil
+	return startTime, endTime
 }
 
-func normalizeTimeParam(param string) (string, error) {
+// FormatDurationStringDaysToHours converts string from format [0-9+]d to [0-9+]h
+func FormatDurationStringDaysToHours(param string) (string, error) {
+	//check that input matches format
+	ok, err := regexp.MatchString("[0-9+]d", param)
+	if !ok {
+		return param, fmt.Errorf("FormatDurationStringDaysToHours: input string (%s) not formatted as [0-9+]d", param)
+	}
+	if err != nil {
+		return "", err
+	}
 	// convert days to hours
 	if param[len(param)-1:] == "d" {
 		count := param[:len(param)-1]
@@ -138,17 +164,6 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
-// FormatStoreResolution provides a clean notation for ETL store resolutions.
-// e.g. daily => 1d; hourly => 1h
-func FormatStoreResolution(dur time.Duration) string {
-	if dur >= 24*time.Hour {
-		return fmt.Sprintf("%dd", int(dur.Hours()/24.0))
-	} else if dur >= time.Hour {
-		return fmt.Sprintf("%dh", int(dur.Hours()))
-	}
-	return fmt.Sprint(dur)
-}
-
 // JobTicker is a ticker used to synchronize the next run of a repeating
 // process. The designated use-case is for infinitely-looping selects,
 // where a timeout or an exit channel might cancel the process, but otherwise
@@ -222,4 +237,4 @@ func (jt *JobTicker) TickIn(d time.Duration) {
 			jt.ch <- time.Now()
 		}
 	}(d)
-}
+}

+ 381 - 0
pkg/util/timeutil/timeutil_test.go

@@ -0,0 +1,381 @@
+package timeutil
+
+import (
+	"testing"
+	"time"
+)
+
+func Test_DurationString(t *testing.T) {
+	testCases := map[string]struct {
+		duration time.Duration
+		expectedDuration string
+	}{
+		"1a": {
+			duration:         0,
+			expectedDuration: "",
+		},
+		"1b": {
+			duration:         24*time.Hour,
+			expectedDuration: "1d",
+		},
+		"1c": {
+			duration:         24*time.Hour+5*time.Minute,
+			expectedDuration: "1445m",
+		},
+		"1d": {
+			duration:         25*time.Hour,
+			expectedDuration: "25h",
+		},
+		"1e": {
+			duration:         25*time.Hour,
+			expectedDuration: "25h",
+		},
+		"1f": {
+			duration:         72*time.Hour,
+			expectedDuration: "3d",
+		},
+		"1g": {
+			duration:         25*time.Hour,
+			expectedDuration: "25h",
+		},
+		"1h": {
+			duration:         24*time.Hour+time.Second,
+			expectedDuration: "86401s",
+		},
+		// Expect empty strings if durations are negative
+		"1i": {
+			duration:         -25*time.Hour,
+			expectedDuration: "",
+		},
+	}
+
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			dur := DurationString(test.duration)
+			if dur != test.expectedDuration {
+				t.Fatalf("DurationOffsetStrings: exp (%s); act (%s)", test.expectedDuration, dur)
+			}
+		})
+	}
+}
+
+func Test_DurationToPromOffsetString(t *testing.T) {
+	testCases := map[string]struct {
+		duration time.Duration
+		expectedDuration string
+	}{
+		"1a": {
+			duration:         0,
+			expectedDuration: "",
+		},
+		"1b": {
+			duration:         24*time.Hour,
+			expectedDuration: "offset 1d",
+		},
+		"1c": {
+			duration:         24*time.Hour+5*time.Minute,
+			expectedDuration: "offset 1445m",
+		},
+		"1d": {
+			duration:         25*time.Hour,
+			expectedDuration: "offset 25h",
+		},
+		"1e": {
+			duration:         25*time.Hour,
+			expectedDuration: "offset 25h",
+		},
+		"1f": {
+			duration:         72*time.Hour,
+			expectedDuration: "offset 3d",
+		},
+		"1g": {
+			duration:         25*time.Hour,
+			expectedDuration: "offset 25h",
+		},
+		"1h": {
+			duration:         24*time.Hour+time.Second,
+			expectedDuration: "offset 86401s",
+		},
+		// Expect empty strings if durations are negative
+		"1i": {
+			duration:         -25*time.Hour,
+			expectedDuration: "",
+		},
+	}
+
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			dur := DurationToPromOffsetString(test.duration)
+			if dur != test.expectedDuration {
+				t.Fatalf("DurationOffsetStrings: exp (%s); act (%s)", test.expectedDuration, dur)
+			}
+		})
+	}
+}
+
+func Test_FormatStoreResolution(t *testing.T) {
+	testCases := map[string]struct {
+		duration time.Duration
+		expectedDuration string
+	}{
+		"1a": {
+			duration:         0,
+			expectedDuration: "0s",
+		},
+		"1b": {
+			duration:         24*time.Hour,
+			expectedDuration: "1d",
+		},
+		"1c": {
+			duration:         24*time.Hour+5*time.Minute,
+			expectedDuration: "1d",
+		},
+		"1d": {
+			duration:         25*time.Hour,
+			expectedDuration: "1d",
+		},
+		"1e": {
+			duration:         25*time.Hour,
+			expectedDuration: "1d",
+		},
+		"1f": {
+			duration:         72*time.Hour,
+			expectedDuration: "3d",
+		},
+		"1g": {
+			duration:         25*time.Hour,
+			expectedDuration: "1d",
+		},
+		"1h": {
+			duration:         24*time.Hour+time.Second,
+			expectedDuration: "1d",
+		},
+		// Expect empty strings if durations are negative
+		"1i": {
+			duration:         -25*time.Hour,
+			expectedDuration: "-25h0m0s",
+		},
+	}
+
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			dur := FormatStoreResolution(test.duration)
+			if dur != test.expectedDuration {
+				t.Fatalf("DurationOffsetStrings: exp (%s); act (%s)", test.expectedDuration, dur)
+			}
+		})
+	}
+}
+
+func Test_DurationOffsetStrings(t *testing.T) {
+	testCases := map[string]struct {
+		duration time.Duration
+		offset time.Duration
+		expectedDuration string
+		expectedOffset string
+	}{
+		"1a": {
+			duration:         0,
+			offset:           0,
+			expectedDuration: "",
+			expectedOffset:   "",
+		},
+		"1b": {
+			duration:         24*time.Hour,
+			offset:           0,
+			expectedDuration: "1d",
+			expectedOffset:   "",
+		},
+		"1c": {
+			duration:         24*time.Hour+5*time.Minute,
+			offset:           0,
+			expectedDuration: "1445m",
+			expectedOffset:   "",
+		},
+		"1d": {
+			duration:         25*time.Hour,
+			offset:           5*time.Minute,
+			expectedDuration: "25h",
+			expectedOffset:   "5m",
+		},
+		"1e": {
+			duration:         25*time.Hour,
+			offset:           60*time.Minute,
+			expectedDuration: "25h",
+			expectedOffset:   "1h",
+		},
+		"1f": {
+			duration:         72*time.Hour,
+			offset:           1440*time.Minute,
+			expectedDuration: "3d",
+			expectedOffset:   "1d",
+		},
+		"1g": {
+			duration:         25*time.Hour,
+			offset:           1*time.Second,
+			expectedDuration: "25h",
+			expectedOffset:   "1s",
+		},
+		"1h": {
+			duration:         24*time.Hour+time.Second,
+			offset:           1*time.Second,
+			expectedDuration: "86401s",
+			expectedOffset:   "1s",
+		},
+		// Expect empty strings if durations are negative
+		"1i": {
+			duration:         -25*time.Hour,
+			offset:           -1*time.Second,
+			expectedDuration: "",
+			expectedOffset:   "",
+		},
+	}
+
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			dur, off:= DurationOffsetStrings(test.duration, test.offset)
+			if dur != test.expectedDuration || off != test.expectedOffset {
+				t.Fatalf("DurationOffsetStrings: exp (%s %s); act (%s, %s)", test.expectedDuration, test.expectedOffset, dur, off)
+			}
+		})
+	}
+}
+
+func Test_ParseDuration(t *testing.T) {
+	testCases := map[string]struct {
+		input    string
+		expected time.Duration
+	}{
+		"expected": {
+			input:    "3h",
+			expected: time.Hour * 3,
+		},
+		"white space": {
+			input:    " 4s ",
+			expected: time.Second * 4,
+		},
+		"prom prefix": {
+			input:    "offset 3m",
+			expected: time.Minute * 3,
+		},
+		"prom prefix white space": {
+			input:    " offset 3d ",
+			expected: 24.0 * time.Hour * 3,
+		},
+		"zero": {
+			input:    "0h",
+			expected: time.Duration(0),
+		},
+		"empty": {
+			input:    "",
+			expected: time.Duration(0),
+		},
+		"bad string": {
+			input:    "oqwd3dk5hk",
+			expected: time.Duration(0),
+		},
+		"digit": {
+			input:    "3",
+			expected: time.Duration(0),
+		},
+		"unit": {
+			input:    "h",
+			expected: time.Duration(0),
+		},
+	}
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			dur, _ := ParseDuration(test.input)
+			if dur != test.expected {
+				t.Errorf("Expected duration %v did not match result %v", test.expected, dur)
+			}
+		})
+	}
+}
+
+func Test_CleanDurationString(t *testing.T) {
+	testCases := map[string]struct {
+		input    string
+		expected string
+	}{
+		"white space": {
+			input:    " 1d ",
+			expected: "1d",
+		},
+		"no change": {
+			input:    "1d",
+			expected: "1d",
+		},
+		"prefix": {
+			input:    "offset 1d",
+			expected: "1d",
+		},
+		"prefix white space": {
+			input:    " offset 1d ",
+			expected: "1d",
+		},
+		"empty": {
+			input:    "",
+			expected: "",
+		},
+		"random": {
+			input:    "oqwd3dk5hk",
+			expected: "oqwd3dk5hk",
+		},
+
+
+	}
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			res := CleanDurationString(test.input)
+			if res != test.expected {
+				t.Errorf("Expected output %s did not match result %s", test.expected, res)
+			}
+		})
+	}
+}
+
+func Test_FormatDurationStringDaysToHours(t *testing.T) {
+	testCases := map[string]struct {
+		input    string
+		expected string
+	}{
+		"1 day": {
+			input:    "1d",
+			expected: "24h",
+		},
+		"2 days": {
+			input:    "1d",
+			expected: "24h",
+		},
+		"500 days": {
+			input:    "500d",
+			expected: "12000h",
+		},
+		"1h": {
+			input:    "1h",
+			expected: "1h",
+		},
+		"empty": {
+			input:    "",
+			expected: "",
+		},
+		"no unit": {
+			input:    "1",
+			expected: "1",
+		},
+		"random": {
+			input:    "oqwd3dk5hk",
+			expected: "oqwd3dk5hk",
+		},
+
+	}
+	for name, test := range testCases {
+		t.Run(name, func(t *testing.T) {
+			res, _ := FormatDurationStringDaysToHours(test.input)
+			if res != test.expected {
+				t.Errorf("Expected output %s did not match result %s", test.expected, res)
+			}
+		})
+	}
+}