// awsprovider.go (24 KB)

  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strings"
  15. "time"
  16. "k8s.io/klog"
  17. "github.com/aws/aws-sdk-go/aws"
  18. "github.com/aws/aws-sdk-go/aws/awserr"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/athena"
  21. "github.com/aws/aws-sdk-go/service/ec2"
  22. "github.com/aws/aws-sdk-go/service/s3"
  23. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  24. "github.com/jszwec/csvutil"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. )
  29. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  30. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  31. const supportedSpotFeedVersion = "1"
  32. // AWS represents an Amazon Provider
  33. type AWS struct {
  34. Pricing map[string]*AWSProductTerms
  35. SpotPricingByInstanceID map[string]*spotInfo
  36. ValidPricingKeys map[string]bool
  37. Clientset *kubernetes.Clientset
  38. BaseCPUPrice string
  39. BaseSpotCPUPrice string
  40. BaseSpotRAMPrice string
  41. SpotLabelName string
  42. SpotLabelValue string
  43. ServiceKeyName string
  44. ServiceKeySecret string
  45. SpotDataRegion string
  46. SpotDataBucket string
  47. SpotDataPrefix string
  48. ProjectID string
  49. }
  50. // AWSPricing maps a k8s node to an AWS Pricing "product"
  51. type AWSPricing struct {
  52. Products map[string]*AWSProduct `json:"products"`
  53. Terms AWSPricingTerms `json:"terms"`
  54. }
  55. // AWSProduct represents a purchased SKU
  56. type AWSProduct struct {
  57. Sku string `json:"sku"`
  58. Attributes AWSProductAttributes `json:"attributes"`
  59. }
  60. // AWSProductAttributes represents metadata about the product used to map to a node.
  61. type AWSProductAttributes struct {
  62. Location string `json:"location"`
  63. InstanceType string `json:"instanceType"`
  64. Memory string `json:"memory"`
  65. Storage string `json:"storage"`
  66. VCpu string `json:"vcpu"`
  67. UsageType string `json:"usagetype"`
  68. OperatingSystem string `json:"operatingSystem"`
  69. PreInstalledSw string `json:"preInstalledSw"`
  70. }
  71. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  72. type AWSPricingTerms struct {
  73. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  74. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  75. }
  76. // AWSOfferTerm is a sku extension used to pay for the node.
  77. type AWSOfferTerm struct {
  78. Sku string `json:"sku"`
  79. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  80. }
  81. // AWSRateCode encodes data about the price of a product
  82. type AWSRateCode struct {
  83. Unit string `json:"unit"`
  84. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  85. }
  86. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  87. type AWSCurrencyCode struct {
  88. USD string `json:"USD"`
  89. }
  90. // AWSProductTerms represents the full terms of the product
  91. type AWSProductTerms struct {
  92. Sku string `json:"sku"`
  93. OnDemand *AWSOfferTerm `json:"OnDemand"`
  94. Reserved *AWSOfferTerm `json:"Reserved"`
  95. Memory string `json:"memory"`
  96. Storage string `json:"storage"`
  97. VCpu string `json:"vcpu"`
  98. }
  99. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  100. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  101. // OnDemandRateCode is appended to an node sku
  102. const OnDemandRateCode = ".JRTCKXETXF"
  103. // ReservedRateCode is appended to a node sku
  104. const ReservedRateCode = ".38NPMPTW36"
  105. // HourlyRateCode is appended to a node sku
  106. const HourlyRateCode = ".6YS6EN2CT7"
  107. // KubeAttrConversion maps the k8s labels for region to an aws region
  108. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  109. locationToRegion := map[string]string{
  110. "US East (Ohio)": "us-east-2",
  111. "US East (N. Virginia)": "us-east-1",
  112. "US West (N. California)": "us-west-1",
  113. "US West (Oregon)": "us-west-2",
  114. "Asia Pacific (Mumbai)": "ap-south-1",
  115. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  116. "Asia Pacific (Seoul)": "ap-northeast-2",
  117. "Asia Pacific (Singapore)": "ap-southeast-1",
  118. "Asia Pacific (Sydney)": "ap-southeast-2",
  119. "Asia Pacific (Tokyo)": "ap-northeast-1",
  120. "Canada (Central)": "ca-central-1",
  121. "China (Beijing)": "cn-north-1",
  122. "China (Ningxia)": "cn-northwest-1",
  123. "EU (Frankfurt)": "eu-central-1",
  124. "EU (Ireland)": "eu-west-1",
  125. "EU (London)": "eu-west-2",
  126. "EU (Paris)": "eu-west-3",
  127. "EU (Stockholm)": "eu-north-1",
  128. "South America (São Paulo)": "sa-east-1",
  129. "AWS GovCloud (US-East)": "us-gov-east-1",
  130. "AWS GovCloud (US)": "us-gov-west-1",
  131. }
  132. operatingSystem = strings.ToLower(operatingSystem)
  133. region := locationToRegion[location]
  134. return region + "," + instanceType + "," + operatingSystem
  135. }
  136. type awsKey struct {
  137. SpotLabelName string
  138. SpotLabelValue string
  139. Labels map[string]string
  140. ProviderID string
  141. }
  142. func (k *awsKey) ID() string {
  143. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  144. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  145. if matchNum == 2 {
  146. return group
  147. }
  148. }
  149. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  150. return ""
  151. }
  152. func (k *awsKey) Features() string {
  153. instanceType := k.Labels[v1.LabelInstanceType]
  154. var operatingSystem string
  155. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  156. if !ok {
  157. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  158. }
  159. region := k.Labels[v1.LabelZoneRegion]
  160. key := region + "," + instanceType + "," + operatingSystem
  161. usageType := "preemptible"
  162. spotKey := key + "," + usageType
  163. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  164. return spotKey
  165. }
  166. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  167. return spotKey
  168. }
  169. return key
  170. }
  171. // GetKey maps node labels to information needed to retrieve pricing data
  172. func (aws *AWS) GetKey(labels map[string]string) Key {
  173. return &awsKey{
  174. SpotLabelName: aws.SpotLabelName,
  175. SpotLabelValue: aws.SpotLabelValue,
  176. Labels: labels,
  177. ProviderID: labels["providerID"],
  178. }
  179. }
  180. func (aws *AWS) isPreemptible(key string) bool {
  181. s := strings.Split(key, ",")
  182. if len(s) == 4 && s[3] == "preemptible" {
  183. return true
  184. }
  185. return false
  186. }
  187. // DownloadPricingData fetches data from the AWS Pricing API
  188. func (aws *AWS) DownloadPricingData() error {
  189. c, err := GetDefaultPricingData("aws.json")
  190. if err != nil {
  191. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  192. }
  193. aws.BaseCPUPrice = c.CPU
  194. aws.BaseSpotCPUPrice = c.SpotCPU
  195. aws.BaseSpotRAMPrice = c.SpotRAM
  196. aws.SpotLabelName = c.SpotLabel
  197. aws.SpotLabelValue = c.SpotLabelValue
  198. aws.SpotDataBucket = c.SpotDataBucket
  199. aws.SpotDataPrefix = c.SpotDataPrefix
  200. aws.ProjectID = c.ProjectID
  201. aws.SpotDataRegion = c.SpotDataRegion
  202. aws.ServiceKeyName = c.ServiceKeyName
  203. aws.ServiceKeySecret = c.ServiceKeySecret
  204. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  205. if err != nil {
  206. return err
  207. }
  208. inputkeys := make(map[string]bool)
  209. for _, n := range nodeList.Items {
  210. labels := n.GetObjectMeta().GetLabels()
  211. key := aws.GetKey(labels)
  212. inputkeys[key.Features()] = true
  213. }
  214. aws.Pricing = make(map[string]*AWSProductTerms)
  215. aws.ValidPricingKeys = make(map[string]bool)
  216. skusToKeys := make(map[string]string)
  217. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  218. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  219. resp, err := http.Get(pricingURL)
  220. if err != nil {
  221. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  222. return err
  223. }
  224. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  225. dec := json.NewDecoder(resp.Body)
  226. for {
  227. t, err := dec.Token()
  228. if err == io.EOF {
  229. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  230. break
  231. }
  232. if t == "products" {
  233. _, err := dec.Token() // this should parse the opening "{""
  234. if err != nil {
  235. return err
  236. }
  237. for dec.More() {
  238. _, err := dec.Token() // the sku token
  239. if err != nil {
  240. return err
  241. }
  242. product := &AWSProduct{}
  243. err = dec.Decode(&product)
  244. if err != nil {
  245. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  246. break
  247. }
  248. if product.Attributes.PreInstalledSw == "NA" &&
  249. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  250. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  251. spotKey := key + ",preemptible"
  252. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  253. productTerms := &AWSProductTerms{
  254. Sku: product.Sku,
  255. Memory: product.Attributes.Memory,
  256. Storage: product.Attributes.Storage,
  257. VCpu: product.Attributes.VCpu,
  258. }
  259. aws.Pricing[key] = productTerms
  260. aws.Pricing[spotKey] = productTerms
  261. skusToKeys[product.Sku] = key
  262. }
  263. aws.ValidPricingKeys[key] = true
  264. aws.ValidPricingKeys[spotKey] = true
  265. }
  266. }
  267. }
  268. if t == "terms" {
  269. _, err := dec.Token() // this should parse the opening "{""
  270. if err != nil {
  271. return err
  272. }
  273. termType, err := dec.Token()
  274. if err != nil {
  275. return err
  276. }
  277. if termType == "OnDemand" {
  278. _, err := dec.Token()
  279. if err != nil { // again, should parse an opening "{"
  280. return err
  281. }
  282. for dec.More() {
  283. sku, err := dec.Token()
  284. if err != nil {
  285. return err
  286. }
  287. _, err = dec.Token() // another opening "{"
  288. if err != nil {
  289. return err
  290. }
  291. skuOnDemand, err := dec.Token()
  292. if err != nil {
  293. return err
  294. }
  295. offerTerm := &AWSOfferTerm{}
  296. err = dec.Decode(&offerTerm)
  297. if err != nil {
  298. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  299. }
  300. if sku.(string)+OnDemandRateCode == skuOnDemand {
  301. key, ok := skusToKeys[sku.(string)]
  302. spotKey := key + ",preemptible"
  303. if ok {
  304. aws.Pricing[key].OnDemand = offerTerm
  305. aws.Pricing[spotKey].OnDemand = offerTerm
  306. }
  307. }
  308. _, err = dec.Token()
  309. if err != nil {
  310. return err
  311. }
  312. }
  313. _, err = dec.Token()
  314. if err != nil {
  315. return err
  316. }
  317. }
  318. }
  319. }
  320. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  321. if err != nil {
  322. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  323. } else {
  324. aws.SpotPricingByInstanceID = sp
  325. }
  326. return nil
  327. }
  328. // AllNodePricing returns all the billing data fetched.
  329. func (aws *AWS) AllNodePricing() (interface{}, error) {
  330. return aws.Pricing, nil
  331. }
  332. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  333. key := k.Features()
  334. if aws.isPreemptible(key) {
  335. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  336. var spotcost string
  337. arr := strings.Split(spotInfo.Charge, " ")
  338. if len(arr) == 2 {
  339. spotcost = arr[0]
  340. } else {
  341. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  342. }
  343. return &Node{
  344. Cost: spotcost,
  345. VCPU: terms.VCpu,
  346. RAM: terms.Memory,
  347. Storage: terms.Storage,
  348. BaseCPUPrice: aws.BaseCPUPrice,
  349. UsageType: usageType,
  350. }, nil
  351. }
  352. return &Node{
  353. VCPU: terms.VCpu,
  354. VCPUCost: aws.BaseSpotCPUPrice,
  355. RAM: terms.Memory,
  356. RAMCost: aws.BaseSpotRAMPrice,
  357. Storage: terms.Storage,
  358. BaseCPUPrice: aws.BaseCPUPrice,
  359. UsageType: usageType,
  360. }, nil
  361. }
  362. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  363. return &Node{
  364. Cost: cost,
  365. VCPU: terms.VCpu,
  366. RAM: terms.Memory,
  367. Storage: terms.Storage,
  368. BaseCPUPrice: aws.BaseCPUPrice,
  369. UsageType: usageType,
  370. }, nil
  371. }
  372. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  373. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  374. //return json.Marshal(aws.Pricing[key])
  375. key := k.Features()
  376. usageType := "ondemand"
  377. if aws.isPreemptible(key) {
  378. usageType = "preemptible"
  379. }
  380. terms, ok := aws.Pricing[key]
  381. if ok {
  382. return aws.createNode(terms, usageType, k)
  383. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  384. err := aws.DownloadPricingData()
  385. if err != nil {
  386. return nil, err
  387. }
  388. terms, termsOk := aws.Pricing[key]
  389. if !termsOk {
  390. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  391. }
  392. return aws.createNode(terms, usageType, k)
  393. } else { // Fall back to base pricing if we can't find the key.
  394. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  395. return &Node{
  396. Cost: aws.BaseCPUPrice,
  397. BaseCPUPrice: aws.BaseCPUPrice,
  398. UsageType: usageType,
  399. UsesBaseCPUPrice: true,
  400. }, nil
  401. }
  402. }
  403. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  404. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  405. defaultClusterName := "AWS Cluster #1"
  406. makeStructure := func(clusterName string) ([]byte, error) {
  407. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  408. m := make(map[string]string)
  409. m["name"] = clusterName
  410. m["provider"] = "AWS"
  411. return json.Marshal(m)
  412. }
  413. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  414. if len(maybeClusterId) != 0 {
  415. return makeStructure(maybeClusterId)
  416. }
  417. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  418. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  419. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  420. if err != nil {
  421. return nil, err
  422. }
  423. for _, n := range nodeList.Items {
  424. region := ""
  425. instanceId := ""
  426. providerId := n.Spec.ProviderID
  427. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  428. if matchNum == 1 {
  429. region = group
  430. } else if matchNum == 2 {
  431. instanceId = group
  432. }
  433. }
  434. if len(instanceId) == 0 {
  435. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  436. continue
  437. }
  438. c := &aws.Config{
  439. Region: aws.String(region),
  440. }
  441. s := session.Must(session.NewSession(c))
  442. ec2Svc := ec2.New(s)
  443. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  444. InstanceIds: []*string{
  445. aws.String(instanceId),
  446. },
  447. })
  448. if diErr != nil {
  449. // maybe log this?
  450. continue
  451. }
  452. if len(di.Reservations) != 1 {
  453. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  454. continue
  455. }
  456. res := di.Reservations[0]
  457. if len(res.Instances) != 1 {
  458. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  459. continue
  460. }
  461. inst := res.Instances[0]
  462. for _, tag := range inst.Tags {
  463. tagKey := *tag.Key
  464. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  465. if matchNum != 1 {
  466. continue
  467. }
  468. return makeStructure(group)
  469. }
  470. }
  471. }
  472. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  473. return makeStructure(defaultClusterName)
  474. }
  475. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  476. func (*AWS) AddServiceKey(formValues url.Values) error {
  477. keyID := formValues.Get("access_key_ID")
  478. key := formValues.Get("secret_access_key")
  479. m := make(map[string]string)
  480. m["access_key_ID"] = keyID
  481. m["secret_access_key"] = key
  482. result, err := json.Marshal(m)
  483. if err != nil {
  484. return err
  485. }
  486. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  487. }
  488. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  489. func (*AWS) GetDisks() ([]byte, error) {
  490. jsonFile, err := os.Open("/var/configs/key.json")
  491. if err == nil {
  492. byteValue, _ := ioutil.ReadAll(jsonFile)
  493. var result map[string]string
  494. err := json.Unmarshal([]byte(byteValue), &result)
  495. if err != nil {
  496. return nil, err
  497. }
  498. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  499. if err != nil {
  500. return nil, err
  501. }
  502. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  503. if err != nil {
  504. return nil, err
  505. }
  506. } else if os.IsNotExist(err) {
  507. klog.V(2).Infof("Using Default Credentials")
  508. } else {
  509. return nil, err
  510. }
  511. defer jsonFile.Close()
  512. clusterConfig, err := os.Open("/var/configs/cluster.json")
  513. if err != nil {
  514. return nil, err
  515. }
  516. defer clusterConfig.Close()
  517. b, err := ioutil.ReadAll(clusterConfig)
  518. if err != nil {
  519. return nil, err
  520. }
  521. var clusterConf map[string]string
  522. err = json.Unmarshal([]byte(b), &clusterConf)
  523. if err != nil {
  524. return nil, err
  525. }
  526. region := aws.String(clusterConf["region"])
  527. c := &aws.Config{
  528. Region: region,
  529. }
  530. s := session.Must(session.NewSession(c))
  531. ec2Svc := ec2.New(s)
  532. input := &ec2.DescribeVolumesInput{}
  533. volumeResult, err := ec2Svc.DescribeVolumes(input)
  534. if err != nil {
  535. if aerr, ok := err.(awserr.Error); ok {
  536. switch aerr.Code() {
  537. default:
  538. return nil, aerr
  539. }
  540. } else {
  541. return nil, err
  542. }
  543. }
  544. return json.Marshal(volumeResult)
  545. }
  546. func (*AWS) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
  547. return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  548. }
  549. // QuerySQL can query a properly configured Athena database.
  550. // Used to fetch billing data.
  551. // Requires a json config in /var/configs with key region, output, and database.
  552. func (*AWS) QuerySQL(query string) ([]byte, error) {
  553. jsonFile, err := os.Open("/var/configs/key.json")
  554. if err == nil {
  555. byteValue, _ := ioutil.ReadAll(jsonFile)
  556. var result map[string]string
  557. json.Unmarshal([]byte(byteValue), &result)
  558. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  559. if err != nil {
  560. return nil, err
  561. }
  562. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  563. if err != nil {
  564. return nil, err
  565. }
  566. } else if os.IsNotExist(err) {
  567. klog.V(2).Infof("Using Default Credentials")
  568. } else {
  569. return nil, err
  570. }
  571. defer jsonFile.Close()
  572. athenaConfigs, err := os.Open("/var/configs/athena.json")
  573. if err != nil {
  574. return nil, err
  575. }
  576. defer athenaConfigs.Close()
  577. b, err := ioutil.ReadAll(athenaConfigs)
  578. if err != nil {
  579. return nil, err
  580. }
  581. var athenaConf map[string]string
  582. json.Unmarshal([]byte(b), &athenaConf)
  583. region := aws.String(athenaConf["region"])
  584. resultsBucket := athenaConf["output"]
  585. database := athenaConf["database"]
  586. c := &aws.Config{
  587. Region: region,
  588. }
  589. s := session.Must(session.NewSession(c))
  590. svc := athena.New(s)
  591. var e athena.StartQueryExecutionInput
  592. var r athena.ResultConfiguration
  593. r.SetOutputLocation(resultsBucket)
  594. e.SetResultConfiguration(&r)
  595. e.SetQueryString(query)
  596. var q athena.QueryExecutionContext
  597. q.SetDatabase(database)
  598. e.SetQueryExecutionContext(&q)
  599. res, err := svc.StartQueryExecution(&e)
  600. if err != nil {
  601. return nil, err
  602. }
  603. klog.V(2).Infof("StartQueryExecution result:")
  604. klog.V(2).Infof(res.GoString())
  605. var qri athena.GetQueryExecutionInput
  606. qri.SetQueryExecutionId(*res.QueryExecutionId)
  607. var qrop *athena.GetQueryExecutionOutput
  608. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  609. for {
  610. qrop, err = svc.GetQueryExecution(&qri)
  611. if err != nil {
  612. return nil, err
  613. }
  614. if *qrop.QueryExecution.Status.State != "RUNNING" {
  615. break
  616. }
  617. time.Sleep(duration)
  618. }
  619. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  620. var ip athena.GetQueryResultsInput
  621. ip.SetQueryExecutionId(*res.QueryExecutionId)
  622. op, err := svc.GetQueryResults(&ip)
  623. if err != nil {
  624. return nil, err
  625. }
  626. b, err := json.Marshal(op.ResultSet)
  627. if err != nil {
  628. return nil, err
  629. }
  630. return b, nil
  631. }
  632. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  633. }
  634. type spotInfo struct {
  635. Timestamp string `csv:"Timestamp"`
  636. UsageType string `csv:"UsageType"`
  637. Operation string `csv:"Operation"`
  638. InstanceID string `csv:"InstanceID"`
  639. MyBidID string `csv:"MyBidID"`
  640. MyMaxPrice string `csv:"MyMaxPrice"`
  641. MarketPrice string `csv:"MarketPrice"`
  642. Charge string `csv:"Charge"`
  643. Version string `csv:"Version"`
  644. }
  645. type fnames []*string
  646. func (f fnames) Len() int {
  647. return len(f)
  648. }
  649. func (f fnames) Swap(i, j int) {
  650. f[i], f[j] = f[j], f[i]
  651. }
  652. func (f fnames) Less(i, j int) bool {
  653. key1 := strings.Split(*f[i], ".")
  654. key2 := strings.Split(*f[j], ".")
  655. t1, err := time.Parse("2006-01-02-15", key1[1])
  656. if err != nil {
  657. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  658. return false
  659. }
  660. t2, err := time.Parse("2006-01-02-15", key2[1])
  661. if err != nil {
  662. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  663. return false
  664. }
  665. return t1.Before(t2)
  666. }
  667. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  668. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  669. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  670. if err != nil {
  671. return nil, err
  672. }
  673. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  674. if err != nil {
  675. return nil, err
  676. }
  677. }
  678. s3Prefix := projectID
  679. if len(prefix) != 0 {
  680. s3Prefix = prefix + "/" + s3Prefix
  681. }
  682. c := aws.NewConfig().WithRegion(region)
  683. s := session.Must(session.NewSession(c))
  684. s3Svc := s3.New(s)
  685. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  686. tNow := time.Now()
  687. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  688. ls := &s3.ListObjectsInput{
  689. Bucket: aws.String(bucket),
  690. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  691. }
  692. ls2 := &s3.ListObjectsInput{
  693. Bucket: aws.String(bucket),
  694. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  695. }
  696. lso, err := s3Svc.ListObjects(ls)
  697. if err != nil {
  698. return nil, err
  699. }
  700. lsoLen := len(lso.Contents)
  701. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  702. if lsoLen == 0 {
  703. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", ls.Bucket, ls.Prefix)
  704. }
  705. lso2, err := s3Svc.ListObjects(ls2)
  706. if err != nil {
  707. return nil, err
  708. }
  709. lso2Len := len(lso2.Contents)
  710. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  711. if lso2Len == 0 {
  712. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", ls2.Bucket, ls2.Prefix)
  713. }
  714. var keys []*string
  715. for _, obj := range lso.Contents {
  716. keys = append(keys, obj.Key)
  717. }
  718. for _, obj := range lso2.Contents {
  719. keys = append(keys, obj.Key)
  720. }
  721. spots := make(map[string]*spotInfo)
  722. for _, key := range keys {
  723. getObj := &s3.GetObjectInput{
  724. Bucket: aws.String(bucket),
  725. Key: key,
  726. }
  727. buf := aws.NewWriteAtBuffer([]byte{})
  728. _, err := downloader.Download(buf, getObj)
  729. if err != nil {
  730. return nil, err
  731. }
  732. r := bytes.NewReader(buf.Bytes())
  733. gr, err := gzip.NewReader(r)
  734. if err != nil {
  735. return nil, err
  736. }
  737. csvReader := csv.NewReader(gr)
  738. csvReader.Comma = '\t'
  739. header, err := csvutil.Header(spotInfo{}, "csv")
  740. if err != nil {
  741. return nil, err
  742. }
  743. dec, err := csvutil.NewDecoder(csvReader, header...)
  744. if err != nil {
  745. return nil, err
  746. }
  747. for {
  748. if err := dec.Decode(&spotInfo{}); err == io.EOF {
  749. break
  750. }
  751. st := dec.Record()
  752. if len(st) == 9 { // it's tab separated but not quite a tsv
  753. klog.V(3).Infof("Found spot info %+v", st)
  754. spot := &spotInfo{
  755. Timestamp: st[0],
  756. UsageType: st[1],
  757. Operation: st[2],
  758. InstanceID: st[3],
  759. MyBidID: st[4],
  760. MyMaxPrice: st[5],
  761. MarketPrice: st[6],
  762. Charge: st[7],
  763. Version: st[8],
  764. }
  765. if spot.Version != supportedSpotFeedVersion {
  766. klog.V(1).Infof("Possibly unsupported spot data feed version " + spot.Version)
  767. }
  768. spots[spot.InstanceID] = spot
  769. }
  770. }
  771. gr.Close()
  772. }
  773. return spots, nil
  774. }