Explorar o código

support s3 csv load

AjayTripathy %!s(int64=6) %!d(string=hai) anos
pai
achega
6f003f3c17
Modificáronse 2 ficheiros con 54 adicións e 10 borrados
  1. 35 3
      pkg/cloud/csvprovider.go
  2. 19 7
      pkg/cloud/provider.go

+ 35 - 3
pkg/cloud/csvprovider.go

@@ -8,6 +8,9 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog"
 
@@ -47,13 +50,42 @@ func (c *CSVProvider) DownloadPricingData() error {
 		return err
 	}
 	fieldsPerRecord := len(header)
-	csvr, err := GetCsv(c.CSVLocation)
+	var csvr io.Reader
+	var csverr error
+	if strings.HasPrefix(c.CSVLocation, "s3://") {
+		region := os.Getenv("CSV_REGION")
+		conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
+		s3Client := s3.New(session.New(conf))
+		bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
+		if len(bucketAndKey) == 2 {
+			out, err := s3Client.GetObject(&s3.GetObjectInput{
+				Bucket: aws.String(bucketAndKey[0]),
+				Key:    aws.String(bucketAndKey[1]),
+			})
+			csverr = err
+			csvr = out.Body
+		} else {
+			c.Pricing = pricing
+			c.PricingPV = pvpricing
+			return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
+		}
+	} else {
+		csvr, csverr = GetCsv(c.CSVLocation)
+	}
+	if csverr != nil {
+		klog.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
+		c.Pricing = pricing
+		c.PricingPV = pvpricing
+		return nil
+	}
 	csvReader := csv.NewReader(csvr)
 	csvReader.Comma = ','
 	csvReader.FieldsPerRecord = fieldsPerRecord
 
 	dec, err := csvutil.NewDecoder(csvReader, header...)
 	if err != nil {
+		c.Pricing = pricing
+		c.PricingPV = pvpricing
 		return err
 	}
 	for {
@@ -161,11 +193,11 @@ func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
 			akey := strings.Join(mf[2:len(mf)], "")
 			return n.Annotations[akey]
 		} else {
-			klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
+			klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
 			return ""
 		}
 	} else {
-		klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
+		klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
 		return ""
 	}
 }

+ 19 - 7
pkg/cloud/provider.go

@@ -230,13 +230,31 @@ func NewCrossClusterProvider(ctype string, overrideConfigPath string, cache clus
 
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
 func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
+	nodes := cache.GetAllNodes()
+	if len(nodes) == 0 {
+		return nil, fmt.Errorf("Could not locate any nodes for cluster.")
+	}
+
+	provider := strings.ToLower(nodes[0].Spec.ProviderID)
+
 	if os.Getenv("USE_CSV_PROVIDER") == "true" {
 		klog.Infof("Using CSV Provider with CSV at %s", os.Getenv("CSV_PATH"))
+		configFileName := ""
+		if metadata.OnGCE() {
+			configFileName = "gcp.json"
+		} else if strings.HasPrefix(provider, "aws") {
+			configFileName = "aws.json"
+		} else if strings.HasPrefix(provider, "azure") {
+			configFileName = "azure.json"
+
+		} else {
+			configFileName = "default.json"
+		}
 		return &CSVProvider{
 			CSVLocation: os.Getenv("CSV_PATH"),
 			CustomProvider: &CustomProvider{
 				Clientset: cache,
-				Config:    NewProviderConfig("default.json"),
+				Config:    NewProviderConfig(configFileName),
 			},
 		}, nil
 	}
@@ -252,12 +270,6 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		}, nil
 	}
 
-	nodes := cache.GetAllNodes()
-	if len(nodes) == 0 {
-		return nil, fmt.Errorf("Could not locate any nodes for cluster.")
-	}
-
-	provider := strings.ToLower(nodes[0].Spec.ProviderID)
 	if strings.HasPrefix(provider, "aws") {
 		klog.V(2).Info("Found ProviderID starting with \"aws\", using AWS Provider")
 		return &AWS{