awsprovider.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. package cloud
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. "regexp"
  13. "strings"
  14. "time"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/aws/awserr"
  17. "github.com/aws/aws-sdk-go/aws/credentials"
  18. "github.com/aws/aws-sdk-go/aws/session"
  19. "github.com/aws/aws-sdk-go/service/athena"
  20. "github.com/aws/aws-sdk-go/service/ec2"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/client-go/kubernetes"
  23. )
  24. // AWS represents an Amazon Provider
  25. type AWS struct {
  26. Pricing map[string]*AWSProductTerms
  27. ValidPricingKeys map[string]bool
  28. Clientset *kubernetes.Clientset
  29. BaseCPUPrice string
  30. BaseSpotCPUPrice string
  31. BaseSpotRAMPrice string
  32. }
  33. // AWSPricing maps a k8s node to an AWS Pricing "product"
  34. type AWSPricing struct {
  35. Products map[string]*AWSProduct `json:"products"`
  36. Terms AWSPricingTerms `json:"terms"`
  37. }
  38. // AWSProduct represents a purchased SKU
  39. type AWSProduct struct {
  40. Sku string `json:"sku"`
  41. Attributes AWSProductAttributes `json:"attributes"`
  42. }
  43. // AWSProductAttributes represents metadata about the product used to map to a node.
  44. type AWSProductAttributes struct {
  45. Location string `json:"location"`
  46. InstanceType string `json:"instanceType"`
  47. Memory string `json:"memory"`
  48. Storage string `json:"storage"`
  49. VCpu string `json:"vcpu"`
  50. UsageType string `json:"usagetype"`
  51. OperatingSystem string `json:"operatingSystem"`
  52. PreInstalledSw string `json:"preInstalledSw"`
  53. }
  54. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  55. type AWSPricingTerms struct {
  56. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  57. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  58. }
  59. // AWSOfferTerm is a sku extension used to pay for the node.
  60. type AWSOfferTerm struct {
  61. Sku string `json:"sku"`
  62. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  63. }
  64. // AWSRateCode encodes data about the price of a product
  65. type AWSRateCode struct {
  66. Unit string `json:"unit"`
  67. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  68. }
  69. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  70. type AWSCurrencyCode struct {
  71. USD string `json:"USD"`
  72. }
  73. // AWSProductTerms represents the full terms of the product
  74. type AWSProductTerms struct {
  75. Sku string `json:"sku"`
  76. OnDemand *AWSOfferTerm `json:"OnDemand"`
  77. Reserved *AWSOfferTerm `json:"Reserved"`
  78. Memory string `json:"memory"`
  79. Storage string `json:"storage"`
  80. VCpu string `json:"vcpu"`
  81. }
  82. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  83. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  84. // OnDemandRateCode is appended to an node sku
  85. const OnDemandRateCode = ".JRTCKXETXF"
  86. // ReservedRateCode is appended to a node sku
  87. const ReservedRateCode = ".38NPMPTW36"
  88. // HourlyRateCode is appended to a node sku
  89. const HourlyRateCode = ".6YS6EN2CT7"
  90. // KubeAttrConversion maps the k8s labels for region to an aws region
  91. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  92. locationToRegion := map[string]string{
  93. "US East (Ohio)": "us-east-2",
  94. "US East (N. Virginia)": "us-east-1",
  95. "US West (N. California)": "us-west-1",
  96. "US West (Oregon)": "us-west-2",
  97. "Asia Pacific (Mumbai)": "ap-south-1",
  98. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  99. "Asia Pacific (Seoul)": "ap-northeast-2",
  100. "Asia Pacific (Singapore)": "ap-southeast-1",
  101. "Asia Pacific (Sydney)": "ap-southeast-2",
  102. "Asia Pacific (Tokyo)": "ap-northeast-1",
  103. "Canada (Central)": "ca-central-1",
  104. "China (Beijing)": "cn-north-1",
  105. "China (Ningxia)": "cn-northwest-1",
  106. "EU (Frankfurt)": "eu-central-1",
  107. "EU (Ireland)": "eu-west-1",
  108. "EU (London)": "eu-west-2",
  109. "EU (Paris)": "eu-west-3",
  110. "EU (Stockholm)": "eu-north-1",
  111. "South America (São Paulo)": "sa-east-1",
  112. "AWS GovCloud (US-East)": "us-gov-east-1",
  113. "AWS GovCloud (US)": "us-gov-west-1",
  114. }
  115. operatingSystem = strings.ToLower(operatingSystem)
  116. region := locationToRegion[location]
  117. return region + "," + instanceType + "," + operatingSystem
  118. }
  119. // GetKey maps node labels to information needed to retrieve pricing data
  120. func (aws *AWS) GetKey(labels map[string]string) string {
  121. instanceType := labels["beta.kubernetes.io/instance-type"]
  122. operatingSystem := labels["beta.kubernetes.io/os"]
  123. region := labels["failure-domain.beta.kubernetes.io/region"]
  124. if l, ok := labels["lifecycle"]; ok && l == "EC2Spot" {
  125. usageType := "preemptible"
  126. return region + "," + instanceType + "," + operatingSystem + "," + usageType
  127. }
  128. return region + "," + instanceType + "," + operatingSystem
  129. }
  130. func (aws *AWS) isPreemptible(key string) bool {
  131. s := strings.Split(key, ",")
  132. if len(s) == 4 && s[3] == "preemptible" {
  133. return true
  134. }
  135. return false
  136. }
  137. // DownloadPricingData fetches data from the AWS Pricing API
  138. func (aws *AWS) DownloadPricingData() error {
  139. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  140. if err != nil {
  141. return err
  142. }
  143. inputkeys := make(map[string]bool)
  144. for _, n := range nodeList.Items {
  145. labels := n.GetObjectMeta().GetLabels()
  146. key := aws.GetKey(labels)
  147. inputkeys[key] = true
  148. }
  149. aws.Pricing = make(map[string]*AWSProductTerms)
  150. aws.ValidPricingKeys = make(map[string]bool)
  151. skusToKeys := make(map[string]string)
  152. resp, err := http.Get("https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json")
  153. if err != nil {
  154. return err
  155. }
  156. dec := json.NewDecoder(resp.Body)
  157. for {
  158. t, err := dec.Token()
  159. if err == io.EOF {
  160. fmt.Printf("done \n")
  161. break
  162. }
  163. if t == "products" {
  164. dec.Token() //{
  165. for dec.More() {
  166. dec.Token() // the sku token
  167. product := &AWSProduct{}
  168. err := dec.Decode(&product)
  169. if err != nil {
  170. fmt.Printf("Error: " + err.Error())
  171. break
  172. }
  173. if product.Attributes.PreInstalledSw == "NA" &&
  174. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  175. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  176. spotKey := key + ",preemptible"
  177. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  178. aws.Pricing[key] = &AWSProductTerms{
  179. Sku: product.Sku,
  180. Memory: product.Attributes.Memory,
  181. Storage: product.Attributes.Storage,
  182. VCpu: product.Attributes.VCpu,
  183. }
  184. skusToKeys[product.Sku] = key
  185. }
  186. aws.ValidPricingKeys[key] = true
  187. aws.ValidPricingKeys[spotKey] = true
  188. }
  189. }
  190. }
  191. if t == "terms" {
  192. dec.Token()
  193. termType, _ := dec.Token()
  194. if termType == "OnDemand" {
  195. dec.Token()
  196. for dec.More() {
  197. sku, _ := dec.Token()
  198. dec.Token()
  199. skuOnDemand, _ := dec.Token()
  200. offerTerm := &AWSOfferTerm{}
  201. err := dec.Decode(&offerTerm)
  202. if err != nil {
  203. fmt.Printf("Error: " + err.Error())
  204. }
  205. if sku.(string)+OnDemandRateCode == skuOnDemand {
  206. key, ok := skusToKeys[sku.(string)]
  207. if ok {
  208. aws.Pricing[key].OnDemand = offerTerm
  209. }
  210. }
  211. dec.Token()
  212. }
  213. dec.Token()
  214. }
  215. }
  216. }
  217. if err != nil {
  218. return err
  219. }
  220. c, err := GetDefaultPricingData("default.json")
  221. if err != nil {
  222. log.Printf("Error downloading default pricing data: %s", err.Error())
  223. }
  224. aws.BaseCPUPrice = c.CPU
  225. aws.BaseSpotCPUPrice = c.SpotCPU
  226. aws.BaseSpotRAMPrice = c.SpotRAM
  227. return nil
  228. }
  229. // AllNodePricing returns all the billing data fetched.
  230. func (aws *AWS) AllNodePricing() (interface{}, error) {
  231. return aws.Pricing, nil
  232. }
  233. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  234. func (aws *AWS) NodePricing(key string) (*Node, error) {
  235. //return json.Marshal(aws.Pricing[key])
  236. usageType := "ondemand"
  237. if aws.isPreemptible(key) {
  238. usageType = "preemptible"
  239. }
  240. terms, ok := aws.Pricing[key]
  241. if ok {
  242. if aws.isPreemptible(key) {
  243. return &Node{
  244. VCPU: terms.VCpu,
  245. VCPUCost: aws.BaseSpotCPUPrice,
  246. RAM: terms.Memory,
  247. RAMCost: aws.BaseSpotRAMPrice,
  248. Storage: terms.Storage,
  249. BaseCPUPrice: aws.BaseCPUPrice,
  250. UsageType: usageType,
  251. }, nil
  252. }
  253. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  254. return &Node{
  255. Cost: cost,
  256. VCPU: terms.VCpu,
  257. RAM: terms.Memory,
  258. Storage: terms.Storage,
  259. BaseCPUPrice: aws.BaseCPUPrice,
  260. UsageType: usageType,
  261. }, nil
  262. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  263. err := aws.DownloadPricingData()
  264. if err != nil {
  265. return nil, err
  266. }
  267. terms := aws.Pricing[key]
  268. if aws.isPreemptible(key) {
  269. return &Node{
  270. VCPU: terms.VCpu,
  271. VCPUCost: aws.BaseSpotCPUPrice,
  272. RAM: terms.Memory,
  273. RAMCost: aws.BaseSpotRAMPrice,
  274. Storage: terms.Storage,
  275. BaseCPUPrice: aws.BaseCPUPrice,
  276. UsageType: usageType,
  277. }, nil
  278. }
  279. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  280. return &Node{
  281. Cost: cost,
  282. VCPU: terms.VCpu,
  283. RAM: terms.Memory,
  284. Storage: terms.Storage,
  285. BaseCPUPrice: aws.BaseCPUPrice,
  286. UsageType: usageType,
  287. }, nil
  288. } else {
  289. return nil, errors.New("Invalid Pricing Key: " + key + "\n")
  290. }
  291. }
  292. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  293. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  294. makeStructure := func(clusterName string) ([]byte, error) {
  295. log.Printf("Returning \"%s\" as ClusterName", clusterName)
  296. m := make(map[string]string)
  297. m["name"] = clusterName
  298. m["provider"] = "AWS"
  299. return json.Marshal(m)
  300. }
  301. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  302. if len(maybeClusterId) != 0 {
  303. return makeStructure(maybeClusterId)
  304. }
  305. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  306. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  307. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  308. if err != nil {
  309. return nil, err
  310. }
  311. for _, n := range nodeList.Items {
  312. region := ""
  313. instanceId := ""
  314. providerId := n.Spec.ProviderID
  315. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  316. if matchNum == 1 {
  317. region = group
  318. } else if matchNum == 2 {
  319. instanceId = group
  320. }
  321. }
  322. if len(instanceId) == 0 {
  323. log.Printf("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  324. continue
  325. }
  326. c := &aws.Config{
  327. Region: aws.String(region),
  328. }
  329. s := session.Must(session.NewSession(c))
  330. ec2Svc := ec2.New(s)
  331. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  332. InstanceIds: []*string{
  333. aws.String(instanceId),
  334. },
  335. })
  336. if diErr != nil {
  337. // maybe log this?
  338. continue
  339. }
  340. if len(di.Reservations) != 1 {
  341. log.Printf("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  342. continue
  343. }
  344. res := di.Reservations[0]
  345. if len(res.Instances) != 1 {
  346. log.Printf("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  347. continue
  348. }
  349. inst := res.Instances[0]
  350. for _, tag := range inst.Tags {
  351. tagKey := *tag.Key
  352. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  353. if matchNum != 1 {
  354. continue
  355. }
  356. return makeStructure(group)
  357. }
  358. }
  359. }
  360. return nil, fmt.Errorf("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  361. }
  362. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  363. func (*AWS) AddServiceKey(formValues url.Values) error {
  364. keyID := formValues.Get("access_key_ID")
  365. key := formValues.Get("secret_access_key")
  366. m := make(map[string]string)
  367. m["access_key_ID"] = keyID
  368. m["secret_access_key"] = key
  369. json, err := json.Marshal(m)
  370. if err != nil {
  371. return err
  372. }
  373. return ioutil.WriteFile("/var/configs/key.json", json, 0644)
  374. }
  375. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  376. func (*AWS) GetDisks() ([]byte, error) {
  377. jsonFile, err := os.Open("/var/configs/key.json")
  378. if err == nil {
  379. byteValue, _ := ioutil.ReadAll(jsonFile)
  380. var result map[string]string
  381. json.Unmarshal([]byte(byteValue), &result)
  382. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  383. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  384. } else if os.IsNotExist(err) {
  385. log.Printf("Using Default Credentials")
  386. } else {
  387. return nil, err
  388. }
  389. defer jsonFile.Close()
  390. clusterConfig, err := os.Open("/var/configs/cluster.json")
  391. if err != nil {
  392. return nil, err
  393. }
  394. defer clusterConfig.Close()
  395. bytes, _ := ioutil.ReadAll(clusterConfig)
  396. var clusterConf map[string]string
  397. json.Unmarshal([]byte(bytes), &clusterConf)
  398. region := aws.String(clusterConf["region"])
  399. c := &aws.Config{
  400. Region: region,
  401. Credentials: credentials.NewEnvCredentials(),
  402. }
  403. s := session.Must(session.NewSession(c))
  404. ec2Svc := ec2.New(s)
  405. input := &ec2.DescribeVolumesInput{}
  406. volumeResult, err := ec2Svc.DescribeVolumes(input)
  407. if err != nil {
  408. if aerr, ok := err.(awserr.Error); ok {
  409. switch aerr.Code() {
  410. default:
  411. return nil, aerr
  412. }
  413. } else {
  414. return nil, err
  415. }
  416. }
  417. return json.Marshal(volumeResult)
  418. }
  419. // QuerySQL can query a properly configured Athena database.
  420. // Used to fetch billing data.
  421. // Requires a json config in /var/configs with key region, output, and database.
  422. func (*AWS) QuerySQL(query string) ([]byte, error) {
  423. jsonFile, err := os.Open("/var/configs/key.json")
  424. if err == nil {
  425. byteValue, _ := ioutil.ReadAll(jsonFile)
  426. var result map[string]string
  427. json.Unmarshal([]byte(byteValue), &result)
  428. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  429. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  430. } else if os.IsNotExist(err) {
  431. log.Printf("Using Default Credentials")
  432. } else {
  433. return nil, err
  434. }
  435. defer jsonFile.Close()
  436. athenaConfigs, err := os.Open("/var/configs/athena.json")
  437. if err != nil {
  438. return nil, err
  439. }
  440. defer athenaConfigs.Close()
  441. bytes, _ := ioutil.ReadAll(athenaConfigs)
  442. var athenaConf map[string]string
  443. json.Unmarshal([]byte(bytes), &athenaConf)
  444. region := aws.String(athenaConf["region"])
  445. resultsBucket := athenaConf["output"]
  446. database := athenaConf["database"]
  447. c := &aws.Config{
  448. Region: region,
  449. Credentials: credentials.NewEnvCredentials(),
  450. }
  451. s := session.Must(session.NewSession(c))
  452. svc := athena.New(s)
  453. var e athena.StartQueryExecutionInput
  454. var r athena.ResultConfiguration
  455. r.SetOutputLocation(resultsBucket)
  456. e.SetResultConfiguration(&r)
  457. e.SetQueryString(query)
  458. var q athena.QueryExecutionContext
  459. q.SetDatabase(database)
  460. e.SetQueryExecutionContext(&q)
  461. res, err := svc.StartQueryExecution(&e)
  462. if err != nil {
  463. return nil, err
  464. }
  465. fmt.Println("StartQueryExecution result:")
  466. fmt.Println(res.GoString())
  467. var qri athena.GetQueryExecutionInput
  468. qri.SetQueryExecutionId(*res.QueryExecutionId)
  469. var qrop *athena.GetQueryExecutionOutput
  470. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  471. for {
  472. qrop, err = svc.GetQueryExecution(&qri)
  473. if err != nil {
  474. return nil, err
  475. }
  476. if *qrop.QueryExecution.Status.State != "RUNNING" {
  477. break
  478. }
  479. time.Sleep(duration)
  480. }
  481. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  482. var ip athena.GetQueryResultsInput
  483. ip.SetQueryExecutionId(*res.QueryExecutionId)
  484. op, err := svc.GetQueryResults(&ip)
  485. if err != nil {
  486. return nil, err
  487. }
  488. bytes, err := json.Marshal(op.ResultSet)
  489. if err != nil {
  490. return nil, err
  491. }
  492. return bytes, nil
  493. }
  494. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  495. }