awsprovider.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "k8s.io/klog"
  19. "github.com/aws/aws-sdk-go/aws"
  20. "github.com/aws/aws-sdk-go/aws/awserr"
  21. "github.com/aws/aws-sdk-go/aws/session"
  22. "github.com/aws/aws-sdk-go/service/athena"
  23. "github.com/aws/aws-sdk-go/service/ec2"
  24. "github.com/aws/aws-sdk-go/service/s3"
  25. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  26. "github.com/jszwec/csvutil"
  27. v1 "k8s.io/api/core/v1"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/client-go/kubernetes"
  30. )
  // Environment variable names that GetDisks and ExternalAllocations export
// before creating an AWS session (presumably so the SDK picks up the
// configured service key — confirm against the SDK credential chain).
const (
	awsAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
	awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
)

// supportedSpotFeedVersion is the spot data feed format version this provider understands.
const supportedSpotFeedVersion = "1"

// updateType values with dedicated handling in UpdateConfig.
const (
	// SpotInfoUpdateType selects decoding of an AwsSpotFeedInfo payload.
	SpotInfoUpdateType = "spotinfo"
	// AthenaInfoUpdateType selects decoding of an AwsAthenaInfo payload.
	AthenaInfoUpdateType = "athenainfo"
)
  36. // AWS represents an Amazon Provider
  37. type AWS struct {
  38. Pricing map[string]*AWSProductTerms
  39. SpotPricingByInstanceID map[string]*spotInfo
  40. ValidPricingKeys map[string]bool
  41. Clientset *kubernetes.Clientset
  42. BaseCPUPrice string
  43. BaseSpotCPUPrice string
  44. BaseSpotRAMPrice string
  45. SpotLabelName string
  46. SpotLabelValue string
  47. ServiceKeyName string
  48. ServiceKeySecret string
  49. SpotDataRegion string
  50. SpotDataBucket string
  51. SpotDataPrefix string
  52. ProjectID string
  53. DownloadPricingDataLock sync.RWMutex
  54. *CustomProvider
  55. }
  // AWSPricing maps a k8s node to an AWS Pricing "product"
//
// Its fields mirror the top-level "products" and "terms" sections of the EC2
// price list (see the JSON tags). DownloadPricingData in this file does not
// decode into this type; it streams those two sections token by token.
type AWSPricing struct {
	// Products is keyed by product SKU.
	Products map[string]*AWSProduct `json:"products"`
	Terms    AWSPricingTerms        `json:"terms"`
}
  // AWSProduct represents a purchased SKU
// (one value of the price list's "products" object).
type AWSProduct struct {
	Sku        string               `json:"sku"`
	Attributes AWSProductAttributes `json:"attributes"`
}
  // AWSProductAttributes represents metadata about the product used to map to a node.
//
// All values are kept as the strings found in the price list.
type AWSProductAttributes struct {
	// Location is the price list's region name (e.g. "US East (Ohio)");
	// KubeAttrConversion translates it into a region code.
	Location     string `json:"location"`
	InstanceType string `json:"instanceType"`
	Memory       string `json:"memory"`
	Storage      string `json:"storage"`
	VCpu         string `json:"vcpu"`
	// UsageType: DownloadPricingData keeps only products whose usage type
	// starts with "BoxUsage" or contains "-BoxUsage".
	UsageType       string `json:"usagetype"`
	OperatingSystem string `json:"operatingSystem"`
	// PreInstalledSw: DownloadPricingData keeps only products where this is "NA".
	PreInstalledSw string `json:"preInstalledSw"`
	InstanceFamily string `json:"instanceFamily"`
	GPU            string `json:"gpu"` // GPU represents the number of GPU on the instance
}
  // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
//
// Each map is keyed first by product SKU and then by offer-term code (the SKU
// with a rate code such as OnDemandRateCode appended).
type AWSPricingTerms struct {
	OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
	Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
}
  // AWSOfferTerm is a sku extension used to pay for the node.
type AWSOfferTerm struct {
	Sku string `json:"sku"`
	// PriceDimensions is keyed by rate code; createNode reads the entry at
	// Sku+OnDemandRateCode+HourlyRateCode for on-demand hourly prices.
	PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
}
  // AWSRateCode encodes data about the price of a product
type AWSRateCode struct {
	// Unit is the billing unit string from the price list.
	Unit string `json:"unit"`
	// PricePerUnit.USD is used by createNode as the node's cost.
	PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
}
  // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
type AWSCurrencyCode struct {
	// USD is the price in US dollars, kept as the string from the price list.
	USD string `json:"USD"`
}
  // AWSProductTerms represents the full terms of the product
//
// DownloadPricingData builds one per pricing key from the product's
// attributes and fills OnDemand when the price list has an OnDemand offer for
// Sku. Reserved is not populated by any code visible in this file.
type AWSProductTerms struct {
	Sku      string        `json:"sku"`
	OnDemand *AWSOfferTerm `json:"OnDemand"` // nil if no OnDemand offer was found
	Reserved *AWSOfferTerm `json:"Reserved"`
	Memory   string        `json:"memory"`
	Storage  string        `json:"storage"`
	VCpu     string        `json:"vcpu"`
	GPU      string        `json:"gpu"` // GPU represents the number of GPU on the instance
}
  // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
const ClusterIdEnvVar = "AWS_CLUSTER_ID"

// Rate codes appended to a product SKU to build price-list keys, e.g.
// Sku+OnDemandRateCode names a SKU's on-demand offer term and
// Sku+OnDemandRateCode+HourlyRateCode names its hourly price dimension.
const (
	// OnDemandRateCode is appended to an node sku
	OnDemandRateCode = ".JRTCKXETXF"
	// ReservedRateCode is appended to a node sku
	ReservedRateCode = ".38NPMPTW36"
	// HourlyRateCode is appended to a node sku
	HourlyRateCode = ".6YS6EN2CT7"
)
  116. // KubeAttrConversion maps the k8s labels for region to an aws region
  117. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  118. locationToRegion := map[string]string{
  119. "US East (Ohio)": "us-east-2",
  120. "US East (N. Virginia)": "us-east-1",
  121. "US West (N. California)": "us-west-1",
  122. "US West (Oregon)": "us-west-2",
  123. "Asia Pacific (Mumbai)": "ap-south-1",
  124. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  125. "Asia Pacific (Seoul)": "ap-northeast-2",
  126. "Asia Pacific (Singapore)": "ap-southeast-1",
  127. "Asia Pacific (Sydney)": "ap-southeast-2",
  128. "Asia Pacific (Tokyo)": "ap-northeast-1",
  129. "Canada (Central)": "ca-central-1",
  130. "China (Beijing)": "cn-north-1",
  131. "China (Ningxia)": "cn-northwest-1",
  132. "EU (Frankfurt)": "eu-central-1",
  133. "EU (Ireland)": "eu-west-1",
  134. "EU (London)": "eu-west-2",
  135. "EU (Paris)": "eu-west-3",
  136. "EU (Stockholm)": "eu-north-1",
  137. "South America (São Paulo)": "sa-east-1",
  138. "AWS GovCloud (US-East)": "us-gov-east-1",
  139. "AWS GovCloud (US)": "us-gov-west-1",
  140. }
  141. operatingSystem = strings.ToLower(operatingSystem)
  142. region := locationToRegion[location]
  143. return region + "," + instanceType + "," + operatingSystem
  144. }
  // AwsSpotFeedInfo is the JSON payload UpdateConfig decodes for
// SpotInfoUpdateType. Each field is copied into the matching CustomPricing
// setting (AccountID, sent as "projectID", becomes ProjectID).
type AwsSpotFeedInfo struct {
	BucketName       string `json:"bucketName"`
	Prefix           string `json:"prefix"`
	Region           string `json:"region"`
	AccountID        string `json:"projectID"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	SpotLabel        string `json:"spotLabel"`
	SpotLabelValue   string `json:"spotLabelValue"`
}
  // AwsAthenaInfo is the JSON payload UpdateConfig decodes for
// AthenaInfoUpdateType. Each field is copied into the matching CustomPricing
// setting (AccountID, sent as "projectID", becomes ProjectID).
type AwsAthenaInfo struct {
	AthenaBucketName string `json:"athenaBucketName"`
	AthenaRegion     string `json:"athenaRegion"`
	AthenaDatabase   string `json:"athenaDatabase"`
	AthenaTable      string `json:"athenaTable"`
	ServiceKeyName   string `json:"serviceKeyName"`
	ServiceKeySecret string `json:"serviceKeySecret"`
	AccountID        string `json:"projectID"`
}
  164. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  165. c, err := GetDefaultPricingData("aws.json")
  166. if err != nil {
  167. return nil, err
  168. }
  169. return c, nil
  170. }
  171. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  172. c, err := GetDefaultPricingData("aws.json")
  173. if err != nil {
  174. return nil, err
  175. }
  176. if updateType == SpotInfoUpdateType {
  177. a := AwsSpotFeedInfo{}
  178. err := json.NewDecoder(r).Decode(&a)
  179. if err != nil {
  180. return nil, err
  181. }
  182. if err != nil {
  183. return nil, err
  184. }
  185. c.ServiceKeyName = a.ServiceKeyName
  186. c.ServiceKeySecret = a.ServiceKeySecret
  187. c.SpotDataPrefix = a.Prefix
  188. c.SpotDataBucket = a.BucketName
  189. c.ProjectID = a.AccountID
  190. c.SpotDataRegion = a.Region
  191. c.SpotLabel = a.SpotLabel
  192. c.SpotLabelValue = a.SpotLabelValue
  193. } else if updateType == AthenaInfoUpdateType {
  194. a := AwsAthenaInfo{}
  195. err := json.NewDecoder(r).Decode(&a)
  196. if err != nil {
  197. return nil, err
  198. }
  199. c.AthenaBucketName = a.AthenaBucketName
  200. c.AthenaRegion = a.AthenaRegion
  201. c.AthenaDatabase = a.AthenaDatabase
  202. c.AthenaTable = a.AthenaTable
  203. c.ServiceKeyName = a.ServiceKeyName
  204. c.ServiceKeySecret = a.ServiceKeySecret
  205. c.ProjectID = a.AccountID
  206. } else {
  207. a := make(map[string]string)
  208. err = json.NewDecoder(r).Decode(&a)
  209. if err != nil {
  210. return nil, err
  211. }
  212. for k, v := range a {
  213. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  214. err := SetCustomPricingField(c, kUpper, v)
  215. if err != nil {
  216. return nil, err
  217. }
  218. }
  219. }
  220. cj, err := json.Marshal(c)
  221. if err != nil {
  222. return nil, err
  223. }
  224. path := os.Getenv("CONFIG_PATH")
  225. if path == "" {
  226. path = "/models/"
  227. }
  228. path += "aws.json"
  229. err = ioutil.WriteFile(path, cj, 0644)
  230. if err != nil {
  231. return nil, err
  232. }
  233. return c, nil
  234. }
  // awsKey carries what is needed to price a single node: its labels, its
// provider ID, and the provider's configured spot label name/value.
type awsKey struct {
	// Label name/value pair that marks a node as spot (copied from AWS).
	SpotLabelName, SpotLabelValue string
	// Labels are the node's labels.
	Labels map[string]string
	// ProviderID is parsed by ID as aws:///<zone>/<instance-id>.
	ProviderID string
}
  // GPUType returns the node's GPU type. AWS keys do not report one: this
// always returns "".
func (k *awsKey) GPUType() string {
	return ""
}
  244. func (k *awsKey) ID() string {
  245. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  246. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  247. if matchNum == 2 {
  248. return group
  249. }
  250. }
  251. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  252. return ""
  253. }
  254. func (k *awsKey) Features() string {
  255. instanceType := k.Labels[v1.LabelInstanceType]
  256. var operatingSystem string
  257. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  258. if !ok {
  259. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  260. }
  261. region := k.Labels[v1.LabelZoneRegion]
  262. key := region + "," + instanceType + "," + operatingSystem
  263. usageType := "preemptible"
  264. spotKey := key + "," + usageType
  265. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  266. return spotKey
  267. }
  268. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  269. return spotKey
  270. }
  271. return key
  272. }
  // PVPricing is meant to price a persistent volume. It is not implemented for
// AWS: it always returns (nil, nil), so callers must cope with a nil *PV and
// no error.
func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
	return nil, nil
}
  276. func (key *awsPVKey) Features() string {
  277. storageClass := key.StorageClassName
  278. if storageClass == "standard" {
  279. storageClass = "pdstandard"
  280. }
  281. return key.Labels[v1.LabelZoneRegion] + storageClass
  282. }
  // awsPVKey carries what is needed to price a persistent volume: its labels,
// its storage class name, and the parameters passed to GetPVKey.
type awsPVKey struct {
	Labels, StorageClassParameters map[string]string
	StorageClassName               string
}
  288. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  289. return &awsPVKey{
  290. Labels: pv.Labels,
  291. StorageClassName: pv.Spec.StorageClassName,
  292. }
  293. }
  294. // GetKey maps node labels to information needed to retrieve pricing data
  295. func (aws *AWS) GetKey(labels map[string]string) Key {
  296. return &awsKey{
  297. SpotLabelName: aws.SpotLabelName,
  298. SpotLabelValue: aws.SpotLabelValue,
  299. Labels: labels,
  300. ProviderID: labels["providerID"],
  301. }
  302. }
  303. func (aws *AWS) isPreemptible(key string) bool {
  304. s := strings.Split(key, ",")
  305. if len(s) == 4 && s[3] == "preemptible" {
  306. return true
  307. }
  308. return false
  309. }
  310. // DownloadPricingData fetches data from the AWS Pricing API
  311. func (aws *AWS) DownloadPricingData() error {
  312. aws.DownloadPricingDataLock.Lock()
  313. defer aws.DownloadPricingDataLock.Unlock()
  314. c, err := GetDefaultPricingData("aws.json")
  315. if err != nil {
  316. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  317. }
  318. aws.BaseCPUPrice = c.CPU
  319. aws.BaseSpotCPUPrice = c.SpotCPU
  320. aws.BaseSpotRAMPrice = c.SpotRAM
  321. aws.SpotLabelName = c.SpotLabel
  322. aws.SpotLabelValue = c.SpotLabelValue
  323. aws.SpotDataBucket = c.SpotDataBucket
  324. aws.SpotDataPrefix = c.SpotDataPrefix
  325. aws.ProjectID = c.ProjectID
  326. aws.SpotDataRegion = c.SpotDataRegion
  327. aws.ServiceKeyName = c.ServiceKeyName
  328. aws.ServiceKeySecret = c.ServiceKeySecret
  329. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  330. return fmt.Errorf("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  331. }
  332. nodeList, err := aws.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  333. if err != nil {
  334. return err
  335. }
  336. inputkeys := make(map[string]bool)
  337. for _, n := range nodeList.Items {
  338. labels := n.GetObjectMeta().GetLabels()
  339. key := aws.GetKey(labels)
  340. inputkeys[key.Features()] = true
  341. }
  342. aws.Pricing = make(map[string]*AWSProductTerms)
  343. aws.ValidPricingKeys = make(map[string]bool)
  344. skusToKeys := make(map[string]string)
  345. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  346. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  347. resp, err := http.Get(pricingURL)
  348. if err != nil {
  349. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  350. return err
  351. }
  352. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  353. dec := json.NewDecoder(resp.Body)
  354. for {
  355. t, err := dec.Token()
  356. if err == io.EOF {
  357. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  358. break
  359. }
  360. if t == "products" {
  361. _, err := dec.Token() // this should parse the opening "{""
  362. if err != nil {
  363. return err
  364. }
  365. for dec.More() {
  366. _, err := dec.Token() // the sku token
  367. if err != nil {
  368. return err
  369. }
  370. product := &AWSProduct{}
  371. err = dec.Decode(&product)
  372. if err != nil {
  373. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  374. break
  375. }
  376. if product.Attributes.PreInstalledSw == "NA" &&
  377. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  378. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  379. spotKey := key + ",preemptible"
  380. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  381. productTerms := &AWSProductTerms{
  382. Sku: product.Sku,
  383. Memory: product.Attributes.Memory,
  384. Storage: product.Attributes.Storage,
  385. VCpu: product.Attributes.VCpu,
  386. GPU: product.Attributes.GPU,
  387. }
  388. aws.Pricing[key] = productTerms
  389. aws.Pricing[spotKey] = productTerms
  390. skusToKeys[product.Sku] = key
  391. }
  392. aws.ValidPricingKeys[key] = true
  393. aws.ValidPricingKeys[spotKey] = true
  394. }
  395. }
  396. }
  397. if t == "terms" {
  398. _, err := dec.Token() // this should parse the opening "{""
  399. if err != nil {
  400. return err
  401. }
  402. termType, err := dec.Token()
  403. if err != nil {
  404. return err
  405. }
  406. if termType == "OnDemand" {
  407. _, err := dec.Token()
  408. if err != nil { // again, should parse an opening "{"
  409. return err
  410. }
  411. for dec.More() {
  412. sku, err := dec.Token()
  413. if err != nil {
  414. return err
  415. }
  416. _, err = dec.Token() // another opening "{"
  417. if err != nil {
  418. return err
  419. }
  420. skuOnDemand, err := dec.Token()
  421. if err != nil {
  422. return err
  423. }
  424. offerTerm := &AWSOfferTerm{}
  425. err = dec.Decode(&offerTerm)
  426. if err != nil {
  427. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  428. }
  429. if sku.(string)+OnDemandRateCode == skuOnDemand {
  430. key, ok := skusToKeys[sku.(string)]
  431. spotKey := key + ",preemptible"
  432. if ok {
  433. aws.Pricing[key].OnDemand = offerTerm
  434. aws.Pricing[spotKey].OnDemand = offerTerm
  435. }
  436. }
  437. _, err = dec.Token()
  438. if err != nil {
  439. return err
  440. }
  441. }
  442. _, err = dec.Token()
  443. if err != nil {
  444. return err
  445. }
  446. }
  447. }
  448. }
  449. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  450. if err != nil {
  451. klog.V(1).Infof("Error downloading spot data %s", err.Error())
  452. } else {
  453. aws.SpotPricingByInstanceID = sp
  454. }
  455. return nil
  456. }
  457. // AllNodePricing returns all the billing data fetched.
  458. func (aws *AWS) AllNodePricing() (interface{}, error) {
  459. aws.DownloadPricingDataLock.RLock()
  460. defer aws.DownloadPricingDataLock.RUnlock()
  461. return aws.Pricing, nil
  462. }
  463. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  464. key := k.Features()
  465. if aws.isPreemptible(key) {
  466. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  467. var spotcost string
  468. arr := strings.Split(spotInfo.Charge, " ")
  469. if len(arr) == 2 {
  470. spotcost = arr[0]
  471. } else {
  472. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  473. }
  474. return &Node{
  475. Cost: spotcost,
  476. VCPU: terms.VCpu,
  477. RAM: terms.Memory,
  478. GPU: terms.GPU,
  479. Storage: terms.Storage,
  480. BaseCPUPrice: aws.BaseCPUPrice,
  481. UsageType: usageType,
  482. }, nil
  483. }
  484. return &Node{
  485. VCPU: terms.VCpu,
  486. VCPUCost: aws.BaseSpotCPUPrice,
  487. RAM: terms.Memory,
  488. GPU: terms.GPU,
  489. RAMCost: aws.BaseSpotRAMPrice,
  490. Storage: terms.Storage,
  491. BaseCPUPrice: aws.BaseCPUPrice,
  492. UsageType: usageType,
  493. }, nil
  494. }
  495. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  496. if !ok {
  497. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  498. }
  499. cost := c.PricePerUnit.USD
  500. return &Node{
  501. Cost: cost,
  502. VCPU: terms.VCpu,
  503. RAM: terms.Memory,
  504. GPU: terms.GPU,
  505. Storage: terms.Storage,
  506. BaseCPUPrice: aws.BaseCPUPrice,
  507. UsageType: usageType,
  508. }, nil
  509. }
  510. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  511. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  512. aws.DownloadPricingDataLock.RLock()
  513. defer aws.DownloadPricingDataLock.RUnlock()
  514. key := k.Features()
  515. usageType := "ondemand"
  516. if aws.isPreemptible(key) {
  517. usageType = "preemptible"
  518. }
  519. terms, ok := aws.Pricing[key]
  520. if ok {
  521. return aws.createNode(terms, usageType, k)
  522. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  523. aws.DownloadPricingDataLock.RUnlock()
  524. err := aws.DownloadPricingData()
  525. aws.DownloadPricingDataLock.RLock()
  526. if err != nil {
  527. return nil, err
  528. }
  529. terms, termsOk := aws.Pricing[key]
  530. if !termsOk {
  531. return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  532. }
  533. return aws.createNode(terms, usageType, k)
  534. } else { // Fall back to base pricing if we can't find the key.
  535. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  536. return &Node{
  537. Cost: aws.BaseCPUPrice,
  538. BaseCPUPrice: aws.BaseCPUPrice,
  539. UsageType: usageType,
  540. UsesBaseCPUPrice: true,
  541. }, nil
  542. }
  543. }
  544. // ClusterName returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  545. func (awsProvider *AWS) ClusterName() ([]byte, error) {
  546. defaultClusterName := "AWS Cluster #1"
  547. makeStructure := func(clusterName string) ([]byte, error) {
  548. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  549. m := make(map[string]string)
  550. m["name"] = clusterName
  551. m["provider"] = "AWS"
  552. return json.Marshal(m)
  553. }
  554. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  555. if len(maybeClusterId) != 0 {
  556. return makeStructure(maybeClusterId)
  557. }
  558. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  559. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  560. nodeList, err := awsProvider.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
  561. if err != nil {
  562. return nil, err
  563. }
  564. for _, n := range nodeList.Items {
  565. region := ""
  566. instanceId := ""
  567. providerId := n.Spec.ProviderID
  568. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  569. if matchNum == 1 {
  570. region = group
  571. } else if matchNum == 2 {
  572. instanceId = group
  573. }
  574. }
  575. if len(instanceId) == 0 {
  576. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  577. continue
  578. }
  579. c := &aws.Config{
  580. Region: aws.String(region),
  581. }
  582. s := session.Must(session.NewSession(c))
  583. ec2Svc := ec2.New(s)
  584. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  585. InstanceIds: []*string{
  586. aws.String(instanceId),
  587. },
  588. })
  589. if diErr != nil {
  590. // maybe log this?
  591. continue
  592. }
  593. if len(di.Reservations) != 1 {
  594. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  595. continue
  596. }
  597. res := di.Reservations[0]
  598. if len(res.Instances) != 1 {
  599. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  600. continue
  601. }
  602. inst := res.Instances[0]
  603. for _, tag := range inst.Tags {
  604. tagKey := *tag.Key
  605. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  606. if matchNum != 1 {
  607. continue
  608. }
  609. return makeStructure(group)
  610. }
  611. }
  612. }
  613. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  614. return makeStructure(defaultClusterName)
  615. }
  616. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  617. func (*AWS) AddServiceKey(formValues url.Values) error {
  618. keyID := formValues.Get("access_key_ID")
  619. key := formValues.Get("secret_access_key")
  620. m := make(map[string]string)
  621. m["access_key_ID"] = keyID
  622. m["secret_access_key"] = key
  623. result, err := json.Marshal(m)
  624. if err != nil {
  625. return err
  626. }
  627. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  628. }
  629. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  630. func (*AWS) GetDisks() ([]byte, error) {
  631. jsonFile, err := os.Open("/var/configs/key.json")
  632. if err == nil {
  633. byteValue, _ := ioutil.ReadAll(jsonFile)
  634. var result map[string]string
  635. err := json.Unmarshal([]byte(byteValue), &result)
  636. if err != nil {
  637. return nil, err
  638. }
  639. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  640. if err != nil {
  641. return nil, err
  642. }
  643. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  644. if err != nil {
  645. return nil, err
  646. }
  647. } else if os.IsNotExist(err) {
  648. klog.V(2).Infof("Using Default Credentials")
  649. } else {
  650. return nil, err
  651. }
  652. defer jsonFile.Close()
  653. clusterConfig, err := os.Open("/var/configs/cluster.json")
  654. if err != nil {
  655. return nil, err
  656. }
  657. defer clusterConfig.Close()
  658. b, err := ioutil.ReadAll(clusterConfig)
  659. if err != nil {
  660. return nil, err
  661. }
  662. var clusterConf map[string]string
  663. err = json.Unmarshal([]byte(b), &clusterConf)
  664. if err != nil {
  665. return nil, err
  666. }
  667. region := aws.String(clusterConf["region"])
  668. c := &aws.Config{
  669. Region: region,
  670. }
  671. s := session.Must(session.NewSession(c))
  672. ec2Svc := ec2.New(s)
  673. input := &ec2.DescribeVolumesInput{}
  674. volumeResult, err := ec2Svc.DescribeVolumes(input)
  675. if err != nil {
  676. if aerr, ok := err.(awserr.Error); ok {
  677. switch aerr.Code() {
  678. default:
  679. return nil, aerr
  680. }
  681. } else {
  682. return nil, err
  683. }
  684. }
  685. return json.Marshal(volumeResult)
  686. }
  687. // ConvertToGlueColumnFormat takes a string and runs through various regex
  688. // and string replacement statements to convert it to a format compatible
  689. // with AWS Glue and Athena column names.
  690. // Following guidance from AWS provided here ('Column Names' section):
  691. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  692. // It returns a string containing the column name in proper column name format and length.
  693. func ConvertToGlueColumnFormat(column_name string) string {
  694. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  695. // An underscore is added in front of uppercase letters
  696. capital_underscore := regexp.MustCompile(`[A-Z]`)
  697. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  698. // Any non-alphanumeric characters are replaced with an underscore
  699. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  700. final = no_space_punc.ReplaceAllString(final, "_")
  701. // Duplicate underscores are removed
  702. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  703. final = no_dup_underscore.ReplaceAllString(final, "_")
  704. // Any leading and trailing underscores are removed
  705. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  706. final = no_front_end_underscore.ReplaceAllString(final, "")
  707. // Uppercase to lowercase
  708. final = strings.ToLower(final)
  709. // Longer column name than expected - remove _ left to right
  710. allowed_col_len := 128
  711. undersc_to_remove := len(final) - allowed_col_len
  712. if undersc_to_remove > 0 {
  713. final = strings.Replace(final, "_", "", undersc_to_remove)
  714. }
  715. // If removing all of the underscores still didn't
  716. // make the column name < 128 characters, trim it!
  717. if len(final) > allowed_col_len {
  718. final = final[:allowed_col_len]
  719. }
  720. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  721. return final
  722. }
  723. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  724. // "start" and "end" are dates of the format YYYY-MM-DD
  725. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  726. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  727. customPricing, err := a.GetConfig()
  728. if err != nil {
  729. return nil, err
  730. }
  731. aggregator_column_name := "resource_tags_user_kubernetes_" + aggregator
  732. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  733. query := fmt.Sprintf(`SELECT
  734. CAST(line_item_usage_start_date AS DATE) as start_date,
  735. %s,
  736. line_item_product_code,
  737. SUM(line_item_blended_cost) as blended_cost
  738. FROM %s as cost_data
  739. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  740. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  741. if customPricing.ServiceKeyName != "" {
  742. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  743. if err != nil {
  744. return nil, err
  745. }
  746. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  747. if err != nil {
  748. return nil, err
  749. }
  750. }
  751. region := aws.String(customPricing.AthenaRegion)
  752. resultsBucket := customPricing.AthenaBucketName
  753. database := customPricing.AthenaDatabase
  754. c := &aws.Config{
  755. Region: region,
  756. }
  757. s := session.Must(session.NewSession(c))
  758. svc := athena.New(s)
  759. var e athena.StartQueryExecutionInput
  760. var r athena.ResultConfiguration
  761. r.SetOutputLocation(resultsBucket)
  762. e.SetResultConfiguration(&r)
  763. e.SetQueryString(query)
  764. var q athena.QueryExecutionContext
  765. q.SetDatabase(database)
  766. e.SetQueryExecutionContext(&q)
  767. res, err := svc.StartQueryExecution(&e)
  768. if err != nil {
  769. return nil, err
  770. }
  771. klog.V(2).Infof("StartQueryExecution result:")
  772. klog.V(2).Infof(res.GoString())
  773. var qri athena.GetQueryExecutionInput
  774. qri.SetQueryExecutionId(*res.QueryExecutionId)
  775. var qrop *athena.GetQueryExecutionOutput
  776. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  777. for {
  778. qrop, err = svc.GetQueryExecution(&qri)
  779. if err != nil {
  780. return nil, err
  781. }
  782. if *qrop.QueryExecution.Status.State != "RUNNING" {
  783. break
  784. }
  785. time.Sleep(duration)
  786. }
  787. var oocAllocs []*OutOfClusterAllocation
  788. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  789. var ip athena.GetQueryResultsInput
  790. ip.SetQueryExecutionId(*res.QueryExecutionId)
  791. op, err := svc.GetQueryResults(&ip)
  792. if err != nil {
  793. return nil, err
  794. }
  795. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  796. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  797. if err != nil {
  798. return nil, err
  799. }
  800. ooc := &OutOfClusterAllocation{
  801. Aggregator: aggregator,
  802. Environment: *r.Data[1].VarCharValue,
  803. Service: *r.Data[2].VarCharValue,
  804. Cost: cost,
  805. }
  806. oocAllocs = append(oocAllocs, ooc)
  807. }
  808. }
  809. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  810. }
  811. // QuerySQL can query a properly configured Athena database.
  812. // Used to fetch billing data.
  813. // Requires a json config in /var/configs with key region, output, and database.
  814. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  815. customPricing, err := a.GetConfig()
  816. if err != nil {
  817. return nil, err
  818. }
  819. if customPricing.ServiceKeyName != "" {
  820. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  821. if err != nil {
  822. return nil, err
  823. }
  824. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  825. if err != nil {
  826. return nil, err
  827. }
  828. }
  829. athenaConfigs, err := os.Open("/var/configs/athena.json")
  830. if err != nil {
  831. return nil, err
  832. }
  833. defer athenaConfigs.Close()
  834. b, err := ioutil.ReadAll(athenaConfigs)
  835. if err != nil {
  836. return nil, err
  837. }
  838. var athenaConf map[string]string
  839. json.Unmarshal([]byte(b), &athenaConf)
  840. region := aws.String(customPricing.AthenaRegion)
  841. resultsBucket := customPricing.AthenaBucketName
  842. database := customPricing.AthenaDatabase
  843. c := &aws.Config{
  844. Region: region,
  845. }
  846. s := session.Must(session.NewSession(c))
  847. svc := athena.New(s)
  848. var e athena.StartQueryExecutionInput
  849. var r athena.ResultConfiguration
  850. r.SetOutputLocation(resultsBucket)
  851. e.SetResultConfiguration(&r)
  852. e.SetQueryString(query)
  853. var q athena.QueryExecutionContext
  854. q.SetDatabase(database)
  855. e.SetQueryExecutionContext(&q)
  856. res, err := svc.StartQueryExecution(&e)
  857. if err != nil {
  858. return nil, err
  859. }
  860. klog.V(2).Infof("StartQueryExecution result:")
  861. klog.V(2).Infof(res.GoString())
  862. var qri athena.GetQueryExecutionInput
  863. qri.SetQueryExecutionId(*res.QueryExecutionId)
  864. var qrop *athena.GetQueryExecutionOutput
  865. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  866. for {
  867. qrop, err = svc.GetQueryExecution(&qri)
  868. if err != nil {
  869. return nil, err
  870. }
  871. if *qrop.QueryExecution.Status.State != "RUNNING" {
  872. break
  873. }
  874. time.Sleep(duration)
  875. }
  876. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  877. var ip athena.GetQueryResultsInput
  878. ip.SetQueryExecutionId(*res.QueryExecutionId)
  879. op, err := svc.GetQueryResults(&ip)
  880. if err != nil {
  881. return nil, err
  882. }
  883. b, err := json.Marshal(op.ResultSet)
  884. if err != nil {
  885. return nil, err
  886. }
  887. return b, nil
  888. }
  889. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  890. }
  // spotInfo holds one record of a spot data feed file as decoded by
// parseSpotData. Every field keeps the raw column text; nothing is parsed.
//
// The `csv` tags name the columns. parseSpotData builds its expected header
// with csvutil.Header(spotInfo{}, "csv"), which also sets the
// FieldsPerRecord it requires. It passes that header to csvutil.NewDecoder.
// csvutil derives the header from the fields in declaration order, so
// reordering these fields changes which feed column lands in which field.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"` // key of the map returned by parseSpotData
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"`
	Version     string `csv:"Version"`
}
  902. type fnames []*string
  903. func (f fnames) Len() int {
  904. return len(f)
  905. }
  906. func (f fnames) Swap(i, j int) {
  907. f[i], f[j] = f[j], f[i]
  908. }
  909. func (f fnames) Less(i, j int) bool {
  910. key1 := strings.Split(*f[i], ".")
  911. key2 := strings.Split(*f[j], ".")
  912. t1, err := time.Parse("2006-01-02-15", key1[1])
  913. if err != nil {
  914. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  915. return false
  916. }
  917. t2, err := time.Parse("2006-01-02-15", key2[1])
  918. if err != nil {
  919. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  920. return false
  921. }
  922. return t1.Before(t2)
  923. }
  924. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  925. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  926. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  927. if err != nil {
  928. return nil, err
  929. }
  930. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  931. if err != nil {
  932. return nil, err
  933. }
  934. }
  935. s3Prefix := projectID
  936. if len(prefix) != 0 {
  937. s3Prefix = prefix + "/" + s3Prefix
  938. }
  939. c := aws.NewConfig().WithRegion(region)
  940. s := session.Must(session.NewSession(c))
  941. s3Svc := s3.New(s)
  942. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  943. tNow := time.Now()
  944. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  945. ls := &s3.ListObjectsInput{
  946. Bucket: aws.String(bucket),
  947. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  948. }
  949. ls2 := &s3.ListObjectsInput{
  950. Bucket: aws.String(bucket),
  951. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  952. }
  953. lso, err := s3Svc.ListObjects(ls)
  954. if err != nil {
  955. return nil, err
  956. }
  957. lsoLen := len(lso.Contents)
  958. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  959. if lsoLen == 0 {
  960. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  961. }
  962. lso2, err := s3Svc.ListObjects(ls2)
  963. if err != nil {
  964. return nil, err
  965. }
  966. lso2Len := len(lso2.Contents)
  967. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  968. if lso2Len == 0 {
  969. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  970. }
  971. var keys []*string
  972. for _, obj := range lso.Contents {
  973. keys = append(keys, obj.Key)
  974. }
  975. for _, obj := range lso2.Contents {
  976. keys = append(keys, obj.Key)
  977. }
  978. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  979. header, err := csvutil.Header(spotInfo{}, "csv")
  980. if err != nil {
  981. return nil, err
  982. }
  983. fieldsPerRecord := len(header)
  984. spots := make(map[string]*spotInfo)
  985. for _, key := range keys {
  986. getObj := &s3.GetObjectInput{
  987. Bucket: aws.String(bucket),
  988. Key: key,
  989. }
  990. buf := aws.NewWriteAtBuffer([]byte{})
  991. _, err := downloader.Download(buf, getObj)
  992. if err != nil {
  993. return nil, err
  994. }
  995. r := bytes.NewReader(buf.Bytes())
  996. gr, err := gzip.NewReader(r)
  997. if err != nil {
  998. return nil, err
  999. }
  1000. csvReader := csv.NewReader(gr)
  1001. csvReader.Comma = '\t'
  1002. csvReader.FieldsPerRecord = fieldsPerRecord
  1003. dec, err := csvutil.NewDecoder(csvReader, header...)
  1004. if err != nil {
  1005. return nil, err
  1006. }
  1007. var foundVersion string
  1008. for {
  1009. spot := spotInfo{}
  1010. err := dec.Decode(&spot)
  1011. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1012. if err == io.EOF {
  1013. break
  1014. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1015. rec := dec.Record()
  1016. // the first two "Record()" will be the comment lines
  1017. // and they show up as len() == 1
  1018. // the first of which is "#Version"
  1019. // the second of which is "#Fields: "
  1020. if len(rec) != 1 {
  1021. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1022. continue
  1023. }
  1024. if len(foundVersion) == 0 {
  1025. spotFeedVersion := rec[0]
  1026. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1027. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1028. if matches != nil {
  1029. foundVersion = matches[1]
  1030. if foundVersion != supportedSpotFeedVersion {
  1031. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1032. break
  1033. }
  1034. }
  1035. continue
  1036. } else if strings.Index(rec[0], "#") == 0 {
  1037. continue
  1038. } else {
  1039. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1040. continue
  1041. }
  1042. } else if err != nil {
  1043. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1044. continue
  1045. }
  1046. klog.V(3).Infof("Found spot info %+v", spot)
  1047. spots[spot.InstanceID] = &spot
  1048. }
  1049. gr.Close()
  1050. }
  1051. return spots, nil
  1052. }