| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344 |
- package cloud
- import (
- "bytes"
- "compress/gzip"
- "context"
- "encoding/csv"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "os"
- "regexp"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/aws/aws-sdk-go/aws/endpoints"
- "k8s.io/klog"
- "github.com/kubecost/cost-model/pkg/clustercache"
- "github.com/kubecost/cost-model/pkg/env"
- "github.com/kubecost/cost-model/pkg/errors"
- "github.com/kubecost/cost-model/pkg/log"
- "github.com/kubecost/cost-model/pkg/util"
- "github.com/kubecost/cost-model/pkg/util/cloudutil"
- "github.com/kubecost/cost-model/pkg/util/fileutil"
- "github.com/kubecost/cost-model/pkg/util/json"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/aws/credentials"
- "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/athena"
- "github.com/aws/aws-sdk-go/service/ec2"
- "github.com/aws/aws-sdk-go/service/s3"
- "github.com/aws/aws-sdk-go/service/s3/s3manager"
- awsV2 "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/jszwec/csvutil"
- v1 "k8s.io/api/core/v1"
- )
- const supportedSpotFeedVersion = "1"
- const SpotInfoUpdateType = "spotinfo"
- const AthenaInfoUpdateType = "athenainfo"
- const PreemptibleType = "preemptible"
- const APIPricingSource = "Public API"
- const SpotPricingSource = "Spot Data Feed"
- const ReservedInstancePricingSource = "Savings Plan, Reserved Instance, and Out-Of-Cluster"
- func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
- sources := make(map[string]*PricingSource)
- sps := &PricingSource{
- Name: SpotPricingSource,
- }
- sps.Error = ""
- if aws.SpotPricingError != nil {
- sps.Error = aws.SpotPricingError.Error()
- }
- if sps.Error != "" {
- sps.Available = false
- } else if len(aws.SpotPricingByInstanceID) > 0 {
- sps.Available = true
- } else {
- sps.Error = "No spot instances detected"
- }
- sources[SpotPricingSource] = sps
- rps := &PricingSource{
- Name: ReservedInstancePricingSource,
- }
- rps.Error = ""
- if aws.RIPricingError != nil {
- rps.Error = aws.RIPricingError.Error()
- }
- if rps.Error != "" {
- rps.Available = false
- } else {
- rps.Available = true
- }
- sources[ReservedInstancePricingSource] = rps
- return sources
- }
- // How often spot data is refreshed
- const SpotRefreshDuration = 15 * time.Minute
- const defaultConfigPath = "/var/configs/"
- var awsRegions = []string{
- "us-east-2",
- "us-east-1",
- "us-west-1",
- "us-west-2",
- "ap-east-1",
- "ap-south-1",
- "ap-northeast-3",
- "ap-northeast-2",
- "ap-southeast-1",
- "ap-southeast-2",
- "ap-northeast-1",
- "ca-central-1",
- "cn-north-1",
- "cn-northwest-1",
- "eu-central-1",
- "eu-west-1",
- "eu-west-2",
- "eu-west-3",
- "eu-north-1",
- "me-south-1",
- "sa-east-1",
- "us-gov-east-1",
- "us-gov-west-1",
- }
- // AWS represents an Amazon Provider
- type AWS struct {
- Pricing map[string]*AWSProductTerms
- SpotPricingByInstanceID map[string]*spotInfo
- SpotPricingUpdatedAt *time.Time
- SpotRefreshRunning bool
- SpotPricingLock sync.RWMutex
- SpotPricingError error
- RIPricingByInstanceID map[string]*RIData
- RIPricingError error
- RIDataRunning bool
- RIDataLock sync.RWMutex
- SavingsPlanDataByInstanceID map[string]*SavingsPlanData
- SavingsPlanDataRunning bool
- SavingsPlanDataLock sync.RWMutex
- ValidPricingKeys map[string]bool
- Clientset clustercache.ClusterCache
- BaseCPUPrice string
- BaseRAMPrice string
- BaseGPUPrice string
- BaseSpotCPUPrice string
- BaseSpotRAMPrice string
- BaseSpotGPUPrice string
- SpotLabelName string
- SpotLabelValue string
- SpotDataRegion string
- SpotDataBucket string
- SpotDataPrefix string
- ProjectID string
- DownloadPricingDataLock sync.RWMutex
- Config *ProviderConfig
- ServiceAccountChecks map[string]*ServiceAccountCheck
- clusterManagementPrice float64
- clusterAccountId string
- clusterRegion string
- clusterProvisioner string
- *CustomProvider
- }
- type AWSAccessKey struct {
- AccessKeyID string `json:"aws_access_key_id"`
- SecretAccessKey string `json:"aws_secret_access_key"`
- }
- // Retrieve returns a set of awsV2 credentials using the AWSAccessKey's key and secret.
- // This fullfils the awsV2.CredentialsProvider interface contract.
- func (accessKey AWSAccessKey) Retrieve(ctx context.Context) (awsV2.Credentials, error) {
- return awsV2.Credentials{
- AccessKeyID: accessKey.AccessKeyID,
- SecretAccessKey: accessKey.SecretAccessKey,
- }, nil
- }
- // AWSPricing maps a k8s node to an AWS Pricing "product"
- type AWSPricing struct {
- Products map[string]*AWSProduct `json:"products"`
- Terms AWSPricingTerms `json:"terms"`
- }
- // AWSProduct represents a purchased SKU
- type AWSProduct struct {
- Sku string `json:"sku"`
- Attributes AWSProductAttributes `json:"attributes"`
- }
- // AWSProductAttributes represents metadata about the product used to map to a node.
- type AWSProductAttributes struct {
- Location string `json:"location"`
- InstanceType string `json:"instanceType"`
- Memory string `json:"memory"`
- Storage string `json:"storage"`
- VCpu string `json:"vcpu"`
- UsageType string `json:"usagetype"`
- OperatingSystem string `json:"operatingSystem"`
- PreInstalledSw string `json:"preInstalledSw"`
- InstanceFamily string `json:"instanceFamily"`
- CapacityStatus string `json:"capacitystatus"`
- GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
- }
- // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
- type AWSPricingTerms struct {
- OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
- Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
- }
- // AWSOfferTerm is a sku extension used to pay for the node.
- type AWSOfferTerm struct {
- Sku string `json:"sku"`
- PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
- }
- func (ot *AWSOfferTerm) String() string {
- var strs []string
- for k, rc := range ot.PriceDimensions {
- strs = append(strs, fmt.Sprintf("%s:%s", k, rc.String()))
- }
- return fmt.Sprintf("%s:%s", ot.Sku, strings.Join(strs, ","))
- }
- // AWSRateCode encodes data about the price of a product
- type AWSRateCode struct {
- Unit string `json:"unit"`
- PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
- }
- func (rc *AWSRateCode) String() string {
- return fmt.Sprintf("{unit: %s, pricePerUnit: %v", rc.Unit, rc.PricePerUnit)
- }
- // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
- type AWSCurrencyCode struct {
- USD string `json:"USD,omitempty"`
- CNY string `json:"CNY,omitempty"`
- }
- // AWSProductTerms represents the full terms of the product
- type AWSProductTerms struct {
- Sku string `json:"sku"`
- OnDemand *AWSOfferTerm `json:"OnDemand"`
- Reserved *AWSOfferTerm `json:"Reserved"`
- Memory string `json:"memory"`
- Storage string `json:"storage"`
- VCpu string `json:"vcpu"`
- GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
- PV *PV `json:"pv"`
- }
- // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
- const ClusterIdEnvVar = "AWS_CLUSTER_ID"
- // OnDemandRateCode is appended to an node sku
- const OnDemandRateCode = ".JRTCKXETXF"
- const OnDemandRateCodeCn = ".99YE2YK9UR"
- // ReservedRateCode is appended to a node sku
- const ReservedRateCode = ".38NPMPTW36"
- // HourlyRateCode is appended to a node sku
- const HourlyRateCode = ".6YS6EN2CT7"
- const HourlyRateCodeCn = ".Q7UJUT2CE6"
- // volTypes are used to map between AWS UsageTypes and
- // EBS volume types, as they would appear in K8s storage class
- // name and the EC2 API.
- var volTypes = map[string]string{
- "EBS:VolumeUsage.gp2": "gp2",
- "EBS:VolumeUsage": "standard",
- "EBS:VolumeUsage.sc1": "sc1",
- "EBS:VolumeP-IOPS.piops": "io1",
- "EBS:VolumeUsage.st1": "st1",
- "EBS:VolumeUsage.piops": "io1",
- "gp2": "EBS:VolumeUsage.gp2",
- "standard": "EBS:VolumeUsage",
- "sc1": "EBS:VolumeUsage.sc1",
- "io1": "EBS:VolumeUsage.piops",
- "st1": "EBS:VolumeUsage.st1",
- }
- // locationToRegion maps AWS region names (As they come from Billing)
- // to actual region identifiers
- var locationToRegion = map[string]string{
- "US East (Ohio)": "us-east-2",
- "US East (N. Virginia)": "us-east-1",
- "US West (N. California)": "us-west-1",
- "US West (Oregon)": "us-west-2",
- "Asia Pacific (Hong Kong)": "ap-east-1",
- "Asia Pacific (Mumbai)": "ap-south-1",
- "Asia Pacific (Osaka-Local)": "ap-northeast-3",
- "Asia Pacific (Seoul)": "ap-northeast-2",
- "Asia Pacific (Singapore)": "ap-southeast-1",
- "Asia Pacific (Sydney)": "ap-southeast-2",
- "Asia Pacific (Tokyo)": "ap-northeast-1",
- "Canada (Central)": "ca-central-1",
- "China (Beijing)": "cn-north-1",
- "China (Ningxia)": "cn-northwest-1",
- "EU (Frankfurt)": "eu-central-1",
- "EU (Ireland)": "eu-west-1",
- "EU (London)": "eu-west-2",
- "EU (Paris)": "eu-west-3",
- "EU (Stockholm)": "eu-north-1",
- "South America (Sao Paulo)": "sa-east-1",
- "AWS GovCloud (US-East)": "us-gov-east-1",
- "AWS GovCloud (US-West)": "us-gov-west-1",
- }
- var regionToBillingRegionCode = map[string]string{
- "us-east-2": "USE2",
- "us-east-1": "",
- "us-west-1": "USW1",
- "us-west-2": "USW2",
- "ap-east-1": "APE1",
- "ap-south-1": "APS3",
- "ap-northeast-3": "APN3",
- "ap-northeast-2": "APN2",
- "ap-southeast-1": "APS1",
- "ap-southeast-2": "APS2",
- "ap-northeast-1": "APN1",
- "ca-central-1": "CAN1",
- "cn-north-1": "",
- "cn-northwest-1": "",
- "eu-central-1": "EUC1",
- "eu-west-1": "EU",
- "eu-west-2": "EUW2",
- "eu-west-3": "EUW3",
- "eu-north-1": "EUN1",
- "sa-east-1": "SAE1",
- "us-gov-east-1": "UGE1",
- "us-gov-west-1": "UGW1",
- }
- var loadedAWSSecret bool = false
- var awsSecret *AWSAccessKey = nil
- func (aws *AWS) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
- return ""
- }
- // KubeAttrConversion maps the k8s labels for region to an aws region
- func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
- operatingSystem = strings.ToLower(operatingSystem)
- region := locationToRegion[location]
- return region + "," + instanceType + "," + operatingSystem
- }
- type AwsSpotFeedInfo struct {
- BucketName string `json:"bucketName"`
- Prefix string `json:"prefix"`
- Region string `json:"region"`
- AccountID string `json:"projectID"`
- ServiceKeyName string `json:"serviceKeyName"`
- ServiceKeySecret string `json:"serviceKeySecret"`
- SpotLabel string `json:"spotLabel"`
- SpotLabelValue string `json:"spotLabelValue"`
- }
- type AwsAthenaInfo struct {
- AthenaBucketName string `json:"athenaBucketName"`
- AthenaRegion string `json:"athenaRegion"`
- AthenaDatabase string `json:"athenaDatabase"`
- AthenaTable string `json:"athenaTable"`
- ServiceKeyName string `json:"serviceKeyName"`
- ServiceKeySecret string `json:"serviceKeySecret"`
- AccountID string `json:"projectID"`
- MasterPayerARN string `json:"masterPayerARN"`
- }
- func (aws *AWS) GetManagementPlatform() (string, error) {
- nodes := aws.Clientset.GetAllNodes()
- if len(nodes) > 0 {
- n := nodes[0]
- version := n.Status.NodeInfo.KubeletVersion
- if strings.Contains(version, "eks") {
- return "eks", nil
- }
- if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
- return "kops", nil
- }
- }
- return "", nil
- }
- func (aws *AWS) GetConfig() (*CustomPricing, error) {
- c, err := aws.Config.GetCustomPricingData()
- if err != nil {
- return nil, err
- }
- if c.Discount == "" {
- c.Discount = "0%"
- }
- if c.NegotiatedDiscount == "" {
- c.NegotiatedDiscount = "0%"
- }
- if c.ShareTenancyCosts == "" {
- c.ShareTenancyCosts = defaultShareTenancyCost
- }
- return c, nil
- }
- func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
- return aws.Config.UpdateFromMap(a)
- }
- func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
- return aws.Config.Update(func(c *CustomPricing) error {
- if updateType == SpotInfoUpdateType {
- a := AwsSpotFeedInfo{}
- err := json.NewDecoder(r).Decode(&a)
- if err != nil {
- return err
- }
- c.ServiceKeyName = a.ServiceKeyName
- if a.ServiceKeySecret != "" {
- c.ServiceKeySecret = a.ServiceKeySecret
- }
- c.SpotDataPrefix = a.Prefix
- c.SpotDataBucket = a.BucketName
- c.ProjectID = a.AccountID
- c.SpotDataRegion = a.Region
- c.SpotLabel = a.SpotLabel
- c.SpotLabelValue = a.SpotLabelValue
- } else if updateType == AthenaInfoUpdateType {
- a := AwsAthenaInfo{}
- err := json.NewDecoder(r).Decode(&a)
- if err != nil {
- return err
- }
- c.AthenaBucketName = a.AthenaBucketName
- c.AthenaRegion = a.AthenaRegion
- c.AthenaDatabase = a.AthenaDatabase
- c.AthenaTable = a.AthenaTable
- c.ServiceKeyName = a.ServiceKeyName
- if a.ServiceKeySecret != "" {
- c.ServiceKeySecret = a.ServiceKeySecret
- }
- if a.MasterPayerARN != "" {
- c.MasterPayerARN = a.MasterPayerARN
- }
- c.AthenaProjectID = a.AccountID
- } else {
- a := make(map[string]interface{})
- err := json.NewDecoder(r).Decode(&a)
- if err != nil {
- return err
- }
- for k, v := range a {
- kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
- vstr, ok := v.(string)
- if ok {
- err := SetCustomPricingField(c, kUpper, vstr)
- if err != nil {
- return err
- }
- } else {
- return fmt.Errorf("type error while updating config for %s", kUpper)
- }
- }
- }
- if env.IsRemoteEnabled() {
- err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
- if err != nil {
- return err
- }
- }
- return nil
- })
- }
- type awsKey struct {
- SpotLabelName string
- SpotLabelValue string
- Labels map[string]string
- ProviderID string
- }
- func (k *awsKey) GPUType() string {
- return ""
- }
- func (k *awsKey) ID() string {
- provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
- for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
- if matchNum == 2 {
- return group
- }
- }
- klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
- return ""
- }
- func (k *awsKey) Features() string {
- instanceType, _ := util.GetInstanceType(k.Labels)
- operatingSystem, _ := util.GetOperatingSystem(k.Labels)
- region, _ := util.GetRegion(k.Labels)
- key := region + "," + instanceType + "," + operatingSystem
- usageType := PreemptibleType
- spotKey := key + "," + usageType
- if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
- return spotKey
- }
- if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
- return spotKey
- }
- return key
- }
- func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
- pricing, ok := aws.Pricing[pvk.Features()]
- if !ok {
- klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
- return &PV{}, nil
- }
- return pricing.PV, nil
- }
- type awsPVKey struct {
- Labels map[string]string
- StorageClassParameters map[string]string
- StorageClassName string
- Name string
- DefaultRegion string
- ProviderID string
- }
- func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
- providerID := ""
- if pv.Spec.AWSElasticBlockStore != nil {
- providerID = pv.Spec.AWSElasticBlockStore.VolumeID
- } else if pv.Spec.CSI != nil {
- providerID = pv.Spec.CSI.VolumeHandle
- }
- return &awsPVKey{
- Labels: pv.Labels,
- StorageClassName: pv.Spec.StorageClassName,
- StorageClassParameters: parameters,
- Name: pv.Name,
- DefaultRegion: defaultRegion,
- ProviderID: providerID,
- }
- }
- func (key *awsPVKey) ID() string {
- return key.ProviderID
- }
- func (key *awsPVKey) GetStorageClass() string {
- return key.StorageClassName
- }
- func (key *awsPVKey) Features() string {
- storageClass := key.StorageClassParameters["type"]
- if storageClass == "standard" {
- storageClass = "gp2"
- }
- // Storage class names are generally EBS volume types (gp2)
- // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
- // Converts between the 2
- region, ok := util.GetRegion(key.Labels)
- if !ok {
- region = key.DefaultRegion
- }
- class, ok := volTypes[storageClass]
- if !ok {
- klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
- }
- return region + "," + class
- }
- // GetKey maps node labels to information needed to retrieve pricing data
- func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
- return &awsKey{
- SpotLabelName: aws.SpotLabelName,
- SpotLabelValue: aws.SpotLabelValue,
- Labels: labels,
- ProviderID: labels["providerID"],
- }
- }
- func (aws *AWS) isPreemptible(key string) bool {
- s := strings.Split(key, ",")
- if len(s) == 4 && s[3] == PreemptibleType {
- return true
- }
- return false
- }
- func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
- return aws.clusterProvisioner, aws.clusterManagementPrice, nil
- }
- // Use the pricing data from the current region. Fall back to using all region data if needed.
- func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
- pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
- region := ""
- multiregion := false
- for _, n := range nodeList {
- labels := n.GetLabels()
- currentNodeRegion := ""
- if r, ok := util.GetRegion(labels); ok {
- currentNodeRegion = r
- // Switch to Chinese endpoint for regions with the Chinese prefix
- if strings.HasPrefix(currentNodeRegion, "cn-") {
- pricingURL = "https://pricing.cn-north-1.amazonaws.com.cn/offers/v1.0/cn/AmazonEC2/current/"
- }
- } else {
- multiregion = true // We weren't able to detect the node's region, so pull all data.
- break
- }
- if region == "" { // We haven't set a region yet
- region = currentNodeRegion
- } else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
- multiregion = true
- break
- }
- }
- // Chinese multiregion endpoint only contains data for Chinese regions and Chinese regions are excluded from other endpoint
- if region != "" && !multiregion {
- pricingURL += region + "/"
- }
- pricingURL += "index.json"
- klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
- resp, err := http.Get(pricingURL)
- if err != nil {
- klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
- return nil, pricingURL, err
- }
- return resp, pricingURL, err
- }
- // DownloadPricingData fetches data from the AWS Pricing API
- func (aws *AWS) DownloadPricingData() error {
- aws.DownloadPricingDataLock.Lock()
- defer aws.DownloadPricingDataLock.Unlock()
- if aws.ServiceAccountChecks == nil {
- aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
- }
- c, err := aws.Config.GetCustomPricingData()
- if err != nil {
- klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
- }
- aws.BaseCPUPrice = c.CPU
- aws.BaseRAMPrice = c.RAM
- aws.BaseGPUPrice = c.GPU
- aws.BaseSpotCPUPrice = c.SpotCPU
- aws.BaseSpotRAMPrice = c.SpotRAM
- aws.BaseSpotGPUPrice = c.SpotGPU
- aws.SpotLabelName = c.SpotLabel
- aws.SpotLabelValue = c.SpotLabelValue
- aws.SpotDataBucket = c.SpotDataBucket
- aws.SpotDataPrefix = c.SpotDataPrefix
- aws.ProjectID = c.ProjectID
- aws.SpotDataRegion = c.SpotDataRegion
- aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
- if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
- klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
- }
- nodeList := aws.Clientset.GetAllNodes()
- inputkeys := make(map[string]bool)
- for _, n := range nodeList {
- if _, ok := n.Labels["eks.amazonaws.com/nodegroup"]; ok {
- aws.clusterManagementPrice = 0.10
- aws.clusterProvisioner = "EKS"
- } else if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
- aws.clusterProvisioner = "KOPS"
- }
- labels := n.GetObjectMeta().GetLabels()
- key := aws.GetKey(labels, n)
- inputkeys[key.Features()] = true
- }
- pvList := aws.Clientset.GetAllPersistentVolumes()
- storageClasses := aws.Clientset.GetAllStorageClasses()
- storageClassMap := make(map[string]map[string]string)
- for _, storageClass := range storageClasses {
- params := storageClass.Parameters
- storageClassMap[storageClass.ObjectMeta.Name] = params
- if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
- storageClassMap["default"] = params
- storageClassMap[""] = params
- }
- }
- pvkeys := make(map[string]PVKey)
- for _, pv := range pvList {
- params, ok := storageClassMap[pv.Spec.StorageClassName]
- if !ok {
- klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
- continue
- }
- key := aws.GetPVKey(pv, params, "")
- pvkeys[key.Features()] = key
- }
- // RIDataRunning establishes the existance of the goroutine. Since it's possible we
- // run multiple downloads, we don't want to create multiple go routines if one already exists
- if !aws.RIDataRunning && c.AthenaBucketName != "" {
- err = aws.GetReservationDataFromAthena() // Block until one run has completed.
- if err != nil {
- klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
- } else { // If we make one successful run, check on new reservation data every hour
- go func() {
- defer errors.HandlePanic()
- aws.RIDataRunning = true
- for {
- klog.Infof("Reserved Instance watcher running... next update in 1h")
- time.Sleep(time.Hour)
- err := aws.GetReservationDataFromAthena()
- if err != nil {
- klog.Infof("Error updating RI data: %s", err.Error())
- }
- }
- }()
- }
- }
- if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
- err = aws.GetSavingsPlanDataFromAthena()
- if err != nil {
- klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
- } else {
- go func() {
- defer errors.HandlePanic()
- aws.SavingsPlanDataRunning = true
- for {
- klog.Infof("Savings Plan watcher running... next update in 1h")
- time.Sleep(time.Hour)
- err := aws.GetSavingsPlanDataFromAthena()
- if err != nil {
- klog.Infof("Error updating Savings Plan data: %s", err.Error())
- }
- }
- }()
- }
- }
- aws.Pricing = make(map[string]*AWSProductTerms)
- aws.ValidPricingKeys = make(map[string]bool)
- skusToKeys := make(map[string]string)
- resp, pricingURL, err := aws.getRegionPricing(nodeList)
- if err != nil {
- return err
- }
- dec := json.NewDecoder(resp.Body)
- for {
- t, err := dec.Token()
- if err == io.EOF {
- klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
- break
- } else if err != nil {
- klog.V(2).Infof("error parsing response json %v", resp.Body)
- break
- }
- if t == "products" {
- _, err := dec.Token() // this should parse the opening "{""
- if err != nil {
- return err
- }
- for dec.More() {
- _, err := dec.Token() // the sku token
- if err != nil {
- return err
- }
- product := &AWSProduct{}
- err = dec.Decode(&product)
- if err != nil {
- klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
- break
- }
- if product.Attributes.PreInstalledSw == "NA" &&
- (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) &&
- product.Attributes.CapacityStatus == "Used" {
- key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
- spotKey := key + ",preemptible"
- if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
- productTerms := &AWSProductTerms{
- Sku: product.Sku,
- Memory: product.Attributes.Memory,
- Storage: product.Attributes.Storage,
- VCpu: product.Attributes.VCpu,
- GPU: product.Attributes.GPU,
- }
- aws.Pricing[key] = productTerms
- aws.Pricing[spotKey] = productTerms
- skusToKeys[product.Sku] = key
- }
- aws.ValidPricingKeys[key] = true
- aws.ValidPricingKeys[spotKey] = true
- } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
- // UsageTypes may be prefixed with a region code - we're removing this when using
- // volTypes to keep lookups generic
- usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
- usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
- usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
- key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
- spotKey := key + ",preemptible"
- pv := &PV{
- Class: volTypes[usageTypeNoRegion],
- Region: locationToRegion[product.Attributes.Location],
- }
- productTerms := &AWSProductTerms{
- Sku: product.Sku,
- PV: pv,
- }
- aws.Pricing[key] = productTerms
- aws.Pricing[spotKey] = productTerms
- skusToKeys[product.Sku] = key
- aws.ValidPricingKeys[key] = true
- aws.ValidPricingKeys[spotKey] = true
- }
- }
- }
- if t == "terms" {
- _, err := dec.Token() // this should parse the opening "{""
- if err != nil {
- return err
- }
- termType, err := dec.Token()
- if err != nil {
- return err
- }
- if termType == "OnDemand" {
- _, err := dec.Token()
- if err != nil { // again, should parse an opening "{"
- return err
- }
- for dec.More() {
- sku, err := dec.Token()
- if err != nil {
- return err
- }
- _, err = dec.Token() // another opening "{"
- if err != nil {
- return err
- }
- skuOnDemand, err := dec.Token()
- if err != nil {
- return err
- }
- offerTerm := &AWSOfferTerm{}
- err = dec.Decode(&offerTerm)
- if err != nil {
- klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
- }
- key, ok := skusToKeys[sku.(string)]
- spotKey := key + ",preemptible"
- if ok {
- aws.Pricing[key].OnDemand = offerTerm
- aws.Pricing[spotKey].OnDemand = offerTerm
- var cost string
- if sku.(string)+OnDemandRateCode == skuOnDemand {
- cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
- } else if sku.(string)+OnDemandRateCodeCn == skuOnDemand {
- cost = offerTerm.PriceDimensions[sku.(string)+OnDemandRateCodeCn+HourlyRateCodeCn].PricePerUnit.CNY
- }
- if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
- // If the specific UsageType is the per IO cost used on io1 volumes
- // we need to add the per IO cost to the io1 PV cost
- // Add the per IO cost to the PV object for the io1 volume type
- aws.Pricing[key].PV.CostPerIO = cost
- } else if strings.Contains(key, "EBS:Volume") {
- // If volume, we need to get hourly cost and add it to the PV object
- costFloat, _ := strconv.ParseFloat(cost, 64)
- hourlyPrice := costFloat / 730
- aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
- }
- }
- _, err = dec.Token()
- if err != nil {
- return err
- }
- }
- _, err = dec.Token()
- if err != nil {
- return err
- }
- }
- }
- }
- klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
- // Always run spot pricing refresh when performing download
- aws.refreshSpotPricing(true)
- // Only start a single refresh goroutine
- if !aws.SpotRefreshRunning {
- aws.SpotRefreshRunning = true
- go func() {
- defer errors.HandlePanic()
- for {
- klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
- time.Sleep(SpotRefreshDuration)
- // Reoccurring refresh checks update times
- aws.refreshSpotPricing(false)
- }
- }()
- }
- return nil
- }
- func (aws *AWS) refreshSpotPricing(force bool) {
- aws.SpotPricingLock.Lock()
- defer aws.SpotPricingLock.Unlock()
- now := time.Now().UTC()
- updateTime := now.Add(-SpotRefreshDuration)
- // Return if there was an update time set and an hour hasn't elapsed
- if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
- return
- }
- sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
- if err != nil {
- klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
- aws.SpotPricingError = err
- return
- }
- aws.SpotPricingError = nil
- // update time last updated
- aws.SpotPricingUpdatedAt = &now
- aws.SpotPricingByInstanceID = sp
- }
- // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
- func (aws *AWS) NetworkPricing() (*Network, error) {
- cpricing, err := aws.Config.GetCustomPricingData()
- if err != nil {
- return nil, err
- }
- znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
- if err != nil {
- return nil, err
- }
- rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
- if err != nil {
- return nil, err
- }
- inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
- if err != nil {
- return nil, err
- }
- return &Network{
- ZoneNetworkEgressCost: znec,
- RegionNetworkEgressCost: rnec,
- InternetNetworkEgressCost: inec,
- }, nil
- }
- func (aws *AWS) LoadBalancerPricing() (*LoadBalancer, error) {
- fffrc := 0.025
- afrc := 0.010
- lbidc := 0.008
- numForwardingRules := 1.0
- dataIngressGB := 0.0
- var totalCost float64
- if numForwardingRules < 5 {
- totalCost = fffrc*numForwardingRules + lbidc*dataIngressGB
- } else {
- totalCost = fffrc*5 + afrc*(numForwardingRules-5) + lbidc*dataIngressGB
- }
- return &LoadBalancer{
- Cost: totalCost,
- }, nil
- }
- // AllNodePricing returns all the billing data fetched.
- func (aws *AWS) AllNodePricing() (interface{}, error) {
- aws.DownloadPricingDataLock.RLock()
- defer aws.DownloadPricingDataLock.RUnlock()
- return aws.Pricing, nil
- }
- func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
- aws.SpotPricingLock.RLock()
- defer aws.SpotPricingLock.RUnlock()
- info, ok := aws.SpotPricingByInstanceID[instanceID]
- return info, ok
- }
- func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
- aws.RIDataLock.RLock()
- defer aws.RIDataLock.RUnlock()
- data, ok := aws.RIPricingByInstanceID[instanceID]
- return data, ok
- }
- func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
- aws.SavingsPlanDataLock.RLock()
- defer aws.SavingsPlanDataLock.RUnlock()
- data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
- return data, ok
- }
- func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
- key := k.Features()
- if spotInfo, ok := aws.spotPricing(k.ID()); ok {
- var spotcost string
- log.DedupedInfof(5, "Looking up spot data from feed for node %s", k.ID())
- arr := strings.Split(spotInfo.Charge, " ")
- if len(arr) == 2 {
- spotcost = arr[0]
- } else {
- klog.V(2).Infof("Spot data for node %s is missing", k.ID())
- }
- return &Node{
- Cost: spotcost,
- VCPU: terms.VCpu,
- RAM: terms.Memory,
- GPU: terms.GPU,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: PreemptibleType,
- }, nil
- } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
- log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
- return &Node{
- VCPU: terms.VCpu,
- VCPUCost: aws.BaseSpotCPUPrice,
- RAM: terms.Memory,
- GPU: terms.GPU,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: PreemptibleType,
- }, nil
- } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
- strCost := fmt.Sprintf("%f", sp.EffectiveCost)
- return &Node{
- Cost: strCost,
- VCPU: terms.VCpu,
- RAM: terms.Memory,
- GPU: terms.GPU,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- }, nil
- } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
- strCost := fmt.Sprintf("%f", ri.EffectiveCost)
- return &Node{
- Cost: strCost,
- VCPU: terms.VCpu,
- RAM: terms.Memory,
- GPU: terms.GPU,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- }, nil
- }
- var cost string
- c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
- if ok {
- cost = c.PricePerUnit.USD
- } else {
- // Check for Chinese pricing before throwing error
- c, ok = terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCodeCn+HourlyRateCodeCn]
- if ok {
- cost = c.PricePerUnit.CNY
- } else {
- return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
- }
- }
- return &Node{
- Cost: cost,
- VCPU: terms.VCpu,
- RAM: terms.Memory,
- GPU: terms.GPU,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- }, nil
- }
- // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
- func (aws *AWS) NodePricing(k Key) (*Node, error) {
- aws.DownloadPricingDataLock.RLock()
- defer aws.DownloadPricingDataLock.RUnlock()
- key := k.Features()
- usageType := "ondemand"
- if aws.isPreemptible(key) {
- usageType = PreemptibleType
- }
- terms, ok := aws.Pricing[key]
- if ok {
- return aws.createNode(terms, usageType, k)
- } else if _, ok := aws.ValidPricingKeys[key]; ok {
- aws.DownloadPricingDataLock.RUnlock()
- err := aws.DownloadPricingData()
- aws.DownloadPricingDataLock.RLock()
- if err != nil {
- return &Node{
- Cost: aws.BaseCPUPrice,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- UsesBaseCPUPrice: true,
- }, err
- }
- terms, termsOk := aws.Pricing[key]
- if !termsOk {
- return &Node{
- Cost: aws.BaseCPUPrice,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- UsesBaseCPUPrice: true,
- }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
- }
- return aws.createNode(terms, usageType, k)
- } else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
- return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
- }
- }
- // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
- func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
- defaultClusterName := "AWS Cluster #1"
- c, err := awsProvider.GetConfig()
- if err != nil {
- return nil, err
- }
- remoteEnabled := env.IsRemoteEnabled()
- if c.ClusterName != "" {
- m := make(map[string]string)
- m["name"] = c.ClusterName
- m["provider"] = "AWS"
- m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
- m["region"] = awsProvider.clusterRegion
- m["id"] = env.GetClusterID()
- m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
- m["provisioner"] = awsProvider.clusterProvisioner
- return m, nil
- }
- makeStructure := func(clusterName string) (map[string]string, error) {
- klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
- m := make(map[string]string)
- m["name"] = clusterName
- m["provider"] = "AWS"
- m["account"] = c.AthenaProjectID // this value requires configuration but is unavailable else where
- m["region"] = awsProvider.clusterRegion
- m["id"] = env.GetClusterID()
- m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
- return m, nil
- }
- maybeClusterId := env.GetAWSClusterID()
- if len(maybeClusterId) != 0 {
- return makeStructure(maybeClusterId)
- }
- // TODO: This should be cached, it can take a long time to hit the API
- //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
- //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
- //klog.Infof("nodelist get here %s", time.Now())
- //nodeList := awsProvider.Clientset.GetAllNodes()
- //klog.Infof("nodelist done here %s", time.Now())
- /*for _, n := range nodeList {
- region := ""
- instanceId := ""
- providerId := n.Spec.ProviderID
- for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
- if matchNum == 1 {
- region = group
- } else if matchNum == 2 {
- instanceId = group
- }
- }
- if len(instanceId) == 0 {
- klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
- continue
- }
- c := &aws.Config{
- Region: aws.String(region),
- }
- s := session.Must(session.NewSession(c))
- ec2Svc := ec2.New(s)
- di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
- InstanceIds: []*string{
- aws.String(instanceId),
- },
- })
- if diErr != nil {
- klog.Infof("Error describing instances: %s", diErr)
- continue
- }
- if len(di.Reservations) != 1 {
- klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
- continue
- }
- res := di.Reservations[0]
- if len(res.Instances) != 1 {
- klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
- continue
- }
- inst := res.Instances[0]
- for _, tag := range inst.Tags {
- tagKey := *tag.Key
- for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
- if matchNum != 1 {
- continue
- }
- return makeStructure(group)
- }
- }
- }*/
- klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
- return makeStructure(defaultClusterName)
- }
- // updates the authentication to the latest values (via config or secret)
- func (aws *AWS) ConfigureAuth() error {
- c, err := aws.Config.GetCustomPricingData()
- if err != nil {
- klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
- }
- return aws.ConfigureAuthWith(c)
- }
- // updates the authentication to the latest values (via config or secret)
- func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
- accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
- if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
- err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
- if err != nil {
- return err
- }
- err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // Gets the aws key id and secret
- func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
- if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
- aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
- }
- // 1. Check config values first (set from frontend UI)
- if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
- aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
- Message: "AWS ServiceKey exists",
- Status: true,
- }
- return cp.ServiceKeyName, cp.ServiceKeySecret
- }
- // 2. Check for secret
- s, _ := aws.loadAWSAuthSecret(forceReload)
- if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
- aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
- Message: "AWS ServiceKey exists",
- Status: true,
- }
- return s.AccessKeyID, s.SecretAccessKey
- }
- // 3. Fall back to env vars
- if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
- aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
- Message: "AWS ServiceKey exists",
- Status: false,
- }
- } else {
- aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
- Message: "AWS ServiceKey exists",
- Status: true,
- }
- }
- return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
- }
- // Load once and cache the result (even on failure). This is an install time secret, so
- // we don't expect the secret to change. If it does, however, we can force reload using
- // the input parameter.
- func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
- if !force && loadedAWSSecret {
- return awsSecret, nil
- }
- loadedAWSSecret = true
- exists, err := fileutil.FileExists(authSecretPath)
- if !exists || err != nil {
- return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
- }
- result, err := ioutil.ReadFile(authSecretPath)
- if err != nil {
- return nil, err
- }
- var ak AWSAccessKey
- err = json.Unmarshal(result, &ak)
- if err != nil {
- return nil, err
- }
- awsSecret = &ak
- return awsSecret, nil
- }
- func getClusterConfig(ccFile string) (map[string]string, error) {
- clusterConfig, err := os.Open(ccFile)
- if err != nil {
- return nil, err
- }
- defer clusterConfig.Close()
- b, err := ioutil.ReadAll(clusterConfig)
- if err != nil {
- return nil, err
- }
- var clusterConf map[string]string
- err = json.Unmarshal([]byte(b), &clusterConf)
- if err != nil {
- return nil, err
- }
- return clusterConf, nil
- }
- func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
- sess, err := session.NewSession(&aws.Config{
- Region: aws.String(region),
- Credentials: credentials.NewEnvCredentials(),
- })
- if err != nil {
- return nil, err
- }
- ec2Svc := ec2.New(sess)
- return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
- }
- func (a *AWS) GetAddresses() ([]byte, error) {
- a.ConfigureAuth() // load authentication data into env vars
- addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
- errorCh := make(chan error, len(awsRegions))
- var wg sync.WaitGroup
- wg.Add(len(awsRegions))
- // Get volumes from each AWS region
- for _, r := range awsRegions {
- // Fetch IP address response and send results and errors to their
- // respective channels
- go func(region string) {
- defer wg.Done()
- defer errors.HandlePanic()
- // Query for first page of volume results
- resp, err := a.getAddressesForRegion(region)
- if err != nil {
- if aerr, ok := err.(awserr.Error); ok {
- switch aerr.Code() {
- default:
- errorCh <- aerr
- }
- return
- } else {
- errorCh <- err
- return
- }
- }
- addressCh <- resp
- }(r)
- }
- // Close the result channels after everything has been sent
- go func() {
- defer errors.HandlePanic()
- wg.Wait()
- close(errorCh)
- close(addressCh)
- }()
- addresses := []*ec2.Address{}
- for adds := range addressCh {
- addresses = append(addresses, adds.Addresses...)
- }
- errors := []error{}
- for err := range errorCh {
- log.DedupedWarningf(5, "unable to get addresses: %s", err)
- errors = append(errors, err)
- }
- // Return error if no addresses are returned
- if len(errors) > 0 && len(addresses) == 0 {
- return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
- }
- // Format the response this way to match the JSON-encoded formatting of a single response
- // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
- // a "Addresss" key at the top level.
- return json.Marshal(map[string][]*ec2.Address{
- "Addresses": addresses,
- })
- }
- func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
- sess, err := session.NewSession(&aws.Config{
- Region: aws.String(region),
- Credentials: credentials.NewEnvCredentials(),
- })
- if err != nil {
- return nil, err
- }
- ec2Svc := ec2.New(sess)
- return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
- MaxResults: &maxResults,
- NextToken: nextToken,
- })
- }
- // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
- func (a *AWS) GetDisks() ([]byte, error) {
- a.ConfigureAuth() // load authentication data into env vars
- volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
- errorCh := make(chan error, len(awsRegions))
- var wg sync.WaitGroup
- wg.Add(len(awsRegions))
- // Get volumes from each AWS region
- for _, r := range awsRegions {
- // Fetch volume response and send results and errors to their
- // respective channels
- go func(region string) {
- defer wg.Done()
- defer errors.HandlePanic()
- // Query for first page of volume results
- resp, err := a.getDisksForRegion(region, 1000, nil)
- if err != nil {
- if aerr, ok := err.(awserr.Error); ok {
- switch aerr.Code() {
- default:
- errorCh <- aerr
- }
- return
- } else {
- errorCh <- err
- return
- }
- }
- volumeCh <- resp
- // A NextToken indicates more pages of results. Keep querying
- // until all pages are retrieved.
- for resp.NextToken != nil {
- resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
- if err != nil {
- if aerr, ok := err.(awserr.Error); ok {
- switch aerr.Code() {
- default:
- errorCh <- aerr
- }
- return
- } else {
- errorCh <- err
- return
- }
- }
- volumeCh <- resp
- }
- }(r)
- }
- // Close the result channels after everything has been sent
- go func() {
- defer errors.HandlePanic()
- wg.Wait()
- close(errorCh)
- close(volumeCh)
- }()
- volumes := []*ec2.Volume{}
- for vols := range volumeCh {
- volumes = append(volumes, vols.Volumes...)
- }
- errors := []error{}
- for err := range errorCh {
- log.DedupedWarningf(5, "unable to get disks: %s", err)
- errors = append(errors, err)
- }
- // Return error if no volumes are returned
- if len(errors) > 0 && len(volumes) == 0 {
- return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
- }
- // Format the response this way to match the JSON-encoded formatting of a single response
- // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
- // a "Volumes" key at the top level.
- return json.Marshal(map[string][]*ec2.Volume{
- "Volumes": volumes,
- })
- }
- func generateAWSGroupBy(lastIdx int) string {
- sequence := []string{}
- for i := 1; i < lastIdx+1; i++ {
- sequence = append(sequence, strconv.Itoa(i))
- }
- return strings.Join(sequence, ",")
- }
- func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
- customPricing, err := a.GetConfig()
- if err != nil {
- return nil, nil, err
- }
- a.ConfigureAuthWith(customPricing)
- region := aws.String(customPricing.AthenaRegion)
- resultsBucket := customPricing.AthenaBucketName
- database := customPricing.AthenaDatabase
- c := &aws.Config{
- Region: region,
- STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
- }
- s := session.Must(session.NewSession(c))
- svc := athena.New(s)
- if customPricing.MasterPayerARN != "" {
- creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
- svc = athena.New(s, &aws.Config{
- Region: region,
- Credentials: creds,
- })
- }
- var e athena.StartQueryExecutionInput
- var r athena.ResultConfiguration
- r.SetOutputLocation(resultsBucket)
- e.SetResultConfiguration(&r)
- e.SetQueryString(query)
- var q athena.QueryExecutionContext
- q.SetDatabase(database)
- e.SetQueryExecutionContext(&q)
- res, err := svc.StartQueryExecution(&e)
- if err != nil {
- return nil, svc, err
- }
- klog.V(2).Infof("StartQueryExecution result:")
- klog.V(2).Infof(res.GoString())
- var qri athena.GetQueryExecutionInput
- qri.SetQueryExecutionId(*res.QueryExecutionId)
- var qrop *athena.GetQueryExecutionOutput
- duration := time.Duration(2) * time.Second // Pause for 2 seconds
- for {
- qrop, err = svc.GetQueryExecution(&qri)
- if err != nil {
- return nil, svc, err
- }
- if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
- break
- }
- time.Sleep(duration)
- }
- if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
- var ip athena.GetQueryResultsInput
- ip.SetQueryExecutionId(*res.QueryExecutionId)
- return &ip, svc, nil
- } else {
- return nil, svc, fmt.Errorf("No results available for %s", query)
- }
- }
- func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
- customPricing, err := a.GetConfig()
- if err != nil {
- return nil, err
- }
- a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
- region := aws.String(customPricing.AthenaRegion)
- resultsBucket := customPricing.AthenaBucketName
- database := customPricing.AthenaDatabase
- c := &aws.Config{
- Region: region,
- STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
- }
- s := session.Must(session.NewSession(c))
- svc := athena.New(s)
- if customPricing.MasterPayerARN != "" {
- creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
- svc = athena.New(s, &aws.Config{
- Region: region,
- Credentials: creds,
- })
- }
- var e athena.StartQueryExecutionInput
- var r athena.ResultConfiguration
- r.SetOutputLocation(resultsBucket)
- e.SetResultConfiguration(&r)
- e.SetQueryString(query)
- var q athena.QueryExecutionContext
- q.SetDatabase(database)
- e.SetQueryExecutionContext(&q)
- res, err := svc.StartQueryExecution(&e)
- if err != nil {
- return nil, err
- }
- klog.V(2).Infof("StartQueryExecution result:")
- klog.V(2).Infof(res.GoString())
- var qri athena.GetQueryExecutionInput
- qri.SetQueryExecutionId(*res.QueryExecutionId)
- var qrop *athena.GetQueryExecutionOutput
- duration := time.Duration(2) * time.Second // Pause for 2 seconds
- for {
- qrop, err = svc.GetQueryExecution(&qri)
- if err != nil {
- return nil, err
- }
- if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
- break
- }
- time.Sleep(duration)
- }
- if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
- var ip athena.GetQueryResultsInput
- ip.SetQueryExecutionId(*res.QueryExecutionId)
- return svc.GetQueryResults(&ip)
- } else {
- return nil, fmt.Errorf("No results available for %s", query)
- }
- }
- type SavingsPlanData struct {
- ResourceID string
- EffectiveCost float64
- SavingsPlanARN string
- MostRecentDate string
- }
- func (a *AWS) GetSavingsPlanDataFromAthena() error {
- cfg, err := a.GetConfig()
- if err != nil {
- return err
- }
- if cfg.AthenaBucketName == "" {
- return fmt.Errorf("No Athena Bucket configured")
- }
- if a.SavingsPlanDataByInstanceID == nil {
- a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
- }
- tNow := time.Now()
- tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
- start := tOneDayAgo.Format("2006-01-02")
- end := tNow.Format("2006-01-02")
- // Use Savings Plan Effective Rate as an estimation for cost, assuming the 1h most recent period got a fully loaded savings plan.
- //
- q := `SELECT
- line_item_usage_start_date,
- savings_plan_savings_plan_a_r_n,
- line_item_resource_id,
- savings_plan_savings_plan_rate
- FROM %s as cost_data
- WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
- AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
- line_item_usage_start_date DESC`
- page := 0
- processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
- a.SavingsPlanDataLock.Lock()
- a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData) // Clean out the old data and only report a savingsplan price if its in the most recent run.
- mostRecentDate := ""
- iter := op.ResultSet.Rows
- if page == 0 && len(iter) > 0 {
- iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
- }
- page++
- for _, r := range iter {
- d := *r.Data[0].VarCharValue
- if mostRecentDate == "" {
- mostRecentDate = d
- } else if mostRecentDate != d { // Get all most recent assignments
- break
- }
- cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
- if err != nil {
- klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
- }
- r := &SavingsPlanData{
- ResourceID: *r.Data[2].VarCharValue,
- EffectiveCost: cost,
- SavingsPlanARN: *r.Data[1].VarCharValue,
- MostRecentDate: d,
- }
- a.SavingsPlanDataByInstanceID[r.ResourceID] = r
- }
- klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
- for k, r := range a.SavingsPlanDataByInstanceID {
- log.DedupedInfof(5, "Savings Plan Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
- }
- a.SavingsPlanDataLock.Unlock()
- return true
- }
- query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
- klog.V(3).Infof("Running Query: %s", query)
- ip, svc, err := a.QueryAthenaPaginated(query)
- if err != nil {
- return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
- }
- athenaErr := svc.GetQueryResultsPages(ip, processResults)
- if athenaErr != nil {
- return athenaErr
- }
- return nil
- }
- type RIData struct {
- ResourceID string
- EffectiveCost float64
- ReservationARN string
- MostRecentDate string
- }
- func (a *AWS) GetReservationDataFromAthena() error {
- cfg, err := a.GetConfig()
- if err != nil {
- return err
- }
- if cfg.AthenaBucketName == "" {
- return fmt.Errorf("No Athena Bucket configured")
- }
- // Query for all column names in advance in order to validate configured
- // label columns
- columns, _ := a.ShowAthenaColumns()
- if columns["reservation_reservation_a_r_n"] && columns["reservation_effective_cost"] {
- if a.RIPricingByInstanceID == nil {
- a.RIPricingByInstanceID = make(map[string]*RIData)
- }
- tNow := time.Now()
- tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
- start := tOneDayAgo.Format("2006-01-02")
- end := tNow.Format("2006-01-02")
- q := `SELECT
- line_item_usage_start_date,
- reservation_reservation_a_r_n,
- line_item_resource_id,
- reservation_effective_cost
- FROM %s as cost_data
- WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
- AND reservation_reservation_a_r_n <> '' ORDER BY
- line_item_usage_start_date DESC`
- query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
- op, err := a.QueryAthenaBillingData(query)
- if err != nil {
- a.RIPricingError = err
- return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
- }
- a.RIPricingError = nil
- klog.Infof("Fetching RI data...")
- if len(op.ResultSet.Rows) > 1 {
- a.RIDataLock.Lock()
- mostRecentDate := ""
- for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
- d := *r.Data[0].VarCharValue
- if mostRecentDate == "" {
- mostRecentDate = d
- } else if mostRecentDate != d { // Get all most recent assignments
- break
- }
- cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
- if err != nil {
- klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
- }
- r := &RIData{
- ResourceID: *r.Data[2].VarCharValue,
- EffectiveCost: cost,
- ReservationARN: *r.Data[1].VarCharValue,
- MostRecentDate: d,
- }
- a.RIPricingByInstanceID[r.ResourceID] = r
- }
- klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
- for k, r := range a.RIPricingByInstanceID {
- log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
- }
- a.RIDataLock.Unlock()
- } else {
- klog.Infof("No reserved instance data found")
- }
- } else {
- klog.Infof("No reserved data available in Athena")
- a.RIPricingError = nil
- }
- return nil
- }
- // ShowAthenaColumns returns a list of the names of all columns in the configured
- // Athena tables
- func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
- columnSet := map[string]bool{}
- // Configure Athena query
- cfg, err := aws.GetConfig()
- if err != nil {
- return nil, err
- }
- if cfg.AthenaTable == "" {
- return nil, fmt.Errorf("AthenaTable not configured")
- }
- if cfg.AthenaBucketName == "" {
- return nil, fmt.Errorf("AthenaBucketName not configured")
- }
- q := `SHOW COLUMNS IN %s`
- query := fmt.Sprintf(q, cfg.AthenaTable)
- results, svc, err := aws.QueryAthenaPaginated(query)
- columns := []string{}
- pageNum := 0
- athenaErr := svc.GetQueryResultsPages(results, func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
- for _, row := range page.ResultSet.Rows {
- columns = append(columns, *row.Data[0].VarCharValue)
- }
- pageNum++
- return true
- })
- if athenaErr != nil {
- log.Warningf("Error getting Athena columns: %s", err)
- return columnSet, athenaErr
- }
- for _, col := range columns {
- columnSet[col] = true
- }
- return columnSet, nil
- }
- // ExternalAllocations represents tagged assets outside the scope of kubernetes.
- // "start" and "end" are dates of the format YYYY-MM-DD
- // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
- func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
- customPricing, err := a.GetConfig()
- if err != nil {
- return nil, err
- }
- formattedAggregators := []string{}
- for _, agg := range aggregators {
- aggregator_column_name := "resource_tags_user_" + agg
- aggregator_column_name = cloudutil.ConvertToGlueColumnFormat(aggregator_column_name)
- formattedAggregators = append(formattedAggregators, aggregator_column_name)
- }
- aggregatorNames := strings.Join(formattedAggregators, ",")
- aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
- aggregatorOr = aggregatorOr + " <> ''"
- filter_column_name := "resource_tags_user_" + filterType
- filter_column_name = cloudutil.ConvertToGlueColumnFormat(filter_column_name)
- var query string
- var lastIdx int
- if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
- lastIdx = len(formattedAggregators) + 3
- groupby := generateAWSGroupBy(lastIdx)
- query = fmt.Sprintf(`SELECT
- CAST(line_item_usage_start_date AS DATE) as start_date,
- %s,
- line_item_product_code,
- %s,
- SUM(line_item_blended_cost) as blended_cost
- FROM %s as cost_data
- WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
- GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
- } else {
- lastIdx = len(formattedAggregators) + 2
- groupby := generateAWSGroupBy(lastIdx)
- query = fmt.Sprintf(`SELECT
- CAST(line_item_usage_start_date AS DATE) as start_date,
- %s,
- line_item_product_code,
- SUM(line_item_blended_cost) as blended_cost
- FROM %s as cost_data
- WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
- GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
- }
- var oocAllocs []*OutOfClusterAllocation
- page := 0
- processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
- iter := op.ResultSet.Rows
- if page == 0 && len(iter) > 0 {
- iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
- }
- page++
- for _, r := range iter {
- cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
- if err != nil {
- klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
- }
- environment := ""
- for _, d := range r.Data[1 : len(formattedAggregators)+1] {
- if *d.VarCharValue != "" {
- environment = *d.VarCharValue // just set to the first nonempty match
- }
- break
- }
- ooc := &OutOfClusterAllocation{
- Aggregator: strings.Join(aggregators, ","),
- Environment: environment,
- Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
- Cost: cost,
- }
- oocAllocs = append(oocAllocs, ooc)
- }
- return true
- }
- // Query for all column names in advance in order to validate configured
- // label columns
- columns, _ := a.ShowAthenaColumns()
- // Check for all aggregators being formatted into the query
- containsColumns := true
- for _, agg := range formattedAggregators {
- if columns[agg] != true {
- containsColumns = false
- klog.Warningf("Athena missing column: %s", agg)
- }
- }
- if containsColumns {
- klog.V(3).Infof("Running Query: %s", query)
- ip, svc, _ := a.QueryAthenaPaginated(query)
- athenaErr := svc.GetQueryResultsPages(ip, processResults)
- if athenaErr != nil {
- klog.Infof("RETURNING ATHENA ERROR")
- return nil, athenaErr
- }
- if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
- gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
- if err != nil {
- klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
- }
- gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
- if err != nil {
- klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
- }
- oocAllocs = append(oocAllocs, gcpOOC...)
- }
- } else {
- klog.Infof("External Allocations: Athena Query skipped due to missing columns")
- }
- return oocAllocs, nil
- }
- // QuerySQL can query a properly configured Athena database.
- // Used to fetch billing data.
- // Requires a json config in /var/configs with key region, output, and database.
- func (a *AWS) QuerySQL(query string) ([]byte, error) {
- customPricing, err := a.GetConfig()
- if err != nil {
- return nil, err
- }
- a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
- athenaConfigs, err := os.Open("/var/configs/athena.json")
- if err != nil {
- return nil, err
- }
- defer athenaConfigs.Close()
- b, err := ioutil.ReadAll(athenaConfigs)
- if err != nil {
- return nil, err
- }
- var athenaConf map[string]string
- json.Unmarshal([]byte(b), &athenaConf)
- region := aws.String(customPricing.AthenaRegion)
- resultsBucket := customPricing.AthenaBucketName
- database := customPricing.AthenaDatabase
- c := &aws.Config{
- Region: region,
- }
- s := session.Must(session.NewSession(c))
- svc := athena.New(s)
- var e athena.StartQueryExecutionInput
- var r athena.ResultConfiguration
- r.SetOutputLocation(resultsBucket)
- e.SetResultConfiguration(&r)
- e.SetQueryString(query)
- var q athena.QueryExecutionContext
- q.SetDatabase(database)
- e.SetQueryExecutionContext(&q)
- res, err := svc.StartQueryExecution(&e)
- if err != nil {
- return nil, err
- }
- klog.V(2).Infof("StartQueryExecution result:")
- klog.V(2).Infof(res.GoString())
- var qri athena.GetQueryExecutionInput
- qri.SetQueryExecutionId(*res.QueryExecutionId)
- var qrop *athena.GetQueryExecutionOutput
- duration := time.Duration(2) * time.Second // Pause for 2 seconds
- for {
- qrop, err = svc.GetQueryExecution(&qri)
- if err != nil {
- return nil, err
- }
- if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
- break
- }
- time.Sleep(duration)
- }
- if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
- var ip athena.GetQueryResultsInput
- ip.SetQueryExecutionId(*res.QueryExecutionId)
- op, err := svc.GetQueryResults(&ip)
- if err != nil {
- return nil, err
- }
- b, err := json.Marshal(op.ResultSet)
- if err != nil {
- return nil, err
- }
- return b, nil
- }
- return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
- }
- type spotInfo struct {
- Timestamp string `csv:"Timestamp"`
- UsageType string `csv:"UsageType"`
- Operation string `csv:"Operation"`
- InstanceID string `csv:"InstanceID"`
- MyBidID string `csv:"MyBidID"`
- MyMaxPrice string `csv:"MyMaxPrice"`
- MarketPrice string `csv:"MarketPrice"`
- Charge string `csv:"Charge"`
- Version string `csv:"Version"`
- }
- type fnames []*string
- func (f fnames) Len() int {
- return len(f)
- }
- func (f fnames) Swap(i, j int) {
- f[i], f[j] = f[j], f[i]
- }
- func (f fnames) Less(i, j int) bool {
- key1 := strings.Split(*f[i], ".")
- key2 := strings.Split(*f[j], ".")
- t1, err := time.Parse("2006-01-02-15", key1[1])
- if err != nil {
- klog.V(1).Info("Unable to parse timestamp" + key1[1])
- return false
- }
- t2, err := time.Parse("2006-01-02-15", key2[1])
- if err != nil {
- klog.V(1).Info("Unable to parse timestamp" + key2[1])
- return false
- }
- return t1.Before(t2)
- }
- func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
- if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
- a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
- }
- a.ConfigureAuth() // configure aws api authentication by setting env vars
- s3Prefix := projectID
- if len(prefix) != 0 {
- s3Prefix = prefix + "/" + s3Prefix
- }
- c := aws.NewConfig().WithRegion(region)
- s := session.Must(session.NewSession(c))
- s3Svc := s3.New(s)
- downloader := s3manager.NewDownloaderWithClient(s3Svc)
- tNow := time.Now()
- tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
- ls := &s3.ListObjectsInput{
- Bucket: aws.String(bucket),
- Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
- }
- ls2 := &s3.ListObjectsInput{
- Bucket: aws.String(bucket),
- Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
- }
- lso, err := s3Svc.ListObjects(ls)
- if err != nil {
- a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
- Message: "Bucket List Permissions Available",
- Status: false,
- AdditionalInfo: err.Error(),
- }
- return nil, err
- } else {
- a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
- Message: "Bucket List Permissions Available",
- Status: true,
- }
- }
- lsoLen := len(lso.Contents)
- klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
- if lsoLen == 0 {
- klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
- }
- lso2, err := s3Svc.ListObjects(ls2)
- if err != nil {
- return nil, err
- }
- lso2Len := len(lso2.Contents)
- klog.V(2).Infof("Found %d spot data files from today", lso2Len)
- if lso2Len == 0 {
- klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
- }
- // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
- var keys []*string
- for _, obj := range lso.Contents {
- keys = append(keys, obj.Key)
- }
- for _, obj := range lso2.Contents {
- keys = append(keys, obj.Key)
- }
- versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
- header, err := csvutil.Header(spotInfo{}, "csv")
- if err != nil {
- return nil, err
- }
- fieldsPerRecord := len(header)
- spots := make(map[string]*spotInfo)
- for _, key := range keys {
- getObj := &s3.GetObjectInput{
- Bucket: aws.String(bucket),
- Key: key,
- }
- buf := aws.NewWriteAtBuffer([]byte{})
- _, err := downloader.Download(buf, getObj)
- if err != nil {
- a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
- Message: "Object Get Permissions Available",
- Status: false,
- AdditionalInfo: err.Error(),
- }
- return nil, err
- } else {
- a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
- Message: "Object Get Permissions Available",
- Status: true,
- }
- }
- r := bytes.NewReader(buf.Bytes())
- gr, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
- }
- csvReader := csv.NewReader(gr)
- csvReader.Comma = '\t'
- csvReader.FieldsPerRecord = fieldsPerRecord
- dec, err := csvutil.NewDecoder(csvReader, header...)
- if err != nil {
- return nil, err
- }
- var foundVersion string
- for {
- spot := spotInfo{}
- err := dec.Decode(&spot)
- csvParseErr, isCsvParseErr := err.(*csv.ParseError)
- if err == io.EOF {
- break
- } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
- rec := dec.Record()
- // the first two "Record()" will be the comment lines
- // and they show up as len() == 1
- // the first of which is "#Version"
- // the second of which is "#Fields: "
- if len(rec) != 1 {
- klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
- continue
- }
- if len(foundVersion) == 0 {
- spotFeedVersion := rec[0]
- klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
- matches := versionRx.FindStringSubmatch(spotFeedVersion)
- if matches != nil {
- foundVersion = matches[1]
- if foundVersion != supportedSpotFeedVersion {
- klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
- break
- }
- }
- continue
- } else if strings.Index(rec[0], "#") == 0 {
- continue
- } else {
- klog.V(3).Infof("skipping non-TSV line: %s", rec)
- continue
- }
- } else if err != nil {
- klog.V(2).Infof("Error during spot info decode: %+v", err)
- continue
- }
- log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
- spots[spot.InstanceID] = &spot
- }
- gr.Close()
- }
- return spots, nil
- }
- func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
- }
- func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
- checks := []*ServiceAccountCheck{}
- for _, v := range a.ServiceAccountChecks {
- checks = append(checks, v)
- }
- return &ServiceAccountStatus{
- Checks: checks,
- }
- }
- func (aws *AWS) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
- return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
- }
- func (aws *AWS) Regions() []string {
- return awsRegions
- }
|