awsprovider.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "k8s.io/klog"
  18. "github.com/aws/aws-sdk-go/aws"
  19. "github.com/aws/aws-sdk-go/aws/awserr"
  20. "github.com/aws/aws-sdk-go/aws/session"
  21. "github.com/aws/aws-sdk-go/service/athena"
  22. "github.com/aws/aws-sdk-go/service/ec2"
  23. "github.com/aws/aws-sdk-go/service/s3"
  24. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  25. "github.com/jszwec/csvutil"
  26. v1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/client-go/kubernetes"
  29. )
  30. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  31. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  32. const supportedSpotFeedVersion = "1"
  33. const SpotInfoUpdateType = "spotinfo"
  34. const AthenaInfoUpdateType = "athenainfo"
// AWS represents an Amazon Provider
type AWS struct {
	// Pricing holds the product terms loaded by DownloadPricingData, keyed by
	// the "region,instanceType,os" feature string and by the same string with
	// ",preemptible" appended.
	Pricing map[string]*AWSProductTerms
	// SpotPricingByInstanceID holds spot feed records; createNode looks them
	// up by the instance ID returned from Key.ID().
	SpotPricingByInstanceID map[string]*spotInfo
	// ValidPricingKeys records every feature key seen in the pricing offer
	// file, whether or not a node currently uses it.
	ValidPricingKeys map[string]bool
	// Clientset is the Kubernetes client used to list the cluster's nodes.
	Clientset *kubernetes.Clientset
	// The string fields below are copied from the "aws.json" custom pricing
	// config each time DownloadPricingData runs.
	BaseCPUPrice     string
	BaseSpotCPUPrice string
	BaseSpotRAMPrice string
	SpotLabelName    string
	SpotLabelValue   string
	ServiceKeyName   string
	ServiceKeySecret string
	SpotDataRegion   string
	SpotDataBucket   string
	SpotDataPrefix   string
	ProjectID        string
	*CustomProvider
}
// AWSPricing maps a k8s node to an AWS Pricing "product"
//
// It mirrors the top-level "products" and "terms" members of the pricing
// offer JSON document.
type AWSPricing struct {
	// Products is presumably keyed by sku, as in the offer file — TODO confirm.
	Products map[string]*AWSProduct `json:"products"`
	// Terms holds the purchase terms for those products.
	Terms AWSPricingTerms `json:"terms"`
}
// AWSProduct represents a purchased SKU
type AWSProduct struct {
	// Sku is the product identifier used to join a product to its offer terms.
	Sku string `json:"sku"`
	// Attributes carries the metadata used to match the product to a node.
	Attributes AWSProductAttributes `json:"attributes"`
}
// AWSProductAttributes represents metadata about the product used to map to a node.
type AWSProductAttributes struct {
	// Location is the human-readable location name (e.g. "US East (Ohio)")
	// that KubeAttrConversion translates to a region code.
	Location string `json:"location"`
	// InstanceType is the second component of the pricing key.
	InstanceType string `json:"instanceType"`
	// Memory, Storage and VCpu are copied verbatim into AWSProductTerms.
	Memory  string `json:"memory"`
	Storage string `json:"storage"`
	VCpu    string `json:"vcpu"`
	// UsageType is filtered on "BoxUsage" when loading pricing data.
	UsageType string `json:"usagetype"`
	// OperatingSystem is lowercased to form the third component of the pricing key.
	OperatingSystem string `json:"operatingSystem"`
	// PreInstalledSw must be "NA" for the product to be considered.
	PreInstalledSw string `json:"preInstalledSw"`
	// InstanceFamily is decoded but not read in the code visible here.
	InstanceFamily string `json:"instanceFamily"`
	GPU            string `json:"gpu"` // GPU represents the number of GPU on the instance
}
// AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
//
// Each map is two levels deep; presumably sku -> offer term code -> term,
// matching how DownloadPricingData walks the "OnDemand" section — TODO confirm
// for Reserved.
type AWSPricingTerms struct {
	OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
	Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
}
// AWSOfferTerm is a sku extension used to pay for the node.
type AWSOfferTerm struct {
	// Sku is the product sku this term belongs to.
	Sku string `json:"sku"`
	// PriceDimensions is keyed by rate code; createNode reads the entry at
	// sku + OnDemandRateCode + HourlyRateCode.
	PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
}
// AWSRateCode encodes data about the price of a product
type AWSRateCode struct {
	// Unit is the billing unit string as supplied by the offer file.
	Unit string `json:"unit"`
	// PricePerUnit is the price for one Unit, per currency.
	PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
}
// AWSCurrencyCode is the localized currency. (TODO: support non-USD)
type AWSCurrencyCode struct {
	// USD is the price in US dollars, kept as the string from the offer file.
	USD string `json:"USD"`
}
// AWSProductTerms represents the full terms of the product
//
// It combines the product attributes needed to describe a node with the offer
// terms used to price it.
type AWSProductTerms struct {
	Sku string `json:"sku"`
	// OnDemand is filled in while the "terms" section of the offer file is
	// parsed; it stays nil if no matching term was found.
	OnDemand *AWSOfferTerm `json:"OnDemand"`
	// Reserved is not populated by the code visible here.
	Reserved *AWSOfferTerm `json:"Reserved"`
	Memory   string        `json:"memory"`
	Storage  string        `json:"storage"`
	VCpu     string        `json:"vcpu"`
	GPU      string        `json:"gpu"` // GPU represents the number of GPU on the instance
}
  106. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  107. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  108. // OnDemandRateCode is appended to an node sku
  109. const OnDemandRateCode = ".JRTCKXETXF"
  110. // ReservedRateCode is appended to a node sku
  111. const ReservedRateCode = ".38NPMPTW36"
  112. // HourlyRateCode is appended to a node sku
  113. const HourlyRateCode = ".6YS6EN2CT7"
  114. // KubeAttrConversion maps the k8s labels for region to an aws region
  115. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  116. locationToRegion := map[string]string{
  117. "US East (Ohio)": "us-east-2",
  118. "US East (N. Virginia)": "us-east-1",
  119. "US West (N. California)": "us-west-1",
  120. "US West (Oregon)": "us-west-2",
  121. "Asia Pacific (Mumbai)": "ap-south-1",
  122. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  123. "Asia Pacific (Seoul)": "ap-northeast-2",
  124. "Asia Pacific (Singapore)": "ap-southeast-1",
  125. "Asia Pacific (Sydney)": "ap-southeast-2",
  126. "Asia Pacific (Tokyo)": "ap-northeast-1",
  127. "Canada (Central)": "ca-central-1",
  128. "China (Beijing)": "cn-north-1",
  129. "China (Ningxia)": "cn-northwest-1",
  130. "EU (Frankfurt)": "eu-central-1",
  131. "EU (Ireland)": "eu-west-1",
  132. "EU (London)": "eu-west-2",
  133. "EU (Paris)": "eu-west-3",
  134. "EU (Stockholm)": "eu-north-1",
  135. "South America (São Paulo)": "sa-east-1",
  136. "AWS GovCloud (US-East)": "us-gov-east-1",
  137. "AWS GovCloud (US)": "us-gov-west-1",
  138. }
  139. operatingSystem = strings.ToLower(operatingSystem)
  140. region := locationToRegion[location]
  141. return region + "," + instanceType + "," + operatingSystem
  142. }
// AwsSpotFeedInfo is the JSON payload UpdateConfig decodes when updateType is
// SpotInfoUpdateType. Each field is copied into the custom pricing config.
type AwsSpotFeedInfo struct {
	// BucketName is stored as SpotDataBucket.
	BucketName string `json:"bucketName"`
	// Prefix is stored as SpotDataPrefix.
	Prefix string `json:"prefix"`
	// Region is stored as SpotDataRegion.
	Region string `json:"region"`
	// AccountID is stored as ProjectID; note the JSON key is "projectID".
	AccountID string `json:"projectID"`
	// ServiceKeyName and ServiceKeySecret are the credentials stored in the config.
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	// SpotLabel and SpotLabelValue name the node label/value pair that marks
	// a node as a spot instance.
	SpotLabel      string `json:"spotLabel"`
	SpotLabelValue string `json:"spotLabelValue"`
}
// AwsAthenaInfo is the JSON payload UpdateConfig decodes when updateType is
// AthenaInfoUpdateType. Each field is copied into the custom pricing config.
type AwsAthenaInfo struct {
	// AthenaBucketName is used as the query result output location.
	AthenaBucketName string `json:"athenaBucketName"`
	// AthenaRegion is the region the Athena client is created in.
	AthenaRegion string `json:"athenaRegion"`
	// AthenaDatabase is the database set on the query execution context.
	AthenaDatabase string `json:"athenaDatabase"`
	// AthenaTable is the table named in the FROM clause of the billing query.
	AthenaTable string `json:"athenaTable"`
	// ServiceKeyName and ServiceKeySecret are the credentials stored in the config.
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	// AccountID is stored as ProjectID; note the JSON key is "projectID".
	AccountID string `json:"projectID"`
}
  162. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  163. c, err := GetDefaultPricingData("aws.json")
  164. if err != nil {
  165. return nil, err
  166. }
  167. return c, nil
  168. }
  169. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  170. c, err := GetDefaultPricingData("aws.json")
  171. if err != nil {
  172. return nil, err
  173. }
  174. if updateType == SpotInfoUpdateType {
  175. a := AwsSpotFeedInfo{}
  176. err := json.NewDecoder(r).Decode(&a)
  177. if err != nil {
  178. return nil, err
  179. }
  180. if err != nil {
  181. return nil, err
  182. }
  183. c.ServiceKeyName = a.ServiceKeyName
  184. c.ServiceKeySecret = a.ServiceKeySecret
  185. c.SpotDataPrefix = a.Prefix
  186. c.SpotDataBucket = a.BucketName
  187. c.ProjectID = a.AccountID
  188. c.SpotDataRegion = a.Region
  189. c.SpotLabel = a.SpotLabel
  190. c.SpotLabelValue = a.SpotLabelValue
  191. } else if updateType == AthenaInfoUpdateType {
  192. a := AwsAthenaInfo{}
  193. err := json.NewDecoder(r).Decode(&a)
  194. if err != nil {
  195. return nil, err
  196. }
  197. c.AthenaBucketName = a.AthenaBucketName
  198. c.AthenaRegion = a.AthenaRegion
  199. c.AthenaDatabase = a.AthenaDatabase
  200. c.AthenaTable = a.AthenaTable
  201. c.ServiceKeyName = a.ServiceKeyName
  202. c.ServiceKeySecret = a.ServiceKeySecret
  203. c.ProjectID = a.AccountID
  204. } else {
  205. a := make(map[string]string)
  206. err = json.NewDecoder(r).Decode(&a)
  207. if err != nil {
  208. return nil, err
  209. }
  210. for k, v := range a {
  211. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  212. err := SetCustomPricingField(c, kUpper, v)
  213. if err != nil {
  214. return nil, err
  215. }
  216. }
  217. }
  218. cj, err := json.Marshal(c)
  219. if err != nil {
  220. return nil, err
  221. }
  222. path := os.Getenv("CONFIG_PATH")
  223. if path == "" {
  224. path = "/models/"
  225. }
  226. path += "aws.json"
  227. err = ioutil.WriteFile(path, cj, 0644)
  228. if err != nil {
  229. return nil, err
  230. }
  231. return c, nil
  232. }
// awsKey carries the node information needed to look up AWS pricing.
type awsKey struct {
	// SpotLabelName and SpotLabelValue are the configured label/value pair
	// that marks a node as a spot instance.
	SpotLabelName  string
	SpotLabelValue string
	// Labels are the node's Kubernetes labels.
	Labels map[string]string
	// ProviderID is expected in the form "aws:///<zone>/<instance-id>".
	ProviderID string
}
// GPUType always returns the empty string; no GPU type is derived for AWS
// nodes here.
func (k *awsKey) GPUType() string {
	return ""
}
  242. func (k *awsKey) ID() string {
  243. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  244. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  245. if matchNum == 2 {
  246. return group
  247. }
  248. }
  249. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  250. return ""
  251. }
  252. func (k *awsKey) Features() string {
  253. instanceType := k.Labels[v1.LabelInstanceType]
  254. var operatingSystem string
  255. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  256. if !ok {
  257. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  258. }
  259. region := k.Labels[v1.LabelZoneRegion]
  260. key := region + "," + instanceType + "," + operatingSystem
  261. usageType := "preemptible"
  262. spotKey := key + "," + usageType
  263. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  264. return spotKey
  265. }
  266. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  267. return spotKey
  268. }
  269. return key
  270. }
// PVPricing is a stub: it ignores pvk and always returns a nil PV and a nil
// error, so callers must handle a nil result.
func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
	return nil, nil
}
  274. func (key *awsPVKey) Features() string {
  275. storageClass := key.StorageClassName
  276. if storageClass == "standard" {
  277. storageClass = "pdstandard"
  278. }
  279. return key.Labels[v1.LabelZoneRegion] + storageClass
  280. }
// awsPVKey carries the persistent-volume information used to build a PV
// pricing key.
type awsPVKey struct {
	// Labels are the PersistentVolume's labels.
	Labels map[string]string
	// StorageClassParameters holds storage class parameters; it is not read
	// by the code visible here.
	StorageClassParameters map[string]string
	// StorageClassName is the volume's storage class name.
	StorageClassName string
}
  286. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  287. return &awsPVKey{
  288. Labels: pv.Labels,
  289. StorageClassName: pv.Spec.StorageClassName,
  290. }
  291. }
  292. // GetKey maps node labels to information needed to retrieve pricing data
  293. func (aws *AWS) GetKey(labels map[string]string) Key {
  294. return &awsKey{
  295. SpotLabelName: aws.SpotLabelName,
  296. SpotLabelValue: aws.SpotLabelValue,
  297. Labels: labels,
  298. ProviderID: labels["providerID"],
  299. }
  300. }
  301. func (aws *AWS) isPreemptible(key string) bool {
  302. s := strings.Split(key, ",")
  303. if len(s) == 4 && s[3] == "preemptible" {
  304. return true
  305. }
  306. return false
  307. }
  308. // DownloadPricingData fetches data from the AWS Pricing API
  309. func (aws *AWS) DownloadPricingData() error {
  310. c, err := GetDefaultPricingData("aws.json")
  311. if err != nil {
  312. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  313. }
  314. aws.BaseCPUPrice = c.CPU
  315. aws.BaseSpotCPUPrice = c.SpotCPU
  316. aws.BaseSpotRAMPrice = c.SpotRAM
  317. aws.SpotLabelName = c.SpotLabel
  318. aws.SpotLabelValue = c.SpotLabelValue
  319. aws.SpotDataBucket = c.SpotDataBucket
  320. aws.SpotDataPrefix = c.SpotDataPrefix
  321. aws.ProjectID = c.ProjectID
  322. aws.SpotDataRegion = c.SpotDataRegion
  323. aws.ServiceKeyName = c.ServiceKeyName
  324. aws.ServiceKeySecret = c.ServiceKeySecret
  325. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  326. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  327. }
  328. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  329. if err != nil {
  330. return err
  331. }
  332. inputkeys := make(map[string]bool)
  333. for _, n := range nodeList.Items {
  334. labels := n.GetObjectMeta().GetLabels()
  335. key := aws.GetKey(labels)
  336. inputkeys[key.Features()] = true
  337. }
  338. aws.Pricing = make(map[string]*AWSProductTerms)
  339. aws.ValidPricingKeys = make(map[string]bool)
  340. skusToKeys := make(map[string]string)
  341. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  342. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  343. resp, err := http.Get(pricingURL)
  344. if err != nil {
  345. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  346. return err
  347. }
  348. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  349. dec := json.NewDecoder(resp.Body)
  350. for {
  351. t, err := dec.Token()
  352. if err == io.EOF {
  353. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  354. break
  355. }
  356. if t == "products" {
  357. _, err := dec.Token() // this should parse the opening "{""
  358. if err != nil {
  359. return err
  360. }
  361. for dec.More() {
  362. _, err := dec.Token() // the sku token
  363. if err != nil {
  364. return err
  365. }
  366. product := &AWSProduct{}
  367. err = dec.Decode(&product)
  368. if err != nil {
  369. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  370. break
  371. }
  372. if product.Attributes.PreInstalledSw == "NA" &&
  373. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  374. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  375. spotKey := key + ",preemptible"
  376. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  377. productTerms := &AWSProductTerms{
  378. Sku: product.Sku,
  379. Memory: product.Attributes.Memory,
  380. Storage: product.Attributes.Storage,
  381. VCpu: product.Attributes.VCpu,
  382. GPU: product.Attributes.GPU,
  383. }
  384. aws.Pricing[key] = productTerms
  385. aws.Pricing[spotKey] = productTerms
  386. skusToKeys[product.Sku] = key
  387. }
  388. aws.ValidPricingKeys[key] = true
  389. aws.ValidPricingKeys[spotKey] = true
  390. }
  391. }
  392. }
  393. if t == "terms" {
  394. _, err := dec.Token() // this should parse the opening "{""
  395. if err != nil {
  396. return err
  397. }
  398. termType, err := dec.Token()
  399. if err != nil {
  400. return err
  401. }
  402. if termType == "OnDemand" {
  403. _, err := dec.Token()
  404. if err != nil { // again, should parse an opening "{"
  405. return err
  406. }
  407. for dec.More() {
  408. sku, err := dec.Token()
  409. if err != nil {
  410. return err
  411. }
  412. _, err = dec.Token() // another opening "{"
  413. if err != nil {
  414. return err
  415. }
  416. skuOnDemand, err := dec.Token()
  417. if err != nil {
  418. return err
  419. }
  420. offerTerm := &AWSOfferTerm{}
  421. err = dec.Decode(&offerTerm)
  422. if err != nil {
  423. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  424. }
  425. if sku.(string)+OnDemandRateCode == skuOnDemand {
  426. key, ok := skusToKeys[sku.(string)]
  427. spotKey := key + ",preemptible"
  428. if ok {
  429. aws.Pricing[key].OnDemand = offerTerm
  430. aws.Pricing[spotKey].OnDemand = offerTerm
  431. }
  432. }
  433. _, err = dec.Token()
  434. if err != nil {
  435. return err
  436. }
  437. }
  438. _, err = dec.Token()
  439. if err != nil {
  440. return err
  441. }
  442. }
  443. }
  444. }
  445. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  446. if err != nil {
  447. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  448. } else {
  449. aws.SpotPricingByInstanceID = sp
  450. }
  451. return nil
  452. }
// AllNodePricing returns all the billing data fetched.
// The result is the provider's Pricing map itself, not a copy; the error is
// always nil.
func (aws *AWS) AllNodePricing() (interface{}, error) {
	return aws.Pricing, nil
}
  457. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  458. key := k.Features()
  459. if aws.isPreemptible(key) {
  460. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  461. var spotcost string
  462. arr := strings.Split(spotInfo.Charge, " ")
  463. if len(arr) == 2 {
  464. spotcost = arr[0]
  465. } else {
  466. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  467. }
  468. return &Node{
  469. Cost: spotcost,
  470. VCPU: terms.VCpu,
  471. RAM: terms.Memory,
  472. GPU: terms.GPU,
  473. Storage: terms.Storage,
  474. BaseCPUPrice: aws.BaseCPUPrice,
  475. UsageType: usageType,
  476. }, nil
  477. }
  478. return &Node{
  479. VCPU: terms.VCpu,
  480. VCPUCost: aws.BaseSpotCPUPrice,
  481. RAM: terms.Memory,
  482. GPU: terms.GPU,
  483. RAMCost: aws.BaseSpotRAMPrice,
  484. Storage: terms.Storage,
  485. BaseCPUPrice: aws.BaseCPUPrice,
  486. UsageType: usageType,
  487. }, nil
  488. }
  489. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  490. if !ok {
  491. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  492. }
  493. cost := c.PricePerUnit.USD
  494. return &Node{
  495. Cost: cost,
  496. VCPU: terms.VCpu,
  497. RAM: terms.Memory,
  498. GPU: terms.GPU,
  499. Storage: terms.Storage,
  500. BaseCPUPrice: aws.BaseCPUPrice,
  501. UsageType: usageType,
  502. }, nil
  503. }
  504. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  505. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  506. //return json.Marshal(aws.Pricing[key])
  507. key := k.Features()
  508. usageType := "ondemand"
  509. if aws.isPreemptible(key) {
  510. usageType = "preemptible"
  511. }
  512. terms, ok := aws.Pricing[key]
  513. if ok {
  514. return aws.createNode(terms, usageType, k)
  515. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  516. err := aws.DownloadPricingData()
  517. if err != nil {
  518. return nil, err
  519. }
  520. terms, termsOk := aws.Pricing[key]
  521. if !termsOk {
  522. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  523. }
  524. return aws.createNode(terms, usageType, k)
  525. } else { // Fall back to base pricing if we can't find the key.
  526. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  527. return &Node{
  528. Cost: aws.BaseCPUPrice,
  529. BaseCPUPrice: aws.BaseCPUPrice,
  530. UsageType: usageType,
  531. UsesBaseCPUPrice: true,
  532. }, nil
  533. }
  534. }
  535. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  536. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  537. defaultClusterName := "AWS Cluster #1"
  538. makeStructure := func(clusterName string) ([]byte, error) {
  539. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  540. m := make(map[string]string)
  541. m["name"] = clusterName
  542. m["provider"] = "AWS"
  543. return json.Marshal(m)
  544. }
  545. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  546. if len(maybeClusterId) != 0 {
  547. return makeStructure(maybeClusterId)
  548. }
  549. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  550. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  551. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  552. if err != nil {
  553. return nil, err
  554. }
  555. for _, n := range nodeList.Items {
  556. region := ""
  557. instanceId := ""
  558. providerId := n.Spec.ProviderID
  559. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  560. if matchNum == 1 {
  561. region = group
  562. } else if matchNum == 2 {
  563. instanceId = group
  564. }
  565. }
  566. if len(instanceId) == 0 {
  567. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  568. continue
  569. }
  570. c := &aws.Config{
  571. Region: aws.String(region),
  572. }
  573. s := session.Must(session.NewSession(c))
  574. ec2Svc := ec2.New(s)
  575. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  576. InstanceIds: []*string{
  577. aws.String(instanceId),
  578. },
  579. })
  580. if diErr != nil {
  581. // maybe log this?
  582. continue
  583. }
  584. if len(di.Reservations) != 1 {
  585. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  586. continue
  587. }
  588. res := di.Reservations[0]
  589. if len(res.Instances) != 1 {
  590. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  591. continue
  592. }
  593. inst := res.Instances[0]
  594. for _, tag := range inst.Tags {
  595. tagKey := *tag.Key
  596. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  597. if matchNum != 1 {
  598. continue
  599. }
  600. return makeStructure(group)
  601. }
  602. }
  603. }
  604. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  605. return makeStructure(defaultClusterName)
  606. }
  607. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  608. func (*AWS) AddServiceKey(formValues url.Values) error {
  609. keyID := formValues.Get("access_key_ID")
  610. key := formValues.Get("secret_access_key")
  611. m := make(map[string]string)
  612. m["access_key_ID"] = keyID
  613. m["secret_access_key"] = key
  614. result, err := json.Marshal(m)
  615. if err != nil {
  616. return err
  617. }
  618. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  619. }
  620. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  621. func (*AWS) GetDisks() ([]byte, error) {
  622. jsonFile, err := os.Open("/var/configs/key.json")
  623. if err == nil {
  624. byteValue, _ := ioutil.ReadAll(jsonFile)
  625. var result map[string]string
  626. err := json.Unmarshal([]byte(byteValue), &result)
  627. if err != nil {
  628. return nil, err
  629. }
  630. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  631. if err != nil {
  632. return nil, err
  633. }
  634. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  635. if err != nil {
  636. return nil, err
  637. }
  638. } else if os.IsNotExist(err) {
  639. klog.V(2).Infof("Using Default Credentials")
  640. } else {
  641. return nil, err
  642. }
  643. defer jsonFile.Close()
  644. clusterConfig, err := os.Open("/var/configs/cluster.json")
  645. if err != nil {
  646. return nil, err
  647. }
  648. defer clusterConfig.Close()
  649. b, err := ioutil.ReadAll(clusterConfig)
  650. if err != nil {
  651. return nil, err
  652. }
  653. var clusterConf map[string]string
  654. err = json.Unmarshal([]byte(b), &clusterConf)
  655. if err != nil {
  656. return nil, err
  657. }
  658. region := aws.String(clusterConf["region"])
  659. c := &aws.Config{
  660. Region: region,
  661. }
  662. s := session.Must(session.NewSession(c))
  663. ec2Svc := ec2.New(s)
  664. input := &ec2.DescribeVolumesInput{}
  665. volumeResult, err := ec2Svc.DescribeVolumes(input)
  666. if err != nil {
  667. if aerr, ok := err.(awserr.Error); ok {
  668. switch aerr.Code() {
  669. default:
  670. return nil, aerr
  671. }
  672. } else {
  673. return nil, err
  674. }
  675. }
  676. return json.Marshal(volumeResult)
  677. }
  678. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  679. // "start" and "end" are dates of the format YYYY-MM-DD
  680. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  681. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  682. customPricing, err := a.GetConfig()
  683. if err != nil {
  684. return nil, err
  685. }
  686. query := fmt.Sprintf(`SELECT
  687. CAST(line_item_usage_start_date AS DATE) as start_date,
  688. resource_tags_user_kubernetes_%s,
  689. line_item_product_code,
  690. SUM(line_item_blended_cost) as blended_cost
  691. FROM %s as cost_data
  692. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  693. GROUP BY 1,2,3`, aggregator, customPricing.AthenaTable, start, end)
  694. if customPricing.ServiceKeyName != "" {
  695. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  696. if err != nil {
  697. return nil, err
  698. }
  699. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  700. if err != nil {
  701. return nil, err
  702. }
  703. }
  704. region := aws.String(customPricing.AthenaRegion)
  705. resultsBucket := customPricing.AthenaBucketName
  706. database := customPricing.AthenaDatabase
  707. c := &aws.Config{
  708. Region: region,
  709. }
  710. s := session.Must(session.NewSession(c))
  711. svc := athena.New(s)
  712. var e athena.StartQueryExecutionInput
  713. var r athena.ResultConfiguration
  714. r.SetOutputLocation(resultsBucket)
  715. e.SetResultConfiguration(&r)
  716. e.SetQueryString(query)
  717. var q athena.QueryExecutionContext
  718. q.SetDatabase(database)
  719. e.SetQueryExecutionContext(&q)
  720. res, err := svc.StartQueryExecution(&e)
  721. if err != nil {
  722. return nil, err
  723. }
  724. klog.V(2).Infof("StartQueryExecution result:")
  725. klog.V(2).Infof(res.GoString())
  726. var qri athena.GetQueryExecutionInput
  727. qri.SetQueryExecutionId(*res.QueryExecutionId)
  728. var qrop *athena.GetQueryExecutionOutput
  729. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  730. for {
  731. qrop, err = svc.GetQueryExecution(&qri)
  732. if err != nil {
  733. return nil, err
  734. }
  735. if *qrop.QueryExecution.Status.State != "RUNNING" {
  736. break
  737. }
  738. time.Sleep(duration)
  739. }
  740. var oocAllocs []*OutOfClusterAllocation
  741. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  742. var ip athena.GetQueryResultsInput
  743. ip.SetQueryExecutionId(*res.QueryExecutionId)
  744. op, err := svc.GetQueryResults(&ip)
  745. if err != nil {
  746. return nil, err
  747. }
  748. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  749. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  750. if err != nil {
  751. return nil, err
  752. }
  753. ooc := &OutOfClusterAllocation{
  754. Aggregator: aggregator,
  755. Environment: *r.Data[1].VarCharValue,
  756. Service: *r.Data[2].VarCharValue,
  757. Cost: cost,
  758. }
  759. oocAllocs = append(oocAllocs, ooc)
  760. }
  761. }
  762. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  763. }
  764. // QuerySQL can query a properly configured Athena database.
  765. // Used to fetch billing data.
  766. // Requires a json config in /var/configs with key region, output, and database.
  767. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  768. customPricing, err := a.GetConfig()
  769. if err != nil {
  770. return nil, err
  771. }
  772. if customPricing.ServiceKeyName != "" {
  773. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  774. if err != nil {
  775. return nil, err
  776. }
  777. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  778. if err != nil {
  779. return nil, err
  780. }
  781. }
  782. athenaConfigs, err := os.Open("/var/configs/athena.json")
  783. if err != nil {
  784. return nil, err
  785. }
  786. defer athenaConfigs.Close()
  787. b, err := ioutil.ReadAll(athenaConfigs)
  788. if err != nil {
  789. return nil, err
  790. }
  791. var athenaConf map[string]string
  792. json.Unmarshal([]byte(b), &athenaConf)
  793. region := aws.String(customPricing.AthenaRegion)
  794. resultsBucket := customPricing.AthenaBucketName
  795. database := customPricing.AthenaDatabase
  796. c := &aws.Config{
  797. Region: region,
  798. }
  799. s := session.Must(session.NewSession(c))
  800. svc := athena.New(s)
  801. var e athena.StartQueryExecutionInput
  802. var r athena.ResultConfiguration
  803. r.SetOutputLocation(resultsBucket)
  804. e.SetResultConfiguration(&r)
  805. e.SetQueryString(query)
  806. var q athena.QueryExecutionContext
  807. q.SetDatabase(database)
  808. e.SetQueryExecutionContext(&q)
  809. res, err := svc.StartQueryExecution(&e)
  810. if err != nil {
  811. return nil, err
  812. }
  813. klog.V(2).Infof("StartQueryExecution result:")
  814. klog.V(2).Infof(res.GoString())
  815. var qri athena.GetQueryExecutionInput
  816. qri.SetQueryExecutionId(*res.QueryExecutionId)
  817. var qrop *athena.GetQueryExecutionOutput
  818. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  819. for {
  820. qrop, err = svc.GetQueryExecution(&qri)
  821. if err != nil {
  822. return nil, err
  823. }
  824. if *qrop.QueryExecution.Status.State != "RUNNING" {
  825. break
  826. }
  827. time.Sleep(duration)
  828. }
  829. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  830. var ip athena.GetQueryResultsInput
  831. ip.SetQueryExecutionId(*res.QueryExecutionId)
  832. op, err := svc.GetQueryResults(&ip)
  833. if err != nil {
  834. return nil, err
  835. }
  836. b, err := json.Marshal(op.ResultSet)
  837. if err != nil {
  838. return nil, err
  839. }
  840. return b, nil
  841. }
  842. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  843. }
  844. type spotInfo struct {
  845. Timestamp string `csv:"Timestamp"`
  846. UsageType string `csv:"UsageType"`
  847. Operation string `csv:"Operation"`
  848. InstanceID string `csv:"InstanceID"`
  849. MyBidID string `csv:"MyBidID"`
  850. MyMaxPrice string `csv:"MyMaxPrice"`
  851. MarketPrice string `csv:"MarketPrice"`
  852. Charge string `csv:"Charge"`
  853. Version string `csv:"Version"`
  854. }
  855. type fnames []*string
  856. func (f fnames) Len() int {
  857. return len(f)
  858. }
  859. func (f fnames) Swap(i, j int) {
  860. f[i], f[j] = f[j], f[i]
  861. }
  862. func (f fnames) Less(i, j int) bool {
  863. key1 := strings.Split(*f[i], ".")
  864. key2 := strings.Split(*f[j], ".")
  865. t1, err := time.Parse("2006-01-02-15", key1[1])
  866. if err != nil {
  867. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  868. return false
  869. }
  870. t2, err := time.Parse("2006-01-02-15", key2[1])
  871. if err != nil {
  872. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  873. return false
  874. }
  875. return t1.Before(t2)
  876. }
  877. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  878. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  879. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  880. if err != nil {
  881. return nil, err
  882. }
  883. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  884. if err != nil {
  885. return nil, err
  886. }
  887. }
  888. s3Prefix := projectID
  889. if len(prefix) != 0 {
  890. s3Prefix = prefix + "/" + s3Prefix
  891. }
  892. c := aws.NewConfig().WithRegion(region)
  893. s := session.Must(session.NewSession(c))
  894. s3Svc := s3.New(s)
  895. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  896. tNow := time.Now()
  897. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  898. ls := &s3.ListObjectsInput{
  899. Bucket: aws.String(bucket),
  900. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  901. }
  902. ls2 := &s3.ListObjectsInput{
  903. Bucket: aws.String(bucket),
  904. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  905. }
  906. lso, err := s3Svc.ListObjects(ls)
  907. if err != nil {
  908. return nil, err
  909. }
  910. lsoLen := len(lso.Contents)
  911. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  912. if lsoLen == 0 {
  913. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  914. }
  915. lso2, err := s3Svc.ListObjects(ls2)
  916. if err != nil {
  917. return nil, err
  918. }
  919. lso2Len := len(lso2.Contents)
  920. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  921. if lso2Len == 0 {
  922. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  923. }
  924. var keys []*string
  925. for _, obj := range lso.Contents {
  926. keys = append(keys, obj.Key)
  927. }
  928. for _, obj := range lso2.Contents {
  929. keys = append(keys, obj.Key)
  930. }
  931. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  932. header, err := csvutil.Header(spotInfo{}, "csv")
  933. if err != nil {
  934. return nil, err
  935. }
  936. fieldsPerRecord := len(header)
  937. spots := make(map[string]*spotInfo)
  938. for _, key := range keys {
  939. getObj := &s3.GetObjectInput{
  940. Bucket: aws.String(bucket),
  941. Key: key,
  942. }
  943. buf := aws.NewWriteAtBuffer([]byte{})
  944. _, err := downloader.Download(buf, getObj)
  945. if err != nil {
  946. return nil, err
  947. }
  948. r := bytes.NewReader(buf.Bytes())
  949. gr, err := gzip.NewReader(r)
  950. if err != nil {
  951. return nil, err
  952. }
  953. csvReader := csv.NewReader(gr)
  954. csvReader.Comma = '\t'
  955. csvReader.FieldsPerRecord = fieldsPerRecord
  956. dec, err := csvutil.NewDecoder(csvReader, header...)
  957. if err != nil {
  958. return nil, err
  959. }
  960. var foundVersion string
  961. for {
  962. spot := spotInfo{}
  963. err := dec.Decode(&spot)
  964. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  965. if err == io.EOF {
  966. break
  967. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  968. rec := dec.Record()
  969. // the first two "Record()" will be the comment lines
  970. // and they show up as len() == 1
  971. // the first of which is "#Version"
  972. // the second of which is "#Fields: "
  973. if len(rec) != 1 {
  974. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  975. continue
  976. }
  977. if len(foundVersion) == 0 {
  978. spotFeedVersion := rec[0]
  979. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  980. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  981. if matches != nil {
  982. foundVersion = matches[1]
  983. if foundVersion != supportedSpotFeedVersion {
  984. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  985. break
  986. }
  987. }
  988. continue
  989. } else if strings.Index(rec[0], "#") == 0 {
  990. continue
  991. } else {
  992. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  993. continue
  994. }
  995. } else if err != nil {
  996. klog.V(2).Infof("Error during spot info decode: %+v", err)
  997. continue
  998. }
  999. klog.V(3).Infof("Found spot info %+v", spot)
  1000. spots[spot.InstanceID] = &spot
  1001. }
  1002. gr.Close()
  1003. }
  1004. return spots, nil
  1005. }