awsprovider.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "k8s.io/klog"
  19. "github.com/kubecost/cost-model/clustercache"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/athena"
  24. "github.com/aws/aws-sdk-go/service/ec2"
  25. "github.com/aws/aws-sdk-go/service/s3"
  26. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  27. "github.com/jszwec/csvutil"
  28. v1 "k8s.io/api/core/v1"
  29. )
  30. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  31. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  32. const supportedSpotFeedVersion = "1"
  33. const SpotInfoUpdateType = "spotinfo"
  34. const AthenaInfoUpdateType = "athenainfo"
  35. // AWS represents an Amazon Provider
  36. type AWS struct {
  37. Pricing map[string]*AWSProductTerms
  38. SpotPricingByInstanceID map[string]*spotInfo
  39. ValidPricingKeys map[string]bool
  40. Clientset clustercache.ClusterCache
  41. BaseCPUPrice string
  42. BaseRAMPrice string
  43. BaseGPUPrice string
  44. BaseSpotCPUPrice string
  45. BaseSpotRAMPrice string
  46. SpotLabelName string
  47. SpotLabelValue string
  48. ServiceKeyName string
  49. ServiceKeySecret string
  50. SpotDataRegion string
  51. SpotDataBucket string
  52. SpotDataPrefix string
  53. ProjectID string
  54. DownloadPricingDataLock sync.RWMutex
  55. *CustomProvider
  56. }
  57. // AWSPricing maps a k8s node to an AWS Pricing "product"
  58. type AWSPricing struct {
  59. Products map[string]*AWSProduct `json:"products"`
  60. Terms AWSPricingTerms `json:"terms"`
  61. }
  62. // AWSProduct represents a purchased SKU
  63. type AWSProduct struct {
  64. Sku string `json:"sku"`
  65. Attributes AWSProductAttributes `json:"attributes"`
  66. }
  67. // AWSProductAttributes represents metadata about the product used to map to a node.
  68. type AWSProductAttributes struct {
  69. Location string `json:"location"`
  70. InstanceType string `json:"instanceType"`
  71. Memory string `json:"memory"`
  72. Storage string `json:"storage"`
  73. VCpu string `json:"vcpu"`
  74. UsageType string `json:"usagetype"`
  75. OperatingSystem string `json:"operatingSystem"`
  76. PreInstalledSw string `json:"preInstalledSw"`
  77. InstanceFamily string `json:"instanceFamily"`
  78. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  79. }
  80. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  81. type AWSPricingTerms struct {
  82. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  83. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  84. }
  85. // AWSOfferTerm is a sku extension used to pay for the node.
  86. type AWSOfferTerm struct {
  87. Sku string `json:"sku"`
  88. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  89. }
  90. // AWSRateCode encodes data about the price of a product
  91. type AWSRateCode struct {
  92. Unit string `json:"unit"`
  93. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  94. }
  95. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  96. type AWSCurrencyCode struct {
  97. USD string `json:"USD"`
  98. }
  99. // AWSProductTerms represents the full terms of the product
  100. type AWSProductTerms struct {
  101. Sku string `json:"sku"`
  102. OnDemand *AWSOfferTerm `json:"OnDemand"`
  103. Reserved *AWSOfferTerm `json:"Reserved"`
  104. Memory string `json:"memory"`
  105. Storage string `json:"storage"`
  106. VCpu string `json:"vcpu"`
  107. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  108. PV *PV `json:"pv"`
  109. }
  110. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  111. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  112. // OnDemandRateCode is appended to an node sku
  113. const OnDemandRateCode = ".JRTCKXETXF"
  114. // ReservedRateCode is appended to a node sku
  115. const ReservedRateCode = ".38NPMPTW36"
  116. // HourlyRateCode is appended to a node sku
  117. const HourlyRateCode = ".6YS6EN2CT7"
  118. // volTypes are used to map between AWS UsageTypes and
  119. // EBS volume types, as they would appear in K8s storage class
  120. // name and the EC2 API.
  121. var volTypes = map[string]string{
  122. "EBS:VolumeUsage.gp2": "gp2",
  123. "EBS:VolumeUsage": "standard",
  124. "EBS:VolumeUsage.sc1": "sc1",
  125. "EBS:VolumeP-IOPS.piops": "io1",
  126. "EBS:VolumeUsage.st1": "st1",
  127. "EBS:VolumeUsage.piops": "io1",
  128. "gp2": "EBS:VolumeUsage.gp2",
  129. "standard": "EBS:VolumeUsage",
  130. "sc1": "EBS:VolumeUsage.sc1",
  131. "io1": "EBS:VolumeUsage.piops",
  132. "st1": "EBS:VolumeUsage.st1",
  133. }
  134. // locationToRegion maps AWS region names (As they come from Billing)
  135. // to actual region identifiers
  136. var locationToRegion = map[string]string{
  137. "US East (Ohio)": "us-east-2",
  138. "US East (N. Virginia)": "us-east-1",
  139. "US West (N. California)": "us-west-1",
  140. "US West (Oregon)": "us-west-2",
  141. "Asia Pacific (Hong Kong)": "ap-east-1",
  142. "Asia Pacific (Mumbai)": "ap-south-1",
  143. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  144. "Asia Pacific (Seoul)": "ap-northeast-2",
  145. "Asia Pacific (Singapore)": "ap-southeast-1",
  146. "Asia Pacific (Sydney)": "ap-southeast-2",
  147. "Asia Pacific (Tokyo)": "ap-northeast-1",
  148. "Canada (Central)": "ca-central-1",
  149. "China (Beijing)": "cn-north-1",
  150. "China (Ningxia)": "cn-northwest-1",
  151. "EU (Frankfurt)": "eu-central-1",
  152. "EU (Ireland)": "eu-west-1",
  153. "EU (London)": "eu-west-2",
  154. "EU (Paris)": "eu-west-3",
  155. "EU (Stockholm)": "eu-north-1",
  156. "South America (Sao Paulo)": "sa-east-1",
  157. "AWS GovCloud (US-East)": "us-gov-east-1",
  158. "AWS GovCloud (US)": "us-gov-west-1",
  159. }
  160. var regionToBillingRegionCode = map[string]string{
  161. "us-east-2": "USE2",
  162. "us-east-1": "",
  163. "us-west-1": "USW1",
  164. "us-west-2": "USW2",
  165. "ap-east-1": "APE1",
  166. "ap-south-1": "APS3",
  167. "ap-northeast-3": "APN3",
  168. "ap-northeast-2": "APN2",
  169. "ap-southeast-1": "APS1",
  170. "ap-southeast-2": "APS2",
  171. "ap-northeast-1": "APN1",
  172. "ca-central-1": "CAN1",
  173. "cn-north-1": "",
  174. "cn-northwest-1": "",
  175. "eu-central-1": "EUC1",
  176. "eu-west-1": "EU",
  177. "eu-west-2": "EUW2",
  178. "eu-west-3": "EUW3",
  179. "eu-north-1": "EUN1",
  180. "sa-east-1": "SAE1",
  181. "us-gov-east-1": "UGE1",
  182. "us-gov-west-1": "UGW1",
  183. }
  184. func (aws *AWS) GetLocalStorageQuery(offset string) (string, error) {
  185. return "", nil
  186. }
  187. // KubeAttrConversion maps the k8s labels for region to an aws region
  188. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  189. operatingSystem = strings.ToLower(operatingSystem)
  190. region := locationToRegion[location]
  191. return region + "," + instanceType + "," + operatingSystem
  192. }
  193. type AwsSpotFeedInfo struct {
  194. BucketName string `json:"bucketName"`
  195. Prefix string `json:"prefix"`
  196. Region string `json:"region"`
  197. AccountID string `json:"projectID"`
  198. ServiceKeyName string `json:"serviceKeyName"`
  199. ServiceKeySecret string `json:"serviceKeySecret"`
  200. SpotLabel string `json:"spotLabel"`
  201. SpotLabelValue string `json:"spotLabelValue"`
  202. }
  203. type AwsAthenaInfo struct {
  204. AthenaBucketName string `json:"athenaBucketName"`
  205. AthenaRegion string `json:"athenaRegion"`
  206. AthenaDatabase string `json:"athenaDatabase"`
  207. AthenaTable string `json:"athenaTable"`
  208. ServiceKeyName string `json:"serviceKeyName"`
  209. ServiceKeySecret string `json:"serviceKeySecret"`
  210. AccountID string `json:"projectID"`
  211. }
  212. func (aws *AWS) GetManagementPlatform() (string, error) {
  213. nodes := aws.Clientset.GetAllNodes()
  214. if len(nodes) > 0 {
  215. n := nodes[0]
  216. version := n.Status.NodeInfo.KubeletVersion
  217. if strings.Contains(version, "eks") {
  218. return "eks", nil
  219. }
  220. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  221. return "kops", nil
  222. }
  223. }
  224. return "", nil
  225. }
  226. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  227. c, err := GetDefaultPricingData("aws.json")
  228. if c.Discount == "" {
  229. c.Discount = "0%"
  230. }
  231. if err != nil {
  232. return nil, err
  233. }
  234. return c, nil
  235. }
  236. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  237. c, err := GetDefaultPricingData("aws.json")
  238. if err != nil {
  239. return nil, err
  240. }
  241. if updateType == SpotInfoUpdateType {
  242. a := AwsSpotFeedInfo{}
  243. err := json.NewDecoder(r).Decode(&a)
  244. if err != nil {
  245. return nil, err
  246. }
  247. if err != nil {
  248. return nil, err
  249. }
  250. c.ServiceKeyName = a.ServiceKeyName
  251. c.ServiceKeySecret = a.ServiceKeySecret
  252. c.SpotDataPrefix = a.Prefix
  253. c.SpotDataBucket = a.BucketName
  254. c.ProjectID = a.AccountID
  255. c.SpotDataRegion = a.Region
  256. c.SpotLabel = a.SpotLabel
  257. c.SpotLabelValue = a.SpotLabelValue
  258. } else if updateType == AthenaInfoUpdateType {
  259. a := AwsAthenaInfo{}
  260. err := json.NewDecoder(r).Decode(&a)
  261. if err != nil {
  262. return nil, err
  263. }
  264. c.AthenaBucketName = a.AthenaBucketName
  265. c.AthenaRegion = a.AthenaRegion
  266. c.AthenaDatabase = a.AthenaDatabase
  267. c.AthenaTable = a.AthenaTable
  268. c.ServiceKeyName = a.ServiceKeyName
  269. c.ServiceKeySecret = a.ServiceKeySecret
  270. c.ProjectID = a.AccountID
  271. } else {
  272. a := make(map[string]string)
  273. err = json.NewDecoder(r).Decode(&a)
  274. if err != nil {
  275. return nil, err
  276. }
  277. for k, v := range a {
  278. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  279. err := SetCustomPricingField(c, kUpper, v)
  280. if err != nil {
  281. return nil, err
  282. }
  283. }
  284. }
  285. cj, err := json.Marshal(c)
  286. if err != nil {
  287. return nil, err
  288. }
  289. path := os.Getenv("CONFIG_PATH")
  290. if path == "" {
  291. path = "/models/"
  292. }
  293. path += "aws.json"
  294. remoteEnabled := os.Getenv(remoteEnabled)
  295. if remoteEnabled == "true" {
  296. err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
  297. if err != nil {
  298. return nil, err
  299. }
  300. }
  301. err = ioutil.WriteFile(path, cj, 0644)
  302. if err != nil {
  303. return nil, err
  304. }
  305. return c, nil
  306. }
  307. type awsKey struct {
  308. SpotLabelName string
  309. SpotLabelValue string
  310. Labels map[string]string
  311. ProviderID string
  312. }
  313. func (k *awsKey) GPUType() string {
  314. return ""
  315. }
  316. func (k *awsKey) ID() string {
  317. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  318. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  319. if matchNum == 2 {
  320. return group
  321. }
  322. }
  323. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  324. return ""
  325. }
  326. func (k *awsKey) Features() string {
  327. instanceType := k.Labels[v1.LabelInstanceType]
  328. var operatingSystem string
  329. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  330. if !ok {
  331. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  332. }
  333. region := k.Labels[v1.LabelZoneRegion]
  334. key := region + "," + instanceType + "," + operatingSystem
  335. usageType := "preemptible"
  336. spotKey := key + "," + usageType
  337. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  338. return spotKey
  339. }
  340. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  341. return spotKey
  342. }
  343. return key
  344. }
  345. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  346. pricing, ok := aws.Pricing[pvk.Features()]
  347. if !ok {
  348. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  349. return &PV{}, nil
  350. }
  351. return pricing.PV, nil
  352. }
  353. type awsPVKey struct {
  354. Labels map[string]string
  355. StorageClassParameters map[string]string
  356. StorageClassName string
  357. Name string
  358. }
  359. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string) PVKey {
  360. return &awsPVKey{
  361. Labels: pv.Labels,
  362. StorageClassName: pv.Spec.StorageClassName,
  363. StorageClassParameters: parameters,
  364. Name: pv.Name,
  365. }
  366. }
  367. func (key *awsPVKey) GetStorageClass() string {
  368. return key.StorageClassName
  369. }
  370. func (key *awsPVKey) Features() string {
  371. storageClass := key.StorageClassParameters["type"]
  372. if storageClass == "standard" {
  373. storageClass = "gp2"
  374. }
  375. // Storage class names are generally EBS volume types (gp2)
  376. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  377. // Converts between the 2
  378. region := key.Labels[v1.LabelZoneRegion]
  379. //if region == "" {
  380. // region = "us-east-1"
  381. //}
  382. class, ok := volTypes[storageClass]
  383. if !ok {
  384. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  385. }
  386. return region + "," + class
  387. }
  388. // GetKey maps node labels to information needed to retrieve pricing data
  389. func (aws *AWS) GetKey(labels map[string]string) Key {
  390. return &awsKey{
  391. SpotLabelName: aws.SpotLabelName,
  392. SpotLabelValue: aws.SpotLabelValue,
  393. Labels: labels,
  394. ProviderID: labels["providerID"],
  395. }
  396. }
  397. func (aws *AWS) isPreemptible(key string) bool {
  398. s := strings.Split(key, ",")
  399. if len(s) == 4 && s[3] == "preemptible" {
  400. return true
  401. }
  402. return false
  403. }
  404. // DownloadPricingData fetches data from the AWS Pricing API
  405. func (aws *AWS) DownloadPricingData() error {
  406. aws.DownloadPricingDataLock.Lock()
  407. defer aws.DownloadPricingDataLock.Unlock()
  408. c, err := GetDefaultPricingData("aws.json")
  409. if err != nil {
  410. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  411. }
  412. aws.BaseCPUPrice = c.CPU
  413. aws.BaseRAMPrice = c.RAM
  414. aws.BaseGPUPrice = c.GPU
  415. aws.BaseSpotCPUPrice = c.SpotCPU
  416. aws.BaseSpotRAMPrice = c.SpotRAM
  417. aws.SpotLabelName = c.SpotLabel
  418. aws.SpotLabelValue = c.SpotLabelValue
  419. aws.SpotDataBucket = c.SpotDataBucket
  420. aws.SpotDataPrefix = c.SpotDataPrefix
  421. aws.ProjectID = c.ProjectID
  422. aws.SpotDataRegion = c.SpotDataRegion
  423. aws.ServiceKeyName = c.ServiceKeyName
  424. aws.ServiceKeySecret = c.ServiceKeySecret
  425. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  426. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  427. }
  428. nodeList := aws.Clientset.GetAllNodes()
  429. inputkeys := make(map[string]bool)
  430. for _, n := range nodeList {
  431. labels := n.GetObjectMeta().GetLabels()
  432. key := aws.GetKey(labels)
  433. inputkeys[key.Features()] = true
  434. }
  435. pvList := aws.Clientset.GetAllPersistentVolumes()
  436. storageClasses := aws.Clientset.GetAllStorageClasses()
  437. storageClassMap := make(map[string]map[string]string)
  438. for _, storageClass := range storageClasses {
  439. params := storageClass.Parameters
  440. storageClassMap[storageClass.ObjectMeta.Name] = params
  441. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  442. storageClassMap["default"] = params
  443. storageClassMap[""] = params
  444. }
  445. }
  446. pvkeys := make(map[string]PVKey)
  447. for _, pv := range pvList {
  448. params, ok := storageClassMap[pv.Spec.StorageClassName]
  449. if !ok {
  450. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  451. continue
  452. }
  453. key := aws.GetPVKey(pv, params)
  454. pvkeys[key.Features()] = key
  455. }
  456. aws.Pricing = make(map[string]*AWSProductTerms)
  457. aws.ValidPricingKeys = make(map[string]bool)
  458. skusToKeys := make(map[string]string)
  459. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  460. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  461. resp, err := http.Get(pricingURL)
  462. if err != nil {
  463. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  464. return err
  465. }
  466. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  467. dec := json.NewDecoder(resp.Body)
  468. for {
  469. t, err := dec.Token()
  470. if err == io.EOF {
  471. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  472. break
  473. }
  474. if t == "products" {
  475. _, err := dec.Token() // this should parse the opening "{""
  476. if err != nil {
  477. return err
  478. }
  479. for dec.More() {
  480. _, err := dec.Token() // the sku token
  481. if err != nil {
  482. return err
  483. }
  484. product := &AWSProduct{}
  485. err = dec.Decode(&product)
  486. if err != nil {
  487. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  488. break
  489. }
  490. if product.Attributes.PreInstalledSw == "NA" &&
  491. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  492. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  493. spotKey := key + ",preemptible"
  494. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  495. productTerms := &AWSProductTerms{
  496. Sku: product.Sku,
  497. Memory: product.Attributes.Memory,
  498. Storage: product.Attributes.Storage,
  499. VCpu: product.Attributes.VCpu,
  500. GPU: product.Attributes.GPU,
  501. }
  502. aws.Pricing[key] = productTerms
  503. aws.Pricing[spotKey] = productTerms
  504. skusToKeys[product.Sku] = key
  505. }
  506. aws.ValidPricingKeys[key] = true
  507. aws.ValidPricingKeys[spotKey] = true
  508. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  509. // UsageTypes may be prefixed with a region code - we're removing this when using
  510. // volTypes to keep lookups generic
  511. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  512. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  513. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  514. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  515. spotKey := key + ",preemptible"
  516. pv := &PV{
  517. Class: volTypes[usageTypeNoRegion],
  518. Region: locationToRegion[product.Attributes.Location],
  519. }
  520. productTerms := &AWSProductTerms{
  521. Sku: product.Sku,
  522. PV: pv,
  523. }
  524. aws.Pricing[key] = productTerms
  525. aws.Pricing[spotKey] = productTerms
  526. skusToKeys[product.Sku] = key
  527. aws.ValidPricingKeys[key] = true
  528. aws.ValidPricingKeys[spotKey] = true
  529. }
  530. }
  531. }
  532. if t == "terms" {
  533. _, err := dec.Token() // this should parse the opening "{""
  534. if err != nil {
  535. return err
  536. }
  537. termType, err := dec.Token()
  538. if err != nil {
  539. return err
  540. }
  541. if termType == "OnDemand" {
  542. _, err := dec.Token()
  543. if err != nil { // again, should parse an opening "{"
  544. return err
  545. }
  546. for dec.More() {
  547. sku, err := dec.Token()
  548. if err != nil {
  549. return err
  550. }
  551. _, err = dec.Token() // another opening "{"
  552. if err != nil {
  553. return err
  554. }
  555. skuOnDemand, err := dec.Token()
  556. if err != nil {
  557. return err
  558. }
  559. offerTerm := &AWSOfferTerm{}
  560. err = dec.Decode(&offerTerm)
  561. if err != nil {
  562. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  563. }
  564. if sku.(string)+OnDemandRateCode == skuOnDemand {
  565. key, ok := skusToKeys[sku.(string)]
  566. spotKey := key + ",preemptible"
  567. if ok {
  568. aws.Pricing[key].OnDemand = offerTerm
  569. aws.Pricing[spotKey].OnDemand = offerTerm
  570. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  571. // If the specific UsageType is the per IO cost used on io1 volumes
  572. // we need to add the per IO cost to the io1 PV cost
  573. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  574. // Add the per IO cost to the PV object for the io1 volume type
  575. aws.Pricing[key].PV.CostPerIO = cost
  576. } else if strings.Contains(key, "EBS:Volume") {
  577. // If volume, we need to get hourly cost and add it to the PV object
  578. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  579. costFloat, _ := strconv.ParseFloat(cost, 64)
  580. hourlyPrice := costFloat / 730
  581. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  582. }
  583. }
  584. }
  585. _, err = dec.Token()
  586. if err != nil {
  587. return err
  588. }
  589. }
  590. _, err = dec.Token()
  591. if err != nil {
  592. return err
  593. }
  594. }
  595. }
  596. }
  597. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  598. if err != nil {
  599. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  600. } else {
  601. aws.SpotPricingByInstanceID = sp
  602. }
  603. return nil
  604. }
  605. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  606. func (c *AWS) NetworkPricing() (*Network, error) {
  607. cpricing, err := GetDefaultPricingData("aws.json")
  608. if err != nil {
  609. return nil, err
  610. }
  611. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  612. if err != nil {
  613. return nil, err
  614. }
  615. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  616. if err != nil {
  617. return nil, err
  618. }
  619. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  620. if err != nil {
  621. return nil, err
  622. }
  623. return &Network{
  624. ZoneNetworkEgressCost: znec,
  625. RegionNetworkEgressCost: rnec,
  626. InternetNetworkEgressCost: inec,
  627. }, nil
  628. }
  629. // AllNodePricing returns all the billing data fetched.
  630. func (aws *AWS) AllNodePricing() (interface{}, error) {
  631. aws.DownloadPricingDataLock.RLock()
  632. defer aws.DownloadPricingDataLock.RUnlock()
  633. return aws.Pricing, nil
  634. }
  635. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  636. key := k.Features()
  637. if aws.isPreemptible(key) {
  638. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  639. var spotcost string
  640. arr := strings.Split(spotInfo.Charge, " ")
  641. if len(arr) == 2 {
  642. spotcost = arr[0]
  643. } else {
  644. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  645. }
  646. return &Node{
  647. Cost: spotcost,
  648. VCPU: terms.VCpu,
  649. RAM: terms.Memory,
  650. GPU: terms.GPU,
  651. Storage: terms.Storage,
  652. BaseCPUPrice: aws.BaseCPUPrice,
  653. BaseRAMPrice: aws.BaseRAMPrice,
  654. BaseGPUPrice: aws.BaseGPUPrice,
  655. UsageType: usageType,
  656. }, nil
  657. }
  658. return &Node{
  659. VCPU: terms.VCpu,
  660. VCPUCost: aws.BaseSpotCPUPrice,
  661. RAM: terms.Memory,
  662. GPU: terms.GPU,
  663. RAMCost: aws.BaseSpotRAMPrice,
  664. Storage: terms.Storage,
  665. BaseCPUPrice: aws.BaseCPUPrice,
  666. BaseRAMPrice: aws.BaseRAMPrice,
  667. BaseGPUPrice: aws.BaseGPUPrice,
  668. UsageType: usageType,
  669. }, nil
  670. }
  671. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  672. if !ok {
  673. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  674. }
  675. cost := c.PricePerUnit.USD
  676. return &Node{
  677. Cost: cost,
  678. VCPU: terms.VCpu,
  679. RAM: terms.Memory,
  680. GPU: terms.GPU,
  681. Storage: terms.Storage,
  682. BaseCPUPrice: aws.BaseCPUPrice,
  683. BaseRAMPrice: aws.BaseRAMPrice,
  684. BaseGPUPrice: aws.BaseGPUPrice,
  685. UsageType: usageType,
  686. }, nil
  687. }
  688. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  689. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  690. aws.DownloadPricingDataLock.RLock()
  691. defer aws.DownloadPricingDataLock.RUnlock()
  692. key := k.Features()
  693. usageType := "ondemand"
  694. if aws.isPreemptible(key) {
  695. usageType = "preemptible"
  696. }
  697. terms, ok := aws.Pricing[key]
  698. if ok {
  699. return aws.createNode(terms, usageType, k)
  700. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  701. aws.DownloadPricingDataLock.RUnlock()
  702. err := aws.DownloadPricingData()
  703. aws.DownloadPricingDataLock.RLock()
  704. if err != nil {
  705. return &Node{
  706. Cost: aws.BaseCPUPrice,
  707. BaseCPUPrice: aws.BaseCPUPrice,
  708. BaseRAMPrice: aws.BaseRAMPrice,
  709. BaseGPUPrice: aws.BaseGPUPrice,
  710. UsageType: usageType,
  711. UsesBaseCPUPrice: true,
  712. }, err
  713. }
  714. terms, termsOk := aws.Pricing[key]
  715. if !termsOk {
  716. return &Node{
  717. Cost: aws.BaseCPUPrice,
  718. BaseCPUPrice: aws.BaseCPUPrice,
  719. BaseRAMPrice: aws.BaseRAMPrice,
  720. BaseGPUPrice: aws.BaseGPUPrice,
  721. UsageType: usageType,
  722. UsesBaseCPUPrice: true,
  723. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  724. }
  725. return aws.createNode(terms, usageType, k)
  726. } else { // Fall back to base pricing if we can't find the key.
  727. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  728. return &Node{
  729. Cost: aws.BaseCPUPrice,
  730. BaseCPUPrice: aws.BaseCPUPrice,
  731. BaseRAMPrice: aws.BaseRAMPrice,
  732. BaseGPUPrice: aws.BaseGPUPrice,
  733. UsageType: usageType,
  734. UsesBaseCPUPrice: true,
  735. }, nil
  736. }
  737. }
  738. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  739. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  740. defaultClusterName := "AWS Cluster #1"
  741. c, err := awsProvider.GetConfig()
  742. if err != nil {
  743. return nil, err
  744. }
  745. remote := os.Getenv(remoteEnabled)
  746. remoteEnabled := false
  747. if os.Getenv(remote) == "true" {
  748. remoteEnabled = true
  749. }
  750. if c.ClusterName != "" {
  751. m := make(map[string]string)
  752. m["name"] = c.ClusterName
  753. m["provider"] = "AWS"
  754. m["id"] = os.Getenv(clusterIDKey)
  755. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  756. return m, nil
  757. }
  758. makeStructure := func(clusterName string) (map[string]string, error) {
  759. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  760. m := make(map[string]string)
  761. m["name"] = clusterName
  762. m["provider"] = "AWS"
  763. m["id"] = os.Getenv(clusterIDKey)
  764. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  765. return m, nil
  766. }
  767. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  768. if len(maybeClusterId) != 0 {
  769. return makeStructure(maybeClusterId)
  770. }
  771. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  772. clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  773. nodeList := awsProvider.Clientset.GetAllNodes()
  774. for _, n := range nodeList {
  775. region := ""
  776. instanceId := ""
  777. providerId := n.Spec.ProviderID
  778. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  779. if matchNum == 1 {
  780. region = group
  781. } else if matchNum == 2 {
  782. instanceId = group
  783. }
  784. }
  785. if len(instanceId) == 0 {
  786. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  787. continue
  788. }
  789. c := &aws.Config{
  790. Region: aws.String(region),
  791. }
  792. s := session.Must(session.NewSession(c))
  793. ec2Svc := ec2.New(s)
  794. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  795. InstanceIds: []*string{
  796. aws.String(instanceId),
  797. },
  798. })
  799. if diErr != nil {
  800. // maybe log this?
  801. continue
  802. }
  803. if len(di.Reservations) != 1 {
  804. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  805. continue
  806. }
  807. res := di.Reservations[0]
  808. if len(res.Instances) != 1 {
  809. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  810. continue
  811. }
  812. inst := res.Instances[0]
  813. for _, tag := range inst.Tags {
  814. tagKey := *tag.Key
  815. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  816. if matchNum != 1 {
  817. continue
  818. }
  819. return makeStructure(group)
  820. }
  821. }
  822. }
  823. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  824. return makeStructure(defaultClusterName)
  825. }
  826. // AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
  827. func (*AWS) AddServiceKey(formValues url.Values) error {
  828. keyID := formValues.Get("access_key_ID")
  829. key := formValues.Get("secret_access_key")
  830. m := make(map[string]string)
  831. m["access_key_ID"] = keyID
  832. m["secret_access_key"] = key
  833. result, err := json.Marshal(m)
  834. if err != nil {
  835. return err
  836. }
  837. return ioutil.WriteFile("/var/configs/key.json", result, 0644)
  838. }
  839. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  840. func (*AWS) GetDisks() ([]byte, error) {
  841. jsonFile, err := os.Open("/var/configs/key.json")
  842. if err == nil {
  843. byteValue, _ := ioutil.ReadAll(jsonFile)
  844. var result map[string]string
  845. err := json.Unmarshal([]byte(byteValue), &result)
  846. if err != nil {
  847. return nil, err
  848. }
  849. err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
  850. if err != nil {
  851. return nil, err
  852. }
  853. err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
  854. if err != nil {
  855. return nil, err
  856. }
  857. } else if os.IsNotExist(err) {
  858. klog.V(2).Infof("Using Default Credentials")
  859. } else {
  860. return nil, err
  861. }
  862. defer jsonFile.Close()
  863. clusterConfig, err := os.Open("/var/configs/cluster.json")
  864. if err != nil {
  865. return nil, err
  866. }
  867. defer clusterConfig.Close()
  868. b, err := ioutil.ReadAll(clusterConfig)
  869. if err != nil {
  870. return nil, err
  871. }
  872. var clusterConf map[string]string
  873. err = json.Unmarshal([]byte(b), &clusterConf)
  874. if err != nil {
  875. return nil, err
  876. }
  877. region := aws.String(clusterConf["region"])
  878. c := &aws.Config{
  879. Region: region,
  880. }
  881. s := session.Must(session.NewSession(c))
  882. ec2Svc := ec2.New(s)
  883. input := &ec2.DescribeVolumesInput{}
  884. volumeResult, err := ec2Svc.DescribeVolumes(input)
  885. if err != nil {
  886. if aerr, ok := err.(awserr.Error); ok {
  887. switch aerr.Code() {
  888. default:
  889. return nil, aerr
  890. }
  891. } else {
  892. return nil, err
  893. }
  894. }
  895. return json.Marshal(volumeResult)
  896. }
  897. // ConvertToGlueColumnFormat takes a string and runs through various regex
  898. // and string replacement statements to convert it to a format compatible
  899. // with AWS Glue and Athena column names.
  900. // Following guidance from AWS provided here ('Column Names' section):
  901. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  902. // It returns a string containing the column name in proper column name format and length.
  903. func ConvertToGlueColumnFormat(column_name string) string {
  904. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  905. // An underscore is added in front of uppercase letters
  906. capital_underscore := regexp.MustCompile(`[A-Z]`)
  907. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  908. // Any non-alphanumeric characters are replaced with an underscore
  909. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  910. final = no_space_punc.ReplaceAllString(final, "_")
  911. // Duplicate underscores are removed
  912. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  913. final = no_dup_underscore.ReplaceAllString(final, "_")
  914. // Any leading and trailing underscores are removed
  915. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  916. final = no_front_end_underscore.ReplaceAllString(final, "")
  917. // Uppercase to lowercase
  918. final = strings.ToLower(final)
  919. // Longer column name than expected - remove _ left to right
  920. allowed_col_len := 128
  921. undersc_to_remove := len(final) - allowed_col_len
  922. if undersc_to_remove > 0 {
  923. final = strings.Replace(final, "_", "", undersc_to_remove)
  924. }
  925. // If removing all of the underscores still didn't
  926. // make the column name < 128 characters, trim it!
  927. if len(final) > allowed_col_len {
  928. final = final[:allowed_col_len]
  929. }
  930. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  931. return final
  932. }
  933. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  934. // "start" and "end" are dates of the format YYYY-MM-DD
  935. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  936. func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
  937. customPricing, err := a.GetConfig()
  938. if err != nil {
  939. return nil, err
  940. }
  941. aggregator_column_name := "resource_tags_user_" + aggregator
  942. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  943. query := fmt.Sprintf(`SELECT
  944. CAST(line_item_usage_start_date AS DATE) as start_date,
  945. %s,
  946. line_item_product_code,
  947. SUM(line_item_blended_cost) as blended_cost
  948. FROM %s as cost_data
  949. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  950. GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
  951. if customPricing.ServiceKeyName != "" {
  952. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  953. if err != nil {
  954. return nil, err
  955. }
  956. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  957. if err != nil {
  958. return nil, err
  959. }
  960. }
  961. region := aws.String(customPricing.AthenaRegion)
  962. resultsBucket := customPricing.AthenaBucketName
  963. database := customPricing.AthenaDatabase
  964. c := &aws.Config{
  965. Region: region,
  966. }
  967. s := session.Must(session.NewSession(c))
  968. svc := athena.New(s)
  969. var e athena.StartQueryExecutionInput
  970. var r athena.ResultConfiguration
  971. r.SetOutputLocation(resultsBucket)
  972. e.SetResultConfiguration(&r)
  973. e.SetQueryString(query)
  974. var q athena.QueryExecutionContext
  975. q.SetDatabase(database)
  976. e.SetQueryExecutionContext(&q)
  977. res, err := svc.StartQueryExecution(&e)
  978. if err != nil {
  979. return nil, err
  980. }
  981. klog.V(2).Infof("StartQueryExecution result:")
  982. klog.V(2).Infof(res.GoString())
  983. var qri athena.GetQueryExecutionInput
  984. qri.SetQueryExecutionId(*res.QueryExecutionId)
  985. var qrop *athena.GetQueryExecutionOutput
  986. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  987. for {
  988. qrop, err = svc.GetQueryExecution(&qri)
  989. if err != nil {
  990. return nil, err
  991. }
  992. if *qrop.QueryExecution.Status.State != "RUNNING" {
  993. break
  994. }
  995. time.Sleep(duration)
  996. }
  997. var oocAllocs []*OutOfClusterAllocation
  998. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  999. var ip athena.GetQueryResultsInput
  1000. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1001. op, err := svc.GetQueryResults(&ip)
  1002. if err != nil {
  1003. return nil, err
  1004. }
  1005. if len(op.ResultSet.Rows) > 1 {
  1006. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1007. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1008. if err != nil {
  1009. return nil, err
  1010. }
  1011. ooc := &OutOfClusterAllocation{
  1012. Aggregator: aggregator,
  1013. Environment: *r.Data[1].VarCharValue,
  1014. Service: *r.Data[2].VarCharValue,
  1015. Cost: cost,
  1016. }
  1017. oocAllocs = append(oocAllocs, ooc)
  1018. }
  1019. } else {
  1020. klog.V(1).Infof("No results available for %s at database %s between %s and %s", aggregator_column_name, customPricing.AthenaTable, start, end)
  1021. }
  1022. }
  1023. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  1024. }
  1025. // QuerySQL can query a properly configured Athena database.
  1026. // Used to fetch billing data.
  1027. // Requires a json config in /var/configs with key region, output, and database.
  1028. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1029. customPricing, err := a.GetConfig()
  1030. if err != nil {
  1031. return nil, err
  1032. }
  1033. if customPricing.ServiceKeyName != "" {
  1034. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1035. if err != nil {
  1036. return nil, err
  1037. }
  1038. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1039. if err != nil {
  1040. return nil, err
  1041. }
  1042. }
  1043. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1044. if err != nil {
  1045. return nil, err
  1046. }
  1047. defer athenaConfigs.Close()
  1048. b, err := ioutil.ReadAll(athenaConfigs)
  1049. if err != nil {
  1050. return nil, err
  1051. }
  1052. var athenaConf map[string]string
  1053. json.Unmarshal([]byte(b), &athenaConf)
  1054. region := aws.String(customPricing.AthenaRegion)
  1055. resultsBucket := customPricing.AthenaBucketName
  1056. database := customPricing.AthenaDatabase
  1057. c := &aws.Config{
  1058. Region: region,
  1059. }
  1060. s := session.Must(session.NewSession(c))
  1061. svc := athena.New(s)
  1062. var e athena.StartQueryExecutionInput
  1063. var r athena.ResultConfiguration
  1064. r.SetOutputLocation(resultsBucket)
  1065. e.SetResultConfiguration(&r)
  1066. e.SetQueryString(query)
  1067. var q athena.QueryExecutionContext
  1068. q.SetDatabase(database)
  1069. e.SetQueryExecutionContext(&q)
  1070. res, err := svc.StartQueryExecution(&e)
  1071. if err != nil {
  1072. return nil, err
  1073. }
  1074. klog.V(2).Infof("StartQueryExecution result:")
  1075. klog.V(2).Infof(res.GoString())
  1076. var qri athena.GetQueryExecutionInput
  1077. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1078. var qrop *athena.GetQueryExecutionOutput
  1079. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1080. for {
  1081. qrop, err = svc.GetQueryExecution(&qri)
  1082. if err != nil {
  1083. return nil, err
  1084. }
  1085. if *qrop.QueryExecution.Status.State != "RUNNING" {
  1086. break
  1087. }
  1088. time.Sleep(duration)
  1089. }
  1090. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1091. var ip athena.GetQueryResultsInput
  1092. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1093. op, err := svc.GetQueryResults(&ip)
  1094. if err != nil {
  1095. return nil, err
  1096. }
  1097. b, err := json.Marshal(op.ResultSet)
  1098. if err != nil {
  1099. return nil, err
  1100. }
  1101. return b, nil
  1102. }
  1103. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1104. }
  1105. type spotInfo struct {
  1106. Timestamp string `csv:"Timestamp"`
  1107. UsageType string `csv:"UsageType"`
  1108. Operation string `csv:"Operation"`
  1109. InstanceID string `csv:"InstanceID"`
  1110. MyBidID string `csv:"MyBidID"`
  1111. MyMaxPrice string `csv:"MyMaxPrice"`
  1112. MarketPrice string `csv:"MarketPrice"`
  1113. Charge string `csv:"Charge"`
  1114. Version string `csv:"Version"`
  1115. }
  1116. type fnames []*string
  1117. func (f fnames) Len() int {
  1118. return len(f)
  1119. }
  1120. func (f fnames) Swap(i, j int) {
  1121. f[i], f[j] = f[j], f[i]
  1122. }
  1123. func (f fnames) Less(i, j int) bool {
  1124. key1 := strings.Split(*f[i], ".")
  1125. key2 := strings.Split(*f[j], ".")
  1126. t1, err := time.Parse("2006-01-02-15", key1[1])
  1127. if err != nil {
  1128. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1129. return false
  1130. }
  1131. t2, err := time.Parse("2006-01-02-15", key2[1])
  1132. if err != nil {
  1133. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1134. return false
  1135. }
  1136. return t1.Before(t2)
  1137. }
  1138. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1139. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1140. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1141. if err != nil {
  1142. return nil, err
  1143. }
  1144. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1145. if err != nil {
  1146. return nil, err
  1147. }
  1148. }
  1149. s3Prefix := projectID
  1150. if len(prefix) != 0 {
  1151. s3Prefix = prefix + "/" + s3Prefix
  1152. }
  1153. c := aws.NewConfig().WithRegion(region)
  1154. s := session.Must(session.NewSession(c))
  1155. s3Svc := s3.New(s)
  1156. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1157. tNow := time.Now()
  1158. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1159. ls := &s3.ListObjectsInput{
  1160. Bucket: aws.String(bucket),
  1161. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1162. }
  1163. ls2 := &s3.ListObjectsInput{
  1164. Bucket: aws.String(bucket),
  1165. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1166. }
  1167. lso, err := s3Svc.ListObjects(ls)
  1168. if err != nil {
  1169. return nil, err
  1170. }
  1171. lsoLen := len(lso.Contents)
  1172. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1173. if lsoLen == 0 {
  1174. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1175. }
  1176. lso2, err := s3Svc.ListObjects(ls2)
  1177. if err != nil {
  1178. return nil, err
  1179. }
  1180. lso2Len := len(lso2.Contents)
  1181. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1182. if lso2Len == 0 {
  1183. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1184. }
  1185. var keys []*string
  1186. for _, obj := range lso.Contents {
  1187. keys = append(keys, obj.Key)
  1188. }
  1189. for _, obj := range lso2.Contents {
  1190. keys = append(keys, obj.Key)
  1191. }
  1192. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1193. header, err := csvutil.Header(spotInfo{}, "csv")
  1194. if err != nil {
  1195. return nil, err
  1196. }
  1197. fieldsPerRecord := len(header)
  1198. spots := make(map[string]*spotInfo)
  1199. for _, key := range keys {
  1200. getObj := &s3.GetObjectInput{
  1201. Bucket: aws.String(bucket),
  1202. Key: key,
  1203. }
  1204. buf := aws.NewWriteAtBuffer([]byte{})
  1205. _, err := downloader.Download(buf, getObj)
  1206. if err != nil {
  1207. return nil, err
  1208. }
  1209. r := bytes.NewReader(buf.Bytes())
  1210. gr, err := gzip.NewReader(r)
  1211. if err != nil {
  1212. return nil, err
  1213. }
  1214. csvReader := csv.NewReader(gr)
  1215. csvReader.Comma = '\t'
  1216. csvReader.FieldsPerRecord = fieldsPerRecord
  1217. dec, err := csvutil.NewDecoder(csvReader, header...)
  1218. if err != nil {
  1219. return nil, err
  1220. }
  1221. var foundVersion string
  1222. for {
  1223. spot := spotInfo{}
  1224. err := dec.Decode(&spot)
  1225. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1226. if err == io.EOF {
  1227. break
  1228. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1229. rec := dec.Record()
  1230. // the first two "Record()" will be the comment lines
  1231. // and they show up as len() == 1
  1232. // the first of which is "#Version"
  1233. // the second of which is "#Fields: "
  1234. if len(rec) != 1 {
  1235. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1236. continue
  1237. }
  1238. if len(foundVersion) == 0 {
  1239. spotFeedVersion := rec[0]
  1240. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1241. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1242. if matches != nil {
  1243. foundVersion = matches[1]
  1244. if foundVersion != supportedSpotFeedVersion {
  1245. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1246. break
  1247. }
  1248. }
  1249. continue
  1250. } else if strings.Index(rec[0], "#") == 0 {
  1251. continue
  1252. } else {
  1253. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1254. continue
  1255. }
  1256. } else if err != nil {
  1257. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1258. continue
  1259. }
  1260. klog.V(4).Infof("Found spot info %+v", spot)
  1261. spots[spot.InstanceID] = &spot
  1262. }
  1263. gr.Close()
  1264. }
  1265. return spots, nil
  1266. }
  1267. func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1268. }
  1269. /*
  1270. func (aws *AWS) getReservedInstances() ([]interface{}, error) {
  1271. customPricing, err := a.GetConfig()
  1272. if err != nil {
  1273. return nil, err
  1274. }
  1275. if customPricing.ServiceKeyName != "" {
  1276. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1277. if err != nil {
  1278. return nil, err
  1279. }
  1280. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1281. if err != nil {
  1282. return nil, err
  1283. }
  1284. }
  1285. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1286. if err != nil {
  1287. return nil, err
  1288. }
  1289. defer athenaConfigs.Close()
  1290. b, err := ioutil.ReadAll(athenaConfigs)
  1291. if err != nil {
  1292. return nil, err
  1293. }
  1294. var athenaConf map[string]string
  1295. json.Unmarshal([]byte(b), &athenaConf)
  1296. region := aws.String(customPricing.AthenaRegion)
  1297. c := &aws.Config{
  1298. Region: region,
  1299. }
  1300. s := session.Must(session.NewSession(c))
  1301. svc := ec2.New(s)
  1302. svc.DescribeReservedInstances()
  1303. }
  1304. */