awsprovider.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "math"
  11. "net/http"
  12. "net/url"
  13. "os"
  14. "regexp"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "time"
  19. "k8s.io/klog"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/athena"
  24. "github.com/aws/aws-sdk-go/service/ec2"
  25. "github.com/aws/aws-sdk-go/service/s3"
  26. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  27. "github.com/jszwec/csvutil"
  28. v1 "k8s.io/api/core/v1"
  29. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  30. "k8s.io/client-go/kubernetes"
  31. )
  // Names of the environment variables that carry the AWS access key pair.
  const (
  	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
  	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  )

  // supportedSpotFeedVersion is the spot data feed version this provider
  // understands.
  // NOTE(review): the consumer of this constant is outside this view — confirm.
  const supportedSpotFeedVersion = "1"

  // Values of the updateType argument that UpdateConfig recognises; any other
  // value is treated as a generic field update.
  const (
  	SpotInfoUpdateType   = "spotinfo"
  	AthenaInfoUpdateType = "athenainfo"
  )
  37. // AWS represents an Amazon Provider
  38. type AWS struct {
  39. Pricing map[string]*AWSProductTerms
  40. SpotPricingByInstanceID map[string]*spotInfo
  41. ValidPricingKeys map[string]bool
  42. Clientset *kubernetes.Clientset
  43. BaseCPUPrice string
  44. BaseRAMPrice string
  45. BaseGPUPrice string
  46. BaseSpotCPUPrice string
  47. BaseSpotRAMPrice string
  48. SpotLabelName string
  49. SpotLabelValue string
  50. ServiceKeyName string
  51. ServiceKeySecret string
  52. SpotDataRegion string
  53. SpotDataBucket string
  54. SpotDataPrefix string
  55. ProjectID string
  56. DownloadPricingDataLock sync.RWMutex
  57. *CustomProvider
  58. }
  59. // AWSPricing maps a k8s node to an AWS Pricing "product"
  60. type AWSPricing struct {
  61. Products map[string]*AWSProduct `json:"products"`
  62. Terms AWSPricingTerms `json:"terms"`
  63. }
  64. // AWSProduct represents a purchased SKU
  65. type AWSProduct struct {
  66. Sku string `json:"sku"`
  67. Attributes AWSProductAttributes `json:"attributes"`
  68. }
  // AWSProductAttributes represents metadata about the product used to map to
  // a node. All values are kept as the raw strings found in the pricing JSON.
  type AWSProductAttributes struct {
  	// Location is the billing region name, e.g. "US East (Ohio)".
  	Location     string `json:"location"`
  	InstanceType string `json:"instanceType"`
  	Memory       string `json:"memory"`
  	Storage      string `json:"storage"`
  	VCpu         string `json:"vcpu"`
  	// UsageType distinguishes instance usage ("BoxUsage") from EBS volume
  	// usage ("EBS:Volume...").
  	UsageType       string `json:"usagetype"`
  	OperatingSystem string `json:"operatingSystem"`
  	PreInstalledSw  string `json:"preInstalledSw"`
  	InstanceFamily  string `json:"instanceFamily"`
  	// GPU represents the number of GPU on the instance.
  	GPU string `json:"gpu"`
  }
  82. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  83. type AWSPricingTerms struct {
  84. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  85. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  86. }
  87. // AWSOfferTerm is a sku extension used to pay for the node.
  88. type AWSOfferTerm struct {
  89. Sku string `json:"sku"`
  90. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  91. }
  92. // AWSRateCode encodes data about the price of a product
  93. type AWSRateCode struct {
  94. Unit string `json:"unit"`
  95. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  96. }
  // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  type AWSCurrencyCode struct {
  	// USD is the price in US dollars, kept as the string found in the JSON.
  	USD string `json:"USD"`
  }
  101. // AWSProductTerms represents the full terms of the product
  102. type AWSProductTerms struct {
  103. Sku string `json:"sku"`
  104. OnDemand *AWSOfferTerm `json:"OnDemand"`
  105. Reserved *AWSOfferTerm `json:"Reserved"`
  106. Memory string `json:"memory"`
  107. Storage string `json:"storage"`
  108. VCpu string `json:"vcpu"`
  109. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  110. PV *PV `json:"pv"`
  111. }
  // ClusterIdEnvVar is the environment variable in which one can manually set
  // the ClusterId.
  const ClusterIdEnvVar = "AWS_CLUSTER_ID"

  // Rate-code suffixes that are appended to a node sku when looking up price
  // dimensions.
  const (
  	// OnDemandRateCode is appended to a node sku.
  	OnDemandRateCode = ".JRTCKXETXF"
  	// ReservedRateCode is appended to a node sku.
  	ReservedRateCode = ".38NPMPTW36"
  	// HourlyRateCode is appended to a node sku.
  	HourlyRateCode = ".6YS6EN2CT7"
  )
  // volTypes translates in both directions between AWS UsageTypes and EBS
  // volume types as they appear in K8s storage class names and the EC2 API.
  var volTypes = map[string]string{
  	// UsageType -> volume type
  	"EBS:VolumeUsage":        "standard",
  	"EBS:VolumeUsage.gp2":    "gp2",
  	"EBS:VolumeUsage.sc1":    "sc1",
  	"EBS:VolumeUsage.st1":    "st1",
  	"EBS:VolumeUsage.piops":  "io1",
  	"EBS:VolumeP-IOPS.piops": "io1",

  	// volume type -> UsageType
  	"standard": "EBS:VolumeUsage",
  	"gp2":      "EBS:VolumeUsage.gp2",
  	"sc1":      "EBS:VolumeUsage.sc1",
  	"st1":      "EBS:VolumeUsage.st1",
  	"io1":      "EBS:VolumeUsage.piops",
  }
  // locationToRegion maps AWS region names (as they come from Billing) to
  // actual region identifiers.
  var locationToRegion = map[string]string{
  	// United States
  	"US East (N. Virginia)":   "us-east-1",
  	"US East (Ohio)":          "us-east-2",
  	"US West (N. California)": "us-west-1",
  	"US West (Oregon)":        "us-west-2",
  	"AWS GovCloud (US-East)":  "us-gov-east-1",
  	"AWS GovCloud (US)":       "us-gov-west-1",

  	// Asia Pacific
  	"Asia Pacific (Hong Kong)":   "ap-east-1",
  	"Asia Pacific (Mumbai)":      "ap-south-1",
  	"Asia Pacific (Tokyo)":       "ap-northeast-1",
  	"Asia Pacific (Seoul)":       "ap-northeast-2",
  	"Asia Pacific (Osaka-Local)": "ap-northeast-3",
  	"Asia Pacific (Singapore)":   "ap-southeast-1",
  	"Asia Pacific (Sydney)":      "ap-southeast-2",

  	// Canada, China, South America
  	"Canada (Central)":          "ca-central-1",
  	"China (Beijing)":           "cn-north-1",
  	"China (Ningxia)":           "cn-northwest-1",
  	"South America (Sao Paulo)": "sa-east-1",

  	// Europe
  	"EU (Frankfurt)": "eu-central-1",
  	"EU (Ireland)":   "eu-west-1",
  	"EU (London)":    "eu-west-2",
  	"EU (Paris)":     "eu-west-3",
  	"EU (Stockholm)": "eu-north-1",
  }
  // regionToBillingRegionCode maps a region identifier to a short billing
  // region code. Several regions map to the empty string.
  // NOTE(review): presumably these are the prefixes AWS puts in front of usage
  // types (e.g. "USE2-BoxUsage"); the consumer is outside this view — confirm.
  var regionToBillingRegionCode = map[string]string{
  	// United States
  	"us-east-1":     "",
  	"us-east-2":     "USE2",
  	"us-west-1":     "USW1",
  	"us-west-2":     "USW2",
  	"us-gov-east-1": "UGE1",
  	"us-gov-west-1": "UGW1",

  	// Asia Pacific
  	"ap-east-1":      "APE1",
  	"ap-south-1":     "APS3",
  	"ap-northeast-1": "APN1",
  	"ap-northeast-2": "APN2",
  	"ap-northeast-3": "APN3",
  	"ap-southeast-1": "APS1",
  	"ap-southeast-2": "APS2",

  	// Canada, China, South America
  	"ca-central-1":   "CAN1",
  	"cn-north-1":     "",
  	"cn-northwest-1": "",
  	"sa-east-1":      "SAE1",

  	// Europe
  	"eu-central-1": "EUC1",
  	"eu-west-1":    "EU",
  	"eu-west-2":    "EUW2",
  	"eu-west-3":    "EUW3",
  	"eu-north-1":   "EUN1",
  }
  186. // KubeAttrConversion maps the k8s labels for region to an aws region
  187. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  188. operatingSystem = strings.ToLower(operatingSystem)
  189. region := locationToRegion[location]
  190. return region + "," + instanceType + "," + operatingSystem
  191. }
  // AwsSpotFeedInfo is the JSON payload UpdateConfig decodes for the
  // "spotinfo" update type. Note that AccountID travels under the JSON key
  // "projectID".
  type AwsSpotFeedInfo struct {
  	BucketName       string `json:"bucketName"`
  	Prefix           string `json:"prefix"`
  	Region           string `json:"region"`
  	AccountID        string `json:"projectID"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	SpotLabel        string `json:"spotLabel"`
  	SpotLabelValue   string `json:"spotLabelValue"`
  }
  // AwsAthenaInfo is the JSON payload UpdateConfig decodes for the
  // "athenainfo" update type. Note that AccountID travels under the JSON key
  // "projectID".
  type AwsAthenaInfo struct {
  	AthenaBucketName string `json:"athenaBucketName"`
  	AthenaRegion     string `json:"athenaRegion"`
  	AthenaDatabase   string `json:"athenaDatabase"`
  	AthenaTable      string `json:"athenaTable"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	AccountID        string `json:"projectID"`
  }
  211. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  212. c, err := GetDefaultPricingData("aws.json")
  213. if err != nil {
  214. return nil, err
  215. }
  216. return c, nil
  217. }
  218. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  219. c, err := GetDefaultPricingData("aws.json")
  220. if err != nil {
  221. return nil, err
  222. }
  223. if updateType == SpotInfoUpdateType {
  224. a := AwsSpotFeedInfo{}
  225. err := json.NewDecoder(r).Decode(&a)
  226. if err != nil {
  227. return nil, err
  228. }
  229. if err != nil {
  230. return nil, err
  231. }
  232. c.ServiceKeyName = a.ServiceKeyName
  233. c.ServiceKeySecret = a.ServiceKeySecret
  234. c.SpotDataPrefix = a.Prefix
  235. c.SpotDataBucket = a.BucketName
  236. c.ProjectID = a.AccountID
  237. c.SpotDataRegion = a.Region
  238. c.SpotLabel = a.SpotLabel
  239. c.SpotLabelValue = a.SpotLabelValue
  240. } else if updateType == AthenaInfoUpdateType {
  241. a := AwsAthenaInfo{}
  242. err := json.NewDecoder(r).Decode(&a)
  243. if err != nil {
  244. return nil, err
  245. }
  246. c.AthenaBucketName = a.AthenaBucketName
  247. c.AthenaRegion = a.AthenaRegion
  248. c.AthenaDatabase = a.AthenaDatabase
  249. c.AthenaTable = a.AthenaTable
  250. c.ServiceKeyName = a.ServiceKeyName
  251. c.ServiceKeySecret = a.ServiceKeySecret
  252. c.ProjectID = a.AccountID
  253. } else {
  254. a := make(map[string]string)
  255. err = json.NewDecoder(r).Decode(&a)
  256. if err != nil {
  257. return nil, err
  258. }
  259. for k, v := range a {
  260. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  261. err := SetCustomPricingField(c, kUpper, v)
  262. if err != nil {
  263. return nil, err
  264. }
  265. }
  266. }
  267. cj, err := json.Marshal(c)
  268. if err != nil {
  269. return nil, err
  270. }
  271. path := os.Getenv("CONFIG_PATH")
  272. if path == "" {
  273. path = "/models/"
  274. }
  275. path += "aws.json"
  276. err = ioutil.WriteFile(path, cj, 0644)
  277. if err != nil {
  278. return nil, err
  279. }
  280. return c, nil
  281. }
  // awsKey carries the node information needed to look up AWS pricing.
  type awsKey struct {
  	// SpotLabelName / SpotLabelValue name the label (and its value) that
  	// marks a node as a spot instance.
  	SpotLabelName  string
  	SpotLabelValue string
  	// Labels are the node's Kubernetes labels.
  	Labels map[string]string
  	// ProviderID is expected to look like "aws:///<zone>/<instance-id>".
  	ProviderID string
  }
  288. func (k *awsKey) GPUType() string {
  289. return ""
  290. }
  291. func (k *awsKey) ID() string {
  292. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  293. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  294. if matchNum == 2 {
  295. return group
  296. }
  297. }
  298. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  299. return ""
  300. }
  301. func (k *awsKey) Features() string {
  302. instanceType := k.Labels[v1.LabelInstanceType]
  303. var operatingSystem string
  304. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  305. if !ok {
  306. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  307. }
  308. region := k.Labels[v1.LabelZoneRegion]
  309. key := region + "," + instanceType + "," + operatingSystem
  310. usageType := "preemptible"
  311. spotKey := key + "," + usageType
  312. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  313. return spotKey
  314. }
  315. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  316. return spotKey
  317. }
  318. return key
  319. }
  320. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  321. pricing, ok := aws.Pricing[pvk.Features()]
  322. if !ok {
  323. klog.V(2).Infof("Persistent Volume pricing not found for %s", pvk.Features())
  324. return &PV{}, nil
  325. }
  326. return pricing.PV, nil
  327. }
  // awsPVKey carries the persistent-volume information needed to look up EBS
  // pricing.
  type awsPVKey struct {
  	// Labels are the PV's Kubernetes labels; the region label is read from
  	// here.
  	Labels map[string]string
  	// StorageClassParameters holds the parameters of the PV's storage class.
  	StorageClassParameters map[string]string
  	// StorageClassName is the name of the PV's storage class.
  	StorageClassName string
  }
  333. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  334. return &awsPVKey{
  335. Labels: pv.Labels,
  336. StorageClassName: pv.Spec.StorageClassName,
  337. }
  338. }
  339. func (key *awsPVKey) Features() string {
  340. storageClass := key.StorageClassName
  341. if storageClass == "standard" {
  342. storageClass = "gp2"
  343. }
  344. // Storage class names are generally EBS volume types (gp2)
  345. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  346. // Converts between the 2
  347. return key.Labels[v1.LabelZoneRegion] + "," + volTypes[storageClass]
  348. }
  349. // GetKey maps node labels to information needed to retrieve pricing data
  350. func (aws *AWS) GetKey(labels map[string]string) Key {
  351. return &awsKey{
  352. SpotLabelName: aws.SpotLabelName,
  353. SpotLabelValue: aws.SpotLabelValue,
  354. Labels: labels,
  355. ProviderID: labels["providerID"],
  356. }
  357. }
  358. func (aws *AWS) isPreemptible(key string) bool {
  359. s := strings.Split(key, ",")
  360. if len(s) == 4 && s[3] == "preemptible" {
  361. return true
  362. }
  363. return false
  364. }
  365. // DownloadPricingData fetches data from the AWS Pricing API
  366. func (aws *AWS) DownloadPricingData() error {
  367. aws.DownloadPricingDataLock.Lock()
  368. defer aws.DownloadPricingDataLock.Unlock()
  369. c, err := GetDefaultPricingData("aws.json")
  370. if err != nil {
  371. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  372. }
  373. aws.BaseCPUPrice = c.CPU
  374. aws.BaseRAMPrice = c.RAM
  375. aws.BaseGPUPrice = c.GPU
  376. aws.BaseSpotCPUPrice = c.SpotCPU
  377. aws.BaseSpotRAMPrice = c.SpotRAM
  378. aws.SpotLabelName = c.SpotLabel
  379. aws.SpotLabelValue = c.SpotLabelValue
  380. aws.SpotDataBucket = c.SpotDataBucket
  381. aws.SpotDataPrefix = c.SpotDataPrefix
  382. aws.ProjectID = c.ProjectID
  383. aws.SpotDataRegion = c.SpotDataRegion
  384. aws.ServiceKeyName = c.ServiceKeyName
  385. aws.ServiceKeySecret = c.ServiceKeySecret
  386. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  387. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  388. }
  389. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  390. if err != nil {
  391. return err
  392. }
  393. inputkeys := make(map[string]bool)
  394. for _, n := range nodeList.Items {
  395. labels := n.GetObjectMeta().GetLabels()
  396. key := aws.GetKey(labels)
  397. inputkeys[key.Features()] = true
  398. }
  399. pvList, err := aws.Clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
  400. if err != nil {
  401. return err
  402. }
  403. storageClasses, err := aws.Clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
  404. storageClassMap := make(map[string]map[string]string)
  405. for _, storageClass := range storageClasses.Items {
  406. params := storageClass.Parameters
  407. storageClassMap[storageClass.ObjectMeta.Name] = params
  408. }
  409. pvkeys := make(map[string]PVKey)
  410. for _, pv := range pvList.Items {
  411. params, ok := storageClassMap[pv.Spec.StorageClassName]
  412. if !ok {
  413. klog.Infof("Unable to find params for storageClassName %s", pv.Name)
  414. continue
  415. }
  416. key := aws.GetPVKey(&pv, params)
  417. pvkeys[key.Features()] = key
  418. }
  419. aws.Pricing = make(map[string]*AWSProductTerms)
  420. aws.ValidPricingKeys = make(map[string]bool)
  421. skusToKeys := make(map[string]string)
  422. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  423. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  424. resp, err := http.Get(pricingURL)
  425. if err != nil {
  426. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  427. return err
  428. }
  429. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  430. dec := json.NewDecoder(resp.Body)
  431. for {
  432. t, err := dec.Token()
  433. if err == io.EOF {
  434. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  435. break
  436. }
  437. if t == "products" {
  438. _, err := dec.Token() // this should parse the opening "{""
  439. if err != nil {
  440. return err
  441. }
  442. for dec.More() {
  443. _, err := dec.Token() // the sku token
  444. if err != nil {
  445. return err
  446. }
  447. product := &AWSProduct{}
  448. err = dec.Decode(&product)
  449. if err != nil {
  450. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  451. break
  452. }
  453. if product.Attributes.PreInstalledSw == "NA" &&
  454. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  455. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  456. spotKey := key + ",preemptible"
  457. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  458. productTerms := &AWSProductTerms{
  459. Sku: product.Sku,
  460. Memory: product.Attributes.Memory,
  461. Storage: product.Attributes.Storage,
  462. VCpu: product.Attributes.VCpu,
  463. GPU: product.Attributes.GPU,
  464. }
  465. aws.Pricing[key] = productTerms
  466. aws.Pricing[spotKey] = productTerms
  467. skusToKeys[product.Sku] = key
  468. }
  469. aws.ValidPricingKeys[key] = true
  470. aws.ValidPricingKeys[spotKey] = true
  471. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  472. // UsageTypes may be prefixed with a region code - we're removing this when using
  473. // volTypes to keep lookups generic
  474. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  475. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  476. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  477. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  478. spotKey := key + ",preemptible"
  479. klog.V(1).Infof("KEY %s", key)
  480. pv := &PV{
  481. Class: volTypes[usageTypeNoRegion],
  482. Region: locationToRegion[product.Attributes.Location],
  483. }
  484. productTerms := &AWSProductTerms{
  485. Sku: product.Sku,
  486. PV: pv,
  487. }
  488. aws.Pricing[key] = productTerms
  489. aws.Pricing[spotKey] = productTerms
  490. skusToKeys[product.Sku] = key
  491. aws.ValidPricingKeys[key] = true
  492. aws.ValidPricingKeys[spotKey] = true
  493. }
  494. }
  495. }
  496. if t == "terms" {
  497. _, err := dec.Token() // this should parse the opening "{""
  498. if err != nil {
  499. return err
  500. }
  501. termType, err := dec.Token()
  502. if err != nil {
  503. return err
  504. }
  505. if termType == "OnDemand" {
  506. _, err := dec.Token()
  507. if err != nil { // again, should parse an opening "{"
  508. return err
  509. }
  510. for dec.More() {
  511. sku, err := dec.Token()
  512. if err != nil {
  513. return err
  514. }
  515. _, err = dec.Token() // another opening "{"
  516. if err != nil {
  517. return err
  518. }
  519. skuOnDemand, err := dec.Token()
  520. if err != nil {
  521. return err
  522. }
  523. offerTerm := &AWSOfferTerm{}
  524. err = dec.Decode(&offerTerm)
  525. if err != nil {
  526. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  527. }
  528. if sku.(string)+OnDemandRateCode == skuOnDemand {
  529. key, ok := skusToKeys[sku.(string)]
  530. spotKey := key + ",preemptible"
  531. if ok {
  532. aws.Pricing[key].OnDemand = offerTerm
  533. aws.Pricing[spotKey].OnDemand = offerTerm
  534. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  535. // If the specific UsageType is the per IO cost used on io1 volumes
  536. // we need to add the per IO cost to the io1 PV cost
  537. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  538. // Add the per IO cost to the PV object for the io1 volume type
  539. aws.Pricing[key].PV.CostPerIO = cost
  540. } else if strings.Contains(key, "EBS:Volume") {
  541. // If volume, we need to get hourly cost and add it to the PV object
  542. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  543. costFloat, _ := strconv.ParseFloat(cost, 64)
  544. hourlyPrice := (costFloat * math.Pow10(-9)) / 730
  545. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  546. }
  547. }
  548. }
  549. _, err = dec.Token()
  550. if err != nil {
  551. return err
  552. }
  553. }
  554. _, err = dec.Token()
  555. if err != nil {
  556. return err
  557. }
  558. }
  559. }
  560. }
  561. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  562. if err != nil {
  563. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  564. } else {
  565. aws.SpotPricingByInstanceID = sp
  566. }
  567. return nil
  568. }
  569. // AllNodePricing returns all the billing data fetched.
  570. func (aws *AWS) AllNodePricing() (interface{}, error) {
  571. aws.DownloadPricingDataLock.RLock()
  572. defer aws.DownloadPricingDataLock.RUnlock()
  573. return aws.Pricing, nil
  574. }
  575. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  576. key := k.Features()
  577. if aws.isPreemptible(key) {
  578. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  579. var spotcost string
  580. arr := strings.Split(spotInfo.Charge, " ")
  581. if len(arr) == 2 {
  582. spotcost = arr[0]
  583. } else {
  584. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  585. }
  586. return &Node{
  587. Cost: spotcost,
  588. VCPU: terms.VCpu,
  589. RAM: terms.Memory,
  590. GPU: terms.GPU,
  591. Storage: terms.Storage,
  592. BaseCPUPrice: aws.BaseCPUPrice,
  593. BaseRAMPrice: aws.BaseRAMPrice,
  594. BaseGPUPrice: aws.BaseGPUPrice,
  595. UsageType: usageType,
  596. }, nil
  597. }
  598. return &Node{
  599. VCPU: terms.VCpu,
  600. VCPUCost: aws.BaseSpotCPUPrice,
  601. RAM: terms.Memory,
  602. GPU: terms.GPU,
  603. RAMCost: aws.BaseSpotRAMPrice,
  604. Storage: terms.Storage,
  605. BaseCPUPrice: aws.BaseCPUPrice,
  606. BaseRAMPrice: aws.BaseRAMPrice,
  607. BaseGPUPrice: aws.BaseGPUPrice,
  608. UsageType: usageType,
  609. }, nil
  610. }
  611. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  612. if !ok {
  613. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  614. }
  615. cost := c.PricePerUnit.USD
  616. return &Node{
  617. Cost: cost,
  618. VCPU: terms.VCpu,
  619. RAM: terms.Memory,
  620. GPU: terms.GPU,
  621. Storage: terms.Storage,
  622. BaseCPUPrice: aws.BaseCPUPrice,
  623. BaseRAMPrice: aws.BaseRAMPrice,
  624. BaseGPUPrice: aws.BaseGPUPrice,
  625. UsageType: usageType,
  626. }, nil
  627. }
  628. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  629. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  630. aws.DownloadPricingDataLock.RLock()
  631. defer aws.DownloadPricingDataLock.RUnlock()
  632. key := k.Features()
  633. usageType := "ondemand"
  634. if aws.isPreemptible(key) {
  635. usageType = "preemptible"
  636. }
  637. terms, ok := aws.Pricing[key]
  638. if ok {
  639. return aws.createNode(terms, usageType, k)
  640. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  641. aws.DownloadPricingDataLock.RUnlock()
  642. err := aws.DownloadPricingData()
  643. aws.DownloadPricingDataLock.RLock()
  644. if err != nil {
  645. return nil, err
  646. }
  647. terms, termsOk := aws.Pricing[key]
  648. if !termsOk {
  649. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  650. }
  651. return aws.createNode(terms, usageType, k)
  652. } else { // Fall back to base pricing if we can't find the key.
  653. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  654. return &Node{
  655. Cost: aws.BaseCPUPrice,
  656. BaseCPUPrice: aws.BaseCPUPrice,
  657. BaseRAMPrice: aws.BaseRAMPrice,
  658. BaseGPUPrice: aws.BaseGPUPrice,
  659. UsageType: usageType,
  660. UsesBaseCPUPrice: true,
  661. }, nil
  662. }
  663. }
  664. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  665. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  666. defaultClusterName := "AWS Cluster #1"
  667. makeStructure := func(clusterName string) ([]byte, error) {
  668. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  669. m := make(map[string]string)
  670. m["name"] = clusterName
  671. m["provider"] = "AWS"
  672. return json.Marshal(m)
  673. }
  674. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  675. if len(maybeClusterId) != 0 {
  676. return makeStructure(maybeClusterId)
  677. }
  678. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  679. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  680. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  681. if err != nil {
  682. return nil, err
  683. }
  684. for _, n := range nodeList.Items {
  685. region := ""
  686. instanceId := ""
  687. providerId := n.Spec.ProviderID
  688. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  689. if matchNum == 1 {
  690. region = group
  691. } else if matchNum == 2 {
  692. instanceId = group
  693. }
  694. }
  695. if len(instanceId) == 0 {
  696. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  697. continue
  698. }
  699. c := &aws.Config{
  700. Region: aws.String(region),
  701. }
  702. s := session.Must(session.NewSession(c))
  703. ec2Svc := ec2.New(s)
  704. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  705. InstanceIds: []*string{
  706. aws.String(instanceId),
  707. },
  708. })
  709. if diErr != nil {
  710. // maybe log this?
  711. continue
  712. }
  713. if len(di.Reservations) != 1 {
  714. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  715. continue
  716. }
  717. res := di.Reservations[0]
  718. if len(res.Instances) != 1 {
  719. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  720. continue
  721. }
  722. inst := res.Instances[0]
  723. for _, tag := range inst.Tags {
  724. tagKey := *tag.Key
  725. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  726. if matchNum != 1 {
  727. continue
  728. }
  729. return makeStructure(group)
  730. }
  731. }
  732. }
  733. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  734. return makeStructure(defaultClusterName)
  735. }
  736. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  737. func (*AWS) AddServiceKey(formValues url.Values) error {
  738. keyID := formValues.Get("access_key_ID")
  739. key := formValues.Get("secret_access_key")
  740. m := make(map[string]string)
  741. m["access_key_ID"] = keyID
  742. m["secret_access_key"] = key
  743. result, err := json.Marshal(m)
  744. if err != nil {
  745. return err
  746. }
  747. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  748. }
  749. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  750. func (*AWS) GetDisks() ([]byte, error) {
  751. jsonFile, err := os.Open("/var/configs/key.json")
  752. if err == nil {
  753. byteValue, _ := ioutil.ReadAll(jsonFile)
  754. var result map[string]string
  755. err := json.Unmarshal([]byte(byteValue), &result)
  756. if err != nil {
  757. return nil, err
  758. }
  759. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  760. if err != nil {
  761. return nil, err
  762. }
  763. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  764. if err != nil {
  765. return nil, err
  766. }
  767. } else if os.IsNotExist(err) {
  768. klog.V(2).Infof("Using Default Credentials")
  769. } else {
  770. return nil, err
  771. }
  772. defer jsonFile.Close()
  773. clusterConfig, err := os.Open("/var/configs/cluster.json")
  774. if err != nil {
  775. return nil, err
  776. }
  777. defer clusterConfig.Close()
  778. b, err := ioutil.ReadAll(clusterConfig)
  779. if err != nil {
  780. return nil, err
  781. }
  782. var clusterConf map[string]string
  783. err = json.Unmarshal([]byte(b), &clusterConf)
  784. if err != nil {
  785. return nil, err
  786. }
  787. region := aws.String(clusterConf["region"])
  788. c := &aws.Config{
  789. Region: region,
  790. }
  791. s := session.Must(session.NewSession(c))
  792. ec2Svc := ec2.New(s)
  793. input := &ec2.DescribeVolumesInput{}
  794. volumeResult, err := ec2Svc.DescribeVolumes(input)
  795. if err != nil {
  796. if aerr, ok := err.(awserr.Error); ok {
  797. switch aerr.Code() {
  798. default:
  799. return nil, aerr
  800. }
  801. } else {
  802. return nil, err
  803. }
  804. }
  805. return json.Marshal(volumeResult)
  806. }
  807. // ConvertToGlueColumnFormat takes a string and runs through various regex
  808. // and string replacement statements to convert it to a format compatible
  809. // with AWS Glue and Athena column names.
  810. // Following guidance from AWS provided here ('Column Names' section):
  811. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  812. // It returns a string containing the column name in proper column name format and length.
  813. func ConvertToGlueColumnFormat(column_name string) string {
  814. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  815. // An underscore is added in front of uppercase letters
  816. capital_underscore := regexp.MustCompile(`[A-Z]`)
  817. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  818. // Any non-alphanumeric characters are replaced with an underscore
  819. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  820. final = no_space_punc.ReplaceAllString(final, "_")
  821. // Duplicate underscores are removed
  822. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  823. final = no_dup_underscore.ReplaceAllString(final, "_")
  824. // Any leading and trailing underscores are removed
  825. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  826. final = no_front_end_underscore.ReplaceAllString(final, "")
  827. // Uppercase to lowercase
  828. final = strings.ToLower(final)
  829. // Longer column name than expected - remove _ left to right
  830. allowed_col_len := 128
  831. undersc_to_remove := len(final) - allowed_col_len
  832. if undersc_to_remove > 0 {
  833. final = strings.Replace(final, "_", "", undersc_to_remove)
  834. }
  835. // If removing all of the underscores still didn't
  836. // make the column name < 128 characters, trim it!
  837. if len(final) > allowed_col_len {
  838. final = final[:allowed_col_len]
  839. }
  840. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  841. return final
  842. }
  843. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  844. // "start" and "end" are dates of the format YYYY-MM-DD
  845. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  846. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  847. customPricing, err := a.GetConfig()
  848. if err != nil {
  849. return nil, err
  850. }
  851. aggregator_column_name := "resource_tags_user_kubernetes_" + aggregator
  852. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  853. query := fmt.Sprintf(`SELECT
  854. CAST(line_item_usage_start_date AS DATE) as start_date,
  855. %s,
  856. line_item_product_code,
  857. SUM(line_item_blended_cost) as blended_cost
  858. FROM %s as cost_data
  859. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  860. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  861. if customPricing.ServiceKeyName != "" {
  862. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  863. if err != nil {
  864. return nil, err
  865. }
  866. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  867. if err != nil {
  868. return nil, err
  869. }
  870. }
  871. region := aws.String(customPricing.AthenaRegion)
  872. resultsBucket := customPricing.AthenaBucketName
  873. database := customPricing.AthenaDatabase
  874. c := &aws.Config{
  875. Region: region,
  876. }
  877. s := session.Must(session.NewSession(c))
  878. svc := athena.New(s)
  879. var e athena.StartQueryExecutionInput
  880. var r athena.ResultConfiguration
  881. r.SetOutputLocation(resultsBucket)
  882. e.SetResultConfiguration(&r)
  883. e.SetQueryString(query)
  884. var q athena.QueryExecutionContext
  885. q.SetDatabase(database)
  886. e.SetQueryExecutionContext(&q)
  887. res, err := svc.StartQueryExecution(&e)
  888. if err != nil {
  889. return nil, err
  890. }
  891. klog.V(2).Infof("StartQueryExecution result:")
  892. klog.V(2).Infof(res.GoString())
  893. var qri athena.GetQueryExecutionInput
  894. qri.SetQueryExecutionId(*res.QueryExecutionId)
  895. var qrop *athena.GetQueryExecutionOutput
  896. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  897. for {
  898. qrop, err = svc.GetQueryExecution(&qri)
  899. if err != nil {
  900. return nil, err
  901. }
  902. if *qrop.QueryExecution.Status.State != "RUNNING" {
  903. break
  904. }
  905. time.Sleep(duration)
  906. }
  907. var oocAllocs []*OutOfClusterAllocation
  908. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  909. var ip athena.GetQueryResultsInput
  910. ip.SetQueryExecutionId(*res.QueryExecutionId)
  911. op, err := svc.GetQueryResults(&ip)
  912. if err != nil {
  913. return nil, err
  914. }
  915. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  916. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  917. if err != nil {
  918. return nil, err
  919. }
  920. ooc := &OutOfClusterAllocation{
  921. Aggregator: aggregator,
  922. Environment: *r.Data[1].VarCharValue,
  923. Service: *r.Data[2].VarCharValue,
  924. Cost: cost,
  925. }
  926. oocAllocs = append(oocAllocs, ooc)
  927. }
  928. }
  929. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  930. }
  931. // QuerySQL can query a properly configured Athena database.
  932. // Used to fetch billing data.
  933. // Requires a json config in /var/configs with key region, output, and database.
  934. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  935. customPricing, err := a.GetConfig()
  936. if err != nil {
  937. return nil, err
  938. }
  939. if customPricing.ServiceKeyName != "" {
  940. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  941. if err != nil {
  942. return nil, err
  943. }
  944. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  945. if err != nil {
  946. return nil, err
  947. }
  948. }
  949. athenaConfigs, err := os.Open("/var/configs/athena.json")
  950. if err != nil {
  951. return nil, err
  952. }
  953. defer athenaConfigs.Close()
  954. b, err := ioutil.ReadAll(athenaConfigs)
  955. if err != nil {
  956. return nil, err
  957. }
  958. var athenaConf map[string]string
  959. json.Unmarshal([]byte(b), &athenaConf)
  960. region := aws.String(customPricing.AthenaRegion)
  961. resultsBucket := customPricing.AthenaBucketName
  962. database := customPricing.AthenaDatabase
  963. c := &aws.Config{
  964. Region: region,
  965. }
  966. s := session.Must(session.NewSession(c))
  967. svc := athena.New(s)
  968. var e athena.StartQueryExecutionInput
  969. var r athena.ResultConfiguration
  970. r.SetOutputLocation(resultsBucket)
  971. e.SetResultConfiguration(&r)
  972. e.SetQueryString(query)
  973. var q athena.QueryExecutionContext
  974. q.SetDatabase(database)
  975. e.SetQueryExecutionContext(&q)
  976. res, err := svc.StartQueryExecution(&e)
  977. if err != nil {
  978. return nil, err
  979. }
  980. klog.V(2).Infof("StartQueryExecution result:")
  981. klog.V(2).Infof(res.GoString())
  982. var qri athena.GetQueryExecutionInput
  983. qri.SetQueryExecutionId(*res.QueryExecutionId)
  984. var qrop *athena.GetQueryExecutionOutput
  985. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  986. for {
  987. qrop, err = svc.GetQueryExecution(&qri)
  988. if err != nil {
  989. return nil, err
  990. }
  991. if *qrop.QueryExecution.Status.State != "RUNNING" {
  992. break
  993. }
  994. time.Sleep(duration)
  995. }
  996. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  997. var ip athena.GetQueryResultsInput
  998. ip.SetQueryExecutionId(*res.QueryExecutionId)
  999. op, err := svc.GetQueryResults(&ip)
  1000. if err != nil {
  1001. return nil, err
  1002. }
  1003. b, err := json.Marshal(op.ResultSet)
  1004. if err != nil {
  1005. return nil, err
  1006. }
  1007. return b, nil
  1008. }
  1009. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1010. }
// spotInfo holds one record of a spot data feed file as decoded by
// parseSpotData. The csv tags supply the column names handed to the decoder,
// and every column is kept as the raw string read from the file.
// parseSpotData indexes the decoded records by InstanceID.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"`
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"`
	Version     string `csv:"Version"`
}
  1022. type fnames []*string
  1023. func (f fnames) Len() int {
  1024. return len(f)
  1025. }
  1026. func (f fnames) Swap(i, j int) {
  1027. f[i], f[j] = f[j], f[i]
  1028. }
  1029. func (f fnames) Less(i, j int) bool {
  1030. key1 := strings.Split(*f[i], ".")
  1031. key2 := strings.Split(*f[j], ".")
  1032. t1, err := time.Parse("2006-01-02-15", key1[1])
  1033. if err != nil {
  1034. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1035. return false
  1036. }
  1037. t2, err := time.Parse("2006-01-02-15", key2[1])
  1038. if err != nil {
  1039. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1040. return false
  1041. }
  1042. return t1.Before(t2)
  1043. }
  1044. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1045. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1046. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1047. if err != nil {
  1048. return nil, err
  1049. }
  1050. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1051. if err != nil {
  1052. return nil, err
  1053. }
  1054. }
  1055. s3Prefix := projectID
  1056. if len(prefix) != 0 {
  1057. s3Prefix = prefix + "/" + s3Prefix
  1058. }
  1059. c := aws.NewConfig().WithRegion(region)
  1060. s := session.Must(session.NewSession(c))
  1061. s3Svc := s3.New(s)
  1062. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1063. tNow := time.Now()
  1064. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1065. ls := &s3.ListObjectsInput{
  1066. Bucket: aws.String(bucket),
  1067. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1068. }
  1069. ls2 := &s3.ListObjectsInput{
  1070. Bucket: aws.String(bucket),
  1071. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1072. }
  1073. lso, err := s3Svc.ListObjects(ls)
  1074. if err != nil {
  1075. return nil, err
  1076. }
  1077. lsoLen := len(lso.Contents)
  1078. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1079. if lsoLen == 0 {
  1080. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1081. }
  1082. lso2, err := s3Svc.ListObjects(ls2)
  1083. if err != nil {
  1084. return nil, err
  1085. }
  1086. lso2Len := len(lso2.Contents)
  1087. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1088. if lso2Len == 0 {
  1089. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1090. }
  1091. var keys []*string
  1092. for _, obj := range lso.Contents {
  1093. keys = append(keys, obj.Key)
  1094. }
  1095. for _, obj := range lso2.Contents {
  1096. keys = append(keys, obj.Key)
  1097. }
  1098. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1099. header, err := csvutil.Header(spotInfo{}, "csv")
  1100. if err != nil {
  1101. return nil, err
  1102. }
  1103. fieldsPerRecord := len(header)
  1104. spots := make(map[string]*spotInfo)
  1105. for _, key := range keys {
  1106. getObj := &s3.GetObjectInput{
  1107. Bucket: aws.String(bucket),
  1108. Key: key,
  1109. }
  1110. buf := aws.NewWriteAtBuffer([]byte{})
  1111. _, err := downloader.Download(buf, getObj)
  1112. if err != nil {
  1113. return nil, err
  1114. }
  1115. r := bytes.NewReader(buf.Bytes())
  1116. gr, err := gzip.NewReader(r)
  1117. if err != nil {
  1118. return nil, err
  1119. }
  1120. csvReader := csv.NewReader(gr)
  1121. csvReader.Comma = '\t'
  1122. csvReader.FieldsPerRecord = fieldsPerRecord
  1123. dec, err := csvutil.NewDecoder(csvReader, header...)
  1124. if err != nil {
  1125. return nil, err
  1126. }
  1127. var foundVersion string
  1128. for {
  1129. spot := spotInfo{}
  1130. err := dec.Decode(&spot)
  1131. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1132. if err == io.EOF {
  1133. break
  1134. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1135. rec := dec.Record()
  1136. // the first two "Record()" will be the comment lines
  1137. // and they show up as len() == 1
  1138. // the first of which is "#Version"
  1139. // the second of which is "#Fields: "
  1140. if len(rec) != 1 {
  1141. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1142. continue
  1143. }
  1144. if len(foundVersion) == 0 {
  1145. spotFeedVersion := rec[0]
  1146. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1147. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1148. if matches != nil {
  1149. foundVersion = matches[1]
  1150. if foundVersion != supportedSpotFeedVersion {
  1151. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1152. break
  1153. }
  1154. }
  1155. continue
  1156. } else if strings.Index(rec[0], "#") == 0 {
  1157. continue
  1158. } else {
  1159. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1160. continue
  1161. }
  1162. } else if err != nil {
  1163. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1164. continue
  1165. }
  1166. klog.V(3).Infof("Found spot info %+v", spot)
  1167. spots[spot.InstanceID] = &spot
  1168. }
  1169. gr.Close()
  1170. }
  1171. return spots, nil
  1172. }