awsprovider.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "k8s.io/klog"
  18. "github.com/aws/aws-sdk-go/aws"
  19. "github.com/aws/aws-sdk-go/aws/awserr"
  20. "github.com/aws/aws-sdk-go/aws/session"
  21. "github.com/aws/aws-sdk-go/service/athena"
  22. "github.com/aws/aws-sdk-go/service/ec2"
  23. "github.com/aws/aws-sdk-go/service/s3"
  24. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  25. "github.com/jszwec/csvutil"
  26. v1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/client-go/kubernetes"
  29. )
  // Names and selectors shared by the AWS provider.
const (
	// awsAccessKeyIDEnvVar and awsAccessKeySecretEnvVar are the environment
	// variables this file sets (via os.Setenv) before creating SDK sessions.
	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"

	// supportedSpotFeedVersion is presumably the spot data feed version the
	// parser accepts; its use is outside this part of the file.
	supportedSpotFeedVersion = "1"

	// SpotInfoUpdateType makes UpdateConfig decode an AwsSpotFeedInfo payload.
	SpotInfoUpdateType = "spotinfo"
	// AthenaInfoUpdateType makes UpdateConfig decode an AwsAthenaInfo payload.
	AthenaInfoUpdateType = "athenainfo"
)
  35. // AWS represents an Amazon Provider
  36. type AWS struct {
  37. Pricing map[string]*AWSProductTerms
  38. SpotPricingByInstanceID map[string]*spotInfo
  39. ValidPricingKeys map[string]bool
  40. Clientset *kubernetes.Clientset
  41. BaseCPUPrice string
  42. BaseSpotCPUPrice string
  43. BaseSpotRAMPrice string
  44. SpotLabelName string
  45. SpotLabelValue string
  46. ServiceKeyName string
  47. ServiceKeySecret string
  48. SpotDataRegion string
  49. SpotDataBucket string
  50. SpotDataPrefix string
  51. ProjectID string
  52. *CustomProvider
  53. }
  // AWSPricing mirrors the top level of the AWS price list document: the
// product catalogue plus the terms under which products are sold.
type AWSPricing struct {
	Products map[string]*AWSProduct `json:"products"`
	Terms    AWSPricingTerms        `json:"terms"`
}

// AWSProduct is a single SKU in the price list.
type AWSProduct struct {
	Sku        string               `json:"sku"`
	Attributes AWSProductAttributes `json:"attributes"`
}

// AWSProductAttributes is the product metadata used to match a SKU to a node.
type AWSProductAttributes struct {
	Location        string `json:"location"`
	InstanceType    string `json:"instanceType"`
	Memory          string `json:"memory"`
	Storage         string `json:"storage"`
	VCpu            string `json:"vcpu"`
	UsageType       string `json:"usagetype"`
	OperatingSystem string `json:"operatingSystem"`
	PreInstalledSw  string `json:"preInstalledSw"`
	InstanceFamily  string `json:"instanceFamily"`
	// GPU is the number of GPUs on the instance.
	GPU string `json:"gpu"`
}

// AWSPricingTerms groups offer terms by purchase model (OnDemand or
// Reserved; Spot is still a TODO). Each map is keyed by SKU, then by offer
// term code.
type AWSPricingTerms struct {
	OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
	Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
}

// AWSOfferTerm is a SKU extension describing how the node is paid for.
type AWSOfferTerm struct {
	Sku             string                  `json:"sku"`
	PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
}

// AWSRateCode carries the price of a product for one billing unit.
type AWSRateCode struct {
	Unit         string          `json:"unit"`
	PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
}

// AWSCurrencyCode is the localized price. (TODO: support non-USD)
type AWSCurrencyCode struct {
	USD string `json:"USD"`
}

// AWSProductTerms is the flattened view of one product: its hardware
// attributes together with the offer terms attached to its SKU.
type AWSProductTerms struct {
	Sku      string        `json:"sku"`
	OnDemand *AWSOfferTerm `json:"OnDemand"`
	Reserved *AWSOfferTerm `json:"Reserved"`
	Memory   string        `json:"memory"`
	Storage  string        `json:"storage"`
	VCpu     string        `json:"vcpu"`
	// GPU is the number of GPUs on the instance.
	GPU string `json:"gpu"`
}
  // Cluster identification and price-list rate codes.
const (
	// ClusterIdEnvVar is the environment variable in which one can manually
	// set the cluster ID.
	ClusterIdEnvVar = "AWS_CLUSTER_ID"

	// OnDemandRateCode is appended to a node SKU to form its on-demand offer
	// term code.
	OnDemandRateCode = ".JRTCKXETXF"
	// ReservedRateCode is appended to a node SKU.
	ReservedRateCode = ".38NPMPTW36"
	// HourlyRateCode is appended to an offer term code to select the hourly
	// price dimension.
	HourlyRateCode = ".6YS6EN2CT7"
)
  114. // KubeAttrConversion maps the k8s labels for region to an aws region
  115. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  116. locationToRegion := map[string]string{
  117. "US East (Ohio)": "us-east-2",
  118. "US East (N. Virginia)": "us-east-1",
  119. "US West (N. California)": "us-west-1",
  120. "US West (Oregon)": "us-west-2",
  121. "Asia Pacific (Mumbai)": "ap-south-1",
  122. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  123. "Asia Pacific (Seoul)": "ap-northeast-2",
  124. "Asia Pacific (Singapore)": "ap-southeast-1",
  125. "Asia Pacific (Sydney)": "ap-southeast-2",
  126. "Asia Pacific (Tokyo)": "ap-northeast-1",
  127. "Canada (Central)": "ca-central-1",
  128. "China (Beijing)": "cn-north-1",
  129. "China (Ningxia)": "cn-northwest-1",
  130. "EU (Frankfurt)": "eu-central-1",
  131. "EU (Ireland)": "eu-west-1",
  132. "EU (London)": "eu-west-2",
  133. "EU (Paris)": "eu-west-3",
  134. "EU (Stockholm)": "eu-north-1",
  135. "South America (São Paulo)": "sa-east-1",
  136. "AWS GovCloud (US-East)": "us-gov-east-1",
  137. "AWS GovCloud (US)": "us-gov-west-1",
  138. }
  139. operatingSystem = strings.ToLower(operatingSystem)
  140. region := locationToRegion[location]
  141. return region + "," + instanceType + "," + operatingSystem
  142. }
  // AwsSpotFeedInfo is the JSON payload UpdateConfig accepts for the
// "spotinfo" update type. It describes where the spot data feed lives and
// how spot nodes are labelled.
type AwsSpotFeedInfo struct {
	BucketName string `json:"bucketName"`
	Prefix     string `json:"prefix"`
	Region     string `json:"region"`
	// AccountID travels under the JSON name "projectID".
	AccountID        string `json:"projectID"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	SpotLabel        string `json:"spotLabel"`
	SpotLabelValue   string `json:"spotLabelValue"`
}
  // AwsAthenaInfo is the JSON payload UpdateConfig accepts for the
// "athenainfo" update type. It describes the Athena database that holds the
// billing data and the credentials used to query it.
type AwsAthenaInfo struct {
	AthenaBucketName string `json:"athenaBucketName"`
	AthenaRegion     string `json:"athenaRegion"`
	AthenaDatabase   string `json:"athenaDatabase"`
	AthenaTable      string `json:"athenaTable"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	// AccountID travels under the JSON name "projectID".
	AccountID string `json:"projectID"`
}
  162. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  163. c, err := GetDefaultPricingData("aws.json")
  164. if err != nil {
  165. return nil, err
  166. }
  167. return c, nil
  168. }
  169. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  170. c, err := GetDefaultPricingData("aws.json")
  171. if err != nil {
  172. return nil, err
  173. }
  174. if updateType == SpotInfoUpdateType {
  175. a := AwsSpotFeedInfo{}
  176. err := json.NewDecoder(r).Decode(&a)
  177. if err != nil {
  178. return nil, err
  179. }
  180. if err != nil {
  181. return nil, err
  182. }
  183. c.ServiceKeyName = a.ServiceKeyName
  184. c.ServiceKeySecret = a.ServiceKeySecret
  185. c.SpotDataPrefix = a.Prefix
  186. c.SpotDataBucket = a.BucketName
  187. c.ProjectID = a.AccountID
  188. c.SpotDataRegion = a.Region
  189. c.SpotLabel = a.SpotLabel
  190. c.SpotLabelValue = a.SpotLabelValue
  191. } else if updateType == AthenaInfoUpdateType {
  192. a := AwsAthenaInfo{}
  193. err := json.NewDecoder(r).Decode(&a)
  194. if err != nil {
  195. return nil, err
  196. }
  197. c.AthenaBucketName = a.AthenaBucketName
  198. c.AthenaRegion = a.AthenaRegion
  199. c.AthenaDatabase = a.AthenaDatabase
  200. c.AthenaTable = a.AthenaTable
  201. c.ServiceKeyName = a.ServiceKeyName
  202. c.ServiceKeySecret = a.ServiceKeySecret
  203. c.ProjectID = a.AccountID
  204. } else {
  205. a := make(map[string]string)
  206. err = json.NewDecoder(r).Decode(&a)
  207. if err != nil {
  208. return nil, err
  209. }
  210. for k, v := range a {
  211. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  212. err := SetCustomPricingField(c, kUpper, v)
  213. if err != nil {
  214. return nil, err
  215. }
  216. }
  217. }
  218. cj, err := json.Marshal(c)
  219. if err != nil {
  220. return nil, err
  221. }
  222. path := os.Getenv("CONFIG_PATH")
  223. if path == "" {
  224. path = "/models/"
  225. }
  226. path += "aws.json"
  227. err = ioutil.WriteFile(path, cj, 0644)
  228. if err != nil {
  229. return nil, err
  230. }
  231. return c, nil
  232. }
  // awsKey carries what is needed to look up pricing for one node: the node's
// labels, its provider ID, and the label name/value pair that marks a node
// as a spot instance.
type awsKey struct {
	SpotLabelName  string
	SpotLabelValue string
	Labels         map[string]string
	ProviderID     string
}
  239. func (k *awsKey) GPUType() string {
  240. return ""
  241. }
  242. func (k *awsKey) ID() string {
  243. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  244. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  245. if matchNum == 2 {
  246. return group
  247. }
  248. }
  249. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  250. return ""
  251. }
  252. func (k *awsKey) Features() string {
  253. instanceType := k.Labels[v1.LabelInstanceType]
  254. var operatingSystem string
  255. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  256. if !ok {
  257. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  258. }
  259. region := k.Labels[v1.LabelZoneRegion]
  260. key := region + "," + instanceType + "," + operatingSystem
  261. usageType := "preemptible"
  262. spotKey := key + "," + usageType
  263. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  264. return spotKey
  265. }
  266. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  267. return spotKey
  268. }
  269. return key
  270. }
  271. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  272. return nil, nil
  273. }
  274. func (key *awsPVKey) Features() string {
  275. storageClass := key.StorageClassName
  276. if storageClass == "standard" {
  277. storageClass = "pdstandard"
  278. }
  279. return key.Labels[v1.LabelZoneRegion] + storageClass
  280. }
  // awsPVKey carries what is needed to build a pricing key for a persistent
// volume: the volume's labels, its storage class name, and the storage
// class parameters.
type awsPVKey struct {
	Labels                 map[string]string
	StorageClassParameters map[string]string
	StorageClassName       string
}
  286. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  287. return &awsPVKey{
  288. Labels: pv.Labels,
  289. StorageClassName: pv.Spec.StorageClassName,
  290. }
  291. }
  292. // GetKey maps node labels to information needed to retrieve pricing data
  293. func (aws *AWS) GetKey(labels map[string]string) Key {
  294. return &awsKey{
  295. SpotLabelName: aws.SpotLabelName,
  296. SpotLabelValue: aws.SpotLabelValue,
  297. Labels: labels,
  298. ProviderID: labels["providerID"],
  299. }
  300. }
  301. func (aws *AWS) isPreemptible(key string) bool {
  302. s := strings.Split(key, ",")
  303. if len(s) == 4 && s[3] == "preemptible" {
  304. return true
  305. }
  306. return false
  307. }
  308. // DownloadPricingData fetches data from the AWS Pricing API
  309. func (aws *AWS) DownloadPricingData() error {
  310. c, err := GetDefaultPricingData("aws.json")
  311. if err != nil {
  312. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  313. }
  314. aws.BaseCPUPrice = c.CPU
  315. aws.BaseSpotCPUPrice = c.SpotCPU
  316. aws.BaseSpotRAMPrice = c.SpotRAM
  317. aws.SpotLabelName = c.SpotLabel
  318. aws.SpotLabelValue = c.SpotLabelValue
  319. aws.SpotDataBucket = c.SpotDataBucket
  320. aws.SpotDataPrefix = c.SpotDataPrefix
  321. aws.ProjectID = c.ProjectID
  322. aws.SpotDataRegion = c.SpotDataRegion
  323. aws.ServiceKeyName = c.ServiceKeyName
  324. aws.ServiceKeySecret = c.ServiceKeySecret
  325. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  326. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  327. }
  328. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  329. if err != nil {
  330. return err
  331. }
  332. inputkeys := make(map[string]bool)
  333. for _, n := range nodeList.Items {
  334. labels := n.GetObjectMeta().GetLabels()
  335. key := aws.GetKey(labels)
  336. inputkeys[key.Features()] = true
  337. }
  338. aws.Pricing = make(map[string]*AWSProductTerms)
  339. aws.ValidPricingKeys = make(map[string]bool)
  340. skusToKeys := make(map[string]string)
  341. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  342. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  343. resp, err := http.Get(pricingURL)
  344. if err != nil {
  345. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  346. return err
  347. }
  348. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  349. dec := json.NewDecoder(resp.Body)
  350. for {
  351. t, err := dec.Token()
  352. if err == io.EOF {
  353. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  354. break
  355. }
  356. if t == "products" {
  357. _, err := dec.Token() // this should parse the opening "{""
  358. if err != nil {
  359. return err
  360. }
  361. for dec.More() {
  362. _, err := dec.Token() // the sku token
  363. if err != nil {
  364. return err
  365. }
  366. product := &AWSProduct{}
  367. err = dec.Decode(&product)
  368. if err != nil {
  369. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  370. break
  371. }
  372. if product.Attributes.PreInstalledSw == "NA" &&
  373. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  374. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  375. spotKey := key + ",preemptible"
  376. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  377. productTerms := &AWSProductTerms{
  378. Sku: product.Sku,
  379. Memory: product.Attributes.Memory,
  380. Storage: product.Attributes.Storage,
  381. VCpu: product.Attributes.VCpu,
  382. GPU: product.Attributes.GPU,
  383. }
  384. aws.Pricing[key] = productTerms
  385. aws.Pricing[spotKey] = productTerms
  386. skusToKeys[product.Sku] = key
  387. }
  388. aws.ValidPricingKeys[key] = true
  389. aws.ValidPricingKeys[spotKey] = true
  390. }
  391. }
  392. }
  393. if t == "terms" {
  394. _, err := dec.Token() // this should parse the opening "{""
  395. if err != nil {
  396. return err
  397. }
  398. termType, err := dec.Token()
  399. if err != nil {
  400. return err
  401. }
  402. if termType == "OnDemand" {
  403. _, err := dec.Token()
  404. if err != nil { // again, should parse an opening "{"
  405. return err
  406. }
  407. for dec.More() {
  408. sku, err := dec.Token()
  409. if err != nil {
  410. return err
  411. }
  412. _, err = dec.Token() // another opening "{"
  413. if err != nil {
  414. return err
  415. }
  416. skuOnDemand, err := dec.Token()
  417. if err != nil {
  418. return err
  419. }
  420. offerTerm := &AWSOfferTerm{}
  421. err = dec.Decode(&offerTerm)
  422. if err != nil {
  423. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  424. }
  425. if sku.(string)+OnDemandRateCode == skuOnDemand {
  426. key, ok := skusToKeys[sku.(string)]
  427. spotKey := key + ",preemptible"
  428. if ok {
  429. aws.Pricing[key].OnDemand = offerTerm
  430. aws.Pricing[spotKey].OnDemand = offerTerm
  431. }
  432. }
  433. _, err = dec.Token()
  434. if err != nil {
  435. return err
  436. }
  437. }
  438. _, err = dec.Token()
  439. if err != nil {
  440. return err
  441. }
  442. }
  443. }
  444. }
  445. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  446. if err != nil {
  447. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  448. } else {
  449. aws.SpotPricingByInstanceID = sp
  450. }
  451. return nil
  452. }
  453. // AllNodePricing returns all the billing data fetched.
  454. func (aws *AWS) AllNodePricing() (interface{}, error) {
  455. return aws.Pricing, nil
  456. }
  457. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  458. key := k.Features()
  459. if aws.isPreemptible(key) {
  460. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  461. var spotcost string
  462. arr := strings.Split(spotInfo.Charge, " ")
  463. if len(arr) == 2 {
  464. spotcost = arr[0]
  465. } else {
  466. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  467. }
  468. return &Node{
  469. Cost: spotcost,
  470. VCPU: terms.VCpu,
  471. RAM: terms.Memory,
  472. GPU: terms.GPU,
  473. Storage: terms.Storage,
  474. BaseCPUPrice: aws.BaseCPUPrice,
  475. UsageType: usageType,
  476. }, nil
  477. }
  478. return &Node{
  479. VCPU: terms.VCpu,
  480. VCPUCost: aws.BaseSpotCPUPrice,
  481. RAM: terms.Memory,
  482. GPU: terms.GPU,
  483. RAMCost: aws.BaseSpotRAMPrice,
  484. Storage: terms.Storage,
  485. BaseCPUPrice: aws.BaseCPUPrice,
  486. UsageType: usageType,
  487. }, nil
  488. }
  489. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  490. if !ok {
  491. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  492. }
  493. cost := c.PricePerUnit.USD
  494. return &Node{
  495. Cost: cost,
  496. VCPU: terms.VCpu,
  497. RAM: terms.Memory,
  498. GPU: terms.GPU,
  499. Storage: terms.Storage,
  500. BaseCPUPrice: aws.BaseCPUPrice,
  501. UsageType: usageType,
  502. }, nil
  503. }
  504. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  505. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  506. //return json.Marshal(aws.Pricing[key])
  507. key := k.Features()
  508. usageType := "ondemand"
  509. if aws.isPreemptible(key) {
  510. usageType = "preemptible"
  511. }
  512. terms, ok := aws.Pricing[key]
  513. if ok {
  514. return aws.createNode(terms, usageType, k)
  515. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  516. err := aws.DownloadPricingData()
  517. if err != nil {
  518. return nil, err
  519. }
  520. terms, termsOk := aws.Pricing[key]
  521. if !termsOk {
  522. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  523. }
  524. return aws.createNode(terms, usageType, k)
  525. } else { // Fall back to base pricing if we can't find the key.
  526. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  527. return &Node{
  528. Cost: aws.BaseCPUPrice,
  529. BaseCPUPrice: aws.BaseCPUPrice,
  530. UsageType: usageType,
  531. UsesBaseCPUPrice: true,
  532. }, nil
  533. }
  534. }
  535. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  536. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  537. defaultClusterName := "AWS Cluster #1"
  538. makeStructure := func(clusterName string) ([]byte, error) {
  539. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  540. m := make(map[string]string)
  541. m["name"] = clusterName
  542. m["provider"] = "AWS"
  543. return json.Marshal(m)
  544. }
  545. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  546. if len(maybeClusterId) != 0 {
  547. return makeStructure(maybeClusterId)
  548. }
  549. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  550. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  551. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  552. if err != nil {
  553. return nil, err
  554. }
  555. for _, n := range nodeList.Items {
  556. region := ""
  557. instanceId := ""
  558. providerId := n.Spec.ProviderID
  559. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  560. if matchNum == 1 {
  561. region = group
  562. } else if matchNum == 2 {
  563. instanceId = group
  564. }
  565. }
  566. if len(instanceId) == 0 {
  567. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  568. continue
  569. }
  570. c := &aws.Config{
  571. Region: aws.String(region),
  572. }
  573. s := session.Must(session.NewSession(c))
  574. ec2Svc := ec2.New(s)
  575. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  576. InstanceIds: []*string{
  577. aws.String(instanceId),
  578. },
  579. })
  580. if diErr != nil {
  581. // maybe log this?
  582. continue
  583. }
  584. if len(di.Reservations) != 1 {
  585. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  586. continue
  587. }
  588. res := di.Reservations[0]
  589. if len(res.Instances) != 1 {
  590. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  591. continue
  592. }
  593. inst := res.Instances[0]
  594. for _, tag := range inst.Tags {
  595. tagKey := *tag.Key
  596. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  597. if matchNum != 1 {
  598. continue
  599. }
  600. return makeStructure(group)
  601. }
  602. }
  603. }
  604. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  605. return makeStructure(defaultClusterName)
  606. }
  607. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  608. func (*AWS) AddServiceKey(formValues url.Values) error {
  609. keyID := formValues.Get("access_key_ID")
  610. key := formValues.Get("secret_access_key")
  611. m := make(map[string]string)
  612. m["access_key_ID"] = keyID
  613. m["secret_access_key"] = key
  614. result, err := json.Marshal(m)
  615. if err != nil {
  616. return err
  617. }
  618. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  619. }
  620. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  621. func (*AWS) GetDisks() ([]byte, error) {
  622. jsonFile, err := os.Open("/var/configs/key.json")
  623. if err == nil {
  624. byteValue, _ := ioutil.ReadAll(jsonFile)
  625. var result map[string]string
  626. err := json.Unmarshal([]byte(byteValue), &result)
  627. if err != nil {
  628. return nil, err
  629. }
  630. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  631. if err != nil {
  632. return nil, err
  633. }
  634. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  635. if err != nil {
  636. return nil, err
  637. }
  638. } else if os.IsNotExist(err) {
  639. klog.V(2).Infof("Using Default Credentials")
  640. } else {
  641. return nil, err
  642. }
  643. defer jsonFile.Close()
  644. clusterConfig, err := os.Open("/var/configs/cluster.json")
  645. if err != nil {
  646. return nil, err
  647. }
  648. defer clusterConfig.Close()
  649. b, err := ioutil.ReadAll(clusterConfig)
  650. if err != nil {
  651. return nil, err
  652. }
  653. var clusterConf map[string]string
  654. err = json.Unmarshal([]byte(b), &clusterConf)
  655. if err != nil {
  656. return nil, err
  657. }
  658. region := aws.String(clusterConf["region"])
  659. c := &aws.Config{
  660. Region: region,
  661. }
  662. s := session.Must(session.NewSession(c))
  663. ec2Svc := ec2.New(s)
  664. input := &ec2.DescribeVolumesInput{}
  665. volumeResult, err := ec2Svc.DescribeVolumes(input)
  666. if err != nil {
  667. if aerr, ok := err.(awserr.Error); ok {
  668. switch aerr.Code() {
  669. default:
  670. return nil, aerr
  671. }
  672. } else {
  673. return nil, err
  674. }
  675. }
  676. return json.Marshal(volumeResult)
  677. }
  678. // ConvertToGlueColumnFormat takes a string and runs through various regex
  679. // and string replacement statements to convert it to a format compatible
  680. // with AWS Glue and Athena column names.
  681. // Following guidance from AWS provided here ('Column Names' section):
  682. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  683. // It returns a string containing the column name in proper column name format and length.
  684. func ConvertToGlueColumnFormat(column_name string) string {
  685. klog.V(2).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  686. // An underscore is added in front of uppercase letters
  687. capital_underscore := regexp.MustCompile(`[A-Z]`)
  688. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  689. // Any non-alphanumeric characters are replaced with an underscore
  690. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  691. final = no_space_punc.ReplaceAllString(final, "_")
  692. // Duplicate underscores are removed
  693. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  694. final = no_dup_underscore.ReplaceAllString(final, "_")
  695. // Any leading and trailing underscores are removed
  696. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  697. final = no_front_end_underscore.ReplaceAllString(final, "")
  698. // Uppercase to lowercase
  699. final = strings.ToLower(final)
  700. // Longer column name than expected - remove _ left to right
  701. allowed_col_len := 128
  702. undersc_to_remove := len(final) - allowed_col_len
  703. if undersc_to_remove > 0 {
  704. final = strings.Replace(final, "_", "", undersc_to_remove)
  705. }
  706. // If removing all of the underscores still didn't
  707. // make the column name < 128 characters, trim it!
  708. if len(final) > allowed_col_len {
  709. final = final[:allowed_col_len]
  710. }
  711. klog.V(2).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  712. return final
  713. }
  714. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  715. // "start" and "end" are dates of the format YYYY-MM-DD
  716. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  717. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  718. customPricing, err := a.GetConfig()
  719. if err != nil {
  720. return nil, err
  721. }
  722. aggregator_column_name := "resource_tags_user_kubernetes_" + aggregator
  723. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  724. query := fmt.Sprintf(`SELECT
  725. CAST(line_item_usage_start_date AS DATE) as start_date,
  726. %s,
  727. line_item_product_code,
  728. SUM(line_item_blended_cost) as blended_cost
  729. FROM %s as cost_data
  730. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  731. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  732. if customPricing.ServiceKeyName != "" {
  733. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  734. if err != nil {
  735. return nil, err
  736. }
  737. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  738. if err != nil {
  739. return nil, err
  740. }
  741. }
  742. region := aws.String(customPricing.AthenaRegion)
  743. resultsBucket := customPricing.AthenaBucketName
  744. database := customPricing.AthenaDatabase
  745. c := &aws.Config{
  746. Region: region,
  747. }
  748. s := session.Must(session.NewSession(c))
  749. svc := athena.New(s)
  750. var e athena.StartQueryExecutionInput
  751. var r athena.ResultConfiguration
  752. r.SetOutputLocation(resultsBucket)
  753. e.SetResultConfiguration(&r)
  754. e.SetQueryString(query)
  755. var q athena.QueryExecutionContext
  756. q.SetDatabase(database)
  757. e.SetQueryExecutionContext(&q)
  758. res, err := svc.StartQueryExecution(&e)
  759. if err != nil {
  760. return nil, err
  761. }
  762. klog.V(2).Infof("StartQueryExecution result:")
  763. klog.V(2).Infof(res.GoString())
  764. var qri athena.GetQueryExecutionInput
  765. qri.SetQueryExecutionId(*res.QueryExecutionId)
  766. var qrop *athena.GetQueryExecutionOutput
  767. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  768. for {
  769. qrop, err = svc.GetQueryExecution(&qri)
  770. if err != nil {
  771. return nil, err
  772. }
  773. if *qrop.QueryExecution.Status.State != "RUNNING" {
  774. break
  775. }
  776. time.Sleep(duration)
  777. }
  778. var oocAllocs []*OutOfClusterAllocation
  779. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  780. var ip athena.GetQueryResultsInput
  781. ip.SetQueryExecutionId(*res.QueryExecutionId)
  782. op, err := svc.GetQueryResults(&ip)
  783. if err != nil {
  784. return nil, err
  785. }
  786. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  787. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  788. if err != nil {
  789. return nil, err
  790. }
  791. ooc := &OutOfClusterAllocation{
  792. Aggregator: aggregator,
  793. Environment: *r.Data[1].VarCharValue,
  794. Service: *r.Data[2].VarCharValue,
  795. Cost: cost,
  796. }
  797. oocAllocs = append(oocAllocs, ooc)
  798. }
  799. }
  800. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  801. }
  802. // QuerySQL can query a properly configured Athena database.
  803. // Used to fetch billing data.
  804. // Requires a json config in /var/configs with key region, output, and database.
  805. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  806. customPricing, err := a.GetConfig()
  807. if err != nil {
  808. return nil, err
  809. }
  810. if customPricing.ServiceKeyName != "" {
  811. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  812. if err != nil {
  813. return nil, err
  814. }
  815. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  816. if err != nil {
  817. return nil, err
  818. }
  819. }
  820. athenaConfigs, err := os.Open("/var/configs/athena.json")
  821. if err != nil {
  822. return nil, err
  823. }
  824. defer athenaConfigs.Close()
  825. b, err := ioutil.ReadAll(athenaConfigs)
  826. if err != nil {
  827. return nil, err
  828. }
  829. var athenaConf map[string]string
  830. json.Unmarshal([]byte(b), &athenaConf)
  831. region := aws.String(customPricing.AthenaRegion)
  832. resultsBucket := customPricing.AthenaBucketName
  833. database := customPricing.AthenaDatabase
  834. c := &aws.Config{
  835. Region: region,
  836. }
  837. s := session.Must(session.NewSession(c))
  838. svc := athena.New(s)
  839. var e athena.StartQueryExecutionInput
  840. var r athena.ResultConfiguration
  841. r.SetOutputLocation(resultsBucket)
  842. e.SetResultConfiguration(&r)
  843. e.SetQueryString(query)
  844. var q athena.QueryExecutionContext
  845. q.SetDatabase(database)
  846. e.SetQueryExecutionContext(&q)
  847. res, err := svc.StartQueryExecution(&e)
  848. if err != nil {
  849. return nil, err
  850. }
  851. klog.V(2).Infof("StartQueryExecution result:")
  852. klog.V(2).Infof(res.GoString())
  853. var qri athena.GetQueryExecutionInput
  854. qri.SetQueryExecutionId(*res.QueryExecutionId)
  855. var qrop *athena.GetQueryExecutionOutput
  856. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  857. for {
  858. qrop, err = svc.GetQueryExecution(&qri)
  859. if err != nil {
  860. return nil, err
  861. }
  862. if *qrop.QueryExecution.Status.State != "RUNNING" {
  863. break
  864. }
  865. time.Sleep(duration)
  866. }
  867. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  868. var ip athena.GetQueryResultsInput
  869. ip.SetQueryExecutionId(*res.QueryExecutionId)
  870. op, err := svc.GetQueryResults(&ip)
  871. if err != nil {
  872. return nil, err
  873. }
  874. b, err := json.Marshal(op.ResultSet)
  875. if err != nil {
  876. return nil, err
  877. }
  878. return b, nil
  879. }
  880. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  881. }
  882. type spotInfo struct {
  883. Timestamp string `csv:"Timestamp"`
  884. UsageType string `csv:"UsageType"`
  885. Operation string `csv:"Operation"`
  886. InstanceID string `csv:"InstanceID"`
  887. MyBidID string `csv:"MyBidID"`
  888. MyMaxPrice string `csv:"MyMaxPrice"`
  889. MarketPrice string `csv:"MarketPrice"`
  890. Charge string `csv:"Charge"`
  891. Version string `csv:"Version"`
  892. }
  893. type fnames []*string
  894. func (f fnames) Len() int {
  895. return len(f)
  896. }
  897. func (f fnames) Swap(i, j int) {
  898. f[i], f[j] = f[j], f[i]
  899. }
  900. func (f fnames) Less(i, j int) bool {
  901. key1 := strings.Split(*f[i], ".")
  902. key2 := strings.Split(*f[j], ".")
  903. t1, err := time.Parse("2006-01-02-15", key1[1])
  904. if err != nil {
  905. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  906. return false
  907. }
  908. t2, err := time.Parse("2006-01-02-15", key2[1])
  909. if err != nil {
  910. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  911. return false
  912. }
  913. return t1.Before(t2)
  914. }
  915. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  916. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  917. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  918. if err != nil {
  919. return nil, err
  920. }
  921. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  922. if err != nil {
  923. return nil, err
  924. }
  925. }
  926. s3Prefix := projectID
  927. if len(prefix) != 0 {
  928. s3Prefix = prefix + "/" + s3Prefix
  929. }
  930. c := aws.NewConfig().WithRegion(region)
  931. s := session.Must(session.NewSession(c))
  932. s3Svc := s3.New(s)
  933. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  934. tNow := time.Now()
  935. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  936. ls := &s3.ListObjectsInput{
  937. Bucket: aws.String(bucket),
  938. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  939. }
  940. ls2 := &s3.ListObjectsInput{
  941. Bucket: aws.String(bucket),
  942. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  943. }
  944. lso, err := s3Svc.ListObjects(ls)
  945. if err != nil {
  946. return nil, err
  947. }
  948. lsoLen := len(lso.Contents)
  949. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  950. if lsoLen == 0 {
  951. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  952. }
  953. lso2, err := s3Svc.ListObjects(ls2)
  954. if err != nil {
  955. return nil, err
  956. }
  957. lso2Len := len(lso2.Contents)
  958. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  959. if lso2Len == 0 {
  960. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  961. }
  962. var keys []*string
  963. for _, obj := range lso.Contents {
  964. keys = append(keys, obj.Key)
  965. }
  966. for _, obj := range lso2.Contents {
  967. keys = append(keys, obj.Key)
  968. }
  969. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  970. header, err := csvutil.Header(spotInfo{}, "csv")
  971. if err != nil {
  972. return nil, err
  973. }
  974. fieldsPerRecord := len(header)
  975. spots := make(map[string]*spotInfo)
  976. for _, key := range keys {
  977. getObj := &s3.GetObjectInput{
  978. Bucket: aws.String(bucket),
  979. Key: key,
  980. }
  981. buf := aws.NewWriteAtBuffer([]byte{})
  982. _, err := downloader.Download(buf, getObj)
  983. if err != nil {
  984. return nil, err
  985. }
  986. r := bytes.NewReader(buf.Bytes())
  987. gr, err := gzip.NewReader(r)
  988. if err != nil {
  989. return nil, err
  990. }
  991. csvReader := csv.NewReader(gr)
  992. csvReader.Comma = '\t'
  993. csvReader.FieldsPerRecord = fieldsPerRecord
  994. dec, err := csvutil.NewDecoder(csvReader, header...)
  995. if err != nil {
  996. return nil, err
  997. }
  998. var foundVersion string
  999. for {
  1000. spot := spotInfo{}
  1001. err := dec.Decode(&spot)
  1002. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1003. if err == io.EOF {
  1004. break
  1005. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1006. rec := dec.Record()
  1007. // the first two "Record()" will be the comment lines
  1008. // and they show up as len() == 1
  1009. // the first of which is "#Version"
  1010. // the second of which is "#Fields: "
  1011. if len(rec) != 1 {
  1012. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1013. continue
  1014. }
  1015. if len(foundVersion) == 0 {
  1016. spotFeedVersion := rec[0]
  1017. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1018. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1019. if matches != nil {
  1020. foundVersion = matches[1]
  1021. if foundVersion != supportedSpotFeedVersion {
  1022. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1023. break
  1024. }
  1025. }
  1026. continue
  1027. } else if strings.Index(rec[0], "#") == 0 {
  1028. continue
  1029. } else {
  1030. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1031. continue
  1032. }
  1033. } else if err != nil {
  1034. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1035. continue
  1036. }
  1037. klog.V(3).Infof("Found spot info %+v", spot)
  1038. spots[spot.InstanceID] = &spot
  1039. }
  1040. gr.Close()
  1041. }
  1042. return spots, nil
  1043. }