awsprovider.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. package cloud
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "regexp"
  12. "strings"
  13. "time"
  14. "github.com/aws/aws-sdk-go/aws"
  15. "github.com/aws/aws-sdk-go/aws/awserr"
  16. "github.com/aws/aws-sdk-go/aws/credentials"
  17. "github.com/aws/aws-sdk-go/aws/session"
  18. "github.com/aws/aws-sdk-go/service/athena"
  19. "github.com/aws/aws-sdk-go/service/ec2"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/client-go/kubernetes"
  22. )
  23. // AWS represents an Amazon Provider
  24. type AWS struct {
  25. Pricing map[string]*AWSProductTerms
  26. ValidPricingKeys map[string]bool
  27. Clientset *kubernetes.Clientset
  28. BaseCPUPrice string
  29. BaseSpotCPUPrice string
  30. BaseSpotRAMPrice string
  31. SpotLabelName string
  32. SpotLabelValue string
  33. }
  34. // AWSPricing maps a k8s node to an AWS Pricing "product"
  35. type AWSPricing struct {
  36. Products map[string]*AWSProduct `json:"products"`
  37. Terms AWSPricingTerms `json:"terms"`
  38. }
  39. // AWSProduct represents a purchased SKU
  40. type AWSProduct struct {
  41. Sku string `json:"sku"`
  42. Attributes AWSProductAttributes `json:"attributes"`
  43. }
  // AWSProductAttributes is the product metadata used to map a price-list
  // product onto a Kubernetes node. All values are kept as the strings found in
  // the price list.
  type AWSProductAttributes struct {
  	// Location is the human-readable region name, e.g. "US East (Ohio)".
  	Location string `json:"location"`
  	// InstanceType is the EC2 instance type of the product.
  	InstanceType string `json:"instanceType"`
  	// Memory, Storage and VCpu describe the instance's resources.
  	Memory  string `json:"memory"`
  	Storage string `json:"storage"`
  	VCpu    string `json:"vcpu"`
  	// UsageType is matched against "BoxUsage" when selecting products.
  	UsageType string `json:"usagetype"`
  	// OperatingSystem is lower-cased before it becomes part of a pricing key.
  	OperatingSystem string `json:"operatingSystem"`
  	// PreInstalledSw must be "NA" for a product to be considered.
  	PreInstalledSw string `json:"preInstalledSw"`
  }
  55. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  56. type AWSPricingTerms struct {
  57. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  58. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  59. }
  60. // AWSOfferTerm is a sku extension used to pay for the node.
  61. type AWSOfferTerm struct {
  62. Sku string `json:"sku"`
  63. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  64. }
  65. // AWSRateCode encodes data about the price of a product
  66. type AWSRateCode struct {
  67. Unit string `json:"unit"`
  68. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  69. }
  // AWSCurrencyCode holds a price in the localized currency. Only US dollars
  // are represented. (TODO: support non-USD)
  type AWSCurrencyCode struct {
  	// USD is the price in US dollars, kept as the string from the price list.
  	USD string `json:"USD"`
  }
  74. // AWSProductTerms represents the full terms of the product
  75. type AWSProductTerms struct {
  76. Sku string `json:"sku"`
  77. OnDemand *AWSOfferTerm `json:"OnDemand"`
  78. Reserved *AWSOfferTerm `json:"Reserved"`
  79. Memory string `json:"memory"`
  80. Storage string `json:"storage"`
  81. VCpu string `json:"vcpu"`
  82. }
  const (
  	// ClusterIdEnvVar is the environment variable in which one can manually
  	// set the ClusterId.
  	ClusterIdEnvVar = "AWS_CLUSTER_ID"

  	// OnDemandRateCode is appended to a node SKU to form the key of its
  	// on-demand offer term.
  	OnDemandRateCode = ".JRTCKXETXF"

  	// ReservedRateCode is appended to a node SKU.
  	ReservedRateCode = ".38NPMPTW36"

  	// HourlyRateCode is appended to a node SKU (after the offer's rate code)
  	// to form the key of the hourly price dimension.
  	HourlyRateCode = ".6YS6EN2CT7"
  )
  91. // KubeAttrConversion maps the k8s labels for region to an aws region
  92. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  93. locationToRegion := map[string]string{
  94. "US East (Ohio)": "us-east-2",
  95. "US East (N. Virginia)": "us-east-1",
  96. "US West (N. California)": "us-west-1",
  97. "US West (Oregon)": "us-west-2",
  98. "Asia Pacific (Mumbai)": "ap-south-1",
  99. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  100. "Asia Pacific (Seoul)": "ap-northeast-2",
  101. "Asia Pacific (Singapore)": "ap-southeast-1",
  102. "Asia Pacific (Sydney)": "ap-southeast-2",
  103. "Asia Pacific (Tokyo)": "ap-northeast-1",
  104. "Canada (Central)": "ca-central-1",
  105. "China (Beijing)": "cn-north-1",
  106. "China (Ningxia)": "cn-northwest-1",
  107. "EU (Frankfurt)": "eu-central-1",
  108. "EU (Ireland)": "eu-west-1",
  109. "EU (London)": "eu-west-2",
  110. "EU (Paris)": "eu-west-3",
  111. "EU (Stockholm)": "eu-north-1",
  112. "South America (São Paulo)": "sa-east-1",
  113. "AWS GovCloud (US-East)": "us-gov-east-1",
  114. "AWS GovCloud (US)": "us-gov-west-1",
  115. }
  116. operatingSystem = strings.ToLower(operatingSystem)
  117. region := locationToRegion[location]
  118. return region + "," + instanceType + "," + operatingSystem
  119. }
  120. // GetKey maps node labels to information needed to retrieve pricing data
  121. func (aws *AWS) GetKey(labels map[string]string) string {
  122. instanceType := labels["beta.kubernetes.io/instance-type"]
  123. operatingSystem := labels["beta.kubernetes.io/os"]
  124. region := labels["failure-domain.beta.kubernetes.io/region"]
  125. if l, ok := labels["lifecycle"]; ok && l == "EC2Spot" {
  126. usageType := "preemptible"
  127. return region + "," + instanceType + "," + operatingSystem + "," + usageType
  128. }
  129. if l, ok := labels[aws.SpotLabelName]; ok && l == aws.SpotLabelValue {
  130. usageType := "preemptible"
  131. return region + "," + instanceType + "," + operatingSystem + "," + usageType
  132. }
  133. return region + "," + instanceType + "," + operatingSystem
  134. }
  135. func (aws *AWS) isPreemptible(key string) bool {
  136. s := strings.Split(key, ",")
  137. if len(s) == 4 && s[3] == "preemptible" {
  138. return true
  139. }
  140. return false
  141. }
  142. // DownloadPricingData fetches data from the AWS Pricing API
  143. func (aws *AWS) DownloadPricingData() error {
  144. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  145. if err != nil {
  146. return err
  147. }
  148. inputkeys := make(map[string]bool)
  149. for _, n := range nodeList.Items {
  150. labels := n.GetObjectMeta().GetLabels()
  151. key := aws.GetKey(labels)
  152. inputkeys[key] = true
  153. }
  154. aws.Pricing = make(map[string]*AWSProductTerms)
  155. aws.ValidPricingKeys = make(map[string]bool)
  156. skusToKeys := make(map[string]string)
  157. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  158. log.Printf("starting download of \"%s\", which is quite large ...", pricingURL)
  159. resp, err := http.Get(pricingURL)
  160. if err != nil {
  161. log.Printf("Bogus fetch of \"%s\": %v", pricingURL, err)
  162. return err
  163. }
  164. dec := json.NewDecoder(resp.Body)
  165. for {
  166. t, err := dec.Token()
  167. if err == io.EOF {
  168. log.Printf("done loading \"%s\"\n", pricingURL)
  169. break
  170. }
  171. if t == "products" {
  172. dec.Token() //{
  173. for dec.More() {
  174. dec.Token() // the sku token
  175. product := &AWSProduct{}
  176. err := dec.Decode(&product)
  177. if err != nil {
  178. log.Printf("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  179. break
  180. }
  181. if product.Attributes.PreInstalledSw == "NA" &&
  182. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  183. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  184. spotKey := key + ",preemptible"
  185. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  186. productTerms := &AWSProductTerms{
  187. Sku: product.Sku,
  188. Memory: product.Attributes.Memory,
  189. Storage: product.Attributes.Storage,
  190. VCpu: product.Attributes.VCpu,
  191. }
  192. aws.Pricing[key] = productTerms
  193. aws.Pricing[spotKey] = productTerms
  194. skusToKeys[product.Sku] = key
  195. }
  196. aws.ValidPricingKeys[key] = true
  197. aws.ValidPricingKeys[spotKey] = true
  198. }
  199. }
  200. }
  201. if t == "terms" {
  202. dec.Token()
  203. termType, _ := dec.Token()
  204. if termType == "OnDemand" {
  205. dec.Token()
  206. for dec.More() {
  207. sku, _ := dec.Token()
  208. dec.Token()
  209. skuOnDemand, _ := dec.Token()
  210. offerTerm := &AWSOfferTerm{}
  211. err := dec.Decode(&offerTerm)
  212. if err != nil {
  213. log.Printf("Error decoding AWS Offer Term: " + err.Error())
  214. }
  215. if sku.(string)+OnDemandRateCode == skuOnDemand {
  216. key, ok := skusToKeys[sku.(string)]
  217. spotKey := key + ",preemptible"
  218. if ok {
  219. aws.Pricing[key].OnDemand = offerTerm
  220. aws.Pricing[spotKey].OnDemand = offerTerm
  221. }
  222. }
  223. dec.Token()
  224. }
  225. dec.Token()
  226. }
  227. }
  228. }
  229. if err != nil {
  230. return err
  231. }
  232. c, err := GetDefaultPricingData("aws.json")
  233. if err != nil {
  234. log.Printf("Error downloading default pricing data: %s", err.Error())
  235. }
  236. aws.BaseCPUPrice = c.CPU
  237. aws.BaseSpotCPUPrice = c.SpotCPU
  238. aws.BaseSpotRAMPrice = c.SpotRAM
  239. aws.SpotLabelName = c.SpotLabel
  240. aws.SpotLabelValue = c.SpotLabelValue
  241. return nil
  242. }
  243. // AllNodePricing returns all the billing data fetched.
  244. func (aws *AWS) AllNodePricing() (interface{}, error) {
  245. return aws.Pricing, nil
  246. }
  247. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  248. func (aws *AWS) NodePricing(key string) (*Node, error) {
  249. //return json.Marshal(aws.Pricing[key])
  250. usageType := "ondemand"
  251. if aws.isPreemptible(key) {
  252. usageType = "preemptible"
  253. }
  254. terms, ok := aws.Pricing[key]
  255. if ok {
  256. if aws.isPreemptible(key) {
  257. return &Node{
  258. VCPU: terms.VCpu,
  259. VCPUCost: aws.BaseSpotCPUPrice,
  260. RAM: terms.Memory,
  261. RAMCost: aws.BaseSpotRAMPrice,
  262. Storage: terms.Storage,
  263. BaseCPUPrice: aws.BaseCPUPrice,
  264. UsageType: usageType,
  265. }, nil
  266. }
  267. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  268. return &Node{
  269. Cost: cost,
  270. VCPU: terms.VCpu,
  271. RAM: terms.Memory,
  272. Storage: terms.Storage,
  273. BaseCPUPrice: aws.BaseCPUPrice,
  274. UsageType: usageType,
  275. }, nil
  276. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  277. err := aws.DownloadPricingData()
  278. if err != nil {
  279. return nil, err
  280. }
  281. terms, termsOk := aws.Pricing[key]
  282. if !termsOk {
  283. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  284. }
  285. if aws.isPreemptible(key) {
  286. return &Node{
  287. VCPU: terms.VCpu,
  288. VCPUCost: aws.BaseSpotCPUPrice,
  289. RAM: terms.Memory,
  290. RAMCost: aws.BaseSpotRAMPrice,
  291. Storage: terms.Storage,
  292. BaseCPUPrice: aws.BaseCPUPrice,
  293. UsageType: usageType,
  294. }, nil
  295. }
  296. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  297. return &Node{
  298. Cost: cost,
  299. VCPU: terms.VCpu,
  300. RAM: terms.Memory,
  301. Storage: terms.Storage,
  302. BaseCPUPrice: aws.BaseCPUPrice,
  303. UsageType: usageType,
  304. }, nil
  305. } else {
  306. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  307. }
  308. }
  309. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  310. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  311. defaultClusterName := "AWS Cluster #1"
  312. makeStructure := func(clusterName string) ([]byte, error) {
  313. log.Printf("Returning \"%s\" as ClusterName", clusterName)
  314. m := make(map[string]string)
  315. m["name"] = clusterName
  316. m["provider"] = "AWS"
  317. return json.Marshal(m)
  318. }
  319. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  320. if len(maybeClusterId) != 0 {
  321. return makeStructure(maybeClusterId)
  322. }
  323. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  324. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  325. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  326. if err != nil {
  327. return nil, err
  328. }
  329. for _, n := range nodeList.Items {
  330. region := ""
  331. instanceId := ""
  332. providerId := n.Spec.ProviderID
  333. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  334. if matchNum == 1 {
  335. region = group
  336. } else if matchNum == 2 {
  337. instanceId = group
  338. }
  339. }
  340. if len(instanceId) == 0 {
  341. log.Printf("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  342. continue
  343. }
  344. c := &aws.Config{
  345. Region: aws.String(region),
  346. }
  347. s := session.Must(session.NewSession(c))
  348. ec2Svc := ec2.New(s)
  349. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  350. InstanceIds: []*string{
  351. aws.String(instanceId),
  352. },
  353. })
  354. if diErr != nil {
  355. // maybe log this?
  356. continue
  357. }
  358. if len(di.Reservations) != 1 {
  359. log.Printf("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  360. continue
  361. }
  362. res := di.Reservations[0]
  363. if len(res.Instances) != 1 {
  364. log.Printf("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  365. continue
  366. }
  367. inst := res.Instances[0]
  368. for _, tag := range inst.Tags {
  369. tagKey := *tag.Key
  370. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  371. if matchNum != 1 {
  372. continue
  373. }
  374. return makeStructure(group)
  375. }
  376. }
  377. }
  378. log.Printf("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  379. return makeStructure(defaultClusterName)
  380. }
  381. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  382. func (*AWS) AddServiceKey(formValues url.Values) error {
  383. keyID := formValues.Get("access_key_ID")
  384. key := formValues.Get("secret_access_key")
  385. m := make(map[string]string)
  386. m["access_key_ID"] = keyID
  387. m["secret_access_key"] = key
  388. result, err := json.Marshal(m)
  389. if err != nil {
  390. return err
  391. }
  392. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  393. }
  394. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  395. func (*AWS) GetDisks() ([]byte, error) {
  396. jsonFile, err := os.Open("/var/configs/key.json")
  397. if err == nil {
  398. byteValue, _ := ioutil.ReadAll(jsonFile)
  399. var result map[string]string
  400. json.Unmarshal([]byte(byteValue), &result)
  401. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  402. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  403. } else if os.IsNotExist(err) {
  404. log.Print("Using Default Credentials")
  405. } else {
  406. return nil, err
  407. }
  408. defer jsonFile.Close()
  409. clusterConfig, err := os.Open("/var/configs/cluster.json")
  410. if err != nil {
  411. return nil, err
  412. }
  413. defer clusterConfig.Close()
  414. bytes, _ := ioutil.ReadAll(clusterConfig)
  415. var clusterConf map[string]string
  416. json.Unmarshal([]byte(bytes), &clusterConf)
  417. region := aws.String(clusterConf["region"])
  418. c := &aws.Config{
  419. Region: region,
  420. Credentials: credentials.NewEnvCredentials(),
  421. }
  422. s := session.Must(session.NewSession(c))
  423. ec2Svc := ec2.New(s)
  424. input := &ec2.DescribeVolumesInput{}
  425. volumeResult, err := ec2Svc.DescribeVolumes(input)
  426. if err != nil {
  427. if aerr, ok := err.(awserr.Error); ok {
  428. switch aerr.Code() {
  429. default:
  430. return nil, aerr
  431. }
  432. } else {
  433. return nil, err
  434. }
  435. }
  436. return json.Marshal(volumeResult)
  437. }
  438. // QuerySQL can query a properly configured Athena database.
  439. // Used to fetch billing data.
  440. // Requires a json config in /var/configs with key region, output, and database.
  441. func (*AWS) QuerySQL(query string) ([]byte, error) {
  442. jsonFile, err := os.Open("/var/configs/key.json")
  443. if err == nil {
  444. byteValue, _ := ioutil.ReadAll(jsonFile)
  445. var result map[string]string
  446. json.Unmarshal([]byte(byteValue), &result)
  447. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  448. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  449. } else if os.IsNotExist(err) {
  450. log.Print("Using Default Credentials")
  451. } else {
  452. return nil, err
  453. }
  454. defer jsonFile.Close()
  455. athenaConfigs, err := os.Open("/var/configs/athena.json")
  456. if err != nil {
  457. return nil, err
  458. }
  459. defer athenaConfigs.Close()
  460. bytes, _ := ioutil.ReadAll(athenaConfigs)
  461. var athenaConf map[string]string
  462. json.Unmarshal([]byte(bytes), &athenaConf)
  463. region := aws.String(athenaConf["region"])
  464. resultsBucket := athenaConf["output"]
  465. database := athenaConf["database"]
  466. c := &aws.Config{
  467. Region: region,
  468. Credentials: credentials.NewEnvCredentials(),
  469. }
  470. s := session.Must(session.NewSession(c))
  471. svc := athena.New(s)
  472. var e athena.StartQueryExecutionInput
  473. var r athena.ResultConfiguration
  474. r.SetOutputLocation(resultsBucket)
  475. e.SetResultConfiguration(&r)
  476. e.SetQueryString(query)
  477. var q athena.QueryExecutionContext
  478. q.SetDatabase(database)
  479. e.SetQueryExecutionContext(&q)
  480. res, err := svc.StartQueryExecution(&e)
  481. if err != nil {
  482. return nil, err
  483. }
  484. log.Println("StartQueryExecution result:")
  485. log.Println(res.GoString())
  486. var qri athena.GetQueryExecutionInput
  487. qri.SetQueryExecutionId(*res.QueryExecutionId)
  488. var qrop *athena.GetQueryExecutionOutput
  489. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  490. for {
  491. qrop, err = svc.GetQueryExecution(&qri)
  492. if err != nil {
  493. return nil, err
  494. }
  495. if *qrop.QueryExecution.Status.State != "RUNNING" {
  496. break
  497. }
  498. time.Sleep(duration)
  499. }
  500. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  501. var ip athena.GetQueryResultsInput
  502. ip.SetQueryExecutionId(*res.QueryExecutionId)
  503. op, err := svc.GetQueryResults(&ip)
  504. if err != nil {
  505. return nil, err
  506. }
  507. bytes, err := json.Marshal(op.ResultSet)
  508. if err != nil {
  509. return nil, err
  510. }
  511. return bytes, nil
  512. }
  513. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  514. }