ソースを参照

Merge pull request #1902 from Pokom/refactor/cloud-aws-gcp-modules

refactor(cloud): Move aws and gcp to dedicated modules
Sean Holcomb 3 年 前
コミット
fa3c825a2b

+ 16 - 17
pkg/cloud/awsprovider.go → pkg/cloud/aws/awsprovider.go

@@ -1,4 +1,4 @@
-package cloud
+package aws
 
 import (
 	"bytes"
@@ -178,13 +178,12 @@ type AWS struct {
 	SpotDataPrefix              string
 	ProjectID                   string
 	DownloadPricingDataLock     sync.RWMutex
-	Config                      *ProviderConfig
-	serviceAccountChecks        *models.ServiceAccountChecks
+	Config                      models.ProviderConfig
+	ServiceAccountChecks        *models.ServiceAccountChecks
 	clusterManagementPrice      float64
-	clusterRegion               string
-	clusterAccountID            string
+	ClusterRegion               string
+	ClusterAccountID            string
 	clusterProvisioner          string
-	*CustomProvider
 }
 
 // AWSAccessKey holds AWS credentials and fulfils the awsV2.CredentialsProvider interface
@@ -650,7 +649,7 @@ func (k *awsKey) getUsageType(labels map[string]string) string {
 		// We currently write out spot instances as "preemptible" in the pricing data, so these need to match
 		return PreemptibleType
 	}
-	if kLabel, ok := labels[KarpenterCapacityTypeLabel]; ok && kLabel == KarpenterCapacitySpotTypeValue {
+	if kLabel, ok := labels[models.KarpenterCapacityTypeLabel]; ok && kLabel == models.KarpenterCapacitySpotTypeValue {
 		return PreemptibleType
 	}
 	return ""
@@ -1366,7 +1365,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	m["name"] = clusterName
 	m["provider"] = kubecost.AWSProvider
 	m["account"] = clusterAccountID
-	m["region"] = awsProvider.clusterRegion
+	m["region"] = awsProvider.ClusterRegion
 	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(env.IsRemoteEnabled())
 	m["provisioner"] = awsProvider.clusterProvisioner
@@ -1403,7 +1402,7 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string,
 
 	// 1. Check config values first (set from frontend UI)
 	if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  true,
 		})
@@ -1413,7 +1412,7 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string,
 	// 2. Check for secret
 	s, _ := aws.loadAWSAuthSecret(forceReload)
 	if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  true,
 		})
@@ -1422,12 +1421,12 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *models.CustomPricing) (string,
 
 	// 3. Fall back to env vars
 	if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeySecret() == "" {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  false,
 		})
 	} else {
-		aws.serviceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("hasKey", &models.ServiceAccountCheck{
 			Message: "AWS ServiceKey exists",
 			Status:  true,
 		})
@@ -2154,14 +2153,14 @@ func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, re
 	}
 	lso, err := cli.ListObjects(context.TODO(), ls)
 	if err != nil {
-		aws.serviceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
 			Message:        "Bucket List Permissions Available",
 			Status:         false,
 			AdditionalInfo: err.Error(),
 		})
 		return nil, err
 	} else {
-		aws.serviceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
+		aws.ServiceAccountChecks.Set("bucketList", &models.ServiceAccountCheck{
 			Message: "Bucket List Permissions Available",
 			Status:  true,
 		})
@@ -2206,14 +2205,14 @@ func (aws *AWS) parseSpotData(bucket string, prefix string, projectID string, re
 		buf := manager.NewWriteAtBuffer([]byte{})
 		_, err := downloader.Download(context.TODO(), buf, getObj)
 		if err != nil {
-			aws.serviceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
+			aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
 				Message:        "Object Get Permissions Available",
 				Status:         false,
 				AdditionalInfo: err.Error(),
 			})
 			return nil, err
 		} else {
-			aws.serviceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
+			aws.ServiceAccountChecks.Set("objectList", &models.ServiceAccountCheck{
 				Message: "Object Get Permissions Available",
 				Status:  true,
 			})
@@ -2289,7 +2288,7 @@ func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*models.Node) {
 }
 
 func (aws *AWS) ServiceAccountStatus() *models.ServiceAccountStatus {
-	return aws.serviceAccountChecks.GetStatus()
+	return aws.ServiceAccountChecks.GetStatus()
 }
 
 func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {

+ 4 - 4
pkg/cloud/awsprovider_test.go → pkg/cloud/aws/awsprovider_test.go

@@ -1,4 +1,4 @@
-package cloud
+package aws
 
 import (
 	"bytes"
@@ -64,7 +64,7 @@ func Test_awsKey_getUsageType(t *testing.T) {
 			name: "Karpenter label with a capacityType set to empty string should return empty string",
 			args: args{
 				labels: map[string]string{
-					KarpenterCapacityTypeLabel: "",
+					aws.KarpenterCapacityTypeLabel: "",
 				},
 			},
 			want: "",
@@ -73,7 +73,7 @@ func Test_awsKey_getUsageType(t *testing.T) {
 			name: "Karpenter label with capacityType set to a random value should return empty string",
 			args: args{
 				labels: map[string]string{
-					KarpenterCapacityTypeLabel: "TEST_ME",
+					aws.KarpenterCapacityTypeLabel: "TEST_ME",
 				},
 			},
 			want: "",
@@ -82,7 +82,7 @@ func Test_awsKey_getUsageType(t *testing.T) {
 			name: "Karpenter label with capacityType set to spot should return spot",
 			args: args{
 				labels: map[string]string{
-					KarpenterCapacityTypeLabel: KarpenterCapacitySpotTypeValue,
+					aws.KarpenterCapacityTypeLabel: aws.KarpenterCapacitySpotTypeValue,
 				},
 			},
 			want: PreemptibleType,

+ 5 - 0
pkg/cloud/csvprovider.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"regexp"
 	"strconv"
 	"strings"
 	"sync"
@@ -25,6 +26,10 @@ import (
 
 const refreshMinutes = 60
 
+var (
+	provIdRx = regexp.MustCompile("aws:///([^/]+)/([^/]+)")
+)
+
 type CSVProvider struct {
 	*CustomProvider
 	CSVLocation             string

+ 56 - 1
pkg/cloud/customprovider.go

@@ -14,6 +14,7 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/log"
+	"github.com/opencost/opencost/pkg/util"
 	"github.com/opencost/opencost/pkg/util/json"
 
 	v1 "k8s.io/api/core/v1"
@@ -38,6 +39,31 @@ type CustomProvider struct {
 	Config                  *ProviderConfig
 }
 
+var volTypes = map[string]string{
+	"EBS:VolumeUsage.gp2":    "gp2",
+	"EBS:VolumeUsage.gp3":    "gp3",
+	"EBS:VolumeUsage":        "standard",
+	"EBS:VolumeUsage.sc1":    "sc1",
+	"EBS:VolumeP-IOPS.piops": "io1",
+	"EBS:VolumeUsage.st1":    "st1",
+	"EBS:VolumeUsage.piops":  "io1",
+	"gp2":                    "EBS:VolumeUsage.gp2",
+	"gp3":                    "EBS:VolumeUsage.gp3",
+	"standard":               "EBS:VolumeUsage",
+	"sc1":                    "EBS:VolumeUsage.sc1",
+	"io1":                    "EBS:VolumeUsage.piops",
+	"st1":                    "EBS:VolumeUsage.st1",
+}
+
+type customPVKey struct {
+	Labels                 map[string]string
+	StorageClassParameters map[string]string
+	StorageClassName       string
+	Name                   string
+	DefaultRegion          string
+	ProviderID             string
+}
+
 // PricingSourceSummary returns the pricing source summary for the provider.
 // The summary represents what was _parsed_ from the pricing source, not what
 // was returned from the relevant API.
@@ -302,7 +328,7 @@ func (cp *CustomProvider) LoadBalancerPricing() (*models.LoadBalancer, error) {
 }
 
 func (*CustomProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) models.PVKey {
-	return &awsPVKey{
+	return &customPVKey{
 		Labels:                 pv.Labels,
 		StorageClassName:       pv.Spec.StorageClassName,
 		StorageClassParameters: parameters,
@@ -310,6 +336,35 @@ func (*CustomProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]s
 	}
 }
 
+func (key *customPVKey) ID() string {
+	return key.ProviderID
+}
+
+func (key *customPVKey) GetStorageClass() string {
+	return key.StorageClassName
+}
+
+// Features returns a comma-separated string of features (region and volume usage type) for a given PV
+// (@pokom): This was copied from the aws package to break a circular dependency; it should be refactored to be specific to customPVKey
+func (key *customPVKey) Features() string {
+	storageClass := key.StorageClassParameters["type"]
+	if storageClass == "standard" {
+		storageClass = "gp2"
+	}
 	// Storage class names are generally EBS volume types (gp2)
 	// Keys in Pricing are based on UsageTypes (EBS:VolumeUsage.gp2)
 	// Converts between the 2
+	region, ok := util.GetRegion(key.Labels)
+	if !ok {
+		region = key.DefaultRegion
+	}
+	class, ok := volTypes[storageClass]
+	if !ok {
+		log.Debugf("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
+	}
+	return region + "," + class
+}
+
 func (k *customProviderKey) GPUCount() int {
 	return 0
 }

+ 17 - 17
pkg/cloud/gcpprovider.go → pkg/cloud/gcp/gcpprovider.go

@@ -1,4 +1,4 @@
-package cloud
+package gcp
 
 import (
 	"context"
@@ -14,6 +14,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/opencost/opencost/pkg/cloud/aws"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/utils"
 	"github.com/opencost/opencost/pkg/kubecost"
@@ -99,16 +100,15 @@ type GCP struct {
 	BillingDataDataset      string
 	DownloadPricingDataLock sync.RWMutex
 	ReservedInstances       []*GCPReservedInstance
-	Config                  *ProviderConfig
+	Config                  models.ProviderConfig
 	ServiceKeyProvided      bool
 	ValidPricingKeys        map[string]bool
-	metadataClient          *metadata.Client
+	MetadataClient          *metadata.Client
 	clusterManagementPrice  float64
-	clusterRegion           string
-	clusterAccountID        string
-	clusterProjectID        string
+	ClusterRegion           string
+	ClusterAccountID        string
+	ClusterProjectID        string
 	clusterProvisioner      string
-	*CustomProvider
 }
 
 type gcpAllocation struct {
@@ -266,8 +266,8 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*models.CustomPric
 				}
 				gcp.ServiceKeyProvided = true
 			}
-		} else if updateType == AthenaInfoUpdateType {
-			a := AwsAthenaInfo{}
+		} else if updateType == aws.AthenaInfoUpdateType {
+			a := aws.AwsAthenaInfo{}
 			err := json.NewDecoder(r).Decode(&a)
 			if err != nil {
 				return err
@@ -315,7 +315,7 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*models.CustomPric
 func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	remoteEnabled := env.IsRemoteEnabled()
 
-	attribute, err := gcp.metadataClient.InstanceAttributeValue("cluster-name")
+	attribute, err := gcp.MetadataClient.InstanceAttributeValue("cluster-name")
 	if err != nil {
 		log.Infof("Error loading metadata cluster-name: %s", err.Error())
 	}
@@ -336,9 +336,9 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = kubecost.GCPProvider
-	m["region"] = gcp.clusterRegion
-	m["account"] = gcp.clusterAccountID
-	m["project"] = gcp.clusterProjectID
+	m["region"] = gcp.ClusterRegion
+	m["account"] = gcp.ClusterAccountID
+	m["project"] = gcp.ClusterProjectID
 	m["provisioner"] = gcp.clusterProvisioner
 	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
@@ -350,7 +350,7 @@ func (gcp *GCP) ClusterManagementPricing() (string, float64, error) {
 }
 
 func (gcp *GCP) getAllAddresses() (*compute.AddressAggregatedList, error) {
-	projID, err := gcp.metadataClient.ProjectID()
+	projID, err := gcp.MetadataClient.ProjectID()
 	if err != nil {
 		return nil, err
 	}
@@ -390,7 +390,7 @@ func (gcp *GCP) isAddressOrphaned(address *compute.Address) bool {
 }
 
 func (gcp *GCP) getAllDisks() (*compute.DiskAggregatedList, error) {
-	projID, err := gcp.metadataClient.ProjectID()
+	projID, err := gcp.MetadataClient.ProjectID()
 	if err != nil {
 		return nil, err
 	}
@@ -1600,7 +1600,7 @@ func sustainedUseDiscount(class string, defaultDiscount float64, isPreemptible b
 	return discount
 }
 
-func parseGCPProjectID(id string) string {
+func ParseGCPProjectID(id string) string {
 	// gce://guestbook-12345/...
 	//  => guestbook-12345
 	match := gceRegex.FindStringSubmatch(id)
@@ -1617,7 +1617,7 @@ func getUsageType(labels map[string]string) string {
 	} else if t, ok := labels[GKESpotLabel]; ok && t == "true" {
 		// https://cloud.google.com/kubernetes-engine/docs/concepts/spot-vms
 		return "preemptible"
-	} else if t, ok := labels[KarpenterCapacityTypeLabel]; ok && t == KarpenterCapacitySpotTypeValue {
+	} else if t, ok := labels[models.KarpenterCapacityTypeLabel]; ok && t == models.KarpenterCapacitySpotTypeValue {
 		return "preemptible"
 	}
 	return "ondemand"

+ 3 - 3
pkg/cloud/gcpprovider_test.go → pkg/cloud/gcp/gcpprovider_test.go

@@ -1,4 +1,4 @@
-package cloud
+package gcp
 
 import (
 	"bytes"
@@ -68,7 +68,7 @@ func TestParseGCPProjectID(t *testing.T) {
 	}
 
 	for _, test := range cases {
-		result := parseGCPProjectID(test.input)
+		result := ParseGCPProjectID(test.input)
 		if result != test.expected {
 			t.Errorf("Input: %s, Expected: %s, Actual: %s", test.input, test.expected, result)
 		}
@@ -94,7 +94,7 @@ func TestGetUsageType(t *testing.T) {
 		},
 		{
 			input: map[string]string{
-				KarpenterCapacityTypeLabel: KarpenterCapacitySpotTypeValue,
+				models.KarpenterCapacityTypeLabel: models.KarpenterCapacitySpotTypeValue,
 			},
 			expected: "preemptible",
 		},

+ 5 - 3
pkg/cloud/models/models.go

@@ -20,9 +20,11 @@ var (
 )
 
 const (
-	AuthSecretPath          = "/var/secrets/service-key.json"
-	StorageConfigSecretPath = "/var/azure-storage-config/azure-storage-config.json"
-	DefaultShareTenancyCost = "true"
+	AuthSecretPath                 = "/var/secrets/service-key.json"
+	StorageConfigSecretPath        = "/var/azure-storage-config/azure-storage-config.json"
+	DefaultShareTenancyCost        = "true"
+	KarpenterCapacityTypeLabel     = "karpenter.sh/capacity-type"
+	KarpenterCapacitySpotTypeValue = "spot"
 )
 
 // ReservedInstanceData keeps record of resources on a node should be

+ 12 - 13
pkg/cloud/provider.go

@@ -8,7 +8,9 @@ import (
 	"strings"
 	"time"
 
+	"github.com/opencost/opencost/pkg/cloud/aws"
 	"github.com/opencost/opencost/pkg/cloud/azure"
+	"github.com/opencost/opencost/pkg/cloud/gcp"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/kubecost"
 
@@ -26,9 +28,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 )
 
-const KarpenterCapacityTypeLabel = "karpenter.sh/capacity-type"
-const KarpenterCapacitySpotTypeValue = "spot"
-
 // ClusterName returns the name defined in cluster info, defaulting to the
 // CLUSTER_ID environment variable
 func ClusterName(p models.Provider) string {
@@ -178,14 +177,14 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string, config *config.
 		if apiKey == "" {
 			return nil, errors.New("Supply a GCP Key to start getting data")
 		}
-		return &GCP{
+		return &gcp.GCP{
 			Clientset:        cache,
 			APIKey:           apiKey,
 			Config:           NewProviderConfig(config, cp.configFileName),
-			clusterRegion:    cp.region,
-			clusterAccountID: cp.accountID,
-			clusterProjectID: cp.projectID,
-			metadataClient: metadata.NewClient(
+			ClusterRegion:    cp.region,
+			ClusterAccountID: cp.accountID,
+			ClusterProjectID: cp.projectID,
+			MetadataClient: metadata.NewClient(
 				&http.Client{
 					Transport: httputil.NewUserAgentTransport("kubecost", &http.Transport{
 						Dial: (&net.Dialer{
@@ -198,12 +197,12 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string, config *config.
 		}, nil
 	case kubecost.AWSProvider:
 		log.Info("Found ProviderID starting with \"aws\", using AWS Provider")
-		return &AWS{
+		return &aws.AWS{
 			Clientset:            cache,
 			Config:               NewProviderConfig(config, cp.configFileName),
-			clusterRegion:        cp.region,
-			clusterAccountID:     cp.accountID,
-			serviceAccountChecks: models.NewServiceAccountChecks(),
+			ClusterRegion:        cp.region,
+			ClusterAccountID:     cp.accountID,
+			ServiceAccountChecks: models.NewServiceAccountChecks(),
 		}, nil
 	case kubecost.AzureProvider:
 		log.Info("Found ProviderID starting with \"azure\", using Azure Provider")
@@ -265,7 +264,7 @@ func getClusterProperties(node *v1.Node) clusterProperties {
 	if metadata.OnGCE() || strings.HasPrefix(providerID, "gce") {
 		cp.provider = kubecost.GCPProvider
 		cp.configFileName = "gcp.json"
-		cp.projectID = parseGCPProjectID(providerID)
+		cp.projectID = gcp.ParseGCPProjectID(providerID)
 	} else if strings.HasPrefix(providerID, "aws") {
 		cp.provider = kubecost.AWSProvider
 		cp.configFileName = "aws.json"

+ 6 - 4
pkg/costmodel/router.go

@@ -16,6 +16,8 @@ import (
 	"time"
 
 	"github.com/microcosm-cc/bluemonday"
+	"github.com/opencost/opencost/pkg/cloud/aws"
+	"github.com/opencost/opencost/pkg/cloud/gcp"
 	"github.com/opencost/opencost/pkg/config"
 	"github.com/opencost/opencost/pkg/kubeconfig"
 	"github.com/opencost/opencost/pkg/metrics"
@@ -30,7 +32,7 @@ import (
 
 	"github.com/julienschmidt/httprouter"
 
-	sentry "github.com/getsentry/sentry-go"
+	"github.com/getsentry/sentry-go"
 
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud/azure"
@@ -585,7 +587,7 @@ func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprou
 func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.SpotInfoUpdateType)
+	data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
 	if err != nil {
 		w.Write(WrapData(data, err))
 		return
@@ -601,7 +603,7 @@ func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request,
 func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.AthenaInfoUpdateType)
+	data, err := a.CloudProvider.UpdateConfig(r.Body, aws.AthenaInfoUpdateType)
 	if err != nil {
 		w.Write(WrapData(data, err))
 		return
@@ -613,7 +615,7 @@ func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Reques
 func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	data, err := a.CloudProvider.UpdateConfig(r.Body, cloud.BigqueryUpdateType)
+	data, err := a.CloudProvider.UpdateConfig(r.Body, gcp.BigqueryUpdateType)
 	if err != nil {
 		w.Write(WrapData(data, err))
 		return