awsprovider.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. package cloud
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. "strings"
  13. "time"
  14. "github.com/aws/aws-sdk-go/aws"
  15. "github.com/aws/aws-sdk-go/aws/awserr"
  16. "github.com/aws/aws-sdk-go/aws/credentials"
  17. "github.com/aws/aws-sdk-go/aws/session"
  18. "github.com/aws/aws-sdk-go/service/athena"
  19. "github.com/aws/aws-sdk-go/service/ec2"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/client-go/kubernetes"
  22. )
  23. // AWS represents an Amazon Provider
  24. type AWS struct {
  25. Pricing map[string]*AWSProductTerms
  26. ValidPricingKeys map[string]bool
  27. Clientset *kubernetes.Clientset
  28. BaseCPUPrice string
  29. BaseSpotCPUPrice string
  30. BaseSpotRAMPrice string
  31. }
  32. // AWSPricing maps a k8s node to an AWS Pricing "product"
  33. type AWSPricing struct {
  34. Products map[string]*AWSProduct `json:"products"`
  35. Terms AWSPricingTerms `json:"terms"`
  36. }
  37. // AWSProduct represents a purchased SKU
  38. type AWSProduct struct {
  39. Sku string `json:"sku"`
  40. Attributes AWSProductAttributes `json:"attributes"`
  41. }
  42. // AWSProductAttributes represents metadata about the product used to map to a node.
  43. type AWSProductAttributes struct {
  44. Location string `json:"location"`
  45. InstanceType string `json:"instanceType"`
  46. Memory string `json:"memory"`
  47. Storage string `json:"storage"`
  48. VCpu string `json:"vcpu"`
  49. UsageType string `json:"usagetype"`
  50. OperatingSystem string `json:"operatingSystem"`
  51. PreInstalledSw string `json:"preInstalledSw"`
  52. }
  53. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  54. type AWSPricingTerms struct {
  55. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  56. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  57. }
  58. // AWSOfferTerm is a sku extension used to pay for the node.
  59. type AWSOfferTerm struct {
  60. Sku string `json:"sku"`
  61. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  62. }
  63. // AWSRateCode encodes data about the price of a product
  64. type AWSRateCode struct {
  65. Unit string `json:"unit"`
  66. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  67. }
  68. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  69. type AWSCurrencyCode struct {
  70. USD string `json:"USD"`
  71. }
  72. // AWSProductTerms represents the full terms of the product
  73. type AWSProductTerms struct {
  74. Sku string `json:"sku"`
  75. OnDemand *AWSOfferTerm `json:"OnDemand"`
  76. Reserved *AWSOfferTerm `json:"Reserved"`
  77. Memory string `json:"memory"`
  78. Storage string `json:"storage"`
  79. VCpu string `json:"vcpu"`
  80. }
  81. // OnDemandRateCode is appended to an node sku
  82. const OnDemandRateCode = ".JRTCKXETXF"
  83. // ReservedRateCode is appended to a node sku
  84. const ReservedRateCode = ".38NPMPTW36"
  85. // HourlyRateCode is appended to a node sku
  86. const HourlyRateCode = ".6YS6EN2CT7"
  87. // KubeAttrConversion maps the k8s labels for region to an aws region
  88. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  89. locationToRegion := map[string]string{
  90. "US East (Ohio)": "us-east-2",
  91. "US East (N. Virginia)": "us-east-1",
  92. "US West (N. California)": "us-west-1",
  93. "US West (Oregon)": "us-west-2",
  94. "Asia Pacific (Mumbai)": "ap-south-1",
  95. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  96. "Asia Pacific (Seoul)": "ap-northeast-2",
  97. "Asia Pacific (Singapore)": "ap-southeast-1",
  98. "Asia Pacific (Sydney)": "ap-southeast-2",
  99. "Asia Pacific (Tokyo)": "ap-northeast-1",
  100. "Canada (Central)": "ca-central-1",
  101. "China (Beijing)": "cn-north-1",
  102. "China (Ningxia)": "cn-northwest-1",
  103. "EU (Frankfurt)": "eu-central-1",
  104. "EU (Ireland)": "eu-west-1",
  105. "EU (London)": "eu-west-2",
  106. "EU (Paris)": "eu-west-3",
  107. "EU (Stockholm)": "eu-north-1",
  108. "South America (São Paulo)": "sa-east-1",
  109. "AWS GovCloud (US-East)": "us-gov-east-1",
  110. "AWS GovCloud (US)": "us-gov-west-1",
  111. }
  112. operatingSystem = strings.ToLower(operatingSystem)
  113. region := locationToRegion[location]
  114. return region + "," + instanceType + "," + operatingSystem
  115. }
  116. // GetKey maps node labels to information needed to retrieve pricing data
  117. func (aws *AWS) GetKey(labels map[string]string) string {
  118. instanceType := labels["beta.kubernetes.io/instance-type"]
  119. operatingSystem := labels["beta.kubernetes.io/os"]
  120. region := labels["failure-domain.beta.kubernetes.io/region"]
  121. if l, ok := labels["lifecycle"]; ok && l == "EC2Spot" {
  122. usageType := "preemptible"
  123. return region + "," + instanceType + "," + operatingSystem + "," + usageType
  124. }
  125. return region + "," + instanceType + "," + operatingSystem
  126. }
  127. func (aws *AWS) isPreemptible(key string) bool {
  128. s := strings.Split(key, ",")
  129. if len(s) == 4 && s[3] == "preemptible" {
  130. return true
  131. }
  132. return false
  133. }
  134. // DownloadPricingData fetches data from the AWS Pricing API
  135. func (aws *AWS) DownloadPricingData() error {
  136. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  137. if err != nil {
  138. return err
  139. }
  140. inputkeys := make(map[string]bool)
  141. for _, n := range nodeList.Items {
  142. labels := n.GetObjectMeta().GetLabels()
  143. key := aws.GetKey(labels)
  144. inputkeys[key] = true
  145. }
  146. aws.Pricing = make(map[string]*AWSProductTerms)
  147. aws.ValidPricingKeys = make(map[string]bool)
  148. skusToKeys := make(map[string]string)
  149. resp, err := http.Get("https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json")
  150. if err != nil {
  151. return err
  152. }
  153. dec := json.NewDecoder(resp.Body)
  154. for {
  155. t, err := dec.Token()
  156. if err == io.EOF {
  157. fmt.Printf("done \n")
  158. break
  159. }
  160. if t == "products" {
  161. dec.Token() //{
  162. for dec.More() {
  163. dec.Token() // the sku token
  164. product := &AWSProduct{}
  165. err := dec.Decode(&product)
  166. if err != nil {
  167. fmt.Printf("Error: " + err.Error())
  168. break
  169. }
  170. if product.Attributes.PreInstalledSw == "NA" &&
  171. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  172. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  173. spotKey := key + ",preemptible"
  174. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  175. aws.Pricing[key] = &AWSProductTerms{
  176. Sku: product.Sku,
  177. Memory: product.Attributes.Memory,
  178. Storage: product.Attributes.Storage,
  179. VCpu: product.Attributes.VCpu,
  180. }
  181. skusToKeys[product.Sku] = key
  182. }
  183. aws.ValidPricingKeys[key] = true
  184. aws.ValidPricingKeys[spotKey] = true
  185. }
  186. }
  187. }
  188. if t == "terms" {
  189. dec.Token()
  190. termType, _ := dec.Token()
  191. if termType == "OnDemand" {
  192. dec.Token()
  193. for dec.More() {
  194. sku, _ := dec.Token()
  195. dec.Token()
  196. skuOnDemand, _ := dec.Token()
  197. offerTerm := &AWSOfferTerm{}
  198. err := dec.Decode(&offerTerm)
  199. if err != nil {
  200. fmt.Printf("Error: " + err.Error())
  201. }
  202. if sku.(string)+OnDemandRateCode == skuOnDemand {
  203. key, ok := skusToKeys[sku.(string)]
  204. if ok {
  205. aws.Pricing[key].OnDemand = offerTerm
  206. }
  207. }
  208. dec.Token()
  209. }
  210. dec.Token()
  211. }
  212. }
  213. }
  214. if err != nil {
  215. return err
  216. }
  217. c, err := GetDefaultPricingData("default.json")
  218. if err != nil {
  219. log.Printf("Error downloading default pricing data: %s", err.Error())
  220. }
  221. aws.BaseCPUPrice = c.CPU
  222. aws.BaseSpotCPUPrice = c.SpotCPU
  223. aws.BaseSpotRAMPrice = c.SpotRAM
  224. return nil
  225. }
  226. // AllNodePricing returns all the billing data fetched.
  227. func (aws *AWS) AllNodePricing() (interface{}, error) {
  228. return aws.Pricing, nil
  229. }
  230. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  231. func (aws *AWS) NodePricing(key string) (*Node, error) {
  232. //return json.Marshal(aws.Pricing[key])
  233. usageType := "ondemand"
  234. if aws.isPreemptible(key) {
  235. usageType = "preemptible"
  236. }
  237. terms, ok := aws.Pricing[key]
  238. if ok {
  239. if aws.isPreemptible(key) {
  240. return &Node{
  241. VCPU: terms.VCpu,
  242. VCPUCost: aws.BaseSpotCPUPrice,
  243. RAM: terms.Memory,
  244. RAMCost: aws.BaseSpotRAMPrice,
  245. Storage: terms.Storage,
  246. BaseCPUPrice: aws.BaseCPUPrice,
  247. UsageType: usageType,
  248. }, nil
  249. }
  250. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  251. return &Node{
  252. Cost: cost,
  253. VCPU: terms.VCpu,
  254. RAM: terms.Memory,
  255. Storage: terms.Storage,
  256. BaseCPUPrice: aws.BaseCPUPrice,
  257. UsageType: usageType,
  258. }, nil
  259. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  260. err := aws.DownloadPricingData()
  261. if err != nil {
  262. return nil, err
  263. }
  264. terms := aws.Pricing[key]
  265. if aws.isPreemptible(key) {
  266. return &Node{
  267. VCPU: terms.VCpu,
  268. VCPUCost: aws.BaseSpotCPUPrice,
  269. RAM: terms.Memory,
  270. RAMCost: aws.BaseSpotRAMPrice,
  271. Storage: terms.Storage,
  272. BaseCPUPrice: aws.BaseCPUPrice,
  273. UsageType: usageType,
  274. }, nil
  275. }
  276. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  277. return &Node{
  278. Cost: cost,
  279. VCPU: terms.VCpu,
  280. RAM: terms.Memory,
  281. Storage: terms.Storage,
  282. BaseCPUPrice: aws.BaseCPUPrice,
  283. UsageType: usageType,
  284. }, nil
  285. } else {
  286. return nil, errors.New("Invalid Pricing Key: " + key + "\n")
  287. }
  288. }
  289. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  290. func (*AWS) ClusterName() ([]byte, error) {
  291. attribute := "AWS Cluster #1"
  292. m := make(map[string]string)
  293. m["name"] = attribute
  294. m["provider"] = "AWS"
  295. return json.Marshal(m)
  296. }
  297. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  298. func (*AWS) AddServiceKey(formValues url.Values) error {
  299. keyID := formValues.Get("access_key_ID")
  300. key := formValues.Get("secret_access_key")
  301. m := make(map[string]string)
  302. m["access_key_ID"] = keyID
  303. m["secret_access_key"] = key
  304. json, err := json.Marshal(m)
  305. if err != nil {
  306. return err
  307. }
  308. return ioutil.WriteFile("/var/configs/key.json", json, 0644)
  309. }
  310. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  311. func (*AWS) GetDisks() ([]byte, error) {
  312. jsonFile, err := os.Open("/var/configs/key.json")
  313. if err == nil {
  314. byteValue, _ := ioutil.ReadAll(jsonFile)
  315. var result map[string]string
  316. json.Unmarshal([]byte(byteValue), &result)
  317. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  318. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  319. } else if os.IsNotExist(err) {
  320. log.Printf("Using Default Credentials")
  321. } else {
  322. return nil, err
  323. }
  324. defer jsonFile.Close()
  325. clusterConfig, err := os.Open("/var/configs/cluster.json")
  326. if err != nil {
  327. return nil, err
  328. }
  329. defer clusterConfig.Close()
  330. bytes, _ := ioutil.ReadAll(clusterConfig)
  331. var clusterConf map[string]string
  332. json.Unmarshal([]byte(bytes), &clusterConf)
  333. region := aws.String(clusterConf["region"])
  334. c := &aws.Config{
  335. Region: region,
  336. Credentials: credentials.NewEnvCredentials(),
  337. }
  338. s := session.Must(session.NewSession(c))
  339. ec2Svc := ec2.New(s)
  340. input := &ec2.DescribeVolumesInput{}
  341. volumeResult, err := ec2Svc.DescribeVolumes(input)
  342. if err != nil {
  343. if aerr, ok := err.(awserr.Error); ok {
  344. switch aerr.Code() {
  345. default:
  346. return nil, aerr
  347. }
  348. } else {
  349. return nil, err
  350. }
  351. }
  352. return json.Marshal(volumeResult)
  353. }
  354. // QuerySQL can query a properly configured Athena database.
  355. // Used to fetch billing data.
  356. // Requires a json config in /var/configs with key region, output, and database.
  357. func (*AWS) QuerySQL(query string) ([]byte, error) {
  358. jsonFile, err := os.Open("/var/configs/key.json")
  359. if err == nil {
  360. byteValue, _ := ioutil.ReadAll(jsonFile)
  361. var result map[string]string
  362. json.Unmarshal([]byte(byteValue), &result)
  363. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  364. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  365. } else if os.IsNotExist(err) {
  366. log.Printf("Using Default Credentials")
  367. } else {
  368. return nil, err
  369. }
  370. defer jsonFile.Close()
  371. athenaConfigs, err := os.Open("/var/configs/athena.json")
  372. if err != nil {
  373. return nil, err
  374. }
  375. defer athenaConfigs.Close()
  376. bytes, _ := ioutil.ReadAll(athenaConfigs)
  377. var athenaConf map[string]string
  378. json.Unmarshal([]byte(bytes), &athenaConf)
  379. region := aws.String(athenaConf["region"])
  380. resultsBucket := athenaConf["output"]
  381. database := athenaConf["database"]
  382. c := &aws.Config{
  383. Region: region,
  384. Credentials: credentials.NewEnvCredentials(),
  385. }
  386. s := session.Must(session.NewSession(c))
  387. svc := athena.New(s)
  388. var e athena.StartQueryExecutionInput
  389. var r athena.ResultConfiguration
  390. r.SetOutputLocation(resultsBucket)
  391. e.SetResultConfiguration(&r)
  392. e.SetQueryString(query)
  393. var q athena.QueryExecutionContext
  394. q.SetDatabase(database)
  395. e.SetQueryExecutionContext(&q)
  396. res, err := svc.StartQueryExecution(&e)
  397. if err != nil {
  398. return nil, err
  399. }
  400. fmt.Println("StartQueryExecution result:")
  401. fmt.Println(res.GoString())
  402. var qri athena.GetQueryExecutionInput
  403. qri.SetQueryExecutionId(*res.QueryExecutionId)
  404. var qrop *athena.GetQueryExecutionOutput
  405. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  406. for {
  407. qrop, err = svc.GetQueryExecution(&qri)
  408. if err != nil {
  409. return nil, err
  410. }
  411. if *qrop.QueryExecution.Status.State != "RUNNING" {
  412. break
  413. }
  414. time.Sleep(duration)
  415. }
  416. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  417. var ip athena.GetQueryResultsInput
  418. ip.SetQueryExecutionId(*res.QueryExecutionId)
  419. op, err := svc.GetQueryResults(&ip)
  420. if err != nil {
  421. return nil, err
  422. }
  423. bytes, err := json.Marshal(op.ResultSet)
  424. if err != nil {
  425. return nil, err
  426. }
  427. return bytes, nil
  428. }
  429. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  430. }