awsprovider.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strings"
  15. "time"
  16. "k8s.io/klog"
  17. "github.com/aws/aws-sdk-go/aws"
  18. "github.com/aws/aws-sdk-go/aws/awserr"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/athena"
  21. "github.com/aws/aws-sdk-go/service/ec2"
  22. "github.com/aws/aws-sdk-go/service/s3"
  23. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  24. "github.com/jszwec/csvutil"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. )
  // Names of the environment variables this file sets so that subsequently
// created AWS SDK sessions pick up an explicitly configured service key.
const (
	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
)

// supportedSpotFeedVersion is the major version (the number before the dot in
// the feed's "#Version: X.Y" line) that parseSpotData knows how to decode.
const supportedSpotFeedVersion = "1"
  32. // AWS represents an Amazon Provider
  33. type AWS struct {
  34. Pricing map[string]*AWSProductTerms
  35. SpotPricingByInstanceID map[string]*spotInfo
  36. ValidPricingKeys map[string]bool
  37. Clientset *kubernetes.Clientset
  38. BaseCPUPrice string
  39. BaseSpotCPUPrice string
  40. BaseSpotRAMPrice string
  41. SpotLabelName string
  42. SpotLabelValue string
  43. ServiceKeyName string
  44. ServiceKeySecret string
  45. SpotDataRegion string
  46. SpotDataBucket string
  47. SpotDataPrefix string
  48. ProjectID string
  49. }
  50. // AWSPricing maps a k8s node to an AWS Pricing "product"
  51. type AWSPricing struct {
  52. Products map[string]*AWSProduct `json:"products"`
  53. Terms AWSPricingTerms `json:"terms"`
  54. }
  55. // AWSProduct represents a purchased SKU
  56. type AWSProduct struct {
  57. Sku string `json:"sku"`
  58. Attributes AWSProductAttributes `json:"attributes"`
  59. }
  // AWSProductAttributes represents metadata about the product used to map to a node.
// DownloadPricingData keeps only products with PreInstalledSw == "NA" and a
// BoxUsage usage type, and builds the lookup key from Location, InstanceType and
// OperatingSystem.
type AWSProductAttributes struct {
	Location        string `json:"location"`
	InstanceType    string `json:"instanceType"`
	Memory          string `json:"memory"`
	Storage         string `json:"storage"`
	VCpu            string `json:"vcpu"`
	UsageType       string `json:"usagetype"`
	OperatingSystem string `json:"operatingSystem"`
	PreInstalledSw  string `json:"preInstalledSw"`
	InstanceFamily  string `json:"instanceFamily"`
	// The tag value must be quoted; the previous `json:gpu` was ignored by
	// encoding/json (and flagged by go vet), so the field encoded as "GPU".
	GPU string `json:"gpu"`
}
  73. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  74. type AWSPricingTerms struct {
  75. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  76. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  77. }
  78. // AWSOfferTerm is a sku extension used to pay for the node.
  79. type AWSOfferTerm struct {
  80. Sku string `json:"sku"`
  81. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  82. }
  83. // AWSRateCode encodes data about the price of a product
  84. type AWSRateCode struct {
  85. Unit string `json:"unit"`
  86. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  87. }
  // AWSCurrencyCode is the localized currency. Only USD is decoded; the price is
// kept as the decimal string found in the price list. (TODO: support non-USD)
type AWSCurrencyCode struct {
	USD string `json:"USD"`
}
  92. // AWSProductTerms represents the full terms of the product
  93. type AWSProductTerms struct {
  94. Sku string `json:"sku"`
  95. OnDemand *AWSOfferTerm `json:"OnDemand"`
  96. Reserved *AWSOfferTerm `json:"Reserved"`
  97. Memory string `json:"memory"`
  98. Storage string `json:"storage"`
  99. VCpu string `json:"vcpu"`
  100. GPU string `json:"gpu"`
  101. }
  // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId.
const ClusterIdEnvVar = "AWS_CLUSTER_ID"

// Rate-code suffixes used to address entries of the AWS price list.
const (
	// OnDemandRateCode is appended to a node SKU to form its on-demand offer-term code.
	OnDemandRateCode = ".JRTCKXETXF"
	// ReservedRateCode is appended to a node SKU; presumably it names a reserved
	// offer term (it is not referenced in this file).
	ReservedRateCode = ".38NPMPTW36"
	// HourlyRateCode is appended to SKU+OnDemandRateCode to address the hourly
	// price dimension of the on-demand offer.
	HourlyRateCode = ".6YS6EN2CT7"
)
  110. // KubeAttrConversion maps the k8s labels for region to an aws region
  111. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  112. locationToRegion := map[string]string{
  113. "US East (Ohio)": "us-east-2",
  114. "US East (N. Virginia)": "us-east-1",
  115. "US West (N. California)": "us-west-1",
  116. "US West (Oregon)": "us-west-2",
  117. "Asia Pacific (Mumbai)": "ap-south-1",
  118. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  119. "Asia Pacific (Seoul)": "ap-northeast-2",
  120. "Asia Pacific (Singapore)": "ap-southeast-1",
  121. "Asia Pacific (Sydney)": "ap-southeast-2",
  122. "Asia Pacific (Tokyo)": "ap-northeast-1",
  123. "Canada (Central)": "ca-central-1",
  124. "China (Beijing)": "cn-north-1",
  125. "China (Ningxia)": "cn-northwest-1",
  126. "EU (Frankfurt)": "eu-central-1",
  127. "EU (Ireland)": "eu-west-1",
  128. "EU (London)": "eu-west-2",
  129. "EU (Paris)": "eu-west-3",
  130. "EU (Stockholm)": "eu-north-1",
  131. "South America (São Paulo)": "sa-east-1",
  132. "AWS GovCloud (US-East)": "us-gov-east-1",
  133. "AWS GovCloud (US)": "us-gov-west-1",
  134. }
  135. operatingSystem = strings.ToLower(operatingSystem)
  136. region := locationToRegion[location]
  137. return region + "," + instanceType + "," + operatingSystem
  138. }
  // AwsSpotFeedInfo is the JSON body accepted by UpdateConfig: where the spot
// data feed is stored (bucket, prefix, region, account) and the service key
// used to read it.
type AwsSpotFeedInfo struct {
	BucketName       string `json:"bucketName"`
	Prefix           string `json:"prefix"`
	Region           string `json:"region"`
	AccountID        string `json:"accountId"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
}
  147. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  148. c, err := GetDefaultPricingData("aws.json")
  149. if err != nil {
  150. return nil, err
  151. }
  152. return c, nil
  153. }
  154. func (aws *AWS) UpdateConfig(r io.Reader) (*CustomPricing, error) {
  155. a := AwsSpotFeedInfo{}
  156. err := json.NewDecoder(r).Decode(&a)
  157. if err != nil {
  158. return nil, err
  159. }
  160. c, err := GetDefaultPricingData("aws.json")
  161. if err != nil {
  162. return nil, err
  163. }
  164. c.ServiceKeyName = a.ServiceKeyName
  165. c.ServiceKeySecret = a.ServiceKeySecret
  166. c.SpotDataPrefix = a.Prefix
  167. c.SpotDataBucket = a.BucketName
  168. c.ProjectID = a.AccountID
  169. c.SpotDataRegion = a.Region
  170. cj, err := json.Marshal(c)
  171. if err != nil {
  172. return nil, err
  173. }
  174. err = ioutil.WriteFile("/models/aws.json", cj, 0644)
  175. if err != nil {
  176. return nil, err
  177. }
  178. return c, nil
  179. }
  // awsKey is the Key handed out by GetKey: a node's labels and provider ID,
// plus the configured spot label used to recognise spot nodes.
type awsKey struct {
	SpotLabelName  string
	SpotLabelValue string
	Labels         map[string]string
	ProviderID     string
}
  186. func (k *awsKey) GPUType() string {
  187. return ""
  188. }
  189. func (k *awsKey) ID() string {
  190. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  191. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  192. if matchNum == 2 {
  193. return group
  194. }
  195. }
  196. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  197. return ""
  198. }
  199. func (k *awsKey) Features() string {
  200. instanceType := k.Labels[v1.LabelInstanceType]
  201. var operatingSystem string
  202. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  203. if !ok {
  204. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  205. }
  206. region := k.Labels[v1.LabelZoneRegion]
  207. key := region + "," + instanceType + "," + operatingSystem
  208. usageType := "preemptible"
  209. spotKey := key + "," + usageType
  210. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  211. return spotKey
  212. }
  213. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  214. return spotKey
  215. }
  216. return key
  217. }
  218. // GetKey maps node labels to information needed to retrieve pricing data
  219. func (aws *AWS) GetKey(labels map[string]string) Key {
  220. return &awsKey{
  221. SpotLabelName: aws.SpotLabelName,
  222. SpotLabelValue: aws.SpotLabelValue,
  223. Labels: labels,
  224. ProviderID: labels["providerID"],
  225. }
  226. }
  227. func (aws *AWS) isPreemptible(key string) bool {
  228. s := strings.Split(key, ",")
  229. if len(s) == 4 && s[3] == "preemptible" {
  230. return true
  231. }
  232. return false
  233. }
  234. // DownloadPricingData fetches data from the AWS Pricing API
  235. func (aws *AWS) DownloadPricingData() error {
  236. c, err := GetDefaultPricingData("aws.json")
  237. if err != nil {
  238. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  239. }
  240. aws.BaseCPUPrice = c.CPU
  241. aws.BaseSpotCPUPrice = c.SpotCPU
  242. aws.BaseSpotRAMPrice = c.SpotRAM
  243. aws.SpotLabelName = c.SpotLabel
  244. aws.SpotLabelValue = c.SpotLabelValue
  245. aws.SpotDataBucket = c.SpotDataBucket
  246. aws.SpotDataPrefix = c.SpotDataPrefix
  247. aws.ProjectID = c.ProjectID
  248. aws.SpotDataRegion = c.SpotDataRegion
  249. aws.ServiceKeyName = c.ServiceKeyName
  250. aws.ServiceKeySecret = c.ServiceKeySecret
  251. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  252. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  253. }
  254. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  255. if err != nil {
  256. return err
  257. }
  258. inputkeys := make(map[string]bool)
  259. for _, n := range nodeList.Items {
  260. labels := n.GetObjectMeta().GetLabels()
  261. key := aws.GetKey(labels)
  262. inputkeys[key.Features()] = true
  263. }
  264. aws.Pricing = make(map[string]*AWSProductTerms)
  265. aws.ValidPricingKeys = make(map[string]bool)
  266. skusToKeys := make(map[string]string)
  267. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  268. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  269. resp, err := http.Get(pricingURL)
  270. if err != nil {
  271. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  272. return err
  273. }
  274. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  275. dec := json.NewDecoder(resp.Body)
  276. for {
  277. t, err := dec.Token()
  278. if err == io.EOF {
  279. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  280. break
  281. }
  282. if t == "products" {
  283. _, err := dec.Token() // this should parse the opening "{""
  284. if err != nil {
  285. return err
  286. }
  287. for dec.More() {
  288. _, err := dec.Token() // the sku token
  289. if err != nil {
  290. return err
  291. }
  292. product := &AWSProduct{}
  293. err = dec.Decode(&product)
  294. if err != nil {
  295. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  296. break
  297. }
  298. if product.Attributes.PreInstalledSw == "NA" &&
  299. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  300. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  301. spotKey := key + ",preemptible"
  302. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  303. productTerms := &AWSProductTerms{
  304. Sku: product.Sku,
  305. Memory: product.Attributes.Memory,
  306. Storage: product.Attributes.Storage,
  307. VCpu: product.Attributes.VCpu,
  308. GPU: product.Attributes.GPU,
  309. }
  310. aws.Pricing[key] = productTerms
  311. aws.Pricing[spotKey] = productTerms
  312. skusToKeys[product.Sku] = key
  313. }
  314. aws.ValidPricingKeys[key] = true
  315. aws.ValidPricingKeys[spotKey] = true
  316. }
  317. }
  318. }
  319. if t == "terms" {
  320. _, err := dec.Token() // this should parse the opening "{""
  321. if err != nil {
  322. return err
  323. }
  324. termType, err := dec.Token()
  325. if err != nil {
  326. return err
  327. }
  328. if termType == "OnDemand" {
  329. _, err := dec.Token()
  330. if err != nil { // again, should parse an opening "{"
  331. return err
  332. }
  333. for dec.More() {
  334. sku, err := dec.Token()
  335. if err != nil {
  336. return err
  337. }
  338. _, err = dec.Token() // another opening "{"
  339. if err != nil {
  340. return err
  341. }
  342. skuOnDemand, err := dec.Token()
  343. if err != nil {
  344. return err
  345. }
  346. offerTerm := &AWSOfferTerm{}
  347. err = dec.Decode(&offerTerm)
  348. if err != nil {
  349. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  350. }
  351. if sku.(string)+OnDemandRateCode == skuOnDemand {
  352. key, ok := skusToKeys[sku.(string)]
  353. spotKey := key + ",preemptible"
  354. if ok {
  355. aws.Pricing[key].OnDemand = offerTerm
  356. aws.Pricing[spotKey].OnDemand = offerTerm
  357. }
  358. }
  359. _, err = dec.Token()
  360. if err != nil {
  361. return err
  362. }
  363. }
  364. _, err = dec.Token()
  365. if err != nil {
  366. return err
  367. }
  368. }
  369. }
  370. }
  371. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  372. if err != nil {
  373. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  374. } else {
  375. aws.SpotPricingByInstanceID = sp
  376. }
  377. return nil
  378. }
  379. // AllNodePricing returns all the billing data fetched.
  380. func (aws *AWS) AllNodePricing() (interface{}, error) {
  381. return aws.Pricing, nil
  382. }
  383. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  384. key := k.Features()
  385. if aws.isPreemptible(key) {
  386. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  387. var spotcost string
  388. arr := strings.Split(spotInfo.Charge, " ")
  389. if len(arr) == 2 {
  390. spotcost = arr[0]
  391. } else {
  392. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  393. }
  394. return &Node{
  395. Cost: spotcost,
  396. VCPU: terms.VCpu,
  397. RAM: terms.Memory,
  398. GPU: terms.GPU,
  399. Storage: terms.Storage,
  400. BaseCPUPrice: aws.BaseCPUPrice,
  401. UsageType: usageType,
  402. }, nil
  403. }
  404. return &Node{
  405. VCPU: terms.VCpu,
  406. VCPUCost: aws.BaseSpotCPUPrice,
  407. RAM: terms.Memory,
  408. GPU: terms.GPU,
  409. RAMCost: aws.BaseSpotRAMPrice,
  410. Storage: terms.Storage,
  411. BaseCPUPrice: aws.BaseCPUPrice,
  412. UsageType: usageType,
  413. }, nil
  414. }
  415. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  416. return &Node{
  417. Cost: cost,
  418. VCPU: terms.VCpu,
  419. RAM: terms.Memory,
  420. GPU: terms.GPU,
  421. Storage: terms.Storage,
  422. BaseCPUPrice: aws.BaseCPUPrice,
  423. UsageType: usageType,
  424. }, nil
  425. }
  426. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  427. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  428. //return json.Marshal(aws.Pricing[key])
  429. key := k.Features()
  430. usageType := "ondemand"
  431. if aws.isPreemptible(key) {
  432. usageType = "preemptible"
  433. }
  434. terms, ok := aws.Pricing[key]
  435. if ok {
  436. return aws.createNode(terms, usageType, k)
  437. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  438. err := aws.DownloadPricingData()
  439. if err != nil {
  440. return nil, err
  441. }
  442. terms, termsOk := aws.Pricing[key]
  443. if !termsOk {
  444. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  445. }
  446. return aws.createNode(terms, usageType, k)
  447. } else { // Fall back to base pricing if we can't find the key.
  448. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  449. return &Node{
  450. Cost: aws.BaseCPUPrice,
  451. BaseCPUPrice: aws.BaseCPUPrice,
  452. UsageType: usageType,
  453. UsesBaseCPUPrice: true,
  454. }, nil
  455. }
  456. }
  457. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  458. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  459. defaultClusterName := "AWS Cluster #1"
  460. makeStructure := func(clusterName string) ([]byte, error) {
  461. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  462. m := make(map[string]string)
  463. m["name"] = clusterName
  464. m["provider"] = "AWS"
  465. return json.Marshal(m)
  466. }
  467. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  468. if len(maybeClusterId) != 0 {
  469. return makeStructure(maybeClusterId)
  470. }
  471. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  472. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  473. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  474. if err != nil {
  475. return nil, err
  476. }
  477. for _, n := range nodeList.Items {
  478. region := ""
  479. instanceId := ""
  480. providerId := n.Spec.ProviderID
  481. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  482. if matchNum == 1 {
  483. region = group
  484. } else if matchNum == 2 {
  485. instanceId = group
  486. }
  487. }
  488. if len(instanceId) == 0 {
  489. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  490. continue
  491. }
  492. c := &aws.Config{
  493. Region: aws.String(region),
  494. }
  495. s := session.Must(session.NewSession(c))
  496. ec2Svc := ec2.New(s)
  497. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  498. InstanceIds: []*string{
  499. aws.String(instanceId),
  500. },
  501. })
  502. if diErr != nil {
  503. // maybe log this?
  504. continue
  505. }
  506. if len(di.Reservations) != 1 {
  507. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  508. continue
  509. }
  510. res := di.Reservations[0]
  511. if len(res.Instances) != 1 {
  512. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  513. continue
  514. }
  515. inst := res.Instances[0]
  516. for _, tag := range inst.Tags {
  517. tagKey := *tag.Key
  518. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  519. if matchNum != 1 {
  520. continue
  521. }
  522. return makeStructure(group)
  523. }
  524. }
  525. }
  526. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  527. return makeStructure(defaultClusterName)
  528. }
  529. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  530. func (*AWS) AddServiceKey(formValues url.Values) error {
  531. keyID := formValues.Get("access_key_ID")
  532. key := formValues.Get("secret_access_key")
  533. m := make(map[string]string)
  534. m["access_key_ID"] = keyID
  535. m["secret_access_key"] = key
  536. result, err := json.Marshal(m)
  537. if err != nil {
  538. return err
  539. }
  540. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  541. }
  542. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  543. func (*AWS) GetDisks() ([]byte, error) {
  544. jsonFile, err := os.Open("/var/configs/key.json")
  545. if err == nil {
  546. byteValue, _ := ioutil.ReadAll(jsonFile)
  547. var result map[string]string
  548. err := json.Unmarshal([]byte(byteValue), &result)
  549. if err != nil {
  550. return nil, err
  551. }
  552. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  553. if err != nil {
  554. return nil, err
  555. }
  556. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  557. if err != nil {
  558. return nil, err
  559. }
  560. } else if os.IsNotExist(err) {
  561. klog.V(2).Infof("Using Default Credentials")
  562. } else {
  563. return nil, err
  564. }
  565. defer jsonFile.Close()
  566. clusterConfig, err := os.Open("/var/configs/cluster.json")
  567. if err != nil {
  568. return nil, err
  569. }
  570. defer clusterConfig.Close()
  571. b, err := ioutil.ReadAll(clusterConfig)
  572. if err != nil {
  573. return nil, err
  574. }
  575. var clusterConf map[string]string
  576. err = json.Unmarshal([]byte(b), &clusterConf)
  577. if err != nil {
  578. return nil, err
  579. }
  580. region := aws.String(clusterConf["region"])
  581. c := &aws.Config{
  582. Region: region,
  583. }
  584. s := session.Must(session.NewSession(c))
  585. ec2Svc := ec2.New(s)
  586. input := &ec2.DescribeVolumesInput{}
  587. volumeResult, err := ec2Svc.DescribeVolumes(input)
  588. if err != nil {
  589. if aerr, ok := err.(awserr.Error); ok {
  590. switch aerr.Code() {
  591. default:
  592. return nil, aerr
  593. }
  594. } else {
  595. return nil, err
  596. }
  597. }
  598. return json.Marshal(volumeResult)
  599. }
  600. func (*AWS) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
  601. return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  602. }
  603. // QuerySQL can query a properly configured Athena database.
  604. // Used to fetch billing data.
  605. // Requires a json config in /var/configs with key region, output, and database.
  606. func (*AWS) QuerySQL(query string) ([]byte, error) {
  607. jsonFile, err := os.Open("/var/configs/key.json")
  608. if err == nil {
  609. byteValue, _ := ioutil.ReadAll(jsonFile)
  610. var result map[string]string
  611. json.Unmarshal([]byte(byteValue), &result)
  612. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  613. if err != nil {
  614. return nil, err
  615. }
  616. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  617. if err != nil {
  618. return nil, err
  619. }
  620. } else if os.IsNotExist(err) {
  621. klog.V(2).Infof("Using Default Credentials")
  622. } else {
  623. return nil, err
  624. }
  625. defer jsonFile.Close()
  626. athenaConfigs, err := os.Open("/var/configs/athena.json")
  627. if err != nil {
  628. return nil, err
  629. }
  630. defer athenaConfigs.Close()
  631. b, err := ioutil.ReadAll(athenaConfigs)
  632. if err != nil {
  633. return nil, err
  634. }
  635. var athenaConf map[string]string
  636. json.Unmarshal([]byte(b), &athenaConf)
  637. region := aws.String(athenaConf["region"])
  638. resultsBucket := athenaConf["output"]
  639. database := athenaConf["database"]
  640. c := &aws.Config{
  641. Region: region,
  642. }
  643. s := session.Must(session.NewSession(c))
  644. svc := athena.New(s)
  645. var e athena.StartQueryExecutionInput
  646. var r athena.ResultConfiguration
  647. r.SetOutputLocation(resultsBucket)
  648. e.SetResultConfiguration(&r)
  649. e.SetQueryString(query)
  650. var q athena.QueryExecutionContext
  651. q.SetDatabase(database)
  652. e.SetQueryExecutionContext(&q)
  653. res, err := svc.StartQueryExecution(&e)
  654. if err != nil {
  655. return nil, err
  656. }
  657. klog.V(2).Infof("StartQueryExecution result:")
  658. klog.V(2).Infof(res.GoString())
  659. var qri athena.GetQueryExecutionInput
  660. qri.SetQueryExecutionId(*res.QueryExecutionId)
  661. var qrop *athena.GetQueryExecutionOutput
  662. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  663. for {
  664. qrop, err = svc.GetQueryExecution(&qri)
  665. if err != nil {
  666. return nil, err
  667. }
  668. if *qrop.QueryExecution.Status.State != "RUNNING" {
  669. break
  670. }
  671. time.Sleep(duration)
  672. }
  673. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  674. var ip athena.GetQueryResultsInput
  675. ip.SetQueryExecutionId(*res.QueryExecutionId)
  676. op, err := svc.GetQueryResults(&ip)
  677. if err != nil {
  678. return nil, err
  679. }
  680. b, err := json.Marshal(op.ResultSet)
  681. if err != nil {
  682. return nil, err
  683. }
  684. return b, nil
  685. }
  686. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  687. }
  // spotInfo is one record of the EC2 spot instance data feed, decoded from a
// tab-separated line by parseSpotData.
// Do not reorder the fields: parseSpotData passes csvutil.Header(spotInfo{})
// to the decoder as the column names, so field order presumably has to follow
// the feed's column order.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"`
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"`
	Version     string `csv:"Version"`
}
  699. type fnames []*string
  700. func (f fnames) Len() int {
  701. return len(f)
  702. }
  703. func (f fnames) Swap(i, j int) {
  704. f[i], f[j] = f[j], f[i]
  705. }
  706. func (f fnames) Less(i, j int) bool {
  707. key1 := strings.Split(*f[i], ".")
  708. key2 := strings.Split(*f[j], ".")
  709. t1, err := time.Parse("2006-01-02-15", key1[1])
  710. if err != nil {
  711. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  712. return false
  713. }
  714. t2, err := time.Parse("2006-01-02-15", key2[1])
  715. if err != nil {
  716. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  717. return false
  718. }
  719. return t1.Before(t2)
  720. }
  721. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  722. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  723. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  724. if err != nil {
  725. return nil, err
  726. }
  727. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  728. if err != nil {
  729. return nil, err
  730. }
  731. }
  732. s3Prefix := projectID
  733. if len(prefix) != 0 {
  734. s3Prefix = prefix + "/" + s3Prefix
  735. }
  736. c := aws.NewConfig().WithRegion(region)
  737. s := session.Must(session.NewSession(c))
  738. s3Svc := s3.New(s)
  739. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  740. tNow := time.Now()
  741. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  742. ls := &s3.ListObjectsInput{
  743. Bucket: aws.String(bucket),
  744. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  745. }
  746. ls2 := &s3.ListObjectsInput{
  747. Bucket: aws.String(bucket),
  748. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  749. }
  750. lso, err := s3Svc.ListObjects(ls)
  751. if err != nil {
  752. return nil, err
  753. }
  754. lsoLen := len(lso.Contents)
  755. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  756. if lsoLen == 0 {
  757. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  758. }
  759. lso2, err := s3Svc.ListObjects(ls2)
  760. if err != nil {
  761. return nil, err
  762. }
  763. lso2Len := len(lso2.Contents)
  764. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  765. if lso2Len == 0 {
  766. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  767. }
  768. var keys []*string
  769. for _, obj := range lso.Contents {
  770. keys = append(keys, obj.Key)
  771. }
  772. for _, obj := range lso2.Contents {
  773. keys = append(keys, obj.Key)
  774. }
  775. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  776. header, err := csvutil.Header(spotInfo{}, "csv")
  777. if err != nil {
  778. return nil, err
  779. }
  780. fieldsPerRecord := len(header)
  781. spots := make(map[string]*spotInfo)
  782. for _, key := range keys {
  783. getObj := &s3.GetObjectInput{
  784. Bucket: aws.String(bucket),
  785. Key: key,
  786. }
  787. buf := aws.NewWriteAtBuffer([]byte{})
  788. _, err := downloader.Download(buf, getObj)
  789. if err != nil {
  790. return nil, err
  791. }
  792. r := bytes.NewReader(buf.Bytes())
  793. gr, err := gzip.NewReader(r)
  794. if err != nil {
  795. return nil, err
  796. }
  797. csvReader := csv.NewReader(gr)
  798. csvReader.Comma = '\t'
  799. csvReader.FieldsPerRecord = fieldsPerRecord
  800. dec, err := csvutil.NewDecoder(csvReader, header...)
  801. if err != nil {
  802. return nil, err
  803. }
  804. var foundVersion string
  805. for {
  806. spot := spotInfo{}
  807. err := dec.Decode(&spot)
  808. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  809. if err == io.EOF {
  810. break
  811. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  812. rec := dec.Record()
  813. // the first two "Record()" will be the comment lines
  814. // and they show up as len() == 1
  815. // the first of which is "#Version"
  816. // the second of which is "#Fields: "
  817. if len(rec) != 1 {
  818. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  819. continue
  820. }
  821. if len(foundVersion) == 0 {
  822. spotFeedVersion := rec[0]
  823. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  824. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  825. if matches != nil {
  826. foundVersion = matches[1]
  827. if foundVersion != supportedSpotFeedVersion {
  828. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  829. break
  830. }
  831. }
  832. continue
  833. } else if strings.Index(rec[0], "#") == 0 {
  834. continue
  835. } else {
  836. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  837. continue
  838. }
  839. } else if err != nil {
  840. klog.V(2).Infof("Error during spot info decode: %+v", err)
  841. continue
  842. }
  843. klog.V(3).Infof("Found spot info %+v", spot)
  844. spots[spot.InstanceID] = &spot
  845. }
  846. gr.Close()
  847. }
  848. return spots, nil
  849. }