awsprovider.go 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "k8s.io/klog"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. "github.com/kubecost/cost-model/pkg/util"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/athena"
  24. "github.com/aws/aws-sdk-go/service/ec2"
  25. "github.com/aws/aws-sdk-go/service/s3"
  26. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  27. "github.com/jszwec/csvutil"
  28. v1 "k8s.io/api/core/v1"
  29. )
  30. const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
  31. const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
  32. const awsReservedInstancePricePerHour = 0.0287
  33. const supportedSpotFeedVersion = "1"
  34. const SpotInfoUpdateType = "spotinfo"
  35. const AthenaInfoUpdateType = "athenainfo"
  36. // AWS represents an Amazon Provider
  37. type AWS struct {
  38. Pricing map[string]*AWSProductTerms
  39. SpotPricingByInstanceID map[string]*spotInfo
  40. RIPricingByInstanceID map[string]*RIData
  41. RIDataRunning bool
  42. RIDataLock sync.RWMutex
  43. ValidPricingKeys map[string]bool
  44. Clientset clustercache.ClusterCache
  45. BaseCPUPrice string
  46. BaseRAMPrice string
  47. BaseGPUPrice string
  48. BaseSpotCPUPrice string
  49. BaseSpotRAMPrice string
  50. SpotLabelName string
  51. SpotLabelValue string
  52. ServiceKeyName string
  53. ServiceKeySecret string
  54. SpotDataRegion string
  55. SpotDataBucket string
  56. SpotDataPrefix string
  57. ProjectID string
  58. DownloadPricingDataLock sync.RWMutex
  59. Config *ProviderConfig
  60. *CustomProvider
  61. }
  62. type AWSAccessKey struct {
  63. AccessKeyID string `json:"aws_access_key_id"`
  64. SecretAccessKey string `json:"aws_secret_access_key"`
  65. }
  66. // AWSPricing maps a k8s node to an AWS Pricing "product"
  67. type AWSPricing struct {
  68. Products map[string]*AWSProduct `json:"products"`
  69. Terms AWSPricingTerms `json:"terms"`
  70. }
  71. // AWSProduct represents a purchased SKU
  72. type AWSProduct struct {
  73. Sku string `json:"sku"`
  74. Attributes AWSProductAttributes `json:"attributes"`
  75. }
  76. // AWSProductAttributes represents metadata about the product used to map to a node.
  77. type AWSProductAttributes struct {
  78. Location string `json:"location"`
  79. InstanceType string `json:"instanceType"`
  80. Memory string `json:"memory"`
  81. Storage string `json:"storage"`
  82. VCpu string `json:"vcpu"`
  83. UsageType string `json:"usagetype"`
  84. OperatingSystem string `json:"operatingSystem"`
  85. PreInstalledSw string `json:"preInstalledSw"`
  86. InstanceFamily string `json:"instanceFamily"`
  87. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  88. }
  89. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  90. type AWSPricingTerms struct {
  91. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  92. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  93. }
  94. // AWSOfferTerm is a sku extension used to pay for the node.
  95. type AWSOfferTerm struct {
  96. Sku string `json:"sku"`
  97. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  98. }
  99. // AWSRateCode encodes data about the price of a product
  100. type AWSRateCode struct {
  101. Unit string `json:"unit"`
  102. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  103. }
  104. // AWSCurrencyCode is the localized currency. (TODO: support non-USD)
  105. type AWSCurrencyCode struct {
  106. USD string `json:"USD"`
  107. }
  108. // AWSProductTerms represents the full terms of the product
  109. type AWSProductTerms struct {
  110. Sku string `json:"sku"`
  111. OnDemand *AWSOfferTerm `json:"OnDemand"`
  112. Reserved *AWSOfferTerm `json:"Reserved"`
  113. Memory string `json:"memory"`
  114. Storage string `json:"storage"`
  115. VCpu string `json:"vcpu"`
  116. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  117. PV *PV `json:"pv"`
  118. }
  119. // ClusterIdEnvVar is the environment variable in which one can manually set the ClusterId
  120. const ClusterIdEnvVar = "AWS_CLUSTER_ID"
  121. // OnDemandRateCode is appended to an node sku
  122. const OnDemandRateCode = ".JRTCKXETXF"
  123. // ReservedRateCode is appended to a node sku
  124. const ReservedRateCode = ".38NPMPTW36"
  125. // HourlyRateCode is appended to a node sku
  126. const HourlyRateCode = ".6YS6EN2CT7"
  127. // volTypes are used to map between AWS UsageTypes and
  128. // EBS volume types, as they would appear in K8s storage class
  129. // name and the EC2 API.
  130. var volTypes = map[string]string{
  131. "EBS:VolumeUsage.gp2": "gp2",
  132. "EBS:VolumeUsage": "standard",
  133. "EBS:VolumeUsage.sc1": "sc1",
  134. "EBS:VolumeP-IOPS.piops": "io1",
  135. "EBS:VolumeUsage.st1": "st1",
  136. "EBS:VolumeUsage.piops": "io1",
  137. "gp2": "EBS:VolumeUsage.gp2",
  138. "standard": "EBS:VolumeUsage",
  139. "sc1": "EBS:VolumeUsage.sc1",
  140. "io1": "EBS:VolumeUsage.piops",
  141. "st1": "EBS:VolumeUsage.st1",
  142. }
  143. // locationToRegion maps AWS region names (As they come from Billing)
  144. // to actual region identifiers
  145. var locationToRegion = map[string]string{
  146. "US East (Ohio)": "us-east-2",
  147. "US East (N. Virginia)": "us-east-1",
  148. "US West (N. California)": "us-west-1",
  149. "US West (Oregon)": "us-west-2",
  150. "Asia Pacific (Hong Kong)": "ap-east-1",
  151. "Asia Pacific (Mumbai)": "ap-south-1",
  152. "Asia Pacific (Osaka-Local)": "ap-northeast-3",
  153. "Asia Pacific (Seoul)": "ap-northeast-2",
  154. "Asia Pacific (Singapore)": "ap-southeast-1",
  155. "Asia Pacific (Sydney)": "ap-southeast-2",
  156. "Asia Pacific (Tokyo)": "ap-northeast-1",
  157. "Canada (Central)": "ca-central-1",
  158. "China (Beijing)": "cn-north-1",
  159. "China (Ningxia)": "cn-northwest-1",
  160. "EU (Frankfurt)": "eu-central-1",
  161. "EU (Ireland)": "eu-west-1",
  162. "EU (London)": "eu-west-2",
  163. "EU (Paris)": "eu-west-3",
  164. "EU (Stockholm)": "eu-north-1",
  165. "South America (Sao Paulo)": "sa-east-1",
  166. "AWS GovCloud (US-East)": "us-gov-east-1",
  167. "AWS GovCloud (US)": "us-gov-west-1",
  168. }
  169. var regionToBillingRegionCode = map[string]string{
  170. "us-east-2": "USE2",
  171. "us-east-1": "",
  172. "us-west-1": "USW1",
  173. "us-west-2": "USW2",
  174. "ap-east-1": "APE1",
  175. "ap-south-1": "APS3",
  176. "ap-northeast-3": "APN3",
  177. "ap-northeast-2": "APN2",
  178. "ap-southeast-1": "APS1",
  179. "ap-southeast-2": "APS2",
  180. "ap-northeast-1": "APN1",
  181. "ca-central-1": "CAN1",
  182. "cn-north-1": "",
  183. "cn-northwest-1": "",
  184. "eu-central-1": "EUC1",
  185. "eu-west-1": "EU",
  186. "eu-west-2": "EUW2",
  187. "eu-west-3": "EUW3",
  188. "eu-north-1": "EUN1",
  189. "sa-east-1": "SAE1",
  190. "us-gov-east-1": "UGE1",
  191. "us-gov-west-1": "UGW1",
  192. }
  193. var loadedAWSSecret bool = false
  194. var awsSecret *AWSAccessKey = nil
  195. func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
  196. return ""
  197. }
  198. // KubeAttrConversion maps the k8s labels for region to an aws region
  199. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  200. operatingSystem = strings.ToLower(operatingSystem)
  201. region := locationToRegion[location]
  202. return region + "," + instanceType + "," + operatingSystem
  203. }
  204. type AwsSpotFeedInfo struct {
  205. BucketName string `json:"bucketName"`
  206. Prefix string `json:"prefix"`
  207. Region string `json:"region"`
  208. AccountID string `json:"projectID"`
  209. ServiceKeyName string `json:"serviceKeyName"`
  210. ServiceKeySecret string `json:"serviceKeySecret"`
  211. SpotLabel string `json:"spotLabel"`
  212. SpotLabelValue string `json:"spotLabelValue"`
  213. }
  214. type AwsAthenaInfo struct {
  215. AthenaBucketName string `json:"athenaBucketName"`
  216. AthenaRegion string `json:"athenaRegion"`
  217. AthenaDatabase string `json:"athenaDatabase"`
  218. AthenaTable string `json:"athenaTable"`
  219. ServiceKeyName string `json:"serviceKeyName"`
  220. ServiceKeySecret string `json:"serviceKeySecret"`
  221. AccountID string `json:"projectID"`
  222. }
  223. func (aws *AWS) GetManagementPlatform() (string, error) {
  224. nodes := aws.Clientset.GetAllNodes()
  225. if len(nodes) > 0 {
  226. n := nodes[0]
  227. version := n.Status.NodeInfo.KubeletVersion
  228. if strings.Contains(version, "eks") {
  229. return "eks", nil
  230. }
  231. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  232. return "kops", nil
  233. }
  234. }
  235. return "", nil
  236. }
  237. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  238. c, err := aws.Config.GetCustomPricingData()
  239. if c.Discount == "" {
  240. c.Discount = "0%"
  241. }
  242. if c.NegotiatedDiscount == "" {
  243. c.NegotiatedDiscount = "0%"
  244. }
  245. if err != nil {
  246. return nil, err
  247. }
  248. return c, nil
  249. }
  250. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  251. return aws.Config.UpdateFromMap(a)
  252. }
  253. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  254. return aws.Config.Update(func(c *CustomPricing) error {
  255. if updateType == SpotInfoUpdateType {
  256. a := AwsSpotFeedInfo{}
  257. err := json.NewDecoder(r).Decode(&a)
  258. if err != nil {
  259. return err
  260. }
  261. c.ServiceKeyName = a.ServiceKeyName
  262. c.ServiceKeySecret = a.ServiceKeySecret
  263. c.SpotDataPrefix = a.Prefix
  264. c.SpotDataBucket = a.BucketName
  265. c.ProjectID = a.AccountID
  266. c.SpotDataRegion = a.Region
  267. c.SpotLabel = a.SpotLabel
  268. c.SpotLabelValue = a.SpotLabelValue
  269. } else if updateType == AthenaInfoUpdateType {
  270. a := AwsAthenaInfo{}
  271. err := json.NewDecoder(r).Decode(&a)
  272. if err != nil {
  273. return err
  274. }
  275. c.AthenaBucketName = a.AthenaBucketName
  276. c.AthenaRegion = a.AthenaRegion
  277. c.AthenaDatabase = a.AthenaDatabase
  278. c.AthenaTable = a.AthenaTable
  279. c.ServiceKeyName = a.ServiceKeyName
  280. c.ServiceKeySecret = a.ServiceKeySecret
  281. c.AthenaProjectID = a.AccountID
  282. } else {
  283. a := make(map[string]interface{})
  284. err := json.NewDecoder(r).Decode(&a)
  285. if err != nil {
  286. return err
  287. }
  288. for k, v := range a {
  289. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  290. vstr, ok := v.(string)
  291. if ok {
  292. err := SetCustomPricingField(c, kUpper, vstr)
  293. if err != nil {
  294. return err
  295. }
  296. } else {
  297. sci := v.(map[string]interface{})
  298. sc := make(map[string]string)
  299. for k, val := range sci {
  300. sc[k] = val.(string)
  301. }
  302. c.SharedCosts = sc //todo: support reflection/multiple map fields
  303. }
  304. }
  305. }
  306. remoteEnabled := os.Getenv(remoteEnabled)
  307. if remoteEnabled == "true" {
  308. err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
  309. if err != nil {
  310. return err
  311. }
  312. }
  313. return nil
  314. })
  315. }
  316. type awsKey struct {
  317. SpotLabelName string
  318. SpotLabelValue string
  319. Labels map[string]string
  320. ProviderID string
  321. }
  322. func (k *awsKey) GPUType() string {
  323. return ""
  324. }
  325. func (k *awsKey) ID() string {
  326. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  327. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  328. if matchNum == 2 {
  329. return group
  330. }
  331. }
  332. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  333. return ""
  334. }
  335. func (k *awsKey) Features() string {
  336. instanceType := k.Labels[v1.LabelInstanceType]
  337. var operatingSystem string
  338. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  339. if !ok {
  340. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  341. }
  342. region := k.Labels[v1.LabelZoneRegion]
  343. key := region + "," + instanceType + "," + operatingSystem
  344. usageType := "preemptible"
  345. spotKey := key + "," + usageType
  346. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  347. return spotKey
  348. }
  349. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  350. return spotKey
  351. }
  352. return key
  353. }
  354. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  355. pricing, ok := aws.Pricing[pvk.Features()]
  356. if !ok {
  357. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  358. return &PV{}, nil
  359. }
  360. return pricing.PV, nil
  361. }
  362. type awsPVKey struct {
  363. Labels map[string]string
  364. StorageClassParameters map[string]string
  365. StorageClassName string
  366. Name string
  367. DefaultRegion string
  368. }
  369. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  370. return &awsPVKey{
  371. Labels: pv.Labels,
  372. StorageClassName: pv.Spec.StorageClassName,
  373. StorageClassParameters: parameters,
  374. Name: pv.Name,
  375. DefaultRegion: defaultRegion,
  376. }
  377. }
  378. func (key *awsPVKey) GetStorageClass() string {
  379. return key.StorageClassName
  380. }
  381. func (key *awsPVKey) Features() string {
  382. storageClass := key.StorageClassParameters["type"]
  383. if storageClass == "standard" {
  384. storageClass = "gp2"
  385. }
  386. // Storage class names are generally EBS volume types (gp2)
  387. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  388. // Converts between the 2
  389. region := key.Labels[v1.LabelZoneRegion]
  390. //if region == "" {
  391. // region = "us-east-1"
  392. //}
  393. class, ok := volTypes[storageClass]
  394. if !ok {
  395. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  396. }
  397. return region + "," + class
  398. }
  399. // GetKey maps node labels to information needed to retrieve pricing data
  400. func (aws *AWS) GetKey(labels map[string]string) Key {
  401. return &awsKey{
  402. SpotLabelName: aws.SpotLabelName,
  403. SpotLabelValue: aws.SpotLabelValue,
  404. Labels: labels,
  405. ProviderID: labels["providerID"],
  406. }
  407. }
  408. func (aws *AWS) isPreemptible(key string) bool {
  409. s := strings.Split(key, ",")
  410. if len(s) == 4 && s[3] == "preemptible" {
  411. return true
  412. }
  413. return false
  414. }
  415. // DownloadPricingData fetches data from the AWS Pricing API
  416. func (aws *AWS) DownloadPricingData() error {
  417. aws.DownloadPricingDataLock.Lock()
  418. defer aws.DownloadPricingDataLock.Unlock()
  419. c, err := aws.Config.GetCustomPricingData()
  420. if err != nil {
  421. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  422. }
  423. aws.BaseCPUPrice = c.CPU
  424. aws.BaseRAMPrice = c.RAM
  425. aws.BaseGPUPrice = c.GPU
  426. aws.BaseSpotCPUPrice = c.SpotCPU
  427. aws.BaseSpotRAMPrice = c.SpotRAM
  428. aws.SpotLabelName = c.SpotLabel
  429. aws.SpotLabelValue = c.SpotLabelValue
  430. aws.SpotDataBucket = c.SpotDataBucket
  431. aws.SpotDataPrefix = c.SpotDataPrefix
  432. aws.ProjectID = c.ProjectID
  433. aws.SpotDataRegion = c.SpotDataRegion
  434. skn, sks := aws.getAWSAuth(false, c)
  435. aws.ServiceKeyName = skn
  436. aws.ServiceKeySecret = sks
  437. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  438. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  439. }
  440. nodeList := aws.Clientset.GetAllNodes()
  441. inputkeys := make(map[string]bool)
  442. for _, n := range nodeList {
  443. labels := n.GetObjectMeta().GetLabels()
  444. key := aws.GetKey(labels)
  445. inputkeys[key.Features()] = true
  446. }
  447. pvList := aws.Clientset.GetAllPersistentVolumes()
  448. storageClasses := aws.Clientset.GetAllStorageClasses()
  449. storageClassMap := make(map[string]map[string]string)
  450. for _, storageClass := range storageClasses {
  451. params := storageClass.Parameters
  452. storageClassMap[storageClass.ObjectMeta.Name] = params
  453. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  454. storageClassMap["default"] = params
  455. storageClassMap[""] = params
  456. }
  457. }
  458. pvkeys := make(map[string]PVKey)
  459. for _, pv := range pvList {
  460. params, ok := storageClassMap[pv.Spec.StorageClassName]
  461. if !ok {
  462. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  463. continue
  464. }
  465. key := aws.GetPVKey(pv, params, "")
  466. pvkeys[key.Features()] = key
  467. }
  468. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  469. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  470. if err != nil {
  471. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  472. } else { // If we make one successful run, check on new reservation data every hour
  473. go func() {
  474. for {
  475. aws.RIDataRunning = true
  476. klog.Infof("Reserved Instance watcher running... next update in 1h")
  477. time.Sleep(time.Hour)
  478. err := aws.GetReservationDataFromAthena()
  479. if err != nil {
  480. klog.Infof("Error updating RI data: %s", err.Error())
  481. }
  482. }
  483. }()
  484. }
  485. }
  486. aws.Pricing = make(map[string]*AWSProductTerms)
  487. aws.ValidPricingKeys = make(map[string]bool)
  488. skusToKeys := make(map[string]string)
  489. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  490. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  491. resp, err := http.Get(pricingURL)
  492. if err != nil {
  493. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  494. return err
  495. }
  496. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  497. dec := json.NewDecoder(resp.Body)
  498. for {
  499. t, err := dec.Token()
  500. if err == io.EOF {
  501. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  502. break
  503. }
  504. if t == "products" {
  505. _, err := dec.Token() // this should parse the opening "{""
  506. if err != nil {
  507. return err
  508. }
  509. for dec.More() {
  510. _, err := dec.Token() // the sku token
  511. if err != nil {
  512. return err
  513. }
  514. product := &AWSProduct{}
  515. err = dec.Decode(&product)
  516. if err != nil {
  517. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  518. break
  519. }
  520. if product.Attributes.PreInstalledSw == "NA" &&
  521. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  522. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  523. spotKey := key + ",preemptible"
  524. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  525. productTerms := &AWSProductTerms{
  526. Sku: product.Sku,
  527. Memory: product.Attributes.Memory,
  528. Storage: product.Attributes.Storage,
  529. VCpu: product.Attributes.VCpu,
  530. GPU: product.Attributes.GPU,
  531. }
  532. aws.Pricing[key] = productTerms
  533. aws.Pricing[spotKey] = productTerms
  534. skusToKeys[product.Sku] = key
  535. }
  536. aws.ValidPricingKeys[key] = true
  537. aws.ValidPricingKeys[spotKey] = true
  538. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  539. // UsageTypes may be prefixed with a region code - we're removing this when using
  540. // volTypes to keep lookups generic
  541. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  542. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  543. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  544. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  545. spotKey := key + ",preemptible"
  546. pv := &PV{
  547. Class: volTypes[usageTypeNoRegion],
  548. Region: locationToRegion[product.Attributes.Location],
  549. }
  550. productTerms := &AWSProductTerms{
  551. Sku: product.Sku,
  552. PV: pv,
  553. }
  554. aws.Pricing[key] = productTerms
  555. aws.Pricing[spotKey] = productTerms
  556. skusToKeys[product.Sku] = key
  557. aws.ValidPricingKeys[key] = true
  558. aws.ValidPricingKeys[spotKey] = true
  559. }
  560. }
  561. }
  562. if t == "terms" {
  563. _, err := dec.Token() // this should parse the opening "{""
  564. if err != nil {
  565. return err
  566. }
  567. termType, err := dec.Token()
  568. if err != nil {
  569. return err
  570. }
  571. if termType == "OnDemand" {
  572. _, err := dec.Token()
  573. if err != nil { // again, should parse an opening "{"
  574. return err
  575. }
  576. for dec.More() {
  577. sku, err := dec.Token()
  578. if err != nil {
  579. return err
  580. }
  581. _, err = dec.Token() // another opening "{"
  582. if err != nil {
  583. return err
  584. }
  585. skuOnDemand, err := dec.Token()
  586. if err != nil {
  587. return err
  588. }
  589. offerTerm := &AWSOfferTerm{}
  590. err = dec.Decode(&offerTerm)
  591. if err != nil {
  592. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  593. }
  594. if sku.(string)+OnDemandRateCode == skuOnDemand {
  595. key, ok := skusToKeys[sku.(string)]
  596. spotKey := key + ",preemptible"
  597. if ok {
  598. aws.Pricing[key].OnDemand = offerTerm
  599. aws.Pricing[spotKey].OnDemand = offerTerm
  600. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  601. // If the specific UsageType is the per IO cost used on io1 volumes
  602. // we need to add the per IO cost to the io1 PV cost
  603. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  604. // Add the per IO cost to the PV object for the io1 volume type
  605. aws.Pricing[key].PV.CostPerIO = cost
  606. } else if strings.Contains(key, "EBS:Volume") {
  607. // If volume, we need to get hourly cost and add it to the PV object
  608. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  609. costFloat, _ := strconv.ParseFloat(cost, 64)
  610. hourlyPrice := costFloat / 730
  611. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  612. }
  613. }
  614. }
  615. _, err = dec.Token()
  616. if err != nil {
  617. return err
  618. }
  619. }
  620. _, err = dec.Token()
  621. if err != nil {
  622. return err
  623. }
  624. }
  625. }
  626. }
  627. sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  628. if err != nil {
  629. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  630. } else {
  631. aws.SpotPricingByInstanceID = sp
  632. }
  633. return nil
  634. }
  635. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  636. func (aws *AWS) NetworkPricing() (*Network, error) {
  637. cpricing, err := aws.Config.GetCustomPricingData()
  638. if err != nil {
  639. return nil, err
  640. }
  641. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  642. if err != nil {
  643. return nil, err
  644. }
  645. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  646. if err != nil {
  647. return nil, err
  648. }
  649. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  650. if err != nil {
  651. return nil, err
  652. }
  653. return &Network{
  654. ZoneNetworkEgressCost: znec,
  655. RegionNetworkEgressCost: rnec,
  656. InternetNetworkEgressCost: inec,
  657. }, nil
  658. }
  659. // AllNodePricing returns all the billing data fetched.
  660. func (aws *AWS) AllNodePricing() (interface{}, error) {
  661. aws.DownloadPricingDataLock.RLock()
  662. defer aws.DownloadPricingDataLock.RUnlock()
  663. return aws.Pricing, nil
  664. }
  665. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  666. key := k.Features()
  667. aws.RIDataLock.RLock()
  668. defer aws.RIDataLock.RUnlock()
  669. if aws.isPreemptible(key) {
  670. if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
  671. var spotcost string
  672. arr := strings.Split(spotInfo.Charge, " ")
  673. if len(arr) == 2 {
  674. spotcost = arr[0]
  675. } else {
  676. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  677. }
  678. return &Node{
  679. Cost: spotcost,
  680. VCPU: terms.VCpu,
  681. RAM: terms.Memory,
  682. GPU: terms.GPU,
  683. Storage: terms.Storage,
  684. BaseCPUPrice: aws.BaseCPUPrice,
  685. BaseRAMPrice: aws.BaseRAMPrice,
  686. BaseGPUPrice: aws.BaseGPUPrice,
  687. UsageType: usageType,
  688. }, nil
  689. }
  690. return &Node{
  691. VCPU: terms.VCpu,
  692. VCPUCost: aws.BaseSpotCPUPrice,
  693. RAM: terms.Memory,
  694. GPU: terms.GPU,
  695. RAMCost: aws.BaseSpotRAMPrice,
  696. Storage: terms.Storage,
  697. BaseCPUPrice: aws.BaseCPUPrice,
  698. BaseRAMPrice: aws.BaseRAMPrice,
  699. BaseGPUPrice: aws.BaseGPUPrice,
  700. UsageType: usageType,
  701. }, nil
  702. } else if ri, ok := aws.RIPricingByInstanceID[k.ID()]; ok {
  703. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  704. return &Node{
  705. Cost: strCost,
  706. VCPU: terms.VCpu,
  707. RAM: terms.Memory,
  708. GPU: terms.GPU,
  709. Storage: terms.Storage,
  710. BaseCPUPrice: aws.BaseCPUPrice,
  711. BaseRAMPrice: aws.BaseRAMPrice,
  712. BaseGPUPrice: aws.BaseGPUPrice,
  713. UsageType: usageType,
  714. }, nil
  715. }
  716. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  717. if !ok {
  718. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  719. }
  720. cost := c.PricePerUnit.USD
  721. return &Node{
  722. Cost: cost,
  723. VCPU: terms.VCpu,
  724. RAM: terms.Memory,
  725. GPU: terms.GPU,
  726. Storage: terms.Storage,
  727. BaseCPUPrice: aws.BaseCPUPrice,
  728. BaseRAMPrice: aws.BaseRAMPrice,
  729. BaseGPUPrice: aws.BaseGPUPrice,
  730. UsageType: usageType,
  731. }, nil
  732. }
  733. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  734. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  735. aws.DownloadPricingDataLock.RLock()
  736. defer aws.DownloadPricingDataLock.RUnlock()
  737. key := k.Features()
  738. usageType := "ondemand"
  739. if aws.isPreemptible(key) {
  740. usageType = "preemptible"
  741. }
  742. terms, ok := aws.Pricing[key]
  743. if ok {
  744. return aws.createNode(terms, usageType, k)
  745. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  746. aws.DownloadPricingDataLock.RUnlock()
  747. err := aws.DownloadPricingData()
  748. aws.DownloadPricingDataLock.RLock()
  749. if err != nil {
  750. return &Node{
  751. Cost: aws.BaseCPUPrice,
  752. BaseCPUPrice: aws.BaseCPUPrice,
  753. BaseRAMPrice: aws.BaseRAMPrice,
  754. BaseGPUPrice: aws.BaseGPUPrice,
  755. UsageType: usageType,
  756. UsesBaseCPUPrice: true,
  757. }, err
  758. }
  759. terms, termsOk := aws.Pricing[key]
  760. if !termsOk {
  761. return &Node{
  762. Cost: aws.BaseCPUPrice,
  763. BaseCPUPrice: aws.BaseCPUPrice,
  764. BaseRAMPrice: aws.BaseRAMPrice,
  765. BaseGPUPrice: aws.BaseGPUPrice,
  766. UsageType: usageType,
  767. UsesBaseCPUPrice: true,
  768. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  769. }
  770. return aws.createNode(terms, usageType, k)
  771. } else { // Fall back to base pricing if we can't find the key.
  772. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  773. return &Node{
  774. Cost: aws.BaseCPUPrice,
  775. BaseCPUPrice: aws.BaseCPUPrice,
  776. BaseRAMPrice: aws.BaseRAMPrice,
  777. BaseGPUPrice: aws.BaseGPUPrice,
  778. UsageType: usageType,
  779. UsesBaseCPUPrice: true,
  780. }, nil
  781. }
  782. }
  783. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  784. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  785. defaultClusterName := "AWS Cluster #1"
  786. c, err := awsProvider.GetConfig()
  787. if err != nil {
  788. return nil, err
  789. }
  790. remote := os.Getenv(remoteEnabled)
  791. remoteEnabled := false
  792. if os.Getenv(remote) == "true" {
  793. remoteEnabled = true
  794. }
  795. if c.ClusterName != "" {
  796. m := make(map[string]string)
  797. m["name"] = c.ClusterName
  798. m["provider"] = "AWS"
  799. m["id"] = os.Getenv(clusterIDKey)
  800. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  801. return m, nil
  802. }
  803. makeStructure := func(clusterName string) (map[string]string, error) {
  804. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  805. m := make(map[string]string)
  806. m["name"] = clusterName
  807. m["provider"] = "AWS"
  808. m["id"] = os.Getenv(clusterIDKey)
  809. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  810. return m, nil
  811. }
  812. maybeClusterId := os.Getenv(ClusterIdEnvVar)
  813. if len(maybeClusterId) != 0 {
  814. return makeStructure(maybeClusterId)
  815. }
  816. // TODO: This should be cached, it can take a long time to hit the API
  817. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  818. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  819. //klog.Infof("nodelist get here %s", time.Now())
  820. //nodeList := awsProvider.Clientset.GetAllNodes()
  821. //klog.Infof("nodelist done here %s", time.Now())
  822. /*for _, n := range nodeList {
  823. region := ""
  824. instanceId := ""
  825. providerId := n.Spec.ProviderID
  826. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  827. if matchNum == 1 {
  828. region = group
  829. } else if matchNum == 2 {
  830. instanceId = group
  831. }
  832. }
  833. if len(instanceId) == 0 {
  834. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  835. continue
  836. }
  837. c := &aws.Config{
  838. Region: aws.String(region),
  839. }
  840. s := session.Must(session.NewSession(c))
  841. ec2Svc := ec2.New(s)
  842. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  843. InstanceIds: []*string{
  844. aws.String(instanceId),
  845. },
  846. })
  847. if diErr != nil {
  848. klog.Infof("Error describing instances: %s", diErr)
  849. continue
  850. }
  851. if len(di.Reservations) != 1 {
  852. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  853. continue
  854. }
  855. res := di.Reservations[0]
  856. if len(res.Instances) != 1 {
  857. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  858. continue
  859. }
  860. inst := res.Instances[0]
  861. for _, tag := range inst.Tags {
  862. tagKey := *tag.Key
  863. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  864. if matchNum != 1 {
  865. continue
  866. }
  867. return makeStructure(group)
  868. }
  869. }
  870. }*/
  871. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
  872. return makeStructure(defaultClusterName)
  873. }
  874. // Gets the aws key id and secret
  875. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  876. // 1. Check config values first (set from frontend UI)
  877. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  878. return cp.ServiceKeyName, cp.ServiceKeySecret
  879. }
  880. // 2. Check for secret
  881. s, _ := aws.loadAWSAuthSecret(forceReload)
  882. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  883. return s.AccessKeyID, s.SecretAccessKey
  884. }
  885. // 3. Fall back to env vars
  886. return os.Getenv(awsAccessKeyIDEnvVar), os.Getenv(awsAccessKeySecretEnvVar)
  887. }
  888. // Load once and cache the result (even on failure). This is an install time secret, so
  889. // we don't expect the secret to change. If it does, however, we can force reload using
  890. // the input parameter.
  891. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  892. if !force && loadedAWSSecret {
  893. return awsSecret, nil
  894. }
  895. loadedAWSSecret = true
  896. exists, err := util.FileExists(authSecretPath)
  897. if !exists || err != nil {
  898. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  899. }
  900. result, err := ioutil.ReadFile(authSecretPath)
  901. if err != nil {
  902. return nil, err
  903. }
  904. var ak AWSAccessKey
  905. err = json.Unmarshal(result, &ak)
  906. if err != nil {
  907. return nil, err
  908. }
  909. awsSecret = &ak
  910. return awsSecret, nil
  911. }
  912. func (aws *AWS) configureAWSAuth() error {
  913. accessKeyID := aws.ServiceKeyName
  914. accessKeySecret := aws.ServiceKeySecret
  915. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  916. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  917. if err != nil {
  918. return err
  919. }
  920. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  921. if err != nil {
  922. return err
  923. }
  924. }
  925. return nil
  926. }
  // getClusterConfig reads the file at ccFile and decodes it as a flat JSON
  // object whose values are all strings. It returns an error if the file cannot
  // be opened or read, or if its content is not such an object.
  func getClusterConfig(ccFile string) (map[string]string, error) {
  	file, err := os.Open(ccFile)
  	if err != nil {
  		return nil, err
  	}
  	defer file.Close()

  	raw, err := ioutil.ReadAll(file)
  	if err != nil {
  		return nil, err
  	}

  	var conf map[string]string
  	if err := json.Unmarshal(raw, &conf); err != nil {
  		return nil, err
  	}
  	return conf, nil
  }
  944. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  945. func (a *AWS) GetDisks() ([]byte, error) {
  946. err := a.configureAWSAuth()
  947. if err != nil {
  948. return nil, err
  949. }
  950. clusterConfig, err := getClusterConfig("/var/configs/cluster.json")
  951. if err != nil {
  952. return nil, err
  953. }
  954. region := aws.String(clusterConfig["region"])
  955. c := &aws.Config{
  956. Region: region,
  957. }
  958. s := session.Must(session.NewSession(c))
  959. ec2Svc := ec2.New(s)
  960. input := &ec2.DescribeVolumesInput{}
  961. volumeResult, err := ec2Svc.DescribeVolumes(input)
  962. if err != nil {
  963. if aerr, ok := err.(awserr.Error); ok {
  964. switch aerr.Code() {
  965. default:
  966. return nil, aerr
  967. }
  968. } else {
  969. return nil, err
  970. }
  971. }
  972. return json.Marshal(volumeResult)
  973. }
  974. // ConvertToGlueColumnFormat takes a string and runs through various regex
  975. // and string replacement statements to convert it to a format compatible
  976. // with AWS Glue and Athena column names.
  977. // Following guidance from AWS provided here ('Column Names' section):
  978. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  979. // It returns a string containing the column name in proper column name format and length.
  980. func ConvertToGlueColumnFormat(column_name string) string {
  981. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  982. // An underscore is added in front of uppercase letters
  983. capital_underscore := regexp.MustCompile(`[A-Z]`)
  984. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  985. // Any non-alphanumeric characters are replaced with an underscore
  986. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  987. final = no_space_punc.ReplaceAllString(final, "_")
  988. // Duplicate underscores are removed
  989. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  990. final = no_dup_underscore.ReplaceAllString(final, "_")
  991. // Any leading and trailing underscores are removed
  992. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  993. final = no_front_end_underscore.ReplaceAllString(final, "")
  994. // Uppercase to lowercase
  995. final = strings.ToLower(final)
  996. // Longer column name than expected - remove _ left to right
  997. allowed_col_len := 128
  998. undersc_to_remove := len(final) - allowed_col_len
  999. if undersc_to_remove > 0 {
  1000. final = strings.Replace(final, "_", "", undersc_to_remove)
  1001. }
  1002. // If removing all of the underscores still didn't
  1003. // make the column name < 128 characters, trim it!
  1004. if len(final) > allowed_col_len {
  1005. final = final[:allowed_col_len]
  1006. }
  1007. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  1008. return final
  1009. }
  // generateAWSGroupBy returns the comma-separated column positions
  // "1,2,...,lastIdx" for use in a SQL GROUP BY clause. It returns an empty
  // string when lastIdx is less than 1.
  func generateAWSGroupBy(lastIdx int) string {
  	var positions strings.Builder
  	for n := 1; n <= lastIdx; n++ {
  		if n > 1 {
  			positions.WriteString(",")
  		}
  		positions.WriteString(strconv.Itoa(n))
  	}
  	return positions.String()
  }
  1017. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1018. customPricing, err := a.GetConfig()
  1019. if err != nil {
  1020. return nil, err
  1021. }
  1022. if customPricing.ServiceKeyName != "" {
  1023. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1024. if err != nil {
  1025. return nil, err
  1026. }
  1027. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1028. if err != nil {
  1029. return nil, err
  1030. }
  1031. }
  1032. region := aws.String(customPricing.AthenaRegion)
  1033. resultsBucket := customPricing.AthenaBucketName
  1034. database := customPricing.AthenaDatabase
  1035. c := &aws.Config{
  1036. Region: region,
  1037. }
  1038. s := session.Must(session.NewSession(c))
  1039. svc := athena.New(s)
  1040. var e athena.StartQueryExecutionInput
  1041. var r athena.ResultConfiguration
  1042. r.SetOutputLocation(resultsBucket)
  1043. e.SetResultConfiguration(&r)
  1044. e.SetQueryString(query)
  1045. var q athena.QueryExecutionContext
  1046. q.SetDatabase(database)
  1047. e.SetQueryExecutionContext(&q)
  1048. res, err := svc.StartQueryExecution(&e)
  1049. if err != nil {
  1050. return nil, err
  1051. }
  1052. klog.V(2).Infof("StartQueryExecution result:")
  1053. klog.V(2).Infof(res.GoString())
  1054. var qri athena.GetQueryExecutionInput
  1055. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1056. var qrop *athena.GetQueryExecutionOutput
  1057. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1058. for {
  1059. qrop, err = svc.GetQueryExecution(&qri)
  1060. if err != nil {
  1061. return nil, err
  1062. }
  1063. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1064. break
  1065. }
  1066. time.Sleep(duration)
  1067. }
  1068. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1069. var ip athena.GetQueryResultsInput
  1070. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1071. return svc.GetQueryResults(&ip)
  1072. } else {
  1073. return nil, fmt.Errorf("No results available for %s", query)
  1074. }
  1075. }
  // RIData is one reserved-instance pricing record, populated from a row of the
  // Athena billing query.
  type RIData struct {
  	// ResourceID is the billing line item's resource ID; records are indexed
  	// by this value.
  	ResourceID string
  	// EffectiveCost is the reservation's effective cost for the line item.
  	EffectiveCost float64
  	// ReservationARN is the ARN of the reservation applied to the resource.
  	ReservationARN string
  	// MostRecentDate is the usage start date of the row the record came from.
  	MostRecentDate string
  }
  1082. func (a *AWS) GetReservationDataFromAthena() error {
  1083. cfg, err := a.GetConfig()
  1084. if err != nil {
  1085. return err
  1086. }
  1087. if cfg.AthenaBucketName == "" {
  1088. return fmt.Errorf("No Athena Bucket configured")
  1089. }
  1090. if a.RIPricingByInstanceID == nil {
  1091. a.RIPricingByInstanceID = make(map[string]*RIData)
  1092. }
  1093. tNow := time.Now()
  1094. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1095. start := tOneDayAgo.Format("2006-01-02")
  1096. end := tNow.Format("2006-01-02")
  1097. q := `SELECT
  1098. line_item_usage_start_date,
  1099. reservation_reservation_a_r_n,
  1100. line_item_resource_id,
  1101. reservation_effective_cost
  1102. FROM %s as cost_data
  1103. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1104. AND reservation_reservation_a_r_n <> '' ORDER BY
  1105. line_item_usage_start_date DESC`
  1106. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1107. op, err := a.QueryAthenaBillingData(query)
  1108. if err != nil {
  1109. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1110. }
  1111. klog.Infof("Fetching RI data...")
  1112. if len(op.ResultSet.Rows) > 1 {
  1113. a.RIDataLock.Lock()
  1114. mostRecentDate := ""
  1115. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1116. d := *r.Data[0].VarCharValue
  1117. if mostRecentDate == "" {
  1118. mostRecentDate = d
  1119. } else if mostRecentDate != d { // Get all most recent assignments
  1120. break
  1121. }
  1122. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1123. if err != nil {
  1124. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1125. }
  1126. r := &RIData{
  1127. ResourceID: *r.Data[2].VarCharValue,
  1128. EffectiveCost: cost,
  1129. ReservationARN: *r.Data[1].VarCharValue,
  1130. MostRecentDate: d,
  1131. }
  1132. a.RIPricingByInstanceID[r.ResourceID] = r
  1133. }
  1134. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1135. for k, r := range a.RIPricingByInstanceID {
  1136. klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1137. }
  1138. a.RIDataLock.Unlock()
  1139. } else {
  1140. klog.Infof("No reserved instance data found")
  1141. }
  1142. return nil
  1143. }
  1144. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1145. // "start" and "end" are dates of the format YYYY-MM-DD
  1146. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1147. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1148. customPricing, err := a.GetConfig()
  1149. if err != nil {
  1150. return nil, err
  1151. }
  1152. formattedAggregators := []string{}
  1153. for _, agg := range aggregators {
  1154. aggregator_column_name := "resource_tags_user_" + agg
  1155. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  1156. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1157. }
  1158. aggregatorNames := strings.Join(formattedAggregators, ",")
  1159. filter_column_name := "resource_tags_user_" + filterType
  1160. filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
  1161. var query string
  1162. var lastIdx int
  1163. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1164. lastIdx = len(formattedAggregators) + 3
  1165. groupby := generateAWSGroupBy(lastIdx)
  1166. query = fmt.Sprintf(`SELECT
  1167. CAST(line_item_usage_start_date AS DATE) as start_date,
  1168. %s,
  1169. line_item_product_code,
  1170. %s,
  1171. SUM(line_item_blended_cost) as blended_cost
  1172. FROM %s as cost_data
  1173. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1174. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, groupby)
  1175. } else {
  1176. lastIdx = len(formattedAggregators) + 2
  1177. groupby := generateAWSGroupBy(lastIdx)
  1178. query = fmt.Sprintf(`SELECT
  1179. CAST(line_item_usage_start_date AS DATE) as start_date,
  1180. %s,
  1181. line_item_product_code,
  1182. SUM(line_item_blended_cost) as blended_cost
  1183. FROM %s as cost_data
  1184. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1185. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, groupby)
  1186. }
  1187. klog.V(3).Infof("Running Query: %s", query)
  1188. if customPricing.ServiceKeyName != "" {
  1189. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1190. if err != nil {
  1191. return nil, err
  1192. }
  1193. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1194. if err != nil {
  1195. return nil, err
  1196. }
  1197. }
  1198. region := aws.String(customPricing.AthenaRegion)
  1199. resultsBucket := customPricing.AthenaBucketName
  1200. database := customPricing.AthenaDatabase
  1201. c := &aws.Config{
  1202. Region: region,
  1203. }
  1204. s := session.Must(session.NewSession(c))
  1205. svc := athena.New(s)
  1206. var e athena.StartQueryExecutionInput
  1207. var r athena.ResultConfiguration
  1208. r.SetOutputLocation(resultsBucket)
  1209. e.SetResultConfiguration(&r)
  1210. e.SetQueryString(query)
  1211. var q athena.QueryExecutionContext
  1212. q.SetDatabase(database)
  1213. e.SetQueryExecutionContext(&q)
  1214. res, err := svc.StartQueryExecution(&e)
  1215. if err != nil {
  1216. return nil, err
  1217. }
  1218. klog.V(2).Infof("StartQueryExecution result:")
  1219. klog.V(2).Infof(res.GoString())
  1220. var qri athena.GetQueryExecutionInput
  1221. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1222. var qrop *athena.GetQueryExecutionOutput
  1223. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1224. for {
  1225. qrop, err = svc.GetQueryExecution(&qri)
  1226. if err != nil {
  1227. return nil, err
  1228. }
  1229. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1230. break
  1231. }
  1232. time.Sleep(duration)
  1233. }
  1234. var oocAllocs []*OutOfClusterAllocation
  1235. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1236. var ip athena.GetQueryResultsInput
  1237. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1238. op, err := svc.GetQueryResults(&ip)
  1239. if err != nil {
  1240. return nil, err
  1241. }
  1242. if len(op.ResultSet.Rows) > 1 {
  1243. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1244. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1245. if err != nil {
  1246. return nil, err
  1247. }
  1248. environment := ""
  1249. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1250. if *d.VarCharValue != "" {
  1251. environment = *d.VarCharValue // just set to the first nonempty match
  1252. }
  1253. break
  1254. }
  1255. ooc := &OutOfClusterAllocation{
  1256. Aggregator: strings.Join(aggregators, ","),
  1257. Environment: environment,
  1258. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1259. Cost: cost,
  1260. }
  1261. oocAllocs = append(oocAllocs, ooc)
  1262. }
  1263. } else {
  1264. klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
  1265. }
  1266. }
  1267. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1268. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1269. if err != nil {
  1270. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1271. }
  1272. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1273. if err != nil {
  1274. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1275. }
  1276. oocAllocs = append(oocAllocs, gcpOOC...)
  1277. }
  1278. return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
  1279. }
  1280. // QuerySQL can query a properly configured Athena database.
  1281. // Used to fetch billing data.
  1282. // Requires a json config in /var/configs with key region, output, and database.
  1283. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1284. customPricing, err := a.GetConfig()
  1285. if err != nil {
  1286. return nil, err
  1287. }
  1288. if customPricing.ServiceKeyName != "" {
  1289. err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1290. if err != nil {
  1291. return nil, err
  1292. }
  1293. err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1294. if err != nil {
  1295. return nil, err
  1296. }
  1297. }
  1298. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1299. if err != nil {
  1300. return nil, err
  1301. }
  1302. defer athenaConfigs.Close()
  1303. b, err := ioutil.ReadAll(athenaConfigs)
  1304. if err != nil {
  1305. return nil, err
  1306. }
  1307. var athenaConf map[string]string
  1308. json.Unmarshal([]byte(b), &athenaConf)
  1309. region := aws.String(customPricing.AthenaRegion)
  1310. resultsBucket := customPricing.AthenaBucketName
  1311. database := customPricing.AthenaDatabase
  1312. c := &aws.Config{
  1313. Region: region,
  1314. }
  1315. s := session.Must(session.NewSession(c))
  1316. svc := athena.New(s)
  1317. var e athena.StartQueryExecutionInput
  1318. var r athena.ResultConfiguration
  1319. r.SetOutputLocation(resultsBucket)
  1320. e.SetResultConfiguration(&r)
  1321. e.SetQueryString(query)
  1322. var q athena.QueryExecutionContext
  1323. q.SetDatabase(database)
  1324. e.SetQueryExecutionContext(&q)
  1325. res, err := svc.StartQueryExecution(&e)
  1326. if err != nil {
  1327. return nil, err
  1328. }
  1329. klog.V(2).Infof("StartQueryExecution result:")
  1330. klog.V(2).Infof(res.GoString())
  1331. var qri athena.GetQueryExecutionInput
  1332. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1333. var qrop *athena.GetQueryExecutionOutput
  1334. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1335. for {
  1336. qrop, err = svc.GetQueryExecution(&qri)
  1337. if err != nil {
  1338. return nil, err
  1339. }
  1340. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1341. break
  1342. }
  1343. time.Sleep(duration)
  1344. }
  1345. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1346. var ip athena.GetQueryResultsInput
  1347. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1348. op, err := svc.GetQueryResults(&ip)
  1349. if err != nil {
  1350. return nil, err
  1351. }
  1352. b, err := json.Marshal(op.ResultSet)
  1353. if err != nil {
  1354. return nil, err
  1355. }
  1356. return b, nil
  1357. }
  1358. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1359. }
  // spotInfo is one record of the spot data feed. Every column is kept as the
  // raw string found in the feed. The order of the fields and their csv tags
  // determine the header used to decode feed lines, so both must stay in step
  // with the feed's column layout.
  type spotInfo struct {
  	Timestamp string `csv:"Timestamp"`
  	UsageType string `csv:"UsageType"`
  	Operation string `csv:"Operation"`
  	// InstanceID is the key under which parsed records are stored.
  	InstanceID  string `csv:"InstanceID"`
  	MyBidID     string `csv:"MyBidID"`
  	MyMaxPrice  string `csv:"MyMaxPrice"`
  	MarketPrice string `csv:"MarketPrice"`
  	// Charge is read elsewhere as "<amount> <unit>", split on a single space.
  	Charge  string `csv:"Charge"`
  	Version string `csv:"Version"`
  }
  // fnames is a list of file names (object keys) that can be sorted; Len and
  // Swap below are two of the three methods required by sort.Interface.
  type fnames []*string

  // Len returns the number of file names in the list.
  func (f fnames) Len() int {
  	count := len(f)
  	return count
  }

  // Swap exchanges the file names at positions i and j.
  func (f fnames) Swap(i, j int) {
  	held := f[i]
  	f[i] = f[j]
  	f[j] = held
  }
  1378. func (f fnames) Less(i, j int) bool {
  1379. key1 := strings.Split(*f[i], ".")
  1380. key2 := strings.Split(*f[j], ".")
  1381. t1, err := time.Parse("2006-01-02-15", key1[1])
  1382. if err != nil {
  1383. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1384. return false
  1385. }
  1386. t2, err := time.Parse("2006-01-02-15", key2[1])
  1387. if err != nil {
  1388. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1389. return false
  1390. }
  1391. return t1.Before(t2)
  1392. }
  1393. func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1394. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1395. err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
  1396. if err != nil {
  1397. return nil, err
  1398. }
  1399. err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
  1400. if err != nil {
  1401. return nil, err
  1402. }
  1403. }
  1404. s3Prefix := projectID
  1405. if len(prefix) != 0 {
  1406. s3Prefix = prefix + "/" + s3Prefix
  1407. }
  1408. c := aws.NewConfig().WithRegion(region)
  1409. s := session.Must(session.NewSession(c))
  1410. s3Svc := s3.New(s)
  1411. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1412. tNow := time.Now()
  1413. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1414. ls := &s3.ListObjectsInput{
  1415. Bucket: aws.String(bucket),
  1416. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1417. }
  1418. ls2 := &s3.ListObjectsInput{
  1419. Bucket: aws.String(bucket),
  1420. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1421. }
  1422. lso, err := s3Svc.ListObjects(ls)
  1423. if err != nil {
  1424. return nil, err
  1425. }
  1426. lsoLen := len(lso.Contents)
  1427. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1428. if lsoLen == 0 {
  1429. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1430. }
  1431. lso2, err := s3Svc.ListObjects(ls2)
  1432. if err != nil {
  1433. return nil, err
  1434. }
  1435. lso2Len := len(lso2.Contents)
  1436. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1437. if lso2Len == 0 {
  1438. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1439. }
  1440. var keys []*string
  1441. for _, obj := range lso.Contents {
  1442. keys = append(keys, obj.Key)
  1443. }
  1444. for _, obj := range lso2.Contents {
  1445. keys = append(keys, obj.Key)
  1446. }
  1447. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1448. header, err := csvutil.Header(spotInfo{}, "csv")
  1449. if err != nil {
  1450. return nil, err
  1451. }
  1452. fieldsPerRecord := len(header)
  1453. spots := make(map[string]*spotInfo)
  1454. for _, key := range keys {
  1455. getObj := &s3.GetObjectInput{
  1456. Bucket: aws.String(bucket),
  1457. Key: key,
  1458. }
  1459. buf := aws.NewWriteAtBuffer([]byte{})
  1460. _, err := downloader.Download(buf, getObj)
  1461. if err != nil {
  1462. return nil, err
  1463. }
  1464. r := bytes.NewReader(buf.Bytes())
  1465. gr, err := gzip.NewReader(r)
  1466. if err != nil {
  1467. return nil, err
  1468. }
  1469. csvReader := csv.NewReader(gr)
  1470. csvReader.Comma = '\t'
  1471. csvReader.FieldsPerRecord = fieldsPerRecord
  1472. dec, err := csvutil.NewDecoder(csvReader, header...)
  1473. if err != nil {
  1474. return nil, err
  1475. }
  1476. var foundVersion string
  1477. for {
  1478. spot := spotInfo{}
  1479. err := dec.Decode(&spot)
  1480. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1481. if err == io.EOF {
  1482. break
  1483. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1484. rec := dec.Record()
  1485. // the first two "Record()" will be the comment lines
  1486. // and they show up as len() == 1
  1487. // the first of which is "#Version"
  1488. // the second of which is "#Fields: "
  1489. if len(rec) != 1 {
  1490. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1491. continue
  1492. }
  1493. if len(foundVersion) == 0 {
  1494. spotFeedVersion := rec[0]
  1495. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1496. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1497. if matches != nil {
  1498. foundVersion = matches[1]
  1499. if foundVersion != supportedSpotFeedVersion {
  1500. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1501. break
  1502. }
  1503. }
  1504. continue
  1505. } else if strings.Index(rec[0], "#") == 0 {
  1506. continue
  1507. } else {
  1508. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1509. continue
  1510. }
  1511. } else if err != nil {
  1512. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1513. continue
  1514. }
  1515. klog.V(4).Infof("Found spot info %+v", spot)
  1516. spots[spot.InstanceID] = &spot
  1517. }
  1518. gr.Close()
  1519. }
  1520. return spots, nil
  1521. }
  1522. func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
  1523. /*
  1524. numReserved := len(a.ReservedInstances)
  1525. // Early return if no reserved instance data loaded
  1526. if numReserved == 0 {
  1527. klog.V(4).Infof("[Reserved] No Reserved Instances")
  1528. return
  1529. }
  1530. cfg, err := a.GetConfig()
  1531. defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
  1532. if err != nil {
  1533. klog.V(3).Infof("Could not parse default cpu price")
  1534. defaultCPU = 0.031611
  1535. }
  1536. defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
  1537. if err != nil {
  1538. klog.V(3).Infof("Could not parse default ram price")
  1539. defaultRAM = 0.004237
  1540. }
  1541. cpuToRAMRatio := defaultCPU / defaultRAM
  1542. now := time.Now()
  1543. instances := make(map[string][]*AWSReservedInstance)
  1544. for _, r := range a.ReservedInstances {
  1545. if now.Before(r.StartDate) || now.After(r.EndDate) {
  1546. klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
  1547. continue
  1548. }
  1549. _, ok := instances[r.Region]
  1550. if !ok {
  1551. instances[r.Region] = []*AWSReservedInstance{r}
  1552. } else {
  1553. instances[r.Region] = append(instances[r.Region], r)
  1554. }
  1555. }
  1556. awsNodes := make(map[string]*v1.Node)
  1557. currentNodes := a.Clientset.GetAllNodes()
  1558. // Create a node name -> node map
  1559. for _, awsNode := range currentNodes {
  1560. awsNodes[awsNode.GetName()] = awsNode
  1561. }
  1562. // go through all provider nodes using k8s nodes for region
  1563. for nodeName, node := range nodes {
  1564. // Reset reserved allocation to prevent double allocation
  1565. node.Reserved = nil
  1566. kNode, ok := awsNodes[nodeName]
  1567. if !ok {
  1568. klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
  1569. continue
  1570. }
  1571. nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
  1572. if !ok {
  1573. klog.V(1).Infof("[Reserved] Could not find node region")
  1574. continue
  1575. }
  1576. reservedInstances, ok := instances[nodeRegion]
  1577. if !ok {
  1578. klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
  1579. continue
  1580. }
  1581. // Determine the InstanceType of the node
  1582. instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
  1583. if !ok {
  1584. continue
  1585. }
  1586. ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
  1587. if err != nil {
  1588. continue
  1589. }
  1590. ramGB := ramBytes / 1024 / 1024 / 1024
  1591. cpu, err := strconv.ParseFloat(node.VCPU, 64)
  1592. if err != nil {
  1593. continue
  1594. }
  1595. ramMultiple := cpu*cpuToRAMRatio + ramGB
  1596. node.Reserved = &ReservedInstanceData{
  1597. ReservedCPU: 0,
  1598. ReservedRAM: 0,
  1599. }
  1600. for i, reservedInstance := range reservedInstances {
  1601. if reservedInstance.InstanceType == instanceType {
  1602. // Use < 0 to mark as ALL
  1603. node.Reserved.ReservedCPU = -1
  1604. node.Reserved.ReservedRAM = -1
  1605. // Set Costs based on CPU/RAM ratios
  1606. ramPrice := reservedInstance.PricePerHour / ramMultiple
  1607. node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
  1608. node.Reserved.RAMCost = ramPrice
  1609. // Remove the reserve from the temporary slice to prevent
  1610. // being reallocated
  1611. instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
  1612. break
  1613. }
  1614. }
  1615. }*/
  1616. }
  // AWSReservedInstance is the provider's flattened view of one EC2 reserved
  // instance purchase, as assembled by getRegionReservedInstances from a
  // DescribeReservedInstances response.
  type AWSReservedInstance struct {
  	// Zone is the availability zone reported for the reservation; it is left
  	// empty when the response carries none.
  	Zone string
  	// Region is the AWS region the reservation was queried from.
  	Region string
  	// InstanceType is the EC2 instance type the reservation applies to.
  	InstanceType string
  	// InstanceCount is the instance count reported for the reservation.
  	InstanceCount int64
  	// InstanceTenacy holds the EC2 InstanceTenancy value. The misspelled name
  	// is kept because the field is exported.
  	InstanceTenacy string
  	// StartDate is the Start timestamp reported for the reservation.
  	StartDate time.Time
  	// EndDate is the End timestamp reported for the reservation.
  	EndDate time.Time
  	// PricePerHour is the hourly price resolved by getReservedInstancePrice.
  	PricePerHour float64
  }
  1627. func (ari *AWSReservedInstance) String() string {
  1628. return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
  1629. }
  1630. func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
  1631. return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
  1632. }
  1633. func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
  1634. var pricePerHour float64
  1635. if len(ri.RecurringCharges) > 0 {
  1636. for _, rc := range ri.RecurringCharges {
  1637. if isReservedInstanceHourlyPrice(rc) {
  1638. pricePerHour = *rc.Amount
  1639. break
  1640. }
  1641. }
  1642. }
  1643. // If we're still unable to resolve hourly price, try fixed -> hourly
  1644. if pricePerHour == 0 {
  1645. if ri.Duration != nil && ri.FixedPrice != nil {
  1646. var durHours float64
  1647. durSeconds := float64(*ri.Duration)
  1648. fixedPrice := float64(*ri.FixedPrice)
  1649. if durSeconds != 0 && fixedPrice != 0 {
  1650. durHours = durSeconds / 60 / 60
  1651. pricePerHour = fixedPrice / durHours
  1652. }
  1653. }
  1654. }
  1655. if pricePerHour == 0 {
  1656. return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
  1657. }
  1658. return pricePerHour, nil
  1659. }
  1660. func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
  1661. c := &aws.Config{
  1662. Region: aws.String(region),
  1663. }
  1664. s := session.Must(session.NewSession(c))
  1665. svc := ec2.New(s)
  1666. response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
  1667. if err != nil {
  1668. return nil, err
  1669. }
  1670. var reservedInstances []*AWSReservedInstance
  1671. for _, ri := range response.ReservedInstances {
  1672. var zone string
  1673. if ri.AvailabilityZone != nil {
  1674. zone = *ri.AvailabilityZone
  1675. }
  1676. pricePerHour, err := getReservedInstancePrice(ri)
  1677. if err != nil {
  1678. klog.V(1).Infof("Error Resolving Price: %s", err.Error())
  1679. continue
  1680. }
  1681. reservedInstances = append(reservedInstances, &AWSReservedInstance{
  1682. Zone: zone,
  1683. Region: region,
  1684. InstanceType: *ri.InstanceType,
  1685. InstanceCount: *ri.InstanceCount,
  1686. InstanceTenacy: *ri.InstanceTenancy,
  1687. StartDate: *ri.Start,
  1688. EndDate: *ri.End,
  1689. PricePerHour: pricePerHour,
  1690. })
  1691. }
  1692. return reservedInstances, nil
  1693. }
  1694. func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
  1695. err := a.configureAWSAuth()
  1696. if err != nil {
  1697. return nil, fmt.Errorf("Error Configuring aws auth: %s", err.Error())
  1698. }
  1699. var reservedInstances []*AWSReservedInstance
  1700. nodes := a.Clientset.GetAllNodes()
  1701. regionsSeen := make(map[string]bool)
  1702. for _, node := range nodes {
  1703. region, ok := node.Labels[v1.LabelZoneRegion]
  1704. if !ok {
  1705. continue
  1706. }
  1707. if regionsSeen[region] {
  1708. continue
  1709. }
  1710. ris, err := getRegionReservedInstances(region)
  1711. if err != nil {
  1712. klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
  1713. continue
  1714. }
  1715. regionsSeen[region] = true
  1716. reservedInstances = append(reservedInstances, ris...)
  1717. }
  1718. return reservedInstances, nil
  1719. }