awsprovider.go 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/clustercache"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/athena"
  24. "github.com/aws/aws-sdk-go/service/ec2"
  25. "github.com/aws/aws-sdk-go/service/s3"
  26. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  27. "github.com/jszwec/csvutil"
  28. v1 "k8s.io/api/core/v1"
  29. )
  30. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  31. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  32. const supportedSpotFeedVersion = "1"
  33. const SpotInfoUpdateType = "spotinfo"
  34. const AthenaInfoUpdateType = "athenainfo"
  35. // AWS represents an Amazon Provider
  36. type AWS struct {
  37. Pricing map[string]*AWSProductTerms
  38. SpotPricingByInstanceID map[string]*spotInfo
  39. ValidPricingKeys map[string]bool
  40. Clientset clustercache.ClusterCache
  41. BaseCPUPrice string
  42. BaseRAMPrice string
  43. BaseGPUPrice string
  44. BaseSpotCPUPrice string
  45. BaseSpotRAMPrice string
  46. SpotLabelName string
  47. SpotLabelValue string
  48. ServiceKeyName string
  49. ServiceKeySecret string
  50. SpotDataRegion string
  51. SpotDataBucket string
  52. SpotDataPrefix string
  53. ProjectID string
  54. DownloadPricingDataLock sync.RWMutex
  55. *CustomProvider
  56. }
  57. // AWSPricing maps a k8s node to an AWS Pricing "product"
  58. type AWSPricing struct {
  59. Products map[string]*AWSProduct `json:"products"`
  60. Terms AWSPricingTerms `json:"terms"`
  61. }
  62. // AWSProduct represents a purchased SKU
  63. type AWSProduct struct {
  64. Sku string `json:"sku"`
  65. Attributes AWSProductAttributes `json:"attributes"`
  66. }
  67. // AWSProductAttributes represents metadata about the product used to map to a node.
  68. type AWSProductAttributes struct {
  69. Location string `json:"location"`
  70. InstanceType string `json:"instanceType"`
  71. Memory string `json:"memory"`
  72. Storage string `json:"storage"`
  73. VCpu string `json:"vcpu"`
  74. UsageType string `json:"usagetype"`
  75. OperatingSystem string `json:"operatingSystem"`
  76. PreInstalledSw string `json:"preInstalledSw"`
  77. InstanceFamily string `json:"instanceFamily"`
  78. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  79. }
  80. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  81. type AWSPricingTerms struct {
  82. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  83. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  84. }
  85. // AWSOfferTerm is a sku extension used to pay for the node.
  86. type AWSOfferTerm struct {
  87. Sku string `json:"sku"`
  88. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  89. }
  90. // AWSRateCode encodes data about the price of a product
  91. type AWSRateCode struct {
  92. Unit string `json:"unit"`
  93. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  94. }
  95. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  96. type AWSCurrencyCode struct {
  97. USD string `json:"USD"`
  98. }
  99. // AWSProductTerms represents the full terms of the product
  100. type AWSProductTerms struct {
  101. Sku string `json:"sku"`
  102. OnDemand *AWSOfferTerm `json:"OnDemand"`
  103. Reserved *AWSOfferTerm `json:"Reserved"`
  104. Memory string `json:"memory"`
  105. Storage string `json:"storage"`
  106. VCpu string `json:"vcpu"`
  107. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  108. PV *PV `json:"pv"`
  109. }
  110. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  111. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  112. // OnDemandRateCode is appended to an node sku
  113. const OnDemandRateCode = ".JRTCKXETXF"
  114. // ReservedRateCode is appended to a node sku
  115. const ReservedRateCode = ".38NPMPTW36"
  116. // HourlyRateCode is appended to a node sku
  117. const HourlyRateCode = ".6YS6EN2CT7"
  118. // volTypes are used to map between AWS UsageTypes and
  119. // EBS volume types, as they would appear in K8s storage class
  120. // name and the EC2 API.
  121. var volTypes = map[string]string{
  122. "EBS:VolumeUsage.gp2": "gp2",
  123. "EBS:VolumeUsage": "standard",
  124. "EBS:VolumeUsage.sc1": "sc1",
  125. "EBS:VolumeP-IOPS.piops": "io1",
  126. "EBS:VolumeUsage.st1": "st1",
  127. "EBS:VolumeUsage.piops": "io1",
  128. "gp2": "EBS:VolumeUsage.gp2",
  129. "standard": "EBS:VolumeUsage",
  130. "sc1": "EBS:VolumeUsage.sc1",
  131. "io1": "EBS:VolumeUsage.piops",
  132. "st1": "EBS:VolumeUsage.st1",
  133. }
  134. // locationToRegion maps AWS region names (As they come from Billing)
  135. // to actual region identifiers
  136. var locationToRegion = map[string]string{
  137. "US East (Ohio)": "us-east-2",
  138. "US East (N. Virginia)": "us-east-1",
  139. "US West (N. California)": "us-west-1",
  140. "US West (Oregon)": "us-west-2",
  141. "Asia Pacific (Hong Kong)": "ap-east-1",
  142. "Asia Pacific (Mumbai)": "ap-south-1",
  143. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  144. "Asia Pacific (Seoul)": "ap-northeast-2",
  145. "Asia Pacific (Singapore)": "ap-southeast-1",
  146. "Asia Pacific (Sydney)": "ap-southeast-2",
  147. "Asia Pacific (Tokyo)": "ap-northeast-1",
  148. "Canada (Central)": "ca-central-1",
  149. "China (Beijing)": "cn-north-1",
  150. "China (Ningxia)": "cn-northwest-1",
  151. "EU (Frankfurt)": "eu-central-1",
  152. "EU (Ireland)": "eu-west-1",
  153. "EU (London)": "eu-west-2",
  154. "EU (Paris)": "eu-west-3",
  155. "EU (Stockholm)": "eu-north-1",
  156. "South America (Sao Paulo)": "sa-east-1",
  157. "AWS GovCloud (US-East)": "us-gov-east-1",
  158. "AWS GovCloud (US)": "us-gov-west-1",
  159. }
  160. var regionToBillingRegionCode = map[string]string{
  161. "us-east-2": "USE2",
  162. "us-east-1": "",
  163. "us-west-1": "USW1",
  164. "us-west-2": "USW2",
  165. "ap-east-1": "APE1",
  166. "ap-south-1": "APS3",
  167. "ap-northeast-3": "APN3",
  168. "ap-northeast-2": "APN2",
  169. "ap-southeast-1": "APS1",
  170. "ap-southeast-2": "APS2",
  171. "ap-northeast-1": "APN1",
  172. "ca-central-1": "CAN1",
  173. "cn-north-1": "",
  174. "cn-northwest-1": "",
  175. "eu-central-1": "EUC1",
  176. "eu-west-1": "EU",
  177. "eu-west-2": "EUW2",
  178. "eu-west-3": "EUW3",
  179. "eu-north-1": "EUN1",
  180. "sa-east-1": "SAE1",
  181. "us-gov-east-1": "UGE1",
  182. "us-gov-west-1": "UGW1",
  183. }
  184. func (aws *AWS) GetLocalStorageQuery(offset string) (string, error) {
  185. return "", nil
  186. }
  187. // KubeAttrConversion maps the k8s labels for region to an aws region
  188. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  189. operatingSystem = strings.ToLower(operatingSystem)
  190. region := locationToRegion[location]
  191. return region + "," + instanceType + "," + operatingSystem
  192. }
  193. type AwsSpotFeedInfo struct {
  194. BucketName string `json:"bucketName"`
  195. Prefix string `json:"prefix"`
  196. Region string `json:"region"`
  197. AccountID string `json:"projectID"`
  198. ServiceKeyName string `json:"serviceKeyName"`
  199. ServiceKeySecret string `json:"serviceKeySecret"`
  200. SpotLabel string `json:"spotLabel"`
  201. SpotLabelValue string `json:"spotLabelValue"`
  202. }
  203. type AwsAthenaInfo struct {
  204. AthenaBucketName string `json:"athenaBucketName"`
  205. AthenaRegion string `json:"athenaRegion"`
  206. AthenaDatabase string `json:"athenaDatabase"`
  207. AthenaTable string `json:"athenaTable"`
  208. ServiceKeyName string `json:"serviceKeyName"`
  209. ServiceKeySecret string `json:"serviceKeySecret"`
  210. AccountID string `json:"projectID"`
  211. }
  212. func (aws *AWS) GetManagementPlatform() (string, error) {
  213. nodes := aws.Clientset.GetAllNodes()
  214. if len(nodes) > 0 {
  215. n := nodes[0]
  216. version := n.Status.NodeInfo.KubeletVersion
  217. if strings.Contains(version, "eks") {
  218. return "eks", nil
  219. }
  220. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  221. return "kops", nil
  222. }
  223. }
  224. return "", nil
  225. }
  226. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  227. c, err := GetDefaultPricingData("aws.json")
  228. if c.Discount == "" {
  229. c.Discount = "0%"
  230. }
  231. if err != nil {
  232. return nil, err
  233. }
  234. return c, nil
  235. }
  236. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  237. c, err := GetDefaultPricingData("aws.json")
  238. if err != nil {
  239. return nil, err
  240. }
  241. if updateType == SpotInfoUpdateType {
  242. a := AwsSpotFeedInfo{}
  243. err := json.NewDecoder(r).Decode(&a)
  244. if err != nil {
  245. return nil, err
  246. }
  247. if err != nil {
  248. return nil, err
  249. }
  250. c.ServiceKeyName = a.ServiceKeyName
  251. c.ServiceKeySecret = a.ServiceKeySecret
  252. c.SpotDataPrefix = a.Prefix
  253. c.SpotDataBucket = a.BucketName
  254. c.ProjectID = a.AccountID
  255. c.SpotDataRegion = a.Region
  256. c.SpotLabel = a.SpotLabel
  257. c.SpotLabelValue = a.SpotLabelValue
  258. } else if updateType == AthenaInfoUpdateType {
  259. a := AwsAthenaInfo{}
  260. err := json.NewDecoder(r).Decode(&a)
  261. if err != nil {
  262. return nil, err
  263. }
  264. c.AthenaBucketName = a.AthenaBucketName
  265. c.AthenaRegion = a.AthenaRegion
  266. c.AthenaDatabase = a.AthenaDatabase
  267. c.AthenaTable = a.AthenaTable
  268. c.ServiceKeyName = a.ServiceKeyName
  269. c.ServiceKeySecret = a.ServiceKeySecret
  270. c.ProjectID = a.AccountID
  271. } else {
  272. a := make(map[string]string)
  273. err = json.NewDecoder(r).Decode(&a)
  274. if err != nil {
  275. return nil, err
  276. }
  277. for k, v := range a {
  278. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  279. err := SetCustomPricingField(c, kUpper, v)
  280. if err != nil {
  281. return nil, err
  282. }
  283. }
  284. }
  285. cj, err := json.Marshal(c)
  286. if err != nil {
  287. return nil, err
  288. }
  289. path := os.Getenv("CONFIG_PATH")
  290. if path == "" {
  291. path = "/models/"
  292. }
  293. path += "aws.json"
  294. remoteEnabled := os.Getenv(remoteEnabled)
  295. if remoteEnabled == "true" {
  296. err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
  297. if err != nil {
  298. return nil, err
  299. }
  300. }
  301. err = ioutil.WriteFile(path, cj, 0644)
  302. if err != nil {
  303. return nil, err
  304. }
  305. return c, nil
  306. }
  307. type awsKey struct {
  308. SpotLabelName string
  309. SpotLabelValue string
  310. Labels map[string]string
  311. ProviderID string
  312. }
  313. func (k *awsKey) GPUType() string {
  314. return ""
  315. }
  316. func (k *awsKey) ID() string {
  317. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  318. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  319. if matchNum == 2 {
  320. return group
  321. }
  322. }
  323. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  324. return ""
  325. }
  326. func (k *awsKey) Features() string {
  327. instanceType := k.Labels[v1.LabelInstanceType]
  328. var operatingSystem string
  329. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  330. if !ok {
  331. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  332. }
  333. region := k.Labels[v1.LabelZoneRegion]
  334. key := region + "," + instanceType + "," + operatingSystem
  335. usageType := "preemptible"
  336. spotKey := key + "," + usageType
  337. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  338. return spotKey
  339. }
  340. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  341. return spotKey
  342. }
  343. return key
  344. }
  345. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  346. pricing, ok := aws.Pricing[pvk.Features()]
  347. if !ok {
  348. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  349. return &PV{}, nil
  350. }
  351. return pricing.PV, nil
  352. }
  353. type awsPVKey struct {
  354. Labels map[string]string
  355. StorageClassParameters map[string]string
  356. StorageClassName string
  357. Name string
  358. }
  359. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  360. return &awsPVKey{
  361. Labels: pv.Labels,
  362. StorageClassName: pv.Spec.StorageClassName,
  363. StorageClassParameters: parameters,
  364. Name: pv.Name,
  365. }
  366. }
  367. func (key *awsPVKey) GetStorageClass() string {
  368. return key.StorageClassName
  369. }
  370. func (key *awsPVKey) Features() string {
  371. storageClass := key.StorageClassParameters["type"]
  372. if storageClass == "standard" {
  373. storageClass = "gp2"
  374. }
  375. // Storage class names are generally EBS volume types (gp2)
  376. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  377. // Converts between the 2
  378. region := key.Labels[v1.LabelZoneRegion]
  379. //if region == "" {
  380. // region = "us-east-1"
  381. //}
  382. class, ok := volTypes[storageClass]
  383. if !ok {
  384. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  385. }
  386. return region + "," + class
  387. }
  388. // GetKey maps node labels to information needed to retrieve pricing data
  389. func (aws *AWS) GetKey(labels map[string]string) Key {
  390. return &awsKey{
  391. SpotLabelName: aws.SpotLabelName,
  392. SpotLabelValue: aws.SpotLabelValue,
  393. Labels: labels,
  394. ProviderID: labels["providerID"],
  395. }
  396. }
  397. func (aws *AWS) isPreemptible(key string) bool {
  398. s := strings.Split(key, ",")
  399. if len(s) == 4 && s[3] == "preemptible" {
  400. return true
  401. }
  402. return false
  403. }
  404. // DownloadPricingData fetches data from the AWS Pricing API
  405. func (aws *AWS) DownloadPricingData() error {
  406. aws.DownloadPricingDataLock.Lock()
  407. defer aws.DownloadPricingDataLock.Unlock()
  408. c, err := GetDefaultPricingData("aws.json")
  409. if err != nil {
  410. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  411. }
  412. aws.BaseCPUPrice = c.CPU
  413. aws.BaseRAMPrice = c.RAM
  414. aws.BaseGPUPrice = c.GPU
  415. aws.BaseSpotCPUPrice = c.SpotCPU
  416. aws.BaseSpotRAMPrice = c.SpotRAM
  417. aws.SpotLabelName = c.SpotLabel
  418. aws.SpotLabelValue = c.SpotLabelValue
  419. aws.SpotDataBucket = c.SpotDataBucket
  420. aws.SpotDataPrefix = c.SpotDataPrefix
  421. aws.ProjectID = c.ProjectID
  422. aws.SpotDataRegion = c.SpotDataRegion
  423. aws.ServiceKeyName = c.ServiceKeyName
  424. aws.ServiceKeySecret = c.ServiceKeySecret
  425. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  426. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  427. }
  428. nodeList := aws.Clientset.GetAllNodes()
  429. inputkeys := make(map[string]bool)
  430. for _, n := range nodeList {
  431. labels := n.GetObjectMeta().GetLabels()
  432. key := aws.GetKey(labels)
  433. inputkeys[key.Features()] = true
  434. }
  435. pvList := aws.Clientset.GetAllPersistentVolumes()
  436. storageClasses := aws.Clientset.GetAllStorageClasses()
  437. storageClassMap := make(map[string]map[string]string)
  438. for _, storageClass := range storageClasses {
  439. params := storageClass.Parameters
  440. storageClassMap[storageClass.ObjectMeta.Name] = params
  441. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  442. storageClassMap["default"] = params
  443. storageClassMap[""] = params
  444. }
  445. }
  446. pvkeys := make(map[string]PVKey)
  447. for _, pv := range pvList {
  448. params, ok := storageClassMap[pv.Spec.StorageClassName]
  449. if !ok {
  450. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  451. continue
  452. }
  453. key := aws.GetPVKey(pv, params)
  454. pvkeys[key.Features()] = key
  455. }
  456. aws.Pricing = make(map[string]*AWSProductTerms)
  457. aws.ValidPricingKeys = make(map[string]bool)
  458. skusToKeys := make(map[string]string)
  459. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  460. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  461. resp, err := http.Get(pricingURL)
  462. if err != nil {
  463. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  464. return err
  465. }
  466. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  467. dec := json.NewDecoder(resp.Body)
  468. for {
  469. t, err := dec.Token()
  470. if err == io.EOF {
  471. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  472. break
  473. }
  474. if t == "products" {
  475. _, err := dec.Token() // this should parse the opening "{""
  476. if err != nil {
  477. return err
  478. }
  479. for dec.More() {
  480. _, err := dec.Token() // the sku token
  481. if err != nil {
  482. return err
  483. }
  484. product := &AWSProduct{}
  485. err = dec.Decode(&product)
  486. if err != nil {
  487. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  488. break
  489. }
  490. if product.Attributes.PreInstalledSw == "NA" &&
  491. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  492. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  493. spotKey := key + ",preemptible"
  494. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  495. productTerms := &AWSProductTerms{
  496. Sku: product.Sku,
  497. Memory: product.Attributes.Memory,
  498. Storage: product.Attributes.Storage,
  499. VCpu: product.Attributes.VCpu,
  500. GPU: product.Attributes.GPU,
  501. }
  502. aws.Pricing[key] = productTerms
  503. aws.Pricing[spotKey] = productTerms
  504. skusToKeys[product.Sku] = key
  505. }
  506. aws.ValidPricingKeys[key] = true
  507. aws.ValidPricingKeys[spotKey] = true
  508. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  509. // UsageTypes may be prefixed with a region code - we're removing this when using
  510. // volTypes to keep lookups generic
  511. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  512. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  513. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  514. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  515. spotKey := key + ",preemptible"
  516. pv := &PV{
  517. Class: volTypes[usageTypeNoRegion],
  518. Region: locationToRegion[product.Attributes.Location],
  519. }
  520. productTerms := &AWSProductTerms{
  521. Sku: product.Sku,
  522. PV: pv,
  523. }
  524. aws.Pricing[key] = productTerms
  525. aws.Pricing[spotKey] = productTerms
  526. skusToKeys[product.Sku] = key
  527. aws.ValidPricingKeys[key] = true
  528. aws.ValidPricingKeys[spotKey] = true
  529. }
  530. }
  531. }
  532. if t == "terms" {
  533. _, err := dec.Token() // this should parse the opening "{""
  534. if err != nil {
  535. return err
  536. }
  537. termType, err := dec.Token()
  538. if err != nil {
  539. return err
  540. }
  541. if termType == "OnDemand" {
  542. _, err := dec.Token()
  543. if err != nil { // again, should parse an opening "{"
  544. return err
  545. }
  546. for dec.More() {
  547. sku, err := dec.Token()
  548. if err != nil {
  549. return err
  550. }
  551. _, err = dec.Token() // another opening "{"
  552. if err != nil {
  553. return err
  554. }
  555. skuOnDemand, err := dec.Token()
  556. if err != nil {
  557. return err
  558. }
  559. offerTerm := &AWSOfferTerm{}
  560. err = dec.Decode(&offerTerm)
  561. if err != nil {
  562. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  563. }
  564. if sku.(string)+OnDemandRateCode == skuOnDemand {
  565. key, ok := skusToKeys[sku.(string)]
  566. spotKey := key + ",preemptible"
  567. if ok {
  568. aws.Pricing[key].OnDemand = offerTerm
  569. aws.Pricing[spotKey].OnDemand = offerTerm
  570. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  571. // If the specific UsageType is the per IO cost used on io1 volumes
  572. // we need to add the per IO cost to the io1 PV cost
  573. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  574. // Add the per IO cost to the PV object for the io1 volume type
  575. aws.Pricing[key].PV.CostPerIO = cost
  576. } else if strings.Contains(key, "EBS:Volume") {
  577. // If volume, we need to get hourly cost and add it to the PV object
  578. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  579. costFloat, _ := strconv.ParseFloat(cost, 64)
  580. hourlyPrice := costFloat / 730
  581. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  582. }
  583. }
  584. }
  585. _, err = dec.Token()
  586. if err != nil {
  587. return err
  588. }
  589. }
  590. _, err = dec.Token()
  591. if err != nil {
  592. return err
  593. }
  594. }
  595. }
  596. }
  597. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  598. if err != nil {
  599. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  600. } else {
  601. aws.SpotPricingByInstanceID = sp
  602. }
  603. return nil
  604. }
  605. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  606. func (c *AWS) NetworkPricing() (*Network, error) {
  607. cpricing, err := GetDefaultPricingData("aws.json")
  608. if err != nil {
  609. return nil, err
  610. }
  611. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  612. if err != nil {
  613. return nil, err
  614. }
  615. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  616. if err != nil {
  617. return nil, err
  618. }
  619. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  620. if err != nil {
  621. return nil, err
  622. }
  623. return &Network{
  624. ZoneNetworkEgressCost: znec,
  625. RegionNetworkEgressCost: rnec,
  626. InternetNetworkEgressCost: inec,
  627. }, nil
  628. }
  629. // AllNodePricing returns all the billing data fetched.
  630. func (aws *AWS) AllNodePricing() (interface{}, error) {
  631. aws.DownloadPricingDataLock.RLock()
  632. defer aws.DownloadPricingDataLock.RUnlock()
  633. return aws.Pricing, nil
  634. }
  635. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  636. key := k.Features()
  637. if aws.isPreemptible(key) {
  638. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  639. var spotcost string
  640. arr := strings.Split(spotInfo.Charge, " ")
  641. if len(arr) == 2 {
  642. spotcost = arr[0]
  643. } else {
  644. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  645. }
  646. klog.V(1).Infof("SPOT COST FOR %s: %s", k.Features, spotcost)
  647. return &Node{
  648. Cost: spotcost,
  649. VCPU: terms.VCpu,
  650. RAM: terms.Memory,
  651. GPU: terms.GPU,
  652. Storage: terms.Storage,
  653. BaseCPUPrice: aws.BaseCPUPrice,
  654. BaseRAMPrice: aws.BaseRAMPrice,
  655. BaseGPUPrice: aws.BaseGPUPrice,
  656. UsageType: usageType,
  657. }, nil
  658. }
  659. return &Node{
  660. VCPU: terms.VCpu,
  661. VCPUCost: aws.BaseSpotCPUPrice,
  662. RAM: terms.Memory,
  663. GPU: terms.GPU,
  664. RAMCost: aws.BaseSpotRAMPrice,
  665. Storage: terms.Storage,
  666. BaseCPUPrice: aws.BaseCPUPrice,
  667. BaseRAMPrice: aws.BaseRAMPrice,
  668. BaseGPUPrice: aws.BaseGPUPrice,
  669. UsageType: usageType,
  670. }, nil
  671. }
  672. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  673. if !ok {
  674. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  675. }
  676. cost := c.PricePerUnit.USD
  677. return &Node{
  678. Cost: cost,
  679. VCPU: terms.VCpu,
  680. RAM: terms.Memory,
  681. GPU: terms.GPU,
  682. Storage: terms.Storage,
  683. BaseCPUPrice: aws.BaseCPUPrice,
  684. BaseRAMPrice: aws.BaseRAMPrice,
  685. BaseGPUPrice: aws.BaseGPUPrice,
  686. UsageType: usageType,
  687. }, nil
  688. }
  689. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  690. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  691. aws.DownloadPricingDataLock.RLock()
  692. defer aws.DownloadPricingDataLock.RUnlock()
  693. key := k.Features()
  694. usageType := "ondemand"
  695. if aws.isPreemptible(key) {
  696. usageType = "preemptible"
  697. }
  698. terms, ok := aws.Pricing[key]
  699. if ok {
  700. return aws.createNode(terms, usageType, k)
  701. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  702. aws.DownloadPricingDataLock.RUnlock()
  703. err := aws.DownloadPricingData()
  704. aws.DownloadPricingDataLock.RLock()
  705. if err != nil {
  706. return &Node{
  707. Cost: aws.BaseCPUPrice,
  708. BaseCPUPrice: aws.BaseCPUPrice,
  709. BaseRAMPrice: aws.BaseRAMPrice,
  710. BaseGPUPrice: aws.BaseGPUPrice,
  711. UsageType: usageType,
  712. UsesBaseCPUPrice: true,
  713. }, err
  714. }
  715. terms, termsOk := aws.Pricing[key]
  716. if !termsOk {
  717. return &Node{
  718. Cost: aws.BaseCPUPrice,
  719. BaseCPUPrice: aws.BaseCPUPrice,
  720. BaseRAMPrice: aws.BaseRAMPrice,
  721. BaseGPUPrice: aws.BaseGPUPrice,
  722. UsageType: usageType,
  723. UsesBaseCPUPrice: true,
  724. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  725. }
  726. return aws.createNode(terms, usageType, k)
  727. } else { // Fall back to base pricing if we can't find the key.
  728. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  729. return &Node{
  730. Cost: aws.BaseCPUPrice,
  731. BaseCPUPrice: aws.BaseCPUPrice,
  732. BaseRAMPrice: aws.BaseRAMPrice,
  733. BaseGPUPrice: aws.BaseGPUPrice,
  734. UsageType: usageType,
  735. UsesBaseCPUPrice: true,
  736. }, nil
  737. }
  738. }
  739. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  740. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  741. defaultClusterName := "AWS Cluster #1"
  742. c, err := awsProvider.GetConfig()
  743. if err != nil {
  744. return nil, err
  745. }
  746. remote := os.Getenv(remoteEnabled)
  747. remoteEnabled := false
  748. if os.Getenv(remote) == "true" {
  749. remoteEnabled = true
  750. }
  751. if c.ClusterName != "" {
  752. m := make(map[string]string)
  753. m["name"] = c.ClusterName
  754. m["provider"] = "AWS"
  755. m["id"] = os.Getenv(clusterIDKey)
  756. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  757. return m, nil
  758. }
  759. makeStructure := func(clusterName string) (map[string]string, error) {
  760. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  761. m := make(map[string]string)
  762. m["name"] = clusterName
  763. m["provider"] = "AWS"
  764. m["id"] = os.Getenv(clusterIDKey)
  765. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  766. return m, nil
  767. }
  768. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  769. if len(maybeClusterId) != 0 {
  770. return makeStructure(maybeClusterId)
  771. }
  772. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  773. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  774. nodeList := awsProvider.Clientset.GetAllNodes()
  775. for _, n := range nodeList {
  776. region := ""
  777. instanceId := ""
  778. providerId := n.Spec.ProviderID
  779. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  780. if matchNum == 1 {
  781. region = group
  782. } else if matchNum == 2 {
  783. instanceId = group
  784. }
  785. }
  786. if len(instanceId) == 0 {
  787. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  788. continue
  789. }
  790. c := &aws.Config{
  791. Region: aws.String(region),
  792. }
  793. s := session.Must(session.NewSession(c))
  794. ec2Svc := ec2.New(s)
  795. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  796. InstanceIds: []*string{
  797. aws.String(instanceId),
  798. },
  799. })
  800. if diErr != nil {
  801. // maybe log this?
  802. continue
  803. }
  804. if len(di.Reservations) != 1 {
  805. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  806. continue
  807. }
  808. res := di.Reservations[0]
  809. if len(res.Instances) != 1 {
  810. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  811. continue
  812. }
  813. inst := res.Instances[0]
  814. for _, tag := range inst.Tags {
  815. tagKey := *tag.Key
  816. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  817. if matchNum != 1 {
  818. continue
  819. }
  820. return makeStructure(group)
  821. }
  822. }
  823. }
  824. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  825. return makeStructure(defaultClusterName)
  826. }
  827. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  828. func (*AWS) AddServiceKey(formValues url.Values) error {
  829. keyID := formValues.Get("access_key_ID")
  830. key := formValues.Get("secret_access_key")
  831. m := make(map[string]string)
  832. m["access_key_ID"] = keyID
  833. m["secret_access_key"] = key
  834. result, err := json.Marshal(m)
  835. if err != nil {
  836. return err
  837. }
  838. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  839. }
  840. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  841. func (*AWS) GetDisks() ([]byte, error) {
  842. jsonFile, err := os.Open("/var/configs/key.json")
  843. if err == nil {
  844. byteValue, _ := ioutil.ReadAll(jsonFile)
  845. var result map[string]string
  846. err := json.Unmarshal([]byte(byteValue), &result)
  847. if err != nil {
  848. return nil, err
  849. }
  850. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  851. if err != nil {
  852. return nil, err
  853. }
  854. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  855. if err != nil {
  856. return nil, err
  857. }
  858. } else if os.IsNotExist(err) {
  859. klog.V(2).Infof("Using Default Credentials")
  860. } else {
  861. return nil, err
  862. }
  863. defer jsonFile.Close()
  864. clusterConfig, err := os.Open("/var/configs/cluster.json")
  865. if err != nil {
  866. return nil, err
  867. }
  868. defer clusterConfig.Close()
  869. b, err := ioutil.ReadAll(clusterConfig)
  870. if err != nil {
  871. return nil, err
  872. }
  873. var clusterConf map[string]string
  874. err = json.Unmarshal([]byte(b), &clusterConf)
  875. if err != nil {
  876. return nil, err
  877. }
  878. region := aws.String(clusterConf["region"])
  879. c := &aws.Config{
  880. Region: region,
  881. }
  882. s := session.Must(session.NewSession(c))
  883. ec2Svc := ec2.New(s)
  884. input := &ec2.DescribeVolumesInput{}
  885. volumeResult, err := ec2Svc.DescribeVolumes(input)
  886. if err != nil {
  887. if aerr, ok := err.(awserr.Error); ok {
  888. switch aerr.Code() {
  889. default:
  890. return nil, aerr
  891. }
  892. } else {
  893. return nil, err
  894. }
  895. }
  896. return json.Marshal(volumeResult)
  897. }
  898. // ConvertToGlueColumnFormat takes a string and runs through various regex
  899. // and string replacement statements to convert it to a format compatible
  900. // with AWS Glue and Athena column names.
  901. // Following guidance from AWS provided here ('Column Names' section):
  902. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  903. // It returns a string containing the column name in proper column name format and length.
  904. func ConvertToGlueColumnFormat(column_name string) string {
  905. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  906. // An underscore is added in front of uppercase letters
  907. capital_underscore := regexp.MustCompile(`[A-Z]`)
  908. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  909. // Any non-alphanumeric characters are replaced with an underscore
  910. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  911. final = no_space_punc.ReplaceAllString(final, "_")
  912. // Duplicate underscores are removed
  913. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  914. final = no_dup_underscore.ReplaceAllString(final, "_")
  915. // Any leading and trailing underscores are removed
  916. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  917. final = no_front_end_underscore.ReplaceAllString(final, "")
  918. // Uppercase to lowercase
  919. final = strings.ToLower(final)
  920. // Longer column name than expected - remove _ left to right
  921. allowed_col_len := 128
  922. undersc_to_remove := len(final) - allowed_col_len
  923. if undersc_to_remove > 0 {
  924. final = strings.Replace(final, "_", "", undersc_to_remove)
  925. }
  926. // If removing all of the underscores still didn't
  927. // make the column name < 128 characters, trim it!
  928. if len(final) > allowed_col_len {
  929. final = final[:allowed_col_len]
  930. }
  931. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  932. return final
  933. }
  934. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  935. // "start" and "end" are dates of the format YYYY-MM-DD
  936. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  937. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  938. customPricing, err := a.GetConfig()
  939. if err != nil {
  940. return nil, err
  941. }
  942. aggregator_column_name := "resource_tags_user_" + aggregator
  943. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  944. query := fmt.Sprintf(`SELECT
  945. CAST(line_item_usage_start_date AS DATE) as start_date,
  946. %s,
  947. line_item_product_code,
  948. SUM(line_item_blended_cost) as blended_cost
  949. FROM %s as cost_data
  950. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  951. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  952. if customPricing.ServiceKeyName != "" {
  953. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  954. if err != nil {
  955. return nil, err
  956. }
  957. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  958. if err != nil {
  959. return nil, err
  960. }
  961. }
  962. region := aws.String(customPricing.AthenaRegion)
  963. resultsBucket := customPricing.AthenaBucketName
  964. database := customPricing.AthenaDatabase
  965. c := &aws.Config{
  966. Region: region,
  967. }
  968. s := session.Must(session.NewSession(c))
  969. svc := athena.New(s)
  970. var e athena.StartQueryExecutionInput
  971. var r athena.ResultConfiguration
  972. r.SetOutputLocation(resultsBucket)
  973. e.SetResultConfiguration(&r)
  974. e.SetQueryString(query)
  975. var q athena.QueryExecutionContext
  976. q.SetDatabase(database)
  977. e.SetQueryExecutionContext(&q)
  978. res, err := svc.StartQueryExecution(&e)
  979. if err != nil {
  980. return nil, err
  981. }
  982. klog.V(2).Infof("StartQueryExecution result:")
  983. klog.V(2).Infof(res.GoString())
  984. var qri athena.GetQueryExecutionInput
  985. qri.SetQueryExecutionId(*res.QueryExecutionId)
  986. var qrop *athena.GetQueryExecutionOutput
  987. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  988. for {
  989. qrop, err = svc.GetQueryExecution(&qri)
  990. if err != nil {
  991. return nil, err
  992. }
  993. if *qrop.QueryExecution.Status.State != "RUNNING" {
  994. break
  995. }
  996. time.Sleep(duration)
  997. }
  998. var oocAllocs []*OutOfClusterAllocation
  999. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1000. var ip athena.GetQueryResultsInput
  1001. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1002. op, err := svc.GetQueryResults(&ip)
  1003. if err != nil {
  1004. return nil, err
  1005. }
  1006. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1007. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1008. if err != nil {
  1009. return nil, err
  1010. }
  1011. ooc := &OutOfClusterAllocation{
  1012. Aggregator: aggregator,
  1013. Environment: *r.Data[1].VarCharValue,
  1014. Service: *r.Data[2].VarCharValue,
  1015. Cost: cost,
  1016. }
  1017. oocAllocs = append(oocAllocs, ooc)
  1018. }
  1019. }
  1020. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  1021. }
  1022. // QuerySQL can query a properly configured Athena database.
  1023. // Used to fetch billing data.
  1024. // Requires a json config in /var/configs with key region, output, and database.
  1025. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1026. customPricing, err := a.GetConfig()
  1027. if err != nil {
  1028. return nil, err
  1029. }
  1030. if customPricing.ServiceKeyName != "" {
  1031. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1032. if err != nil {
  1033. return nil, err
  1034. }
  1035. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1036. if err != nil {
  1037. return nil, err
  1038. }
  1039. }
  1040. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1041. if err != nil {
  1042. return nil, err
  1043. }
  1044. defer athenaConfigs.Close()
  1045. b, err := ioutil.ReadAll(athenaConfigs)
  1046. if err != nil {
  1047. return nil, err
  1048. }
  1049. var athenaConf map[string]string
  1050. json.Unmarshal([]byte(b), &athenaConf)
  1051. region := aws.String(customPricing.AthenaRegion)
  1052. resultsBucket := customPricing.AthenaBucketName
  1053. database := customPricing.AthenaDatabase
  1054. c := &aws.Config{
  1055. Region: region,
  1056. }
  1057. s := session.Must(session.NewSession(c))
  1058. svc := athena.New(s)
  1059. var e athena.StartQueryExecutionInput
  1060. var r athena.ResultConfiguration
  1061. r.SetOutputLocation(resultsBucket)
  1062. e.SetResultConfiguration(&r)
  1063. e.SetQueryString(query)
  1064. var q athena.QueryExecutionContext
  1065. q.SetDatabase(database)
  1066. e.SetQueryExecutionContext(&q)
  1067. res, err := svc.StartQueryExecution(&e)
  1068. if err != nil {
  1069. return nil, err
  1070. }
  1071. klog.V(2).Infof("StartQueryExecution result:")
  1072. klog.V(2).Infof(res.GoString())
  1073. var qri athena.GetQueryExecutionInput
  1074. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1075. var qrop *athena.GetQueryExecutionOutput
  1076. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1077. for {
  1078. qrop, err = svc.GetQueryExecution(&qri)
  1079. if err != nil {
  1080. return nil, err
  1081. }
  1082. if *qrop.QueryExecution.Status.State != "RUNNING" {
  1083. break
  1084. }
  1085. time.Sleep(duration)
  1086. }
  1087. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1088. var ip athena.GetQueryResultsInput
  1089. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1090. op, err := svc.GetQueryResults(&ip)
  1091. if err != nil {
  1092. return nil, err
  1093. }
  1094. b, err := json.Marshal(op.ResultSet)
  1095. if err != nil {
  1096. return nil, err
  1097. }
  1098. return b, nil
  1099. }
  1100. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1101. }
  1102. type spotInfo struct {
  1103. Timestamp string `csv:"Timestamp"`
  1104. UsageType string `csv:"UsageType"`
  1105. Operation string `csv:"Operation"`
  1106. InstanceID string `csv:"InstanceID"`
  1107. MyBidID string `csv:"MyBidID"`
  1108. MyMaxPrice string `csv:"MyMaxPrice"`
  1109. MarketPrice string `csv:"MarketPrice"`
  1110. Charge string `csv:"Charge"`
  1111. Version string `csv:"Version"`
  1112. }
  1113. type fnames []*string
  1114. func (f fnames) Len() int {
  1115. return len(f)
  1116. }
  1117. func (f fnames) Swap(i, j int) {
  1118. f[i], f[j] = f[j], f[i]
  1119. }
  1120. func (f fnames) Less(i, j int) bool {
  1121. key1 := strings.Split(*f[i], ".")
  1122. key2 := strings.Split(*f[j], ".")
  1123. t1, err := time.Parse("2006-01-02-15", key1[1])
  1124. if err != nil {
  1125. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1126. return false
  1127. }
  1128. t2, err := time.Parse("2006-01-02-15", key2[1])
  1129. if err != nil {
  1130. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1131. return false
  1132. }
  1133. return t1.Before(t2)
  1134. }
  1135. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1136. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1137. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1138. if err != nil {
  1139. return nil, err
  1140. }
  1141. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1142. if err != nil {
  1143. return nil, err
  1144. }
  1145. }
  1146. s3Prefix := projectID
  1147. if len(prefix) != 0 {
  1148. s3Prefix = prefix + "/" + s3Prefix
  1149. }
  1150. c := aws.NewConfig().WithRegion(region)
  1151. s := session.Must(session.NewSession(c))
  1152. s3Svc := s3.New(s)
  1153. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1154. tNow := time.Now()
  1155. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1156. ls := &s3.ListObjectsInput{
  1157. Bucket: aws.String(bucket),
  1158. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1159. }
  1160. ls2 := &s3.ListObjectsInput{
  1161. Bucket: aws.String(bucket),
  1162. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1163. }
  1164. lso, err := s3Svc.ListObjects(ls)
  1165. if err != nil {
  1166. return nil, err
  1167. }
  1168. lsoLen := len(lso.Contents)
  1169. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1170. if lsoLen == 0 {
  1171. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1172. }
  1173. lso2, err := s3Svc.ListObjects(ls2)
  1174. if err != nil {
  1175. return nil, err
  1176. }
  1177. lso2Len := len(lso2.Contents)
  1178. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1179. if lso2Len == 0 {
  1180. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1181. }
  1182. var keys []*string
  1183. for _, obj := range lso.Contents {
  1184. keys = append(keys, obj.Key)
  1185. }
  1186. for _, obj := range lso2.Contents {
  1187. keys = append(keys, obj.Key)
  1188. }
  1189. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1190. header, err := csvutil.Header(spotInfo{}, "csv")
  1191. if err != nil {
  1192. return nil, err
  1193. }
  1194. fieldsPerRecord := len(header)
  1195. spots := make(map[string]*spotInfo)
  1196. for _, key := range keys {
  1197. getObj := &s3.GetObjectInput{
  1198. Bucket: aws.String(bucket),
  1199. Key: key,
  1200. }
  1201. buf := aws.NewWriteAtBuffer([]byte{})
  1202. _, err := downloader.Download(buf, getObj)
  1203. if err != nil {
  1204. return nil, err
  1205. }
  1206. r := bytes.NewReader(buf.Bytes())
  1207. gr, err := gzip.NewReader(r)
  1208. if err != nil {
  1209. return nil, err
  1210. }
  1211. csvReader := csv.NewReader(gr)
  1212. csvReader.Comma = '\t'
  1213. csvReader.FieldsPerRecord = fieldsPerRecord
  1214. dec, err := csvutil.NewDecoder(csvReader, header...)
  1215. if err != nil {
  1216. return nil, err
  1217. }
  1218. var foundVersion string
  1219. for {
  1220. spot := spotInfo{}
  1221. err := dec.Decode(&spot)
  1222. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1223. if err == io.EOF {
  1224. break
  1225. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1226. rec := dec.Record()
  1227. // the first two "Record()" will be the comment lines
  1228. // and they show up as len() == 1
  1229. // the first of which is "#Version"
  1230. // the second of which is "#Fields: "
  1231. if len(rec) != 1 {
  1232. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1233. continue
  1234. }
  1235. if len(foundVersion) == 0 {
  1236. spotFeedVersion := rec[0]
  1237. klog.V(3).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1238. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1239. if matches != nil {
  1240. foundVersion = matches[1]
  1241. if foundVersion != supportedSpotFeedVersion {
  1242. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1243. break
  1244. }
  1245. }
  1246. continue
  1247. } else if strings.Index(rec[0], "#") == 0 {
  1248. continue
  1249. } else {
  1250. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1251. continue
  1252. }
  1253. } else if err != nil {
  1254. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1255. continue
  1256. }
  1257. klog.V(3).Infof("Found spot info %+v", spot)
  1258. spots[spot.InstanceID] = &spot
  1259. }
  1260. gr.Close()
  1261. }
  1262. return spots, nil
  1263. }
  1264. func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1265. }
  1266. /*
  1267. func (aws *AWS) getReservedInstances() ([]interface{}, error) {
  1268. customPricing, err := a.GetConfig()
  1269. if err != nil {
  1270. return nil, err
  1271. }
  1272. if customPricing.ServiceKeyName != "" {
  1273. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1274. if err != nil {
  1275. return nil, err
  1276. }
  1277. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1278. if err != nil {
  1279. return nil, err
  1280. }
  1281. }
  1282. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1283. if err != nil {
  1284. return nil, err
  1285. }
  1286. defer athenaConfigs.Close()
  1287. b, err := ioutil.ReadAll(athenaConfigs)
  1288. if err != nil {
  1289. return nil, err
  1290. }
  1291. var athenaConf map[string]string
  1292. json.Unmarshal([]byte(b), &athenaConf)
  1293. region := aws.String(customPricing.AthenaRegion)
  1294. c := &aws.Config{
  1295. Region: region,
  1296. }
  1297. s := session.Must(session.NewSession(c))
  1298. svc := ec2.New(s)
  1299. svc.DescribeReservedInstances()
  1300. }
  1301. */