awsprovider.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strings"
  15. "time"
  16. "k8s.io/klog"
  17. "github.com/aws/aws-sdk-go/aws"
  18. "github.com/aws/aws-sdk-go/aws/awserr"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/athena"
  21. "github.com/aws/aws-sdk-go/service/ec2"
  22. "github.com/aws/aws-sdk-go/service/s3"
  23. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  24. "github.com/jszwec/csvutil"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. )
  29. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  30. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  31. const supportedSpotFeedVersion = "1"
  32. // AWS represents an Amazon Provider
  33. type AWS struct {
  34. Pricing map[string]*AWSProductTerms
  35. SpotPricingByInstanceID map[string]*spotInfo
  36. ValidPricingKeys map[string]bool
  37. Clientset *kubernetes.Clientset
  38. BaseCPUPrice string
  39. BaseSpotCPUPrice string
  40. BaseSpotRAMPrice string
  41. SpotLabelName string
  42. SpotLabelValue string
  43. ServiceKeyName string
  44. ServiceKeySecret string
  45. SpotDataRegion string
  46. SpotDataBucket string
  47. SpotDataPrefix string
  48. ProjectID string
  49. }
  50. // AWSPricing maps a k8s node to an AWS Pricing "product"
  51. type AWSPricing struct {
  52. Products map[string]*AWSProduct `json:"products"`
  53. Terms AWSPricingTerms `json:"terms"`
  54. }
  55. // AWSProduct represents a purchased SKU
  56. type AWSProduct struct {
  57. Sku string `json:"sku"`
  58. Attributes AWSProductAttributes `json:"attributes"`
  59. }
  60. // AWSProductAttributes represents metadata about the product used to map to a node.
  61. type AWSProductAttributes struct {
  62. Location string `json:"location"`
  63. InstanceType string `json:"instanceType"`
  64. Memory string `json:"memory"`
  65. Storage string `json:"storage"`
  66. VCpu string `json:"vcpu"`
  67. UsageType string `json:"usagetype"`
  68. OperatingSystem string `json:"operatingSystem"`
  69. PreInstalledSw string `json:"preInstalledSw"`
  70. InstanceFamily string `json:"instanceFamily"`
  71. GPU string `json:gpu`
  72. }
  73. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  74. type AWSPricingTerms struct {
  75. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  76. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  77. }
  78. // AWSOfferTerm is a sku extension used to pay for the node.
  79. type AWSOfferTerm struct {
  80. Sku string `json:"sku"`
  81. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  82. }
  83. // AWSRateCode encodes data about the price of a product
  84. type AWSRateCode struct {
  85. Unit string `json:"unit"`
  86. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  87. }
  88. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  89. type AWSCurrencyCode struct {
  90. USD string `json:"USD"`
  91. }
  92. // AWSProductTerms represents the full terms of the product
  93. type AWSProductTerms struct {
  94. Sku string `json:"sku"`
  95. OnDemand *AWSOfferTerm `json:"OnDemand"`
  96. Reserved *AWSOfferTerm `json:"Reserved"`
  97. Memory string `json:"memory"`
  98. Storage string `json:"storage"`
  99. VCpu string `json:"vcpu"`
  100. GPU string `json:"gpu"`
  101. }
  102. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  103. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  104. // OnDemandRateCode is appended to an node sku
  105. const OnDemandRateCode = ".JRTCKXETXF"
  106. // ReservedRateCode is appended to a node sku
  107. const ReservedRateCode = ".38NPMPTW36"
  108. // HourlyRateCode is appended to a node sku
  109. const HourlyRateCode = ".6YS6EN2CT7"
  110. // KubeAttrConversion maps the k8s labels for region to an aws region
  111. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  112. locationToRegion := map[string]string{
  113. "US East (Ohio)": "us-east-2",
  114. "US East (N. Virginia)": "us-east-1",
  115. "US West (N. California)": "us-west-1",
  116. "US West (Oregon)": "us-west-2",
  117. "Asia Pacific (Mumbai)": "ap-south-1",
  118. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  119. "Asia Pacific (Seoul)": "ap-northeast-2",
  120. "Asia Pacific (Singapore)": "ap-southeast-1",
  121. "Asia Pacific (Sydney)": "ap-southeast-2",
  122. "Asia Pacific (Tokyo)": "ap-northeast-1",
  123. "Canada (Central)": "ca-central-1",
  124. "China (Beijing)": "cn-north-1",
  125. "China (Ningxia)": "cn-northwest-1",
  126. "EU (Frankfurt)": "eu-central-1",
  127. "EU (Ireland)": "eu-west-1",
  128. "EU (London)": "eu-west-2",
  129. "EU (Paris)": "eu-west-3",
  130. "EU (Stockholm)": "eu-north-1",
  131. "South America (São Paulo)": "sa-east-1",
  132. "AWS GovCloud (US-East)": "us-gov-east-1",
  133. "AWS GovCloud (US)": "us-gov-west-1",
  134. }
  135. operatingSystem = strings.ToLower(operatingSystem)
  136. region := locationToRegion[location]
  137. return region + "," + instanceType + "," + operatingSystem
  138. }
  139. type AwsSpotFeedInfo struct {
  140. BucketName string `json:"bucketName"`
  141. Prefix string `json:"prefix"`
  142. Region string `json:"region"`
  143. AccountID string `json:"accountId"`
  144. ServiceKeyName string `json:"serviceKeyName"`
  145. ServiceKeySecret string `json:"serviceKeySecret"`
  146. }
  147. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  148. c, err := GetDefaultPricingData("aws.json")
  149. if err != nil {
  150. return nil, err
  151. }
  152. return c, nil
  153. }
  154. func (aws *AWS) UpdateConfig(r io.Reader) (*CustomPricing, error) {
  155. a := AwsSpotFeedInfo{}
  156. err := json.NewDecoder(r).Decode(&a)
  157. if err != nil {
  158. return nil, err
  159. }
  160. c, err := GetDefaultPricingData("aws.json")
  161. if err != nil {
  162. return nil, err
  163. }
  164. c.ServiceKeyName = a.ServiceKeyName
  165. c.ServiceKeySecret = a.ServiceKeySecret
  166. c.SpotDataPrefix = a.Prefix
  167. c.SpotDataBucket = a.BucketName
  168. c.ProjectID = a.AccountID
  169. c.SpotDataRegion = a.Region
  170. cj, err := json.Marshal(c)
  171. if err != nil {
  172. return nil, err
  173. }
  174. err = ioutil.WriteFile("/models/aws.json", cj, 0644)
  175. if err != nil {
  176. return nil, err
  177. }
  178. return c, nil
  179. }
  180. type awsKey struct {
  181. SpotLabelName string
  182. SpotLabelValue string
  183. Labels map[string]string
  184. ProviderID string
  185. }
  186. func (k *awsKey) GPUType() string {
  187. return ""
  188. }
  189. func (k *awsKey) ID() string {
  190. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  191. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  192. if matchNum == 2 {
  193. return group
  194. }
  195. }
  196. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  197. return ""
  198. }
  199. func (k *awsKey) Features() string {
  200. instanceType := k.Labels[v1.LabelInstanceType]
  201. var operatingSystem string
  202. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  203. if !ok {
  204. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  205. }
  206. region := k.Labels[v1.LabelZoneRegion]
  207. key := region + "," + instanceType + "," + operatingSystem
  208. usageType := "preemptible"
  209. spotKey := key + "," + usageType
  210. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  211. return spotKey
  212. }
  213. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  214. return spotKey
  215. }
  216. return key
  217. }
  218. // GetKey maps node labels to information needed to retrieve pricing data
  219. func (aws *AWS) GetKey(labels map[string]string) Key {
  220. return &awsKey{
  221. SpotLabelName: aws.SpotLabelName,
  222. SpotLabelValue: aws.SpotLabelValue,
  223. Labels: labels,
  224. ProviderID: labels["providerID"],
  225. }
  226. }
  227. func (aws *AWS) isPreemptible(key string) bool {
  228. s := strings.Split(key, ",")
  229. if len(s) == 4 && s[3] == "preemptible" {
  230. return true
  231. }
  232. return false
  233. }
  234. // DownloadPricingData fetches data from the AWS Pricing API
  235. func (aws *AWS) DownloadPricingData() error {
  236. c, err := GetDefaultPricingData("aws.json")
  237. if err != nil {
  238. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  239. }
  240. aws.BaseCPUPrice = c.CPU
  241. aws.BaseSpotCPUPrice = c.SpotCPU
  242. aws.BaseSpotRAMPrice = c.SpotRAM
  243. aws.SpotLabelName = c.SpotLabel
  244. aws.SpotLabelValue = c.SpotLabelValue
  245. aws.SpotDataBucket = c.SpotDataBucket
  246. aws.SpotDataPrefix = c.SpotDataPrefix
  247. aws.ProjectID = c.ProjectID
  248. aws.SpotDataRegion = c.SpotDataRegion
  249. aws.ServiceKeyName = c.ServiceKeyName
  250. aws.ServiceKeySecret = c.ServiceKeySecret
  251. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  252. if err != nil {
  253. return err
  254. }
  255. inputkeys := make(map[string]bool)
  256. for _, n := range nodeList.Items {
  257. labels := n.GetObjectMeta().GetLabels()
  258. key := aws.GetKey(labels)
  259. inputkeys[key.Features()] = true
  260. }
  261. aws.Pricing = make(map[string]*AWSProductTerms)
  262. aws.ValidPricingKeys = make(map[string]bool)
  263. skusToKeys := make(map[string]string)
  264. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  265. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  266. resp, err := http.Get(pricingURL)
  267. if err != nil {
  268. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  269. return err
  270. }
  271. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  272. dec := json.NewDecoder(resp.Body)
  273. for {
  274. t, err := dec.Token()
  275. if err == io.EOF {
  276. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  277. break
  278. }
  279. if t == "products" {
  280. _, err := dec.Token() // this should parse the opening "{""
  281. if err != nil {
  282. return err
  283. }
  284. for dec.More() {
  285. _, err := dec.Token() // the sku token
  286. if err != nil {
  287. return err
  288. }
  289. product := &AWSProduct{}
  290. err = dec.Decode(&product)
  291. if err != nil {
  292. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  293. break
  294. }
  295. if product.Attributes.PreInstalledSw == "NA" &&
  296. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  297. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  298. spotKey := key + ",preemptible"
  299. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  300. productTerms := &AWSProductTerms{
  301. Sku: product.Sku,
  302. Memory: product.Attributes.Memory,
  303. Storage: product.Attributes.Storage,
  304. VCpu: product.Attributes.VCpu,
  305. GPU: product.Attributes.GPU,
  306. }
  307. aws.Pricing[key] = productTerms
  308. aws.Pricing[spotKey] = productTerms
  309. skusToKeys[product.Sku] = key
  310. }
  311. aws.ValidPricingKeys[key] = true
  312. aws.ValidPricingKeys[spotKey] = true
  313. }
  314. }
  315. }
  316. if t == "terms" {
  317. _, err := dec.Token() // this should parse the opening "{""
  318. if err != nil {
  319. return err
  320. }
  321. termType, err := dec.Token()
  322. if err != nil {
  323. return err
  324. }
  325. if termType == "OnDemand" {
  326. _, err := dec.Token()
  327. if err != nil { // again, should parse an opening "{"
  328. return err
  329. }
  330. for dec.More() {
  331. sku, err := dec.Token()
  332. if err != nil {
  333. return err
  334. }
  335. _, err = dec.Token() // another opening "{"
  336. if err != nil {
  337. return err
  338. }
  339. skuOnDemand, err := dec.Token()
  340. if err != nil {
  341. return err
  342. }
  343. offerTerm := &AWSOfferTerm{}
  344. err = dec.Decode(&offerTerm)
  345. if err != nil {
  346. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  347. }
  348. if sku.(string)+OnDemandRateCode == skuOnDemand {
  349. key, ok := skusToKeys[sku.(string)]
  350. spotKey := key + ",preemptible"
  351. if ok {
  352. aws.Pricing[key].OnDemand = offerTerm
  353. aws.Pricing[spotKey].OnDemand = offerTerm
  354. }
  355. }
  356. _, err = dec.Token()
  357. if err != nil {
  358. return err
  359. }
  360. }
  361. _, err = dec.Token()
  362. if err != nil {
  363. return err
  364. }
  365. }
  366. }
  367. }
  368. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  369. if err != nil {
  370. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  371. } else {
  372. aws.SpotPricingByInstanceID = sp
  373. }
  374. return nil
  375. }
  376. // AllNodePricing returns all the billing data fetched.
  377. func (aws *AWS) AllNodePricing() (interface{}, error) {
  378. return aws.Pricing, nil
  379. }
  380. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  381. key := k.Features()
  382. if aws.isPreemptible(key) {
  383. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  384. var spotcost string
  385. arr := strings.Split(spotInfo.Charge, " ")
  386. if len(arr) == 2 {
  387. spotcost = arr[0]
  388. } else {
  389. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  390. }
  391. return &Node{
  392. Cost: spotcost,
  393. VCPU: terms.VCpu,
  394. RAM: terms.Memory,
  395. GPU: terms.GPU,
  396. Storage: terms.Storage,
  397. BaseCPUPrice: aws.BaseCPUPrice,
  398. UsageType: usageType,
  399. }, nil
  400. }
  401. return &Node{
  402. VCPU: terms.VCpu,
  403. VCPUCost: aws.BaseSpotCPUPrice,
  404. RAM: terms.Memory,
  405. GPU: terms.GPU,
  406. RAMCost: aws.BaseSpotRAMPrice,
  407. Storage: terms.Storage,
  408. BaseCPUPrice: aws.BaseCPUPrice,
  409. UsageType: usageType,
  410. }, nil
  411. }
  412. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  413. return &Node{
  414. Cost: cost,
  415. VCPU: terms.VCpu,
  416. RAM: terms.Memory,
  417. GPU: terms.GPU,
  418. Storage: terms.Storage,
  419. BaseCPUPrice: aws.BaseCPUPrice,
  420. UsageType: usageType,
  421. }, nil
  422. }
  423. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  424. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  425. //return json.Marshal(aws.Pricing[key])
  426. key := k.Features()
  427. usageType := "ondemand"
  428. if aws.isPreemptible(key) {
  429. usageType = "preemptible"
  430. }
  431. terms, ok := aws.Pricing[key]
  432. if ok {
  433. return aws.createNode(terms, usageType, k)
  434. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  435. err := aws.DownloadPricingData()
  436. if err != nil {
  437. return nil, err
  438. }
  439. terms, termsOk := aws.Pricing[key]
  440. if !termsOk {
  441. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  442. }
  443. return aws.createNode(terms, usageType, k)
  444. } else { // Fall back to base pricing if we can't find the key.
  445. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  446. return &Node{
  447. Cost: aws.BaseCPUPrice,
  448. BaseCPUPrice: aws.BaseCPUPrice,
  449. UsageType: usageType,
  450. UsesBaseCPUPrice: true,
  451. }, nil
  452. }
  453. }
  454. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  455. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  456. defaultClusterName := "AWS Cluster #1"
  457. makeStructure := func(clusterName string) ([]byte, error) {
  458. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  459. m := make(map[string]string)
  460. m["name"] = clusterName
  461. m["provider"] = "AWS"
  462. return json.Marshal(m)
  463. }
  464. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  465. if len(maybeClusterId) != 0 {
  466. return makeStructure(maybeClusterId)
  467. }
  468. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  469. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  470. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  471. if err != nil {
  472. return nil, err
  473. }
  474. for _, n := range nodeList.Items {
  475. region := ""
  476. instanceId := ""
  477. providerId := n.Spec.ProviderID
  478. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  479. if matchNum == 1 {
  480. region = group
  481. } else if matchNum == 2 {
  482. instanceId = group
  483. }
  484. }
  485. if len(instanceId) == 0 {
  486. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  487. continue
  488. }
  489. c := &aws.Config{
  490. Region: aws.String(region),
  491. }
  492. s := session.Must(session.NewSession(c))
  493. ec2Svc := ec2.New(s)
  494. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  495. InstanceIds: []*string{
  496. aws.String(instanceId),
  497. },
  498. })
  499. if diErr != nil {
  500. // maybe log this?
  501. continue
  502. }
  503. if len(di.Reservations) != 1 {
  504. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  505. continue
  506. }
  507. res := di.Reservations[0]
  508. if len(res.Instances) != 1 {
  509. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  510. continue
  511. }
  512. inst := res.Instances[0]
  513. for _, tag := range inst.Tags {
  514. tagKey := *tag.Key
  515. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  516. if matchNum != 1 {
  517. continue
  518. }
  519. return makeStructure(group)
  520. }
  521. }
  522. }
  523. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  524. return makeStructure(defaultClusterName)
  525. }
  526. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  527. func (*AWS) AddServiceKey(formValues url.Values) error {
  528. keyID := formValues.Get("access_key_ID")
  529. key := formValues.Get("secret_access_key")
  530. m := make(map[string]string)
  531. m["access_key_ID"] = keyID
  532. m["secret_access_key"] = key
  533. result, err := json.Marshal(m)
  534. if err != nil {
  535. return err
  536. }
  537. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  538. }
  539. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  540. func (*AWS) GetDisks() ([]byte, error) {
  541. jsonFile, err := os.Open("/var/configs/key.json")
  542. if err == nil {
  543. byteValue, _ := ioutil.ReadAll(jsonFile)
  544. var result map[string]string
  545. err := json.Unmarshal([]byte(byteValue), &result)
  546. if err != nil {
  547. return nil, err
  548. }
  549. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  550. if err != nil {
  551. return nil, err
  552. }
  553. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  554. if err != nil {
  555. return nil, err
  556. }
  557. } else if os.IsNotExist(err) {
  558. klog.V(2).Infof("Using Default Credentials")
  559. } else {
  560. return nil, err
  561. }
  562. defer jsonFile.Close()
  563. clusterConfig, err := os.Open("/var/configs/cluster.json")
  564. if err != nil {
  565. return nil, err
  566. }
  567. defer clusterConfig.Close()
  568. b, err := ioutil.ReadAll(clusterConfig)
  569. if err != nil {
  570. return nil, err
  571. }
  572. var clusterConf map[string]string
  573. err = json.Unmarshal([]byte(b), &clusterConf)
  574. if err != nil {
  575. return nil, err
  576. }
  577. region := aws.String(clusterConf["region"])
  578. c := &aws.Config{
  579. Region: region,
  580. }
  581. s := session.Must(session.NewSession(c))
  582. ec2Svc := ec2.New(s)
  583. input := &ec2.DescribeVolumesInput{}
  584. volumeResult, err := ec2Svc.DescribeVolumes(input)
  585. if err != nil {
  586. if aerr, ok := err.(awserr.Error); ok {
  587. switch aerr.Code() {
  588. default:
  589. return nil, aerr
  590. }
  591. } else {
  592. return nil, err
  593. }
  594. }
  595. return json.Marshal(volumeResult)
  596. }
  597. func (*AWS) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
  598. return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  599. }
  600. // QuerySQL can query a properly configured Athena database.
  601. // Used to fetch billing data.
  602. // Requires a json config in /var/configs with key region, output, and database.
  603. func (*AWS) QuerySQL(query string) ([]byte, error) {
  604. jsonFile, err := os.Open("/var/configs/key.json")
  605. if err == nil {
  606. byteValue, _ := ioutil.ReadAll(jsonFile)
  607. var result map[string]string
  608. json.Unmarshal([]byte(byteValue), &result)
  609. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  610. if err != nil {
  611. return nil, err
  612. }
  613. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  614. if err != nil {
  615. return nil, err
  616. }
  617. } else if os.IsNotExist(err) {
  618. klog.V(2).Infof("Using Default Credentials")
  619. } else {
  620. return nil, err
  621. }
  622. defer jsonFile.Close()
  623. athenaConfigs, err := os.Open("/var/configs/athena.json")
  624. if err != nil {
  625. return nil, err
  626. }
  627. defer athenaConfigs.Close()
  628. b, err := ioutil.ReadAll(athenaConfigs)
  629. if err != nil {
  630. return nil, err
  631. }
  632. var athenaConf map[string]string
  633. json.Unmarshal([]byte(b), &athenaConf)
  634. region := aws.String(athenaConf["region"])
  635. resultsBucket := athenaConf["output"]
  636. database := athenaConf["database"]
  637. c := &aws.Config{
  638. Region: region,
  639. }
  640. s := session.Must(session.NewSession(c))
  641. svc := athena.New(s)
  642. var e athena.StartQueryExecutionInput
  643. var r athena.ResultConfiguration
  644. r.SetOutputLocation(resultsBucket)
  645. e.SetResultConfiguration(&r)
  646. e.SetQueryString(query)
  647. var q athena.QueryExecutionContext
  648. q.SetDatabase(database)
  649. e.SetQueryExecutionContext(&q)
  650. res, err := svc.StartQueryExecution(&e)
  651. if err != nil {
  652. return nil, err
  653. }
  654. klog.V(2).Infof("StartQueryExecution result:")
  655. klog.V(2).Infof(res.GoString())
  656. var qri athena.GetQueryExecutionInput
  657. qri.SetQueryExecutionId(*res.QueryExecutionId)
  658. var qrop *athena.GetQueryExecutionOutput
  659. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  660. for {
  661. qrop, err = svc.GetQueryExecution(&qri)
  662. if err != nil {
  663. return nil, err
  664. }
  665. if *qrop.QueryExecution.Status.State != "RUNNING" {
  666. break
  667. }
  668. time.Sleep(duration)
  669. }
  670. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  671. var ip athena.GetQueryResultsInput
  672. ip.SetQueryExecutionId(*res.QueryExecutionId)
  673. op, err := svc.GetQueryResults(&ip)
  674. if err != nil {
  675. return nil, err
  676. }
  677. b, err := json.Marshal(op.ResultSet)
  678. if err != nil {
  679. return nil, err
  680. }
  681. return b, nil
  682. }
  683. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  684. }
  685. type spotInfo struct {
  686. Timestamp string `csv:"Timestamp"`
  687. UsageType string `csv:"UsageType"`
  688. Operation string `csv:"Operation"`
  689. InstanceID string `csv:"InstanceID"`
  690. MyBidID string `csv:"MyBidID"`
  691. MyMaxPrice string `csv:"MyMaxPrice"`
  692. MarketPrice string `csv:"MarketPrice"`
  693. Charge string `csv:"Charge"`
  694. Version string `csv:"Version"`
  695. }
  696. type fnames []*string
  697. func (f fnames) Len() int {
  698. return len(f)
  699. }
  700. func (f fnames) Swap(i, j int) {
  701. f[i], f[j] = f[j], f[i]
  702. }
  703. func (f fnames) Less(i, j int) bool {
  704. key1 := strings.Split(*f[i], ".")
  705. key2 := strings.Split(*f[j], ".")
  706. t1, err := time.Parse("2006-01-02-15", key1[1])
  707. if err != nil {
  708. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  709. return false
  710. }
  711. t2, err := time.Parse("2006-01-02-15", key2[1])
  712. if err != nil {
  713. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  714. return false
  715. }
  716. return t1.Before(t2)
  717. }
  718. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  719. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  720. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  721. if err != nil {
  722. return nil, err
  723. }
  724. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  725. if err != nil {
  726. return nil, err
  727. }
  728. }
  729. s3Prefix := projectID
  730. if len(prefix) != 0 {
  731. s3Prefix = prefix + "/" + s3Prefix
  732. }
  733. c := aws.NewConfig().WithRegion(region)
  734. s := session.Must(session.NewSession(c))
  735. s3Svc := s3.New(s)
  736. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  737. tNow := time.Now()
  738. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  739. ls := &s3.ListObjectsInput{
  740. Bucket: aws.String(bucket),
  741. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  742. }
  743. ls2 := &s3.ListObjectsInput{
  744. Bucket: aws.String(bucket),
  745. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  746. }
  747. lso, err := s3Svc.ListObjects(ls)
  748. if err != nil {
  749. return nil, err
  750. }
  751. lsoLen := len(lso.Contents)
  752. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  753. if lsoLen == 0 {
  754. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  755. }
  756. lso2, err := s3Svc.ListObjects(ls2)
  757. if err != nil {
  758. return nil, err
  759. }
  760. lso2Len := len(lso2.Contents)
  761. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  762. if lso2Len == 0 {
  763. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  764. }
  765. var keys []*string
  766. for _, obj := range lso.Contents {
  767. keys = append(keys, obj.Key)
  768. }
  769. for _, obj := range lso2.Contents {
  770. keys = append(keys, obj.Key)
  771. }
  772. spots := make(map[string]*spotInfo)
  773. for _, key := range keys {
  774. getObj := &s3.GetObjectInput{
  775. Bucket: aws.String(bucket),
  776. Key: key,
  777. }
  778. buf := aws.NewWriteAtBuffer([]byte{})
  779. _, err := downloader.Download(buf, getObj)
  780. if err != nil {
  781. return nil, err
  782. }
  783. r := bytes.NewReader(buf.Bytes())
  784. gr, err := gzip.NewReader(r)
  785. if err != nil {
  786. return nil, err
  787. }
  788. csvReader := csv.NewReader(gr)
  789. csvReader.Comma = '\t'
  790. header, err := csvutil.Header(spotInfo{}, "csv")
  791. if err != nil {
  792. return nil, err
  793. }
  794. dec, err := csvutil.NewDecoder(csvReader, header...)
  795. if err != nil {
  796. return nil, err
  797. }
  798. for {
  799. if err := dec.Decode(&spotInfo{}); err == io.EOF {
  800. break
  801. }
  802. st := dec.Record()
  803. if len(st) == 9 { // it's tab separated but not quite a tsv
  804. klog.V(3).Infof("Found spot info %+v", st)
  805. spot := &spotInfo{
  806. Timestamp: st[0],
  807. UsageType: st[1],
  808. Operation: st[2],
  809. InstanceID: st[3],
  810. MyBidID: st[4],
  811. MyMaxPrice: st[5],
  812. MarketPrice: st[6],
  813. Charge: st[7],
  814. Version: st[8],
  815. }
  816. if spot.Version != supportedSpotFeedVersion {
  817. klog.V(1).Infof("Possibly unsupported spot data feed version " + spot.Version)
  818. }
  819. spots[spot.InstanceID] = spot
  820. }
  821. }
  822. gr.Close()
  823. }
  824. return spots, nil
  825. }