Browse Source

refactor(cloud): Move aws and gcp to dedicated modules

Following the move by Azure folks to refactor the azure module in #1869,
I am refactoring AWS and GCP into their own dedicated folders. This was
for the most part straight forward. The one caveat is that the
customprovider had hard dependencies on AWS resources. This didn't feel
right to me, so instead of the two sharing certain structs, I opted to
copy over the pvKey implementation to customprovider and a regex.

Signed-off-by: pokom <mark.poko@grafana.com>
pokom 3 years ago
parent
commit
7de0a34444

+ 16 - 17
pkg/cloud/awsprovider.go → pkg/cloud/aws/awsprovider.go

@@ -1,4 +1,4 @@
-package cloud
+package aws
 
 import (
 	"bytes"
@@ -178,13 +178,12 @@ type AWS struct {
 	SpotDataPrefix              string
 	ProjectID                   string
 	DownloadPricingDataLock     sync.RWMutex
-	Config                      *ProviderConfig
-	serviceAccountChecks        *models.ServiceAccountChecks
+	Config                      models.ProviderConfig
+	ServiceAccountChecks        *models.ServiceAccountChecks
 	clusterManagementPrice      float64
-	clusterRegion               string
-	clusterAccountID            string
+	ClusterRegion               string
+	ClusterAccountID            string
 	clusterProvisioner          string
-	*CustomProvider
 }
 
 // AWSAccessKey holds AWS credentials and fulfils the awsV2.CredentialsProvider interface
@@ -650,7 +649,7 @@ func (k *awsKey) getUsageType(labels map[string]string) string {
 		// We currently write out spot instances as "preemptible" in the pricing data, so these need to match
 		return PreemptibleType
 	}
-	if kLabel, ok := labels[KarpenterCapacityTypeLabel]; ok && kLabel == KarpenterCapacitySpotTypeValue {
+	if kLabel, ok := labels[models.KarpenterCapacityTypeLabel]; ok && kLabel == models.KarpenterCapacitySpotTypeValue {
 		return PreemptibleType
 	}
 	return ""
@@ -1366,7 +1365,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	m["name"] = clusterName
 	m["provider"] = kubecost.AWSProvider
 	m["account"] = clusterAccountID
-	m["region"] = awsProvider.clusterRegion
+	m["region"] = awsProvider.ClusterRegion
 	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(env.IsRemoteEnabled())
 	m["provisioner"] = awsProvider.clusterProvisioner
@@ -1403,7 +1402,7 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string,
 
 	// 1. Check config values first (set from frontend UI)
 	if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  true,
 		})
@@ -1413,7 +1412,7 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string,
 	// 2. Check for secret
 	s, _ := aws.loadAWSAuthSecret(forceReload)
 	if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  true,
 		})
@@ -1422,12 +1421,12 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string,
 
 	// 3. Fall back to env vars
 	if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeySecret() == "" {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  false,
 		})
 	} else {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  true,
 		})
@@ -2154,14 +2153,14 @@ func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, re
 	}
 	lso, err := cli.ListObjects(context.TODO(), ls)
 	if err != nil {
-		aws.serviceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
 			Message:        "Bucket List Permissions Available",
 			Status:         false,
 			AdditionalInfo: err.Error(),
 		})
 		return nil, err
 	} else {
-		aws.serviceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
 			Message: "Bucket List Permissions Available",
 			Status:  true,
 		})
@@ -2206,14 +2205,14 @@ func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, re
 		buf := manager.NewWriteAtBuffer([]byte{})
 		_, err := downloader.Download(context.TODO(), buf, getObj)
 		if err != nil {
-			aws.serviceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
+			aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
 				Message:        "Object Get Permissions Available",
 				Status:         false,
 				AdditionalInfo: err.Error(),
 			})
 			return nil, err
 		} else {
-			aws.serviceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
+			aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
 				Message: "Object Get Permissions Available",
 				Status:  true,
 			})
@@ -2289,7 +2288,7 @@ func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*models.Node) {
 }
 
 func (aws *AWS) ServiceAccountStatus() *models.ServiceAccountStatus {
-	return aws.serviceAccountChecks.GetStatus()
+	return aws.ServiceAccountChecks.GetStatus()
 }
 
 func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {

+ 4 - 4
pkg/cloud/awsprovider_test.go → pkg/cloud/aws/awsprovider_test.go

@@ -1,4 +1,4 @@
-package cloud
+package aws
 
 import (
 	"bytes"
@@ -64,7 +64,7 @@ func Test_awsKey_getUsageType(t *testing.T) {
 			name: "Karpenter label with a capacityType set to empty string should return empty string",
 			args: args{
 				labels: map[string]string{
-					KarpenterCapacityTypeLabel: "",
+					aws.KarpenterCapacityTypeLabel: "",
 				},
 			},
 			want: "",
@@ -73,7 +73,7 @@ func Test_awsKey_getUsageType(t *testing.T) {
 			name: "Karpenter label with capacityType set to a random value should return empty string",
 			args: args{
 				labels: map[string]string{
-					KarpenterCapacityTypeLabel: "TEST_ME",
+					aws.KarpenterCapacityTypeLabel: "TEST_ME",
 				},
 			},
 			want: "",
@@ -82,7 +82,7 @@ func Test_awsKey_getUsageType(t *testing.T) {
 			name: "Karpenter label with capacityType set to spot should return spot",
 			args: args{
 				labels: map[string]string{
-					KarpenterCapacityTypeLabel: KarpenterCapacitySpotTypeValue,
+					aws.KarpenterCapacityTypeLabel: aws.KarpenterCapacitySpotTypeValue,
 				},
 			},
 			want: PreemptibleType,

+ 5 - 0
pkg/cloud/csvprovider.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"regexp"
 	"strconv"
 	"strings"
 	"sync"
@@ -25,6 +26,10 @@ import (
 
 const refreshMinutes = 60
 
+var (
+	provIdRx = regexp.MustCompile("aws:///([^/]+)/([^/]+)")
+)
+
 type CSVProvider struct {
 	*CustomProvider
 	CSVLocation             string

+ 56 - 1
pkg/cloud/customprovider.go

@@ -14,6 +14,7 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/log"
+	"github.com/opencost/opencost/pkg/util"
 	"github.com/opencost/opencost/pkg/util/json"
 
 	v1 "k8s.io/api/core/v1"
@@ -38,6 +39,31 @@ type CustomProvider struct {
 	Config                  *ProviderConfig
 }
 
+var volTypes = map[string]string{
+	"EBS:VolumeUsage.gp2":    "gp2",
+	"EBS:VolumeUsage.gp3":    "gp3",
+	"EBS:VolumeUsage":        "standard",
+	"EBS:VolumeUsage.sc1":    "sc1",
+	"EBS:VolumeP-IOPS.piops": "io1",
+	"EBS:VolumeUsage.st1":    "st1",
+	"EBS:VolumeUsage.piops":  "io1",
+	"gp2":                    "EBS:VolumeUsage.gp2",
+	"gp3":                    "EBS:VolumeUsage.gp3",
+	"standard":               "EBS:VolumeUsage",
+	"sc1":                    "EBS:VolumeUsage.sc1",
+	"io1":                    "EBS:VolumeUsage.piops",
+	"st1":                    "EBS:VolumeUsage.st1",
+}
+
+type customPVKey struct {
+	Labels                 map[string]string
+	StorageClassParameters map[string]string
+	StorageClassName       string
+	Name                   string
+	DefaultRegion          string
+	ProviderID             string
+}
+
 // PricingSourceSummary returns the pricing source summary for the provider.
 // The summary represents what was _parsed_ from the pricing source, not what
 // was returned from the relevant API.
@@ -302,7 +328,7 @@ func (cp *CustomProvider) LoadBalancerPricing() (*models.LoadBalancer, error) {
 }
 
 func (*CustomProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) models.PVKey {
-	return &awsPVKey{
+	return &customPVKey{
 		Labels:                 pv.Labels,
 		StorageClassName:       pv.Spec.StorageClassName,
 		StorageClassParameters: parameters,
@@ -310,6 +336,35 @@ func (*CustomProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]s
 	}
 }
 
+func (key *customPVKey) ID() string {
+	return key.ProviderID
+}
+
+func (key *customPVKey) GetStorageClass() string {
+	return key.StorageClassName
+}
+
+// Features returns a comma separated string of features for a given PV
+// (@pokom): This was imported from aws which caused a cyclical dependency. This _should_ be refactored to be specific to a custom pvkey
+func (key *customPVKey) Features() string {
+	storageClass := key.StorageClassParameters["type"]
+	if storageClass == "standard" {
+		storageClass = "gp2"
+	}
+	// Storage class names are generally EBS volume types (gp2)
+	// Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
+	// Converts between the 2
+	region, ok := util.GetRegion(key.Labels)
+	if !ok {
+		region = key.DefaultRegion
+	}
+	class, ok := volTypes[storageClass]
+	if !ok {
+		log.Debugf("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
+	}
+	return region + "," + class
+}
+
 func (k *customProviderKey) GPUCount() int {
 	return 0
 }

+ 17 - 17
pkg/cloud/gcpprovider.go → pkg/cloud/gcp/gcpprovider.go

@@ -1,4 +1,4 @@
-package cloud
+package gcp
 
 import (
 	"context"
@@ -14,6 +14,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/opencost/opencost/pkg/cloud/aws"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 	"github.com/opencost/opencost/pkg/kubecost"
@@ -99,16 +100,15 @@ type GCP struct {
 	BillingDataDataset      string
 	DownloadPricingDataLock sync.RWMutex
 	ReservedInstances       []*GCPReservedInstance
-	Config                  *ProviderConfig
+	Config                  models.ProviderConfig
 	ServiceKeyProvided      bool
 	ValidPricingKeys        map[string]bool
-	metadataClient          *metadata.Client
+	MetadataClient          *metadata.Client
 	clusterManagementPrice  float64
-	clusterRegion           string
-	clusterAccountID        string
-	clusterProjectID        string
+	ClusterRegion           string
+	ClusterAccountID        string
+	ClusterProjectID        string
 	clusterProvisioner      string
-	*CustomProvider
 }
 
 type gcpAllocation struct {
@@ -266,8 +266,8 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*models.CustomPric
 				}
 				gcp.ServiceKeyProvided = true
 			}
-		} else if updateType == AthenaInfoUpdateType {
-			a := AwsAthenaInfo{}
+		} else if updateType == aws.AthenaInfoUpdateType {
+			a := aws.AwsAthenaInfo{}
 			err := json.NewDecoder(r).Decode(&a)
 			if err != nil {
 				return err
@@ -315,7 +315,7 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*models.CustomPric
 func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	remoteEnabled := env.IsRemoteEnabled()
 
-	attribute, err := gcp.metadataClient.InstanceAttributeValue("cluster-name")
+	attribute, err := gcp.MetadataClient.InstanceAttributeValue("cluster-name")
 	if err != nil {
 		log.Infof("Error loading metadata cluster-name: %s", err.Error())
 	}
@@ -336,9 +336,9 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = kubecost.GCPProvider
-	m["region"] = gcp.clusterRegion
-	m["account"] = gcp.clusterAccountID
-	m["project"] = gcp.clusterProjectID
+	m["region"] = gcp.ClusterRegion
+	m["account"] = gcp.ClusterAccountID
+	m["project"] = gcp.ClusterProjectID
 	m["provisioner"] = gcp.clusterProvisioner
 	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
@@ -350,7 +350,7 @@ func (gcp *GCP) ClusterManagementPricing() (string, float64, error) {
 }
 
 func (gcp *GCP) getAllAddresses() (*compute.AddressAggregatedList, error) {
-	projID, err := gcp.metadataClient.ProjectID()
+	projID, err := gcp.MetadataClient.ProjectID()
 	if err != nil {
 		return nil, err
 	}
@@ -390,7 +390,7 @@ func (gcp *GCP) isAddressOrphaned(address *compute.Address) bool {
 }
 
 func (gcp *GCP) getAllDisks() (*compute.DiskAggregatedList, error) {
-	projID, err := gcp.metadataClient.ProjectID()
+	projID, err := gcp.MetadataClient.ProjectID()
 	if err != nil {
 		return nil, err
 	}
@@ -1600,7 +1600,7 @@ func sustainedUseDiscount(class string, defaultDiscount float64, isPreemptible b
 	return discount
 }
 
-func parseGCPProjectID(id string) string {
+func ParseGCPProjectID(id string) string {
 	// gce://guestbook-12345/...
 	//  => guestbook-12345
 	match := gceRegex.FindStringSubmatch(id)
@@ -1617,7 +1617,7 @@ func getUsageType(labels map[string]string) string {
 	} else if t, ok := labels[GKESpotLabel]; ok && t == "true" {
 		// https://cloud.google.com/kubernetes-engine/docs/concepts/spot-vms
 		return "preemptible"
-	} else if t, ok := labels[KarpenterCapacityTypeLabel]; ok && t == KarpenterCapacitySpotTypeValue {
+	} else if t, ok := labels[models.KarpenterCapacityTypeLabel]; ok && t == models.KarpenterCapacitySpotTypeValue {
 		return "preemptible"
 	}
 	return "ondemand"

+ 3 - 3
pkg/cloud/gcpprovider_test.go → pkg/cloud/gcp/gcpprovider_test.go

@@ -1,4 +1,4 @@
-package cloud
+package gcp
 
 import (
 	"bytes"
@@ -68,7 +68,7 @@ func TestParseGCPProjectID(t *testing.T) {
 	}
 
 	for _, test := range cases {
-		result := parseGCPProjectID(test.input)
+		result := ParseGCPProjectID(test.input)
 		if result != test.expected {
 			t.Errorf("Input: %s, Expected: %s, Actual: %s", test.input, test.expected, result)
 		}
@@ -94,7 +94,7 @@ func TestGetUsageType(t *testing.T) {
 		},
 		{
 			input: map[string]string{
-				KarpenterCapacityTypeLabel: KarpenterCapacitySpotTypeValue,
+				models.KarpenterCapacityTypeLabel: models.KarpenterCapacitySpotTypeValue,
 			},
 			expected: "preemptible",
 		},

+ 4 - 0
pkg/cloud/models/models.go

@@ -299,3 +299,7 @@ type ProviderConfig interface {
 	Update(func(*CustomPricing) error) (*CustomPricing, error)
 	UpdateFromMap(map[string]string) (*CustomPricing, error)
 }
+
+const KarpenterCapacityTypeLabel = "karpenter.sh/capacity-type"
+
+const KarpenterCapacitySpotTypeValue = "spot"

+ 12 - 13
pkg/cloud/provider.go

@@ -8,7 +8,9 @@ import (
 	"strings"
 	"time"
 
+	"github.com/opencost/opencost/pkg/cloud/aws"
 	"github.com/opencost/opencost/pkg/cloud/azure"
+	"github.com/opencost/opencost/pkg/cloud/gcp"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/kubecost"
 
@@ -26,9 +28,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 )
 
-const KarpenterCapacityTypeLabel = "karpenter.sh/capacity-type"
-const KarpenterCapacitySpotTypeValue = "spot"
-
 // ClusterName returns the name defined in cluster info, defaulting to the
 // CLUSTER_ID environment variable
 func ClusterName(p models.Provider) string {
@@ -178,14 +177,14 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string, config *config.
 		if apiKey == "" {
 			return nil, errors.New("Supply a GCP Key to start getting data")
 		}
-		return &GCP{
+		return &gcp.GCP{
 			Clientset:        cache,
 			APIKey:           apiKey,
 			Config:           NewProviderConfig(config, cp.configFileName),
-			clusterRegion:    cp.region,
-			clusterAccountID: cp.accountID,
-			clusterProjectID: cp.projectID,
-			metadataClient: metadata.NewClient(
+			ClusterRegion:    cp.region,
+			ClusterAccountID: cp.accountID,
+			ClusterProjectID: cp.projectID,
+			MetadataClient: metadata.NewClient(
 				&http.Client{
 					Transport: httputil.NewUserAgentTransport("kubecost", &http.Transport{
 						Dial: (&net.Dialer{
@@ -198,12 +197,12 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string, config *config.
 		}, nil
 	case kubecost.AWSProvider:
 		log.Info("Found ProviderID starting with \"aws\", using AWS Provider")
-		return &AWS{
+		return &aws.AWS{
 			Clientset:            cache,
 			Config:               NewProviderConfig(config, cp.configFileName),
-			clusterRegion:        cp.region,
-			clusterAccountID:     cp.accountID,
-			serviceAccountChecks: models.NewServiceAccountChecks(),
+			ClusterRegion:        cp.region,
+			ClusterAccountID:     cp.accountID,
+			ServiceAccountChecks: models.NewServiceAccountChecks(),
 		}, nil
 	case kubecost.AzureProvider:
 		log.Info("Found ProviderID starting with \"azure\", using Azure Provider")
@@ -265,7 +264,7 @@ func getClusterProperties(node *v1.Node) clusterProperties {
 	if metadata.OnGCE() || strings.HasPrefix(providerID, "gce") {
 		cp.provider = kubecost.GCPProvider
 		cp.configFileName = "gcp.json"
-		cp.projectID = parseGCPProjectID(providerID)
+		cp.projectID = gcp.ParseGCPProjectID(providerID)
 	} else if strings.HasPrefix(providerID, "aws") {
 		cp.provider = kubecost.AWSProvider
 		cp.configFileName = "aws.json"

+ 6 - 4
pkg/costmodel/router.go

@@ -16,6 +16,8 @@ import (
 	"time"
 
 	"github.com/microcosm-cc/bluemonday"
+	"github.com/opencost/opencost/pkg/cloud/aws"
+	"github.com/opencost/opencost/pkg/cloud/gcp"
 	"github.com/opencost/opencost/pkg/config"
 	"github.com/opencost/opencost/pkg/kubeconfig"
 	"github.com/opencost/opencost/pkg/metrics"
@@ -30,7 +32,7 @@ import (
 
 	"github.com/julienschmidt/httprouter"
 
-	sentry "github.com/getsentry/sentry-go"
+	"github.com/getsentry/sentry-go"
 
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud/azure"
@@ -585,7 +587,7 @@ func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprou
 func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
+	data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
 	if err != nil {
 		w.Write(WrapData(data, err))
 		return
@@ -601,7 +603,7 @@ func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request,
 func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
+	data, err := a.CloudProvider.UpdateConfig(r.Body, aws.AthenaInfoUpdateType)
 	if err != nil {
 		w.Write(WrapData(data, err))
 		return
@@ -613,7 +615,7 @@ func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Reques
 func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
+	data, err := a.CloudProvider.UpdateConfig(r.Body, gcp.BigqueryUpdateType)
 	if err != nil {
 		w.Write(WrapData(data, err))
 		return