awsprovider.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "math"
  11. "net/http"
  12. "net/url"
  13. "os"
  14. "regexp"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "time"
  19. "k8s.io/klog"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/athena"
  24. "github.com/aws/aws-sdk-go/service/ec2"
  25. "github.com/aws/aws-sdk-go/service/s3"
  26. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  27. "github.com/jszwec/csvutil"
  28. v1 "k8s.io/api/core/v1"
  29. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  30. "k8s.io/client-go/kubernetes"
  31. )
  32. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  33. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  34. const supportedSpotFeedVersion = "1"
  35. const SpotInfoUpdateType = "spotinfo"
  36. const AthenaInfoUpdateType = "athenainfo"
  37. // AWS represents an Amazon Provider
  38. type AWS struct {
  39. Pricing map[string]*AWSProductTerms
  40. SpotPricingByInstanceID map[string]*spotInfo
  41. ValidPricingKeys map[string]bool
  42. Clientset *kubernetes.Clientset
  43. BaseCPUPrice string
  44. BaseRAMPrice string
  45. BaseGPUPrice string
  46. BaseSpotCPUPrice string
  47. BaseSpotRAMPrice string
  48. SpotLabelName string
  49. SpotLabelValue string
  50. ServiceKeyName string
  51. ServiceKeySecret string
  52. SpotDataRegion string
  53. SpotDataBucket string
  54. SpotDataPrefix string
  55. ProjectID string
  56. DownloadPricingDataLock sync.RWMutex
  57. *CustomProvider
  58. }
  59. // AWSPricing maps a k8s node to an AWS Pricing "product"
  60. type AWSPricing struct {
  61. Products map[string]*AWSProduct `json:"products"`
  62. Terms AWSPricingTerms `json:"terms"`
  63. }
  64. // AWSProduct represents a purchased SKU
  65. type AWSProduct struct {
  66. Sku string `json:"sku"`
  67. Attributes AWSProductAttributes `json:"attributes"`
  68. }
  69. // AWSProductAttributes represents metadata about the product used to map to a node.
  70. type AWSProductAttributes struct {
  71. Location string `json:"location"`
  72. InstanceType string `json:"instanceType"`
  73. Memory string `json:"memory"`
  74. Storage string `json:"storage"`
  75. VCpu string `json:"vcpu"`
  76. UsageType string `json:"usagetype"`
  77. OperatingSystem string `json:"operatingSystem"`
  78. PreInstalledSw string `json:"preInstalledSw"`
  79. InstanceFamily string `json:"instanceFamily"`
  80. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  81. }
  82. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  83. type AWSPricingTerms struct {
  84. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  85. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  86. }
  87. // AWSOfferTerm is a sku extension used to pay for the node.
  88. type AWSOfferTerm struct {
  89. Sku string `json:"sku"`
  90. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  91. }
  92. // AWSRateCode encodes data about the price of a product
  93. type AWSRateCode struct {
  94. Unit string `json:"unit"`
  95. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  96. }
  97. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  98. type AWSCurrencyCode struct {
  99. USD string `json:"USD"`
  100. }
  101. // AWSProductTerms represents the full terms of the product
  102. type AWSProductTerms struct {
  103. Sku string `json:"sku"`
  104. OnDemand *AWSOfferTerm `json:"OnDemand"`
  105. Reserved *AWSOfferTerm `json:"Reserved"`
  106. Memory string `json:"memory"`
  107. Storage string `json:"storage"`
  108. VCpu string `json:"vcpu"`
  109. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  110. PV *PV `json:"pv"`
  111. }
  112. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  113. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  114. // OnDemandRateCode is appended to an node sku
  115. const OnDemandRateCode = ".JRTCKXETXF"
  116. // ReservedRateCode is appended to a node sku
  117. const ReservedRateCode = ".38NPMPTW36"
  118. // HourlyRateCode is appended to a node sku
  119. const HourlyRateCode = ".6YS6EN2CT7"
  120. // volTypes are used to map between AWS UsageTypes and
  121. // EBS volume types, as they would appear in K8s storage class
  122. // name and the EC2 API.
  123. var volTypes = map[string]string{
  124. "EBS:VolumeUsage.gp2": "gp2",
  125. "EBS:VolumeUsage": "standard",
  126. "EBS:VolumeUsage.sc1": "sc1",
  127. "EBS:VolumeP-IOPS.piops": "io1",
  128. "EBS:VolumeUsage.st1": "st1",
  129. "EBS:VolumeUsage.piops": "io1",
  130. "gp2": "EBS:VolumeUsage.gp2",
  131. "standard": "EBS:VolumeUsage",
  132. "sc1": "EBS:VolumeUsage.sc1",
  133. "io1": "EBS:VolumeUsage.piops",
  134. "st1": "EBS:VolumeUsage.st1",
  135. }
  136. // locationToRegion maps AWS region names (As they come from Billing)
  137. // to actual region identifiers
  138. var locationToRegion = map[string]string{
  139. "US East (Ohio)": "us-east-2",
  140. "US East (N. Virginia)": "us-east-1",
  141. "US West (N. California)": "us-west-1",
  142. "US West (Oregon)": "us-west-2",
  143. "Asia Pacific (Hong Kong)": "ap-east-1",
  144. "Asia Pacific (Mumbai)": "ap-south-1",
  145. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  146. "Asia Pacific (Seoul)": "ap-northeast-2",
  147. "Asia Pacific (Singapore)": "ap-southeast-1",
  148. "Asia Pacific (Sydney)": "ap-southeast-2",
  149. "Asia Pacific (Tokyo)": "ap-northeast-1",
  150. "Canada (Central)": "ca-central-1",
  151. "China (Beijing)": "cn-north-1",
  152. "China (Ningxia)": "cn-northwest-1",
  153. "EU (Frankfurt)": "eu-central-1",
  154. "EU (Ireland)": "eu-west-1",
  155. "EU (London)": "eu-west-2",
  156. "EU (Paris)": "eu-west-3",
  157. "EU (Stockholm)": "eu-north-1",
  158. "South America (Sao Paulo)": "sa-east-1",
  159. "AWS GovCloud (US-East)": "us-gov-east-1",
  160. "AWS GovCloud (US)": "us-gov-west-1",
  161. }
  162. var regionToBillingRegionCode = map[string]string{
  163. "us-east-2": "USE2",
  164. "us-east-1": "",
  165. "us-west-1": "USW1",
  166. "us-west-2": "USW2",
  167. "ap-east-1": "APE1",
  168. "ap-south-1": "APS3",
  169. "ap-northeast-3": "APN3",
  170. "ap-northeast-2": "APN2",
  171. "ap-southeast-1": "APS1",
  172. "ap-southeast-2": "APS2",
  173. "ap-northeast-1": "APN1",
  174. "ca-central-1": "CAN1",
  175. "cn-north-1": "",
  176. "cn-northwest-1": "",
  177. "eu-central-1": "EUC1",
  178. "eu-west-1": "EU",
  179. "eu-west-2": "EUW2",
  180. "eu-west-3": "EUW3",
  181. "eu-north-1": "EUN1",
  182. "sa-east-1": "SAE1",
  183. "us-gov-east-1": "UGE1",
  184. "us-gov-west-1": "UGW1",
  185. }
  186. // KubeAttrConversion maps the k8s labels for region to an aws region
  187. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  188. operatingSystem = strings.ToLower(operatingSystem)
  189. region := locationToRegion[location]
  190. return region + "," + instanceType + "," + operatingSystem
  191. }
  192. type AwsSpotFeedInfo struct {
  193. BucketName string `json:"bucketName"`
  194. Prefix string `json:"prefix"`
  195. Region string `json:"region"`
  196. AccountID string `json:"projectID"`
  197. ServiceKeyName string `json:"serviceKeyName"`
  198. ServiceKeySecret string `json:"serviceKeySecret"`
  199. SpotLabel string `json:"spotLabel"`
  200. SpotLabelValue string `json:"spotLabelValue"`
  201. }
  202. type AwsAthenaInfo struct {
  203. AthenaBucketName string `json:"athenaBucketName"`
  204. AthenaRegion string `json:"athenaRegion"`
  205. AthenaDatabase string `json:"athenaDatabase"`
  206. AthenaTable string `json:"athenaTable"`
  207. ServiceKeyName string `json:"serviceKeyName"`
  208. ServiceKeySecret string `json:"serviceKeySecret"`
  209. AccountID string `json:"projectID"`
  210. }
  211. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  212. c, err := GetDefaultPricingData("aws.json")
  213. if err != nil {
  214. return nil, err
  215. }
  216. return c, nil
  217. }
  218. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  219. c, err := GetDefaultPricingData("aws.json")
  220. if err != nil {
  221. return nil, err
  222. }
  223. if updateType == SpotInfoUpdateType {
  224. a := AwsSpotFeedInfo{}
  225. err := json.NewDecoder(r).Decode(&a)
  226. if err != nil {
  227. return nil, err
  228. }
  229. if err != nil {
  230. return nil, err
  231. }
  232. c.ServiceKeyName = a.ServiceKeyName
  233. c.ServiceKeySecret = a.ServiceKeySecret
  234. c.SpotDataPrefix = a.Prefix
  235. c.SpotDataBucket = a.BucketName
  236. c.ProjectID = a.AccountID
  237. c.SpotDataRegion = a.Region
  238. c.SpotLabel = a.SpotLabel
  239. c.SpotLabelValue = a.SpotLabelValue
  240. } else if updateType == AthenaInfoUpdateType {
  241. a := AwsAthenaInfo{}
  242. err := json.NewDecoder(r).Decode(&a)
  243. if err != nil {
  244. return nil, err
  245. }
  246. c.AthenaBucketName = a.AthenaBucketName
  247. c.AthenaRegion = a.AthenaRegion
  248. c.AthenaDatabase = a.AthenaDatabase
  249. c.AthenaTable = a.AthenaTable
  250. c.ServiceKeyName = a.ServiceKeyName
  251. c.ServiceKeySecret = a.ServiceKeySecret
  252. c.ProjectID = a.AccountID
  253. } else {
  254. a := make(map[string]string)
  255. err = json.NewDecoder(r).Decode(&a)
  256. if err != nil {
  257. return nil, err
  258. }
  259. for k, v := range a {
  260. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  261. err := SetCustomPricingField(c, kUpper, v)
  262. if err != nil {
  263. return nil, err
  264. }
  265. }
  266. }
  267. cj, err := json.Marshal(c)
  268. if err != nil {
  269. return nil, err
  270. }
  271. path := os.Getenv("CONFIG_PATH")
  272. if path == "" {
  273. path = "/models/"
  274. }
  275. path += "aws.json"
  276. err = ioutil.WriteFile(path, cj, 0644)
  277. if err != nil {
  278. return nil, err
  279. }
  280. return c, nil
  281. }
  282. type awsKey struct {
  283. SpotLabelName string
  284. SpotLabelValue string
  285. Labels map[string]string
  286. ProviderID string
  287. }
  288. func (k *awsKey) GPUType() string {
  289. return ""
  290. }
  291. func (k *awsKey) ID() string {
  292. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  293. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  294. if matchNum == 2 {
  295. return group
  296. }
  297. }
  298. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  299. return ""
  300. }
  301. func (k *awsKey) Features() string {
  302. instanceType := k.Labels[v1.LabelInstanceType]
  303. var operatingSystem string
  304. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  305. if !ok {
  306. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  307. }
  308. region := k.Labels[v1.LabelZoneRegion]
  309. key := region + "," + instanceType + "," + operatingSystem
  310. usageType := "preemptible"
  311. spotKey := key + "," + usageType
  312. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  313. return spotKey
  314. }
  315. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  316. return spotKey
  317. }
  318. return key
  319. }
  320. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  321. pricing, ok := aws.Pricing[pvk.Features()]
  322. if !ok {
  323. klog.V(2).Infof("Persistent Volume pricing not found for %s", pvk.Features())
  324. return &PV{}, nil
  325. }
  326. return pricing.PV, nil
  327. }
  328. type awsPVKey struct {
  329. Labels map[string]string
  330. StorageClassParameters map[string]string
  331. StorageClassName string
  332. }
  333. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  334. return &awsPVKey{
  335. Labels: pv.Labels,
  336. StorageClassName: pv.Spec.StorageClassName,
  337. }
  338. }
  339. func (key *awsPVKey) Features() string {
  340. storageClass := key.StorageClassName
  341. if storageClass == "standard" {
  342. storageClass = "gp2"
  343. }
  344. // Storage class names are generally EBS volume types (gp2)
  345. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  346. // Converts between the 2
  347. return key.Labels[v1.LabelZoneRegion] + "," + volTypes[storageClass]
  348. }
  349. // GetKey maps node labels to information needed to retrieve pricing data
  350. func (aws *AWS) GetKey(labels map[string]string) Key {
  351. return &awsKey{
  352. SpotLabelName: aws.SpotLabelName,
  353. SpotLabelValue: aws.SpotLabelValue,
  354. Labels: labels,
  355. ProviderID: labels["providerID"],
  356. }
  357. }
  358. func (aws *AWS) isPreemptible(key string) bool {
  359. s := strings.Split(key, ",")
  360. if len(s) == 4 && s[3] == "preemptible" {
  361. return true
  362. }
  363. return false
  364. }
  365. // DownloadPricingData fetches data from the AWS Pricing API
  366. func (aws *AWS) DownloadPricingData() error {
  367. aws.DownloadPricingDataLock.Lock()
  368. defer aws.DownloadPricingDataLock.Unlock()
  369. c, err := GetDefaultPricingData("aws.json")
  370. if err != nil {
  371. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  372. }
  373. aws.BaseCPUPrice = c.CPU
  374. aws.BaseRAMPrice = c.RAM
  375. aws.BaseGPUPrice = c.GPU
  376. aws.BaseSpotCPUPrice = c.SpotCPU
  377. aws.BaseSpotRAMPrice = c.SpotRAM
  378. aws.SpotLabelName = c.SpotLabel
  379. aws.SpotLabelValue = c.SpotLabelValue
  380. aws.SpotDataBucket = c.SpotDataBucket
  381. aws.SpotDataPrefix = c.SpotDataPrefix
  382. aws.ProjectID = c.ProjectID
  383. aws.SpotDataRegion = c.SpotDataRegion
  384. aws.ServiceKeyName = c.ServiceKeyName
  385. aws.ServiceKeySecret = c.ServiceKeySecret
  386. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  387. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  388. }
  389. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  390. if err != nil {
  391. return err
  392. }
  393. inputkeys := make(map[string]bool)
  394. for _, n := range nodeList.Items {
  395. labels := n.GetObjectMeta().GetLabels()
  396. key := aws.GetKey(labels)
  397. inputkeys[key.Features()] = true
  398. }
  399. pvList, err := aws.Clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
  400. if err != nil {
  401. return err
  402. }
  403. storageClasses, err := aws.Clientset.StorageV1().StorageClasses().List(metav1.ListOptions{})
  404. storageClassMap := make(map[string]map[string]string)
  405. for _, storageClass := range storageClasses.Items {
  406. params := storageClass.Parameters
  407. storageClassMap[storageClass.ObjectMeta.Name] = params
  408. }
  409. pvkeys := make(map[string]PVKey)
  410. for _, pv := range pvList.Items {
  411. params, ok := storageClassMap[pv.Spec.StorageClassName]
  412. if !ok {
  413. klog.Infof("Unable to find params for storageClassName %s", pv.Name)
  414. continue
  415. }
  416. key := aws.GetPVKey(&pv, params)
  417. pvkeys[key.Features()] = key
  418. }
  419. aws.Pricing = make(map[string]*AWSProductTerms)
  420. aws.ValidPricingKeys = make(map[string]bool)
  421. skusToKeys := make(map[string]string)
  422. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  423. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  424. resp, err := http.Get(pricingURL)
  425. if err != nil {
  426. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  427. return err
  428. }
  429. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  430. dec := json.NewDecoder(resp.Body)
  431. for {
  432. t, err := dec.Token()
  433. if err == io.EOF {
  434. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  435. break
  436. }
  437. if t == "products" {
  438. _, err := dec.Token() // this should parse the opening "{""
  439. if err != nil {
  440. return err
  441. }
  442. for dec.More() {
  443. _, err := dec.Token() // the sku token
  444. if err != nil {
  445. return err
  446. }
  447. product := &AWSProduct{}
  448. err = dec.Decode(&product)
  449. if err != nil {
  450. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  451. break
  452. }
  453. if product.Attributes.PreInstalledSw == "NA" &&
  454. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  455. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  456. spotKey := key + ",preemptible"
  457. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  458. productTerms := &AWSProductTerms{
  459. Sku: product.Sku,
  460. Memory: product.Attributes.Memory,
  461. Storage: product.Attributes.Storage,
  462. VCpu: product.Attributes.VCpu,
  463. GPU: product.Attributes.GPU,
  464. }
  465. aws.Pricing[key] = productTerms
  466. aws.Pricing[spotKey] = productTerms
  467. skusToKeys[product.Sku] = key
  468. }
  469. aws.ValidPricingKeys[key] = true
  470. aws.ValidPricingKeys[spotKey] = true
  471. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  472. // UsageTypes may be prefixed with a region code - we're removing this when using
  473. // volTypes to keep lookups generic
  474. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  475. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  476. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  477. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  478. spotKey := key + ",preemptible"
  479. pv := &PV{
  480. Class: volTypes[usageTypeNoRegion],
  481. Region: locationToRegion[product.Attributes.Location],
  482. }
  483. productTerms := &AWSProductTerms{
  484. Sku: product.Sku,
  485. PV: pv,
  486. }
  487. aws.Pricing[key] = productTerms
  488. aws.Pricing[spotKey] = productTerms
  489. skusToKeys[product.Sku] = key
  490. aws.ValidPricingKeys[key] = true
  491. aws.ValidPricingKeys[spotKey] = true
  492. }
  493. }
  494. }
  495. if t == "terms" {
  496. _, err := dec.Token() // this should parse the opening "{""
  497. if err != nil {
  498. return err
  499. }
  500. termType, err := dec.Token()
  501. if err != nil {
  502. return err
  503. }
  504. if termType == "OnDemand" {
  505. _, err := dec.Token()
  506. if err != nil { // again, should parse an opening "{"
  507. return err
  508. }
  509. for dec.More() {
  510. sku, err := dec.Token()
  511. if err != nil {
  512. return err
  513. }
  514. _, err = dec.Token() // another opening "{"
  515. if err != nil {
  516. return err
  517. }
  518. skuOnDemand, err := dec.Token()
  519. if err != nil {
  520. return err
  521. }
  522. offerTerm := &AWSOfferTerm{}
  523. err = dec.Decode(&offerTerm)
  524. if err != nil {
  525. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  526. }
  527. if sku.(string)+OnDemandRateCode == skuOnDemand {
  528. key, ok := skusToKeys[sku.(string)]
  529. spotKey := key + ",preemptible"
  530. if ok {
  531. aws.Pricing[key].OnDemand = offerTerm
  532. aws.Pricing[spotKey].OnDemand = offerTerm
  533. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  534. // If the specific UsageType is the per IO cost used on io1 volumes
  535. // we need to add the per IO cost to the io1 PV cost
  536. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  537. // Add the per IO cost to the PV object for the io1 volume type
  538. aws.Pricing[key].PV.CostPerIO = cost
  539. } else if strings.Contains(key, "EBS:Volume") {
  540. // If volume, we need to get hourly cost and add it to the PV object
  541. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  542. costFloat, _ := strconv.ParseFloat(cost, 64)
  543. hourlyPrice := (costFloat * math.Pow10(-9)) / 730
  544. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  545. }
  546. }
  547. }
  548. _, err = dec.Token()
  549. if err != nil {
  550. return err
  551. }
  552. }
  553. _, err = dec.Token()
  554. if err != nil {
  555. return err
  556. }
  557. }
  558. }
  559. }
  560. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  561. if err != nil {
  562. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  563. } else {
  564. aws.SpotPricingByInstanceID = sp
  565. }
  566. return nil
  567. }
  568. // AllNodePricing returns all the billing data fetched.
  569. func (aws *AWS) AllNodePricing() (interface{}, error) {
  570. aws.DownloadPricingDataLock.RLock()
  571. defer aws.DownloadPricingDataLock.RUnlock()
  572. return aws.Pricing, nil
  573. }
  574. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  575. key := k.Features()
  576. if aws.isPreemptible(key) {
  577. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  578. var spotcost string
  579. arr := strings.Split(spotInfo.Charge, " ")
  580. if len(arr) == 2 {
  581. spotcost = arr[0]
  582. } else {
  583. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  584. }
  585. return &Node{
  586. Cost: spotcost,
  587. VCPU: terms.VCpu,
  588. RAM: terms.Memory,
  589. GPU: terms.GPU,
  590. Storage: terms.Storage,
  591. BaseCPUPrice: aws.BaseCPUPrice,
  592. BaseRAMPrice: aws.BaseRAMPrice,
  593. BaseGPUPrice: aws.BaseGPUPrice,
  594. UsageType: usageType,
  595. }, nil
  596. }
  597. return &Node{
  598. VCPU: terms.VCpu,
  599. VCPUCost: aws.BaseSpotCPUPrice,
  600. RAM: terms.Memory,
  601. GPU: terms.GPU,
  602. RAMCost: aws.BaseSpotRAMPrice,
  603. Storage: terms.Storage,
  604. BaseCPUPrice: aws.BaseCPUPrice,
  605. BaseRAMPrice: aws.BaseRAMPrice,
  606. BaseGPUPrice: aws.BaseGPUPrice,
  607. UsageType: usageType,
  608. }, nil
  609. }
  610. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  611. if !ok {
  612. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  613. }
  614. cost := c.PricePerUnit.USD
  615. return &Node{
  616. Cost: cost,
  617. VCPU: terms.VCpu,
  618. RAM: terms.Memory,
  619. GPU: terms.GPU,
  620. Storage: terms.Storage,
  621. BaseCPUPrice: aws.BaseCPUPrice,
  622. BaseRAMPrice: aws.BaseRAMPrice,
  623. BaseGPUPrice: aws.BaseGPUPrice,
  624. UsageType: usageType,
  625. }, nil
  626. }
  627. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  628. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  629. aws.DownloadPricingDataLock.RLock()
  630. defer aws.DownloadPricingDataLock.RUnlock()
  631. key := k.Features()
  632. usageType := "ondemand"
  633. if aws.isPreemptible(key) {
  634. usageType = "preemptible"
  635. }
  636. terms, ok := aws.Pricing[key]
  637. if ok {
  638. return aws.createNode(terms, usageType, k)
  639. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  640. aws.DownloadPricingDataLock.RUnlock()
  641. err := aws.DownloadPricingData()
  642. aws.DownloadPricingDataLock.RLock()
  643. if err != nil {
  644. return nil, err
  645. }
  646. terms, termsOk := aws.Pricing[key]
  647. if !termsOk {
  648. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  649. }
  650. return aws.createNode(terms, usageType, k)
  651. } else { // Fall back to base pricing if we can't find the key.
  652. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  653. return &Node{
  654. Cost: aws.BaseCPUPrice,
  655. BaseCPUPrice: aws.BaseCPUPrice,
  656. BaseRAMPrice: aws.BaseRAMPrice,
  657. BaseGPUPrice: aws.BaseGPUPrice,
  658. UsageType: usageType,
  659. UsesBaseCPUPrice: true,
  660. }, nil
  661. }
  662. }
  663. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  664. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  665. defaultClusterName := "AWS Cluster #1"
  666. makeStructure := func(clusterName string) ([]byte, error) {
  667. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  668. m := make(map[string]string)
  669. m["name"] = clusterName
  670. m["provider"] = "AWS"
  671. return json.Marshal(m)
  672. }
  673. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  674. if len(maybeClusterId) != 0 {
  675. return makeStructure(maybeClusterId)
  676. }
  677. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  678. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  679. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  680. if err != nil {
  681. return nil, err
  682. }
  683. for _, n := range nodeList.Items {
  684. region := ""
  685. instanceId := ""
  686. providerId := n.Spec.ProviderID
  687. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  688. if matchNum == 1 {
  689. region = group
  690. } else if matchNum == 2 {
  691. instanceId = group
  692. }
  693. }
  694. if len(instanceId) == 0 {
  695. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  696. continue
  697. }
  698. c := &aws.Config{
  699. Region: aws.String(region),
  700. }
  701. s := session.Must(session.NewSession(c))
  702. ec2Svc := ec2.New(s)
  703. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  704. InstanceIds: []*string{
  705. aws.String(instanceId),
  706. },
  707. })
  708. if diErr != nil {
  709. // maybe log this?
  710. continue
  711. }
  712. if len(di.Reservations) != 1 {
  713. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  714. continue
  715. }
  716. res := di.Reservations[0]
  717. if len(res.Instances) != 1 {
  718. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  719. continue
  720. }
  721. inst := res.Instances[0]
  722. for _, tag := range inst.Tags {
  723. tagKey := *tag.Key
  724. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  725. if matchNum != 1 {
  726. continue
  727. }
  728. return makeStructure(group)
  729. }
  730. }
  731. }
  732. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  733. return makeStructure(defaultClusterName)
  734. }
  735. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  736. func (*AWS) AddServiceKey(formValues url.Values) error {
  737. keyID := formValues.Get("access_key_ID")
  738. key := formValues.Get("secret_access_key")
  739. m := make(map[string]string)
  740. m["access_key_ID"] = keyID
  741. m["secret_access_key"] = key
  742. result, err := json.Marshal(m)
  743. if err != nil {
  744. return err
  745. }
  746. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  747. }
  748. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  749. func (*AWS) GetDisks() ([]byte, error) {
  750. jsonFile, err := os.Open("/var/configs/key.json")
  751. if err == nil {
  752. byteValue, _ := ioutil.ReadAll(jsonFile)
  753. var result map[string]string
  754. err := json.Unmarshal([]byte(byteValue), &result)
  755. if err != nil {
  756. return nil, err
  757. }
  758. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  759. if err != nil {
  760. return nil, err
  761. }
  762. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  763. if err != nil {
  764. return nil, err
  765. }
  766. } else if os.IsNotExist(err) {
  767. klog.V(2).Infof("Using Default Credentials")
  768. } else {
  769. return nil, err
  770. }
  771. defer jsonFile.Close()
  772. clusterConfig, err := os.Open("/var/configs/cluster.json")
  773. if err != nil {
  774. return nil, err
  775. }
  776. defer clusterConfig.Close()
  777. b, err := ioutil.ReadAll(clusterConfig)
  778. if err != nil {
  779. return nil, err
  780. }
  781. var clusterConf map[string]string
  782. err = json.Unmarshal([]byte(b), &clusterConf)
  783. if err != nil {
  784. return nil, err
  785. }
  786. region := aws.String(clusterConf["region"])
  787. c := &aws.Config{
  788. Region: region,
  789. }
  790. s := session.Must(session.NewSession(c))
  791. ec2Svc := ec2.New(s)
  792. input := &ec2.DescribeVolumesInput{}
  793. volumeResult, err := ec2Svc.DescribeVolumes(input)
  794. if err != nil {
  795. if aerr, ok := err.(awserr.Error); ok {
  796. switch aerr.Code() {
  797. default:
  798. return nil, aerr
  799. }
  800. } else {
  801. return nil, err
  802. }
  803. }
  804. return json.Marshal(volumeResult)
  805. }
  806. // ConvertToGlueColumnFormat takes a string and runs through various regex
  807. // and string replacement statements to convert it to a format compatible
  808. // with AWS Glue and Athena column names.
  809. // Following guidance from AWS provided here ('Column Names' section):
  810. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  811. // It returns a string containing the column name in proper column name format and length.
  812. func ConvertToGlueColumnFormat(column_name string) string {
  813. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  814. // An underscore is added in front of uppercase letters
  815. capital_underscore := regexp.MustCompile(`[A-Z]`)
  816. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  817. // Any non-alphanumeric characters are replaced with an underscore
  818. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  819. final = no_space_punc.ReplaceAllString(final, "_")
  820. // Duplicate underscores are removed
  821. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  822. final = no_dup_underscore.ReplaceAllString(final, "_")
  823. // Any leading and trailing underscores are removed
  824. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  825. final = no_front_end_underscore.ReplaceAllString(final, "")
  826. // Uppercase to lowercase
  827. final = strings.ToLower(final)
  828. // Longer column name than expected - remove _ left to right
  829. allowed_col_len := 128
  830. undersc_to_remove := len(final) - allowed_col_len
  831. if undersc_to_remove > 0 {
  832. final = strings.Replace(final, "_", "", undersc_to_remove)
  833. }
  834. // If removing all of the underscores still didn't
  835. // make the column name < 128 characters, trim it!
  836. if len(final) > allowed_col_len {
  837. final = final[:allowed_col_len]
  838. }
  839. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  840. return final
  841. }
  842. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  843. // "start" and "end" are dates of the format YYYY-MM-DD
  844. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  845. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  846. customPricing, err := a.GetConfig()
  847. if err != nil {
  848. return nil, err
  849. }
  850. aggregator_column_name := "resource_tags_user_kubernetes_" + aggregator
  851. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  852. query := fmt.Sprintf(`SELECT
  853. CAST(line_item_usage_start_date AS DATE) as start_date,
  854. %s,
  855. line_item_product_code,
  856. SUM(line_item_blended_cost) as blended_cost
  857. FROM %s as cost_data
  858. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  859. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  860. if customPricing.ServiceKeyName != "" {
  861. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  862. if err != nil {
  863. return nil, err
  864. }
  865. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  866. if err != nil {
  867. return nil, err
  868. }
  869. }
  870. region := aws.String(customPricing.AthenaRegion)
  871. resultsBucket := customPricing.AthenaBucketName
  872. database := customPricing.AthenaDatabase
  873. c := &aws.Config{
  874. Region: region,
  875. }
  876. s := session.Must(session.NewSession(c))
  877. svc := athena.New(s)
  878. var e athena.StartQueryExecutionInput
  879. var r athena.ResultConfiguration
  880. r.SetOutputLocation(resultsBucket)
  881. e.SetResultConfiguration(&r)
  882. e.SetQueryString(query)
  883. var q athena.QueryExecutionContext
  884. q.SetDatabase(database)
  885. e.SetQueryExecutionContext(&q)
  886. res, err := svc.StartQueryExecution(&e)
  887. if err != nil {
  888. return nil, err
  889. }
  890. klog.V(2).Infof("StartQueryExecution result:")
  891. klog.V(2).Infof(res.GoString())
  892. var qri athena.GetQueryExecutionInput
  893. qri.SetQueryExecutionId(*res.QueryExecutionId)
  894. var qrop *athena.GetQueryExecutionOutput
  895. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  896. for {
  897. qrop, err = svc.GetQueryExecution(&qri)
  898. if err != nil {
  899. return nil, err
  900. }
  901. if *qrop.QueryExecution.Status.State != "RUNNING" {
  902. break
  903. }
  904. time.Sleep(duration)
  905. }
  906. var oocAllocs []*OutOfClusterAllocation
  907. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  908. var ip athena.GetQueryResultsInput
  909. ip.SetQueryExecutionId(*res.QueryExecutionId)
  910. op, err := svc.GetQueryResults(&ip)
  911. if err != nil {
  912. return nil, err
  913. }
  914. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  915. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  916. if err != nil {
  917. return nil, err
  918. }
  919. ooc := &OutOfClusterAllocation{
  920. Aggregator: aggregator,
  921. Environment: *r.Data[1].VarCharValue,
  922. Service: *r.Data[2].VarCharValue,
  923. Cost: cost,
  924. }
  925. oocAllocs = append(oocAllocs, ooc)
  926. }
  927. }
  928. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  929. }
  930. // QuerySQL can query a properly configured Athena database.
  931. // Used to fetch billing data.
  932. // Requires a json config in /var/configs with key region, output, and database.
  933. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  934. customPricing, err := a.GetConfig()
  935. if err != nil {
  936. return nil, err
  937. }
  938. if customPricing.ServiceKeyName != "" {
  939. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  940. if err != nil {
  941. return nil, err
  942. }
  943. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  944. if err != nil {
  945. return nil, err
  946. }
  947. }
  948. athenaConfigs, err := os.Open("/var/configs/athena.json")
  949. if err != nil {
  950. return nil, err
  951. }
  952. defer athenaConfigs.Close()
  953. b, err := ioutil.ReadAll(athenaConfigs)
  954. if err != nil {
  955. return nil, err
  956. }
  957. var athenaConf map[string]string
  958. json.Unmarshal([]byte(b), &athenaConf)
  959. region := aws.String(customPricing.AthenaRegion)
  960. resultsBucket := customPricing.AthenaBucketName
  961. database := customPricing.AthenaDatabase
  962. c := &aws.Config{
  963. Region: region,
  964. }
  965. s := session.Must(session.NewSession(c))
  966. svc := athena.New(s)
  967. var e athena.StartQueryExecutionInput
  968. var r athena.ResultConfiguration
  969. r.SetOutputLocation(resultsBucket)
  970. e.SetResultConfiguration(&r)
  971. e.SetQueryString(query)
  972. var q athena.QueryExecutionContext
  973. q.SetDatabase(database)
  974. e.SetQueryExecutionContext(&q)
  975. res, err := svc.StartQueryExecution(&e)
  976. if err != nil {
  977. return nil, err
  978. }
  979. klog.V(2).Infof("StartQueryExecution result:")
  980. klog.V(2).Infof(res.GoString())
  981. var qri athena.GetQueryExecutionInput
  982. qri.SetQueryExecutionId(*res.QueryExecutionId)
  983. var qrop *athena.GetQueryExecutionOutput
  984. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  985. for {
  986. qrop, err = svc.GetQueryExecution(&qri)
  987. if err != nil {
  988. return nil, err
  989. }
  990. if *qrop.QueryExecution.Status.State != "RUNNING" {
  991. break
  992. }
  993. time.Sleep(duration)
  994. }
  995. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  996. var ip athena.GetQueryResultsInput
  997. ip.SetQueryExecutionId(*res.QueryExecutionId)
  998. op, err := svc.GetQueryResults(&ip)
  999. if err != nil {
  1000. return nil, err
  1001. }
  1002. b, err := json.Marshal(op.ResultSet)
  1003. if err != nil {
  1004. return nil, err
  1005. }
  1006. return b, nil
  1007. }
  1008. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1009. }
  1010. type spotInfo struct {
  1011. Timestamp string `csv:"Timestamp"`
  1012. UsageType string `csv:"UsageType"`
  1013. Operation string `csv:"Operation"`
  1014. InstanceID string `csv:"InstanceID"`
  1015. MyBidID string `csv:"MyBidID"`
  1016. MyMaxPrice string `csv:"MyMaxPrice"`
  1017. MarketPrice string `csv:"MarketPrice"`
  1018. Charge string `csv:"Charge"`
  1019. Version string `csv:"Version"`
  1020. }
  1021. type fnames []*string
  1022. func (f fnames) Len() int {
  1023. return len(f)
  1024. }
  1025. func (f fnames) Swap(i, j int) {
  1026. f[i], f[j] = f[j], f[i]
  1027. }
  1028. func (f fnames) Less(i, j int) bool {
  1029. key1 := strings.Split(*f[i], ".")
  1030. key2 := strings.Split(*f[j], ".")
  1031. t1, err := time.Parse("2006-01-02-15", key1[1])
  1032. if err != nil {
  1033. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1034. return false
  1035. }
  1036. t2, err := time.Parse("2006-01-02-15", key2[1])
  1037. if err != nil {
  1038. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1039. return false
  1040. }
  1041. return t1.Before(t2)
  1042. }
  1043. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1044. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1045. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1046. if err != nil {
  1047. return nil, err
  1048. }
  1049. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1050. if err != nil {
  1051. return nil, err
  1052. }
  1053. }
  1054. s3Prefix := projectID
  1055. if len(prefix) != 0 {
  1056. s3Prefix = prefix + "/" + s3Prefix
  1057. }
  1058. c := aws.NewConfig().WithRegion(region)
  1059. s := session.Must(session.NewSession(c))
  1060. s3Svc := s3.New(s)
  1061. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1062. tNow := time.Now()
  1063. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1064. ls := &s3.ListObjectsInput{
  1065. Bucket: aws.String(bucket),
  1066. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1067. }
  1068. ls2 := &s3.ListObjectsInput{
  1069. Bucket: aws.String(bucket),
  1070. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1071. }
  1072. lso, err := s3Svc.ListObjects(ls)
  1073. if err != nil {
  1074. return nil, err
  1075. }
  1076. lsoLen := len(lso.Contents)
  1077. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1078. if lsoLen == 0 {
  1079. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1080. }
  1081. lso2, err := s3Svc.ListObjects(ls2)
  1082. if err != nil {
  1083. return nil, err
  1084. }
  1085. lso2Len := len(lso2.Contents)
  1086. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1087. if lso2Len == 0 {
  1088. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1089. }
  1090. var keys []*string
  1091. for _, obj := range lso.Contents {
  1092. keys = append(keys, obj.Key)
  1093. }
  1094. for _, obj := range lso2.Contents {
  1095. keys = append(keys, obj.Key)
  1096. }
  1097. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1098. header, err := csvutil.Header(spotInfo{}, "csv")
  1099. if err != nil {
  1100. return nil, err
  1101. }
  1102. fieldsPerRecord := len(header)
  1103. spots := make(map[string]*spotInfo)
  1104. for _, key := range keys {
  1105. getObj := &s3.GetObjectInput{
  1106. Bucket: aws.String(bucket),
  1107. Key: key,
  1108. }
  1109. buf := aws.NewWriteAtBuffer([]byte{})
  1110. _, err := downloader.Download(buf, getObj)
  1111. if err != nil {
  1112. return nil, err
  1113. }
  1114. r := bytes.NewReader(buf.Bytes())
  1115. gr, err := gzip.NewReader(r)
  1116. if err != nil {
  1117. return nil, err
  1118. }
  1119. csvReader := csv.NewReader(gr)
  1120. csvReader.Comma = '\t'
  1121. csvReader.FieldsPerRecord = fieldsPerRecord
  1122. dec, err := csvutil.NewDecoder(csvReader, header...)
  1123. if err != nil {
  1124. return nil, err
  1125. }
  1126. var foundVersion string
  1127. for {
  1128. spot := spotInfo{}
  1129. err := dec.Decode(&spot)
  1130. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1131. if err == io.EOF {
  1132. break
  1133. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1134. rec := dec.Record()
  1135. // the first two "Record()" will be the comment lines
  1136. // and they show up as len() == 1
  1137. // the first of which is "#Version"
  1138. // the second of which is "#Fields: "
  1139. if len(rec) != 1 {
  1140. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1141. continue
  1142. }
  1143. if len(foundVersion) == 0 {
  1144. spotFeedVersion := rec[0]
  1145. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1146. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1147. if matches != nil {
  1148. foundVersion = matches[1]
  1149. if foundVersion != supportedSpotFeedVersion {
  1150. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1151. break
  1152. }
  1153. }
  1154. continue
  1155. } else if strings.Index(rec[0], "#") == 0 {
  1156. continue
  1157. } else {
  1158. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1159. continue
  1160. }
  1161. } else if err != nil {
  1162. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1163. continue
  1164. }
  1165. klog.V(3).Infof("Found spot info %+v", spot)
  1166. spots[spot.InstanceID] = &spot
  1167. }
  1168. gr.Close()
  1169. }
  1170. return spots, nil
  1171. }