awsprovider.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strings"
  15. "time"
  16. "k8s.io/klog"
  17. "github.com/aws/aws-sdk-go/aws"
  18. "github.com/aws/aws-sdk-go/aws/awserr"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/athena"
  21. "github.com/aws/aws-sdk-go/service/ec2"
  22. "github.com/aws/aws-sdk-go/service/s3"
  23. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  24. "github.com/jszwec/csvutil"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. )
  // Names of the environment variables this file sets before opening an AWS
  // session, and the spot data feed version that parseSpotData accepts
  // without logging a warning.
  const (
  	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
  	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  	supportedSpotFeedVersion = "1"
  )
  32. // AWS represents an Amazon Provider
  33. type AWS struct {
  34. Pricing map[string]*AWSProductTerms
  35. SpotPricingByInstanceID map[string]*spotInfo
  36. ValidPricingKeys map[string]bool
  37. Clientset *kubernetes.Clientset
  38. BaseCPUPrice string
  39. BaseSpotCPUPrice string
  40. BaseSpotRAMPrice string
  41. SpotLabelName string
  42. SpotLabelValue string
  43. ServiceKeyName string
  44. ServiceKeySecret string
  45. SpotDataRegion string
  46. SpotDataBucket string
  47. SpotDataPrefix string
  48. ProjectID string
  49. }
  50. // AWSPricing maps a k8s node to an AWS Pricing "product"
  51. type AWSPricing struct {
  52. Products map[string]*AWSProduct `json:"products"`
  53. Terms AWSPricingTerms `json:"terms"`
  54. }
  55. // AWSProduct represents a purchased SKU
  56. type AWSProduct struct {
  57. Sku string `json:"sku"`
  58. Attributes AWSProductAttributes `json:"attributes"`
  59. }
  // AWSProductAttributes holds the product attributes this package reads
  // from the price list. Location, InstanceType and OperatingSystem are
  // combined into the pricing key; UsageType and PreInstalledSw are used to
  // filter products; Memory, Storage and VCpu are copied onto the node.
  type AWSProductAttributes struct {
  	Location        string `json:"location"`
  	InstanceType    string `json:"instanceType"`
  	Memory          string `json:"memory"`
  	Storage         string `json:"storage"`
  	VCpu            string `json:"vcpu"`
  	UsageType       string `json:"usagetype"`
  	OperatingSystem string `json:"operatingSystem"`
  	PreInstalledSw  string `json:"preInstalledSw"`
  }
  71. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  72. type AWSPricingTerms struct {
  73. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  74. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  75. }
  76. // AWSOfferTerm is a sku extension used to pay for the node.
  77. type AWSOfferTerm struct {
  78. Sku string `json:"sku"`
  79. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  80. }
  81. // AWSRateCode encodes data about the price of a product
  82. type AWSRateCode struct {
  83. Unit string `json:"unit"`
  84. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  85. }
  // AWSCurrencyCode holds a price per currency. Only USD is read today.
  // TODO: support non-USD currencies.
  type AWSCurrencyCode struct {
  	USD string `json:"USD"`
  }
  90. // AWSProductTerms represents the full terms of the product
  91. type AWSProductTerms struct {
  92. Sku string `json:"sku"`
  93. OnDemand *AWSOfferTerm `json:"OnDemand"`
  94. Reserved *AWSOfferTerm `json:"Reserved"`
  95. Memory string `json:"memory"`
  96. Storage string `json:"storage"`
  97. VCpu string `json:"vcpu"`
  98. }
  const (
  	// ClusterIdEnvVar names the environment variable that, when set,
  	// overrides the cluster ID reported by ClusterName.
  	ClusterIdEnvVar = "AWS_CLUSTER_ID"

  	// OnDemandRateCode is the offer code suffix appended to a node SKU to
  	// form its on-demand offer key.
  	OnDemandRateCode = ".JRTCKXETXF"

  	// ReservedRateCode is the offer code suffix appended to a node SKU for
  	// reserved pricing.
  	ReservedRateCode = ".38NPMPTW36"

  	// HourlyRateCode is appended after the offer code to form the key of
  	// the hourly price dimension.
  	HourlyRateCode = ".6YS6EN2CT7"
  )
  107. // KubeAttrConversion maps the k8s labels for region to an aws region
  108. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  109. locationToRegion := map[string]string{
  110. "US East (Ohio)": "us-east-2",
  111. "US East (N. Virginia)": "us-east-1",
  112. "US West (N. California)": "us-west-1",
  113. "US West (Oregon)": "us-west-2",
  114. "Asia Pacific (Mumbai)": "ap-south-1",
  115. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  116. "Asia Pacific (Seoul)": "ap-northeast-2",
  117. "Asia Pacific (Singapore)": "ap-southeast-1",
  118. "Asia Pacific (Sydney)": "ap-southeast-2",
  119. "Asia Pacific (Tokyo)": "ap-northeast-1",
  120. "Canada (Central)": "ca-central-1",
  121. "China (Beijing)": "cn-north-1",
  122. "China (Ningxia)": "cn-northwest-1",
  123. "EU (Frankfurt)": "eu-central-1",
  124. "EU (Ireland)": "eu-west-1",
  125. "EU (London)": "eu-west-2",
  126. "EU (Paris)": "eu-west-3",
  127. "EU (Stockholm)": "eu-north-1",
  128. "South America (São Paulo)": "sa-east-1",
  129. "AWS GovCloud (US-East)": "us-gov-east-1",
  130. "AWS GovCloud (US)": "us-gov-west-1",
  131. }
  132. operatingSystem = strings.ToLower(operatingSystem)
  133. region := locationToRegion[location]
  134. return region + "," + instanceType + "," + operatingSystem
  135. }
  // awsKey carries what is needed to price one node: its labels, its provider
  // ID, and the label name/value pair that marks a node as a spot instance.
  type awsKey struct {
  	SpotLabelName  string
  	SpotLabelValue string
  	Labels         map[string]string
  	ProviderID     string
  }
  142. func (k *awsKey) ID() string {
  143. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  144. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  145. if matchNum == 2 {
  146. return group
  147. }
  148. }
  149. klog.V(3).Info("Could not find instance ID in " + k.ProviderID)
  150. return ""
  151. }
  152. func (k *awsKey) Features() string {
  153. instanceType := k.Labels[v1.LabelInstanceType]
  154. var operatingSystem string
  155. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  156. if !ok {
  157. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  158. }
  159. region := k.Labels[v1.LabelZoneRegion]
  160. key := region + "," + instanceType + "," + operatingSystem
  161. usageType := "preemptible"
  162. spotKey := key + "," + usageType
  163. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  164. return spotKey
  165. }
  166. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  167. return spotKey
  168. }
  169. return region + "," + instanceType + "," + operatingSystem
  170. }
  171. // GetKey maps node labels to information needed to retrieve pricing data
  172. func (aws *AWS) GetKey(labels map[string]string) Key {
  173. return &awsKey{
  174. SpotLabelName: aws.SpotLabelName,
  175. SpotLabelValue: aws.SpotLabelValue,
  176. Labels: labels,
  177. ProviderID: labels["providerID"],
  178. }
  179. }
  180. func (aws *AWS) isPreemptible(key string) bool {
  181. s := strings.Split(key, ",")
  182. if len(s) == 4 && s[3] == "preemptible" {
  183. return true
  184. }
  185. return false
  186. }
  187. // DownloadPricingData fetches data from the AWS Pricing API
  188. func (aws *AWS) DownloadPricingData() error {
  189. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  190. if err != nil {
  191. return err
  192. }
  193. inputkeys := make(map[string]bool)
  194. for _, n := range nodeList.Items {
  195. labels := n.GetObjectMeta().GetLabels()
  196. key := aws.GetKey(labels)
  197. inputkeys[key.Features()] = true
  198. }
  199. aws.Pricing = make(map[string]*AWSProductTerms)
  200. aws.ValidPricingKeys = make(map[string]bool)
  201. skusToKeys := make(map[string]string)
  202. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  203. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  204. resp, err := http.Get(pricingURL)
  205. if err != nil {
  206. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  207. return err
  208. }
  209. dec := json.NewDecoder(resp.Body)
  210. for {
  211. t, err := dec.Token()
  212. if err == io.EOF {
  213. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  214. break
  215. }
  216. if t == "products" {
  217. dec.Token() //{
  218. for dec.More() {
  219. dec.Token() // the sku token
  220. product := &AWSProduct{}
  221. err := dec.Decode(&product)
  222. if err != nil {
  223. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  224. break
  225. }
  226. if product.Attributes.PreInstalledSw == "NA" &&
  227. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  228. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  229. spotKey := key + ",preemptible"
  230. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  231. productTerms := &AWSProductTerms{
  232. Sku: product.Sku,
  233. Memory: product.Attributes.Memory,
  234. Storage: product.Attributes.Storage,
  235. VCpu: product.Attributes.VCpu,
  236. }
  237. aws.Pricing[key] = productTerms
  238. aws.Pricing[spotKey] = productTerms
  239. skusToKeys[product.Sku] = key
  240. }
  241. aws.ValidPricingKeys[key] = true
  242. aws.ValidPricingKeys[spotKey] = true
  243. }
  244. }
  245. }
  246. if t == "terms" {
  247. dec.Token()
  248. termType, _ := dec.Token()
  249. if termType == "OnDemand" {
  250. dec.Token()
  251. for dec.More() {
  252. sku, _ := dec.Token()
  253. dec.Token()
  254. skuOnDemand, _ := dec.Token()
  255. offerTerm := &AWSOfferTerm{}
  256. err := dec.Decode(&offerTerm)
  257. if err != nil {
  258. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  259. }
  260. if sku.(string)+OnDemandRateCode == skuOnDemand {
  261. key, ok := skusToKeys[sku.(string)]
  262. spotKey := key + ",preemptible"
  263. if ok {
  264. aws.Pricing[key].OnDemand = offerTerm
  265. aws.Pricing[spotKey].OnDemand = offerTerm
  266. }
  267. }
  268. dec.Token()
  269. }
  270. dec.Token()
  271. }
  272. }
  273. }
  274. if err != nil {
  275. return err
  276. }
  277. c, err := GetDefaultPricingData("aws.json")
  278. if err != nil {
  279. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  280. }
  281. aws.BaseCPUPrice = c.CPU
  282. aws.BaseSpotCPUPrice = c.SpotCPU
  283. aws.BaseSpotRAMPrice = c.SpotRAM
  284. aws.SpotLabelName = c.SpotLabel
  285. aws.SpotLabelValue = c.SpotLabelValue
  286. aws.SpotDataBucket = c.SpotDataBucket
  287. aws.SpotDataPrefix = c.SpotDataPrefix
  288. aws.ProjectID = c.ProjectID
  289. aws.SpotDataRegion = c.SpotDataRegion
  290. aws.ServiceKeyName = c.ServiceKeyName
  291. aws.ServiceKeySecret = c.ServiceKeySecret
  292. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  293. if err != nil {
  294. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  295. } else {
  296. aws.SpotPricingByInstanceID = sp
  297. }
  298. return nil
  299. }
  300. // AllNodePricing returns all the billing data fetched.
  301. func (aws *AWS) AllNodePricing() (interface{}, error) {
  302. return aws.Pricing, nil
  303. }
  304. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  305. key := k.Features()
  306. if aws.isPreemptible(key) {
  307. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  308. var spotcost string
  309. arr := strings.Split(spotInfo.Charge, " ")
  310. if len(arr) == 2 {
  311. spotcost = arr[0]
  312. } else {
  313. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  314. }
  315. return &Node{
  316. Cost: spotcost,
  317. VCPU: terms.VCpu,
  318. RAM: terms.Memory,
  319. Storage: terms.Storage,
  320. BaseCPUPrice: aws.BaseCPUPrice,
  321. UsageType: usageType,
  322. }, nil
  323. }
  324. return &Node{
  325. VCPU: terms.VCpu,
  326. VCPUCost: aws.BaseSpotCPUPrice,
  327. RAM: terms.Memory,
  328. RAMCost: aws.BaseSpotRAMPrice,
  329. Storage: terms.Storage,
  330. BaseCPUPrice: aws.BaseCPUPrice,
  331. UsageType: usageType,
  332. }, nil
  333. }
  334. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  335. return &Node{
  336. Cost: cost,
  337. VCPU: terms.VCpu,
  338. RAM: terms.Memory,
  339. Storage: terms.Storage,
  340. BaseCPUPrice: aws.BaseCPUPrice,
  341. UsageType: usageType,
  342. }, nil
  343. }
  344. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  345. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  346. //return json.Marshal(aws.Pricing[key])
  347. key := k.Features()
  348. usageType := "ondemand"
  349. if aws.isPreemptible(key) {
  350. usageType = "preemptible"
  351. }
  352. terms, ok := aws.Pricing[key]
  353. if ok {
  354. return aws.createNode(terms, usageType, k)
  355. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  356. err := aws.DownloadPricingData()
  357. if err != nil {
  358. return nil, err
  359. }
  360. terms, termsOk := aws.Pricing[key]
  361. if !termsOk {
  362. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  363. }
  364. return aws.createNode(terms, usageType, k)
  365. } else {
  366. return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
  367. }
  368. }
  369. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  370. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  371. defaultClusterName := "AWS Cluster #1"
  372. makeStructure := func(clusterName string) ([]byte, error) {
  373. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  374. m := make(map[string]string)
  375. m["name"] = clusterName
  376. m["provider"] = "AWS"
  377. return json.Marshal(m)
  378. }
  379. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  380. if len(maybeClusterId) != 0 {
  381. return makeStructure(maybeClusterId)
  382. }
  383. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  384. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  385. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  386. if err != nil {
  387. return nil, err
  388. }
  389. for _, n := range nodeList.Items {
  390. region := ""
  391. instanceId := ""
  392. providerId := n.Spec.ProviderID
  393. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  394. if matchNum == 1 {
  395. region = group
  396. } else if matchNum == 2 {
  397. instanceId = group
  398. }
  399. }
  400. if len(instanceId) == 0 {
  401. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  402. continue
  403. }
  404. c := &aws.Config{
  405. Region: aws.String(region),
  406. }
  407. s := session.Must(session.NewSession(c))
  408. ec2Svc := ec2.New(s)
  409. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  410. InstanceIds: []*string{
  411. aws.String(instanceId),
  412. },
  413. })
  414. if diErr != nil {
  415. // maybe log this?
  416. continue
  417. }
  418. if len(di.Reservations) != 1 {
  419. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  420. continue
  421. }
  422. res := di.Reservations[0]
  423. if len(res.Instances) != 1 {
  424. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  425. continue
  426. }
  427. inst := res.Instances[0]
  428. for _, tag := range inst.Tags {
  429. tagKey := *tag.Key
  430. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  431. if matchNum != 1 {
  432. continue
  433. }
  434. return makeStructure(group)
  435. }
  436. }
  437. }
  438. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  439. return makeStructure(defaultClusterName)
  440. }
  441. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  442. func (*AWS) AddServiceKey(formValues url.Values) error {
  443. keyID := formValues.Get("access_key_ID")
  444. key := formValues.Get("secret_access_key")
  445. m := make(map[string]string)
  446. m["access_key_ID"] = keyID
  447. m["secret_access_key"] = key
  448. result, err := json.Marshal(m)
  449. if err != nil {
  450. return err
  451. }
  452. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  453. }
  454. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  455. func (*AWS) GetDisks() ([]byte, error) {
  456. jsonFile, err := os.Open("/var/configs/key.json")
  457. if err == nil {
  458. byteValue, _ := ioutil.ReadAll(jsonFile)
  459. var result map[string]string
  460. err := json.Unmarshal([]byte(byteValue), &result)
  461. if err != nil {
  462. return nil, err
  463. }
  464. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  465. if err != nil {
  466. return nil, err
  467. }
  468. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  469. if err != nil {
  470. return nil, err
  471. }
  472. } else if os.IsNotExist(err) {
  473. klog.V(2).Infof("Using Default Credentials")
  474. } else {
  475. return nil, err
  476. }
  477. defer jsonFile.Close()
  478. clusterConfig, err := os.Open("/var/configs/cluster.json")
  479. if err != nil {
  480. return nil, err
  481. }
  482. defer clusterConfig.Close()
  483. b, err := ioutil.ReadAll(clusterConfig)
  484. if err != nil {
  485. return nil, err
  486. }
  487. var clusterConf map[string]string
  488. err = json.Unmarshal([]byte(b), &clusterConf)
  489. if err != nil {
  490. return nil, err
  491. }
  492. region := aws.String(clusterConf["region"])
  493. c := &aws.Config{
  494. Region: region,
  495. }
  496. s := session.Must(session.NewSession(c))
  497. ec2Svc := ec2.New(s)
  498. input := &ec2.DescribeVolumesInput{}
  499. volumeResult, err := ec2Svc.DescribeVolumes(input)
  500. if err != nil {
  501. if aerr, ok := err.(awserr.Error); ok {
  502. switch aerr.Code() {
  503. default:
  504. return nil, aerr
  505. }
  506. } else {
  507. return nil, err
  508. }
  509. }
  510. return json.Marshal(volumeResult)
  511. }
  512. // QuerySQL can query a properly configured Athena database.
  513. // Used to fetch billing data.
  514. // Requires a json config in /var/configs with key region, output, and database.
  515. func (*AWS) QuerySQL(query string) ([]byte, error) {
  516. jsonFile, err := os.Open("/var/configs/key.json")
  517. if err == nil {
  518. byteValue, _ := ioutil.ReadAll(jsonFile)
  519. var result map[string]string
  520. json.Unmarshal([]byte(byteValue), &result)
  521. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  522. if err != nil {
  523. return nil, err
  524. }
  525. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  526. if err != nil {
  527. return nil, err
  528. }
  529. } else if os.IsNotExist(err) {
  530. klog.V(2).Infof("Using Default Credentials")
  531. } else {
  532. return nil, err
  533. }
  534. defer jsonFile.Close()
  535. athenaConfigs, err := os.Open("/var/configs/athena.json")
  536. if err != nil {
  537. return nil, err
  538. }
  539. defer athenaConfigs.Close()
  540. b, err := ioutil.ReadAll(athenaConfigs)
  541. if err != nil {
  542. return nil, err
  543. }
  544. var athenaConf map[string]string
  545. json.Unmarshal([]byte(b), &athenaConf)
  546. region := aws.String(athenaConf["region"])
  547. resultsBucket := athenaConf["output"]
  548. database := athenaConf["database"]
  549. c := &aws.Config{
  550. Region: region,
  551. }
  552. s := session.Must(session.NewSession(c))
  553. svc := athena.New(s)
  554. var e athena.StartQueryExecutionInput
  555. var r athena.ResultConfiguration
  556. r.SetOutputLocation(resultsBucket)
  557. e.SetResultConfiguration(&r)
  558. e.SetQueryString(query)
  559. var q athena.QueryExecutionContext
  560. q.SetDatabase(database)
  561. e.SetQueryExecutionContext(&q)
  562. res, err := svc.StartQueryExecution(&e)
  563. if err != nil {
  564. return nil, err
  565. }
  566. klog.V(2).Infof("StartQueryExecution result:")
  567. klog.V(2).Infof(res.GoString())
  568. var qri athena.GetQueryExecutionInput
  569. qri.SetQueryExecutionId(*res.QueryExecutionId)
  570. var qrop *athena.GetQueryExecutionOutput
  571. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  572. for {
  573. qrop, err = svc.GetQueryExecution(&qri)
  574. if err != nil {
  575. return nil, err
  576. }
  577. if *qrop.QueryExecution.Status.State != "RUNNING" {
  578. break
  579. }
  580. time.Sleep(duration)
  581. }
  582. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  583. var ip athena.GetQueryResultsInput
  584. ip.SetQueryExecutionId(*res.QueryExecutionId)
  585. op, err := svc.GetQueryResults(&ip)
  586. if err != nil {
  587. return nil, err
  588. }
  589. b, err := json.Marshal(op.ResultSet)
  590. if err != nil {
  591. return nil, err
  592. }
  593. return b, nil
  594. }
  595. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  596. }
  // spotInfo is one row of the spot data feed. The field order matches the
  // column order that parseSpotData reads; every column is kept as text.
  type spotInfo struct {
  	Timestamp   string `csv:"Timestamp"`
  	UsageType   string `csv:"UsageType"`
  	Operation   string `csv:"Operation"`
  	InstanceID  string `csv:"InstanceID"`
  	MyBidID     string `csv:"MyBidID"`
  	MyMaxPrice  string `csv:"MyMaxPrice"`
  	MarketPrice string `csv:"MarketPrice"`
  	Charge      string `csv:"Charge"`
  	Version     string `csv:"Version"`
  }
  608. type fnames []*string
  609. func (f fnames) Len() int {
  610. return len(f)
  611. }
  612. func (f fnames) Swap(i, j int) {
  613. f[i], f[j] = f[j], f[i]
  614. }
  615. func (f fnames) Less(i, j int) bool {
  616. key1 := strings.Split(*f[i], ".")
  617. key2 := strings.Split(*f[j], ".")
  618. t1, err := time.Parse("2006-01-02-15", key1[1])
  619. if err != nil {
  620. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  621. return false
  622. }
  623. t2, err := time.Parse("2006-01-02-15", key2[1])
  624. if err != nil {
  625. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  626. return false
  627. }
  628. return t1.Before(t2)
  629. }
  630. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  631. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  632. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  633. if err != nil {
  634. return nil, err
  635. }
  636. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  637. if err != nil {
  638. return nil, err
  639. }
  640. }
  641. c := aws.NewConfig().WithRegion(region)
  642. s := session.Must(session.NewSession(c))
  643. s3Svc := s3.New(s)
  644. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  645. tNow := time.Now()
  646. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  647. ls := &s3.ListObjectsInput{
  648. Bucket: aws.String(bucket),
  649. Prefix: aws.String(prefix + "/" + projectID + "." + tOneDayAgo.Format("2006-01-02")),
  650. }
  651. ls2 := &s3.ListObjectsInput{
  652. Bucket: aws.String(bucket),
  653. Prefix: aws.String(prefix + "/" + projectID + "." + tNow.Format("2006-01-02")),
  654. }
  655. lso, err := s3Svc.ListObjects(ls)
  656. if err != nil {
  657. return nil, err
  658. }
  659. klog.V(2).Infof("Found %s files from yesterday", string(len(lso.Contents)))
  660. lso2, err := s3Svc.ListObjects(ls2)
  661. if err != nil {
  662. return nil, err
  663. }
  664. klog.V(2).Infof("Found %s files from today", string(len(lso.Contents)))
  665. var keys []*string
  666. for _, obj := range lso.Contents {
  667. keys = append(keys, obj.Key)
  668. }
  669. for _, obj := range lso2.Contents {
  670. keys = append(keys, obj.Key)
  671. }
  672. spots := make(map[string]*spotInfo)
  673. for _, key := range keys {
  674. getObj := &s3.GetObjectInput{
  675. Bucket: aws.String(bucket),
  676. Key: key,
  677. }
  678. buf := aws.NewWriteAtBuffer([]byte{})
  679. _, err := downloader.Download(buf, getObj)
  680. if err != nil {
  681. return nil, err
  682. }
  683. r := bytes.NewReader(buf.Bytes())
  684. gr, err := gzip.NewReader(r)
  685. if err != nil {
  686. return nil, err
  687. }
  688. csvReader := csv.NewReader(gr)
  689. csvReader.Comma = '\t'
  690. header, err := csvutil.Header(spotInfo{}, "csv")
  691. if err != nil {
  692. return nil, err
  693. }
  694. dec, err := csvutil.NewDecoder(csvReader, header...)
  695. if err != nil {
  696. return nil, err
  697. }
  698. for {
  699. if err := dec.Decode(&spotInfo{}); err == io.EOF {
  700. break
  701. }
  702. st := dec.Record()
  703. if len(st) == 9 { // it's tab separated but not quite a tsv
  704. klog.V(3).Infof("Found spot info %+v", st)
  705. spot := &spotInfo{
  706. Timestamp: st[0],
  707. UsageType: st[1],
  708. Operation: st[2],
  709. InstanceID: st[3],
  710. MyBidID: st[4],
  711. MyMaxPrice: st[5],
  712. MarketPrice: st[6],
  713. Charge: st[7],
  714. Version: st[8],
  715. }
  716. if spot.Version != supportedSpotFeedVersion {
  717. klog.V(1).Infof("Possibly unsupported spot data feed version " + spot.Version)
  718. }
  719. spots[spot.InstanceID] = spot
  720. }
  721. }
  722. gr.Close()
  723. }
  724. return spots, nil
  725. }