awsprovider.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "k8s.io/klog"
  18. "github.com/aws/aws-sdk-go/aws"
  19. "github.com/aws/aws-sdk-go/aws/awserr"
  20. "github.com/aws/aws-sdk-go/aws/session"
  21. "github.com/aws/aws-sdk-go/service/athena"
  22. "github.com/aws/aws-sdk-go/service/ec2"
  23. "github.com/aws/aws-sdk-go/service/s3"
  24. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  25. "github.com/jszwec/csvutil"
  26. v1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/client-go/kubernetes"
  29. )
  // Environment variables that the provider sets from a configured service
// key before it creates AWS SDK sessions.
const (
	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
)

// supportedSpotFeedVersion is the spot data feed version string this provider
// is written for (presumably compared against spotInfo.Version when the feed
// is parsed — confirm in parseSpotData).
const supportedSpotFeedVersion = "1"

// Update types understood by (*AWS).UpdateConfig.
const (
	// SpotInfoUpdateType selects an AwsSpotFeedInfo payload.
	SpotInfoUpdateType = "spotinfo"
	// AthenaInfoUpdateType selects an AwsAthenaInfo payload.
	AthenaInfoUpdateType = "athenainfo"
)
  35. // AWS represents an Amazon Provider
  36. type AWS struct {
  37. Pricing map[string]*AWSProductTerms
  38. SpotPricingByInstanceID map[string]*spotInfo
  39. ValidPricingKeys map[string]bool
  40. Clientset *kubernetes.Clientset
  41. BaseCPUPrice string
  42. BaseSpotCPUPrice string
  43. BaseSpotRAMPrice string
  44. SpotLabelName string
  45. SpotLabelValue string
  46. ServiceKeyName string
  47. ServiceKeySecret string
  48. SpotDataRegion string
  49. SpotDataBucket string
  50. SpotDataPrefix string
  51. ProjectID string
  52. *CustomProvider
  53. }
  // Types mirroring the public EC2 price list document
// (offers/v1.0/aws/AmazonEC2/current/index.json). JSON tags follow the field
// names used in that document.
type (
	// AWSPricing maps a k8s node to an AWS Pricing "product". It models the
	// two top-level sections of the price list: products and terms.
	AWSPricing struct {
		Products map[string]*AWSProduct `json:"products"`
		Terms    AWSPricingTerms        `json:"terms"`
	}

	// AWSProduct represents a purchased SKU.
	AWSProduct struct {
		Sku        string               `json:"sku"`
		Attributes AWSProductAttributes `json:"attributes"`
	}

	// AWSProductAttributes represents metadata about the product used to map to a node.
	AWSProductAttributes struct {
		Location        string `json:"location"`
		InstanceType    string `json:"instanceType"`
		Memory          string `json:"memory"`
		Storage         string `json:"storage"`
		VCpu            string `json:"vcpu"`
		UsageType       string `json:"usagetype"`
		OperatingSystem string `json:"operatingSystem"`
		PreInstalledSw  string `json:"preInstalledSw"`
		InstanceFamily  string `json:"instanceFamily"`
		GPU             string `json:"gpu"` // GPU represents the number of GPU on the instance
	}

	// AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot.
	// Both maps are keyed by sku, then by offer term code.
	AWSPricingTerms struct {
		OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
		Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
	}

	// AWSOfferTerm is a sku extension used to pay for the node. Its price
	// dimensions are keyed by rate code.
	AWSOfferTerm struct {
		Sku             string                  `json:"sku"`
		PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
	}

	// AWSRateCode encodes data about the price of a product.
	AWSRateCode struct {
		Unit         string          `json:"unit"`
		PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
	}

	// AWSCurrencyCode is the localized currency. (TODO: support non-USD)
	AWSCurrencyCode struct {
		USD string `json:"USD"`
	}

	// AWSProductTerms represents the full terms of the product: the per-node
	// pricing entry stored in AWS.Pricing, combining the sku, its offer terms
	// and the hardware attributes copied from the product.
	AWSProductTerms struct {
		Sku      string        `json:"sku"`
		OnDemand *AWSOfferTerm `json:"OnDemand"`
		Reserved *AWSOfferTerm `json:"Reserved"`
		Memory   string        `json:"memory"`
		Storage  string        `json:"storage"`
		VCpu     string        `json:"vcpu"`
		GPU      string        `json:"gpu"` // GPU represents the number of GPU on the instance
	}
)
  // ClusterIdEnvVar is the environment variable in which one can manually set
// the ClusterId; when non-empty, ClusterName returns its value directly.
const ClusterIdEnvVar = "AWS_CLUSTER_ID"

// Rate-code suffixes used to index the EC2 price list terms. An on-demand
// offer term is keyed sku+OnDemandRateCode, and its hourly price dimension is
// keyed sku+OnDemandRateCode+HourlyRateCode.
const (
	// OnDemandRateCode is appended to a node sku to select its on-demand offer term.
	OnDemandRateCode = ".JRTCKXETXF"
	// ReservedRateCode is appended to a node sku (presumably selecting a
	// reserved offer term; it is not used in the visible pricing path).
	ReservedRateCode = ".38NPMPTW36"
	// HourlyRateCode is appended to an offer-term key to select its hourly price dimension.
	HourlyRateCode = ".6YS6EN2CT7"
)
  114. // KubeAttrConversion maps the k8s labels for region to an aws region
  115. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  116. locationToRegion := map[string]string{
  117. "US East (Ohio)": "us-east-2",
  118. "US East (N. Virginia)": "us-east-1",
  119. "US West (N. California)": "us-west-1",
  120. "US West (Oregon)": "us-west-2",
  121. "Asia Pacific (Mumbai)": "ap-south-1",
  122. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  123. "Asia Pacific (Seoul)": "ap-northeast-2",
  124. "Asia Pacific (Singapore)": "ap-southeast-1",
  125. "Asia Pacific (Sydney)": "ap-southeast-2",
  126. "Asia Pacific (Tokyo)": "ap-northeast-1",
  127. "Canada (Central)": "ca-central-1",
  128. "China (Beijing)": "cn-north-1",
  129. "China (Ningxia)": "cn-northwest-1",
  130. "EU (Frankfurt)": "eu-central-1",
  131. "EU (Ireland)": "eu-west-1",
  132. "EU (London)": "eu-west-2",
  133. "EU (Paris)": "eu-west-3",
  134. "EU (Stockholm)": "eu-north-1",
  135. "South America (São Paulo)": "sa-east-1",
  136. "AWS GovCloud (US-East)": "us-gov-east-1",
  137. "AWS GovCloud (US)": "us-gov-west-1",
  138. }
  139. operatingSystem = strings.ToLower(operatingSystem)
  140. region := locationToRegion[location]
  141. return region + "," + instanceType + "," + operatingSystem
  142. }
  // Payloads accepted by (*AWS).UpdateConfig. In both, AccountID is carried
// under the JSON key "projectID".
type (
	// AwsSpotFeedInfo is the SpotInfoUpdateType payload: where the spot data
	// feed lives, the credentials to read it, and the node label that marks
	// spot instances.
	AwsSpotFeedInfo struct {
		BucketName       string `json:"bucketName"`
		Prefix           string `json:"prefix"`
		Region           string `json:"region"`
		AccountID        string `json:"projectID"`
		ServiceKeyName   string `json:"serviceKeyName"`
		ServiceKeySecret string `json:"serviceKeySecret"`
		SpotLabel        string `json:"spotLabel"`
		SpotLabelValue   string `json:"spotLabelValue"`
	}

	// AwsAthenaInfo is the AthenaInfoUpdateType payload: the Athena
	// bucket/region/database/table used for billing queries and the
	// credentials to run them.
	AwsAthenaInfo struct {
		AthenaBucketName string `json:"athenaBucketName"`
		AthenaRegion     string `json:"athenaRegion"`
		AthenaDatabase   string `json:"athenaDatabase"`
		AthenaTable      string `json:"athenaTable"`
		ServiceKeyName   string `json:"serviceKeyName"`
		ServiceKeySecret string `json:"serviceKeySecret"`
		AccountID        string `json:"projectID"`
	}
)
  162. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  163. c, err := GetDefaultPricingData("aws.json")
  164. if err != nil {
  165. return nil, err
  166. }
  167. return c, nil
  168. }
  169. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  170. c, err := GetDefaultPricingData("aws.json")
  171. if err != nil {
  172. return nil, err
  173. }
  174. if updateType == SpotInfoUpdateType {
  175. a := AwsSpotFeedInfo{}
  176. err := json.NewDecoder(r).Decode(&a)
  177. if err != nil {
  178. return nil, err
  179. }
  180. if err != nil {
  181. return nil, err
  182. }
  183. c.ServiceKeyName = a.ServiceKeyName
  184. c.ServiceKeySecret = a.ServiceKeySecret
  185. c.SpotDataPrefix = a.Prefix
  186. c.SpotDataBucket = a.BucketName
  187. c.ProjectID = a.AccountID
  188. c.SpotDataRegion = a.Region
  189. c.SpotLabel = a.SpotLabel
  190. c.SpotLabelValue = a.SpotLabelValue
  191. } else if updateType == AthenaInfoUpdateType {
  192. a := AwsAthenaInfo{}
  193. err := json.NewDecoder(r).Decode(&a)
  194. if err != nil {
  195. return nil, err
  196. }
  197. c.AthenaBucketName = a.AthenaBucketName
  198. c.AthenaRegion = a.AthenaRegion
  199. c.AthenaDatabase = a.AthenaDatabase
  200. c.AthenaTable = a.AthenaTable
  201. c.ServiceKeyName = a.ServiceKeyName
  202. c.ServiceKeySecret = a.ServiceKeySecret
  203. c.ProjectID = a.AccountID
  204. } else {
  205. a := make(map[string]string)
  206. err = json.NewDecoder(r).Decode(&a)
  207. if err != nil {
  208. return nil, err
  209. }
  210. for k, v := range a {
  211. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  212. err := SetCustomPricingField(c, kUpper, v)
  213. if err != nil {
  214. return nil, err
  215. }
  216. }
  217. }
  218. cj, err := json.Marshal(c)
  219. if err != nil {
  220. return nil, err
  221. }
  222. path := os.Getenv("CONFIG_PATH")
  223. if path == "" {
  224. path = "/models/"
  225. }
  226. path += "aws.json"
  227. err = ioutil.WriteFile(path, cj, 0644)
  228. if err != nil {
  229. return nil, err
  230. }
  231. return c, nil
  232. }
  // awsKey is the Key that GetKey hands out for AWS nodes. It carries the
// node's labels and provider ID, plus the configured spot label name/value
// that Features uses to recognise spot instances.
type awsKey struct {
	SpotLabelName  string
	SpotLabelValue string
	Labels         map[string]string
	ProviderID     string
}

// GPUType reports the node's GPU type. No GPU type is detected for AWS
// nodes, so the result is always the empty string.
func (k *awsKey) GPUType() string {
	const noGPUType = ""
	return noGPUType
}
  242. func (k *awsKey) ID() string {
  243. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  244. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  245. if matchNum == 2 {
  246. return group
  247. }
  248. }
  249. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  250. return ""
  251. }
  252. func (k *awsKey) Features() string {
  253. instanceType := k.Labels[v1.LabelInstanceType]
  254. var operatingSystem string
  255. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  256. if !ok {
  257. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  258. }
  259. region := k.Labels[v1.LabelZoneRegion]
  260. key := region + "," + instanceType + "," + operatingSystem
  261. usageType := "preemptible"
  262. spotKey := key + "," + usageType
  263. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  264. return spotKey
  265. }
  266. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  267. return spotKey
  268. }
  269. return key
  270. }
  271. // GetKey maps node labels to information needed to retrieve pricing data
  272. func (aws *AWS) GetKey(labels map[string]string) Key {
  273. return &awsKey{
  274. SpotLabelName: aws.SpotLabelName,
  275. SpotLabelValue: aws.SpotLabelValue,
  276. Labels: labels,
  277. ProviderID: labels["providerID"],
  278. }
  279. }
  280. func (aws *AWS) isPreemptible(key string) bool {
  281. s := strings.Split(key, ",")
  282. if len(s) == 4 && s[3] == "preemptible" {
  283. return true
  284. }
  285. return false
  286. }
  287. // DownloadPricingData fetches data from the AWS Pricing API
  288. func (aws *AWS) DownloadPricingData() error {
  289. c, err := GetDefaultPricingData("aws.json")
  290. if err != nil {
  291. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  292. }
  293. aws.BaseCPUPrice = c.CPU
  294. aws.BaseSpotCPUPrice = c.SpotCPU
  295. aws.BaseSpotRAMPrice = c.SpotRAM
  296. aws.SpotLabelName = c.SpotLabel
  297. aws.SpotLabelValue = c.SpotLabelValue
  298. aws.SpotDataBucket = c.SpotDataBucket
  299. aws.SpotDataPrefix = c.SpotDataPrefix
  300. aws.ProjectID = c.ProjectID
  301. aws.SpotDataRegion = c.SpotDataRegion
  302. aws.ServiceKeyName = c.ServiceKeyName
  303. aws.ServiceKeySecret = c.ServiceKeySecret
  304. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  305. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  306. }
  307. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  308. if err != nil {
  309. return err
  310. }
  311. inputkeys := make(map[string]bool)
  312. for _, n := range nodeList.Items {
  313. labels := n.GetObjectMeta().GetLabels()
  314. key := aws.GetKey(labels)
  315. inputkeys[key.Features()] = true
  316. }
  317. aws.Pricing = make(map[string]*AWSProductTerms)
  318. aws.ValidPricingKeys = make(map[string]bool)
  319. skusToKeys := make(map[string]string)
  320. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  321. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  322. resp, err := http.Get(pricingURL)
  323. if err != nil {
  324. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  325. return err
  326. }
  327. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  328. dec := json.NewDecoder(resp.Body)
  329. for {
  330. t, err := dec.Token()
  331. if err == io.EOF {
  332. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  333. break
  334. }
  335. if t == "products" {
  336. _, err := dec.Token() // this should parse the opening "{""
  337. if err != nil {
  338. return err
  339. }
  340. for dec.More() {
  341. _, err := dec.Token() // the sku token
  342. if err != nil {
  343. return err
  344. }
  345. product := &AWSProduct{}
  346. err = dec.Decode(&product)
  347. if err != nil {
  348. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  349. break
  350. }
  351. if product.Attributes.PreInstalledSw == "NA" &&
  352. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  353. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  354. spotKey := key + ",preemptible"
  355. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  356. productTerms := &AWSProductTerms{
  357. Sku: product.Sku,
  358. Memory: product.Attributes.Memory,
  359. Storage: product.Attributes.Storage,
  360. VCpu: product.Attributes.VCpu,
  361. GPU: product.Attributes.GPU,
  362. }
  363. aws.Pricing[key] = productTerms
  364. aws.Pricing[spotKey] = productTerms
  365. skusToKeys[product.Sku] = key
  366. }
  367. aws.ValidPricingKeys[key] = true
  368. aws.ValidPricingKeys[spotKey] = true
  369. }
  370. }
  371. }
  372. if t == "terms" {
  373. _, err := dec.Token() // this should parse the opening "{""
  374. if err != nil {
  375. return err
  376. }
  377. termType, err := dec.Token()
  378. if err != nil {
  379. return err
  380. }
  381. if termType == "OnDemand" {
  382. _, err := dec.Token()
  383. if err != nil { // again, should parse an opening "{"
  384. return err
  385. }
  386. for dec.More() {
  387. sku, err := dec.Token()
  388. if err != nil {
  389. return err
  390. }
  391. _, err = dec.Token() // another opening "{"
  392. if err != nil {
  393. return err
  394. }
  395. skuOnDemand, err := dec.Token()
  396. if err != nil {
  397. return err
  398. }
  399. offerTerm := &AWSOfferTerm{}
  400. err = dec.Decode(&offerTerm)
  401. if err != nil {
  402. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  403. }
  404. if sku.(string)+OnDemandRateCode == skuOnDemand {
  405. key, ok := skusToKeys[sku.(string)]
  406. spotKey := key + ",preemptible"
  407. if ok {
  408. aws.Pricing[key].OnDemand = offerTerm
  409. aws.Pricing[spotKey].OnDemand = offerTerm
  410. }
  411. }
  412. _, err = dec.Token()
  413. if err != nil {
  414. return err
  415. }
  416. }
  417. _, err = dec.Token()
  418. if err != nil {
  419. return err
  420. }
  421. }
  422. }
  423. }
  424. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  425. if err != nil {
  426. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  427. } else {
  428. aws.SpotPricingByInstanceID = sp
  429. }
  430. return nil
  431. }
  432. // AllNodePricing returns all the billing data fetched.
  433. func (aws *AWS) AllNodePricing() (interface{}, error) {
  434. return aws.Pricing, nil
  435. }
  436. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  437. key := k.Features()
  438. if aws.isPreemptible(key) {
  439. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  440. var spotcost string
  441. arr := strings.Split(spotInfo.Charge, " ")
  442. if len(arr) == 2 {
  443. spotcost = arr[0]
  444. } else {
  445. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  446. }
  447. return &Node{
  448. Cost: spotcost,
  449. VCPU: terms.VCpu,
  450. RAM: terms.Memory,
  451. GPU: terms.GPU,
  452. Storage: terms.Storage,
  453. BaseCPUPrice: aws.BaseCPUPrice,
  454. UsageType: usageType,
  455. }, nil
  456. }
  457. return &Node{
  458. VCPU: terms.VCpu,
  459. VCPUCost: aws.BaseSpotCPUPrice,
  460. RAM: terms.Memory,
  461. GPU: terms.GPU,
  462. RAMCost: aws.BaseSpotRAMPrice,
  463. Storage: terms.Storage,
  464. BaseCPUPrice: aws.BaseCPUPrice,
  465. UsageType: usageType,
  466. }, nil
  467. }
  468. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  469. if !ok {
  470. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  471. }
  472. cost := c.PricePerUnit.USD
  473. return &Node{
  474. Cost: cost,
  475. VCPU: terms.VCpu,
  476. RAM: terms.Memory,
  477. GPU: terms.GPU,
  478. Storage: terms.Storage,
  479. BaseCPUPrice: aws.BaseCPUPrice,
  480. UsageType: usageType,
  481. }, nil
  482. }
  483. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  484. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  485. //return json.Marshal(aws.Pricing[key])
  486. key := k.Features()
  487. usageType := "ondemand"
  488. if aws.isPreemptible(key) {
  489. usageType = "preemptible"
  490. }
  491. terms, ok := aws.Pricing[key]
  492. if ok {
  493. return aws.createNode(terms, usageType, k)
  494. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  495. err := aws.DownloadPricingData()
  496. if err != nil {
  497. return nil, err
  498. }
  499. terms, termsOk := aws.Pricing[key]
  500. if !termsOk {
  501. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  502. }
  503. return aws.createNode(terms, usageType, k)
  504. } else { // Fall back to base pricing if we can't find the key.
  505. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  506. return &Node{
  507. Cost: aws.BaseCPUPrice,
  508. BaseCPUPrice: aws.BaseCPUPrice,
  509. UsageType: usageType,
  510. UsesBaseCPUPrice: true,
  511. }, nil
  512. }
  513. }
  514. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  515. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  516. defaultClusterName := "AWS Cluster #1"
  517. makeStructure := func(clusterName string) ([]byte, error) {
  518. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  519. m := make(map[string]string)
  520. m["name"] = clusterName
  521. m["provider"] = "AWS"
  522. return json.Marshal(m)
  523. }
  524. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  525. if len(maybeClusterId) != 0 {
  526. return makeStructure(maybeClusterId)
  527. }
  528. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  529. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  530. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  531. if err != nil {
  532. return nil, err
  533. }
  534. for _, n := range nodeList.Items {
  535. region := ""
  536. instanceId := ""
  537. providerId := n.Spec.ProviderID
  538. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  539. if matchNum == 1 {
  540. region = group
  541. } else if matchNum == 2 {
  542. instanceId = group
  543. }
  544. }
  545. if len(instanceId) == 0 {
  546. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  547. continue
  548. }
  549. c := &aws.Config{
  550. Region: aws.String(region),
  551. }
  552. s := session.Must(session.NewSession(c))
  553. ec2Svc := ec2.New(s)
  554. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  555. InstanceIds: []*string{
  556. aws.String(instanceId),
  557. },
  558. })
  559. if diErr != nil {
  560. // maybe log this?
  561. continue
  562. }
  563. if len(di.Reservations) != 1 {
  564. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  565. continue
  566. }
  567. res := di.Reservations[0]
  568. if len(res.Instances) != 1 {
  569. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  570. continue
  571. }
  572. inst := res.Instances[0]
  573. for _, tag := range inst.Tags {
  574. tagKey := *tag.Key
  575. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  576. if matchNum != 1 {
  577. continue
  578. }
  579. return makeStructure(group)
  580. }
  581. }
  582. }
  583. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  584. return makeStructure(defaultClusterName)
  585. }
  586. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  587. func (*AWS) AddServiceKey(formValues url.Values) error {
  588. keyID := formValues.Get("access_key_ID")
  589. key := formValues.Get("secret_access_key")
  590. m := make(map[string]string)
  591. m["access_key_ID"] = keyID
  592. m["secret_access_key"] = key
  593. result, err := json.Marshal(m)
  594. if err != nil {
  595. return err
  596. }
  597. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  598. }
  599. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  600. func (*AWS) GetDisks() ([]byte, error) {
  601. jsonFile, err := os.Open("/var/configs/key.json")
  602. if err == nil {
  603. byteValue, _ := ioutil.ReadAll(jsonFile)
  604. var result map[string]string
  605. err := json.Unmarshal([]byte(byteValue), &result)
  606. if err != nil {
  607. return nil, err
  608. }
  609. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  610. if err != nil {
  611. return nil, err
  612. }
  613. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  614. if err != nil {
  615. return nil, err
  616. }
  617. } else if os.IsNotExist(err) {
  618. klog.V(2).Infof("Using Default Credentials")
  619. } else {
  620. return nil, err
  621. }
  622. defer jsonFile.Close()
  623. clusterConfig, err := os.Open("/var/configs/cluster.json")
  624. if err != nil {
  625. return nil, err
  626. }
  627. defer clusterConfig.Close()
  628. b, err := ioutil.ReadAll(clusterConfig)
  629. if err != nil {
  630. return nil, err
  631. }
  632. var clusterConf map[string]string
  633. err = json.Unmarshal([]byte(b), &clusterConf)
  634. if err != nil {
  635. return nil, err
  636. }
  637. region := aws.String(clusterConf["region"])
  638. c := &aws.Config{
  639. Region: region,
  640. }
  641. s := session.Must(session.NewSession(c))
  642. ec2Svc := ec2.New(s)
  643. input := &ec2.DescribeVolumesInput{}
  644. volumeResult, err := ec2Svc.DescribeVolumes(input)
  645. if err != nil {
  646. if aerr, ok := err.(awserr.Error); ok {
  647. switch aerr.Code() {
  648. default:
  649. return nil, aerr
  650. }
  651. } else {
  652. return nil, err
  653. }
  654. }
  655. return json.Marshal(volumeResult)
  656. }
  657. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  658. customPricing, err := a.GetConfig()
  659. if err != nil {
  660. return nil, err
  661. }
  662. query := fmt.Sprintf(`SELECT
  663. CAST(line_item_usage_start_date AS DATE) as start_date,
  664. resource_tags_user_kubernetes_%s,
  665. line_item_product_code,
  666. SUM(line_item_blended_cost) as blended_cost
  667. FROM %s as cost_data
  668. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  669. GROUP BY 1,2,3`, aggregator, customPricing.AthenaTable, start, end)
  670. if customPricing.ServiceKeyName != "" {
  671. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  672. if err != nil {
  673. return nil, err
  674. }
  675. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  676. if err != nil {
  677. return nil, err
  678. }
  679. }
  680. region := aws.String(customPricing.AthenaRegion)
  681. resultsBucket := customPricing.AthenaBucketName
  682. database := customPricing.AthenaDatabase
  683. c := &aws.Config{
  684. Region: region,
  685. }
  686. s := session.Must(session.NewSession(c))
  687. svc := athena.New(s)
  688. var e athena.StartQueryExecutionInput
  689. var r athena.ResultConfiguration
  690. r.SetOutputLocation(resultsBucket)
  691. e.SetResultConfiguration(&r)
  692. e.SetQueryString(query)
  693. var q athena.QueryExecutionContext
  694. q.SetDatabase(database)
  695. e.SetQueryExecutionContext(&q)
  696. res, err := svc.StartQueryExecution(&e)
  697. if err != nil {
  698. return nil, err
  699. }
  700. klog.V(2).Infof("StartQueryExecution result:")
  701. klog.V(2).Infof(res.GoString())
  702. var qri athena.GetQueryExecutionInput
  703. qri.SetQueryExecutionId(*res.QueryExecutionId)
  704. var qrop *athena.GetQueryExecutionOutput
  705. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  706. for {
  707. qrop, err = svc.GetQueryExecution(&qri)
  708. if err != nil {
  709. return nil, err
  710. }
  711. if *qrop.QueryExecution.Status.State != "RUNNING" {
  712. break
  713. }
  714. time.Sleep(duration)
  715. }
  716. var oocAllocs []*OutOfClusterAllocation
  717. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  718. var ip athena.GetQueryResultsInput
  719. ip.SetQueryExecutionId(*res.QueryExecutionId)
  720. op, err := svc.GetQueryResults(&ip)
  721. if err != nil {
  722. return nil, err
  723. }
  724. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  725. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  726. if err != nil {
  727. return nil, err
  728. }
  729. ooc := &OutOfClusterAllocation{
  730. Aggregator: aggregator,
  731. Environment: *r.Data[1].VarCharValue,
  732. Service: *r.Data[2].VarCharValue,
  733. Cost: cost,
  734. }
  735. oocAllocs = append(oocAllocs, ooc)
  736. }
  737. }
  738. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  739. }
  740. // QuerySQL can query a properly configured Athena database.
  741. // Used to fetch billing data.
  742. // Requires a json config in /var/configs with key region, output, and database.
  743. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  744. customPricing, err := a.GetConfig()
  745. if err != nil {
  746. return nil, err
  747. }
  748. if customPricing.ServiceKeyName != "" {
  749. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  750. if err != nil {
  751. return nil, err
  752. }
  753. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  754. if err != nil {
  755. return nil, err
  756. }
  757. }
  758. athenaConfigs, err := os.Open("/var/configs/athena.json")
  759. if err != nil {
  760. return nil, err
  761. }
  762. defer athenaConfigs.Close()
  763. b, err := ioutil.ReadAll(athenaConfigs)
  764. if err != nil {
  765. return nil, err
  766. }
  767. var athenaConf map[string]string
  768. json.Unmarshal([]byte(b), &athenaConf)
  769. region := aws.String(customPricing.AthenaRegion)
  770. resultsBucket := customPricing.AthenaBucketName
  771. database := customPricing.AthenaDatabase
  772. c := &aws.Config{
  773. Region: region,
  774. }
  775. s := session.Must(session.NewSession(c))
  776. svc := athena.New(s)
  777. var e athena.StartQueryExecutionInput
  778. var r athena.ResultConfiguration
  779. r.SetOutputLocation(resultsBucket)
  780. e.SetResultConfiguration(&r)
  781. e.SetQueryString(query)
  782. var q athena.QueryExecutionContext
  783. q.SetDatabase(database)
  784. e.SetQueryExecutionContext(&q)
  785. res, err := svc.StartQueryExecution(&e)
  786. if err != nil {
  787. return nil, err
  788. }
  789. klog.V(2).Infof("StartQueryExecution result:")
  790. klog.V(2).Infof(res.GoString())
  791. var qri athena.GetQueryExecutionInput
  792. qri.SetQueryExecutionId(*res.QueryExecutionId)
  793. var qrop *athena.GetQueryExecutionOutput
  794. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  795. for {
  796. qrop, err = svc.GetQueryExecution(&qri)
  797. if err != nil {
  798. return nil, err
  799. }
  800. if *qrop.QueryExecution.Status.State != "RUNNING" {
  801. break
  802. }
  803. time.Sleep(duration)
  804. }
  805. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  806. var ip athena.GetQueryResultsInput
  807. ip.SetQueryExecutionId(*res.QueryExecutionId)
  808. op, err := svc.GetQueryResults(&ip)
  809. if err != nil {
  810. return nil, err
  811. }
  812. b, err := json.Marshal(op.ResultSet)
  813. if err != nil {
  814. return nil, err
  815. }
  816. return b, nil
  817. }
  818. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  819. }
  // spotInfo is one decoded record of the spot data feed files that
// parseSpotData downloads from S3 and reads as tab-separated values.
//
// Field order matters: parseSpotData builds the decoder header from this
// struct with csvutil.Header, which follows struct field order, so the fields
// must stay in the same order as the feed's columns. Do not reorder them.
// All values are kept as raw strings; no numeric or time parsing happens here.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"` // used by parseSpotData as the map key
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"`
	Version     string `csv:"Version"`
}
  831. type fnames []*string
  832. func (f fnames) Len() int {
  833. return len(f)
  834. }
  835. func (f fnames) Swap(i, j int) {
  836. f[i], f[j] = f[j], f[i]
  837. }
  838. func (f fnames) Less(i, j int) bool {
  839. key1 := strings.Split(*f[i], ".")
  840. key2 := strings.Split(*f[j], ".")
  841. t1, err := time.Parse("2006-01-02-15", key1[1])
  842. if err != nil {
  843. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  844. return false
  845. }
  846. t2, err := time.Parse("2006-01-02-15", key2[1])
  847. if err != nil {
  848. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  849. return false
  850. }
  851. return t1.Before(t2)
  852. }
  853. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  854. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  855. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  856. if err != nil {
  857. return nil, err
  858. }
  859. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  860. if err != nil {
  861. return nil, err
  862. }
  863. }
  864. s3Prefix := projectID
  865. if len(prefix) != 0 {
  866. s3Prefix = prefix + "/" + s3Prefix
  867. }
  868. c := aws.NewConfig().WithRegion(region)
  869. s := session.Must(session.NewSession(c))
  870. s3Svc := s3.New(s)
  871. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  872. tNow := time.Now()
  873. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  874. ls := &s3.ListObjectsInput{
  875. Bucket: aws.String(bucket),
  876. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  877. }
  878. ls2 := &s3.ListObjectsInput{
  879. Bucket: aws.String(bucket),
  880. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  881. }
  882. lso, err := s3Svc.ListObjects(ls)
  883. if err != nil {
  884. return nil, err
  885. }
  886. lsoLen := len(lso.Contents)
  887. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  888. if lsoLen == 0 {
  889. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  890. }
  891. lso2, err := s3Svc.ListObjects(ls2)
  892. if err != nil {
  893. return nil, err
  894. }
  895. lso2Len := len(lso2.Contents)
  896. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  897. if lso2Len == 0 {
  898. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  899. }
  900. var keys []*string
  901. for _, obj := range lso.Contents {
  902. keys = append(keys, obj.Key)
  903. }
  904. for _, obj := range lso2.Contents {
  905. keys = append(keys, obj.Key)
  906. }
  907. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  908. header, err := csvutil.Header(spotInfo{}, "csv")
  909. if err != nil {
  910. return nil, err
  911. }
  912. fieldsPerRecord := len(header)
  913. spots := make(map[string]*spotInfo)
  914. for _, key := range keys {
  915. getObj := &s3.GetObjectInput{
  916. Bucket: aws.String(bucket),
  917. Key: key,
  918. }
  919. buf := aws.NewWriteAtBuffer([]byte{})
  920. _, err := downloader.Download(buf, getObj)
  921. if err != nil {
  922. return nil, err
  923. }
  924. r := bytes.NewReader(buf.Bytes())
  925. gr, err := gzip.NewReader(r)
  926. if err != nil {
  927. return nil, err
  928. }
  929. csvReader := csv.NewReader(gr)
  930. csvReader.Comma = '\t'
  931. csvReader.FieldsPerRecord = fieldsPerRecord
  932. dec, err := csvutil.NewDecoder(csvReader, header...)
  933. if err != nil {
  934. return nil, err
  935. }
  936. var foundVersion string
  937. for {
  938. spot := spotInfo{}
  939. err := dec.Decode(&spot)
  940. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  941. if err == io.EOF {
  942. break
  943. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  944. rec := dec.Record()
  945. // the first two "Record()" will be the comment lines
  946. // and they show up as len() == 1
  947. // the first of which is "#Version"
  948. // the second of which is "#Fields: "
  949. if len(rec) != 1 {
  950. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  951. continue
  952. }
  953. if len(foundVersion) == 0 {
  954. spotFeedVersion := rec[0]
  955. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  956. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  957. if matches != nil {
  958. foundVersion = matches[1]
  959. if foundVersion != supportedSpotFeedVersion {
  960. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  961. break
  962. }
  963. }
  964. continue
  965. } else if strings.Index(rec[0], "#") == 0 {
  966. continue
  967. } else {
  968. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  969. continue
  970. }
  971. } else if err != nil {
  972. klog.V(2).Infof("Error during spot info decode: %+v", err)
  973. continue
  974. }
  975. klog.V(3).Infof("Found spot info %+v", spot)
  976. spots[spot.InstanceID] = &spot
  977. }
  978. gr.Close()
  979. }
  980. return spots, nil
  981. }