| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403 |
- package cloud
- import (
- "bytes"
- "compress/gzip"
- "encoding/csv"
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "regexp"
- "strconv"
- "strings"
- "sync"
- "time"
- "k8s.io/klog"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/athena"
- "github.com/aws/aws-sdk-go/service/ec2"
- "github.com/aws/aws-sdk-go/service/s3"
- "github.com/aws/aws-sdk-go/service/s3/s3manager"
- "github.com/jszwec/csvutil"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes"
- )
- const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
- const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
- const supportedSpotFeedVersion = "1"
- const SpotInfoUpdateType = "spotinfo"
- const AthenaInfoUpdateType = "athenainfo"
- // AWS represents an Amazon Provider
- type AWS struct {
- Pricing map[string]*AWSProductTerms
- SpotPricingByInstanceID map[string]*spotInfo
- ValidPricingKeys map[string]bool
- Clientset *kubernetes.Clientset
- BaseCPUPrice string
- BaseRAMPrice string
- BaseGPUPrice string
- BaseSpotCPUPrice string
- BaseSpotRAMPrice string
- SpotLabelName string
- SpotLabelValue string
- ServiceKeyName string
- ServiceKeySecret string
- SpotDataRegion string
- SpotDataBucket string
- SpotDataPrefix string
- ProjectID string
- DownloadPricingDataLock sync.RWMutex
- *CustomProvider
- }
- // AWSPricing maps a k8s node to an AWS Pricing "product"
- type AWSPricing struct {
- Products map[string]*AWSProduct `json:"products"`
- Terms AWSPricingTerms `json:"terms"`
- }
- // AWSProduct represents a purchased SKU
- type AWSProduct struct {
- Sku string `json:"sku"`
- Attributes AWSProductAttributes `json:"attributes"`
- }
- // AWSProductAttributes represents metadata about the product used to map to a node.
- type AWSProductAttributes struct {
- Location string `json:"location"`
- InstanceType string `json:"instanceType"`
- Memory string `json:"memory"`
- Storage string `json:"storage"`
- VCpu string `json:"vcpu"`
- UsageType string `json:"usagetype"`
- OperatingSystem string `json:"operatingSystem"`
- PreInstalledSw string `json:"preInstalledSw"`
- InstanceFamily string `json:"instanceFamily"`
- GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
- }
- // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
- type AWSPricingTerms struct {
- OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
- Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
- }
- // AWSOfferTerm is a sku extension used to pay for the node.
- type AWSOfferTerm struct {
- Sku string `json:"sku"`
- PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
- }
- // AWSRateCode encodes data about the price of a product
- type AWSRateCode struct {
- Unit string `json:"unit"`
- PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
- }
- // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
- type AWSCurrencyCode struct {
- USD string `json:"USD"`
- }
- // AWSProductTerms represents the full terms of the product
- type AWSProductTerms struct {
- Sku string `json:"sku"`
- OnDemand *AWSOfferTerm `json:"OnDemand"`
- Reserved *AWSOfferTerm `json:"Reserved"`
- Memory string `json:"memory"`
- Storage string `json:"storage"`
- VCpu string `json:"vcpu"`
- GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
- PV *PV `json:"pv"`
- }
- // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
- const ClusterIdEnvVar = "AWS_CLUSTER_ID"
- // OnDemandRateCode is appended to an node sku
- const OnDemandRateCode = ".JRTCKXETXF"
- // ReservedRateCode is appended to a node sku
- const ReservedRateCode = ".38NPMPTW36"
- // HourlyRateCode is appended to a node sku
- const HourlyRateCode = ".6YS6EN2CT7"
- // volTypes are used to map between AWS UsageTypes and
- // EBS volume types, as they would appear in K8s storage class
- // name and the EC2 API.
- var volTypes = map[string]string{
- "EBS:VolumeUsage.gp2": "gp2",
- "EBS:VolumeUsage": "standard",
- "EBS:VolumeUsage.sc1": "sc1",
- "EBS:VolumeP-IOPS.piops": "io1",
- "EBS:VolumeUsage.st1": "st1",
- "EBS:VolumeUsage.piops": "io1",
- "gp2": "EBS:VolumeUsage.gp2",
- "standard": "EBS:VolumeUsage",
- "sc1": "EBS:VolumeUsage.sc1",
- "io1": "EBS:VolumeUsage.piops",
- "st1": "EBS:VolumeUsage.st1",
- }
- // locationToRegion maps AWS region names (As they come from Billing)
- // to actual region identifiers
- var locationToRegion = map[string]string{
- "US East (Ohio)": "us-east-2",
- "US East (N. Virginia)": "us-east-1",
- "US West (N. California)": "us-west-1",
- "US West (Oregon)": "us-west-2",
- "Asia Pacific (Hong Kong)": "ap-east-1",
- "Asia Pacific (Mumbai)": "ap-south-1",
- "Asia Pacific (Osaka-Local)": "ap-northeast-3",
- "Asia Pacific (Seoul)": "ap-northeast-2",
- "Asia Pacific (Singapore)": "ap-southeast-1",
- "Asia Pacific (Sydney)": "ap-southeast-2",
- "Asia Pacific (Tokyo)": "ap-northeast-1",
- "Canada (Central)": "ca-central-1",
- "China (Beijing)": "cn-north-1",
- "China (Ningxia)": "cn-northwest-1",
- "EU (Frankfurt)": "eu-central-1",
- "EU (Ireland)": "eu-west-1",
- "EU (London)": "eu-west-2",
- "EU (Paris)": "eu-west-3",
- "EU (Stockholm)": "eu-north-1",
- "South America (Sao Paulo)": "sa-east-1",
- "AWS GovCloud (US-East)": "us-gov-east-1",
- "AWS GovCloud (US)": "us-gov-west-1",
- }
- var regionToBillingRegionCode = map[string]string{
- "us-east-2": "USE2",
- "us-east-1": "",
- "us-west-1": "USW1",
- "us-west-2": "USW2",
- "ap-east-1": "APE1",
- "ap-south-1": "APS3",
- "ap-northeast-3": "APN3",
- "ap-northeast-2": "APN2",
- "ap-southeast-1": "APS1",
- "ap-southeast-2": "APS2",
- "ap-northeast-1": "APN1",
- "ca-central-1": "CAN1",
- "cn-north-1": "",
- "cn-northwest-1": "",
- "eu-central-1": "EUC1",
- "eu-west-1": "EU",
- "eu-west-2": "EUW2",
- "eu-west-3": "EUW3",
- "eu-north-1": "EUN1",
- "sa-east-1": "SAE1",
- "us-gov-east-1": "UGE1",
- "us-gov-west-1": "UGW1",
- }
- func (aws *AWS) GetLocalStorageQuery(offset string) (string, error) {
- return "", nil
- }
- // KubeAttrConversion maps the k8s labels for region to an aws region
- func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
- operatingSystem = strings.ToLower(operatingSystem)
- region := locationToRegion[location]
- return region + "," + instanceType + "," + operatingSystem
- }
- type AwsSpotFeedInfo struct {
- BucketName string `json:"bucketName"`
- Prefix string `json:"prefix"`
- Region string `json:"region"`
- AccountID string `json:"projectID"`
- ServiceKeyName string `json:"serviceKeyName"`
- ServiceKeySecret string `json:"serviceKeySecret"`
- SpotLabel string `json:"spotLabel"`
- SpotLabelValue string `json:"spotLabelValue"`
- }
- type AwsAthenaInfo struct {
- AthenaBucketName string `json:"athenaBucketName"`
- AthenaRegion string `json:"athenaRegion"`
- AthenaDatabase string `json:"athenaDatabase"`
- AthenaTable string `json:"athenaTable"`
- ServiceKeyName string `json:"serviceKeyName"`
- ServiceKeySecret string `json:"serviceKeySecret"`
- AccountID string `json:"projectID"`
- }
- func (aws *AWS) GetManagementPlatform() (string, error) {
- nodes, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
- if err != nil {
- return "", err
- }
- if len(nodes.Items) > 0 {
- n := nodes.Items[0]
- version := n.Status.NodeInfo.KubeletVersion
- if strings.Contains(version, "eks") {
- return "eks", nil
- }
- if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
- return "kops", nil
- }
- }
- return "", nil
- }
- func (aws *AWS) GetConfig() (*CustomPricing, error) {
- c, err := GetDefaultPricingData("aws.json")
- if c.Discount == "" {
- c.Discount = "0%"
- }
- if err != nil {
- return nil, err
- }
- return c, nil
- }
- func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
- c, err := GetDefaultPricingData("aws.json")
- if err != nil {
- return nil, err
- }
- if updateType == SpotInfoUpdateType {
- a := AwsSpotFeedInfo{}
- err := json.NewDecoder(r).Decode(&a)
- if err != nil {
- return nil, err
- }
- if err != nil {
- return nil, err
- }
- c.ServiceKeyName = a.ServiceKeyName
- c.ServiceKeySecret = a.ServiceKeySecret
- c.SpotDataPrefix = a.Prefix
- c.SpotDataBucket = a.BucketName
- c.ProjectID = a.AccountID
- c.SpotDataRegion = a.Region
- c.SpotLabel = a.SpotLabel
- c.SpotLabelValue = a.SpotLabelValue
- } else if updateType == AthenaInfoUpdateType {
- a := AwsAthenaInfo{}
- err := json.NewDecoder(r).Decode(&a)
- if err != nil {
- return nil, err
- }
- c.AthenaBucketName = a.AthenaBucketName
- c.AthenaRegion = a.AthenaRegion
- c.AthenaDatabase = a.AthenaDatabase
- c.AthenaTable = a.AthenaTable
- c.ServiceKeyName = a.ServiceKeyName
- c.ServiceKeySecret = a.ServiceKeySecret
- c.ProjectID = a.AccountID
- } else {
- a := make(map[string]string)
- err = json.NewDecoder(r).Decode(&a)
- if err != nil {
- return nil, err
- }
- for k, v := range a {
- kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
- err := SetCustomPricingField(c, kUpper, v)
- if err != nil {
- return nil, err
- }
- }
- }
- cj, err := json.Marshal(c)
- if err != nil {
- return nil, err
- }
- path := os.Getenv("CONFIG_PATH")
- if path == "" {
- path = "/models/"
- }
- path += "aws.json"
- remoteEnabled := os.Getenv(remoteEnabled)
- if remoteEnabled == "true" {
- err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
- if err != nil {
- return nil, err
- }
- }
- err = ioutil.WriteFile(path, cj, 0644)
- if err != nil {
- return nil, err
- }
- return c, nil
- }
- type awsKey struct {
- SpotLabelName string
- SpotLabelValue string
- Labels map[string]string
- ProviderID string
- }
- func (k *awsKey) GPUType() string {
- return ""
- }
- func (k *awsKey) ID() string {
- provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
- for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
- if matchNum == 2 {
- return group
- }
- }
- klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
- return ""
- }
- func (k *awsKey) Features() string {
- instanceType := k.Labels[v1.LabelInstanceType]
- var operatingSystem string
- operatingSystem, ok := k.Labels[v1.LabelOSStable]
- if !ok {
- operatingSystem = k.Labels["beta.kubernetes.io/os"]
- }
- region := k.Labels[v1.LabelZoneRegion]
- key := region + "," + instanceType + "," + operatingSystem
- usageType := "preemptible"
- spotKey := key + "," + usageType
- if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
- return spotKey
- }
- if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
- return spotKey
- }
- return key
- }
- func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
- pricing, ok := aws.Pricing[pvk.Features()]
- if !ok {
- klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
- return &PV{}, nil
- }
- return pricing.PV, nil
- }
- type awsPVKey struct {
- Labels map[string]string
- StorageClassParameters map[string]string
- StorageClassName string
- Name string
- }
- func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
- return &awsPVKey{
- Labels: pv.Labels,
- StorageClassName: pv.Spec.StorageClassName,
- StorageClassParameters: parameters,
- Name: pv.Name,
- }
- }
- func (key *awsPVKey) GetStorageClass() string {
- return key.StorageClassName
- }
- func (key *awsPVKey) Features() string {
- storageClass := key.StorageClassParameters["type"]
- if storageClass == "standard" {
- storageClass = "gp2"
- }
- // Storage class names are generally EBS volume types (gp2)
- // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
- // Converts between the 2
- region := key.Labels[v1.LabelZoneRegion]
- //if region == "" {
- // region = "us-east-1"
- //}
- class, ok := volTypes[storageClass]
- if !ok {
- klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
- }
- return region + "," + class
- }
- // GetKey maps node labels to information needed to retrieve pricing data
- func (aws *AWS) GetKey(labels map[string]string) Key {
- return &awsKey{
- SpotLabelName: aws.SpotLabelName,
- SpotLabelValue: aws.SpotLabelValue,
- Labels: labels,
- ProviderID: labels["providerID"],
- }
- }
- func (aws *AWS) isPreemptible(key string) bool {
- s := strings.Split(key, ",")
- if len(s) == 4 && s[3] == "preemptible" {
- return true
- }
- return false
- }
- // DownloadPricingData fetches data from the AWS Pricing API
- func (aws *AWS) DownloadPricingData() error {
- aws.DownloadPricingDataLock.Lock()
- defer aws.DownloadPricingDataLock.Unlock()
- c, err := GetDefaultPricingData("aws.json")
- if err != nil {
- klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
- }
- aws.BaseCPUPrice = c.CPU
- aws.BaseRAMPrice = c.RAM
- aws.BaseGPUPrice = c.GPU
- aws.BaseSpotCPUPrice = c.SpotCPU
- aws.BaseSpotRAMPrice = c.SpotRAM
- aws.SpotLabelName = c.SpotLabel
- aws.SpotLabelValue = c.SpotLabelValue
- aws.SpotDataBucket = c.SpotDataBucket
- aws.SpotDataPrefix = c.SpotDataPrefix
- aws.ProjectID = c.ProjectID
- aws.SpotDataRegion = c.SpotDataRegion
- aws.ServiceKeyName = c.ServiceKeyName
- aws.ServiceKeySecret = c.ServiceKeySecret
- if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
- klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
- }
- nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
- if err != nil {
- return err
- }
- inputkeys := make(map[string]bool)
- for _, n := range nodeList.Items {
- labels := n.GetObjectMeta().GetLabels()
- key := aws.GetKey(labels)
- inputkeys[key.Features()] = true
- }
- pvList, err := aws.Clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
- if err != nil {
- return err
- }
- storageClasses, err := aws.Clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
- storageClassMap := make(map[string]map[string]string)
- for _, storageClass := range storageClasses.Items {
- params := storageClass.Parameters
- storageClassMap[storageClass.ObjectMeta.Name] = params
- if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
- storageClassMap["default"] = params
- storageClassMap[""] = params
- }
- }
- pvkeys := make(map[string]PVKey)
- for _, pv := range pvList.Items {
- params, ok := storageClassMap[pv.Spec.StorageClassName]
- if !ok {
- klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
- continue
- }
- key := aws.GetPVKey(&pv, params)
- pvkeys[key.Features()] = key
- }
- aws.Pricing = make(map[string]*AWSProductTerms)
- aws.ValidPricingKeys = make(map[string]bool)
- skusToKeys := make(map[string]string)
- pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
- klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
- resp, err := http.Get(pricingURL)
- if err != nil {
- klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
- return err
- }
- klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
- dec := json.NewDecoder(resp.Body)
- for {
- t, err := dec.Token()
- if err == io.EOF {
- klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
- break
- }
- if t == "products" {
- _, err := dec.Token() // this should parse the opening "{""
- if err != nil {
- return err
- }
- for dec.More() {
- _, err := dec.Token() // the sku token
- if err != nil {
- return err
- }
- product := &AWSProduct{}
- err = dec.Decode(&product)
- if err != nil {
- klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
- break
- }
- if product.Attributes.PreInstalledSw == "NA" &&
- (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
- key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
- spotKey := key + ",preemptible"
- if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
- productTerms := &AWSProductTerms{
- Sku: product.Sku,
- Memory: product.Attributes.Memory,
- Storage: product.Attributes.Storage,
- VCpu: product.Attributes.VCpu,
- GPU: product.Attributes.GPU,
- }
- aws.Pricing[key] = productTerms
- aws.Pricing[spotKey] = productTerms
- skusToKeys[product.Sku] = key
- }
- aws.ValidPricingKeys[key] = true
- aws.ValidPricingKeys[spotKey] = true
- } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
- // UsageTypes may be prefixed with a region code - we're removing this when using
- // volTypes to keep lookups generic
- usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
- usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
- usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
- key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
- spotKey := key + ",preemptible"
- pv := &PV{
- Class: volTypes[usageTypeNoRegion],
- Region: locationToRegion[product.Attributes.Location],
- }
- productTerms := &AWSProductTerms{
- Sku: product.Sku,
- PV: pv,
- }
- aws.Pricing[key] = productTerms
- aws.Pricing[spotKey] = productTerms
- skusToKeys[product.Sku] = key
- aws.ValidPricingKeys[key] = true
- aws.ValidPricingKeys[spotKey] = true
- }
- }
- }
- if t == "terms" {
- _, err := dec.Token() // this should parse the opening "{""
- if err != nil {
- return err
- }
- termType, err := dec.Token()
- if err != nil {
- return err
- }
- if termType == "OnDemand" {
- _, err := dec.Token()
- if err != nil { // again, should parse an opening "{"
- return err
- }
- for dec.More() {
- sku, err := dec.Token()
- if err != nil {
- return err
- }
- _, err = dec.Token() // another opening "{"
- if err != nil {
- return err
- }
- skuOnDemand, err := dec.Token()
- if err != nil {
- return err
- }
- offerTerm := &AWSOfferTerm{}
- err = dec.Decode(&offerTerm)
- if err != nil {
- klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
- }
- if sku.(string)+OnDemandRateCode == skuOnDemand {
- key, ok := skusToKeys[sku.(string)]
- spotKey := key + ",preemptible"
- if ok {
- aws.Pricing[key].OnDemand = offerTerm
- aws.Pricing[spotKey].OnDemand = offerTerm
- if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
- // If the specific UsageType is the per IO cost used on io1 volumes
- // we need to add the per IO cost to the io1 PV cost
- cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
- // Add the per IO cost to the PV object for the io1 volume type
- aws.Pricing[key].PV.CostPerIO = cost
- } else if strings.Contains(key, "EBS:Volume") {
- // If volume, we need to get hourly cost and add it to the PV object
- cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
- costFloat, _ := strconv.ParseFloat(cost, 64)
- hourlyPrice := costFloat / 730
- aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
- }
- }
- }
- _, err = dec.Token()
- if err != nil {
- return err
- }
- }
- _, err = dec.Token()
- if err != nil {
- return err
- }
- }
- }
- }
- sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
- if err != nil {
- klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
- } else {
- aws.SpotPricingByInstanceID = sp
- }
- return nil
- }
- // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
- func (c *AWS) NetworkPricing() (*Network, error) {
- cpricing, err := GetDefaultPricingData("aws.json")
- if err != nil {
- return nil, err
- }
- znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
- if err != nil {
- return nil, err
- }
- rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
- if err != nil {
- return nil, err
- }
- inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
- if err != nil {
- return nil, err
- }
- return &Network{
- ZoneNetworkEgressCost: znec,
- RegionNetworkEgressCost: rnec,
- InternetNetworkEgressCost: inec,
- }, nil
- }
- // AllNodePricing returns all the billing data fetched.
- func (aws *AWS) AllNodePricing() (interface{}, error) {
- aws.DownloadPricingDataLock.RLock()
- defer aws.DownloadPricingDataLock.RUnlock()
- return aws.Pricing, nil
- }
- func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
- key := k.Features()
- if aws.isPreemptible(key) {
- if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
- var spotcost string
- arr := strings.Split(spotInfo.Charge, " ")
- if len(arr) == 2 {
- spotcost = arr[0]
- } else {
- klog.V(2).Infof("Spot data for node %s is missing", k.ID())
- }
- klog.V(1).Infof("SPOT COST FOR %s: %s", k.Features, spotcost)
- return &Node{
- Cost: spotcost,
- VCPU: terms.VCpu,
- RAM: terms.Memory,
- GPU: terms.GPU,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- }, nil
- }
- return &Node{
- VCPU: terms.VCpu,
- VCPUCost: aws.BaseSpotCPUPrice,
- RAM: terms.Memory,
- GPU: terms.GPU,
- RAMCost: aws.BaseSpotRAMPrice,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- }, nil
- }
- c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
- if !ok {
- return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
- }
- cost := c.PricePerUnit.USD
- return &Node{
- Cost: cost,
- VCPU: terms.VCpu,
- RAM: terms.Memory,
- GPU: terms.GPU,
- Storage: terms.Storage,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- }, nil
- }
- // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
- func (aws *AWS) NodePricing(k Key) (*Node, error) {
- aws.DownloadPricingDataLock.RLock()
- defer aws.DownloadPricingDataLock.RUnlock()
- key := k.Features()
- usageType := "ondemand"
- if aws.isPreemptible(key) {
- usageType = "preemptible"
- }
- terms, ok := aws.Pricing[key]
- if ok {
- return aws.createNode(terms, usageType, k)
- } else if _, ok := aws.ValidPricingKeys[key]; ok {
- aws.DownloadPricingDataLock.RUnlock()
- err := aws.DownloadPricingData()
- aws.DownloadPricingDataLock.RLock()
- if err != nil {
- return &Node{
- Cost: aws.BaseCPUPrice,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- UsesBaseCPUPrice: true,
- }, err
- }
- terms, termsOk := aws.Pricing[key]
- if !termsOk {
- return &Node{
- Cost: aws.BaseCPUPrice,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- UsesBaseCPUPrice: true,
- }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
- }
- return aws.createNode(terms, usageType, k)
- } else { // Fall back to base pricing if we can't find the key.
- klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
- return &Node{
- Cost: aws.BaseCPUPrice,
- BaseCPUPrice: aws.BaseCPUPrice,
- BaseRAMPrice: aws.BaseRAMPrice,
- BaseGPUPrice: aws.BaseGPUPrice,
- UsageType: usageType,
- UsesBaseCPUPrice: true,
- }, nil
- }
- }
- // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
- func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
- defaultClusterName := "AWS Cluster #1"
- c, err := awsProvider.GetConfig()
- remote := os.Getenv(remoteEnabled)
- remoteEnabled := false
- if os.Getenv(remote) == "true" {
- remoteEnabled = true
- }
- if c.ClusterName != "" {
- m := make(map[string]string)
- m["name"] = c.ClusterName
- m["provider"] = "AWS"
- m["id"] = os.Getenv(clusterIDKey)
- m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
- return m, nil
- }
- makeStructure := func(clusterName string) (map[string]string, error) {
- klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
- m := make(map[string]string)
- m["name"] = clusterName
- m["provider"] = "AWS"
- m["id"] = os.Getenv(clusterIDKey)
- m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
- return m, nil
- }
- maybeClusterId := os.Getenv(ClusterIdEnvVar)
- if len(maybeClusterId) != 0 {
- return makeStructure(maybeClusterId)
- }
- provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
- clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
- nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
- if err != nil {
- return nil, err
- }
- for _, n := range nodeList.Items {
- region := ""
- instanceId := ""
- providerId := n.Spec.ProviderID
- for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
- if matchNum == 1 {
- region = group
- } else if matchNum == 2 {
- instanceId = group
- }
- }
- if len(instanceId) == 0 {
- klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
- continue
- }
- c := &aws.Config{
- Region: aws.String(region),
- }
- s := session.Must(session.NewSession(c))
- ec2Svc := ec2.New(s)
- di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
- InstanceIds: []*string{
- aws.String(instanceId),
- },
- })
- if diErr != nil {
- // maybe log this?
- continue
- }
- if len(di.Reservations) != 1 {
- klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
- continue
- }
- res := di.Reservations[0]
- if len(res.Instances) != 1 {
- klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
- continue
- }
- inst := res.Instances[0]
- for _, tag := range inst.Tags {
- tagKey := *tag.Key
- for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
- if matchNum != 1 {
- continue
- }
- return makeStructure(group)
- }
- }
- }
- klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
- return makeStructure(defaultClusterName)
- }
- // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
- func (*AWS) AddServiceKey(formValues url.Values) error {
- keyID := formValues.Get("access_key_ID")
- key := formValues.Get("secret_access_key")
- m := make(map[string]string)
- m["access_key_ID"] = keyID
- m["secret_access_key"] = key
- result, err := json.Marshal(m)
- if err != nil {
- return err
- }
- return ioutil.WriteFile("/var/configs/key.json", result, 0644)
- }
- // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
- func (*AWS) GetDisks() ([]byte, error) {
- jsonFile, err := os.Open("/var/configs/key.json")
- if err == nil {
- byteValue, _ := ioutil.ReadAll(jsonFile)
- var result map[string]string
- err := json.Unmarshal([]byte(byteValue), &result)
- if err != nil {
- return nil, err
- }
- err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
- if err != nil {
- return nil, err
- }
- err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
- if err != nil {
- return nil, err
- }
- } else if os.IsNotExist(err) {
- klog.V(2).Infof("Using Default Credentials")
- } else {
- return nil, err
- }
- defer jsonFile.Close()
- clusterConfig, err := os.Open("/var/configs/cluster.json")
- if err != nil {
- return nil, err
- }
- defer clusterConfig.Close()
- b, err := ioutil.ReadAll(clusterConfig)
- if err != nil {
- return nil, err
- }
- var clusterConf map[string]string
- err = json.Unmarshal([]byte(b), &clusterConf)
- if err != nil {
- return nil, err
- }
- region := aws.String(clusterConf["region"])
- c := &aws.Config{
- Region: region,
- }
- s := session.Must(session.NewSession(c))
- ec2Svc := ec2.New(s)
- input := &ec2.DescribeVolumesInput{}
- volumeResult, err := ec2Svc.DescribeVolumes(input)
- if err != nil {
- if aerr, ok := err.(awserr.Error); ok {
- switch aerr.Code() {
- default:
- return nil, aerr
- }
- } else {
- return nil, err
- }
- }
- return json.Marshal(volumeResult)
- }
- // ConvertToGlueColumnFormat takes a string and runs through various regex
- // and string replacement statements to convert it to a format compatible
- // with AWS Glue and Athena column names.
- // Following guidance from AWS provided here ('Column Names' section):
- // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
- // It returns a string containing the column name in proper column name format and length.
- func ConvertToGlueColumnFormat(column_name string) string {
- klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
- // An underscore is added in front of uppercase letters
- capital_underscore := regexp.MustCompile(`[A-Z]`)
- final := capital_underscore.ReplaceAllString(column_name, `_$0`)
- // Any non-alphanumeric characters are replaced with an underscore
- no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
- final = no_space_punc.ReplaceAllString(final, "_")
- // Duplicate underscores are removed
- no_dup_underscore := regexp.MustCompile(`_{2,}`)
- final = no_dup_underscore.ReplaceAllString(final, "_")
- // Any leading and trailing underscores are removed
- no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
- final = no_front_end_underscore.ReplaceAllString(final, "")
- // Uppercase to lowercase
- final = strings.ToLower(final)
- // Longer column name than expected - remove _ left to right
- allowed_col_len := 128
- undersc_to_remove := len(final) - allowed_col_len
- if undersc_to_remove > 0 {
- final = strings.Replace(final, "_", "", undersc_to_remove)
- }
- // If removing all of the underscores still didn't
- // make the column name < 128 characters, trim it!
- if len(final) > allowed_col_len {
- final = final[:allowed_col_len]
- }
- klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
- return final
- }
- // ExternalAllocations represents tagged assets outside the scope of kubernetes.
- // "start" and "end" are dates of the format YYYY-MM-DD
- // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
- func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
- customPricing, err := a.GetConfig()
- if err != nil {
- return nil, err
- }
- aggregator_column_name := "resource_tags_user_" + aggregator
- aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
- query := fmt.Sprintf(`SELECT
- CAST(line_item_usage_start_date AS DATE) as start_date,
- %s,
- line_item_product_code,
- SUM(line_item_blended_cost) as blended_cost
- FROM %s as cost_data
- WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
- GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
- if customPricing.ServiceKeyName != "" {
- err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
- if err != nil {
- return nil, err
- }
- err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
- if err != nil {
- return nil, err
- }
- }
- region := aws.String(customPricing.AthenaRegion)
- resultsBucket := customPricing.AthenaBucketName
- database := customPricing.AthenaDatabase
- c := &aws.Config{
- Region: region,
- }
- s := session.Must(session.NewSession(c))
- svc := athena.New(s)
- var e athena.StartQueryExecutionInput
- var r athena.ResultConfiguration
- r.SetOutputLocation(resultsBucket)
- e.SetResultConfiguration(&r)
- e.SetQueryString(query)
- var q athena.QueryExecutionContext
- q.SetDatabase(database)
- e.SetQueryExecutionContext(&q)
- res, err := svc.StartQueryExecution(&e)
- if err != nil {
- return nil, err
- }
- klog.V(2).Infof("StartQueryExecution result:")
- klog.V(2).Infof(res.GoString())
- var qri athena.GetQueryExecutionInput
- qri.SetQueryExecutionId(*res.QueryExecutionId)
- var qrop *athena.GetQueryExecutionOutput
- duration := time.Duration(2) * time.Second // Pause for 2 seconds
- for {
- qrop, err = svc.GetQueryExecution(&qri)
- if err != nil {
- return nil, err
- }
- if *qrop.QueryExecution.Status.State != "RUNNING" {
- break
- }
- time.Sleep(duration)
- }
- var oocAllocs []*OutOfClusterAllocation
- if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
- var ip athena.GetQueryResultsInput
- ip.SetQueryExecutionId(*res.QueryExecutionId)
- op, err := svc.GetQueryResults(&ip)
- if err != nil {
- return nil, err
- }
- for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
- cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
- if err != nil {
- return nil, err
- }
- ooc := &OutOfClusterAllocation{
- Aggregator: aggregator,
- Environment: *r.Data[1].VarCharValue,
- Service: *r.Data[2].VarCharValue,
- Cost: cost,
- }
- oocAllocs = append(oocAllocs, ooc)
- }
- }
- return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
- }
- // QuerySQL can query a properly configured Athena database.
- // Used to fetch billing data.
- // Requires a json config in /var/configs with key region, output, and database.
- func (a *AWS) QuerySQL(query string) ([]byte, error) {
- customPricing, err := a.GetConfig()
- if err != nil {
- return nil, err
- }
- if customPricing.ServiceKeyName != "" {
- err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
- if err != nil {
- return nil, err
- }
- err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
- if err != nil {
- return nil, err
- }
- }
- athenaConfigs, err := os.Open("/var/configs/athena.json")
- if err != nil {
- return nil, err
- }
- defer athenaConfigs.Close()
- b, err := ioutil.ReadAll(athenaConfigs)
- if err != nil {
- return nil, err
- }
- var athenaConf map[string]string
- json.Unmarshal([]byte(b), &athenaConf)
- region := aws.String(customPricing.AthenaRegion)
- resultsBucket := customPricing.AthenaBucketName
- database := customPricing.AthenaDatabase
- c := &aws.Config{
- Region: region,
- }
- s := session.Must(session.NewSession(c))
- svc := athena.New(s)
- var e athena.StartQueryExecutionInput
- var r athena.ResultConfiguration
- r.SetOutputLocation(resultsBucket)
- e.SetResultConfiguration(&r)
- e.SetQueryString(query)
- var q athena.QueryExecutionContext
- q.SetDatabase(database)
- e.SetQueryExecutionContext(&q)
- res, err := svc.StartQueryExecution(&e)
- if err != nil {
- return nil, err
- }
- klog.V(2).Infof("StartQueryExecution result:")
- klog.V(2).Infof(res.GoString())
- var qri athena.GetQueryExecutionInput
- qri.SetQueryExecutionId(*res.QueryExecutionId)
- var qrop *athena.GetQueryExecutionOutput
- duration := time.Duration(2) * time.Second // Pause for 2 seconds
- for {
- qrop, err = svc.GetQueryExecution(&qri)
- if err != nil {
- return nil, err
- }
- if *qrop.QueryExecution.Status.State != "RUNNING" {
- break
- }
- time.Sleep(duration)
- }
- if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
- var ip athena.GetQueryResultsInput
- ip.SetQueryExecutionId(*res.QueryExecutionId)
- op, err := svc.GetQueryResults(&ip)
- if err != nil {
- return nil, err
- }
- b, err := json.Marshal(op.ResultSet)
- if err != nil {
- return nil, err
- }
- return b, nil
- }
- return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
- }
- type spotInfo struct {
- Timestamp string `csv:"Timestamp"`
- UsageType string `csv:"UsageType"`
- Operation string `csv:"Operation"`
- InstanceID string `csv:"InstanceID"`
- MyBidID string `csv:"MyBidID"`
- MyMaxPrice string `csv:"MyMaxPrice"`
- MarketPrice string `csv:"MarketPrice"`
- Charge string `csv:"Charge"`
- Version string `csv:"Version"`
- }
- type fnames []*string
- func (f fnames) Len() int {
- return len(f)
- }
- func (f fnames) Swap(i, j int) {
- f[i], f[j] = f[j], f[i]
- }
- func (f fnames) Less(i, j int) bool {
- key1 := strings.Split(*f[i], ".")
- key2 := strings.Split(*f[j], ".")
- t1, err := time.Parse("2006-01-02-15", key1[1])
- if err != nil {
- klog.V(1).Info("Unable to parse timestamp" + key1[1])
- return false
- }
- t2, err := time.Parse("2006-01-02-15", key2[1])
- if err != nil {
- klog.V(1).Info("Unable to parse timestamp" + key2[1])
- return false
- }
- return t1.Before(t2)
- }
- func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
- if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
- err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
- if err != nil {
- return nil, err
- }
- err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
- if err != nil {
- return nil, err
- }
- }
- s3Prefix := projectID
- if len(prefix) != 0 {
- s3Prefix = prefix + "/" + s3Prefix
- }
- c := aws.NewConfig().WithRegion(region)
- s := session.Must(session.NewSession(c))
- s3Svc := s3.New(s)
- downloader := s3manager.NewDownloaderWithClient(s3Svc)
- tNow := time.Now()
- tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
- ls := &s3.ListObjectsInput{
- Bucket: aws.String(bucket),
- Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
- }
- ls2 := &s3.ListObjectsInput{
- Bucket: aws.String(bucket),
- Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
- }
- lso, err := s3Svc.ListObjects(ls)
- if err != nil {
- return nil, err
- }
- lsoLen := len(lso.Contents)
- klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
- if lsoLen == 0 {
- klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
- }
- lso2, err := s3Svc.ListObjects(ls2)
- if err != nil {
- return nil, err
- }
- lso2Len := len(lso2.Contents)
- klog.V(2).Infof("Found %d spot data files from today", lso2Len)
- if lso2Len == 0 {
- klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
- }
- var keys []*string
- for _, obj := range lso.Contents {
- keys = append(keys, obj.Key)
- }
- for _, obj := range lso2.Contents {
- keys = append(keys, obj.Key)
- }
- versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
- header, err := csvutil.Header(spotInfo{}, "csv")
- if err != nil {
- return nil, err
- }
- fieldsPerRecord := len(header)
- spots := make(map[string]*spotInfo)
- for _, key := range keys {
- getObj := &s3.GetObjectInput{
- Bucket: aws.String(bucket),
- Key: key,
- }
- buf := aws.NewWriteAtBuffer([]byte{})
- _, err := downloader.Download(buf, getObj)
- if err != nil {
- return nil, err
- }
- r := bytes.NewReader(buf.Bytes())
- gr, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
- }
- csvReader := csv.NewReader(gr)
- csvReader.Comma = '\t'
- csvReader.FieldsPerRecord = fieldsPerRecord
- dec, err := csvutil.NewDecoder(csvReader, header...)
- if err != nil {
- return nil, err
- }
- var foundVersion string
- for {
- spot := spotInfo{}
- err := dec.Decode(&spot)
- csvParseErr, isCsvParseErr := err.(*csv.ParseError)
- if err == io.EOF {
- break
- } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
- rec := dec.Record()
- // the first two "Record()" will be the comment lines
- // and they show up as len() == 1
- // the first of which is "#Version"
- // the second of which is "#Fields: "
- if len(rec) != 1 {
- klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
- continue
- }
- if len(foundVersion) == 0 {
- spotFeedVersion := rec[0]
- klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
- matches := versionRx.FindStringSubmatch(spotFeedVersion)
- if matches != nil {
- foundVersion = matches[1]
- if foundVersion != supportedSpotFeedVersion {
- klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
- break
- }
- }
- continue
- } else if strings.Index(rec[0], "#") == 0 {
- continue
- } else {
- klog.V(3).Infof("skipping non-TSV line: %s", rec)
- continue
- }
- } else if err != nil {
- klog.V(2).Infof("Error during spot info decode: %+v", err)
- continue
- }
- klog.V(3).Infof("Found spot info %+v", spot)
- spots[spot.InstanceID] = &spot
- }
- gr.Close()
- }
- return spots, nil
- }
|