awsprovider.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strings"
  15. "time"
  16. "k8s.io/klog"
  17. "github.com/aws/aws-sdk-go/aws"
  18. "github.com/aws/aws-sdk-go/aws/awserr"
  19. "github.com/aws/aws-sdk-go/aws/session"
  20. "github.com/aws/aws-sdk-go/service/athena"
  21. "github.com/aws/aws-sdk-go/service/ec2"
  22. "github.com/aws/aws-sdk-go/service/s3"
  23. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  24. "github.com/jszwec/csvutil"
  25. v1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. )
  29. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  30. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  31. const supportedSpotFeedVersion = "1"
  32. // AWS represents an Amazon Provider
  33. type AWS struct {
  34. Pricing map[string]*AWSProductTerms
  35. SpotPricingByInstanceID map[string]*spotInfo
  36. ValidPricingKeys map[string]bool
  37. Clientset *kubernetes.Clientset
  38. BaseCPUPrice string
  39. BaseSpotCPUPrice string
  40. BaseSpotRAMPrice string
  41. SpotLabelName string
  42. SpotLabelValue string
  43. ServiceKeyName string
  44. ServiceKeySecret string
  45. SpotDataRegion string
  46. SpotDataBucket string
  47. SpotDataPrefix string
  48. ProjectID string
  49. }
  50. // AWSPricing maps a k8s node to an AWS Pricing "product"
  51. type AWSPricing struct {
  52. Products map[string]*AWSProduct `json:"products"`
  53. Terms AWSPricingTerms `json:"terms"`
  54. }
  55. // AWSProduct represents a purchased SKU
  56. type AWSProduct struct {
  57. Sku string `json:"sku"`
  58. Attributes AWSProductAttributes `json:"attributes"`
  59. }
  60. // AWSProductAttributes represents metadata about the product used to map to a node.
  61. type AWSProductAttributes struct {
  62. Location string `json:"location"`
  63. InstanceType string `json:"instanceType"`
  64. Memory string `json:"memory"`
  65. Storage string `json:"storage"`
  66. VCpu string `json:"vcpu"`
  67. UsageType string `json:"usagetype"`
  68. OperatingSystem string `json:"operatingSystem"`
  69. PreInstalledSw string `json:"preInstalledSw"`
  70. InstanceFamily string `json:"instanceFamily"`
  71. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  72. }
  73. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  74. type AWSPricingTerms struct {
  75. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  76. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  77. }
  78. // AWSOfferTerm is a sku extension used to pay for the node.
  79. type AWSOfferTerm struct {
  80. Sku string `json:"sku"`
  81. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  82. }
  83. // AWSRateCode encodes data about the price of a product
  84. type AWSRateCode struct {
  85. Unit string `json:"unit"`
  86. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  87. }
  88. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  89. type AWSCurrencyCode struct {
  90. USD string `json:"USD"`
  91. }
  92. // AWSProductTerms represents the full terms of the product
  93. type AWSProductTerms struct {
  94. Sku string `json:"sku"`
  95. OnDemand *AWSOfferTerm `json:"OnDemand"`
  96. Reserved *AWSOfferTerm `json:"Reserved"`
  97. Memory string `json:"memory"`
  98. Storage string `json:"storage"`
  99. VCpu string `json:"vcpu"`
  100. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  101. }
  102. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  103. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  104. // OnDemandRateCode is appended to an node sku
  105. const OnDemandRateCode = ".JRTCKXETXF"
  106. // ReservedRateCode is appended to a node sku
  107. const ReservedRateCode = ".38NPMPTW36"
  108. // HourlyRateCode is appended to a node sku
  109. const HourlyRateCode = ".6YS6EN2CT7"
  110. // KubeAttrConversion maps the k8s labels for region to an aws region
  111. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  112. locationToRegion := map[string]string{
  113. "US East (Ohio)": "us-east-2",
  114. "US East (N. Virginia)": "us-east-1",
  115. "US West (N. California)": "us-west-1",
  116. "US West (Oregon)": "us-west-2",
  117. "Asia Pacific (Mumbai)": "ap-south-1",
  118. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  119. "Asia Pacific (Seoul)": "ap-northeast-2",
  120. "Asia Pacific (Singapore)": "ap-southeast-1",
  121. "Asia Pacific (Sydney)": "ap-southeast-2",
  122. "Asia Pacific (Tokyo)": "ap-northeast-1",
  123. "Canada (Central)": "ca-central-1",
  124. "China (Beijing)": "cn-north-1",
  125. "China (Ningxia)": "cn-northwest-1",
  126. "EU (Frankfurt)": "eu-central-1",
  127. "EU (Ireland)": "eu-west-1",
  128. "EU (London)": "eu-west-2",
  129. "EU (Paris)": "eu-west-3",
  130. "EU (Stockholm)": "eu-north-1",
  131. "South America (São Paulo)": "sa-east-1",
  132. "AWS GovCloud (US-East)": "us-gov-east-1",
  133. "AWS GovCloud (US)": "us-gov-west-1",
  134. }
  135. operatingSystem = strings.ToLower(operatingSystem)
  136. region := locationToRegion[location]
  137. return region + "," + instanceType + "," + operatingSystem
  138. }
  139. type AwsSpotFeedInfo struct {
  140. BucketName string `json:"bucketName"`
  141. Prefix string `json:"prefix"`
  142. Region string `json:"region"`
  143. AccountID string `json:"accountId"`
  144. ServiceKeyName string `json:"serviceKeyName"`
  145. ServiceKeySecret string `json:"serviceKeySecret"`
  146. SpotLabel string `json:"spotLabel"`
  147. SpotLabelValue string `json:"spotLabelValue"`
  148. }
  149. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  150. c, err := GetDefaultPricingData("aws.json")
  151. if err != nil {
  152. return nil, err
  153. }
  154. return c, nil
  155. }
  156. func (aws *AWS) UpdateConfig(r io.Reader) (*CustomPricing, error) {
  157. a := AwsSpotFeedInfo{}
  158. err := json.NewDecoder(r).Decode(&a)
  159. if err != nil {
  160. return nil, err
  161. }
  162. c, err := GetDefaultPricingData("aws.json")
  163. if err != nil {
  164. return nil, err
  165. }
  166. c.ServiceKeyName = a.ServiceKeyName
  167. c.ServiceKeySecret = a.ServiceKeySecret
  168. c.SpotDataPrefix = a.Prefix
  169. c.SpotDataBucket = a.BucketName
  170. c.ProjectID = a.AccountID
  171. c.SpotDataRegion = a.Region
  172. c.SpotLabel = a.SpotLabel
  173. c.SpotLabelValue = a.SpotLabelValue
  174. cj, err := json.Marshal(c)
  175. if err != nil {
  176. return nil, err
  177. }
  178. path := os.Getenv("CONFIG_PATH")
  179. if path == "" {
  180. path = "/models/"
  181. }
  182. path += "aws.json"
  183. err = ioutil.WriteFile(path, cj, 0644)
  184. if err != nil {
  185. return nil, err
  186. }
  187. return c, nil
  188. }
  189. type awsKey struct {
  190. SpotLabelName string
  191. SpotLabelValue string
  192. Labels map[string]string
  193. ProviderID string
  194. }
  195. func (k *awsKey) GPUType() string {
  196. return ""
  197. }
  198. func (k *awsKey) ID() string {
  199. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  200. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  201. if matchNum == 2 {
  202. return group
  203. }
  204. }
  205. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  206. return ""
  207. }
  208. func (k *awsKey) Features() string {
  209. instanceType := k.Labels[v1.LabelInstanceType]
  210. var operatingSystem string
  211. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  212. if !ok {
  213. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  214. }
  215. region := k.Labels[v1.LabelZoneRegion]
  216. key := region + "," + instanceType + "," + operatingSystem
  217. usageType := "preemptible"
  218. spotKey := key + "," + usageType
  219. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  220. return spotKey
  221. }
  222. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  223. return spotKey
  224. }
  225. return key
  226. }
  227. // GetKey maps node labels to information needed to retrieve pricing data
  228. func (aws *AWS) GetKey(labels map[string]string) Key {
  229. return &awsKey{
  230. SpotLabelName: aws.SpotLabelName,
  231. SpotLabelValue: aws.SpotLabelValue,
  232. Labels: labels,
  233. ProviderID: labels["providerID"],
  234. }
  235. }
  236. func (aws *AWS) isPreemptible(key string) bool {
  237. s := strings.Split(key, ",")
  238. if len(s) == 4 && s[3] == "preemptible" {
  239. return true
  240. }
  241. return false
  242. }
  243. // DownloadPricingData fetches data from the AWS Pricing API
  244. func (aws *AWS) DownloadPricingData() error {
  245. c, err := GetDefaultPricingData("aws.json")
  246. if err != nil {
  247. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  248. }
  249. aws.BaseCPUPrice = c.CPU
  250. aws.BaseSpotCPUPrice = c.SpotCPU
  251. aws.BaseSpotRAMPrice = c.SpotRAM
  252. aws.SpotLabelName = c.SpotLabel
  253. aws.SpotLabelValue = c.SpotLabelValue
  254. aws.SpotDataBucket = c.SpotDataBucket
  255. aws.SpotDataPrefix = c.SpotDataPrefix
  256. aws.ProjectID = c.ProjectID
  257. aws.SpotDataRegion = c.SpotDataRegion
  258. aws.ServiceKeyName = c.ServiceKeyName
  259. aws.ServiceKeySecret = c.ServiceKeySecret
  260. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  261. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  262. }
  263. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  264. if err != nil {
  265. return err
  266. }
  267. inputkeys := make(map[string]bool)
  268. for _, n := range nodeList.Items {
  269. labels := n.GetObjectMeta().GetLabels()
  270. key := aws.GetKey(labels)
  271. inputkeys[key.Features()] = true
  272. }
  273. aws.Pricing = make(map[string]*AWSProductTerms)
  274. aws.ValidPricingKeys = make(map[string]bool)
  275. skusToKeys := make(map[string]string)
  276. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  277. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  278. resp, err := http.Get(pricingURL)
  279. if err != nil {
  280. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  281. return err
  282. }
  283. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  284. dec := json.NewDecoder(resp.Body)
  285. for {
  286. t, err := dec.Token()
  287. if err == io.EOF {
  288. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  289. break
  290. }
  291. if t == "products" {
  292. _, err := dec.Token() // this should parse the opening "{""
  293. if err != nil {
  294. return err
  295. }
  296. for dec.More() {
  297. _, err := dec.Token() // the sku token
  298. if err != nil {
  299. return err
  300. }
  301. product := &AWSProduct{}
  302. err = dec.Decode(&product)
  303. if err != nil {
  304. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  305. break
  306. }
  307. if product.Attributes.PreInstalledSw == "NA" &&
  308. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  309. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  310. spotKey := key + ",preemptible"
  311. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  312. productTerms := &AWSProductTerms{
  313. Sku: product.Sku,
  314. Memory: product.Attributes.Memory,
  315. Storage: product.Attributes.Storage,
  316. VCpu: product.Attributes.VCpu,
  317. GPU: product.Attributes.GPU,
  318. }
  319. aws.Pricing[key] = productTerms
  320. aws.Pricing[spotKey] = productTerms
  321. skusToKeys[product.Sku] = key
  322. }
  323. aws.ValidPricingKeys[key] = true
  324. aws.ValidPricingKeys[spotKey] = true
  325. }
  326. }
  327. }
  328. if t == "terms" {
  329. _, err := dec.Token() // this should parse the opening "{""
  330. if err != nil {
  331. return err
  332. }
  333. termType, err := dec.Token()
  334. if err != nil {
  335. return err
  336. }
  337. if termType == "OnDemand" {
  338. _, err := dec.Token()
  339. if err != nil { // again, should parse an opening "{"
  340. return err
  341. }
  342. for dec.More() {
  343. sku, err := dec.Token()
  344. if err != nil {
  345. return err
  346. }
  347. _, err = dec.Token() // another opening "{"
  348. if err != nil {
  349. return err
  350. }
  351. skuOnDemand, err := dec.Token()
  352. if err != nil {
  353. return err
  354. }
  355. offerTerm := &AWSOfferTerm{}
  356. err = dec.Decode(&offerTerm)
  357. if err != nil {
  358. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  359. }
  360. if sku.(string)+OnDemandRateCode == skuOnDemand {
  361. key, ok := skusToKeys[sku.(string)]
  362. spotKey := key + ",preemptible"
  363. if ok {
  364. aws.Pricing[key].OnDemand = offerTerm
  365. aws.Pricing[spotKey].OnDemand = offerTerm
  366. }
  367. }
  368. _, err = dec.Token()
  369. if err != nil {
  370. return err
  371. }
  372. }
  373. _, err = dec.Token()
  374. if err != nil {
  375. return err
  376. }
  377. }
  378. }
  379. }
  380. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  381. if err != nil {
  382. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  383. } else {
  384. aws.SpotPricingByInstanceID = sp
  385. }
  386. return nil
  387. }
  388. // AllNodePricing returns all the billing data fetched.
  389. func (aws *AWS) AllNodePricing() (interface{}, error) {
  390. return aws.Pricing, nil
  391. }
  392. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  393. key := k.Features()
  394. if aws.isPreemptible(key) {
  395. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  396. var spotcost string
  397. arr := strings.Split(spotInfo.Charge, " ")
  398. if len(arr) == 2 {
  399. spotcost = arr[0]
  400. } else {
  401. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  402. }
  403. return &Node{
  404. Cost: spotcost,
  405. VCPU: terms.VCpu,
  406. RAM: terms.Memory,
  407. GPU: terms.GPU,
  408. Storage: terms.Storage,
  409. BaseCPUPrice: aws.BaseCPUPrice,
  410. UsageType: usageType,
  411. }, nil
  412. }
  413. return &Node{
  414. VCPU: terms.VCpu,
  415. VCPUCost: aws.BaseSpotCPUPrice,
  416. RAM: terms.Memory,
  417. GPU: terms.GPU,
  418. RAMCost: aws.BaseSpotRAMPrice,
  419. Storage: terms.Storage,
  420. BaseCPUPrice: aws.BaseCPUPrice,
  421. UsageType: usageType,
  422. }, nil
  423. }
  424. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  425. if !ok {
  426. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  427. }
  428. cost := c.PricePerUnit.USD
  429. return &Node{
  430. Cost: cost,
  431. VCPU: terms.VCpu,
  432. RAM: terms.Memory,
  433. GPU: terms.GPU,
  434. Storage: terms.Storage,
  435. BaseCPUPrice: aws.BaseCPUPrice,
  436. UsageType: usageType,
  437. }, nil
  438. }
  439. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  440. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  441. //return json.Marshal(aws.Pricing[key])
  442. key := k.Features()
  443. usageType := "ondemand"
  444. if aws.isPreemptible(key) {
  445. usageType = "preemptible"
  446. }
  447. terms, ok := aws.Pricing[key]
  448. if ok {
  449. return aws.createNode(terms, usageType, k)
  450. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  451. err := aws.DownloadPricingData()
  452. if err != nil {
  453. return nil, err
  454. }
  455. terms, termsOk := aws.Pricing[key]
  456. if !termsOk {
  457. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  458. }
  459. return aws.createNode(terms, usageType, k)
  460. } else { // Fall back to base pricing if we can't find the key.
  461. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  462. return &Node{
  463. Cost: aws.BaseCPUPrice,
  464. BaseCPUPrice: aws.BaseCPUPrice,
  465. UsageType: usageType,
  466. UsesBaseCPUPrice: true,
  467. }, nil
  468. }
  469. }
  470. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  471. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  472. defaultClusterName := "AWS Cluster #1"
  473. makeStructure := func(clusterName string) ([]byte, error) {
  474. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  475. m := make(map[string]string)
  476. m["name"] = clusterName
  477. m["provider"] = "AWS"
  478. return json.Marshal(m)
  479. }
  480. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  481. if len(maybeClusterId) != 0 {
  482. return makeStructure(maybeClusterId)
  483. }
  484. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  485. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  486. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  487. if err != nil {
  488. return nil, err
  489. }
  490. for _, n := range nodeList.Items {
  491. region := ""
  492. instanceId := ""
  493. providerId := n.Spec.ProviderID
  494. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  495. if matchNum == 1 {
  496. region = group
  497. } else if matchNum == 2 {
  498. instanceId = group
  499. }
  500. }
  501. if len(instanceId) == 0 {
  502. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  503. continue
  504. }
  505. c := &aws.Config{
  506. Region: aws.String(region),
  507. }
  508. s := session.Must(session.NewSession(c))
  509. ec2Svc := ec2.New(s)
  510. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  511. InstanceIds: []*string{
  512. aws.String(instanceId),
  513. },
  514. })
  515. if diErr != nil {
  516. // maybe log this?
  517. continue
  518. }
  519. if len(di.Reservations) != 1 {
  520. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  521. continue
  522. }
  523. res := di.Reservations[0]
  524. if len(res.Instances) != 1 {
  525. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  526. continue
  527. }
  528. inst := res.Instances[0]
  529. for _, tag := range inst.Tags {
  530. tagKey := *tag.Key
  531. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  532. if matchNum != 1 {
  533. continue
  534. }
  535. return makeStructure(group)
  536. }
  537. }
  538. }
  539. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  540. return makeStructure(defaultClusterName)
  541. }
  542. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  543. func (*AWS) AddServiceKey(formValues url.Values) error {
  544. keyID := formValues.Get("access_key_ID")
  545. key := formValues.Get("secret_access_key")
  546. m := make(map[string]string)
  547. m["access_key_ID"] = keyID
  548. m["secret_access_key"] = key
  549. result, err := json.Marshal(m)
  550. if err != nil {
  551. return err
  552. }
  553. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  554. }
  555. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  556. func (*AWS) GetDisks() ([]byte, error) {
  557. jsonFile, err := os.Open("/var/configs/key.json")
  558. if err == nil {
  559. byteValue, _ := ioutil.ReadAll(jsonFile)
  560. var result map[string]string
  561. err := json.Unmarshal([]byte(byteValue), &result)
  562. if err != nil {
  563. return nil, err
  564. }
  565. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  566. if err != nil {
  567. return nil, err
  568. }
  569. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  570. if err != nil {
  571. return nil, err
  572. }
  573. } else if os.IsNotExist(err) {
  574. klog.V(2).Infof("Using Default Credentials")
  575. } else {
  576. return nil, err
  577. }
  578. defer jsonFile.Close()
  579. clusterConfig, err := os.Open("/var/configs/cluster.json")
  580. if err != nil {
  581. return nil, err
  582. }
  583. defer clusterConfig.Close()
  584. b, err := ioutil.ReadAll(clusterConfig)
  585. if err != nil {
  586. return nil, err
  587. }
  588. var clusterConf map[string]string
  589. err = json.Unmarshal([]byte(b), &clusterConf)
  590. if err != nil {
  591. return nil, err
  592. }
  593. region := aws.String(clusterConf["region"])
  594. c := &aws.Config{
  595. Region: region,
  596. }
  597. s := session.Must(session.NewSession(c))
  598. ec2Svc := ec2.New(s)
  599. input := &ec2.DescribeVolumesInput{}
  600. volumeResult, err := ec2Svc.DescribeVolumes(input)
  601. if err != nil {
  602. if aerr, ok := err.(awserr.Error); ok {
  603. switch aerr.Code() {
  604. default:
  605. return nil, aerr
  606. }
  607. } else {
  608. return nil, err
  609. }
  610. }
  611. return json.Marshal(volumeResult)
  612. }
  613. func (*AWS) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
  614. return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  615. }
  616. // QuerySQL can query a properly configured Athena database.
  617. // Used to fetch billing data.
  618. // Requires a json config in /var/configs with key region, output, and database.
  619. func (*AWS) QuerySQL(query string) ([]byte, error) {
  620. jsonFile, err := os.Open("/var/configs/key.json")
  621. if err == nil {
  622. byteValue, _ := ioutil.ReadAll(jsonFile)
  623. var result map[string]string
  624. json.Unmarshal([]byte(byteValue), &result)
  625. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  626. if err != nil {
  627. return nil, err
  628. }
  629. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  630. if err != nil {
  631. return nil, err
  632. }
  633. } else if os.IsNotExist(err) {
  634. klog.V(2).Infof("Using Default Credentials")
  635. } else {
  636. return nil, err
  637. }
  638. defer jsonFile.Close()
  639. athenaConfigs, err := os.Open("/var/configs/athena.json")
  640. if err != nil {
  641. return nil, err
  642. }
  643. defer athenaConfigs.Close()
  644. b, err := ioutil.ReadAll(athenaConfigs)
  645. if err != nil {
  646. return nil, err
  647. }
  648. var athenaConf map[string]string
  649. json.Unmarshal([]byte(b), &athenaConf)
  650. region := aws.String(athenaConf["region"])
  651. resultsBucket := athenaConf["output"]
  652. database := athenaConf["database"]
  653. c := &aws.Config{
  654. Region: region,
  655. }
  656. s := session.Must(session.NewSession(c))
  657. svc := athena.New(s)
  658. var e athena.StartQueryExecutionInput
  659. var r athena.ResultConfiguration
  660. r.SetOutputLocation(resultsBucket)
  661. e.SetResultConfiguration(&r)
  662. e.SetQueryString(query)
  663. var q athena.QueryExecutionContext
  664. q.SetDatabase(database)
  665. e.SetQueryExecutionContext(&q)
  666. res, err := svc.StartQueryExecution(&e)
  667. if err != nil {
  668. return nil, err
  669. }
  670. klog.V(2).Infof("StartQueryExecution result:")
  671. klog.V(2).Infof(res.GoString())
  672. var qri athena.GetQueryExecutionInput
  673. qri.SetQueryExecutionId(*res.QueryExecutionId)
  674. var qrop *athena.GetQueryExecutionOutput
  675. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  676. for {
  677. qrop, err = svc.GetQueryExecution(&qri)
  678. if err != nil {
  679. return nil, err
  680. }
  681. if *qrop.QueryExecution.Status.State != "RUNNING" {
  682. break
  683. }
  684. time.Sleep(duration)
  685. }
  686. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  687. var ip athena.GetQueryResultsInput
  688. ip.SetQueryExecutionId(*res.QueryExecutionId)
  689. op, err := svc.GetQueryResults(&ip)
  690. if err != nil {
  691. return nil, err
  692. }
  693. b, err := json.Marshal(op.ResultSet)
  694. if err != nil {
  695. return nil, err
  696. }
  697. return b, nil
  698. }
  699. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  700. }
  701. type spotInfo struct {
  702. Timestamp string `csv:"Timestamp"`
  703. UsageType string `csv:"UsageType"`
  704. Operation string `csv:"Operation"`
  705. InstanceID string `csv:"InstanceID"`
  706. MyBidID string `csv:"MyBidID"`
  707. MyMaxPrice string `csv:"MyMaxPrice"`
  708. MarketPrice string `csv:"MarketPrice"`
  709. Charge string `csv:"Charge"`
  710. Version string `csv:"Version"`
  711. }
  712. type fnames []*string
  713. func (f fnames) Len() int {
  714. return len(f)
  715. }
  716. func (f fnames) Swap(i, j int) {
  717. f[i], f[j] = f[j], f[i]
  718. }
  719. func (f fnames) Less(i, j int) bool {
  720. key1 := strings.Split(*f[i], ".")
  721. key2 := strings.Split(*f[j], ".")
  722. t1, err := time.Parse("2006-01-02-15", key1[1])
  723. if err != nil {
  724. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  725. return false
  726. }
  727. t2, err := time.Parse("2006-01-02-15", key2[1])
  728. if err != nil {
  729. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  730. return false
  731. }
  732. return t1.Before(t2)
  733. }
  734. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  735. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  736. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  737. if err != nil {
  738. return nil, err
  739. }
  740. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  741. if err != nil {
  742. return nil, err
  743. }
  744. }
  745. s3Prefix := projectID
  746. if len(prefix) != 0 {
  747. s3Prefix = prefix + "/" + s3Prefix
  748. }
  749. c := aws.NewConfig().WithRegion(region)
  750. s := session.Must(session.NewSession(c))
  751. s3Svc := s3.New(s)
  752. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  753. tNow := time.Now()
  754. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  755. ls := &s3.ListObjectsInput{
  756. Bucket: aws.String(bucket),
  757. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  758. }
  759. ls2 := &s3.ListObjectsInput{
  760. Bucket: aws.String(bucket),
  761. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  762. }
  763. lso, err := s3Svc.ListObjects(ls)
  764. if err != nil {
  765. return nil, err
  766. }
  767. lsoLen := len(lso.Contents)
  768. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  769. if lsoLen == 0 {
  770. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  771. }
  772. lso2, err := s3Svc.ListObjects(ls2)
  773. if err != nil {
  774. return nil, err
  775. }
  776. lso2Len := len(lso2.Contents)
  777. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  778. if lso2Len == 0 {
  779. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  780. }
  781. var keys []*string
  782. for _, obj := range lso.Contents {
  783. keys = append(keys, obj.Key)
  784. }
  785. for _, obj := range lso2.Contents {
  786. keys = append(keys, obj.Key)
  787. }
  788. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  789. header, err := csvutil.Header(spotInfo{}, "csv")
  790. if err != nil {
  791. return nil, err
  792. }
  793. fieldsPerRecord := len(header)
  794. spots := make(map[string]*spotInfo)
  795. for _, key := range keys {
  796. getObj := &s3.GetObjectInput{
  797. Bucket: aws.String(bucket),
  798. Key: key,
  799. }
  800. buf := aws.NewWriteAtBuffer([]byte{})
  801. _, err := downloader.Download(buf, getObj)
  802. if err != nil {
  803. return nil, err
  804. }
  805. r := bytes.NewReader(buf.Bytes())
  806. gr, err := gzip.NewReader(r)
  807. if err != nil {
  808. return nil, err
  809. }
  810. csvReader := csv.NewReader(gr)
  811. csvReader.Comma = '\t'
  812. csvReader.FieldsPerRecord = fieldsPerRecord
  813. dec, err := csvutil.NewDecoder(csvReader, header...)
  814. if err != nil {
  815. return nil, err
  816. }
  817. var foundVersion string
  818. for {
  819. spot := spotInfo{}
  820. err := dec.Decode(&spot)
  821. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  822. if err == io.EOF {
  823. break
  824. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  825. rec := dec.Record()
  826. // the first two "Record()" will be the comment lines
  827. // and they show up as len() == 1
  828. // the first of which is "#Version"
  829. // the second of which is "#Fields: "
  830. if len(rec) != 1 {
  831. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  832. continue
  833. }
  834. if len(foundVersion) == 0 {
  835. spotFeedVersion := rec[0]
  836. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  837. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  838. if matches != nil {
  839. foundVersion = matches[1]
  840. if foundVersion != supportedSpotFeedVersion {
  841. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  842. break
  843. }
  844. }
  845. continue
  846. } else if strings.Index(rec[0], "#") == 0 {
  847. continue
  848. } else {
  849. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  850. continue
  851. }
  852. } else if err != nil {
  853. klog.V(2).Infof("Error during spot info decode: %+v", err)
  854. continue
  855. }
  856. klog.V(3).Infof("Found spot info %+v", spot)
  857. spots[spot.InstanceID] = &spot
  858. }
  859. gr.Close()
  860. }
  861. return spots, nil
  862. }