Explorar el Código

Merge pull request #461 from kubecost/develop

Merge develop into master
Ajay Tripathy hace 5 años
padre
commit
5b37bcf359
Se han modificado 5 ficheros con 51 adiciones y 13 borrados
  1. 20 4
      pkg/cloud/awsprovider.go
  2. 4 0
      pkg/cloud/csvprovider.go
  3. 1 0
      pkg/cloud/provider.go
  4. 1 1
      pkg/costmodel/costmodel.go
  5. 25 8
      test/cloud_test.go

+ 20 - 4
pkg/cloud/awsprovider.go

@@ -26,6 +26,7 @@ import (
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/athena"
 	"github.com/aws/aws-sdk-go/service/ec2"
@@ -43,6 +44,9 @@ const supportedSpotFeedVersion = "1"
 const SpotInfoUpdateType = "spotinfo"
 const AthenaInfoUpdateType = "athenainfo"
 
+// How often spot data is refreshed
+const SpotRefreshDuration = 15 * time.Minute
+
 const defaultConfigPath = "/var/configs/"
 
 var awsRegions = []string{
@@ -282,6 +286,7 @@ type AwsAthenaInfo struct {
 	ServiceKeyName   string `json:"serviceKeyName"`
 	ServiceKeySecret string `json:"serviceKeySecret"`
 	AccountID        string `json:"projectID"`
+	MasterPayerARN   string `json:"masterPayerARN"`
 }
 
 func (aws *AWS) GetManagementPlatform() (string, error) {
@@ -352,6 +357,9 @@ func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			if a.ServiceKeySecret != "" {
 				c.ServiceKeySecret = a.ServiceKeySecret
 			}
+			if a.MasterPayerARN != "" {
+				c.MasterPayerARN = a.MasterPayerARN
+			}
 			c.AthenaProjectID = a.AccountID
 		} else {
 			a := make(map[string]interface{})
@@ -743,8 +751,8 @@ func (aws *AWS) DownloadPricingData() error {
 			defer errors.HandlePanic()
 
 			for {
-				klog.Infof("Spot Pricing Refresh scheduled in 1 hr.")
-				time.Sleep(time.Hour)
+				klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
+				time.Sleep(SpotRefreshDuration)
 
 				// Reoccurring refresh checks update times
 				aws.refreshSpotPricing(false)
@@ -760,7 +768,7 @@ func (aws *AWS) refreshSpotPricing(force bool) {
 	defer aws.SpotPricingLock.Unlock()
 
 	now := time.Now().UTC()
-	updateTime := now.Add(-time.Hour)
+	updateTime := now.Add(-SpotRefreshDuration)
 
 	// Return if there was an update time set and an hour hasn't elapsed
 	if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
@@ -1429,6 +1437,13 @@ func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutpu
 	}
 	s := session.Must(session.NewSession(c))
 	svc := athena.New(s)
+	if customPricing.MasterPayerARN != "" {
+		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
+		svc = athena.New(s, &aws.Config{
+			Region:      region,
+			Credentials: creds,
+		})
+	}
 
 	var e athena.StartQueryExecutionInput
 
@@ -1884,6 +1899,7 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 		klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
 	}
 
+	// TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
 	var keys []*string
 	for _, obj := range lso.Contents {
 		keys = append(keys, obj.Key)
@@ -1968,7 +1984,7 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 				continue
 			}
 
-			klog.V(4).Infof("Found spot info %+v", spot)
+			klog.V(1).Infof("Found spot info for: %s", spot.InstanceID)
 			spots[spot.InstanceID] = &spot
 		}
 		gr.Close()

+ 4 - 0
pkg/cloud/csvprovider.go

@@ -172,6 +172,10 @@ func NodeValueFromMapField(m string, n *v1.Node) string {
 				return group
 			}
 		}
+		if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
+			vmOrScaleSet := strings.TrimPrefix(n.Spec.ProviderID, "azure://")
+			return vmOrScaleSet
+		}
 		return n.Spec.ProviderID
 	} else if len(mf) > 1 && mf[0] == "metadata" {
 		if mf[1] == "name" {

+ 1 - 0
pkg/cloud/provider.go

@@ -140,6 +140,7 @@ type CustomPricing struct {
 	AthenaRegion          string            `json:"athenaRegion"`
 	AthenaDatabase        string            `json:"athenaDatabase"`
 	AthenaTable           string            `json:"athenaTable"`
+	MasterPayerARN        string            `json:"masterPayerARN"`
 	BillingDataDataset    string            `json:"billingDataDataset,omitempty"`
 	CustomPricesEnabled   string            `json:"customPricesEnabled"`
 	DefaultIdle           string            `json:"defaultIdle"`

+ 1 - 1
pkg/costmodel/costmodel.go

@@ -2444,7 +2444,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	unmounted := findUnmountedPVCostData(unmountedPVs, namespaceLabelsMapping)
 	for k, costs := range unmounted {
-		klog.V(3).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
+		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
 
 		if costDataPassesFilters(costs, filterNamespace, filterCluster) {
 			containerNameCost[k] = costs

+ 25 - 8
test/cloud_test.go

@@ -19,8 +19,25 @@ func TestTransformedValueFromMapField(t *testing.T) {
 	if got != providerIDWant {
 		t.Errorf("Assert on '%s' want '%s' got '%s'", providerIDMap, providerIDWant, got)
 	}
+
+	providerIDWant2 := "/subscriptions/0bd50fdf-c923-4e1e-850c-196dd3dcc5d3/resourceGroups/MC_test_test_eastus/providers/Microsoft.Compute/virtualMachines/aks-agentpool-20139558-0"
+	n2 := &v1.Node{}
+	n2.Spec.ProviderID = "azure:///subscriptions/0bd50fdf-c923-4e1e-850c-196dd3dcc5d3/resourceGroups/MC_test_test_eastus/providers/Microsoft.Compute/virtualMachines/aks-agentpool-20139558-0"
+	got2 := cloud.NodeValueFromMapField(providerIDMap, n2)
+	if got2 != providerIDWant2 {
+		t.Errorf("Assert on '%s' want '%s' got '%s'", providerIDMap, providerIDWant2, got2)
+	}
+
+	providerIDWant3 := "/subscriptions/0bd50fdf-c923-4e1e-850c-196dd3dcc5d3/resourceGroups/mc_testspot_testspot_eastus/providers/Microsoft.Compute/virtualMachineScaleSets/aks-nodepool1-19213364-vmss/virtualMachines/0"
+	n3 := &v1.Node{}
+	n3.Spec.ProviderID = "azure:///subscriptions/0bd50fdf-c923-4e1e-850c-196dd3dcc5d3/resourceGroups/mc_testspot_testspot_eastus/providers/Microsoft.Compute/virtualMachineScaleSets/aks-nodepool1-19213364-vmss/virtualMachines/0"
+	got3 := cloud.NodeValueFromMapField(providerIDMap, n3)
+	if got3 != providerIDWant3 {
+		t.Errorf("Assert on '%s' want '%s' got '%s'", providerIDMap, providerIDWant3, got3)
+	}
 }
 
+
 func TestNodeValueFromMapField(t *testing.T) {
 	providerIDWant := "providerid"
 	nameWant := "gke-standard-cluster-1-pool-1-91dc432d-cg69"
@@ -71,11 +88,14 @@ func TestNodePriceFromCSV(t * testing.T) {
 	}
 	c.DownloadPricingData()
 	k := c.GetKey(n.Labels, n)
-	resN, _ := c.NodePricing(k)
-	gotPrice := resN.Cost
-
-	if gotPrice != wantPrice {
-		t.Errorf("Wanted price '%s' got price '%s'", wantPrice, gotPrice)
+	resN, err := c.NodePricing(k)
+	if err != nil {
+		t.Errorf("Error in NodePricing: %s", err.Error())
+	} else {
+		gotPrice := resN.Cost
+		if gotPrice != wantPrice {
+			t.Errorf("Wanted price '%s' got price '%s'", wantPrice, gotPrice)
+		}
 	}
 
 	unknownN := &v1.Node{}
@@ -100,7 +120,4 @@ func TestNodePriceFromCSV(t * testing.T) {
 	if resN3 != nil {
 		t.Errorf("CSV provider should return nil on missing csv")
 	}
-
-
-
 }