awsprovider.go 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397
  1. package cloud
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "k8s.io/klog"
  18. "github.com/kubecost/cost-model/pkg/clustercache"
  19. "github.com/kubecost/cost-model/pkg/env"
  20. "github.com/kubecost/cost-model/pkg/errors"
  21. "github.com/kubecost/cost-model/pkg/log"
  22. "github.com/kubecost/cost-model/pkg/util"
  23. "github.com/aws/aws-sdk-go/aws"
  24. "github.com/aws/aws-sdk-go/aws/awserr"
  25. "github.com/aws/aws-sdk-go/aws/credentials"
  26. "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
  27. "github.com/aws/aws-sdk-go/aws/session"
  28. "github.com/aws/aws-sdk-go/service/athena"
  29. "github.com/aws/aws-sdk-go/service/ec2"
  30. "github.com/aws/aws-sdk-go/service/s3"
  31. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  32. "github.com/jszwec/csvutil"
  33. v1 "k8s.io/api/core/v1"
  34. )
  // Package-level settings for the AWS provider.
  const (
  	// awsReservedInstancePricePerHour is a fixed hourly price figure for
  	// reserved instances; its consumers are outside this part of the file.
  	awsReservedInstancePricePerHour = 0.0287

  	// supportedSpotFeedVersion is the spot data feed version string this
  	// provider knows about (presumably checked by the feed parser — verify there).
  	supportedSpotFeedVersion = "1"

  	// SpotInfoUpdateType selects the spot-feed branch of AWS.UpdateConfig.
  	SpotInfoUpdateType = "spotinfo"

  	// AthenaInfoUpdateType selects the Athena branch of AWS.UpdateConfig.
  	AthenaInfoUpdateType = "athenainfo"

  	// SpotRefreshDuration is how often spot data is refreshed.
  	SpotRefreshDuration = 15 * time.Minute

  	// defaultConfigPath is the default directory for configuration files.
  	defaultConfigPath = "/var/configs/"
  )
  // awsRegions lists the AWS region identifiers known to this provider.
  // The order of the entries is preserved as-is.
  var awsRegions = []string{
  	// United States
  	"us-east-2",
  	"us-east-1",
  	"us-west-1",
  	"us-west-2",
  	// Asia Pacific
  	"ap-east-1",
  	"ap-south-1",
  	"ap-northeast-3",
  	"ap-northeast-2",
  	"ap-southeast-1",
  	"ap-southeast-2",
  	"ap-northeast-1",
  	// Canada
  	"ca-central-1",
  	// China
  	"cn-north-1",
  	"cn-northwest-1",
  	// Europe
  	"eu-central-1",
  	"eu-west-1",
  	"eu-west-2",
  	"eu-west-3",
  	"eu-north-1",
  	// Middle East
  	"me-south-1",
  	// South America
  	"sa-east-1",
  	// GovCloud
  	"us-gov-east-1",
  	"us-gov-west-1",
  }
  67. // AWS represents an Amazon Provider
  68. type AWS struct {
  69. Pricing map[string]*AWSProductTerms
  70. SpotPricingByInstanceID map[string]*spotInfo
  71. SpotPricingUpdatedAt *time.Time
  72. SpotRefreshRunning bool
  73. SpotPricingLock sync.RWMutex
  74. RIPricingByInstanceID map[string]*RIData
  75. RIDataRunning bool
  76. RIDataLock sync.RWMutex
  77. SavingsPlanDataByInstanceID map[string]*SavingsPlanData
  78. SavingsPlanDataRunning bool
  79. SavingsPlanDataLock sync.RWMutex
  80. ValidPricingKeys map[string]bool
  81. Clientset clustercache.ClusterCache
  82. BaseCPUPrice string
  83. BaseRAMPrice string
  84. BaseGPUPrice string
  85. BaseSpotCPUPrice string
  86. BaseSpotRAMPrice string
  87. SpotLabelName string
  88. SpotLabelValue string
  89. ServiceKeyName string
  90. ServiceKeySecret string
  91. SpotDataRegion string
  92. SpotDataBucket string
  93. SpotDataPrefix string
  94. ProjectID string
  95. DownloadPricingDataLock sync.RWMutex
  96. Config *ProviderConfig
  97. ServiceAccountChecks map[string]*ServiceAccountCheck
  98. *CustomProvider
  99. }
  // AWSAccessKey is an AWS access key pair as serialized in JSON.
  type AWSAccessKey struct {
  	// AccessKeyID is read from / written to "aws_access_key_id".
  	AccessKeyID string `json:"aws_access_key_id"`
  	// SecretAccessKey is read from / written to "aws_secret_access_key".
  	SecretAccessKey string `json:"aws_secret_access_key"`
  }
  104. // AWSPricing maps a k8s node to an AWS Pricing "product"
  105. type AWSPricing struct {
  106. Products map[string]*AWSProduct `json:"products"`
  107. Terms AWSPricingTerms `json:"terms"`
  108. }
  109. // AWSProduct represents a purchased SKU
  110. type AWSProduct struct {
  111. Sku string `json:"sku"`
  112. Attributes AWSProductAttributes `json:"attributes"`
  113. }
  // AWSProductAttributes is the product metadata used to match a pricing
  // entry to a node or volume. All values are kept as the strings found in
  // the pricing document.
  type AWSProductAttributes struct {
  	Location        string `json:"location"`
  	InstanceType    string `json:"instanceType"`
  	Memory          string `json:"memory"`
  	Storage         string `json:"storage"`
  	VCpu            string `json:"vcpu"`
  	UsageType       string `json:"usagetype"`
  	OperatingSystem string `json:"operatingSystem"`
  	PreInstalledSw  string `json:"preInstalledSw"`
  	InstanceFamily  string `json:"instanceFamily"`
  	// GPU is the number of GPUs on the instance.
  	GPU string `json:"gpu"`
  }
  127. // AWSPricingTerms are how you pay for the node: OnDemand, Reserved, or (TODO) Spot
  128. type AWSPricingTerms struct {
  129. OnDemand map[string]map[string]*AWSOfferTerm `json:"OnDemand"`
  130. Reserved map[string]map[string]*AWSOfferTerm `json:"Reserved"`
  131. }
  132. // AWSOfferTerm is a sku extension used to pay for the node.
  133. type AWSOfferTerm struct {
  134. Sku string `json:"sku"`
  135. PriceDimensions map[string]*AWSRateCode `json:"priceDimensions"`
  136. }
  137. // AWSRateCode encodes data about the price of a product
  138. type AWSRateCode struct {
  139. Unit string `json:"unit"`
  140. PricePerUnit AWSCurrencyCode `json:"pricePerUnit"`
  141. }
  // AWSCurrencyCode holds a price in a given currency. Only USD is decoded
  // (TODO: support non-USD).
  type AWSCurrencyCode struct {
  	// USD is the US-dollar amount, kept as the string found in the feed.
  	USD string `json:"USD"`
  }
  146. // AWSProductTerms represents the full terms of the product
  147. type AWSProductTerms struct {
  148. Sku string `json:"sku"`
  149. OnDemand *AWSOfferTerm `json:"OnDemand"`
  150. Reserved *AWSOfferTerm `json:"Reserved"`
  151. Memory string `json:"memory"`
  152. Storage string `json:"storage"`
  153. VCpu string `json:"vcpu"`
  154. GPU string `json:"gpu"` // GPU represents the number of GPU on the instance
  155. PV *PV `json:"pv"`
  156. }
  const (
  	// ClusterIdEnvVar is the environment variable in which one can manually
  	// set the ClusterId.
  	ClusterIdEnvVar = "AWS_CLUSTER_ID"

  	// OnDemandRateCode is appended to a node SKU to form its on-demand offer code.
  	OnDemandRateCode = ".JRTCKXETXF"

  	// ReservedRateCode is appended to a node SKU.
  	ReservedRateCode = ".38NPMPTW36"

  	// HourlyRateCode is appended to a node SKU (after the offer code) to
  	// address the hourly price dimension.
  	HourlyRateCode = ".6YS6EN2CT7"
  )
  // volTypes translates in both directions between AWS UsageTypes and EBS
  // volume types as they appear in K8s storage class parameters and the EC2
  // API. Both directions live in the same map.
  var volTypes = map[string]string{
  	// UsageType -> EBS volume type
  	"EBS:VolumeUsage.gp2":    "gp2",
  	"EBS:VolumeUsage":        "standard",
  	"EBS:VolumeUsage.sc1":    "sc1",
  	"EBS:VolumeP-IOPS.piops": "io1",
  	"EBS:VolumeUsage.st1":    "st1",
  	"EBS:VolumeUsage.piops":  "io1",

  	// EBS volume type -> UsageType
  	"gp2":      "EBS:VolumeUsage.gp2",
  	"standard": "EBS:VolumeUsage",
  	"sc1":      "EBS:VolumeUsage.sc1",
  	"io1":      "EBS:VolumeUsage.piops",
  	"st1":      "EBS:VolumeUsage.st1",
  }
  // locationToRegion maps AWS location names (as they come from billing and
  // the pricing feed) to region identifiers. Every region listed in awsRegions
  // needs an entry here: a location that is missing converts to an empty
  // region, so its products can never match a node's pricing key.
  var locationToRegion = map[string]string{
  	"US East (Ohio)":             "us-east-2",
  	"US East (N. Virginia)":      "us-east-1",
  	"US West (N. California)":    "us-west-1",
  	"US West (Oregon)":           "us-west-2",
  	"Asia Pacific (Hong Kong)":   "ap-east-1",
  	"Asia Pacific (Mumbai)":      "ap-south-1",
  	"Asia Pacific (Osaka-Local)": "ap-northeast-3",
  	"Asia Pacific (Seoul)":       "ap-northeast-2",
  	"Asia Pacific (Singapore)":   "ap-southeast-1",
  	"Asia Pacific (Sydney)":      "ap-southeast-2",
  	"Asia Pacific (Tokyo)":       "ap-northeast-1",
  	"Canada (Central)":           "ca-central-1",
  	"China (Beijing)":            "cn-north-1",
  	"China (Ningxia)":            "cn-northwest-1",
  	"EU (Frankfurt)":             "eu-central-1",
  	"EU (Ireland)":               "eu-west-1",
  	"EU (London)":                "eu-west-2",
  	"EU (Paris)":                 "eu-west-3",
  	"EU (Stockholm)":             "eu-north-1",
  	"Middle East (Bahrain)":      "me-south-1", // previously missing although listed in awsRegions
  	"South America (Sao Paulo)":  "sa-east-1",
  	"AWS GovCloud (US-East)":     "us-gov-east-1",
  	"AWS GovCloud (US)":          "us-gov-west-1",
  }
  // regionToBillingRegionCode maps a region identifier to the region prefix
  // AWS puts in front of billing usage types (for example "USE2" in
  // "USE2-BoxUsage"). An empty string means the region's usage types carry no
  // prefix in this table. Every region listed in awsRegions has an entry.
  var regionToBillingRegionCode = map[string]string{
  	"us-east-2":      "USE2",
  	"us-east-1":      "",
  	"us-west-1":      "USW1",
  	"us-west-2":      "USW2",
  	"ap-east-1":      "APE1",
  	"ap-south-1":     "APS3",
  	"ap-northeast-3": "APN3",
  	"ap-northeast-2": "APN2",
  	"ap-southeast-1": "APS1",
  	"ap-southeast-2": "APS2",
  	"ap-northeast-1": "APN1",
  	"ca-central-1":   "CAN1",
  	"cn-north-1":     "",
  	"cn-northwest-1": "",
  	"eu-central-1":   "EUC1",
  	"eu-west-1":      "EU",
  	"eu-west-2":      "EUW2",
  	"eu-west-3":      "EUW3",
  	"eu-north-1":     "EUN1",
  	"me-south-1":     "MES1", // previously missing although listed in awsRegions
  	"sa-east-1":      "SAE1",
  	"us-gov-east-1":  "UGE1",
  	"us-gov-west-1":  "UGW1",
  }
  231. var loadedAWSSecret bool = false
  232. var awsSecret *AWSAccessKey = nil
  233. func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
  234. return ""
  235. }
  236. // KubeAttrConversion maps the k8s labels for region to an aws region
  237. func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem string) string {
  238. operatingSystem = strings.ToLower(operatingSystem)
  239. region := locationToRegion[location]
  240. return region + "," + instanceType + "," + operatingSystem
  241. }
  // AwsSpotFeedInfo is the JSON payload accepted by AWS.UpdateConfig for the
  // "spotinfo" update type.
  type AwsSpotFeedInfo struct {
  	BucketName string `json:"bucketName"`
  	Prefix     string `json:"prefix"`
  	Region     string `json:"region"`
  	// AccountID travels under the JSON name "projectID".
  	AccountID        string `json:"projectID"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	SpotLabel        string `json:"spotLabel"`
  	SpotLabelValue   string `json:"spotLabelValue"`
  }
  // AwsAthenaInfo is the JSON payload accepted by AWS.UpdateConfig for the
  // "athenainfo" update type.
  type AwsAthenaInfo struct {
  	AthenaBucketName string `json:"athenaBucketName"`
  	AthenaRegion     string `json:"athenaRegion"`
  	AthenaDatabase   string `json:"athenaDatabase"`
  	AthenaTable      string `json:"athenaTable"`
  	ServiceKeyName   string `json:"serviceKeyName"`
  	ServiceKeySecret string `json:"serviceKeySecret"`
  	// AccountID travels under the JSON name "projectID".
  	AccountID      string `json:"projectID"`
  	MasterPayerARN string `json:"masterPayerARN"`
  }
  262. func (aws *AWS) GetManagementPlatform() (string, error) {
  263. nodes := aws.Clientset.GetAllNodes()
  264. if len(nodes) > 0 {
  265. n := nodes[0]
  266. version := n.Status.NodeInfo.KubeletVersion
  267. if strings.Contains(version, "eks") {
  268. return "eks", nil
  269. }
  270. if _, ok := n.Labels["kops.k8s.io/instancegroup"]; ok {
  271. return "kops", nil
  272. }
  273. }
  274. return "", nil
  275. }
  276. func (aws *AWS) GetConfig() (*CustomPricing, error) {
  277. c, err := aws.Config.GetCustomPricingData()
  278. if c.Discount == "" {
  279. c.Discount = "0%"
  280. }
  281. if c.NegotiatedDiscount == "" {
  282. c.NegotiatedDiscount = "0%"
  283. }
  284. if err != nil {
  285. return nil, err
  286. }
  287. return c, nil
  288. }
  289. func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
  290. return aws.Config.UpdateFromMap(a)
  291. }
  292. func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
  293. return aws.Config.Update(func(c *CustomPricing) error {
  294. if updateType == SpotInfoUpdateType {
  295. a := AwsSpotFeedInfo{}
  296. err := json.NewDecoder(r).Decode(&a)
  297. if err != nil {
  298. return err
  299. }
  300. c.ServiceKeyName = a.ServiceKeyName
  301. if a.ServiceKeySecret != "" {
  302. c.ServiceKeySecret = a.ServiceKeySecret
  303. }
  304. c.SpotDataPrefix = a.Prefix
  305. c.SpotDataBucket = a.BucketName
  306. c.ProjectID = a.AccountID
  307. c.SpotDataRegion = a.Region
  308. c.SpotLabel = a.SpotLabel
  309. c.SpotLabelValue = a.SpotLabelValue
  310. } else if updateType == AthenaInfoUpdateType {
  311. a := AwsAthenaInfo{}
  312. err := json.NewDecoder(r).Decode(&a)
  313. if err != nil {
  314. return err
  315. }
  316. c.AthenaBucketName = a.AthenaBucketName
  317. c.AthenaRegion = a.AthenaRegion
  318. c.AthenaDatabase = a.AthenaDatabase
  319. c.AthenaTable = a.AthenaTable
  320. c.ServiceKeyName = a.ServiceKeyName
  321. if a.ServiceKeySecret != "" {
  322. c.ServiceKeySecret = a.ServiceKeySecret
  323. }
  324. if a.MasterPayerARN != "" {
  325. c.MasterPayerARN = a.MasterPayerARN
  326. }
  327. c.AthenaProjectID = a.AccountID
  328. } else {
  329. a := make(map[string]interface{})
  330. err := json.NewDecoder(r).Decode(&a)
  331. if err != nil {
  332. return err
  333. }
  334. for k, v := range a {
  335. kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
  336. vstr, ok := v.(string)
  337. if ok {
  338. err := SetCustomPricingField(c, kUpper, vstr)
  339. if err != nil {
  340. return err
  341. }
  342. } else {
  343. sci := v.(map[string]interface{})
  344. sc := make(map[string]string)
  345. for k, val := range sci {
  346. sc[k] = val.(string)
  347. }
  348. c.SharedCosts = sc //todo: support reflection/multiple map fields
  349. }
  350. }
  351. }
  352. if env.IsRemoteEnabled() {
  353. err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
  354. if err != nil {
  355. return err
  356. }
  357. }
  358. return nil
  359. })
  360. }
  // awsKey carries what is needed to derive a node's pricing key: the node's
  // labels, its provider ID, and the configured label name/value pair that
  // marks a node as spot.
  type awsKey struct {
  	SpotLabelName  string
  	SpotLabelValue string
  	Labels         map[string]string
  	ProviderID     string
  }
  367. func (k *awsKey) GPUType() string {
  368. return ""
  369. }
  370. func (k *awsKey) ID() string {
  371. provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
  372. for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
  373. if matchNum == 2 {
  374. return group
  375. }
  376. }
  377. klog.V(3).Infof("Could not find instance ID in \"%s\"", k.ProviderID)
  378. return ""
  379. }
  380. func (k *awsKey) Features() string {
  381. instanceType := k.Labels[v1.LabelInstanceType]
  382. var operatingSystem string
  383. operatingSystem, ok := k.Labels[v1.LabelOSStable]
  384. if !ok {
  385. operatingSystem = k.Labels["beta.kubernetes.io/os"]
  386. }
  387. region := k.Labels[v1.LabelZoneRegion]
  388. key := region + "," + instanceType + "," + operatingSystem
  389. usageType := "preemptible"
  390. spotKey := key + "," + usageType
  391. if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
  392. return spotKey
  393. }
  394. if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
  395. return spotKey
  396. }
  397. return key
  398. }
  399. func (aws *AWS) PVPricing(pvk PVKey) (*PV, error) {
  400. pricing, ok := aws.Pricing[pvk.Features()]
  401. if !ok {
  402. klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
  403. return &PV{}, nil
  404. }
  405. return pricing.PV, nil
  406. }
  // awsPVKey carries what is needed to derive a persistent volume's pricing
  // key: the volume's labels and name, its storage class name and parameters,
  // and a default region supplied by the caller.
  type awsPVKey struct {
  	Labels                 map[string]string
  	StorageClassParameters map[string]string
  	StorageClassName       string
  	Name                   string
  	DefaultRegion          string
  }
  414. func (aws *AWS) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
  415. return &awsPVKey{
  416. Labels: pv.Labels,
  417. StorageClassName: pv.Spec.StorageClassName,
  418. StorageClassParameters: parameters,
  419. Name: pv.Name,
  420. DefaultRegion: defaultRegion,
  421. }
  422. }
  423. func (key *awsPVKey) GetStorageClass() string {
  424. return key.StorageClassName
  425. }
  426. func (key *awsPVKey) Features() string {
  427. storageClass := key.StorageClassParameters["type"]
  428. if storageClass == "standard" {
  429. storageClass = "gp2"
  430. }
  431. // Storage class names are generally EBS volume types (gp2)
  432. // Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
  433. // Converts between the 2
  434. region := key.Labels[v1.LabelZoneRegion]
  435. //if region == "" {
  436. // region = "us-east-1"
  437. //}
  438. class, ok := volTypes[storageClass]
  439. if !ok {
  440. klog.V(4).Infof("No voltype mapping for %s's storageClass: %s", key.Name, storageClass)
  441. }
  442. return region + "," + class
  443. }
  444. // GetKey maps node labels to information needed to retrieve pricing data
  445. func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
  446. return &awsKey{
  447. SpotLabelName: aws.SpotLabelName,
  448. SpotLabelValue: aws.SpotLabelValue,
  449. Labels: labels,
  450. ProviderID: labels["providerID"],
  451. }
  452. }
  453. func (aws *AWS) isPreemptible(key string) bool {
  454. s := strings.Split(key, ",")
  455. if len(s) == 4 && s[3] == "preemptible" {
  456. return true
  457. }
  458. return false
  459. }
  460. // DownloadPricingData fetches data from the AWS Pricing API
  461. func (aws *AWS) DownloadPricingData() error {
  462. aws.DownloadPricingDataLock.Lock()
  463. defer aws.DownloadPricingDataLock.Unlock()
  464. if aws.ServiceAccountChecks == nil {
  465. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  466. }
  467. c, err := aws.Config.GetCustomPricingData()
  468. if err != nil {
  469. klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
  470. }
  471. aws.BaseCPUPrice = c.CPU
  472. aws.BaseRAMPrice = c.RAM
  473. aws.BaseGPUPrice = c.GPU
  474. aws.BaseSpotCPUPrice = c.SpotCPU
  475. aws.BaseSpotRAMPrice = c.SpotRAM
  476. aws.SpotLabelName = c.SpotLabel
  477. aws.SpotLabelValue = c.SpotLabelValue
  478. aws.SpotDataBucket = c.SpotDataBucket
  479. aws.SpotDataPrefix = c.SpotDataPrefix
  480. aws.ProjectID = c.ProjectID
  481. aws.SpotDataRegion = c.SpotDataRegion
  482. skn, sks := aws.getAWSAuth(false, c)
  483. aws.ServiceKeyName = skn
  484. aws.ServiceKeySecret = sks
  485. if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
  486. klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
  487. }
  488. nodeList := aws.Clientset.GetAllNodes()
  489. inputkeys := make(map[string]bool)
  490. for _, n := range nodeList {
  491. labels := n.GetObjectMeta().GetLabels()
  492. key := aws.GetKey(labels, n)
  493. inputkeys[key.Features()] = true
  494. }
  495. pvList := aws.Clientset.GetAllPersistentVolumes()
  496. storageClasses := aws.Clientset.GetAllStorageClasses()
  497. storageClassMap := make(map[string]map[string]string)
  498. for _, storageClass := range storageClasses {
  499. params := storageClass.Parameters
  500. storageClassMap[storageClass.ObjectMeta.Name] = params
  501. if storageClass.GetAnnotations()["storageclass.kubernetes.io/is-default-class"] == "true" || storageClass.GetAnnotations()["storageclass.beta.kubernetes.io/is-default-class"] == "true" {
  502. storageClassMap["default"] = params
  503. storageClassMap[""] = params
  504. }
  505. }
  506. pvkeys := make(map[string]PVKey)
  507. for _, pv := range pvList {
  508. params, ok := storageClassMap[pv.Spec.StorageClassName]
  509. if !ok {
  510. klog.V(2).Infof("Unable to find params for storageClassName %s, falling back to default pricing", pv.Spec.StorageClassName)
  511. continue
  512. }
  513. key := aws.GetPVKey(pv, params, "")
  514. pvkeys[key.Features()] = key
  515. }
  516. // RIDataRunning establishes the existence of the goroutine. Since it's possible we
  517. // run multiple downloads, we don't want to create multiple go routines if one already exists
  518. if !aws.RIDataRunning && c.AthenaBucketName != "" {
  519. err = aws.GetReservationDataFromAthena() // Block until one run has completed.
  520. if err != nil {
  521. klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
  522. } else { // If we make one successful run, check on new reservation data every hour
  523. go func() {
  524. defer errors.HandlePanic()
  525. aws.RIDataRunning = true
  526. for {
  527. klog.Infof("Reserved Instance watcher running... next update in 1h")
  528. time.Sleep(time.Hour)
  529. err := aws.GetReservationDataFromAthena()
  530. if err != nil {
  531. klog.Infof("Error updating RI data: %s", err.Error())
  532. }
  533. }
  534. }()
  535. }
  536. }
  537. if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
  538. err = aws.GetSavingsPlanDataFromAthena()
  539. if err != nil {
  540. klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
  541. } else {
  542. go func() {
  543. defer errors.HandlePanic()
  544. aws.SavingsPlanDataRunning = true
  545. for {
  546. klog.Infof("Savings Plan watcher running... next update in 1h")
  547. time.Sleep(time.Hour)
  548. err := aws.GetSavingsPlanDataFromAthena()
  549. if err != nil {
  550. klog.Infof("Error updating Savings Plan data: %s", err.Error())
  551. }
  552. }
  553. }()
  554. }
  555. }
  556. aws.Pricing = make(map[string]*AWSProductTerms)
  557. aws.ValidPricingKeys = make(map[string]bool)
  558. skusToKeys := make(map[string]string)
  559. pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
  560. klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
  561. resp, err := http.Get(pricingURL)
  562. if err != nil {
  563. klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
  564. return err
  565. }
  566. klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
  567. dec := json.NewDecoder(resp.Body)
  568. for {
  569. t, err := dec.Token()
  570. if err == io.EOF {
  571. klog.V(2).Infof("done loading \"%s\"\n", pricingURL)
  572. break
  573. }
  574. if t == "products" {
  575. _, err := dec.Token() // this should parse the opening "{""
  576. if err != nil {
  577. return err
  578. }
  579. for dec.More() {
  580. _, err := dec.Token() // the sku token
  581. if err != nil {
  582. return err
  583. }
  584. product := &AWSProduct{}
  585. err = dec.Decode(&product)
  586. if err != nil {
  587. klog.V(1).Infof("Error parsing response from \"%s\": %v", pricingURL, err.Error())
  588. break
  589. }
  590. if product.Attributes.PreInstalledSw == "NA" &&
  591. (strings.HasPrefix(product.Attributes.UsageType, "BoxUsage") || strings.Contains(product.Attributes.UsageType, "-BoxUsage")) {
  592. key := aws.KubeAttrConversion(product.Attributes.Location, product.Attributes.InstanceType, product.Attributes.OperatingSystem)
  593. spotKey := key + ",preemptible"
  594. if inputkeys[key] || inputkeys[spotKey] { // Just grab the sku even if spot, and change the price later.
  595. productTerms := &AWSProductTerms{
  596. Sku: product.Sku,
  597. Memory: product.Attributes.Memory,
  598. Storage: product.Attributes.Storage,
  599. VCpu: product.Attributes.VCpu,
  600. GPU: product.Attributes.GPU,
  601. }
  602. aws.Pricing[key] = productTerms
  603. aws.Pricing[spotKey] = productTerms
  604. skusToKeys[product.Sku] = key
  605. }
  606. aws.ValidPricingKeys[key] = true
  607. aws.ValidPricingKeys[spotKey] = true
  608. } else if strings.Contains(product.Attributes.UsageType, "EBS:Volume") {
  609. // UsageTypes may be prefixed with a region code - we're removing this when using
  610. // volTypes to keep lookups generic
  611. usageTypeRegx := regexp.MustCompile(".*(-|^)(EBS.+)")
  612. usageTypeMatch := usageTypeRegx.FindStringSubmatch(product.Attributes.UsageType)
  613. usageTypeNoRegion := usageTypeMatch[len(usageTypeMatch)-1]
  614. key := locationToRegion[product.Attributes.Location] + "," + usageTypeNoRegion
  615. spotKey := key + ",preemptible"
  616. pv := &PV{
  617. Class: volTypes[usageTypeNoRegion],
  618. Region: locationToRegion[product.Attributes.Location],
  619. }
  620. productTerms := &AWSProductTerms{
  621. Sku: product.Sku,
  622. PV: pv,
  623. }
  624. aws.Pricing[key] = productTerms
  625. aws.Pricing[spotKey] = productTerms
  626. skusToKeys[product.Sku] = key
  627. aws.ValidPricingKeys[key] = true
  628. aws.ValidPricingKeys[spotKey] = true
  629. }
  630. }
  631. }
  632. if t == "terms" {
  633. _, err := dec.Token() // this should parse the opening "{""
  634. if err != nil {
  635. return err
  636. }
  637. termType, err := dec.Token()
  638. if err != nil {
  639. return err
  640. }
  641. if termType == "OnDemand" {
  642. _, err := dec.Token()
  643. if err != nil { // again, should parse an opening "{"
  644. return err
  645. }
  646. for dec.More() {
  647. sku, err := dec.Token()
  648. if err != nil {
  649. return err
  650. }
  651. _, err = dec.Token() // another opening "{"
  652. if err != nil {
  653. return err
  654. }
  655. skuOnDemand, err := dec.Token()
  656. if err != nil {
  657. return err
  658. }
  659. offerTerm := &AWSOfferTerm{}
  660. err = dec.Decode(&offerTerm)
  661. if err != nil {
  662. klog.V(1).Infof("Error decoding AWS Offer Term: " + err.Error())
  663. }
  664. if sku.(string)+OnDemandRateCode == skuOnDemand {
  665. key, ok := skusToKeys[sku.(string)]
  666. spotKey := key + ",preemptible"
  667. if ok {
  668. aws.Pricing[key].OnDemand = offerTerm
  669. aws.Pricing[spotKey].OnDemand = offerTerm
  670. if strings.Contains(key, "EBS:VolumeP-IOPS.piops") {
  671. // If the specific UsageType is the per IO cost used on io1 volumes
  672. // we need to add the per IO cost to the io1 PV cost
  673. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  674. // Add the per IO cost to the PV object for the io1 volume type
  675. aws.Pricing[key].PV.CostPerIO = cost
  676. } else if strings.Contains(key, "EBS:Volume") {
  677. // If volume, we need to get hourly cost and add it to the PV object
  678. cost := offerTerm.PriceDimensions[sku.(string)+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
  679. costFloat, _ := strconv.ParseFloat(cost, 64)
  680. hourlyPrice := costFloat / 730
  681. aws.Pricing[key].PV.Cost = strconv.FormatFloat(hourlyPrice, 'f', -1, 64)
  682. }
  683. }
  684. }
  685. _, err = dec.Token()
  686. if err != nil {
  687. return err
  688. }
  689. }
  690. _, err = dec.Token()
  691. if err != nil {
  692. return err
  693. }
  694. }
  695. }
  696. }
  697. // Always run spot pricing refresh when performing download
  698. aws.refreshSpotPricing(true)
  699. // Only start a single refresh goroutine
  700. if !aws.SpotRefreshRunning {
  701. aws.SpotRefreshRunning = true
  702. go func() {
  703. defer errors.HandlePanic()
  704. for {
  705. klog.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
  706. time.Sleep(SpotRefreshDuration)
  707. // Reoccurring refresh checks update times
  708. aws.refreshSpotPricing(false)
  709. }
  710. }()
  711. }
  712. return nil
  713. }
  714. func (aws *AWS) refreshSpotPricing(force bool) {
  715. aws.SpotPricingLock.Lock()
  716. defer aws.SpotPricingLock.Unlock()
  717. now := time.Now().UTC()
  718. updateTime := now.Add(-SpotRefreshDuration)
  719. // Return if there was an update time set and an hour hasn't elapsed
  720. if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
  721. return
  722. }
  723. sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
  724. if err != nil {
  725. klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
  726. return
  727. }
  728. // update time last updated
  729. aws.SpotPricingUpdatedAt = &now
  730. aws.SpotPricingByInstanceID = sp
  731. }
  732. // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
  733. func (aws *AWS) NetworkPricing() (*Network, error) {
  734. cpricing, err := aws.Config.GetCustomPricingData()
  735. if err != nil {
  736. return nil, err
  737. }
  738. znec, err := strconv.ParseFloat(cpricing.ZoneNetworkEgress, 64)
  739. if err != nil {
  740. return nil, err
  741. }
  742. rnec, err := strconv.ParseFloat(cpricing.RegionNetworkEgress, 64)
  743. if err != nil {
  744. return nil, err
  745. }
  746. inec, err := strconv.ParseFloat(cpricing.InternetNetworkEgress, 64)
  747. if err != nil {
  748. return nil, err
  749. }
  750. return &Network{
  751. ZoneNetworkEgressCost: znec,
  752. RegionNetworkEgressCost: rnec,
  753. InternetNetworkEgressCost: inec,
  754. }, nil
  755. }
  756. // AllNodePricing returns all the billing data fetched.
  757. func (aws *AWS) AllNodePricing() (interface{}, error) {
  758. aws.DownloadPricingDataLock.RLock()
  759. defer aws.DownloadPricingDataLock.RUnlock()
  760. return aws.Pricing, nil
  761. }
  762. func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
  763. aws.SpotPricingLock.RLock()
  764. defer aws.SpotPricingLock.RUnlock()
  765. info, ok := aws.SpotPricingByInstanceID[instanceID]
  766. return info, ok
  767. }
  768. func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
  769. aws.RIDataLock.RLock()
  770. defer aws.RIDataLock.RUnlock()
  771. data, ok := aws.RIPricingByInstanceID[instanceID]
  772. return data, ok
  773. }
  774. func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
  775. aws.SavingsPlanDataLock.RLock()
  776. defer aws.SavingsPlanDataLock.RUnlock()
  777. data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
  778. return data, ok
  779. }
  780. func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
  781. key := k.Features()
  782. if spotInfo, ok := aws.spotPricing(k.ID()); ok {
  783. var spotcost string
  784. klog.V(3).Infof("Looking up spot data from feed for node %s", k.ID())
  785. arr := strings.Split(spotInfo.Charge, " ")
  786. if len(arr) == 2 {
  787. spotcost = arr[0]
  788. } else {
  789. klog.V(2).Infof("Spot data for node %s is missing", k.ID())
  790. }
  791. return &Node{
  792. Cost: spotcost,
  793. VCPU: terms.VCpu,
  794. RAM: terms.Memory,
  795. GPU: terms.GPU,
  796. Storage: terms.Storage,
  797. BaseCPUPrice: aws.BaseCPUPrice,
  798. BaseRAMPrice: aws.BaseRAMPrice,
  799. BaseGPUPrice: aws.BaseGPUPrice,
  800. UsageType: usageType,
  801. }, nil
  802. } else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
  803. klog.Infof("Node %s marked preemitible but we have no data in spot feed", k.ID())
  804. return &Node{
  805. VCPU: terms.VCpu,
  806. VCPUCost: aws.BaseSpotCPUPrice,
  807. RAM: terms.Memory,
  808. GPU: terms.GPU,
  809. RAMCost: aws.BaseSpotRAMPrice,
  810. Storage: terms.Storage,
  811. BaseCPUPrice: aws.BaseCPUPrice,
  812. BaseRAMPrice: aws.BaseRAMPrice,
  813. BaseGPUPrice: aws.BaseGPUPrice,
  814. UsageType: usageType,
  815. }, nil
  816. } else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
  817. strCost := fmt.Sprintf("%f", sp.EffectiveCost)
  818. return &Node{
  819. Cost: strCost,
  820. VCPU: terms.VCpu,
  821. RAM: terms.Memory,
  822. GPU: terms.GPU,
  823. Storage: terms.Storage,
  824. BaseCPUPrice: aws.BaseCPUPrice,
  825. BaseRAMPrice: aws.BaseRAMPrice,
  826. BaseGPUPrice: aws.BaseGPUPrice,
  827. UsageType: usageType,
  828. }, nil
  829. } else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
  830. strCost := fmt.Sprintf("%f", ri.EffectiveCost)
  831. return &Node{
  832. Cost: strCost,
  833. VCPU: terms.VCpu,
  834. RAM: terms.Memory,
  835. GPU: terms.GPU,
  836. Storage: terms.Storage,
  837. BaseCPUPrice: aws.BaseCPUPrice,
  838. BaseRAMPrice: aws.BaseRAMPrice,
  839. BaseGPUPrice: aws.BaseGPUPrice,
  840. UsageType: usageType,
  841. }, nil
  842. }
  843. c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
  844. if !ok {
  845. return nil, fmt.Errorf("Could not fetch data for \"%s\"", k.ID())
  846. }
  847. cost := c.PricePerUnit.USD
  848. return &Node{
  849. Cost: cost,
  850. VCPU: terms.VCpu,
  851. RAM: terms.Memory,
  852. GPU: terms.GPU,
  853. Storage: terms.Storage,
  854. BaseCPUPrice: aws.BaseCPUPrice,
  855. BaseRAMPrice: aws.BaseRAMPrice,
  856. BaseGPUPrice: aws.BaseGPUPrice,
  857. UsageType: usageType,
  858. }, nil
  859. }
  860. // NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
  861. func (aws *AWS) NodePricing(k Key) (*Node, error) {
  862. aws.DownloadPricingDataLock.RLock()
  863. defer aws.DownloadPricingDataLock.RUnlock()
  864. key := k.Features()
  865. usageType := "ondemand"
  866. if aws.isPreemptible(key) {
  867. usageType = "preemptible"
  868. }
  869. terms, ok := aws.Pricing[key]
  870. if ok {
  871. return aws.createNode(terms, usageType, k)
  872. } else if _, ok := aws.ValidPricingKeys[key]; ok {
  873. aws.DownloadPricingDataLock.RUnlock()
  874. err := aws.DownloadPricingData()
  875. aws.DownloadPricingDataLock.RLock()
  876. if err != nil {
  877. return &Node{
  878. Cost: aws.BaseCPUPrice,
  879. BaseCPUPrice: aws.BaseCPUPrice,
  880. BaseRAMPrice: aws.BaseRAMPrice,
  881. BaseGPUPrice: aws.BaseGPUPrice,
  882. UsageType: usageType,
  883. UsesBaseCPUPrice: true,
  884. }, err
  885. }
  886. terms, termsOk := aws.Pricing[key]
  887. if !termsOk {
  888. return &Node{
  889. Cost: aws.BaseCPUPrice,
  890. BaseCPUPrice: aws.BaseCPUPrice,
  891. BaseRAMPrice: aws.BaseRAMPrice,
  892. BaseGPUPrice: aws.BaseGPUPrice,
  893. UsageType: usageType,
  894. UsesBaseCPUPrice: true,
  895. }, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
  896. }
  897. return aws.createNode(terms, usageType, k)
  898. } else { // Fall back to base pricing if we can't find the key.
  899. klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
  900. return &Node{
  901. Cost: aws.BaseCPUPrice,
  902. BaseCPUPrice: aws.BaseCPUPrice,
  903. BaseRAMPrice: aws.BaseRAMPrice,
  904. BaseGPUPrice: aws.BaseGPUPrice,
  905. UsageType: usageType,
  906. UsesBaseCPUPrice: true,
  907. }, nil
  908. }
  909. }
  910. // ClusterInfo returns an object that represents the cluster. TODO: actually return the name of the cluster. Blocked on cluster federation.
  911. func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
  912. defaultClusterName := "AWS Cluster #1"
  913. c, err := awsProvider.GetConfig()
  914. if err != nil {
  915. return nil, err
  916. }
  917. remoteEnabled := env.IsRemoteEnabled()
  918. if c.ClusterName != "" {
  919. m := make(map[string]string)
  920. m["name"] = c.ClusterName
  921. m["provider"] = "AWS"
  922. m["id"] = env.GetClusterID()
  923. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  924. return m, nil
  925. }
  926. makeStructure := func(clusterName string) (map[string]string, error) {
  927. klog.V(2).Infof("Returning \"%s\" as ClusterName", clusterName)
  928. m := make(map[string]string)
  929. m["name"] = clusterName
  930. m["provider"] = "AWS"
  931. m["id"] = env.GetClusterID()
  932. m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
  933. return m, nil
  934. }
  935. maybeClusterId := env.GetAWSClusterID()
  936. if len(maybeClusterId) != 0 {
  937. return makeStructure(maybeClusterId)
  938. }
  939. // TODO: This should be cached, it can take a long time to hit the API
  940. //provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)")
  941. //clusterIdRx := regexp.MustCompile("^kubernetes\\.io/cluster/([^/]+)")
  942. //klog.Infof("nodelist get here %s", time.Now())
  943. //nodeList := awsProvider.Clientset.GetAllNodes()
  944. //klog.Infof("nodelist done here %s", time.Now())
  945. /*for _, n := range nodeList {
  946. region := ""
  947. instanceId := ""
  948. providerId := n.Spec.ProviderID
  949. for matchNum, group := range provIdRx.FindStringSubmatch(providerId) {
  950. if matchNum == 1 {
  951. region = group
  952. } else if matchNum == 2 {
  953. instanceId = group
  954. }
  955. }
  956. if len(instanceId) == 0 {
  957. klog.V(2).Infof("Unable to decode Node.ProviderID \"%s\", skipping it", providerId)
  958. continue
  959. }
  960. c := &aws.Config{
  961. Region: aws.String(region),
  962. }
  963. s := session.Must(session.NewSession(c))
  964. ec2Svc := ec2.New(s)
  965. di, diErr := ec2Svc.DescribeInstances(&ec2.DescribeInstancesInput{
  966. InstanceIds: []*string{
  967. aws.String(instanceId),
  968. },
  969. })
  970. if diErr != nil {
  971. klog.Infof("Error describing instances: %s", diErr)
  972. continue
  973. }
  974. if len(di.Reservations) != 1 {
  975. klog.V(2).Infof("Expected 1 Reservation back from DescribeInstances(%s), received %d", instanceId, len(di.Reservations))
  976. continue
  977. }
  978. res := di.Reservations[0]
  979. if len(res.Instances) != 1 {
  980. klog.V(2).Infof("Expected 1 Instance back from DescribeInstances(%s), received %d", instanceId, len(res.Instances))
  981. continue
  982. }
  983. inst := res.Instances[0]
  984. for _, tag := range inst.Tags {
  985. tagKey := *tag.Key
  986. for matchNum, group := range clusterIdRx.FindStringSubmatch(tagKey) {
  987. if matchNum != 1 {
  988. continue
  989. }
  990. return makeStructure(group)
  991. }
  992. }
  993. }*/
  994. klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
  995. return makeStructure(defaultClusterName)
  996. }
  997. // Gets the aws key id and secret
  998. func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
  999. if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
  1000. aws.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1001. }
  1002. // 1. Check config values first (set from frontend UI)
  1003. if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
  1004. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1005. Message: "AWS ServiceKey exists",
  1006. Status: true,
  1007. }
  1008. return cp.ServiceKeyName, cp.ServiceKeySecret
  1009. }
  1010. // 2. Check for secret
  1011. s, _ := aws.loadAWSAuthSecret(forceReload)
  1012. if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
  1013. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1014. Message: "AWS ServiceKey exists",
  1015. Status: true,
  1016. }
  1017. return s.AccessKeyID, s.SecretAccessKey
  1018. }
  1019. // 3. Fall back to env vars
  1020. if env.GetAWSAccessKeyID() == "" || env.GetAWSAccessKeyID() == "" {
  1021. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1022. Message: "AWS ServiceKey exists",
  1023. Status: false,
  1024. }
  1025. } else {
  1026. aws.ServiceAccountChecks["hasKey"] = &ServiceAccountCheck{
  1027. Message: "AWS ServiceKey exists",
  1028. Status: true,
  1029. }
  1030. }
  1031. return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
  1032. }
  1033. // Load once and cache the result (even on failure). This is an install time secret, so
  1034. // we don't expect the secret to change. If it does, however, we can force reload using
  1035. // the input parameter.
  1036. func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
  1037. if !force && loadedAWSSecret {
  1038. return awsSecret, nil
  1039. }
  1040. loadedAWSSecret = true
  1041. exists, err := util.FileExists(authSecretPath)
  1042. if !exists || err != nil {
  1043. return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
  1044. }
  1045. result, err := ioutil.ReadFile(authSecretPath)
  1046. if err != nil {
  1047. return nil, err
  1048. }
  1049. var ak AWSAccessKey
  1050. err = json.Unmarshal(result, &ak)
  1051. if err != nil {
  1052. return nil, err
  1053. }
  1054. awsSecret = &ak
  1055. return awsSecret, nil
  1056. }
  1057. func (aws *AWS) configureAWSAuth() error {
  1058. accessKeyID := aws.ServiceKeyName
  1059. accessKeySecret := aws.ServiceKeySecret
  1060. if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1061. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1062. if err != nil {
  1063. return err
  1064. }
  1065. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1066. if err != nil {
  1067. return err
  1068. }
  1069. }
  1070. return nil
  1071. }
  1072. func getClusterConfig(ccFile string) (map[string]string, error) {
  1073. clusterConfig, err := os.Open(ccFile)
  1074. if err != nil {
  1075. return nil, err
  1076. }
  1077. defer clusterConfig.Close()
  1078. b, err := ioutil.ReadAll(clusterConfig)
  1079. if err != nil {
  1080. return nil, err
  1081. }
  1082. var clusterConf map[string]string
  1083. err = json.Unmarshal([]byte(b), &clusterConf)
  1084. if err != nil {
  1085. return nil, err
  1086. }
  1087. return clusterConf, nil
  1088. }
  1089. // SetKeyEnv ensures that the two environment variables necessary to configure
  1090. // a new AWS Session are set.
  1091. func (a *AWS) SetKeyEnv() error {
  1092. // TODO add this to the helm chart, mirroring the cost-model
  1093. // configPath := env.GetConfigPath()
  1094. configPath := defaultConfigPath
  1095. path := configPath + "aws.json"
  1096. if _, err := os.Stat(path); err != nil {
  1097. if os.IsNotExist(err) {
  1098. log.DedupedErrorf(5, "file %s does not exist", path)
  1099. } else {
  1100. log.DedupedErrorf(5, "other file open error: %s", err)
  1101. }
  1102. return err
  1103. }
  1104. jsonFile, err := os.Open(path)
  1105. defer jsonFile.Close()
  1106. configMap := map[string]string{}
  1107. configBytes, err := ioutil.ReadAll(jsonFile)
  1108. if err != nil {
  1109. return err
  1110. }
  1111. json.Unmarshal([]byte(configBytes), &configMap)
  1112. keyName := configMap["awsServiceKeyName"]
  1113. keySecret := configMap["awsServiceKeySecret"]
  1114. // These are required before calling NewEnvCredentials below
  1115. env.Set(env.AWSAccessKeyIDEnvVar, keyName)
  1116. env.Set(env.AWSAccessKeySecretEnvVar, keySecret)
  1117. return nil
  1118. }
  1119. func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
  1120. sess, err := session.NewSession(&aws.Config{
  1121. Region: aws.String(region),
  1122. Credentials: credentials.NewEnvCredentials(),
  1123. })
  1124. if err != nil {
  1125. return nil, err
  1126. }
  1127. ec2Svc := ec2.New(sess)
  1128. return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
  1129. }
  1130. func (a *AWS) GetAddresses() ([]byte, error) {
  1131. if err := a.SetKeyEnv(); err != nil {
  1132. return nil, err
  1133. }
  1134. addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
  1135. errorCh := make(chan error, len(awsRegions))
  1136. var wg sync.WaitGroup
  1137. wg.Add(len(awsRegions))
  1138. // Get volumes from each AWS region
  1139. for _, r := range awsRegions {
  1140. // Fetch IP address response and send results and errors to their
  1141. // respective channels
  1142. go func(region string) {
  1143. defer wg.Done()
  1144. defer errors.HandlePanic()
  1145. // Query for first page of volume results
  1146. resp, err := a.getAddressesForRegion(region)
  1147. if err != nil {
  1148. if aerr, ok := err.(awserr.Error); ok {
  1149. switch aerr.Code() {
  1150. default:
  1151. errorCh <- aerr
  1152. }
  1153. return
  1154. } else {
  1155. errorCh <- err
  1156. return
  1157. }
  1158. }
  1159. addressCh <- resp
  1160. }(r)
  1161. }
  1162. // Close the result channels after everything has been sent
  1163. go func() {
  1164. defer errors.HandlePanic()
  1165. wg.Wait()
  1166. close(errorCh)
  1167. close(addressCh)
  1168. }()
  1169. addresses := []*ec2.Address{}
  1170. for adds := range addressCh {
  1171. addresses = append(addresses, adds.Addresses...)
  1172. }
  1173. errors := []error{}
  1174. for err := range errorCh {
  1175. log.DedupedWarningf(5, "unable to get addresses: %s", err)
  1176. errors = append(errors, err)
  1177. }
  1178. // Return error if no addresses are returned
  1179. if len(errors) > 0 && len(addresses) == 0 {
  1180. return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
  1181. }
  1182. // Format the response this way to match the JSON-encoded formatting of a single response
  1183. // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
  1184. // a "Addresss" key at the top level.
  1185. return json.Marshal(map[string][]*ec2.Address{
  1186. "Addresses": addresses,
  1187. })
  1188. }
  1189. func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
  1190. sess, err := session.NewSession(&aws.Config{
  1191. Region: aws.String(region),
  1192. Credentials: credentials.NewEnvCredentials(),
  1193. })
  1194. if err != nil {
  1195. return nil, err
  1196. }
  1197. ec2Svc := ec2.New(sess)
  1198. return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
  1199. MaxResults: &maxResults,
  1200. NextToken: nextToken,
  1201. })
  1202. }
  1203. // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
  1204. func (a *AWS) GetDisks() ([]byte, error) {
  1205. if err := a.SetKeyEnv(); err != nil {
  1206. return nil, err
  1207. }
  1208. volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
  1209. errorCh := make(chan error, len(awsRegions))
  1210. var wg sync.WaitGroup
  1211. wg.Add(len(awsRegions))
  1212. // Get volumes from each AWS region
  1213. for _, r := range awsRegions {
  1214. // Fetch volume response and send results and errors to their
  1215. // respective channels
  1216. go func(region string) {
  1217. defer wg.Done()
  1218. defer errors.HandlePanic()
  1219. // Query for first page of volume results
  1220. resp, err := a.getDisksForRegion(region, 1000, nil)
  1221. if err != nil {
  1222. if aerr, ok := err.(awserr.Error); ok {
  1223. switch aerr.Code() {
  1224. default:
  1225. errorCh <- aerr
  1226. }
  1227. return
  1228. } else {
  1229. errorCh <- err
  1230. return
  1231. }
  1232. }
  1233. volumeCh <- resp
  1234. // A NextToken indicates more pages of results. Keep querying
  1235. // until all pages are retrieved.
  1236. for resp.NextToken != nil {
  1237. resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
  1238. if err != nil {
  1239. if aerr, ok := err.(awserr.Error); ok {
  1240. switch aerr.Code() {
  1241. default:
  1242. errorCh <- aerr
  1243. }
  1244. return
  1245. } else {
  1246. errorCh <- err
  1247. return
  1248. }
  1249. }
  1250. volumeCh <- resp
  1251. }
  1252. }(r)
  1253. }
  1254. // Close the result channels after everything has been sent
  1255. go func() {
  1256. defer errors.HandlePanic()
  1257. wg.Wait()
  1258. close(errorCh)
  1259. close(volumeCh)
  1260. }()
  1261. volumes := []*ec2.Volume{}
  1262. for vols := range volumeCh {
  1263. volumes = append(volumes, vols.Volumes...)
  1264. }
  1265. errors := []error{}
  1266. for err := range errorCh {
  1267. log.DedupedWarningf(5, "unable to get disks: %s", err)
  1268. errors = append(errors, err)
  1269. }
  1270. // Return error if no volumes are returned
  1271. if len(errors) > 0 && len(volumes) == 0 {
  1272. return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
  1273. }
  1274. // Format the response this way to match the JSON-encoded formatting of a single response
  1275. // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
  1276. // a "Volumes" key at the top level.
  1277. return json.Marshal(map[string][]*ec2.Volume{
  1278. "Volumes": volumes,
  1279. })
  1280. }
  1281. // ConvertToGlueColumnFormat takes a string and runs through various regex
  1282. // and string replacement statements to convert it to a format compatible
  1283. // with AWS Glue and Athena column names.
  1284. // Following guidance from AWS provided here ('Column Names' section):
  1285. // https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/run-athena-sql.html
  1286. // It returns a string containing the column name in proper column name format and length.
  1287. func ConvertToGlueColumnFormat(column_name string) string {
  1288. klog.V(5).Infof("Converting string \"%s\" to proper AWS Glue column name.", column_name)
  1289. // An underscore is added in front of uppercase letters
  1290. capital_underscore := regexp.MustCompile(`[A-Z]`)
  1291. final := capital_underscore.ReplaceAllString(column_name, `_$0`)
  1292. // Any non-alphanumeric characters are replaced with an underscore
  1293. no_space_punc := regexp.MustCompile(`[\s]{1,}|[^A-Za-z0-9]`)
  1294. final = no_space_punc.ReplaceAllString(final, "_")
  1295. // Duplicate underscores are removed
  1296. no_dup_underscore := regexp.MustCompile(`_{2,}`)
  1297. final = no_dup_underscore.ReplaceAllString(final, "_")
  1298. // Any leading and trailing underscores are removed
  1299. no_front_end_underscore := regexp.MustCompile(`(^\_|\_$)`)
  1300. final = no_front_end_underscore.ReplaceAllString(final, "")
  1301. // Uppercase to lowercase
  1302. final = strings.ToLower(final)
  1303. // Longer column name than expected - remove _ left to right
  1304. allowed_col_len := 128
  1305. undersc_to_remove := len(final) - allowed_col_len
  1306. if undersc_to_remove > 0 {
  1307. final = strings.Replace(final, "_", "", undersc_to_remove)
  1308. }
  1309. // If removing all of the underscores still didn't
  1310. // make the column name < 128 characters, trim it!
  1311. if len(final) > allowed_col_len {
  1312. final = final[:allowed_col_len]
  1313. }
  1314. klog.V(5).Infof("Column name being returned: \"%s\". Length: \"%d\".", final, len(final))
  1315. return final
  1316. }
  1317. func generateAWSGroupBy(lastIdx int) string {
  1318. sequence := []string{}
  1319. for i := 1; i < lastIdx+1; i++ {
  1320. sequence = append(sequence, strconv.Itoa(i))
  1321. }
  1322. return strings.Join(sequence, ",")
  1323. }
  1324. func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
  1325. customPricing, err := a.GetConfig()
  1326. if err != nil {
  1327. return nil, err
  1328. }
  1329. if customPricing.ServiceKeyName != "" {
  1330. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1331. if err != nil {
  1332. return nil, err
  1333. }
  1334. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1335. if err != nil {
  1336. return nil, err
  1337. }
  1338. }
  1339. region := aws.String(customPricing.AthenaRegion)
  1340. resultsBucket := customPricing.AthenaBucketName
  1341. database := customPricing.AthenaDatabase
  1342. c := &aws.Config{
  1343. Region: region,
  1344. }
  1345. s := session.Must(session.NewSession(c))
  1346. svc := athena.New(s)
  1347. if customPricing.MasterPayerARN != "" {
  1348. creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
  1349. svc = athena.New(s, &aws.Config{
  1350. Region: region,
  1351. Credentials: creds,
  1352. })
  1353. }
  1354. var e athena.StartQueryExecutionInput
  1355. var r athena.ResultConfiguration
  1356. r.SetOutputLocation(resultsBucket)
  1357. e.SetResultConfiguration(&r)
  1358. e.SetQueryString(query)
  1359. var q athena.QueryExecutionContext
  1360. q.SetDatabase(database)
  1361. e.SetQueryExecutionContext(&q)
  1362. res, err := svc.StartQueryExecution(&e)
  1363. if err != nil {
  1364. return nil, err
  1365. }
  1366. klog.V(2).Infof("StartQueryExecution result:")
  1367. klog.V(2).Infof(res.GoString())
  1368. var qri athena.GetQueryExecutionInput
  1369. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1370. var qrop *athena.GetQueryExecutionOutput
  1371. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1372. for {
  1373. qrop, err = svc.GetQueryExecution(&qri)
  1374. if err != nil {
  1375. return nil, err
  1376. }
  1377. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1378. break
  1379. }
  1380. time.Sleep(duration)
  1381. }
  1382. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1383. var ip athena.GetQueryResultsInput
  1384. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1385. return svc.GetQueryResults(&ip)
  1386. } else {
  1387. return nil, fmt.Errorf("No results available for %s", query)
  1388. }
  1389. }
// SavingsPlanData is the savings plan pricing recorded for a single resource.
// GetSavingsPlanDataFromAthena creates one value per row it reads from the
// Athena billing table and stores it in SavingsPlanDataByInstanceID.
type SavingsPlanData struct {
	// ResourceID is the line_item_resource_id of the row; it is also the key
	// under which the value is stored.
	ResourceID string
	// EffectiveCost is the savings_plan_savings_plan_effective_cost of the row,
	// parsed as a float.
	EffectiveCost float64
	// SavingsPlanARN is the savings_plan_savings_plan_a_r_n of the row.
	SavingsPlanARN string
	// MostRecentDate is the line_item_usage_start_date of the row.
	MostRecentDate string
}
  1396. func (a *AWS) GetSavingsPlanDataFromAthena() error {
  1397. cfg, err := a.GetConfig()
  1398. if err != nil {
  1399. return err
  1400. }
  1401. if cfg.AthenaBucketName == "" {
  1402. return fmt.Errorf("No Athena Bucket configured")
  1403. }
  1404. if a.SavingsPlanDataByInstanceID == nil {
  1405. a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
  1406. }
  1407. tNow := time.Now()
  1408. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1409. start := tOneDayAgo.Format("2006-01-02")
  1410. end := tNow.Format("2006-01-02")
  1411. q := `SELECT
  1412. line_item_usage_start_date,
  1413. savings_plan_savings_plan_a_r_n,
  1414. line_item_resource_id,
  1415. savings_plan_savings_plan_effective_cost
  1416. FROM %s as cost_data
  1417. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1418. AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY
  1419. line_item_usage_start_date DESC`
  1420. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1421. op, err := a.QueryAthenaBillingData(query)
  1422. if err != nil {
  1423. return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
  1424. }
  1425. klog.Infof("Fetching SavingsPlan data...")
  1426. if len(op.ResultSet.Rows) > 1 {
  1427. a.SavingsPlanDataLock.Lock()
  1428. mostRecentDate := ""
  1429. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1430. d := *r.Data[0].VarCharValue
  1431. if mostRecentDate == "" {
  1432. mostRecentDate = d
  1433. } else if mostRecentDate != d { // Get all most recent assignments
  1434. break
  1435. }
  1436. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1437. if err != nil {
  1438. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1439. }
  1440. r := &SavingsPlanData{
  1441. ResourceID: *r.Data[2].VarCharValue,
  1442. EffectiveCost: cost,
  1443. SavingsPlanARN: *r.Data[1].VarCharValue,
  1444. MostRecentDate: d,
  1445. }
  1446. a.SavingsPlanDataByInstanceID[r.ResourceID] = r
  1447. }
  1448. klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
  1449. for k, r := range a.SavingsPlanDataByInstanceID {
  1450. klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1451. }
  1452. a.SavingsPlanDataLock.Unlock()
  1453. } else {
  1454. klog.Infof("No savings plan applied instance data found")
  1455. }
  1456. return nil
  1457. }
// RIData is the reserved instance pricing recorded for a single resource.
// GetReservationDataFromAthena creates one value per row it reads from the
// Athena billing table and stores it in RIPricingByInstanceID.
type RIData struct {
	// ResourceID is the line_item_resource_id of the row; it is also the key
	// under which the value is stored.
	ResourceID string
	// EffectiveCost is the reservation_effective_cost of the row, parsed as a
	// float.
	EffectiveCost float64
	// ReservationARN is the reservation_reservation_a_r_n of the row.
	ReservationARN string
	// MostRecentDate is the line_item_usage_start_date of the row.
	MostRecentDate string
}
  1464. func (a *AWS) GetReservationDataFromAthena() error {
  1465. cfg, err := a.GetConfig()
  1466. if err != nil {
  1467. return err
  1468. }
  1469. if cfg.AthenaBucketName == "" {
  1470. return fmt.Errorf("No Athena Bucket configured")
  1471. }
  1472. if a.RIPricingByInstanceID == nil {
  1473. a.RIPricingByInstanceID = make(map[string]*RIData)
  1474. }
  1475. tNow := time.Now()
  1476. tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1477. start := tOneDayAgo.Format("2006-01-02")
  1478. end := tNow.Format("2006-01-02")
  1479. q := `SELECT
  1480. line_item_usage_start_date,
  1481. reservation_reservation_a_r_n,
  1482. line_item_resource_id,
  1483. reservation_effective_cost
  1484. FROM %s as cost_data
  1485. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
  1486. AND reservation_reservation_a_r_n <> '' ORDER BY
  1487. line_item_usage_start_date DESC`
  1488. query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
  1489. op, err := a.QueryAthenaBillingData(query)
  1490. if err != nil {
  1491. return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
  1492. }
  1493. klog.Infof("Fetching RI data...")
  1494. if len(op.ResultSet.Rows) > 1 {
  1495. a.RIDataLock.Lock()
  1496. mostRecentDate := ""
  1497. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
  1498. d := *r.Data[0].VarCharValue
  1499. if mostRecentDate == "" {
  1500. mostRecentDate = d
  1501. } else if mostRecentDate != d { // Get all most recent assignments
  1502. break
  1503. }
  1504. cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
  1505. if err != nil {
  1506. klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
  1507. }
  1508. r := &RIData{
  1509. ResourceID: *r.Data[2].VarCharValue,
  1510. EffectiveCost: cost,
  1511. ReservationARN: *r.Data[1].VarCharValue,
  1512. MostRecentDate: d,
  1513. }
  1514. a.RIPricingByInstanceID[r.ResourceID] = r
  1515. }
  1516. klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
  1517. for k, r := range a.RIPricingByInstanceID {
  1518. klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
  1519. }
  1520. a.RIDataLock.Unlock()
  1521. } else {
  1522. klog.Infof("No reserved instance data found")
  1523. }
  1524. return nil
  1525. }
  1526. // ExternalAllocations represents tagged assets outside the scope of kubernetes.
  1527. // "start" and "end" are dates of the format YYYY-MM-DD
  1528. // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
  1529. func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
  1530. customPricing, err := a.GetConfig()
  1531. if err != nil {
  1532. return nil, err
  1533. }
  1534. formattedAggregators := []string{}
  1535. for _, agg := range aggregators {
  1536. aggregator_column_name := "resource_tags_user_" + agg
  1537. aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
  1538. formattedAggregators = append(formattedAggregators, aggregator_column_name)
  1539. }
  1540. aggregatorNames := strings.Join(formattedAggregators, ",")
  1541. aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
  1542. aggregatorOr = aggregatorOr + " <> ''"
  1543. filter_column_name := "resource_tags_user_" + filterType
  1544. filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
  1545. var query string
  1546. var lastIdx int
  1547. if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
  1548. lastIdx = len(formattedAggregators) + 3
  1549. groupby := generateAWSGroupBy(lastIdx)
  1550. query = fmt.Sprintf(`SELECT
  1551. CAST(line_item_usage_start_date AS DATE) as start_date,
  1552. %s,
  1553. line_item_product_code,
  1554. %s,
  1555. SUM(line_item_blended_cost) as blended_cost
  1556. FROM %s as cost_data
  1557. WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1558. GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
  1559. } else {
  1560. lastIdx = len(formattedAggregators) + 2
  1561. groupby := generateAWSGroupBy(lastIdx)
  1562. query = fmt.Sprintf(`SELECT
  1563. CAST(line_item_usage_start_date AS DATE) as start_date,
  1564. %s,
  1565. line_item_product_code,
  1566. SUM(line_item_blended_cost) as blended_cost
  1567. FROM %s as cost_data
  1568. WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
  1569. GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
  1570. }
  1571. klog.V(3).Infof("Running Query: %s", query)
  1572. if customPricing.ServiceKeyName != "" {
  1573. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1574. if err != nil {
  1575. return nil, err
  1576. }
  1577. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1578. if err != nil {
  1579. return nil, err
  1580. }
  1581. }
  1582. region := aws.String(customPricing.AthenaRegion)
  1583. resultsBucket := customPricing.AthenaBucketName
  1584. database := customPricing.AthenaDatabase
  1585. c := &aws.Config{
  1586. Region: region,
  1587. }
  1588. s := session.Must(session.NewSession(c))
  1589. svc := athena.New(s)
  1590. var e athena.StartQueryExecutionInput
  1591. var r athena.ResultConfiguration
  1592. r.SetOutputLocation(resultsBucket)
  1593. e.SetResultConfiguration(&r)
  1594. e.SetQueryString(query)
  1595. var q athena.QueryExecutionContext
  1596. q.SetDatabase(database)
  1597. e.SetQueryExecutionContext(&q)
  1598. res, err := svc.StartQueryExecution(&e)
  1599. if err != nil {
  1600. return nil, err
  1601. }
  1602. klog.V(2).Infof("StartQueryExecution result:")
  1603. klog.V(2).Infof(res.GoString())
  1604. var qri athena.GetQueryExecutionInput
  1605. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1606. var qrop *athena.GetQueryExecutionOutput
  1607. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1608. for {
  1609. qrop, err = svc.GetQueryExecution(&qri)
  1610. if err != nil {
  1611. return nil, err
  1612. }
  1613. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1614. break
  1615. }
  1616. time.Sleep(duration)
  1617. }
  1618. var oocAllocs []*OutOfClusterAllocation
  1619. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1620. var ip athena.GetQueryResultsInput
  1621. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1622. op, err := svc.GetQueryResults(&ip)
  1623. if err != nil {
  1624. return nil, err
  1625. }
  1626. if len(op.ResultSet.Rows) > 1 {
  1627. for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows))] {
  1628. cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
  1629. if err != nil {
  1630. return nil, err
  1631. }
  1632. environment := ""
  1633. for _, d := range r.Data[1 : len(formattedAggregators)+1] {
  1634. if *d.VarCharValue != "" {
  1635. environment = *d.VarCharValue // just set to the first nonempty match
  1636. }
  1637. break
  1638. }
  1639. ooc := &OutOfClusterAllocation{
  1640. Aggregator: strings.Join(aggregators, ","),
  1641. Environment: environment,
  1642. Service: *r.Data[len(formattedAggregators)+1].VarCharValue,
  1643. Cost: cost,
  1644. }
  1645. oocAllocs = append(oocAllocs, ooc)
  1646. }
  1647. } else {
  1648. klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
  1649. }
  1650. }
  1651. if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
  1652. gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
  1653. if err != nil {
  1654. klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
  1655. }
  1656. gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
  1657. if err != nil {
  1658. klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
  1659. }
  1660. oocAllocs = append(oocAllocs, gcpOOC...)
  1661. }
  1662. return oocAllocs, nil
  1663. }
  1664. // QuerySQL can query a properly configured Athena database.
  1665. // Used to fetch billing data.
  1666. // Requires a json config in /var/configs with key region, output, and database.
  1667. func (a *AWS) QuerySQL(query string) ([]byte, error) {
  1668. customPricing, err := a.GetConfig()
  1669. if err != nil {
  1670. return nil, err
  1671. }
  1672. if customPricing.ServiceKeyName != "" {
  1673. err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
  1674. if err != nil {
  1675. return nil, err
  1676. }
  1677. err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
  1678. if err != nil {
  1679. return nil, err
  1680. }
  1681. }
  1682. athenaConfigs, err := os.Open("/var/configs/athena.json")
  1683. if err != nil {
  1684. return nil, err
  1685. }
  1686. defer athenaConfigs.Close()
  1687. b, err := ioutil.ReadAll(athenaConfigs)
  1688. if err != nil {
  1689. return nil, err
  1690. }
  1691. var athenaConf map[string]string
  1692. json.Unmarshal([]byte(b), &athenaConf)
  1693. region := aws.String(customPricing.AthenaRegion)
  1694. resultsBucket := customPricing.AthenaBucketName
  1695. database := customPricing.AthenaDatabase
  1696. c := &aws.Config{
  1697. Region: region,
  1698. }
  1699. s := session.Must(session.NewSession(c))
  1700. svc := athena.New(s)
  1701. var e athena.StartQueryExecutionInput
  1702. var r athena.ResultConfiguration
  1703. r.SetOutputLocation(resultsBucket)
  1704. e.SetResultConfiguration(&r)
  1705. e.SetQueryString(query)
  1706. var q athena.QueryExecutionContext
  1707. q.SetDatabase(database)
  1708. e.SetQueryExecutionContext(&q)
  1709. res, err := svc.StartQueryExecution(&e)
  1710. if err != nil {
  1711. return nil, err
  1712. }
  1713. klog.V(2).Infof("StartQueryExecution result:")
  1714. klog.V(2).Infof(res.GoString())
  1715. var qri athena.GetQueryExecutionInput
  1716. qri.SetQueryExecutionId(*res.QueryExecutionId)
  1717. var qrop *athena.GetQueryExecutionOutput
  1718. duration := time.Duration(2) * time.Second // Pause for 2 seconds
  1719. for {
  1720. qrop, err = svc.GetQueryExecution(&qri)
  1721. if err != nil {
  1722. return nil, err
  1723. }
  1724. if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
  1725. break
  1726. }
  1727. time.Sleep(duration)
  1728. }
  1729. if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
  1730. var ip athena.GetQueryResultsInput
  1731. ip.SetQueryExecutionId(*res.QueryExecutionId)
  1732. op, err := svc.GetQueryResults(&ip)
  1733. if err != nil {
  1734. return nil, err
  1735. }
  1736. b, err := json.Marshal(op.ResultSet)
  1737. if err != nil {
  1738. return nil, err
  1739. }
  1740. return b, nil
  1741. }
  1742. return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
  1743. }
// spotInfo is one record of a spot data feed file as decoded by
// parseSpotData. The csv tags name the columns used for decoding; every
// column is kept as the raw string read from the file.
type spotInfo struct {
	Timestamp   string `csv:"Timestamp"`
	UsageType   string `csv:"UsageType"`
	Operation   string `csv:"Operation"`
	InstanceID  string `csv:"InstanceID"` // key under which parseSpotData stores the record
	MyBidID     string `csv:"MyBidID"`
	MyMaxPrice  string `csv:"MyMaxPrice"`
	MarketPrice string `csv:"MarketPrice"`
	Charge      string `csv:"Charge"`
	Version     string `csv:"Version"`
}
  1755. type fnames []*string
  1756. func (f fnames) Len() int {
  1757. return len(f)
  1758. }
  1759. func (f fnames) Swap(i, j int) {
  1760. f[i], f[j] = f[j], f[i]
  1761. }
  1762. func (f fnames) Less(i, j int) bool {
  1763. key1 := strings.Split(*f[i], ".")
  1764. key2 := strings.Split(*f[j], ".")
  1765. t1, err := time.Parse("2006-01-02-15", key1[1])
  1766. if err != nil {
  1767. klog.V(1).Info("Unable to parse timestamp" + key1[1])
  1768. return false
  1769. }
  1770. t2, err := time.Parse("2006-01-02-15", key2[1])
  1771. if err != nil {
  1772. klog.V(1).Info("Unable to parse timestamp" + key2[1])
  1773. return false
  1774. }
  1775. return t1.Before(t2)
  1776. }
  1777. func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
  1778. if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
  1779. a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
  1780. }
  1781. // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
  1782. if accessKeyID != "" && accessKeySecret != "" {
  1783. err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
  1784. if err != nil {
  1785. return nil, err
  1786. }
  1787. err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
  1788. if err != nil {
  1789. return nil, err
  1790. }
  1791. }
  1792. s3Prefix := projectID
  1793. if len(prefix) != 0 {
  1794. s3Prefix = prefix + "/" + s3Prefix
  1795. }
  1796. c := aws.NewConfig().WithRegion(region)
  1797. s := session.Must(session.NewSession(c))
  1798. s3Svc := s3.New(s)
  1799. downloader := s3manager.NewDownloaderWithClient(s3Svc)
  1800. tNow := time.Now()
  1801. tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
  1802. ls := &s3.ListObjectsInput{
  1803. Bucket: aws.String(bucket),
  1804. Prefix: aws.String(s3Prefix + "." + tOneDayAgo.Format("2006-01-02")),
  1805. }
  1806. ls2 := &s3.ListObjectsInput{
  1807. Bucket: aws.String(bucket),
  1808. Prefix: aws.String(s3Prefix + "." + tNow.Format("2006-01-02")),
  1809. }
  1810. lso, err := s3Svc.ListObjects(ls)
  1811. if err != nil {
  1812. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1813. Message: "Bucket List Permissions Available",
  1814. Status: false,
  1815. AdditionalInfo: err.Error(),
  1816. }
  1817. return nil, err
  1818. } else {
  1819. a.ServiceAccountChecks["bucketList"] = &ServiceAccountCheck{
  1820. Message: "Bucket List Permissions Available",
  1821. Status: true,
  1822. }
  1823. }
  1824. lsoLen := len(lso.Contents)
  1825. klog.V(2).Infof("Found %d spot data files from yesterday", lsoLen)
  1826. if lsoLen == 0 {
  1827. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls.Bucket, *ls.Prefix)
  1828. }
  1829. lso2, err := s3Svc.ListObjects(ls2)
  1830. if err != nil {
  1831. return nil, err
  1832. }
  1833. lso2Len := len(lso2.Contents)
  1834. klog.V(2).Infof("Found %d spot data files from today", lso2Len)
  1835. if lso2Len == 0 {
  1836. klog.V(5).Infof("ListObjects \"s3://%s/%s\" produced no keys", *ls2.Bucket, *ls2.Prefix)
  1837. }
  1838. // TODO: Worth it to use LastModifiedDate to determine if we should reparse the spot data?
  1839. var keys []*string
  1840. for _, obj := range lso.Contents {
  1841. keys = append(keys, obj.Key)
  1842. }
  1843. for _, obj := range lso2.Contents {
  1844. keys = append(keys, obj.Key)
  1845. }
  1846. versionRx := regexp.MustCompile("^#Version: (\\d+)\\.\\d+$")
  1847. header, err := csvutil.Header(spotInfo{}, "csv")
  1848. if err != nil {
  1849. return nil, err
  1850. }
  1851. fieldsPerRecord := len(header)
  1852. spots := make(map[string]*spotInfo)
  1853. for _, key := range keys {
  1854. getObj := &s3.GetObjectInput{
  1855. Bucket: aws.String(bucket),
  1856. Key: key,
  1857. }
  1858. buf := aws.NewWriteAtBuffer([]byte{})
  1859. _, err := downloader.Download(buf, getObj)
  1860. if err != nil {
  1861. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1862. Message: "Object Get Permissions Available",
  1863. Status: false,
  1864. AdditionalInfo: err.Error(),
  1865. }
  1866. return nil, err
  1867. } else {
  1868. a.ServiceAccountChecks["objectList"] = &ServiceAccountCheck{
  1869. Message: "Object Get Permissions Available",
  1870. Status: true,
  1871. }
  1872. }
  1873. r := bytes.NewReader(buf.Bytes())
  1874. gr, err := gzip.NewReader(r)
  1875. if err != nil {
  1876. return nil, err
  1877. }
  1878. csvReader := csv.NewReader(gr)
  1879. csvReader.Comma = '\t'
  1880. csvReader.FieldsPerRecord = fieldsPerRecord
  1881. dec, err := csvutil.NewDecoder(csvReader, header...)
  1882. if err != nil {
  1883. return nil, err
  1884. }
  1885. var foundVersion string
  1886. for {
  1887. spot := spotInfo{}
  1888. err := dec.Decode(&spot)
  1889. csvParseErr, isCsvParseErr := err.(*csv.ParseError)
  1890. if err == io.EOF {
  1891. break
  1892. } else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
  1893. rec := dec.Record()
  1894. // the first two "Record()" will be the comment lines
  1895. // and they show up as len() == 1
  1896. // the first of which is "#Version"
  1897. // the second of which is "#Fields: "
  1898. if len(rec) != 1 {
  1899. klog.V(2).Infof("Expected %d spot info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
  1900. continue
  1901. }
  1902. if len(foundVersion) == 0 {
  1903. spotFeedVersion := rec[0]
  1904. klog.V(4).Infof("Spot feed version is \"%s\"", spotFeedVersion)
  1905. matches := versionRx.FindStringSubmatch(spotFeedVersion)
  1906. if matches != nil {
  1907. foundVersion = matches[1]
  1908. if foundVersion != supportedSpotFeedVersion {
  1909. klog.V(2).Infof("Unsupported spot info feed version: wanted \"%s\" got \"%s\"", supportedSpotFeedVersion, foundVersion)
  1910. break
  1911. }
  1912. }
  1913. continue
  1914. } else if strings.Index(rec[0], "#") == 0 {
  1915. continue
  1916. } else {
  1917. klog.V(3).Infof("skipping non-TSV line: %s", rec)
  1918. continue
  1919. }
  1920. } else if err != nil {
  1921. klog.V(2).Infof("Error during spot info decode: %+v", err)
  1922. continue
  1923. }
  1924. log.DedupedInfof(5, "Found spot info for: %s", spot.InstanceID)
  1925. spots[spot.InstanceID] = &spot
  1926. }
  1927. gr.Close()
  1928. }
  1929. return spots, nil
  1930. }
// ApplyReservedInstancePricing currently does nothing: its whole former
// implementation is commented out and the nodes argument is neither read nor
// modified. The disabled code is kept below for reference only.
func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
	/*
		numReserved := len(a.ReservedInstances)

		// Early return if no reserved instance data loaded
		if numReserved == 0 {
			klog.V(4).Infof("[Reserved] No Reserved Instances")
			return
		}

		cfg, err := a.GetConfig()
		defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
		if err != nil {
			klog.V(3).Infof("Could not parse default cpu price")
			defaultCPU = 0.031611
		}

		defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
		if err != nil {
			klog.V(3).Infof("Could not parse default ram price")
			defaultRAM = 0.004237
		}

		cpuToRAMRatio := defaultCPU / defaultRAM

		now := time.Now()

		instances := make(map[string][]*AWSReservedInstance)
		for _, r := range a.ReservedInstances {
			if now.Before(r.StartDate) || now.After(r.EndDate) {
				klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
				continue
			}

			_, ok := instances[r.Region]
			if !ok {
				instances[r.Region] = []*AWSReservedInstance{r}
			} else {
				instances[r.Region] = append(instances[r.Region], r)
			}
		}

		awsNodes := make(map[string]*v1.Node)
		currentNodes := a.Clientset.GetAllNodes()

		// Create a node name -> node map
		for _, awsNode := range currentNodes {
			awsNodes[awsNode.GetName()] = awsNode
		}

		// go through all provider nodes using k8s nodes for region
		for nodeName, node := range nodes {
			// Reset reserved allocation to prevent double allocation
			node.Reserved = nil

			kNode, ok := awsNodes[nodeName]
			if !ok {
				klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
				continue
			}

			nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
			if !ok {
				klog.V(1).Infof("[Reserved] Could not find node region")
				continue
			}

			reservedInstances, ok := instances[nodeRegion]
			if !ok {
				klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
				continue
			}

			// Determine the InstanceType of the node
			instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
			if !ok {
				continue
			}

			ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
			if err != nil {
				continue
			}
			ramGB := ramBytes / 1024 / 1024 / 1024

			cpu, err := strconv.ParseFloat(node.VCPU, 64)
			if err != nil {
				continue
			}

			ramMultiple := cpu*cpuToRAMRatio + ramGB

			node.Reserved = &ReservedInstanceData{
				ReservedCPU: 0,
				ReservedRAM: 0,
			}

			for i, reservedInstance := range reservedInstances {
				if reservedInstance.InstanceType == instanceType {
					// Use < 0 to mark as ALL
					node.Reserved.ReservedCPU = -1
					node.Reserved.ReservedRAM = -1

					// Set Costs based on CPU/RAM ratios
					ramPrice := reservedInstance.PricePerHour / ramMultiple
					node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
					node.Reserved.RAMCost = ramPrice

					// Remove the reserve from the temporary slice to prevent
					// being reallocated
					instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
					break
				}
			}
		}*/
}
  2026. type AWSReservedInstance struct {
  2027. Zone string
  2028. Region string
  2029. InstanceType string
  2030. InstanceCount int64
  2031. InstanceTenacy string
  2032. StartDate time.Time
  2033. EndDate time.Time
  2034. PricePerHour float64
  2035. }
  2036. func (ari *AWSReservedInstance) String() string {
  2037. return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
  2038. }
  2039. func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
  2040. return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
  2041. }
  2042. func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
  2043. var pricePerHour float64
  2044. if len(ri.RecurringCharges) > 0 {
  2045. for _, rc := range ri.RecurringCharges {
  2046. if isReservedInstanceHourlyPrice(rc) {
  2047. pricePerHour = *rc.Amount
  2048. break
  2049. }
  2050. }
  2051. }
  2052. // If we're still unable to resolve hourly price, try fixed -> hourly
  2053. if pricePerHour == 0 {
  2054. if ri.Duration != nil && ri.FixedPrice != nil {
  2055. var durHours float64
  2056. durSeconds := float64(*ri.Duration)
  2057. fixedPrice := float64(*ri.FixedPrice)
  2058. if durSeconds != 0 && fixedPrice != 0 {
  2059. durHours = durSeconds / 60 / 60
  2060. pricePerHour = fixedPrice / durHours
  2061. }
  2062. }
  2063. }
  2064. if pricePerHour == 0 {
  2065. return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
  2066. }
  2067. return pricePerHour, nil
  2068. }
  2069. func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
  2070. c := &aws.Config{
  2071. Region: aws.String(region),
  2072. }
  2073. s := session.Must(session.NewSession(c))
  2074. svc := ec2.New(s)
  2075. response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
  2076. if err != nil {
  2077. return nil, err
  2078. }
  2079. var reservedInstances []*AWSReservedInstance
  2080. for _, ri := range response.ReservedInstances {
  2081. var zone string
  2082. if ri.AvailabilityZone != nil {
  2083. zone = *ri.AvailabilityZone
  2084. }
  2085. pricePerHour, err := getReservedInstancePrice(ri)
  2086. if err != nil {
  2087. klog.V(1).Infof("Error Resolving Price: %s", err.Error())
  2088. continue
  2089. }
  2090. reservedInstances = append(reservedInstances, &AWSReservedInstance{
  2091. Zone: zone,
  2092. Region: region,
  2093. InstanceType: *ri.InstanceType,
  2094. InstanceCount: *ri.InstanceCount,
  2095. InstanceTenacy: *ri.InstanceTenancy,
  2096. StartDate: *ri.Start,
  2097. EndDate: *ri.End,
  2098. PricePerHour: pricePerHour,
  2099. })
  2100. }
  2101. return reservedInstances, nil
  2102. }
  2103. func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
  2104. err := a.configureAWSAuth()
  2105. if err != nil {
  2106. return nil, fmt.Errorf("Error Configuring aws auth: %s", err.Error())
  2107. }
  2108. var reservedInstances []*AWSReservedInstance
  2109. nodes := a.Clientset.GetAllNodes()
  2110. regionsSeen := make(map[string]bool)
  2111. for _, node := range nodes {
  2112. region, ok := node.Labels[v1.LabelZoneRegion]
  2113. if !ok {
  2114. continue
  2115. }
  2116. if regionsSeen[region] {
  2117. continue
  2118. }
  2119. ris, err := getRegionReservedInstances(region)
  2120. if err != nil {
  2121. klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
  2122. continue
  2123. }
  2124. regionsSeen[region] = true
  2125. reservedInstances = append(reservedInstances, ris...)
  2126. }
  2127. return reservedInstances, nil
  2128. }
  2129. func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
  2130. checks := []*ServiceAccountCheck{}
  2131. for _, v := range a.ServiceAccountChecks {
  2132. checks = append(checks, v)
  2133. }
  2134. return &ServiceAccountStatus{
  2135. Checks: checks,
  2136. }
  2137. }