Ver código fonte

Merge branch 'develop' into patch-2

Matt Ray 3 anos atrás
pai
commit
fef1cb357a

+ 129 - 7
pkg/cloud/awsprovider.go

@@ -5,7 +5,6 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/csv"
-	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -25,6 +24,7 @@ import (
 	"github.com/opencost/opencost/pkg/util"
 	"github.com/opencost/opencost/pkg/util/fileutil"
 	"github.com/opencost/opencost/pkg/util/json"
+	"github.com/opencost/opencost/pkg/util/timeutil"
 
 	awsSDK "github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/config"
@@ -51,6 +51,11 @@ const (
 	APIPricingSource              = "Public API"
 	SpotPricingSource             = "Spot Data Feed"
 	ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
+
+	InUseState    = "in-use"
+	AttachedState = "attached"
+
+	AWSHourlyPublicIPCost = 0.005
 )
 
 var (
@@ -1452,8 +1457,7 @@ func (aws *AWS) getAddressesForRegion(ctx context.Context, region string) (*ec2.
 	return cli.DescribeAddresses(ctx, &ec2.DescribeAddressesInput{})
 }
 
-// GetAddresses retrieves EC2 addresses
-func (aws *AWS) GetAddresses() ([]byte, error) {
+func (aws *AWS) getAllAddresses() ([]*ec2Types.Address, error) {
 	aws.ConfigureAuth() // load authentication data into env vars
 
 	addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
@@ -1509,6 +1513,16 @@ func (aws *AWS) GetAddresses() ([]byte, error) {
 		return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errs), errs)
 	}
 
+	return addresses, nil
+}
+
+// GetAddresses retrieves EC2 addresses
+func (aws *AWS) GetAddresses() ([]byte, error) {
+	addresses, err := aws.getAllAddresses()
+	if err != nil {
+		return nil, err
+	}
+
 	// Format the response this way to match the JSON-encoded formatting of a single response
 	// from DescribeAddresss, so that consumers can always expect AWS disk responses to have
 	// a "Addresss" key at the top level.
@@ -1517,6 +1531,14 @@ func (aws *AWS) GetAddresses() ([]byte, error) {
 	})
 }
 
+func (aws *AWS) isAddressOrphaned(address *ec2Types.Address) bool {
+	if address.AssociationId != nil {
+		return false
+	}
+
+	return true
+}
+
 func (aws *AWS) getDisksForRegion(ctx context.Context, region string, maxResults int32, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
 	aak, err := aws.GetAWSAccessKey()
 	if err != nil {
@@ -1535,8 +1557,7 @@ func (aws *AWS) getDisksForRegion(ctx context.Context, region string, maxResults
 	})
 }
 
-// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
-func (aws *AWS) GetDisks() ([]byte, error) {
+func (aws *AWS) getAllDisks() ([]*ec2Types.Volume, error) {
 	aws.ConfigureAuth() // load authentication data into env vars
 
 	volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
@@ -1602,6 +1623,16 @@ func (aws *AWS) GetDisks() ([]byte, error) {
 		return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errs), errs)
 	}
 
+	return volumes, nil
+}
+
+// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
+func (aws *AWS) GetDisks() ([]byte, error) {
+	volumes, err := aws.getAllDisks()
+	if err != nil {
+		return nil, err
+	}
+
 	// Format the response this way to match the JSON-encoded formatting of a single response
 	// from DescribeVolumes, so that consumers can always expect AWS disk responses to have
 	// a "Volumes" key at the top level.
@@ -1610,8 +1641,99 @@ func (aws *AWS) GetDisks() ([]byte, error) {
 	})
 }
 
-func (*AWS) GetOrphanedResources() ([]OrphanedResource, error) {
-	return nil, errors.New("not implemented")
+func (aws *AWS) isDiskOrphaned(vol *ec2Types.Volume) bool {
+	// Do not consider volume orphaned if in use
+	if vol.State == InUseState {
+		return false
+	}
+
+	// Do not consider volume orphaned if volume is attached to any attachments
+	if len(vol.Attachments) != 0 {
+		for _, attachment := range vol.Attachments {
+			if attachment.State == AttachedState {
+				return false
+			}
+		}
+	}
+
+	return true
+}
+
+func (aws *AWS) GetOrphanedResources() ([]OrphanedResource, error) {
+	volumes, err := aws.getAllDisks()
+	if err != nil {
+		return nil, err
+	}
+
+	addresses, err := aws.getAllAddresses()
+	if err != nil {
+		return nil, err
+	}
+
+	var orphanedResources []OrphanedResource
+
+	for _, volume := range volumes {
+		if aws.isDiskOrphaned(volume) {
+			cost, err := aws.findCostForDisk(volume)
+			if err != nil {
+				return nil, err
+			}
+
+			var volumeSize int64
+			if volume.Size != nil {
+				volumeSize = int64(*volume.Size)
+			}
+
+			or := OrphanedResource{
+				Kind:        "disk",
+				Region:      *volume.AvailabilityZone,
+				Size:        &volumeSize,
+				DiskName:    *volume.VolumeId,
+				MonthlyCost: cost,
+			}
+
+			orphanedResources = append(orphanedResources, or)
+		}
+	}
+
+	for _, address := range addresses {
+		if aws.isAddressOrphaned(address) {
+			cost := AWSHourlyPublicIPCost * timeutil.HoursPerMonth
+
+			or := OrphanedResource{
+				Kind:        "address",
+				Address:     *address.PublicIp,
+				MonthlyCost: &cost,
+			}
+
+			orphanedResources = append(orphanedResources, or)
+		}
+	}
+	return orphanedResources, nil
+}
+
+func (aws *AWS) findCostForDisk(disk *ec2Types.Volume) (*float64, error) {
+	//todo: use AWS pricing from all regions
+	if disk.AvailabilityZone == nil {
+		return nil, fmt.Errorf("nil region")
+	}
+	if disk.Size == nil {
+		return nil, fmt.Errorf("nil disk size")
+	}
+
+	class := volTypes[string(disk.VolumeType)]
+
+	key := "us-east-2" + "," + class
+
+	priceStr := aws.Pricing[key].PV.Cost
+
+	price, err := strconv.ParseFloat(priceStr, 64)
+	if err != nil {
+		return nil, err
+	}
+
+	cost := price * timeutil.HoursPerMonth * float64(*disk.Size)
+	return &cost, nil
 }
 
 // QueryAthenaPaginated executes athena query and processes results.

+ 8 - 3
pkg/cloud/azureprovider.go

@@ -1247,7 +1247,7 @@ func (az *Azure) getDisks() ([]*compute.Disk, error) {
 	return disks, nil
 }
 
-func isDiskOrphaned(disk *compute.Disk) bool {
+func (az *Azure) isDiskOrphaned(disk *compute.Disk) bool {
 	//TODO: needs better algorithm
 	return disk.DiskState == "Unattached" || disk.DiskState == "Reserved"
 }
@@ -1261,7 +1261,7 @@ func (az *Azure) GetOrphanedResources() ([]OrphanedResource, error) {
 	var orphanedResources []OrphanedResource
 
 	for _, d := range disks {
-		if isDiskOrphaned(d) {
+		if az.isDiskOrphaned(d) {
 			cost, err := az.findCostForDisk(d)
 			if err != nil {
 				return nil, err
@@ -1277,6 +1277,11 @@ func (az *Azure) GetOrphanedResources() ([]OrphanedResource, error) {
 				diskRegion = *d.Location
 			}
 
+			var diskSize int64
+			if d.DiskSizeGB != nil {
+				diskSize = int64(*d.DiskSizeGB)
+			}
+
 			or := OrphanedResource{
 				Kind:   "disk",
 				Region: diskRegion,
@@ -1284,7 +1289,7 @@ func (az *Azure) GetOrphanedResources() ([]OrphanedResource, error) {
 					"diskState":   string(d.DiskState),
 					"timeCreated": d.TimeCreated.String(),
 				},
-				Size:        d.DiskSizeGB,
+				Size:        &diskSize,
 				DiskName:    diskName,
 				MonthlyCost: &cost,
 			}

+ 154 - 6
pkg/cloud/gcpprovider.go

@@ -2,7 +2,6 @@ package cloud
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"math"
@@ -36,6 +35,14 @@ import (
 const GKE_GPU_TAG = "cloud.google.com/gke-accelerator"
 const BigqueryUpdateType = "bigqueryupdate"
 
+const (
+	GCPHourlyPublicIPCost = 0.01
+
+	GCPMonthlyBasicDiskCost = 0.04
+	GCPMonthlySSDDiskCost   = 0.17
+	GCPMonthlyGP2DiskCost   = 0.1
+)
+
 // List obtained by installing the `gcloud` CLI tool,
 // logging into gcp account, and running command
 // `gcloud compute regions list`
@@ -335,7 +342,7 @@ func (gcp *GCP) ClusterManagementPricing() (string, float64, error) {
 	return gcp.clusterProvisioner, gcp.clusterManagementPrice, nil
 }
 
-func (gcp *GCP) GetAddresses() ([]byte, error) {
+func (gcp *GCP) getAllAddresses() (*compute.AddressAggregatedList, error) {
 	projID, err := gcp.metadataClient.ProjectID()
 	if err != nil {
 		return nil, err
@@ -346,20 +353,36 @@ func (gcp *GCP) GetAddresses() ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	svc, err := compute.New(client)
 	if err != nil {
 		return nil, err
 	}
+
 	res, err := svc.Addresses.AggregatedList(projID).Do()
 
 	if err != nil {
 		return nil, err
 	}
+
+	return res, nil
+}
+
+func (gcp *GCP) GetAddresses() ([]byte, error) {
+	res, err := gcp.getAllAddresses()
+	if err != nil {
+		return nil, err
+	}
+
 	return json.Marshal(res)
 }
 
-// GetDisks returns the GCP disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
-func (gcp *GCP) GetDisks() ([]byte, error) {
+func (gcp *GCP) isAddressOrphaned(address *compute.Address) bool {
+	// Consider address orphaned if it has 0 users
+	return len(address.Users) == 0
+}
+
+func (gcp *GCP) getAllDisks() (*compute.DiskAggregatedList, error) {
 	projID, err := gcp.metadataClient.ProjectID()
 	if err != nil {
 		return nil, err
@@ -370,21 +393,146 @@ func (gcp *GCP) GetDisks() ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	svc, err := compute.New(client)
 	if err != nil {
 		return nil, err
 	}
+
 	res, err := svc.Disks.AggregatedList(projID).Do()
 
 	if err != nil {
 		return nil, err
 	}
+
+	return res, nil
+}
+
+// GetDisks returns the GCP disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
+func (gcp *GCP) GetDisks() ([]byte, error) {
+	res, err := gcp.getAllDisks()
+	if err != nil {
+		return nil, err
+	}
+
 	return json.Marshal(res)
+}
+
+func (gcp *GCP) isDiskOrphaned(disk *compute.Disk) (bool, error) {
+	// Do not consider disk orphaned if it has more than 0 users
+	if len(disk.Users) > 0 {
+		return false, nil
+	}
+
+	// Do not consider disk orphaned if it was used within the last hour
+	threshold := time.Now().Add(time.Duration(-1) * time.Hour)
+	if disk.LastDetachTimestamp != "" {
+		lastUsed, err := time.Parse(time.RFC3339, disk.LastDetachTimestamp)
+		if err != nil {
+			// This can return false since errors are checked before the bool
+			return false, fmt.Errorf("error parsing time: %s", err)
+		}
+		if threshold.Before(lastUsed) {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
+func (gcp *GCP) GetOrphanedResources() ([]OrphanedResource, error) {
+	disks, err := gcp.getAllDisks()
+	if err != nil {
+		return nil, err
+	}
+
+	addresses, err := gcp.getAllAddresses()
+	if err != nil {
+		return nil, err
+	}
+
+	var orphanedResources []OrphanedResource
+
+	for _, diskList := range disks.Items {
+		if len(diskList.Disks) == 0 {
+			continue
+		}
+
+		for _, disk := range diskList.Disks {
+			isOrphaned, err := gcp.isDiskOrphaned(disk)
+			if err != nil {
+				return nil, err
+			}
+			if isOrphaned {
+				cost, err := gcp.findCostForDisk(disk)
+				if err != nil {
+					return nil, err
+				}
 
+				or := OrphanedResource{
+					Kind:        "disk",
+					Region:      disk.Zone,
+					Description: map[string]string{},
+					Size:        &disk.SizeGb,
+					DiskName:    disk.Name,
+					MonthlyCost: cost,
+				}
+				orphanedResources = append(orphanedResources, or)
+			}
+		}
+	}
+
+	for _, addressList := range addresses.Items {
+		if len(addressList.Addresses) == 0 {
+			continue
+		}
+
+		for _, address := range addressList.Addresses {
+			if gcp.isAddressOrphaned(address) {
+				//todo: use GCP pricing
+				cost := GCPHourlyPublicIPCost * timeutil.HoursPerMonth
+
+				or := OrphanedResource{
+					Kind:   "address",
+					Region: address.Region,
+					Description: map[string]string{
+						"type": address.AddressType,
+					},
+					Address:     address.Address,
+					MonthlyCost: &cost,
+				}
+				orphanedResources = append(orphanedResources, or)
+			}
+		}
+	}
+
+	return orphanedResources, nil
 }
 
-func (*GCP) GetOrphanedResources() ([]OrphanedResource, error) {
-	return nil, errors.New("not implemented")
+func (gcp *GCP) findCostForDisk(disk *compute.Disk) (*float64, error) {
+	//todo: use GCP pricing struct
+	price := GCPMonthlyBasicDiskCost
+	if strings.Contains(disk.Type, "ssd") {
+		price = GCPMonthlySSDDiskCost
+	}
+	if strings.Contains(disk.Type, "gp2") {
+		price = GCPMonthlyGP2DiskCost
+	}
+	cost := price * float64(disk.SizeGb)
+
+	// This isn't much use but I (Nick) think its could be going down the
+	// right path. Disk region isnt returning anything (and if it did its
+	// a url, same with type). Currently the only region stored in the
+	// Pricing struct is uscentral-1, so that would need to be fixed
+	// key := disk.Region + "," + disk.Type
+
+	// priceStr := gcp.Pricing[key].PV.Cost
+	// price, err := strconv.ParseFloat(priceStr, 64)
+	// if err != nil {
+	// 	return nil, err
+	// }
+
+	// cost := price * timeutil.HoursPerMonth * float64(disk.SizeGb)
+	return &cost, nil
 }
 
 // GCPPricing represents GCP pricing data for a SKU

+ 1 - 1
pkg/cloud/provider.go

@@ -108,7 +108,7 @@ type OrphanedResource struct {
 	Kind        string            `json:"resourceKind"`
 	Region      string            `json:"region"`
 	Description map[string]string `json:"description"`
-	Size        *int32            `json:"diskSizeInGB,omitempty"`
+	Size        *int64            `json:"diskSizeInGB,omitempty"`
 	DiskName    string            `json:"diskName,omitempty"`
 	Address     string            `json:"ipAddress,omitempty"`
 	MonthlyCost *float64          `json:"monthlyCost"`

+ 1 - 0
ui/default.nginx.conf

@@ -58,6 +58,7 @@ server {
     add_header ETag "1.96.0";
     listen 9090;
     listen [::]:9090;
+    resolver 127.0.0.1 valid=5s;
     location /healthz {
         return 200 'OK';
     }