provider.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728
  1. package cloud
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "math"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "strconv"
  14. "strings"
  15. "time"
  16. "github.com/aws/aws-sdk-go/aws/awserr"
  17. "cloud.google.com/go/compute/metadata"
  18. "golang.org/x/oauth2"
  19. "golang.org/x/oauth2/google"
  20. compute "google.golang.org/api/compute/v1"
  21. "github.com/aws/aws-sdk-go/aws"
  22. "github.com/aws/aws-sdk-go/aws/credentials"
  23. "github.com/aws/aws-sdk-go/aws/session"
  24. "github.com/aws/aws-sdk-go/service/athena"
  25. "github.com/aws/aws-sdk-go/service/ec2"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. )
  29. type Provider interface {
  30. ClusterName() ([]byte, error)
  31. AddServiceKey(url.Values) error
  32. GetDisks() ([]byte, error)
  33. NodePricing(string) (*Node, error)
  34. AllNodePricing() (interface{}, error)
  35. DownloadPricingData() error
  36. GetKey(map[string]string) string
  37. QuerySQL(string) ([]byte, error)
  38. }
  39. type Node struct {
  40. Cost string
  41. VCPU string
  42. VCPUCost string
  43. RAM string
  44. RAMCost string
  45. Storage string
  46. StorageCost string
  47. }
  48. func NewProvider(clientset *kubernetes.Clientset, apiKey string) (Provider, error) {
  49. if metadata.OnGCE() {
  50. log.Printf("ON GCP AND KEY IS: %s", apiKey)
  51. return &GCP{
  52. Clientset: clientset,
  53. ApiKey: apiKey,
  54. }, nil
  55. } else {
  56. return &AWS{
  57. Clientset: clientset,
  58. }, nil
  59. }
  60. }
  61. type userAgentTransport struct {
  62. userAgent string
  63. base http.RoundTripper
  64. }
  65. func (t userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  66. req.Header.Set("User-Agent", t.userAgent)
  67. return t.base.RoundTrip(req)
  68. }
  69. type GCP struct {
  70. Pricing map[string]*GCPPricing
  71. Clientset *kubernetes.Clientset
  72. ApiKey string
  73. }
  74. func (*GCP) QuerySQL(query string) ([]byte, error) {
  75. return nil, nil
  76. }
  77. func (*GCP) ClusterName() ([]byte, error) {
  78. metadataClient := metadata.NewClient(&http.Client{Transport: userAgentTransport{
  79. userAgent: "kubecost",
  80. base: http.DefaultTransport,
  81. }})
  82. attribute, err := metadataClient.InstanceAttributeValue("cluster-name")
  83. if err != nil {
  84. return nil, err
  85. }
  86. m := make(map[string]string)
  87. m["name"] = attribute
  88. m["provider"] = "GCP"
  89. return json.Marshal(m)
  90. }
  91. func (*GCP) AddServiceKey(formValues url.Values) error {
  92. key := formValues.Get("key")
  93. k := []byte(key)
  94. return ioutil.WriteFile("/var/configs/key.json", k, 0644)
  95. }
  96. func (*GCP) GetDisks() ([]byte, error) {
  97. // metadata API setup
  98. metadataClient := metadata.NewClient(&http.Client{Transport: userAgentTransport{
  99. userAgent: "kubecost",
  100. base: http.DefaultTransport,
  101. }})
  102. projID, err := metadataClient.ProjectID()
  103. if err != nil {
  104. return nil, err
  105. }
  106. client, err := google.DefaultClient(oauth2.NoContext,
  107. "https://www.googleapis.com/auth/compute.readonly")
  108. if err != nil {
  109. return nil, err
  110. }
  111. svc, err := compute.New(client)
  112. if err != nil {
  113. return nil, err
  114. }
  115. res, err := svc.Disks.AggregatedList(projID).Do()
  116. if err != nil {
  117. return nil, err
  118. }
  119. return json.Marshal(res)
  120. }
  121. type GCPPricing struct {
  122. Name string `json:"name"`
  123. SKUID string `json:"skuId"`
  124. Description string `json:"description"`
  125. Category *GCPResourceInfo `json:"category"`
  126. ServiceRegions []string `json:"serviceRegions"`
  127. PricingInfo []*PricingInfo `json:"pricingInfo"`
  128. ServiceProviderName string `json:"serviceProviderName"`
  129. Node *Node `json:"node"`
  130. }
  131. type PricingInfo struct {
  132. Summary string `json:"summary"`
  133. PricingExpression *PricingExpression `json:"pricingExpression"`
  134. CurrencyConversionRate int `json:"currencyConversionRate"`
  135. EffectiveTime string `json:""`
  136. }
  137. type PricingExpression struct {
  138. UsageUnit string `json:"usageUnit"`
  139. UsageUnitDescription string `json:"usageUnitDescription"`
  140. BaseUnit string `json:"baseUnit"`
  141. BaseUnitConversionFactor int64 `json:"-"`
  142. DisplayQuantity int `json:"displayQuantity"`
  143. TieredRates []*TieredRates `json:"tieredRates"`
  144. }
  145. type TieredRates struct {
  146. StartUsageAmount int `json:"startUsageAmount"`
  147. UnitPrice *UnitPriceInfo `json:"unitPrice"`
  148. }
  149. type UnitPriceInfo struct {
  150. CurrencyCode string `json:"currencyCode"`
  151. Units string `json:"units"`
  152. Nanos float64 `json:"nanos"`
  153. }
  154. type GCPResourceInfo struct {
  155. ServiceDisplayName string `json:"serviceDisplayName"`
  156. ResourceFamily string `json:"resourceFamily"`
  157. ResourceGroup string `json:"resourceGroup"`
  158. UsageType string `json:"usageType"`
  159. }
  160. func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]bool) (map[string]*GCPPricing, string) {
  161. gcpPricingList := make(map[string]*GCPPricing)
  162. var nextPageToken string
  163. dec := json.NewDecoder(r)
  164. for {
  165. t, err := dec.Token()
  166. if err == io.EOF {
  167. break
  168. }
  169. //fmt.Printf("%v \n", t)
  170. if t == "skus" {
  171. dec.Token() // [
  172. for dec.More() {
  173. product := &GCPPricing{}
  174. err := dec.Decode(&product)
  175. if err != nil {
  176. fmt.Printf("Error: " + err.Error())
  177. break
  178. }
  179. usageType := strings.ToLower(product.Category.UsageType)
  180. instanceType := strings.ToLower(product.Category.ResourceGroup)
  181. if (instanceType == "ram" || instanceType == "cpu") && strings.Contains(strings.ToUpper(product.Description), "CUSTOM") {
  182. instanceType = "custom"
  183. }
  184. for _, sr := range product.ServiceRegions {
  185. region := sr
  186. candidateKey := region + "," + instanceType + "," + usageType
  187. if _, ok := inputKeys[candidateKey]; ok {
  188. lastRateIndex := len(product.PricingInfo[0].PricingExpression.TieredRates) - 1
  189. var nanos float64
  190. if len(product.PricingInfo) > 0 {
  191. nanos = product.PricingInfo[0].PricingExpression.TieredRates[lastRateIndex].UnitPrice.Nanos
  192. } else {
  193. continue
  194. }
  195. hourlyPrice := nanos * math.Pow10(-9)
  196. if hourlyPrice == 0 {
  197. continue
  198. } else if strings.Contains(strings.ToUpper(product.Description), "RAM") {
  199. if _, ok := gcpPricingList[candidateKey]; ok {
  200. gcpPricingList[candidateKey].Node.RAMCost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  201. } else {
  202. product.Node = &Node{
  203. RAMCost: strconv.FormatFloat(hourlyPrice, 'f', -1, 64),
  204. }
  205. log.Printf("NODE: %v", product.Node)
  206. gcpPricingList[candidateKey] = product
  207. }
  208. break
  209. } else {
  210. if _, ok := gcpPricingList[candidateKey]; ok {
  211. gcpPricingList[candidateKey].Node.VCPUCost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  212. } else {
  213. product.Node = &Node{
  214. VCPUCost: strconv.FormatFloat(hourlyPrice, 'f', -1, 64),
  215. }
  216. log.Printf("NODE: %v", product.Node)
  217. gcpPricingList[candidateKey] = product
  218. }
  219. break
  220. }
  221. }
  222. }
  223. }
  224. }
  225. if t == "nextPageToken" {
  226. pageToken, err := dec.Token()
  227. if err != nil {
  228. log.Printf("Error parsing nextpage token: " + err.Error())
  229. break
  230. }
  231. if pageToken.(string) != "" {
  232. nextPageToken = pageToken.(string)
  233. } else {
  234. nextPageToken = "done"
  235. }
  236. }
  237. }
  238. return gcpPricingList, nextPageToken
  239. }
  240. func (gcp *GCP) parsePages(inputKeys map[string]bool) (map[string]*GCPPricing, error) {
  241. var pages []map[string]*GCPPricing
  242. url := "https://cloudbilling.googleapis.com/v1/services/6F81-5844-456A/skus?key=" + gcp.ApiKey //AIzaSyDXQPG_MHUEy9neR7stolq6l0ujXmjJlvk
  243. log.Printf("URL: %s", url)
  244. var parsePagesHelper func(string) error
  245. parsePagesHelper = func(pageToken string) error {
  246. if pageToken == "done" {
  247. return nil
  248. } else if pageToken != "" {
  249. url = url + "&pageToken=" + pageToken
  250. }
  251. resp, err := http.Get(url)
  252. if err != nil {
  253. return err
  254. }
  255. page, token := gcp.parsePage(resp.Body, inputKeys)
  256. pages = append(pages, page)
  257. return parsePagesHelper(token)
  258. }
  259. err := parsePagesHelper("")
  260. returnPages := make(map[string]*GCPPricing)
  261. for _, page := range pages {
  262. for k, v := range page {
  263. returnPages[k] = v
  264. }
  265. }
  266. return returnPages, err
  267. }
  268. func (gcp *GCP) DownloadPricingData() error {
  269. nodeList, err := gcp.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  270. if err != nil {
  271. return err
  272. }
  273. inputkeys := make(map[string]bool)
  274. for _, n := range nodeList.Items {
  275. labels := n.GetObjectMeta().GetLabels()
  276. key := gcp.GetKey(labels)
  277. inputkeys[key] = true
  278. }
  279. pages, err := gcp.parsePages(inputkeys)
  280. if err != nil {
  281. return err
  282. }
  283. gcp.Pricing = pages
  284. return nil
  285. }
  286. func (gcp *GCP) GetKey(labels map[string]string) string {
  287. instanceType := strings.ToLower(strings.Join(strings.Split(labels["beta.kubernetes.io/instance-type"], "-")[:2], ""))
  288. if instanceType == "n1highmem" || instanceType == "n1highcpu" {
  289. instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
  290. } else if strings.HasPrefix(instanceType, "custom") {
  291. instanceType = "custom" // The suffix of custom does not matter
  292. }
  293. region := strings.ToLower(labels["failure-domain.beta.kubernetes.io/region"])
  294. var usageType string
  295. if t, ok := labels["cloud.google.com/gke-preemptible"]; ok && t == "true" {
  296. usageType = "preemptible"
  297. } else {
  298. usageType = "ondemand"
  299. }
  300. return region + "," + instanceType + "," + usageType
  301. }
  302. func (gcp *GCP) AllNodePricing() (interface{}, error) {
  303. return gcp.Pricing, nil
  304. }
  305. func (gcp *GCP) NodePricing(key string) (*Node, error) {
  306. if n, ok := gcp.Pricing[key]; ok {
  307. return n.Node, nil
  308. } else {
  309. log.Printf("Warning: no pricing data found for %s", key)
  310. return nil, fmt.Errorf("Warning: no pricing data found for %s", key)
  311. }
  312. }
  313. type AWS struct {
  314. Pricing map[string]*AWSProductTerms
  315. ValidPricingKeys map[string]bool
  316. Clientset *kubernetes.Clientset
  317. }
  318. type AWSPricing struct {
  319. Products map[string]*AWSProduct `json:"products"`
  320. Terms AWSPricingTerms `json:"terms"`
  321. }
  322. type AWSProduct struct {
  323. Sku string `json:"sku"`
  324. Attributes AWSProductAttributes `json:"attributes"`
  325. }
  326. type AWSProductAttributes struct {
  327. Location string `json:"location"`
  328. InstanceType string `json:"instanceType"`
  329. Memory string `json:"memory"`
  330. Storage string `json:"storage"`
  331. VCpu string `json:"vcpu"`
  332. UsageType string `json:"usagetype"`
  333. OperatingSystem string `json:"operatingSystem"`
  334. PreInstalledSw string `json:"preInstalledSw"`
  335. }
  336. type AWSPricingTerms struct {
  337. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  338. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  339. }
  340. type AWSOfferTerm struct {
  341. Sku string `json:"sku"`
  342. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  343. }
  344. type AWSRateCode struct {
  345. Unit string `json:"unit"`
  346. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  347. }
  348. type AWSCurrencyCode struct {
  349. USD string `json:"USD"`
  350. }
  351. type AWSProductTerms struct {
  352. Sku string `json:"sku"`
  353. OnDemand *AWSOfferTerm `json:"OnDemand"`
  354. Reserved *AWSOfferTerm `json:"Reserved"`
  355. Memory string `json:"memory"`
  356. Storage string `json:"storage"`
  357. VCpu string `json:"vcpu"`
  358. }
  359. const OnDemandRateCode = ".JRTCKXETXF"
  360. const ReservedRateCode = ".38NPMPTW36"
  361. const HourlyRateCode = ".6YS6EN2CT7"
  362. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  363. locationToRegion := map[string]string{
  364. "US East (Ohio)": "us-east-2",
  365. "US East (N. Virginia)": "us-east-1",
  366. "US West (N. California)": "us-west-1",
  367. "US West (Oregon)": "us-west-2",
  368. "Asia Pacific (Mumbai)": "ap-south-1",
  369. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  370. "Asia Pacific (Seoul)": "ap-northeast-2",
  371. "Asia Pacific (Singapore)": "ap-southeast-1",
  372. "Asia Pacific (Sydney)": "ap-southeast-2",
  373. "Asia Pacific (Tokyo)": "ap-northeast-1",
  374. "Canada (Central)": "ca-central-1",
  375. "China (Beijing)": "cn-north-1",
  376. "China (Ningxia)": "cn-northwest-1",
  377. "EU (Frankfurt)": "eu-central-1",
  378. "EU (Ireland)": "eu-west-1",
  379. "EU (London)": "eu-west-2",
  380. "EU (Paris)": "eu-west-3",
  381. "EU (Stockholm)": "eu-north-1",
  382. "South America (São Paulo)": "sa-east-1",
  383. "AWS GovCloud (US-East)": "us-gov-east-1",
  384. "AWS GovCloud (US)": "us-gov-west-1",
  385. }
  386. operatingSystem = strings.ToLower(operatingSystem)
  387. region := locationToRegion[location]
  388. return region + "," + instanceType + "," + operatingSystem
  389. }
  390. func (aws *AWS) GetKey(labels map[string]string) string {
  391. instanceType := labels["beta.kubernetes.io/instance-type"]
  392. operatingSystem := labels["beta.kubernetes.io/os"]
  393. region := labels["failure-domain.beta.kubernetes.io/region"]
  394. return region + "," + instanceType + "," + operatingSystem
  395. }
  396. func (aws *AWS) DownloadPricingData() error {
  397. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  398. if err != nil {
  399. return err
  400. }
  401. inputkeys := make(map[string]bool)
  402. for _, n := range nodeList.Items {
  403. labels := n.GetObjectMeta().GetLabels()
  404. key := aws.GetKey(labels)
  405. inputkeys[key] = true
  406. }
  407. aws.Pricing = make(map[string]*AWSProductTerms)
  408. aws.ValidPricingKeys = make(map[string]bool)
  409. skusToKeys := make(map[string]string)
  410. resp, err := http.Get("https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json")
  411. if err != nil {
  412. return err
  413. }
  414. dec := json.NewDecoder(resp.Body)
  415. for {
  416. t, err := dec.Token()
  417. if err == io.EOF {
  418. fmt.Printf("done \n")
  419. break
  420. }
  421. if t == "products" {
  422. dec.Token() //{
  423. for dec.More() {
  424. dec.Token() // the sku token
  425. product := &AWSProduct{}
  426. err := dec.Decode(&product)
  427. if err != nil {
  428. fmt.Printf("Error: " + err.Error())
  429. break
  430. }
  431. if product.Attributes.PreInstalledSw == "NA" &&
  432. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  433. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  434. if inputkeys[key] {
  435. aws.Pricing[key] = &AWSProductTerms{
  436. Sku: product.Sku,
  437. Memory: product.Attributes.Memory,
  438. Storage: product.Attributes.Storage,
  439. VCpu: product.Attributes.VCpu,
  440. }
  441. skusToKeys[product.Sku] = key
  442. }
  443. aws.ValidPricingKeys[key] = true
  444. }
  445. }
  446. }
  447. if t == "terms" {
  448. dec.Token()
  449. termType, _ := dec.Token()
  450. if termType == "OnDemand" {
  451. dec.Token() // {
  452. for dec.More() {
  453. sku, _ := dec.Token()
  454. dec.Token()
  455. skuOnDemand, _ := dec.Token()
  456. offerTerm := &AWSOfferTerm{}
  457. err := dec.Decode(&offerTerm)
  458. if err != nil {
  459. fmt.Printf("Error: " + err.Error())
  460. }
  461. if sku.(string)+OnDemandRateCode == skuOnDemand {
  462. key, ok := skusToKeys[sku.(string)]
  463. if ok {
  464. aws.Pricing[key].OnDemand = offerTerm
  465. }
  466. }
  467. dec.Token()
  468. }
  469. dec.Token()
  470. }
  471. }
  472. }
  473. if err != nil {
  474. return err
  475. }
  476. log.Printf("Body Parsed")
  477. return nil
  478. }
  479. func (aws *AWS) AllNodePricing() (interface{}, error) {
  480. return aws.Pricing, nil
  481. }
  482. func (aws *AWS) NodePricing(key string) (*Node, error) {
  483. //return json.Marshal(aws.Pricing[key])
  484. terms, ok := aws.Pricing[key]
  485. if ok {
  486. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  487. return &Node{
  488. Cost: cost,
  489. VCPU: terms.VCpu,
  490. RAM: terms.Memory,
  491. Storage: terms.Storage,
  492. }, nil
  493. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  494. err := aws.DownloadPricingData()
  495. if err != nil {
  496. return nil, err
  497. }
  498. terms := aws.Pricing[key]
  499. cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  500. return &Node{
  501. Cost: cost,
  502. VCPU: terms.VCpu,
  503. RAM: terms.Memory,
  504. Storage: terms.Storage,
  505. }, nil
  506. } else {
  507. return nil, errors.New("Invalid Pricing Key: " + key + "\n")
  508. }
  509. }
  510. func (*AWS) ClusterName() ([]byte, error) {
  511. attribute := "AWS Cluster #1"
  512. m := make(map[string]string)
  513. m["name"] = attribute
  514. m["provider"] = "AWS"
  515. return json.Marshal(m)
  516. }
  517. func (*AWS) AddServiceKey(formValues url.Values) error {
  518. keyID := formValues.Get("access_key_ID")
  519. key := formValues.Get("secret_access_key")
  520. m := make(map[string]string)
  521. m["access_key_ID"] = keyID
  522. m["secret_access_key"] = key
  523. json, err := json.Marshal(m)
  524. if err != nil {
  525. return err
  526. }
  527. return ioutil.WriteFile("/var/configs/key.json", json, 0644)
  528. }
  529. func (*AWS) GetDisks() ([]byte, error) {
  530. jsonFile, err := os.Open("/var/configs/key.json")
  531. if err == nil {
  532. byteValue, _ := ioutil.ReadAll(jsonFile)
  533. var result map[string]string
  534. json.Unmarshal([]byte(byteValue), &result)
  535. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  536. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  537. } else if os.IsNotExist(err) {
  538. log.Printf("Using Default Credentials")
  539. } else {
  540. return nil, err
  541. }
  542. defer jsonFile.Close()
  543. byteValue, _ := ioutil.ReadAll(jsonFile)
  544. var result map[string]string
  545. json.Unmarshal([]byte(byteValue), &result)
  546. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  547. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  548. c := &aws.Config{
  549. Region: aws.String("us-east-1"),
  550. Credentials: credentials.NewEnvCredentials(),
  551. }
  552. s := session.Must(session.NewSession(c))
  553. ec2Svc := ec2.New(s)
  554. input := &ec2.DescribeVolumesInput{}
  555. volumeResult, err := ec2Svc.DescribeVolumes(input)
  556. if err != nil {
  557. if aerr, ok := err.(awserr.Error); ok {
  558. switch aerr.Code() {
  559. default:
  560. return nil, aerr
  561. }
  562. } else {
  563. return nil, err
  564. }
  565. }
  566. return json.Marshal(volumeResult)
  567. }
  568. func (*AWS) QuerySQL(query string) ([]byte, error) {
  569. jsonFile, err := os.Open("/var/configs/key.json")
  570. if err == nil {
  571. byteValue, _ := ioutil.ReadAll(jsonFile)
  572. var result map[string]string
  573. json.Unmarshal([]byte(byteValue), &result)
  574. os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
  575. os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
  576. } else if os.IsNotExist(err) {
  577. log.Printf("Using Default Credentials")
  578. } else {
  579. return nil, err
  580. }
  581. defer jsonFile.Close()
  582. athenaConfigs, err := os.Open("/var/configs/athena.json")
  583. if err != nil {
  584. return nil, err
  585. }
  586. defer athenaConfigs.Close()
  587. bytes, _ := ioutil.ReadAll(athenaConfigs)
  588. var athenaConf map[string]string
  589. json.Unmarshal([]byte(bytes), &athenaConf)
  590. region := aws.String(athenaConf["region"])
  591. resultsBucket := athenaConf["output"]
  592. database := athenaConf["database"]
  593. c := &aws.Config{
  594. Region: region,
  595. Credentials: credentials.NewEnvCredentials(),
  596. }
  597. s := session.Must(session.NewSession(c))
  598. svc := athena.New(s)
  599. var e athena.StartQueryExecutionInput
  600. var r athena.ResultConfiguration
  601. r.SetOutputLocation(resultsBucket)
  602. e.SetResultConfiguration(&r)
  603. e.SetQueryString(query)
  604. var q athena.QueryExecutionContext
  605. q.SetDatabase(database)
  606. e.SetQueryExecutionContext(&q)
  607. res, err := svc.StartQueryExecution(&e)
  608. if err != nil {
  609. return nil, err
  610. }
  611. fmt.Println("StartQueryExecution result:")
  612. fmt.Println(res.GoString())
  613. var qri athena.GetQueryExecutionInput
  614. qri.SetQueryExecutionId(*res.QueryExecutionId)
  615. var qrop *athena.GetQueryExecutionOutput
  616. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  617. for {
  618. qrop, err = svc.GetQueryExecution(&qri)
  619. if err != nil {
  620. return nil, err
  621. }
  622. if *qrop.QueryExecution.Status.State != "RUNNING" {
  623. break
  624. }
  625. time.Sleep(duration)
  626. }
  627. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  628. var ip athena.GetQueryResultsInput
  629. ip.SetQueryExecutionId(*res.QueryExecutionId)
  630. op, err := svc.GetQueryResults(&ip)
  631. if err != nil {
  632. return nil, err
  633. }
  634. bytes, err := json.Marshal(op.ResultSet)
  635. if err != nil {
  636. return nil, err
  637. }
  638. return bytes, nil
  639. } else {
  640. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  641. }
  642. }