awsprovider.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strings"
  15. "time"
  16. "k8s.io/klog"
  17. "github.com/aws/aws-sdk-go/aws"
  18. "github.com/aws/aws-sdk-go/aws/awserr"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/athena"
  21. "github.com/aws/aws-sdk-go/service/ec2"
  22. "github.com/aws/aws-sdk-go/service/s3"
  23. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  24. "github.com/jszwec/csvutil"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. )
  29. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  30. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  31. const supportedSpotFeedVersion = "1"
  32. // AWS represents an Amazon Provider
  33. type AWS struct {
  34. Pricing map[string]*AWSProductTerms
  35. SpotPricingByInstanceID map[string]*spotInfo
  36. ValidPricingKeys map[string]bool
  37. Clientset *kubernetes.Clientset
  38. BaseCPUPrice string
  39. BaseSpotCPUPrice string
  40. BaseSpotRAMPrice string
  41. SpotLabelName string
  42. SpotLabelValue string
  43. ServiceKeyName string
  44. ServiceKeySecret string
  45. SpotDataRegion string
  46. SpotDataBucket string
  47. SpotDataPrefix string
  48. ProjectID string
  49. }
  50. // AWSPricing maps a k8s node to an AWS Pricing "product"
  51. type AWSPricing struct {
  52. Products map[string]*AWSProduct `json:"products"`
  53. Terms AWSPricingTerms `json:"terms"`
  54. }
  55. // AWSProduct represents a purchased SKU
  56. type AWSProduct struct {
  57. Sku string `json:"sku"`
  58. Attributes AWSProductAttributes `json:"attributes"`
  59. }
  60. // AWSProductAttributes represents metadata about the product used to map to a node.
  61. type AWSProductAttributes struct {
  62. Location string `json:"location"`
  63. InstanceType string `json:"instanceType"`
  64. Memory string `json:"memory"`
  65. Storage string `json:"storage"`
  66. VCpu string `json:"vcpu"`
  67. UsageType string `json:"usagetype"`
  68. OperatingSystem string `json:"operatingSystem"`
  69. PreInstalledSw string `json:"preInstalledSw"`
  70. }
  71. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  72. type AWSPricingTerms struct {
  73. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  74. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  75. }
  76. // AWSOfferTerm is a sku extension used to pay for the node.
  77. type AWSOfferTerm struct {
  78. Sku string `json:"sku"`
  79. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  80. }
  81. // AWSRateCode encodes data about the price of a product
  82. type AWSRateCode struct {
  83. Unit string `json:"unit"`
  84. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  85. }
  86. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  87. type AWSCurrencyCode struct {
  88. USD string `json:"USD"`
  89. }
  90. // AWSProductTerms represents the full terms of the product
  91. type AWSProductTerms struct {
  92. Sku string `json:"sku"`
  93. OnDemand *AWSOfferTerm `json:"OnDemand"`
  94. Reserved *AWSOfferTerm `json:"Reserved"`
  95. Memory string `json:"memory"`
  96. Storage string `json:"storage"`
  97. VCpu string `json:"vcpu"`
  98. }
  99. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  100. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  101. // OnDemandRateCode is appended to an node sku
  102. const OnDemandRateCode = ".JRTCKXETXF"
  103. // ReservedRateCode is appended to a node sku
  104. const ReservedRateCode = ".38NPMPTW36"
  105. // HourlyRateCode is appended to a node sku
  106. const HourlyRateCode = ".6YS6EN2CT7"
  107. // KubeAttrConversion maps the k8s labels for region to an aws region
  108. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  109. locationToRegion := map[string]string{
  110. "US East (Ohio)": "us-east-2",
  111. "US East (N. Virginia)": "us-east-1",
  112. "US West (N. California)": "us-west-1",
  113. "US West (Oregon)": "us-west-2",
  114. "Asia Pacific (Mumbai)": "ap-south-1",
  115. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  116. "Asia Pacific (Seoul)": "ap-northeast-2",
  117. "Asia Pacific (Singapore)": "ap-southeast-1",
  118. "Asia Pacific (Sydney)": "ap-southeast-2",
  119. "Asia Pacific (Tokyo)": "ap-northeast-1",
  120. "Canada (Central)": "ca-central-1",
  121. "China (Beijing)": "cn-north-1",
  122. "China (Ningxia)": "cn-northwest-1",
  123. "EU (Frankfurt)": "eu-central-1",
  124. "EU (Ireland)": "eu-west-1",
  125. "EU (London)": "eu-west-2",
  126. "EU (Paris)": "eu-west-3",
  127. "EU (Stockholm)": "eu-north-1",
  128. "South America (São Paulo)": "sa-east-1",
  129. "AWS GovCloud (US-East)": "us-gov-east-1",
  130. "AWS GovCloud (US)": "us-gov-west-1",
  131. }
  132. operatingSystem = strings.ToLower(operatingSystem)
  133. region := locationToRegion[location]
  134. return region + "," + instanceType + "," + operatingSystem
  135. }
  136. type awsKey struct {
  137. SpotLabelName string
  138. SpotLabelValue string
  139. Labels map[string]string
  140. ProviderID string
  141. }
  142. func (k *awsKey) ID() string {
  143. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  144. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  145. if matchNum == 2 {
  146. return group
  147. }
  148. }
  149. klog.V(3).Info("Could not find instance ID in \"%s\"", k.ProviderID)
  150. return ""
  151. }
  152. func (k *awsKey) Features() string {
  153. instanceType := k.Labels[v1.LabelInstanceType]
  154. var operatingSystem string
  155. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  156. if !ok {
  157. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  158. }
  159. region := k.Labels[v1.LabelZoneRegion]
  160. key := region + "," + instanceType + "," + operatingSystem
  161. usageType := "preemptible"
  162. spotKey := key + "," + usageType
  163. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  164. return spotKey
  165. }
  166. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  167. return spotKey
  168. }
  169. return key
  170. }
  171. // GetKey maps node labels to information needed to retrieve pricing data
  172. func (aws *AWS) GetKey(labels map[string]string) Key {
  173. return &awsKey{
  174. SpotLabelName: aws.SpotLabelName,
  175. SpotLabelValue: aws.SpotLabelValue,
  176. Labels: labels,
  177. ProviderID: labels["providerID"],
  178. }
  179. }
  180. func (aws *AWS) isPreemptible(key string) bool {
  181. s := strings.Split(key, ",")
  182. if len(s) == 4 && s[3] == "preemptible" {
  183. return true
  184. }
  185. return false
  186. }
  187. // DownloadPricingData fetches data from the AWS Pricing API
  188. func (aws *AWS) DownloadPricingData() error {
  189. c, err := GetDefaultPricingData("aws.json")
  190. if err != nil {
  191. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  192. }
  193. aws.BaseCPUPrice = c.CPU
  194. aws.BaseSpotCPUPrice = c.SpotCPU
  195. aws.BaseSpotRAMPrice = c.SpotRAM
  196. aws.SpotLabelName = c.SpotLabel
  197. aws.SpotLabelValue = c.SpotLabelValue
  198. aws.SpotDataBucket = c.SpotDataBucket
  199. aws.SpotDataPrefix = c.SpotDataPrefix
  200. aws.ProjectID = c.ProjectID
  201. aws.SpotDataRegion = c.SpotDataRegion
  202. aws.ServiceKeyName = c.ServiceKeyName
  203. aws.ServiceKeySecret = c.ServiceKeySecret
  204. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  205. if err != nil {
  206. return err
  207. }
  208. inputkeys := make(map[string]bool)
  209. for _, n := range nodeList.Items {
  210. labels := n.GetObjectMeta().GetLabels()
  211. key := aws.GetKey(labels)
  212. inputkeys[key.Features()] = true
  213. }
  214. aws.Pricing = make(map[string]*AWSProductTerms)
  215. aws.ValidPricingKeys = make(map[string]bool)
  216. skusToKeys := make(map[string]string)
  217. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  218. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  219. resp, err := http.Get(pricingURL)
  220. if err != nil {
  221. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  222. return err
  223. }
  224. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  225. dec := json.NewDecoder(resp.Body)
  226. for {
  227. t, err := dec.Token()
  228. if err == io.EOF {
  229. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  230. break
  231. }
  232. if t == "products" {
  233. dec.Token() //{
  234. for dec.More() {
  235. dec.Token() // the sku token
  236. product := &AWSProduct{}
  237. err := dec.Decode(&product)
  238. if err != nil {
  239. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  240. break
  241. }
  242. if product.Attributes.PreInstalledSw == "NA" &&
  243. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  244. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  245. spotKey := key + ",preemptible"
  246. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  247. productTerms := &AWSProductTerms{
  248. Sku: product.Sku,
  249. Memory: product.Attributes.Memory,
  250. Storage: product.Attributes.Storage,
  251. VCpu: product.Attributes.VCpu,
  252. }
  253. aws.Pricing[key] = productTerms
  254. aws.Pricing[spotKey] = productTerms
  255. skusToKeys[product.Sku] = key
  256. }
  257. aws.ValidPricingKeys[key] = true
  258. aws.ValidPricingKeys[spotKey] = true
  259. }
  260. }
  261. }
  262. if t == "terms" {
  263. dec.Token()
  264. termType, _ := dec.Token()
  265. if termType == "OnDemand" {
  266. dec.Token()
  267. for dec.More() {
  268. sku, _ := dec.Token()
  269. dec.Token()
  270. skuOnDemand, _ := dec.Token()
  271. offerTerm := &AWSOfferTerm{}
  272. err := dec.Decode(&offerTerm)
  273. if err != nil {
  274. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  275. }
  276. if sku.(string)+OnDemandRateCode == skuOnDemand {
  277. key, ok := skusToKeys[sku.(string)]
  278. spotKey := key + ",preemptible"
  279. if ok {
  280. aws.Pricing[key].OnDemand = offerTerm
  281. aws.Pricing[spotKey].OnDemand = offerTerm
  282. }
  283. }
  284. dec.Token()
  285. }
  286. dec.Token()
  287. }
  288. }
  289. }
  290. if err != nil {
  291. return err
  292. }
  293. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  294. if err != nil {
  295. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  296. } else {
  297. aws.SpotPricingByInstanceID = sp
  298. }
  299. return nil
  300. }
  301. // AllNodePricing returns all the billing data fetched.
  302. func (aws *AWS) AllNodePricing() (interface{}, error) {
  303. return aws.Pricing, nil
  304. }
  305. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  306. key := k.Features()
  307. if aws.isPreemptible(key) {
  308. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  309. var spotcost string
  310. arr := strings.Split(spotInfo.Charge, " ")
  311. if len(arr) == 2 {
  312. spotcost = arr[0]
  313. } else {
  314. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  315. }
  316. return &Node{
  317. Cost: spotcost,
  318. VCPU: terms.VCpu,
  319. RAM: terms.Memory,
  320. Storage: terms.Storage,
  321. BaseCPUPrice: aws.BaseCPUPrice,
  322. UsageType: usageType,
  323. }, nil
  324. }
  325. return &Node{
  326. VCPU: terms.VCpu,
  327. VCPUCost: aws.BaseSpotCPUPrice,
  328. RAM: terms.Memory,
  329. RAMCost: aws.BaseSpotRAMPrice,
  330. Storage: terms.Storage,
  331. BaseCPUPrice: aws.BaseCPUPrice,
  332. UsageType: usageType,
  333. }, nil
  334. }
  335. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  336. return &Node{
  337. Cost: cost,
  338. VCPU: terms.VCpu,
  339. RAM: terms.Memory,
  340. Storage: terms.Storage,
  341. BaseCPUPrice: aws.BaseCPUPrice,
  342. UsageType: usageType,
  343. }, nil
  344. }
  345. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  346. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  347. //return json.Marshal(aws.Pricing[key])
  348. key := k.Features()
  349. usageType := "ondemand"
  350. if aws.isPreemptible(key) {
  351. usageType = "preemptible"
  352. }
  353. terms, ok := aws.Pricing[key]
  354. if ok {
  355. return aws.createNode(terms, usageType, k)
  356. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  357. err := aws.DownloadPricingData()
  358. if err != nil {
  359. return nil, err
  360. }
  361. terms, termsOk := aws.Pricing[key]
  362. if !termsOk {
  363. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  364. }
  365. return aws.createNode(terms, usageType, k)
  366. } else { // Fall back to base pricing if we can't find the key.
  367. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  368. return &Node{
  369. Cost: aws.BaseCPUPrice,
  370. BaseCPUPrice: aws.BaseCPUPrice,
  371. UsageType: usageType,
  372. UsesBaseCPUPrice: true,
  373. }, nil
  374. }
  375. }
  376. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  377. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  378. defaultClusterName := "AWS Cluster #1"
  379. makeStructure := func(clusterName string) ([]byte, error) {
  380. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  381. m := make(map[string]string)
  382. m["name"] = clusterName
  383. m["provider"] = "AWS"
  384. return json.Marshal(m)
  385. }
  386. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  387. if len(maybeClusterId) != 0 {
  388. return makeStructure(maybeClusterId)
  389. }
  390. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  391. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  392. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  393. if err != nil {
  394. return nil, err
  395. }
  396. for _, n := range nodeList.Items {
  397. region := ""
  398. instanceId := ""
  399. providerId := n.Spec.ProviderID
  400. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  401. if matchNum == 1 {
  402. region = group
  403. } else if matchNum == 2 {
  404. instanceId = group
  405. }
  406. }
  407. if len(instanceId) == 0 {
  408. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  409. continue
  410. }
  411. c := &aws.Config{
  412. Region: aws.String(region),
  413. }
  414. s := session.Must(session.NewSession(c))
  415. ec2Svc := ec2.New(s)
  416. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  417. InstanceIds: []*string{
  418. aws.String(instanceId),
  419. },
  420. })
  421. if diErr != nil {
  422. // maybe log this?
  423. continue
  424. }
  425. if len(di.Reservations) != 1 {
  426. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  427. continue
  428. }
  429. res := di.Reservations[0]
  430. if len(res.Instances) != 1 {
  431. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  432. continue
  433. }
  434. inst := res.Instances[0]
  435. for _, tag := range inst.Tags {
  436. tagKey := *tag.Key
  437. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  438. if matchNum != 1 {
  439. continue
  440. }
  441. return makeStructure(group)
  442. }
  443. }
  444. }
  445. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  446. return makeStructure(defaultClusterName)
  447. }
  448. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  449. func (*AWS) AddServiceKey(formValues url.Values) error {
  450. keyID := formValues.Get("access_key_ID")
  451. key := formValues.Get("secret_access_key")
  452. m := make(map[string]string)
  453. m["access_key_ID"] = keyID
  454. m["secret_access_key"] = key
  455. result, err := json.Marshal(m)
  456. if err != nil {
  457. return err
  458. }
  459. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  460. }
  461. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  462. func (*AWS) GetDisks() ([]byte, error) {
  463. jsonFile, err := os.Open("/var/configs/key.json")
  464. if err == nil {
  465. byteValue, _ := ioutil.ReadAll(jsonFile)
  466. var result map[string]string
  467. err := json.Unmarshal([]byte(byteValue), &result)
  468. if err != nil {
  469. return nil, err
  470. }
  471. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  472. if err != nil {
  473. return nil, err
  474. }
  475. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  476. if err != nil {
  477. return nil, err
  478. }
  479. } else if os.IsNotExist(err) {
  480. klog.V(2).Infof("Using Default Credentials")
  481. } else {
  482. return nil, err
  483. }
  484. defer jsonFile.Close()
  485. clusterConfig, err := os.Open("/var/configs/cluster.json")
  486. if err != nil {
  487. return nil, err
  488. }
  489. defer clusterConfig.Close()
  490. b, err := ioutil.ReadAll(clusterConfig)
  491. if err != nil {
  492. return nil, err
  493. }
  494. var clusterConf map[string]string
  495. err = json.Unmarshal([]byte(b), &clusterConf)
  496. if err != nil {
  497. return nil, err
  498. }
  499. region := aws.String(clusterConf["region"])
  500. c := &aws.Config{
  501. Region: region,
  502. }
  503. s := session.Must(session.NewSession(c))
  504. ec2Svc := ec2.New(s)
  505. input := &ec2.DescribeVolumesInput{}
  506. volumeResult, err := ec2Svc.DescribeVolumes(input)
  507. if err != nil {
  508. if aerr, ok := err.(awserr.Error); ok {
  509. switch aerr.Code() {
  510. default:
  511. return nil, aerr
  512. }
  513. } else {
  514. return nil, err
  515. }
  516. }
  517. return json.Marshal(volumeResult)
  518. }
  519. // QuerySQL can query a properly configured Athena database.
  520. // Used to fetch billing data.
  521. // Requires a json config in /var/configs with key region, output, and database.
  522. func (*AWS) QuerySQL(query string) ([]byte, error) {
  523. jsonFile, err := os.Open("/var/configs/key.json")
  524. if err == nil {
  525. byteValue, _ := ioutil.ReadAll(jsonFile)
  526. var result map[string]string
  527. json.Unmarshal([]byte(byteValue), &result)
  528. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  529. if err != nil {
  530. return nil, err
  531. }
  532. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  533. if err != nil {
  534. return nil, err
  535. }
  536. } else if os.IsNotExist(err) {
  537. klog.V(2).Infof("Using Default Credentials")
  538. } else {
  539. return nil, err
  540. }
  541. defer jsonFile.Close()
  542. athenaConfigs, err := os.Open("/var/configs/athena.json")
  543. if err != nil {
  544. return nil, err
  545. }
  546. defer athenaConfigs.Close()
  547. b, err := ioutil.ReadAll(athenaConfigs)
  548. if err != nil {
  549. return nil, err
  550. }
  551. var athenaConf map[string]string
  552. json.Unmarshal([]byte(b), &athenaConf)
  553. region := aws.String(athenaConf["region"])
  554. resultsBucket := athenaConf["output"]
  555. database := athenaConf["database"]
  556. c := &aws.Config{
  557. Region: region,
  558. }
  559. s := session.Must(session.NewSession(c))
  560. svc := athena.New(s)
  561. var e athena.StartQueryExecutionInput
  562. var r athena.ResultConfiguration
  563. r.SetOutputLocation(resultsBucket)
  564. e.SetResultConfiguration(&r)
  565. e.SetQueryString(query)
  566. var q athena.QueryExecutionContext
  567. q.SetDatabase(database)
  568. e.SetQueryExecutionContext(&q)
  569. res, err := svc.StartQueryExecution(&e)
  570. if err != nil {
  571. return nil, err
  572. }
  573. klog.V(2).Infof("StartQueryExecution result:")
  574. klog.V(2).Infof(res.GoString())
  575. var qri athena.GetQueryExecutionInput
  576. qri.SetQueryExecutionId(*res.QueryExecutionId)
  577. var qrop *athena.GetQueryExecutionOutput
  578. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  579. for {
  580. qrop, err = svc.GetQueryExecution(&qri)
  581. if err != nil {
  582. return nil, err
  583. }
  584. if *qrop.QueryExecution.Status.State != "RUNNING" {
  585. break
  586. }
  587. time.Sleep(duration)
  588. }
  589. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  590. var ip athena.GetQueryResultsInput
  591. ip.SetQueryExecutionId(*res.QueryExecutionId)
  592. op, err := svc.GetQueryResults(&ip)
  593. if err != nil {
  594. return nil, err
  595. }
  596. b, err := json.Marshal(op.ResultSet)
  597. if err != nil {
  598. return nil, err
  599. }
  600. return b, nil
  601. }
  602. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  603. }
  604. type spotInfo struct {
  605. Timestamp string `csv:"Timestamp"`
  606. UsageType string `csv:"UsageType"`
  607. Operation string `csv:"Operation"`
  608. InstanceID string `csv:"InstanceID"`
  609. MyBidID string `csv:"MyBidID"`
  610. MyMaxPrice string `csv:"MyMaxPrice"`
  611. MarketPrice string `csv:"MarketPrice"`
  612. Charge string `csv:"Charge"`
  613. Version string `csv:"Version"`
  614. }
  615. type fnames []*string
  616. func (f fnames) Len() int {
  617. return len(f)
  618. }
  619. func (f fnames) Swap(i, j int) {
  620. f[i], f[j] = f[j], f[i]
  621. }
  622. func (f fnames) Less(i, j int) bool {
  623. key1 := strings.Split(*f[i], ".")
  624. key2 := strings.Split(*f[j], ".")
  625. t1, err := time.Parse("2006-01-02-15", key1[1])
  626. if err != nil {
  627. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  628. return false
  629. }
  630. t2, err := time.Parse("2006-01-02-15", key2[1])
  631. if err != nil {
  632. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  633. return false
  634. }
  635. return t1.Before(t2)
  636. }
  637. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  638. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  639. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  640. if err != nil {
  641. return nil, err
  642. }
  643. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  644. if err != nil {
  645. return nil, err
  646. }
  647. }
  648. s3Prefix := projectID
  649. if len(prefix) != 0 {
  650. s3Prefix = prefix + "/" + s3Prefix
  651. }
  652. c := aws.NewConfig().WithRegion(region)
  653. s := session.Must(session.NewSession(c))
  654. s3Svc := s3.New(s)
  655. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  656. tNow := time.Now()
  657. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  658. ls := &s3.ListObjectsInput{
  659. Bucket: aws.String(bucket),
  660. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  661. }
  662. ls2 := &s3.ListObjectsInput{
  663. Bucket: aws.String(bucket),
  664. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  665. }
  666. lso, err := s3Svc.ListObjects(ls)
  667. if err != nil {
  668. return nil, err
  669. }
  670. lsoLen := len(lso.Contents)
  671. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  672. if lsoLen == 0 {
  673. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", ls.Bucket, ls.Prefix)
  674. }
  675. lso2, err := s3Svc.ListObjects(ls2)
  676. if err != nil {
  677. return nil, err
  678. }
  679. lso2Len := len(lso2.Contents)
  680. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  681. if lso2Len == 0 {
  682. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", ls2.Bucket, ls2.Prefix)
  683. }
  684. var keys []*string
  685. for _, obj := range lso.Contents {
  686. keys = append(keys, obj.Key)
  687. }
  688. for _, obj := range lso2.Contents {
  689. keys = append(keys, obj.Key)
  690. }
  691. spots := make(map[string]*spotInfo)
  692. for _, key := range keys {
  693. getObj := &s3.GetObjectInput{
  694. Bucket: aws.String(bucket),
  695. Key: key,
  696. }
  697. buf := aws.NewWriteAtBuffer([]byte{})
  698. _, err := downloader.Download(buf, getObj)
  699. if err != nil {
  700. return nil, err
  701. }
  702. r := bytes.NewReader(buf.Bytes())
  703. gr, err := gzip.NewReader(r)
  704. if err != nil {
  705. return nil, err
  706. }
  707. csvReader := csv.NewReader(gr)
  708. csvReader.Comma = '\t'
  709. header, err := csvutil.Header(spotInfo{}, "csv")
  710. if err != nil {
  711. return nil, err
  712. }
  713. dec, err := csvutil.NewDecoder(csvReader, header...)
  714. if err != nil {
  715. return nil, err
  716. }
  717. for {
  718. if err := dec.Decode(&spotInfo{}); err == io.EOF {
  719. break
  720. }
  721. st := dec.Record()
  722. if len(st) == 9 { // it's tab separated but not quite a tsv
  723. klog.V(3).Infof("Found spot info %+v", st)
  724. spot := &spotInfo{
  725. Timestamp: st[0],
  726. UsageType: st[1],
  727. Operation: st[2],
  728. InstanceID: st[3],
  729. MyBidID: st[4],
  730. MyMaxPrice: st[5],
  731. MarketPrice: st[6],
  732. Charge: st[7],
  733. Version: st[8],
  734. }
  735. if spot.Version != supportedSpotFeedVersion {
  736. klog.V(1).Infof("Possibly unsupported spot data feed version " + spot.Version)
  737. }
  738. spots[spot.InstanceID] = spot
  739. }
  740. }
  741. gr.Close()
  742. }
  743. return spots, nil
  744. }